"""
SmartStop Sentinel Connect – Module 3
Event Bus Normaliser Service
---------------------------------
Listens to Kafka topic `events.raw` for heterogeneous JSON payloads from connectors.
Maps each payload into the canonical schema and publishes onto `events.norm`.
Canonical schema (v1):
{
  event_id: str,
  org_id: str,
  source: str,               # 'epic_fhir', 'pump.icu_med', etc.
  event_type: str,           # MedicationSafety, ProcedureSafety, Falls, etc.
  severity: str | int,       # A–I or numeric
  timestamp: str (ISO 8601),
  payload: dict              # vendor-specific full blob (optionally redacted)
}
"""
from __future__ import annotations
import json
import os
import signal
import sys
import uuid
from datetime import datetime
from typing import Any, Dict

from confluent_kafka import Consumer, Producer, KafkaError

# Kafka configuration
KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP", "localhost:9092")
RAW_TOPIC = os.getenv("RAW_TOPIC", "events.raw")
NORM_TOPIC = os.getenv("NORM_TOPIC", "events.norm")
GROUP_ID = os.getenv("GROUP_ID", "normaliser-service")

consumer_conf = {
    "bootstrap.servers": KAFKA_BOOTSTRAP,
    "group.id": GROUP_ID,
    "auto.offset.reset": "earliest",
}
producer_conf = {
    "bootstrap.servers": KAFKA_BOOTSTRAP,
}

consumer = Consumer(consumer_conf)
producer = Producer(producer_conf)

def canonicalise(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return canonical dict or raise ValueError if mapping fails."""
    src = event.get("source", "unknown")

    # Map based on source namespace
    if src.startswith("epic_fhir"):
        # Expect raw line to be FHIR Observation NDJSON
        raw = json.loads(event["raw"])
        return {
            "event_id": str(uuid.uuid4()),
            "org_id": event.get("org_id", ""),
            "source": src,
            "event_type": "Observation",
            "severity": 0,
            "timestamp": raw.get("effectiveDateTime", datetime.utcnow().isoformat()),
            "payload": raw,
        }

    if src.startswith("pump.icu_med"):
        return {
            "event_id": str(uuid.uuid4()),
            "org_id": event.get("org_id", ""),
            "source": src,
            "event_type": "MedicationSafety",
            "severity": event.get("severity", 0),
            "timestamp": event.get("timestamp", datetime.utcnow().isoformat()),
            "payload": event,
        }

    # Default fall-through
    raise ValueError(f"Unsupported source: {src}")

def shutdown(sig, frame):
    print("Shutting down normaliser…")
    consumer.close()
    producer.flush(5)
    sys.exit(0)

signal.signal(signal.SIGINT, shutdown)

def main():
    consumer.subscribe([RAW_TOPIC])
    print("[normaliser] listening …")
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print("Kafka error", msg.error())
            continue
        try:
            raw_event = json.loads(msg.value())
            norm_event = canonicalise(raw_event)
            producer.produce(NORM_TOPIC, json.dumps(norm_event).encode())
        except Exception as exc:
            print("Mapping error", exc)
        producer.poll(0)

if __name__ == "__main__":
    main()
