#!/usr/bin/env python3
"""Collect charging sessions from monthly OCPP broker messages.

Usage::
    python services/getSessionsOfMonthByBroker.py [YYMM]

If *YYMM* is omitted, the script processes messages for the current month.

Reads StartTransaction/StopTransaction messages from op_messages_YYMM and
stores aggregated sessions in op_charging_sessions.
"""

import json
import sys
from datetime import datetime, timezone
import logging
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP

import pymysql

# Path of the JSON configuration file; used as the default argument of
# load_config(). It is a relative path, so it is resolved against the current
# working directory of the process.
CONFIG_FILE = "config.json"

# Module-level logger; handlers and level are configured in main() via
# logging.basicConfig().
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------

def load_config(path: str = CONFIG_FILE) -> dict:
    """Read the UTF-8 encoded JSON file at *path* and return the decoded document.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file content is not valid JSON.
    """
    with open(path, mode="r", encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)


def init_db(cfg: dict):
    """Open and return a PyMySQL connection described by *cfg*.

    *cfg* must provide ``host``, ``user``, ``password`` and ``db``; a missing
    key raises ``KeyError``. ``charset`` is optional and defaults to ``utf8``.
    The connection is opened with autocommit enabled, so every statement is
    committed as soon as it is executed.
    """
    connect_kwargs = {
        "host": cfg["host"],
        "user": cfg["user"],
        "password": cfg["password"],
        "db": cfg["db"],
        "charset": cfg.get("charset", "utf8"),
        "autocommit": True,
    }
    return pymysql.connect(**connect_kwargs)


def parse_time(ts: str | None):
    if not ts:
        return None
    try:
        # replace trailing Z with +00:00 so fromisoformat can parse it
        dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        return dt.astimezone(timezone.utc)
    except ValueError:
        return None


def create_session_table(cur) -> None:
    """Create ``op_charging_sessions`` if needed and add columns missing from older installs.

    The table is created with ``CREATE TABLE IF NOT EXISTS``. Afterwards each
    column that was introduced in a later version is probed with
    ``SHOW COLUMNS`` and added with ``ALTER TABLE`` when the probe returns no
    row.

    Args:
        cur: Database cursor providing ``execute()`` and ``fetchone()``.
    """
    create_sql = """
        CREATE TABLE IF NOT EXISTS op_charging_sessions (
            id INT AUTO_INCREMENT PRIMARY KEY,
            station_id VARCHAR(50) NOT NULL,
            connector_id INT NOT NULL,
            transaction_id INT NOT NULL,
            id_tag VARCHAR(100) NOT NULL,
            vehicle_id INT NOT NULL DEFAULT 0,
            session_start TIMESTAMP NULL,
            session_end TIMESTAMP NULL,
            meter_start INT,
            meter_stop INT,
            energyChargedWh INT,
            ocmf_energy_wh INT NOT NULL DEFAULT 0,
            reason VARCHAR(50),
            UNIQUE KEY uniq_station_txn (station_id, transaction_id)
        )
        """
    cur.execute(create_sql)

    # (probe statement, statement that adds the column when the probe is empty)
    # Order matters: ocmf_energy_wh is placed AFTER energyChargedWh, which is
    # therefore migrated first.
    column_migrations = (
        (
            "SHOW COLUMNS FROM op_charging_sessions LIKE 'energyChargedWh'",
            "ALTER TABLE op_charging_sessions ADD COLUMN energyChargedWh INT AFTER meter_stop",
        ),
        (
            "SHOW COLUMNS FROM op_charging_sessions LIKE 'vehicle_id'",
            "ALTER TABLE op_charging_sessions ADD COLUMN vehicle_id INT NOT NULL DEFAULT 0 AFTER id_tag",
        ),
        (
            "SHOW COLUMNS FROM op_charging_sessions LIKE 'ocmf_energy_wh'",
            "ALTER TABLE op_charging_sessions ADD COLUMN ocmf_energy_wh INT NOT NULL DEFAULT 0 AFTER energyChargedWh",
        ),
    )
    for probe_sql, alter_sql in column_migrations:
        cur.execute(probe_sql)
        if cur.fetchone():
            # Column already present, nothing to migrate.
            continue
        cur.execute(alter_sql)


def normalize_connector_id(value):
    """Coerce a connector id taken from a message payload to ``int``.

    Args:
        value: Raw ``connectorId`` value; may be an int, a ``Decimal``, a
            float, a numeric string or anything else found in the JSON.

    Returns:
        The integer connector id, or ``None`` when *value* is missing, a
        bool, empty, not numeric, not finite, or has a fractional part.
        Integral values in another notation (``"2.0"``, ``2.0``,
        ``Decimal("2.00")``) are accepted.
    """
    # bool is a subclass of int but is never a meaningful connector id.
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value

    if isinstance(value, Decimal):
        number = value
    else:
        try:
            text = str(value).strip()
        except Exception:
            # A broken __str__ must not abort message processing.
            return None
        if not text:
            return None
        try:
            number = Decimal(text)
        except (InvalidOperation, ValueError):
            return None

    try:
        # NaN, signalling NaN and infinities cannot be connector ids; checking
        # first avoids InvalidOperation from the operations below.
        if not number.is_finite():
            return None
        integral = number.to_integral_value()
        if number != integral:
            return None
        return int(integral)
    except (InvalidOperation, OverflowError, ValueError):
        return None


def extract_ocmf_energy_wh(payload: dict) -> int | None:
    """Return the charged energy in Wh from an OCMF payload if present."""

    tx_data = payload.get("transactionData")
    if not isinstance(tx_data, list):
        return None

    for entry in tx_data:
        if not isinstance(entry, dict):
            continue
        samples = entry.get("sampledValue")
        if not isinstance(samples, list):
            continue
        for sample in samples:
            if not isinstance(sample, dict):
                continue
            value = sample.get("value")
            if not isinstance(value, str) or not value.startswith("OCMF|"):
                continue
            result = _parse_ocmf_payload(value)
            if result is not None:
                return result
    return None


def _parse_ocmf_payload(value: str) -> int | None:
    parts = value.split("|", 2)
    if len(parts) < 3:
        return None
    payload_json = parts[1]
    try:
        payload = json.loads(payload_json)
    except json.JSONDecodeError:
        return None

    readings = payload.get("RD")
    if not isinstance(readings, list):
        return None

    energy_values: list[Decimal] = []
    for reading in readings:
        if not isinstance(reading, dict):
            continue
        if reading.get("RI") != "1-0:1.8.0":
            continue
        raw_value = reading.get("RV")
        if raw_value is None:
            continue
        try:
            value_decimal = Decimal(str(raw_value))
        except (InvalidOperation, ValueError):
            continue
        energy_values.append(value_decimal)

    if len(energy_values) < 2:
        return None

    difference = max(energy_values) - min(energy_values)
    if difference <= 0:
        return 0

    try:
        energy_wh = (difference * Decimal(1000)).to_integral_value(
            rounding=ROUND_HALF_UP
        )
    except InvalidOperation:
        return None
    return int(energy_wh)


def _is_yymm(month) -> bool:
    """Return True when *month* is four ASCII digits whose month part is 01-12."""
    return (
        isinstance(month, str)
        and len(month) == 4
        and month.isascii()
        and month.isdigit()
        and 1 <= int(month[2:]) <= 12
    )


def _scalar_or_none(value):
    """Return *value* unless it is a JSON container, which cannot be a dict key."""
    return None if isinstance(value, (list, dict)) else value


def _meter_delta(meter_start, meter_stop):
    """Return ``meter_stop - meter_start``, or None if either is missing or not numeric."""
    if meter_start is None or meter_stop is None:
        return None
    try:
        return meter_stop - meter_start
    except TypeError:
        return None


def _pop_start_transaction(start_transactions, station_id, connector_id, tx_id):
    """Remove and return the confirmed start matching a StopTransaction, or None.

    An exact ``(station, connector, tx)`` key is tried first. Otherwise the
    first start of the same station and transaction is taken whose connector
    does not contradict *connector_id* (a connector unknown on either side is
    treated as compatible).
    """
    if tx_id is None:
        return None
    if connector_id is not None:
        info = start_transactions.pop((station_id, connector_id, tx_id), None)
        if info is not None:
            return info

    def compatible(key):
        candidate_station, candidate_connector, candidate_tx = key
        if candidate_station != station_id or candidate_tx != tx_id:
            return False
        return (
            connector_id is None
            or candidate_connector is None
            or candidate_connector == connector_id
        )

    fallback_key = next((key for key in start_transactions if compatible(key)), None)
    if fallback_key is None:
        return None
    return start_transactions.pop(fallback_key)


def collect_sessions(cur, month: str):
    """Build charging sessions from the OCPP messages stored for *month*.

    Messages are read from table ``op_messages_<month>`` in ``id`` order and
    paired up as follows:

    1. a ``StartTransaction`` CALL (message type 2) is remembered under its
       unique id;
    2. the CALLRESULT (message type 3) with the same unique id supplies the
       ``transactionId`` and confirms the start;
    3. a ``StopTransaction`` CALL with that ``transactionId`` from the same
       station closes the session.

    Malformed messages (invalid JSON, wrong frame shape, non-object payload)
    are skipped so that one bad row cannot abort the whole month.

    Args:
        cur: Database cursor providing ``execute()`` and ``fetchall()``.
        month: Table suffix in ``YYMM`` form, e.g. ``"2401"``.

    Returns:
        A list of tuples ``(station_id, connector_id, transaction_id, id_tag,
        session_start, session_end, meter_start, meter_stop, energy, reason,
        ocmf_energy_wh)`` in the column order used by ``insert_sessions``.

    Raises:
        ValueError: if *month* is not a valid ``YYMM`` string.
    """
    # The table name cannot be passed as a bound parameter, so it is
    # interpolated into the statement; reject anything but four digits to
    # keep untrusted command-line input out of the SQL text.
    if not _is_yymm(month):
        raise ValueError(f"month must be given as YYMM (e.g. '2401'), got {month!r}")

    table = f"op_messages_{month}"
    logger.debug("Querying messages for %s", month)
    cur.execute(
        f'''
        SELECT source_url, message
        FROM `{table}`
        WHERE message LIKE '%StartTransaction%'
           OR message LIKE '%StopTransaction%'
           OR message LIKE '%"transactionId"%'
        ORDER BY id
        '''
    )
    rows = cur.fetchall()
    logger.debug("Fetched %d messages", len(rows))

    # NOTE(review): pending starts are keyed by unique id only; two stations
    # using the same unique id at the same time would overwrite each other.
    # Keying by station as well needs confirmation that CALLRESULT rows carry
    # the station's source_url.
    start_requests: dict = {}
    start_transactions: dict[tuple, dict] = {}
    sessions = []

    for source_url, message in rows:
        if not isinstance(source_url, str):
            continue
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            # ValueError covers JSONDecodeError and undecodable bytes.
            continue
        if not isinstance(data, list) or len(data) < 3:
            continue

        msg_type = data[0]
        unique_id = data[1]
        if isinstance(unique_id, (list, dict)):
            # Unhashable ids cannot be used to correlate request and response.
            continue
        station_id = source_url.rsplit("/", 1)[-1]

        # StartTransaction CALL from charge point -> central system
        if msg_type == 2 and data[2] == "StartTransaction":
            payload = data[3] if len(data) > 3 else None
            if not isinstance(payload, dict):
                logger.debug(
                    "Skipping malformed StartTransaction from %s", station_id
                )
                continue
            logger.debug(
                "StartTransaction from %s (uid %s)", station_id, unique_id
            )
            start_requests[unique_id] = {
                "station_id": station_id,
                "connector_id": normalize_connector_id(payload.get("connectorId")),
                "id_tag": payload.get("idTag"),
                "meter_start": payload.get("meterStart"),
                "start": parse_time(payload.get("timestamp")),
            }

        # StartTransaction CALLRESULT from central system -> charge point
        elif msg_type == 3 and unique_id in start_requests:
            info = start_requests.pop(unique_id)
            payload = data[2]
            tx_id = (
                _scalar_or_none(payload.get("transactionId"))
                if isinstance(payload, dict)
                else None
            )
            if tx_id is not None:
                info["transaction_id"] = tx_id
                key = (info["station_id"], info.get("connector_id"), tx_id)
                start_transactions[key] = info
                logger.debug("StartTransaction confirmed for tx %s", tx_id)

        # StopTransaction CALL from charge point -> central system
        elif msg_type == 2 and data[2] == "StopTransaction":
            payload = data[3] if len(data) > 3 else None
            if not isinstance(payload, dict):
                logger.debug(
                    "Skipping malformed StopTransaction from %s", station_id
                )
                continue
            tx_id = _scalar_or_none(payload.get("transactionId"))
            connector_id = normalize_connector_id(payload.get("connectorId"))
            info = _pop_start_transaction(
                start_transactions, station_id, connector_id, tx_id
            )
            if info is None:
                logger.debug("StopTransaction for unknown tx %s", tx_id)
                continue

            logger.debug("StopTransaction for tx %s", tx_id)
            meter_stop = payload.get("meterStop")
            sessions.append(
                (
                    info["station_id"],
                    info["connector_id"],
                    tx_id,
                    info["id_tag"],
                    info["start"],
                    parse_time(payload.get("timestamp")),
                    info["meter_start"],
                    meter_stop,
                    _meter_delta(info["meter_start"], meter_stop),
                    payload.get("reason"),
                    # The column is NOT NULL DEFAULT 0, so a missing OCMF
                    # record is stored as 0.
                    extract_ocmf_energy_wh(payload) or 0,
                )
            )

    logger.debug("Collected %d sessions", len(sessions))
    return sessions


def insert_sessions(cur, sessions) -> int:
    """Bulk-insert *sessions* into ``op_charging_sessions`` and return the cursor's row count.

    ``INSERT IGNORE`` is used, so rows that collide with an existing
    ``(station_id, transaction_id)`` pair are skipped instead of raising.
    Each element of *sessions* must supply the eleven values in the column
    order of the statement below. An empty *sessions* returns ``0`` without
    touching the database.
    """
    if sessions:
        statement = (
            "INSERT IGNORE INTO op_charging_sessions "
            "(station_id, connector_id, transaction_id, id_tag, "
            "session_start, session_end, meter_start, meter_stop, "
            "energyChargedWh, reason, ocmf_energy_wh) "
            "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        )
        logger.debug("Inserting %d sessions into database", len(sessions))
        cur.executemany(statement, sessions)
        affected = cur.rowcount
        logger.debug("Inserted %d new rows", affected)
        return affected
    return 0


# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------

def main():
    """Command-line entry point: collect and store the sessions of one month.

    Accepts at most one argument, the month as ``YYMM``; without it the
    current UTC month is used. Too many arguments or a malformed month print
    the usage line and exit with status 1 before any database access.
    """
    usage = "Usage: getSessionsOfMonthByBroker.py [YYMM]"
    cli_args = sys.argv[1:]
    if len(cli_args) > 1:
        print(usage)
        sys.exit(1)

    if cli_args:
        month = cli_args[0]
    else:
        month = datetime.now(timezone.utc).strftime("%y%m")

    # The month becomes part of a table name further down, so only accept
    # exactly four ASCII digits with a month part of 01-12.
    month_ok = (
        len(month) == 4
        and month.isascii()
        and month.isdigit()
        and 1 <= int(month[2:]) <= 12
    )
    if not month_ok:
        print(usage)
        sys.exit(1)

    logging.basicConfig(level=logging.DEBUG, format="%(levelname)s:%(message)s")
    logger.info("Collecting sessions for %s", month)

    config = load_config()
    db_cfg = config.get("mysql", {})
    logger.debug("Connecting to database %s", db_cfg.get("db"))
    conn = init_db(db_cfg)
    try:
        with conn.cursor() as cur:
            create_session_table(cur)
            logger.debug("Session table checked/created")
            sessions = collect_sessions(cur, month)
            inserted = insert_sessions(cur, sessions)
        logger.info("Inserted %d sessions for %s", inserted, month)
    finally:
        # Always release the connection, including when a query fails.
        conn.close()


# Run the collection only when the file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
