from __future__ import annotations

import asyncio
import json
import os
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List

import httpx
import jmespath
import redis.asyncio as aioredis
import yaml

# Redis connection URL. Credentials must be supplied through the REDIS_URL
# environment variable; the fallback is a credential-free local instance so
# that no secret ever lives in source control.
# NOTE(review): a password was previously hard-coded in this default; treat it
# as exposed and rotate it.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
SRC_STREAM = "ctx_events"  # stream the worker consumes events from
FEEDBACK_STREAM = "feedback"  # not referenced in the visible code
GROUP = "rule_engine"  # consumer group used when reading SRC_STREAM
# Consumer name inside GROUP, derived from the start time in whole seconds.
ME = f"re_{int(time.time())}"

# Optional integrations: the matching sender is a no-op when its setting is unset.
TERAMIND_HOST = os.environ.get("TERAMIND_HOST")
TERAMIND_TOKEN = os.environ.get("TERAMIND_TOKEN")
SLACK_WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL")
RULES_PATH = Path(os.environ.get("RULES_PATH", "rules.yaml"))

# Per-(user_id, rule_id) accumulator. Each matching event adds its "elapsed"
# value (SCREENSHOT events) or 1 (every other type); the entry is reset to 0
# when the rule's threshold is reached.
COUNTERS: Dict[tuple[str, str], int] = defaultdict(int)
# Intended window length for the counters (1 hour).
# NOTE(review): nothing in the visible code applies this window, so counters
# currently accumulate until the rule fires — confirm whether expiry is wanted.
WINDOW_SECS = 60 * 60

class Rule:
    """One rule built from a single mapping of the rules document.

    Attributes are read from the mapping as follows:
      * ``id``          - required ``"id"`` key.
      * ``desc``        - optional ``"description"`` key, ``""`` when absent.
      * ``match_expr``  - required ``"match"`` key, compiled with JMESPath.
      * ``threshold``   - optional ``"threshold_seconds"`` key, ``0`` when absent.
      * ``action``      - required ``"action"`` key, stored as given.
    """

    def __init__(self, cfg: dict[str, Any]):
        # Required keys are read in the same order as they are stored, so a
        # missing key raises KeyError for the first absent one.
        rule_id = cfg["id"]
        description = cfg.get("description", "")
        compiled = jmespath.compile(cfg["match"])
        threshold = cfg.get("threshold_seconds", 0)
        action = cfg["action"]

        self.id = rule_id
        self.desc = description
        self.match_expr = compiled
        self.threshold = threshold
        self.action = action

    def matches(self, event: dict[str, Any], context: dict[str, Any] = None) -> bool:
        """Return whether the compiled expression is truthy for this event.

        The expression is searched against ``{"event": ..., "context": ...}``;
        a missing or empty *context* is presented as an empty dict.
        """
        scope = {"event": event, "context": context if context else {}}
        return True if self.match_expr.search(scope) else False


async def load_rules() -> List[Rule]:
    """Load the rule definitions from ``RULES_PATH`` and build ``Rule`` objects.

    The file is parsed with ``yaml.safe_load`` and is expected to hold a
    top-level list with one mapping per rule.

    Returns:
        One ``Rule`` per list entry, in file order. An empty file yields an
        empty list.

    Raises:
        FileNotFoundError: If ``RULES_PATH`` does not exist.
        TypeError: If the document's top level is not a list.
    """
    if not RULES_PATH.exists():
        raise FileNotFoundError(f"Rules file not found: {RULES_PATH}")
    with open(RULES_PATH, "r", encoding="utf-8") as f:
        doc = yaml.safe_load(f)

    # safe_load returns None for an empty document; treat that as "no rules"
    # instead of failing while iterating over None.
    if doc is None:
        return []
    if not isinstance(doc, list):
        raise TypeError(
            f"Rules file {RULES_PATH} must contain a top-level list of rules, "
            f"got {type(doc).__name__}"
        )
    return [Rule(r) for r in doc]


async def send_teramind_popup(user_id: str, message: str):
    """Post a user alert to the configured Teramind host (best effort).

    Does nothing when ``TERAMIND_HOST`` or ``TERAMIND_TOKEN`` is unset.
    Delivery failures, including non-2xx responses, are reported on stdout
    and never raised, so alerting problems cannot stop event processing.

    Args:
        user_id: Sent as the ``userId`` field of the request body.
        message: Sent as the ``message`` field of the request body.
    """
    if not TERAMIND_HOST or not TERAMIND_TOKEN:
        return
    url = f"https://{TERAMIND_HOST}/api/v3/user_alert"
    json_payload = {
        "userId": user_id,
        "message": message,
        "severity": 1,
    }
    headers = {"x-access-token": TERAMIND_TOKEN}
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(url, json=json_payload, headers=headers, timeout=5)
            # Without this a 4xx/5xx reply would count as a delivered alert.
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            print(f"Teramind alert for {user_id} rejected with HTTP {exc.response.status_code}")
        except httpx.HTTPError as exc:
            print(f"Teramind alert for {user_id} failed: {type(exc).__name__}")


async def send_slack(message: str):
    """Post *message* to the configured Slack webhook (best effort).

    Does nothing when ``SLACK_WEBHOOK`` is unset. Delivery failures, including
    non-2xx responses, are reported on stdout and never raised.

    Args:
        message: Sent as the ``text`` field of the webhook payload.
    """
    if not SLACK_WEBHOOK:
        return
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(SLACK_WEBHOOK, json={"text": message}, timeout=5)
            # Without this a 4xx/5xx reply would count as a delivered message.
            response.raise_for_status()
        # Only the status code / exception type is printed: the exception text
        # can include the request URL, and the webhook URL is itself a secret.
        except httpx.HTTPStatusError as exc:
            print(f"Slack notification rejected with HTTP {exc.response.status_code}")
        except httpx.HTTPError as exc:
            print(f"Slack notification failed: {type(exc).__name__}")


async def dispatch_action(rule: Rule, event: dict[str, Any]):
    """Send the alerts configured for *rule* to the user named in *event*.

    ``rule.action["type"]`` selects the behavior:
      * ``"warn"``     - popup with the message, Slack line tagged ``[WARN]``.
      * ``"critical"`` - popup with the message plus `` (Critical)``, Slack
        line tagged ``[CRIT]``.
      * anything else  - nothing is sent.

    Raises:
        KeyError: If ``event["user_id"]``, ``rule.action["message"]`` or
            ``rule.action["type"]`` is missing.
    """
    target_user = event["user_id"]
    text = rule.action["message"]
    kind = rule.action["type"]

    if kind == "warn":
        popup_text, tag = text, "WARN"
    elif kind == "critical":
        popup_text, tag = text + " (Critical)", "CRIT"
    else:
        # Unrecognized action types are ignored.
        return

    # The popup is sent first, then the Slack line (which never carries the suffix).
    await send_teramind_popup(target_user, popup_text)
    await send_slack(f"[{tag}] {target_user}: {text}")


def _redact_url(url: str) -> str:
    """Return *url* with any password in its authority part replaced by ``***``."""
    from urllib.parse import urlsplit  # local import keeps this block self-contained

    try:
        parts = urlsplit(url)
    except ValueError:
        return "<unparseable redis url>"
    credentials, at, host = parts.netloc.rpartition("@")
    username, colon, _password = credentials.partition(":")
    if not at or not colon:
        return url  # no password present, nothing to hide
    return parts._replace(netloc=f"{username}:***@{host}").geturl()


async def _ensure_group(redis) -> None:
    """Create consumer group ``GROUP`` on ``SRC_STREAM`` unless it already exists."""
    try:
        await redis.xgroup_create(SRC_STREAM, GROUP, mkstream=True)
        print(f"Consumer group '{GROUP}' ready on stream '{SRC_STREAM}'")
    except aioredis.ResponseError as exc:
        # Only "group already exists" is expected; any other server error
        # (wrong key type, permissions, ...) must not be mistaken for success.
        if "BUSYGROUP" not in str(exc):
            raise
        print(f"Consumer group '{GROUP}' already exists on stream '{SRC_STREAM}'")


async def _apply_rules(event: dict[str, Any], rules: List[Rule]) -> None:
    """Update the counters for *event* and dispatch every rule that hits its threshold."""
    user_key = event["user_id"]
    for rule in rules:
        if not rule.matches(event):
            continue
        key = (user_key, rule.id)
        # SCREENSHOT events contribute their "elapsed" value, everything else 1.
        COUNTERS[key] += (
            event["payload"].get("elapsed", 1)
            if event["type"] == "SCREENSHOT"
            else 1
        )
        if COUNTERS[key] >= rule.threshold:
            await dispatch_action(rule, event)
            COUNTERS[key] = 0


async def main():
    """Run the rule-engine worker.

    Connects to Redis, loads the rules, makes sure the consumer group exists,
    then loops forever: reads up to 10 new entries from ``SRC_STREAM`` (blocking
    up to 5 s), applies every rule to each entry and acknowledges it.

    A message that cannot be processed is reported and acknowledged so that a
    single bad entry neither stops the worker nor stays pending forever.
    Returns early (without raising) when Redis cannot be reached.
    """
    # Never print the raw URL: it may carry a password.
    safe_url = _redact_url(REDIS_URL)
    print(f"Connecting to Redis at {safe_url}...")

    # The context manager closes the client on every exit path.
    async with aioredis.from_url(REDIS_URL, decode_responses=True) as redis:
        try:
            await redis.ping()
            print("✅ Connected to Redis")
        # redis-py raises its own ConnectionError/TimeoutError (RedisError
        # subclasses), which the builtin ConnectionError does not cover.
        except (aioredis.RedisError, OSError) as e:
            print(f"❌ Could not connect to Redis at {safe_url}: {e}")
            return

        rules = await load_rules()
        print(f"Loaded {len(rules)} rules")

        await _ensure_group(redis)

        print("Worker is now listening for events...")
        while True:
            resp = await redis.xreadgroup(GROUP, ME, {SRC_STREAM: ">"}, count=10, block=5000)
            if not resp:
                continue
            for _, msgs in resp:
                for msg_id, data in msgs:
                    print(f"Processing message {msg_id}")
                    try:
                        event = json.loads(data["data"])
                        await _apply_rules(event, rules)
                    except (KeyError, TypeError, AttributeError, ValueError) as exc:
                        # Bad JSON or a missing/ill-typed field: skip this entry
                        # instead of crashing and leaving it unacknowledged.
                        print(f"Failed to process message {msg_id}: {exc!r}; skipping")
                    await redis.xack(SRC_STREAM, GROUP, msg_id)

# Script entry point: run the worker coroutine on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())
