
    2 i                    .   d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	m
Z
mZ ddlZddlmZ ddlmZmZ ddlmZmZ ej                            d          Zej                            d          Zej                            d	d
          ZdZdZeres ed           G d de          Zd.dZd/dZdddddZ d0dZ!d1d$Z"d2d%Z# ed&d'(          Z$e$%                    d)          d2d*            Z&e$                    d+          d3d-            Z'dS )4u  ingest_svc.py

FastAPI-based background worker that:
1. Streams live user-session data (screenshots/events) from Teramind’s REST/WebSocket API.
2. Normalises each payload into the canonical event schema.
3. Pushes events onto a Redis Stream named ``events`` for downstream ML/Rule-Engine workers.

Environment variables expected
-----------------------------
TERAMIND_HOST          e.g. retroindustr.us.teramind.co
TERAMIND_TOKEN         API access token with at least ``sessions:read`` & ``events:read`` scopes.
REDIS_URL              redis://localhost:6379/0

Run with:
    uvicorn ingest_svc:app --port 9000 --reload

Requires Python >=3.11 and:
    pip install fastapi uvicorn httpx redis pydantic[dotenv]
    )annotationsN)AnyAsyncGeneratorDictList)BackgroundTasksFastAPI)	BaseModelFieldTERAMIND_HOSTTERAMIND_TOKEN	REDIS_URLzredis://localhost:6379/0events   z,TERAMIND_HOST and TERAMIND_TOKEN must be setc                  \    e Zd ZU ded<   ded<   ded<   ded<   ded<    G d d	          Zd
S )RawEventstrtsuser_id	device_idtypezDict[str, Any]payloadc                      e Zd Zed iZdS )RawEvent.Configc                N    t          j        |                                           S N)base64	b64encodedecode)bs    ;/var/www/html/blood_donation_traceloop/python/ingest_svc.py<lambda>zRawEvent.Config.<lambda>4   s    &*:1*=*=*D*D*F*F     N)__name__
__module____qualname__bytesjson_encoders r#   r!   Configr   3   s         F FGr#   r*   N)r$   r%   r&   __annotations__r*   r)   r#   r!   r   r   ,   sx         GGGLLLNNNIIIH H H H H H H H H Hr#   r   returnDict[str, str]c                     t           ddS )Nzapplication/json)zx-access-tokenAccept)r   r)   r#   r!   make_headersr0   7   s    ($  r#   sessionhttpx.AsyncClientAsyncGenerator[dict, None]c                 K   dt            d}	 	 |                     d|t                      d          4 d{V }|                                2 3 d{V }|s
	 t	          j        |          W V  ## t          j        $ r Y 4w xY w6 	 ddd          d{V  n# 1 d{V swxY w Y   n@# t          j        t          j	        f$ r" t          j        t                     d{V  Y w xY w)z<Yield JSON lines from /events?stream=1 endpoint (long-poll).zhttps://z/api/v3/events?stream=1TGETN)headerstimeout)r   streamr0   aiter_linesjsonloadsJSONDecodeErrorhttpx	HTTPError	ReadErrorasynciosleepWS_RETRY_DELAY)r1   urlresplines       r!   fetch_live_eventsrF   =   s     
;]
;
;
;C	~~eS,..RV~WW ! ! ! ! ! ! ![_"&"2"2"4"4 ! ! ! ! ! ! !$ ! !"j......./ ! ! ! ! #5"4! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! 1 	 	 	-/////////H	sd   +B. BBBA32B3BBBB
B. 
B&&B. )B&*B. .:C+*C+WINDOW
SCREENSHOT	KEYSTROKEURL)            rawdictRawEvent | Nonec           	     n   |                      d          }t                               |          }|sdS t          |                      d          t          |                      d                    t          |                      d                    |d |                                 D                       S )z2Convert Teramind event JSON to canonical RawEvent.eventTypeIdN	timestampuserId
computerIdc                "    i | ]\  }}|d v	||S )>   rU   rV   rS   r)   ).0kvs      r!   
<dictcomp>znormalise.<locals>.<dictcomp>c   s4     
 
 
1??? q???r#   )r   r   r   r   r   )getEVENT_TYPE_MAPr   r   items)rO   
etype_code
event_types      r!   	normalisera   W   s    ''J##J//J t77;CGGH%%&&cggl++,,
 
		
 
 

 
 
 
r#   rdsaioredis.RediseventNonec                z   K   |                      t          d|                                idd           d {V  d S )Ndatai T)maxlenapproximate)xadd
STREAM_KEYr:   )rb   rd   s     r!   push_to_redisrl   k   sF      
((:

5fRV(
W
WWWWWWWWWWr#   c                 f  K   t          j        t          d          } t          j        t          j        d                     4 d {V }t          |          2 3 d {V }t          |          }|t          | |           d {V  06 	 d d d           d {V  d S # 1 d {V swxY w Y   d S )NT)decode_responses)r7   )	aioredisfrom_urlr   r=   AsyncClientTimeoutrF   ra   rl   )redis_clientr1   rO   evts       r!   workerru   o   sd     $YFFFL t)<)<=== 3 3 3 3 3 3 3*733 	3 	3 	3 	3 	3 	3 	3#C..C{c2222222222	 433 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3s   B B!+B  
B*-B*z
ingest-svcz0.1.0)titleversionstartupc                 |   K   t                      } |                     t                     | t          j        _        d S r   )r   add_taskru   appstatebg)r}   s    r!   startup_eventr~   {   s0      			BKKCILLLr#   z/healthzdict[str, str]c                    K   ddiS )Nstatusokr)   r)   r#   r!   healthcheckr      s      dr#   )r,   r-   )r1   r2   r,   r3   )rO   rP   r,   rQ   )rb   rc   rd   r   r,   re   )r,   re   )r,   r   )(__doc__
__future__r   r@   r   r:   ostypingr   r   r   r   r=   redis.asyncioro   fastapir   r	   pydanticr
   r   environr\   r   r   r   rk   rB   RuntimeErrorr   r0   rF   r]   ra   rl   ru   r{   on_eventr~   r   r)   r#   r!   <module>r      sM   & # " " " " "    				 2 2 2 2 2 2 2 2 2 2 2 2              , , , , , , , , % % % % % % % % 
// 011JNN;(BCC	
 GN G
,E
F
FFH H H H Hy H H H      ( 	    (X X X X3 3 3 3 gL'222i   
      r#   