"""
SmartStop Sentinel Connect – Module 8
Prometheus Metrics Exporter & Grafana Seed
-----------------------------------------
Lightweight service that collects internal Kafka consumer-group lag, HTTP service
health, and custom counters published via UDP from other modules, and exposes
them at /metrics for Prometheus to scrape. It also writes a Grafana dashboard
JSON seed on first start.
"""
from __future__ import annotations
import os
import threading
import time
from datetime import datetime, timezone

from confluent_kafka import Consumer, ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient
from fastapi import FastAPI, Response
from prometheus_client import (
    Counter,
    Gauge,
    Histogram,
    generate_latest,
    CONTENT_TYPE_LATEST,
    start_http_server,
)

# ------------------------------------------------------------------
# Prometheus metrics registry
# ------------------------------------------------------------------
SERVICE_START = datetime.now(timezone.utc).isoformat()
EVENTS_TOTAL = Counter(
    "sentinel_events_total",
    "Total events processed",
    ["service"],
)
EXPORT_FAILURES = Counter(
    "sentinel_export_failures",
    "Failed report transmissions",
    ["destination"],
)
CONSUMER_LAG = Gauge(
    "sentinel_consumer_group_lag",
    "Kafka consumer lag",
    ["group", "topic", "partition"],
)
SERVICE_UP = Gauge(
    "sentinel_service_up",
    "Heartbeat for each module",
    ["service"],
)
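
# Illustrative usage of these metrics from other code paths (the label values
# below are placeholders, not wired up anywhere in this file):
#   EVENTS_TOTAL.labels("rules-engine").inc()
#   EXPORT_FAILURES.labels("siem-webhook").inc()
#   SERVICE_UP.labels("rules-engine").set(1)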

# ------------------------------------------------------------------
# Kafka Group Lag poller
# ------------------------------------------------------------------
BOOT = os.getenv("KAFKA_BOOTSTRAP", "localhost:9092")
GROUPS = os.getenv(
    "KAFKA_GROUPS",
    "normaliser-service,rules-engine,exporter-service,ledger-writer,alert-dispatcher",
).split(",")
POLL_INTERVAL = int(os.getenv("LAG_INTERVAL", "30"))

admin = AdminClient({"bootstrap.servers": BOOT})
# A plain Consumer is needed only for get_watermark_offsets(); the AdminClient
# cannot query watermarks. This consumer never subscribes or reads messages.
watermark_consumer = Consumer(
    {"bootstrap.servers": BOOT, "group.id": "sentinel-metrics-exporter"}
)


def poll_lag():
    """Resolve committed offsets per consumer group and publish the lag."""
    while True:
        for group in GROUPS:
            try:
                # list_consumer_group_offsets() takes a list of request
                # objects and returns a dict of futures keyed by group id.
                request = ConsumerGroupTopicPartitions(group)
                future = admin.list_consumer_group_offsets([request])[group]
                offsets = future.result(timeout=5)
                for tp in offsets.topic_partitions:
                    committed = tp.offset
                    if committed < 0:
                        continue  # nothing committed yet for this partition
                    # The high watermark comes from a Consumer, not the AdminClient.
                    _low, end = watermark_consumer.get_watermark_offsets(tp, timeout=5)
                    lag = max(end - committed, 0)
                    CONSUMER_LAG.labels(group, tp.topic, tp.partition).set(lag)
            except Exception:
                pass  # keep polling; transient broker or group errors are not fatal
        time.sleep(POLL_INTERVAL)


t = threading.Thread(target=poll_lag, daemon=True)
t.start()
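
# ------------------------------------------------------------------
# UDP counter listener (sketch)
# ------------------------------------------------------------------
# The module docstring mentions custom counters published via UDP from other
# modules, but no listener ships in this file yet. The sketch below is one
# possible implementation: the port (UDP_PORT, default 9125) and the
# "<service>:<count>" wire format are assumptions, not an established contract.
UDP_PORT = int(os.getenv("UDP_PORT", "9125"))


def listen_udp_counters():
    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", UDP_PORT))
    while True:
        data, _addr = sock.recvfrom(1024)
        try:
            service, count = data.decode().strip().split(":", 1)
            EVENTS_TOTAL.labels(service).inc(float(count))
            SERVICE_UP.labels(service).set(1)
        except (ValueError, UnicodeDecodeError):
            pass  # ignore malformed datagrams


threading.Thread(target=listen_udp_counters, daemon=True).start()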

# ------------------------------------------------------------------
# FastAPI metrics endpoint (optional)
# ------------------------------------------------------------------
app = FastAPI()


@app.get("/metrics")
def metrics():
    SERVICE_UP.labels("metrics_exporter").set(1)
    return generate_latest(), 200, {"Content-Type": CONTENT_TYPE_LATEST}
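
# The FastAPI route above is an alternative to start_http_server() in
# __main__ below; if preferred, serve it with an ASGI server, e.g.
# (the module name "metrics_exporter" is assumed):
#   uvicorn metrics_exporter:app --host 0.0.0.0 --port 9102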


# ------------------------------------------------------------------
# Grafana dashboard seed (write to file on first start)
# ------------------------------------------------------------------
DASH_PATH = os.getenv("GRAFANA_DASH", "/dashboards/sentinel_kpi.json")
if not os.path.exists(DASH_PATH):
    import json, pathlib

    pathlib.Path(DASH_PATH).parent.mkdir(parents=True, exist_ok=True)
    dashboard = {
        "title": "Sentinel Connect KPIs",
        "panels": [
            {
                "type": "stat",
                "title": "Total Events",
                "targets": [{"expr": "sum(sentinel_events_total)"}],
            },
            {
                "type": "stat",
                "title": "Export Failures",
                "targets": [{"expr": "sum(sentinel_export_failures)"}],
            },
            {
                "type": "table",
                "title": "Consumer Lag",
                "targets": [{"expr": "sentinel_consumer_group_lag"}],
            },
        ],
        "time": {"from": "now-24h", "to": "now"},
    }
    with open(DASH_PATH, "w") as fp:
        json.dump({"dashboard": dashboard, "overwrite": True}, fp, indent=2)

if __name__ == "__main__":
    start_http_server(9102)
    print("Prometheus metrics exporter running on :9102 …")
    while True:
        time.sleep(3600)
