import json
import logging
from datetime import datetime, timezone
import threading
import time
import fnmatch

import paho.mqtt.client as mqtt
import pymysql
import requests


CONFIG_FILE = "config.json"
MQTT_TOPIC_PREFIX = ""


# --------------------------------------------------------------------------------------
# Database setup and rule loading
# --------------------------------------------------------------------------------------

def load_config(path=CONFIG_FILE):
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def load_runtime_config(conn):
    cur = conn.cursor(pymysql.cursors.DictCursor)
    cur.execute("SELECT config_key, config_value FROM op_config")
    rows = cur.fetchall()
    return {r["config_key"]: r["config_value"] for r in rows}


def _get_config_value(conn, key):
    cur = conn.cursor(pymysql.cursors.DictCursor)
    cur.execute("SELECT config_value FROM op_config WHERE config_key=%s", (key,))
    row = cur.fetchone()
    return row["config_value"] if row else None


def init_db(cfg):
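    """Connect to MySQL and create or migrate all tables used by this service."""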
    conn = pymysql.connect(
        host=cfg["host"],
        user=cfg["user"],
        password=cfg["password"],
        db=cfg["db"],
        charset=cfg.get("charset", "utf8"),
        autocommit=True,
    )
    cur = conn.cursor()

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_config_station_rules (
            station_id VARCHAR(50) PRIMARY KEY,
            rule_set INT NOT NULL
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_config_rules (
            id INT AUTO_INCREMENT PRIMARY KEY,
            rule_set INT NOT NULL,
            configuration_key VARCHAR(255) NOT NULL,
            value VARCHAR(255) NOT NULL,
            `condition` ENUM('equals','larger','smaller','contains') NOT NULL,
            auto_fix BOOLEAN DEFAULT FALSE,
            explanation TEXT
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_config_results (
            id INT AUTO_INCREMENT PRIMARY KEY,
            station_id VARCHAR(50) NOT NULL,
            message TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_energymqtt_check (
            id INT AUTO_INCREMENT PRIMARY KEY,
            topic VARCHAR(255) NOT NULL,
            message TEXT NOT NULL,
            frequenz INT NOT NULL,
            ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            UNIQUE KEY topic_unique (topic)
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_energymqtt_ignore (
            topic VARCHAR(255) PRIMARY KEY
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_energymqtt_disconnected (
            id INT AUTO_INCREMENT PRIMARY KEY,
            topic VARCHAR(255) NOT NULL,
            message TEXT NOT NULL,
            frequenz INT NOT NULL,
            ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            UNIQUE KEY topic_unique (topic)
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_wb_config_keys (
            station_id VARCHAR(50) NOT NULL,
            config_key VARCHAR(255) NOT NULL,
            config_value VARCHAR(255) NOT NULL,
            PRIMARY KEY (station_id, config_key)
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_ocpi_backends (
            backend_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            url VARCHAR(1024) NOT NULL,
            remote_versions_url VARCHAR(1024) DEFAULT NULL,
            peer_versions_url VARCHAR(1024) DEFAULT NULL,
            active_version VARCHAR(16) DEFAULT NULL,
            token TEXT,
            peer_token TEXT,
            credentials_token TEXT,
            modules VARCHAR(255) NOT NULL DEFAULT 'cdrs',
            enabled TINYINT(1) NOT NULL DEFAULT 1,
            last_credentials_status VARCHAR(255) NULL,
            last_credentials_at TIMESTAMP NULL DEFAULT NULL,
            created_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        )
        """
    )

    cur.execute("SHOW COLUMNS FROM op_ocpi_backends LIKE 'remote_versions_url'")
    if not cur.fetchone():
        cur.execute(
            "ALTER TABLE op_ocpi_backends ADD COLUMN remote_versions_url VARCHAR(1024) DEFAULT NULL AFTER url"
        )

    cur.execute("SHOW COLUMNS FROM op_ocpi_backends LIKE 'peer_versions_url'")
    if not cur.fetchone():
        cur.execute(
            "ALTER TABLE op_ocpi_backends ADD COLUMN peer_versions_url VARCHAR(1024) DEFAULT NULL AFTER remote_versions_url"
        )

    cur.execute("SHOW COLUMNS FROM op_ocpi_backends LIKE 'active_version'")
    if not cur.fetchone():
        cur.execute(
            "ALTER TABLE op_ocpi_backends ADD COLUMN active_version VARCHAR(16) DEFAULT NULL AFTER peer_versions_url"
        )

    cur.execute("SHOW COLUMNS FROM op_ocpi_backends LIKE 'peer_token'")
    if not cur.fetchone():
        cur.execute("ALTER TABLE op_ocpi_backends ADD COLUMN peer_token TEXT NULL AFTER token")

    cur.execute("SHOW COLUMNS FROM op_ocpi_backends LIKE 'credentials_token'")
    if not cur.fetchone():
        cur.execute("ALTER TABLE op_ocpi_backends ADD COLUMN credentials_token TEXT NULL AFTER peer_token")

    cur.execute("SHOW COLUMNS FROM op_ocpi_backends LIKE 'last_credentials_status'")
    if not cur.fetchone():
        cur.execute(
            "ALTER TABLE op_ocpi_backends ADD COLUMN last_credentials_status VARCHAR(255) NULL AFTER enabled"
        )

    cur.execute("SHOW COLUMNS FROM op_ocpi_backends LIKE 'last_credentials_at'")
    if not cur.fetchone():
        cur.execute(
            "ALTER TABLE op_ocpi_backends ADD COLUMN last_credentials_at TIMESTAMP NULL DEFAULT NULL AFTER last_credentials_status"
        )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_ocpi_wallbox_backends (
            station_id VARCHAR(50) NOT NULL,
            backend_id INT NOT NULL,
            enabled TINYINT(1) NOT NULL DEFAULT 1,
            priority INT NOT NULL DEFAULT 100,
            created_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (station_id, backend_id),
            KEY idx_backend_id (backend_id),
            CONSTRAINT fk_ocpi_wallbox_backend_backend FOREIGN KEY (backend_id) REFERENCES op_ocpi_backends(backend_id) ON DELETE CASCADE
        )
        """
    )

    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS op_broker_api_logging (
            id INT AUTO_INCREMENT PRIMARY KEY,
            created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            chargepoint_id VARCHAR(255),
            partner_id VARCHAR(255),
            module VARCHAR(255),
            endpoint VARCHAR(255) NOT NULL,
            request_payload LONGTEXT,
            response_code INT,
            response_body LONGTEXT,
            KEY idx_created_at (created_at),
            KEY idx_endpoint (endpoint),
            KEY idx_partner (partner_id),
            KEY idx_module (module),
            KEY idx_response_code (response_code)
        ) CHARACTER SET utf8mb4
        """
    )
    cur.execute(
        """
        SELECT COUNT(*)
        FROM information_schema.COLUMNS
        WHERE table_schema = DATABASE()
          AND table_name = 'op_broker_api_logging'
          AND column_name = 'response_body'
        """
    )
    response_body_column_exists = cur.fetchone()[0] > 0
    if not response_body_column_exists:
        cur.execute(
            """
            ALTER TABLE op_broker_api_logging
                ADD COLUMN response_body LONGTEXT
            """
        )
    cur.execute(
        """
        SELECT COUNT(*)
        FROM information_schema.COLUMNS
        WHERE table_schema = DATABASE()
          AND table_name = 'op_broker_api_logging'
          AND column_name = 'partner_id'
        """
    )
    partner_column_exists = cur.fetchone()[0] > 0
    if not partner_column_exists:
        cur.execute(
            """
            ALTER TABLE op_broker_api_logging
                ADD COLUMN partner_id VARCHAR(255) AFTER chargepoint_id
            """
        )
    cur.execute(
        """
        SELECT COUNT(*)
        FROM information_schema.COLUMNS
        WHERE table_schema = DATABASE()
          AND table_name = 'op_broker_api_logging'
          AND column_name = 'module'
        """
    )
    module_column_exists = cur.fetchone()[0] > 0
    if not module_column_exists:
        cur.execute(
            """
            ALTER TABLE op_broker_api_logging
                ADD COLUMN module VARCHAR(255) AFTER partner_id
            """
        )

    cur.execute(
        """
        SHOW INDEX FROM op_broker_api_logging WHERE Key_name='idx_partner'
        """
    )
    if not cur.fetchone():
        cur.execute(
            """
            ALTER TABLE op_broker_api_logging
                ADD KEY idx_partner (partner_id)
            """
        )
    cur.execute(
        """
        SHOW INDEX FROM op_broker_api_logging WHERE Key_name='idx_module'
        """
    )
    if not cur.fetchone():
        cur.execute(
            """
            ALTER TABLE op_broker_api_logging
                ADD KEY idx_module (module)
            """
        )
    cur.execute(
        """
        SHOW INDEX FROM op_broker_api_logging WHERE Key_name='idx_response_code'
        """
    )
    if not cur.fetchone():
        cur.execute(
            """
            ALTER TABLE op_broker_api_logging
                ADD KEY idx_response_code (response_code)
            """
        )

    return conn


def ensure_default_ocpi_backend(conn):
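    """Return the id of the first configured OCPI backend.

    If the table is empty, create a default backend from the legacy
    ``op_config``/``config.json`` settings and return its id, or ``None``
    when no backend URL is known.
    """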
    cur = conn.cursor(pymysql.cursors.DictCursor)
    cur.execute("SELECT backend_id FROM op_ocpi_backends LIMIT 1")
    row = cur.fetchone()
    if row:
        return row["backend_id"]

    ocpi_url = _get_config_value(conn, "ocpi_backend_url")
    ocpi_token = _get_config_value(conn, "ocpi_backend_token")
    ocpi_modules = _get_config_value(conn, "ocpi_backend_modules") or "cdrs"
    ocpi_enabled = 1 if (_get_config_value(conn, "ocpi_backend_enabled") or "0") == "1" else 0

    if not ocpi_url:
        cfg = load_config(CONFIG_FILE)
        ocpi_url = (
            cfg.get("ocpi", {}).get("cdrs_endpoint")
            if isinstance(cfg.get("ocpi"), dict)
            else None
        )
    if not ocpi_url:
        return None

    cur.execute(
        """
        INSERT INTO op_ocpi_backends (name, url, remote_versions_url, token, modules, enabled)
        VALUES (%s, %s, %s, %s, %s, %s)
        """,
        (
            "Default OCPI Backend",
            ocpi_url,
            ocpi_url,
            ocpi_token or "",
            ocpi_modules,
            ocpi_enabled,
        ),
    )
    conn.commit()
    return cur.lastrowid


def migrate_ocpi_wallboxes(conn):
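    """Copy legacy ``op_ocpi_wallboxes`` assignments into ``op_ocpi_wallbox_backends``."""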
    cur = conn.cursor(pymysql.cursors.DictCursor)
    cur.execute("SHOW TABLES LIKE 'op_ocpi_wallboxes'")
    if cur.fetchone() is None:
        return

    backend_id = ensure_default_ocpi_backend(conn)

    cur.execute("SELECT station_id, enabled FROM op_ocpi_wallboxes")
    rows = cur.fetchall()
    if backend_id is not None:
        for row in rows:
            cur.execute(
                """
                INSERT INTO op_ocpi_wallbox_backends (station_id, backend_id, enabled, priority)
                VALUES (%s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE enabled = VALUES(enabled), priority = VALUES(priority)
                """,
                (row["station_id"], backend_id, row["enabled"], 100),
            )
        conn.commit()


STATION_RULES = {}  # station_id -> rule_set id (from op_config_station_rules)
RULES = {}  # rule_set id -> list of rule rows (from op_config_rules)
ENERGY_LAST_TS = {}  # topic -> datetime of the previous message (for frequency)

ENERGY_IGNORE = []  # fnmatch patterns of energy topics to skip
OCPI_BACKENDS: dict[int, dict] = {}  # backend_id -> backend row
OCPI_WALLBOX_BACKENDS: dict[str, list[dict]] = {}  # station_id -> assignment rows
OCPI_WALLBOXES = set()  # stations with at least one enabled OCPI backend

# Runtime API configuration
API_ENABLED = False
API_BASE_URL = ""
API_TOKEN = ""
API_KEY = ""
API_LOG_DB_CONFIG = None


def _serialize_api_payload(payload):
    if payload is None:
        return ""
    try:
        return json.dumps(payload, ensure_ascii=False)
    except TypeError:
        try:
            return json.dumps(payload, ensure_ascii=False, default=str)
        except Exception:
            return str(payload)


def log_external_api_request_sync(
    endpoint,
    payload,
    response_code,
    station_id=None,
    response_body=None,
    *,
    partner_id=None,
    module=None,
):
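    """Write one row to ``op_broker_api_logging`` using a short-lived DB connection."""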
    global API_LOG_DB_CONFIG
    if not API_LOG_DB_CONFIG:
        return

    serialized_payload = _serialize_api_payload(payload)
    serialized_response = _serialize_api_payload(response_body)
    resolved_module = module or (endpoint.split("-", 1)[0] if endpoint else None)
    resolved_partner = partner_id
    if resolved_partner is None and isinstance(payload, dict):
        resolved_partner = payload.get("partner_id") or payload.get("partnerId")
    conn = None
    try:
        conn = pymysql.connect(
            host=API_LOG_DB_CONFIG["host"],
            user=API_LOG_DB_CONFIG["user"],
            password=API_LOG_DB_CONFIG["password"],
            db=API_LOG_DB_CONFIG["db"],
            charset=API_LOG_DB_CONFIG.get("charset", "utf8"),
            autocommit=True,
        )
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO op_broker_api_logging
                    (chargepoint_id, partner_id, module, endpoint, request_payload, response_code, response_body)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    station_id,
                    resolved_partner,
                    resolved_module,
                    endpoint,
                    serialized_payload,
                    response_code,
                    serialized_response,
                ),
            )
    except Exception:
        logging.debug(
            "Failed to log external API request (sync) for endpoint %s",
            endpoint,
            exc_info=True,
        )
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass

# Track pending and active transactions for stop-transaction generation
PENDING_STARTS = {}
ACTIVE_TRANSACTIONS = {}

# Track requests for full configuration and status triggers
PENDING_FULL_CONFIG = set()
STATUS_TRIGGER = set()

def load_rules(conn):
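    """Cache station-to-rule-set assignments and the rules of each rule set."""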
    global STATION_RULES, RULES
    cur = conn.cursor(pymysql.cursors.DictCursor)
    STATION_RULES.clear()
    RULES.clear()
    cur.execute("SELECT station_id, rule_set FROM op_config_station_rules")
    for row in cur.fetchall():
        STATION_RULES[row["station_id"]] = row["rule_set"]

    cur.execute(
        "SELECT rule_set, configuration_key, value, `condition`, auto_fix, explanation "
        "FROM op_config_rules"
    )
    for row in cur.fetchall():
        RULES.setdefault(row["rule_set"], []).append(row)


def load_energy_ignore(conn):
    """Load topics to ignore from ``op_energymqtt_ignore`` table."""
    global ENERGY_IGNORE
    cur = conn.cursor()
    cur.execute("SELECT topic FROM op_energymqtt_ignore")
    ENERGY_IGNORE = [row[0] for row in cur.fetchall()]

    # Purge any matching entries from the check table
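    # e.g. an ignore entry "shellies/*/announce" becomes LIKE "shellies/%/announce"
    # here; at runtime the same patterns are matched with fnmatch in
    # energy_on_message().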
    for pattern in ENERGY_IGNORE:
        cur.execute(
            "DELETE FROM op_energymqtt_check WHERE topic LIKE %s",
            (pattern.replace("*", "%").replace("?", "_"),),
        )
    conn.commit()


def load_ocpi_backends(conn):
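    """Cache OCPI backends, per-station assignments and the set of enabled stations."""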
    global OCPI_BACKENDS, OCPI_WALLBOX_BACKENDS, OCPI_WALLBOXES
    cur = conn.cursor(pymysql.cursors.DictCursor)
    cur.execute(
        "SELECT backend_id, name, url, token, modules, enabled FROM op_ocpi_backends"
    )
    OCPI_BACKENDS = {row["backend_id"]: row for row in cur.fetchall()}

    cur.execute(
        """
        SELECT wb.station_id, wb.backend_id, wb.enabled, wb.priority, b.name
        FROM op_ocpi_wallbox_backends AS wb
        JOIN op_ocpi_backends AS b ON wb.backend_id = b.backend_id
        ORDER BY wb.station_id, wb.priority, wb.backend_id
        """
    )

    assignments: dict[str, list[dict]] = {}
    for row in cur.fetchall():
        assignments.setdefault(row["station_id"], []).append(row)
    OCPI_WALLBOX_BACKENDS = assignments
    OCPI_WALLBOXES = {station for station, entries in assignments.items() if any(e.get("enabled") for e in entries)}


def api_post(path, payload):
    """Send JSON payload to configured API if enabled."""
    if not API_ENABLED or not API_BASE_URL:
        return
    url = API_BASE_URL.rstrip("/") + path
    logging.info(
        "External API POST %s payload=%s",
        url,
        _serialize_api_payload(payload),
    )
    response_status = None
    response_body = None
    try:
        headers = {}
        if API_TOKEN:
            headers["Authorization"] = f"Bearer {API_TOKEN}"
        elif API_KEY:
            headers["X-API-KEY"] = API_KEY
        response = requests.post(
            url, json=payload, timeout=3, headers=headers or None
        )
        response_status = response.status_code
        try:
            response_body = response.json()
        except ValueError:
            response_body = response.text
        logging.info(
            "External API POST %s responded with status=%s",
            url,
            response_status,
        )
        if response_status >= 400:
            logging.warning(
                "API call to %s failed with status %s", path, response_status
            )
    except Exception as e:
        logging.error("API call to %s failed: %s", path, e)
        response_body = str(e)
    finally:
        endpoint = path.strip("/").split("/")[-1] if path else ""
        module = path.strip("/").split("/", 1)[0] if path else None
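        # e.g. path "/charging-station/stop-transaction" yields endpoint
        # "stop-transaction" and module "charging-station" for the log entry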
        station_id = None
        partner_id = None
        if isinstance(payload, dict):
            station_id = (
                payload.get("stationId")
                or payload.get("station_id")
                or payload.get("chargepoint_id")
            )
            partner_id = payload.get("partner_id") or payload.get("partnerId")
        log_external_api_request_sync(
            endpoint,
            payload,
            response_status,
            station_id,
            response_body=response_body,
            partner_id=partner_id,
            module=module,
        )


def post_update_transaction(info, tx_id):
    payload = {
        "stationId": info.get("stationId"),
        "connectorId": info.get("connectorId"),
        "idToken": info.get("idToken"),
        "sessionStartTimestamp": info.get("sessionStartTimestamp"),
        "transactionId": str(tx_id),
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    api_post("/charging-station/update-transaction", payload)


def post_stop_transaction(info, stop):
    payload = {
        "stationId": info.get("stationId"),
        "connectorId": info.get("connectorId") or stop.get("connectorId"),
        "idToken": info.get("idToken") or stop.get("idTag"),
        "transactionId": str(stop.get("transactionId")) if stop.get("transactionId") is not None else None,
        "sessionStartTimestamp": info.get("sessionStartTimestamp"),
        "sessionEndTimestamp": stop.get("timestamp"),
        "meterStartWh": info.get("meterStartWh"),
        "meterStopWh": stop.get("meterStop"),
        "timestamp": stop.get("timestamp")
        or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "stopReason": stop.get("reason"),
    }

    sanitized_payload = {k: v for k, v in payload.items() if v is not None}
    logging.info(
        "StopTransaction API enabled=%s url=%s payload=%s",
        API_ENABLED,
        API_BASE_URL,
        sanitized_payload,
    )
    api_post("/charging-station/stop-transaction", sanitized_payload)


def post_configuration_update(station_id, configs):
    mapping = {
        "ChargePointVendor": "manufacturer",
        "ChargePointModel": "model",
        "ChargePointSerialNumber": "serialNumber",
        "FirmwareVersion": "firmwareVersion",
        "NumberOfConnectors": "numberOfConnectors",
        "SupportedFeatureProfiles": "supportedFeatureProfiles",
    }
    payload = {
        "stationId": station_id,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    for key, field in mapping.items():
        val = configs.get(key)
        if val is not None:
            payload[field] = val
    api_post("/charging-station/configuration-update", payload)


def post_status_notification(station_id, body):
    payload = {
        "stationId": station_id,
        "connectorId": body.get("connectorId"),
        "status": body.get("status"),
        "timestamp": body.get(
            "timestamp",
            datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        ),
    }
    if body.get("errorCode"):
        payload["errorCode"] = body.get("errorCode")
    api_post("/charging-station/status", payload)


def ensure_station_rules_entry(conn, station_id, rule_set_name="default"):
    """Ensure a station has an entry in ``op_config_station_rules``.

    If ``station_id`` is missing, insert it with the rule set named
    ``rule_set_name``.
    """
    if station_id in STATION_RULES:
        return

    cur = conn.cursor()
    cur.execute(
        "SELECT rule_set FROM op_config_station_rules WHERE station_id=%s",
        (station_id,),
    )
    row = cur.fetchone()
    if row:
        STATION_RULES[station_id] = row[0]
        return

    cur.execute(
        "SELECT id FROM op_verfication_lists WHERE name=%s",
        (rule_set_name,),
    )
    rs_row = cur.fetchone()
    if not rs_row:
        logging.warning("Default rule set '%s' not found", rule_set_name)
        return

    rule_set_id = rs_row[0]
    cur.execute(
        "INSERT INTO op_config_station_rules (station_id, rule_set) VALUES (%s, %s)",
        (station_id, rule_set_id),
    )
    conn.commit()
    STATION_RULES[station_id] = rule_set_id


# --------------------------------------------------------------------------------------
# MQTT handling and message processing
# --------------------------------------------------------------------------------------


def check_rules(conn, station_id, configs):
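    """Check ``configs`` against the station's rule set and record violations."""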
    rule_set = STATION_RULES.get(station_id)
    if not rule_set:
        return

    rules = RULES.get(rule_set, [])
    for rule in rules:
        key = rule["configuration_key"]
        expected = rule["value"]
        condition = rule["condition"]
        actual = configs.get(key)
        if actual is None:
            continue

        ok = True
        try:
            if condition == "equals":
                ok = str(actual) == expected
            elif condition == "contains":
                ok = expected in str(actual)
            elif condition == "larger":
                ok = float(actual) > float(expected)
            elif condition == "smaller":
                ok = float(actual) < float(expected)
        except Exception:
            ok = False

        if not ok:
            msg = (
                f"{key} expected {condition} {expected} but got {actual}"
            )
            cur = conn.cursor()
            cur.execute(
                "INSERT INTO op_config_results (station_id, message) VALUES (%s, %s)",
                (station_id, msg),
            )
            conn.commit()


def store_config_keys(conn, station_id, configs):
    """Persist configuration keys for a wallbox."""
    if not configs:
        return
    cur = conn.cursor()
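    # Note: the "VALUES (...) AS new(...)" upsert form below requires
    # MySQL 8.0.19 or newer; the energy tables use the same syntax.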
    for key, value in configs.items():
        cur.execute(
            """
            INSERT INTO op_wb_config_keys (station_id, config_key, config_value)
            VALUES (%s, %s, %s) AS new(station_id, config_key, config_value)
            ON DUPLICATE KEY UPDATE config_value = new.config_value
            """,
            (station_id, key, str(value)),
        )
    conn.commit()


def on_connect(client, userdata, flags, *args):
    """MQTT connect callback supporting both callback API versions."""
    rc = args[0] if args else None
    properties = args[1] if len(args) > 1 else None
    logging.info("Connected to MQTT with result code %s", rc)
    topic_base = MQTT_TOPIC_PREFIX.rstrip('/') + '/' if MQTT_TOPIC_PREFIX else ''
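    # Topic layout expected by on_message(): [<prefix>/]wallbox/<station_id>/<direction>,
    # where <direction> is client_to_server, server_to_client or Connected.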
    client.subscribe(f"{topic_base}wallbox/+/client_to_server")
    client.subscribe(f"{topic_base}wallbox/+/server_to_client")
    client.subscribe(f"{topic_base}wallbox/+/Connected")


def _parse_payload(raw: bytes):
    """Return JSON object from MQTT payload.

    Some wallboxes send Python-style literals which are not valid JSON.
    In that case fall back to ``ast.literal_eval``.
    """
    text = raw.decode()
    try:
        return json.loads(text)
    except Exception:
        try:
            import ast

            return ast.literal_eval(text)
        except Exception:
            logging.debug("Unable to parse payload: %s", text)
            return None


def on_message(client, userdata, msg):
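    """Process OCPP traffic for one station and forward selected events to the API."""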
    topic = msg.topic
    if MQTT_TOPIC_PREFIX:
        prefix = MQTT_TOPIC_PREFIX.rstrip('/') + '/'
        if not topic.startswith(prefix):
            return
        topic = topic[len(prefix):]

    parts = topic.split("/")
    if len(parts) < 3:
        return
    station_id = parts[1]
    direction = parts[2]

    if direction == "Connected":
        ensure_station_rules_entry(userdata["db"], station_id)
        return

    payload = _parse_payload(msg.payload)
    if payload is None or not (isinstance(payload, list) and len(payload) > 2):
        return

    m_type = payload[0]
    if isinstance(m_type, str) and m_type.isdigit():
        m_type = int(m_type)
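    # OCPP-J framing assumed throughout this handler:
    #   CALL       = [2, "<uniqueId>", "<Action>", {payload}]
    #   CALLRESULT = [3, "<uniqueId>", {payload}]
    # payload[1] (the unique id) correlates StartTransaction requests with
    # their confirmations via PENDING_STARTS.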

    # Detect BootNotification calls and create a default rule entry for new stations
    if (
        m_type == 2
        and len(payload) > 3
        and isinstance(payload[2], str)
        and payload[2] == "BootNotification"
    ):
        ensure_station_rules_entry(userdata["db"], station_id)

    # Track GetConfiguration requests without keys to send configuration update later
    if (
        direction == "server_to_client"
        and m_type == 2
        and len(payload) > 3
        and payload[2] == "GetConfiguration"
    ):
        params = payload[3] if isinstance(payload[3], dict) else {}
        if not params.get("key"):
            PENDING_FULL_CONFIG.add(station_id)

    # Handle StartTransaction request from charge point
    if (
        direction == "client_to_server"
        and m_type == 2
        and payload[2] == "StartTransaction"
    ):
        body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
        info = {
            "stationId": station_id,
            "connectorId": body.get("connectorId"),
            "idToken": body.get("idTag"),
            "sessionStartTimestamp": body.get("timestamp"),
            "meterStartWh": body.get("meterStart"),
        }
        PENDING_STARTS[payload[1]] = info

    # Handle StartTransaction confirmation with transactionId
    if (
        direction == "server_to_client"
        and m_type == 3
        and isinstance(payload[2], dict)
        and "transactionId" in payload[2]
    ):
        start_info = PENDING_STARTS.pop(payload[1], None)
        if start_info:
            tx_id = payload[2].get("transactionId")
            post_update_transaction(start_info, tx_id)
            start_info["transactionId"] = tx_id
            ACTIVE_TRANSACTIONS[str(tx_id)] = start_info

    # Handle StopTransaction to send stop-transaction data
    if (
        direction == "client_to_server"
        and m_type == 2
        and payload[2] == "StopTransaction"
    ):
        body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
        tx_id = str(body.get("transactionId"))
        start_info = ACTIVE_TRANSACTIONS.pop(tx_id, {}) or {}
        start_info.setdefault("stationId", station_id)
        post_stop_transaction(start_info, body)

    # Track TriggerMessage for StatusNotification
    if (
        direction == "server_to_client"
        and m_type == 2
        and payload[2] == "TriggerMessage"
    ):
        body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
        if body.get("requestedMessage") == "StatusNotification":
            STATUS_TRIGGER.add(station_id)

    # Handle StatusNotification only if triggered
    if (
        direction == "client_to_server"
        and m_type == 2
        and payload[2] == "StatusNotification"
    ):
        if station_id in STATUS_TRIGGER:
            STATUS_TRIGGER.remove(station_id)
            body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
            post_status_notification(station_id, body)

    # Handle configuration responses
    if (
        m_type == 3
        and isinstance(payload[2], dict)
        and "configurationKey" in payload[2]
    ):
        items = payload[2].get("configurationKey", [])
        configs = {i.get("key"): i.get("value") for i in items if isinstance(i, dict)}
        store_config_keys(userdata["db"], station_id, configs)
        check_rules(userdata["db"], station_id, configs)
        if station_id in PENDING_FULL_CONFIG:
            PENDING_FULL_CONFIG.remove(station_id)
            post_configuration_update(station_id, configs)


def energy_on_connect(client, userdata, flags, *args):
    """Energy MQTT connect callback supporting both callback API versions."""
    rc = args[0] if args else None
    properties = args[1] if len(args) > 1 else None
    logging.info("Connected to energy MQTT with result code %s", rc)
    client.subscribe("#")


def _purge_energy_topic(cur, topic):
    """Remove existing energy log entries for a topic."""
    cur.execute(
        "DELETE FROM op_energymqtt_check WHERE topic = %s",
        (topic,),
    )


def energy_on_message(client, userdata, msg):
    """Record the latest message and the receive interval for each energy topic."""
    for pattern in ENERGY_IGNORE:
        if fnmatch.fnmatch(msg.topic, pattern):
            return

    now = datetime.now()
    last = ENERGY_LAST_TS.get(msg.topic)
    freq = int((now - last).total_seconds()) if last else 0
    ENERGY_LAST_TS[msg.topic] = now
    payload = msg.payload.decode("utf-8", errors="replace")
    cur = userdata["db"].cursor()

    # Ensure only the newest entry per topic remains
    _purge_energy_topic(cur, msg.topic)

    cur.execute(
        """
        INSERT INTO op_energymqtt_check (topic, message, frequenz, ts)
        VALUES (%s, %s, %s, %s) AS new(topic, message, frequenz, ts)
        ON DUPLICATE KEY UPDATE
            message = new.message,
            frequenz = new.frequenz,
            ts = new.ts
        """,
        (msg.topic, payload, freq, now),
    )

    # Remove from disconnected list when a message arrives again
    cur.execute(
        "DELETE FROM op_energymqtt_disconnected WHERE topic = %s",
        (msg.topic,),
    )



def _disconnect_monitor(mysql_cfg):
    """Background thread checking for outdated MQTT topics."""

    def loop():
        conn = init_db(mysql_cfg)
        while True:
            try:
                cur = conn.cursor(pymysql.cursors.DictCursor)
                cur.execute(
                    "SELECT topic, message, frequenz, ts FROM op_energymqtt_check"
                )
                rows = cur.fetchall()
                now = datetime.now()
                for row in rows:
                    if (now - row["ts"]).total_seconds() > 6 * 3600:
                        cur.execute(
                            """
                            INSERT INTO op_energymqtt_disconnected (topic, message, frequenz, ts)
                            VALUES (%s, %s, %s, %s) AS new(topic, message, frequenz, ts)
                            ON DUPLICATE KEY UPDATE
                                message = new.message,
                                frequenz = new.frequenz,
                                ts = new.ts
                            """,
                            (
                                row["topic"],
                                row["message"],
                                row["frequenz"],
                                row["ts"],
                            ),
                        )
                conn.commit()
            except Exception:
                logging.exception("Error while checking disconnected topics")
                try:
                    conn.close()
                except Exception:
                    pass
                conn = init_db(mysql_cfg)
            time.sleep(3600)

    threading.Thread(target=loop, daemon=True).start()


def main():
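    """Initialise the database, caches and MQTT clients, then run the main loop."""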
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

    cfg = load_config()
    db_conn = init_db(cfg["mysql"])
    load_rules(db_conn)
    load_energy_ignore(db_conn)
    migrate_ocpi_wallboxes(db_conn)
    ensure_default_ocpi_backend(db_conn)
    load_ocpi_backends(db_conn)

    runtime_cfg = load_runtime_config(db_conn)
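    # op_config keys consulted here (all optional): mqtt_broker, mqtt_port,
    # mqtt_user, mqtt_password, mqtt_topic_prefix, energy_mqtt_broker,
    # energy_mqtt_port, energy_mqtt_user, energy_mqtt_password and the
    # authorizeTransaction_* settings below.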
    global MQTT_TOPIC_PREFIX, API_ENABLED, API_BASE_URL, API_TOKEN, API_KEY, API_LOG_DB_CONFIG
    MQTT_TOPIC_PREFIX = runtime_cfg.get("mqtt_topic_prefix", "")
    API_ENABLED = runtime_cfg.get("authorizeTransaction_enabled") == "1"
    API_BASE_URL = runtime_cfg.get("authorizeTransaction_url", "")
    API_TOKEN = runtime_cfg.get("authorizeTransaction_token", "")
    API_KEY = runtime_cfg.get("authorizeTransaction_api_key", "")
    API_LOG_DB_CONFIG = dict(cfg["mysql"])

    # Start background monitor for outdated topics
    _disconnect_monitor(cfg["mysql"])

    client = mqtt.Client(
        userdata={"db": db_conn},
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    if runtime_cfg.get("mqtt_user"):
        client.username_pw_set(runtime_cfg.get("mqtt_user"), runtime_cfg.get("mqtt_password"))

    client.on_connect = on_connect
    client.on_message = on_message

    broker = runtime_cfg.get("mqtt_broker", "127.0.0.1")
    port = int(runtime_cfg.get("mqtt_port", 1883))
    logging.info("Connecting to MQTT broker %s:%s", broker, port)
    client.connect(broker, port)

    # Start secondary energy MQTT logger if configured
    energy_broker = runtime_cfg.get("energy_mqtt_broker")
    if energy_broker:
        energy_client = mqtt.Client(
            userdata={"db": init_db(cfg["mysql"])},
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        )
        if runtime_cfg.get("energy_mqtt_user"):
            energy_client.username_pw_set(
                runtime_cfg.get("energy_mqtt_user"),
                runtime_cfg.get("energy_mqtt_password"),
            )
        energy_client.on_connect = energy_on_connect
        energy_client.on_message = energy_on_message
        energy_port = int(runtime_cfg.get("energy_mqtt_port", 1883))
        logging.info("Connecting to energy MQTT broker %s:%s", energy_broker, energy_port)
        energy_client.connect(energy_broker, energy_port)
        energy_client.loop_start()

    client.loop_forever()


if __name__ == "__main__":
    main()
