U
    $FZhnd                     @   sb  d dl Z d dlZd dlZd dlZd dlmZ d dlZd dlmZ d dl	m
Z
mZ d dlmZ d dlmZmZmZ ejZeeZdZd dd	d
dddddddddddddddZdZejejejejejejejej fZ!ejfZ"e #dddgZ$G dd de%Z&G d d! d!eZ'G d"d# d#e%Z(G d$d% d%e%Z)d&d' Z*d(d) Z+d*d+ Z,d,d- Z-G d.d/ d/e%Z.dS )0    N)Enum)
exceptions)BackgroundConsumerResumableBidiRpc)_helpers)ListenRequestTargetTargetChangeiyP                             	   
                  )OKZ	CANCELLEDUNKNOWNZINVALID_ARGUMENTZDEADLINE_EXCEEDED	NOT_FOUNDZALREADY_EXISTSZPERMISSION_DENIEDZUNAUTHENTICATEDZRESOURCE_EXHAUSTEDZFAILED_PRECONDITIONZABORTEDZOUT_OF_RANGEZUNIMPLEMENTEDZINTERNALUNAVAILABLEZ	DATA_LOSSZ
DO_NOT_USEzThread-OnRpcTerminatedDocTreeEntryvalueindexc                   @   sT   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd ZdS )WatchDocTreec                 C   s   i | _ d| _d S )Nr   )_dict_indexself r'   f/home/aprabhat/apps/x.techxrdev.in/venv/lib/python3.8/site-packages/google/cloud/firestore_v1/watch.py__init__N   s    zWatchDocTree.__init__c                 C   s   t | j S N)listr#   keysr%   r'   r'   r(   r,   R   s    zWatchDocTree.keysc                 C   s"   t  }| j |_| j|_|} | S r*   )r"   r#   copyr$   )r&   Zwdtr'   r'   r(   _copyU   s
    zWatchDocTree._copyc                 C   s,   |   } t|| j| j|< |  jd7  _| S )Nr
   )r.   r   r$   r#   )r&   keyr    r'   r'   r(   insert\   s    zWatchDocTree.insertc                 C   s
   | j | S r*   r#   r&   r/   r'   r'   r(   findb   s    zWatchDocTree.findc                 C   s   |   } | j|= | S r*   )r.   r#   r2   r'   r'   r(   removee   s    zWatchDocTree.removec                 c   s   | j D ]
}|V  qd S r*   r1   r&   kr'   r'   r(   __iter__j   s    
zWatchDocTree.__iter__c                 C   s
   t | jS r*   )lenr#   r%   r'   r'   r(   __len__n   s    zWatchDocTree.__len__c                 C   s
   || j kS r*   r1   r5   r'   r'   r(   __contains__q   s    zWatchDocTree.__contains__N)__name__
__module____qualname__r)   r,   r.   r0   r3   r4   r7   r9   r:   r'   r'   r'   r(   r"   J   s   r"   c                   @   s   e Zd ZdZdZdZdS )
ChangeTyper
   r   r   N)r;   r<   r=   ADDEDREMOVEDMODIFIEDr'   r'   r'   r(   r>   u   s   r>   c                   @   s   e Zd Zdd ZdS )DocumentChangec                 C   s   || _ || _|| _|| _dS )zDocumentChange

        Args:
            type (ChangeType):
            document (document.DocumentSnapshot):
            old_index (int):
            new_index (int):
        N)typedocument	old_index	new_index)r&   rC   rD   rE   rF   r'   r'   r(   r)   |   s    
zDocumentChange.__init__Nr;   r<   r=   r)   r'   r'   r'   r(   rB   {   s   rB   c                   @   s   e Zd Zdd ZdS )WatchResultc                 C   s   || _ || _|| _d S r*   )snapshotnamechange_type)r&   rI   rJ   rK   r'   r'   r(   r)      s    zWatchResult.__init__NrG   r'   r'   r'   r(   rH      s   rH   c                 C   s   t | tjrt| S | S )z(Wraps a gRPC exception class, if needed.)
isinstancegrpcZRpcErrorr   Zfrom_grpc_error)	exceptionr'   r'   r(   _maybe_wrap_exception   s    
rO   c                 C   s   | |kst ddS )Nz+Document watches only support one document.r   )AssertionError)Zdoc1Zdoc2r'   r'   r(   document_watch_comparator   s    rQ   c                 C   s   t | }t|tS r*   )rO   rL   _RECOVERABLE_STREAM_EXCEPTIONSrN   wrappedr'   r'   r(   _should_recover   s    rU   c                 C   s   t | }t|tS r*   )rO   rL   _TERMINATING_STREAM_EXCEPTIONSrS   r'   r'   r(   _should_terminate   s    rW   c                
   @   s   e Zd Zdd Zdd Zedd Zedd Zd	d
 Zdd Z	e
dd Zd.ddZdd Zdd Zdd Zdd Zdd Zdd Zdd ZejeejeejeejeejeiZd d! Zd"d# Zd$d% Zed&d' Zd(d) Z d*d+ Z!d,d- Z"dS )/Watchc                 C   sz   || _ || _|| _|| _|| _|| _|j| _t	 | _
d| _| |j d| _t | _i | _i | _d| _d| _|   dS )a  
        Args:
            firestore:
            target:
            comparator:
            snapshot_callback: Callback method to process snapshots.
                Args:
                    docs (List(DocumentSnapshot)): A callback that returns the
                        ordered list of documents stored in this snapshot.
                    changes (List(str)): A callback that returns the list of
                        changed documents since the last snapshot delivered for
                        this watch.
                    read_time (string): The ISO 8601 time at which this
                        snapshot was obtained.

            document_snapshot_cls: factory for instances of DocumentSnapshot
        FN)Z_document_reference
_firestore_targets_comparator_document_snapshot_cls_snapshot_callbackZ_firestore_api_api	threadingLock_closing_closed_set_documents_pfx_database_stringresume_tokenr"   doc_treedoc_map
change_mapcurrent
has_pushed_init_stream)r&   Zdocument_referenceZ	firestoretargetZ
comparatorsnapshot_callbackdocument_snapshot_clsr'   r'   r(   r)      s"    
zWatch.__init__c                 C   sP   | j }t| jjjtt|| jjd| _	| j	
| j t| j	| j| _| j  d S )N)Z	start_rpcZshould_recoverZshould_terminateZinitial_requestmetadata)_get_rpc_requestr   r^   Z
_transportlistenrU   rW   rY   Z_rpc_metadata_rpcZadd_done_callback_on_rpc_doner   on_snapshot	_consumerstart)r&   Zrpc_requestr'   r'   r(   rk      s    zWatch._init_streamc                 C   s"   | ||j d|jgitdt||S )a  
        Creates a watch snapshot listener for a document. snapshot_callback
        receives a DocumentChange object, but may also start to get
        targetChange and such soon

        Args:
            document_ref: Reference to Document
            snapshot_callback: callback to be called on snapshot
            document_snapshot_cls: class to make snapshots with
            reference_class_instance: class make references

        	documents)rw   	target_id)_client_document_pathWATCH_TARGET_IDrQ   )clsdocument_refrm   rn   r'   r'   r(   for_document   s    
zWatch.for_documentc                 C   s>   |j  \}}tj|| d}| ||j|jtd|j||S )N)parentZstructured_query)queryrx   )	Z_parentZ_parent_infor   ZQueryTargetZ_to_protobufry   _pbr{   r[   )r|   r   rm   rn   parent_path_Zquery_targetr'   r'   r(   	for_query  s     
zWatch.for_queryc                 C   s8   | j d k	r| j | jd< n| jdd  t| jj| jdS )Nre   )ZdatabaseZ
add_target)re   rZ   popr   rY   rd   r%   r'   r'   r(   rp   (  s    
 zWatch._get_rpc_requestc                 C   s   | d| _ t| j | _d S )Nz/documents/)_documents_pfxr8   _documents_pfx_len)r&   Zdatabase_stringr'   r'   r(   rc   2  s    zWatch._set_documents_pfxc                 C   s   | j dk	o| j jS )zbool: True if this manager is actively streaming.

        Note that ``False`` does not indicate this is complete shut down,
        just that it stopped getting new messages.
        N)ru   	is_activer%   r'   r'   r(   r   6  s    zWatch.is_activeNc              	   C   s   | j x | jrW 5 Q R  dS | jr6td | j  d| j_d| _d| _| j	
  d| j	_g | j	_d| _	d| _td W 5 Q R X |rtd|  t|tr|t|dS )a  Stop consuming messages and shutdown all helper threads.

        This method is idempotent. Additional calls will have no effect.

        Args:
            reason (Any): The reason to close this. If None, this is considered
                an "intentional" shutdown.
        NzStopping consumer.TzFinished stopping manager.zreason for closing: %s)ra   rb   r   _LOGGERdebugru   stopZ_on_responser]   rr   closeZ_initial_requestZ
_callbacksrL   	ExceptionRuntimeError)r&   reasonr'   r'   r(   r   ?  s(    	



zWatch.closec                 C   s:   t d t|}tjt| jd|id}d|_|  dS )a
  Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        z.RPC termination has signaled manager shutdown.r   )rJ   rl   kwargsTN)	r   inforO   r_   Thread_RPC_ERROR_THREAD_NAMEr   daemonrv   )r&   futurethreadr'   r'   r(   rs   b  s    
  zWatch._on_rpc_donec                 C   s   |    d S r*   )r   r%   r'   r'   r(   unsubscribeu  s    zWatch.unsubscribec                 C   sF   t d |jd kp t|jdk}|rB|jrB| jrB| |j|j d S )Nz%on_snapshot: target change: NO_CHANGEr   )r   r   
target_idsr8   	read_timeri   pushre   )r&   target_changeZno_target_idsr'   r'   r(   $_on_snapshot_target_change_no_changex  s
    
z*Watch._on_snapshot_target_change_no_changec                 C   s,   t d |jd }|tkr(td| d S )Nzon_snapshot: target change: ADDr   z&Unexpected target ID %s sent by server)r   r   r   r{   r   )r&   r   rx   r'   r'   r(   _on_snapshot_target_change_add  s    

z$Watch._on_snapshot_target_change_addc                 C   sN   t d |jjr$|jj}|jj}nd}d}d||f }t|t||d S )Nz"on_snapshot: target change: REMOVEr   zinternal errorzError %s:  %s)r   r   causecodemessager   r   Zfrom_grpc_status)r&   r   r   r   error_messager'   r'   r(   !_on_snapshot_target_change_remove  s    


 z'Watch._on_snapshot_target_change_removec                 C   s   t d |   d S )Nz!on_snapshot: target change: RESET)r   r   _reset_docsr&   r   r'   r'   r(    _on_snapshot_target_change_reset  s    
z&Watch._on_snapshot_target_change_resetc                 C   s   t d d| _d S )Nz#on_snapshot: target change: CURRENTT)r   r   ri   r   r'   r'   r(   "_on_snapshot_target_change_current  s    
z(Watch._on_snapshot_target_change_currentc                 C   s   | | jr|| jd  }|S r*   )
startswithr   r   )r&   document_namer'   r'   r(   _strip_document_pfx  s    zWatch._strip_document_pfxc              
   C   sl  | j  rdS |dkr"|   dS |j}|d}|dkr|jj}td|  | j	
|}|dkrd| }td|  | jt|d z|| |j W n4 tk
r } ztd|   W 5 d}~X Y nX n|d	krtd
 t|jjk}t|jjk}	|jj}
|rntd t|
j| j}| |
j}| j|}| j||dd|
j|
jd}|| j|
j< n|	rhtd tj| j|
j< n|dkrtd |jj}tj| j|< n|dkrtd |j j}tj| j|< n|dkrDtd |j!j"| # krhtd t$j%t&| jd}|'  |(  | )  | *  n$td d| }| jt|d dS )aS  Process a response from the bi-directional gRPC stream.

        Collect changes and push the changes in a batch to the customer
        when we receive 'current' from the listen response.

        Args:
            proto(`google.cloud.firestore_v1.types.ListenResponse`):
                Callback method that receives a object to
        NZresponse_typer   zon_snapshot: target change: zUnknown target change type: zon_snapshot: )r   zmeth(proto) exc: document_changezon_snapshot: document changez%on_snapshot: document change: CHANGEDT)	referencedataexistsr   create_timeupdate_timez%on_snapshot: document change: REMOVEDdocument_deletez$on_snapshot: document change: DELETEdocument_removez$on_snapshot: document change: REMOVEfilterzon_snapshot: filter updatez%Filter mismatch -- restarting stream.)rJ   rl   zUNKNOWN TYPE. UHOHzUnknown listen response type: )+ra   lockedr   r   Z
WhichOneofr   target_change_typer   r   _target_changetype_dispatchgetr   
ValueErrorr   r{   r   r   Zremoved_target_idsrD   r   Zdecode_dictfieldsrY   r   rJ   r\   r   r   rh   r>   r@   r   r   r   count_current_sizer_   r   r   rv   joinr   rk   )r&   protoZpbwhichr   methr   Zexc2changedZremovedrD   r   r   r}   rI   rJ   r   r'   r'   r(   rt     s    

















zWatch.on_snapshotc                 C   s   |  | j| j|\}}}| | j| j|||\}}}| jrBt|rrt| j	}	t
| |	d}
| |
|| d| _|| _|| _| j  || _dS )zInvoke the callback with a new snapshot

        Build the sntapshot from the current set of changes.

        Clear the current changes on completion.
        r/   TN)_extract_changesrg   rh   _compute_snapshotrf   rj   r8   	functools
cmp_to_keyr[   sortedr,   r]   clearre   )r&   r   Znext_resume_tokendeletesaddsupdatesupdated_treeupdated_mapappliedChangesr/   r,   r'   r'   r(   r   "  s*      
    

z
Watch.pushc                 C   s   g }g }g }|  D ]`\}}|tjkr:|| krt|| q|| kr\|d k	rP||_|| q|d k	rj||_|| q|||fS r*   )itemsr>   r@   appendr   )rg   changesr   r   r   r   rJ   r    r'   r'   r(   r   ?  s    
zWatch._extract_changesc                    s   |}|}t |t |ks tddd dd   fdd}g }	t| j}
t|}|D ] }|||\}}}|	| qZt||
d}td	 |D ]*}td
  |||\}}}|	| qt||
d}|D ](}||||\}}}|d k	r|	| qt |t |kstd|||	fS )NzJThe document tree and document map should have the same number of entries.c                 S   sP   | |kst d|| }||}|j}||}|| = ttj||d||fS )z
            Applies a document delete to the document tree and document map.
            Returns the corresponding DocumentChange event.
            z!Document to delete does not existr   )rP   r   r3   r!   r4   rB   r>   r@   )rJ   r   r   old_documentexistingrE   r'   r'   r(   
delete_doc_  s    


z+Watch._compute_snapshot.<locals>.delete_docc                 S   sN   | j j}||kstd|| d}|| j}| ||< ttj| d|||fS )z
            Applies a document add to the document tree and the document map.
            Returns the corresponding DocumentChange event.
            zDocument to add already existsNr   )	r   rz   rP   r0   r3   r!   rB   r>   r?   )new_documentr   r   rJ   rF   r'   r'   r(   add_docq  s    z(Watch._compute_snapshot.<locals>.add_docc                    sv   | j j}||kstd||}|j| jkrl|||\}}} | ||\}}}ttj| |j|j	||fS d||fS )z
            Applies a document modification to the document tree and the
            document map.
            Returns the DocumentChange event for successful modifications.
            z!Document to modify does not existN)
r   rz   rP   r   r   rB   r>   rA   rE   rF   )r   r   r   rJ   r   Zremove_changeZ
add_changer   r   r'   r(   
modify_doc  s0    
  
  
z+Watch._compute_snapshot.<locals>.modify_docr   zwalk over add_changeszin add_changeszQThe update document tree and document map should have the same number of entries.)	r8   rP   r   r   r[   r   r   r   r   )r&   rf   rg   Zdelete_changesZadd_changesZupdate_changesr   r   r   r   r/   rJ   ZchangerI   r'   r   r(   r   T  sR    !  


  
  
zWatch._compute_snapshotc                 C   s2   |  | j| jd\}}}t| jt| t| S )zsReturn the current count of all documents.

        Count includes the changes from the current changeMap.
        N)r   rg   rh   r8   )r&   r   r   r   r'   r'   r(   r     s    zWatch._current_sizec                 C   sH   t d | j  d| _| j D ]}|jj}t	j
| j|< q$d| _dS )zG
        Helper to clear the docs on RESET or filter mismatch.
        zresetting documentsNF)r   r   rh   r   re   rf   r,   r   rz   r>   r@   ri   )r&   rI   rJ   r'   r'   r(   r     s    

zWatch._reset_docs)N)#r;   r<   r=   r)   rk   classmethodr~   r   rp   rc   propertyr   r   rs   r   r   r   r   r   r   TargetChangeType	NO_CHANGEZADDZREMOVERESETZCURRENTr   r   rt   r   staticmethodr   r   r   r   r'   r'   r'   r(   rX      sJ   >




#     q
qrX   )/collectionsr   loggingr_   enumr   rM   Zgoogle.api_corer   Zgoogle.api_core.bidir   r   Zgoogle.cloud.firestore_v1r   Z)google.cloud.firestore_v1.types.firestorer   r   r	   r   	getLoggerr;   r   r{   ZGRPC_STATUS_CODEr   ZAbortedZ	CancelledUnknownZDeadlineExceededZResourceExhaustedZInternalServerErrorZServiceUnavailableZUnauthenticatedrR   rV   
namedtupler   objectr"   r>   rB   rH   rO   rQ   rU   rW   rX   r'   r'   r'   r(   <module>   sh   

+