"""
SmartStop Sentinel Connect – Module 4
Compliance Rules Engine Harness
---------------------------------
Consumes canonical events from Kafka topic `events.norm`, evaluates
jurisdictional + institutional rules loaded from YAML, and emits
a decision document to topic `events.decided`.

Decision schema (v1):
{
  event_id: str,
  org_id: str,
  state_report: bool,
  pso_flag: bool,
  rca_required: bool,
  rule_ids: list[str],        # which rules fired
  timestamp: str (ISO 8601)
}
"""
from __future__ import annotations
import json
import os
import signal
import sys
from datetime import datetime
from typing import Any, Dict, List

import yaml
from confluent_kafka import Consumer, Producer, KafkaError

# ------------------------------------------------------------------
# Config
# ------------------------------------------------------------------
# Every setting can be overridden through the environment; the second
# argument is the fallback used when the variable is not set.
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "localhost:9092")  # broker list
NORM_TOPIC = os.environ.get("NORM_TOPIC", "events.norm")  # topic consumed
DECIDED_TOPIC = os.environ.get("DECIDED_TOPIC", "events.decided")  # topic produced
GROUP_ID = os.environ.get("GROUP_ID", "rules-engine")  # consumer group id
RULES_PATH = os.environ.get("RULES_PATH", "rules.yaml")  # YAML rules file

# The producer only needs the broker list; the consumer settings extend it
# with the group id and the offset-reset policy.
producer_conf = {"bootstrap.servers": KAFKA_BOOTSTRAP}
consumer_conf = {
    **producer_conf,
    "group.id": GROUP_ID,
    "auto.offset.reset": "earliest",
}

# Module-level clients, created at import time and used by main() and
# shutdown().
consumer = Consumer(consumer_conf)
producer = Producer(producer_conf)

# ------------------------------------------------------------------
# Rule-loading
# ------------------------------------------------------------------

def load_rules(path: str) -> List[Dict[str, Any]]:
    """Load the rule definitions from a YAML file.

    Args:
        path: Location of the YAML rules file.

    Returns:
        The value stored under the top-level ``rules`` key, or an empty
        list when the file is empty or the key is absent or null.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
        AttributeError: If the document's top level is a non-empty value
            that is not a mapping (it has no ``get``).
    """
    # Explicit encoding so parsing does not depend on the host locale.
    with open(path, "r", encoding="utf-8") as fp:
        # safe_load yields None for an empty document; treat that as
        # "no rules" instead of failing on None.get below.
        data = yaml.safe_load(fp) or {}
    # A bare `rules:` key parses to None; normalise it to an empty list so
    # callers can always iterate the result.
    return data.get("rules") or []

# Rule set read once, at import time, from RULES_PATH; evaluate() iterates
# this module-level list for every event.
RULES = load_rules(RULES_PATH)

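# Simple evaluator: every rule is checked against the event (no early exit);
# the actions of all matching rules are OR-ed into a single decision.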

def evaluate(event: Dict[str, Any]) -> Dict[str, Any]:
    """Evaluate every rule in ``RULES`` against one event.

    Each rule is expected to provide ``id``, ``when`` (a Python expression
    in which the event is available as ``e``) and ``actions`` (a mapping of
    decision flag -> value). Flags are sticky: a value is OR-ed with the
    flag's current value, so a later rule cannot reset a flag that an
    earlier rule set.

    A rule that is malformed or whose condition raises is reported on
    stdout and skipped; it never prevents the remaining rules from being
    evaluated. Action keys that are not decision flags are reported and
    ignored, and the rule's other actions are still applied.

    Args:
        event: Decoded event document, exposed to rule conditions as ``e``.

    Returns:
        A dict with the ``state_report``, ``pso_flag`` and ``rca_required``
        flags plus ``rule_ids``, the ids of the rules that fired, in rule
        order.
    """
    matched_rules: List[str] = []
    decision: Dict[str, Any] = {
        "state_report": False,
        "pso_flag": False,
        "rca_required": False,
    }
    for rule in RULES:
        cond = None
        try:
            # Looked up inside the try so that a rule without "when" only
            # disables that rule instead of aborting the whole evaluation.
            cond = rule["when"]
            # SECURITY: eval() executes arbitrary Python taken from the
            # rules file. Only load rules from a trusted source; sandbox or
            # replace with a restricted expression language in prod.
            # `e` is supplied as a *global* so that nested scopes inside
            # the condition (generator expressions, lambdas) can see it.
            if not eval(cond, {"e": event}):
                continue
            matched_rules.append(rule["id"])
            for k, v in (rule.get("actions") or {}).items():
                if k not in decision:
                    # Unknown flag: report it, keep applying the rest.
                    print("Rule eval error", f"unknown action {k!r}", cond)
                    continue
                decision[k] = v or decision[k]
        except Exception as exc:
            print("Rule eval error", exc, cond)
    decision["rule_ids"] = matched_rules
    return decision

# ------------------------------------------------------------------
# Signal handler
# ------------------------------------------------------------------

def shutdown(sig, frame):
    """Signal handler: drain the producer, close the consumer, then exit.

    Args:
        sig: Signal number passed by the interpreter (unused).
        frame: Interrupted stack frame passed by the interpreter (unused).

    Raises:
        SystemExit: Always, with status 0, once cleanup has finished.
    """
    try:
        # Flush first so queued decisions are handed to the broker before
        # the consumer is closed (closing commits offsets when auto-commit
        # is enabled, which is the client default).
        remaining = producer.flush(5)
        if remaining:
            # flush() returns the number of messages still queued after
            # the timeout; surface the loss instead of hiding it.
            print("Shutdown: undelivered messages still queued", remaining)
    finally:
        # Close the consumer even if the flush raised.
        consumer.close()
    sys.exit(0)

# Run shutdown() for Ctrl-C (SIGINT) and for SIGTERM, the signal process
# supervisors normally send to stop a service; without the SIGTERM handler
# the process would end without flushing queued decisions.
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

# ------------------------------------------------------------------
# Main loop
# ------------------------------------------------------------------

def main():
    """Run the consume -> evaluate -> produce loop until the process exits.

    Subscribes to ``NORM_TOPIC``, decodes each message as JSON, evaluates
    it with ``evaluate`` and publishes the decision document to
    ``DECIDED_TOPIC``. A message that cannot be decoded, lacks
    ``event_id``/``org_id`` or cannot be produced is reported on stdout and
    skipped. The loop has no exit condition of its own; it ends when
    ``shutdown`` raises ``SystemExit`` from the signal handler.
    """
    consumer.subscribe([NORM_TOPIC])
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        err = msg.error()
        if err:
            # Error events carry no event payload, so never fall through to
            # JSON decoding. Partition EOF is informational only and is
            # skipped silently; everything else is reported.
            if err.code() != KafkaError._PARTITION_EOF:
                print("Kafka error", err)
            continue
        try:
            ev = json.loads(msg.value())
            decision = evaluate(ev)
            out = {
                "event_id": ev["event_id"],
                "org_id": ev["org_id"],
                **decision,
                # NOTE(review): utcnow() is naive (the ISO string carries
                # no UTC offset) and deprecated since Python 3.12; moving
                # to an aware datetime would change the emitted format, so
                # coordinate with downstream consumers first.
                "timestamp": datetime.utcnow().isoformat(),
            }
            payload = json.dumps(out).encode()
            try:
                producer.produce(DECIDED_TOPIC, payload)
            except BufferError:
                # Local producer queue is full: serve delivery callbacks to
                # free space, then retry once. A second failure is handled
                # by the outer except like any other error.
                producer.poll(1.0)
                producer.produce(DECIDED_TOPIC, payload)
        except Exception as exc:
            # Best-effort: one bad message must not stop the stream.
            print("Rules engine error", exc)
        # Serve delivery reports without blocking the consume loop.
        producer.poll(0)

# Start the processing loop only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
