import asyncio
from collections import defaultdict, deque
import base64
import json
import math
import uuid
import websockets
import aiomysql
import logging
import aiohttp
import threading
import socket
import uuid
from http import HTTPStatus
from websockets.legacy.client import WebSocketClientProtocol
from websockets.legacy.server import WebSocketServerProtocol, serve as ws_serve
from websockets.datastructures import Headers
from websockets.exceptions import SecurityError
import smtplib
from email.message import EmailMessage
try:
    from websockets.legacy.http import MAX_LINE_LENGTH, MAX_NUM_HEADERS, d
except ImportError:  # pragma: no cover - compatibility for newer websockets releases
    from websockets.legacy.http import MAX_LINE as MAX_LINE_LENGTH, MAX_HEADERS as MAX_NUM_HEADERS, d
import uuid, json, logging
import asyncio
import ast
from urllib.parse import ParseResult, urlparse, parse_qs, urlunparse, quote
from datetime import datetime, timezone, timedelta
import warnings
import json
import paho.mqtt.client as mqtt
from paho.mqtt.client import CallbackAPIVersion
from contextlib import asynccontextmanager
from typing import Any, Awaitable, Callable, Deque, Mapping, Optional, Tuple, Union
from pathlib import Path
import re
import time
from collections.abc import Mapping, MutableMapping, Iterable
import ssl
import os
from ipaddress import ip_address
from dataclasses import dataclass
from functools import partial
import resource

import ocpi_cdr_forwarder

from hubject_client import HubjectClient, HubjectConfigurationError, load_hubject_config

try:
    import inspect
except ImportError:  # pragma: no cover - fallback for very small Python builds
    inspect = None

# ---------------------------------------------------------------------------
# Helper for HTTP responses
# ---------------------------------------------------------------------------

def http_response(status: HTTPStatus, body: bytes, content_type: str = "text/plain; charset=utf-8"):
    """Build the ``(status, headers, body)`` tuple expected by websockets.legacy."""
    headers = [("Content-Type", content_type)]
    return status, headers, body


def _ensure_str(value: Any) -> Any:
    """Return ``value`` as ``str`` if it is ``bytes``-like.

    The helper coerces ``bytes``/``bytearray`` (and ``memoryview``) instances to
    UTF-8 strings so that downstream consumers don't have to handle the
    different input types explicitly. If decoding fails we fall back to
    replacement characters to guarantee a ``str`` return value while logging a
    warning for visibility.
    """

    if value is None or isinstance(value, str):
        return value

    if isinstance(value, (bytes, bytearray, memoryview)):
        raw_bytes = bytes(value)
        try:
            return raw_bytes.decode("utf-8")
        except UnicodeDecodeError:
            logging.warning(
                "Failed to decode bytes value as UTF-8 – using replacement characters"
            )
            return raw_bytes.decode("utf-8", errors="replace")

    return value

def get_nofile_limits() -> tuple[Optional[int], Optional[int]]:
    """Return the current ``RLIMIT_NOFILE`` soft and hard limits.

    Returns ``(None, None)`` when the limits cannot be read, logging the
    failure instead of raising so callers can treat the limits as merely
    informational.

    Fix: the original body was indented with tabs while the rest of the file
    uses 4-space indentation (PEP 8 forbids mixing); behaviour is unchanged.
    """

    try:
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        return int(soft), int(hard)
    except Exception:
        logging.exception("Unable to read RLIMIT_NOFILE")
        return None, None

# ---------------------------------------------------------------------------
# Configuration handling
# ---------------------------------------------------------------------------

# Try to load configuration from external JSON file. If the file is missing,
# fall back to the defaults that were previously hard coded in this script.

# Path of the JSON configuration file, resolved relative to the process's
# working directory.
CONFIG_FILE = "config.json"

# Load the configuration once at import time.  A missing file is not an
# error: an empty dict makes every lookup below fall back to its default.
try:
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        _config = json.load(f)
except FileNotFoundError:
    _config = {}

# Optional Hubject integration: only enabled when the config file provides a
# usable Hubject section; otherwise the client stays ``None`` and the feature
# is silently disabled.
HUBJECT_CLIENT: HubjectClient | None = None
try:
    _hubject_config = load_hubject_config(
        _config,
        config_dir=Path(CONFIG_FILE).resolve().parent,
        certs_dir=Path(CONFIG_FILE).resolve().parent / "certs",
    )
except HubjectConfigurationError as exc:
    logging.info("Hubject integration disabled: %s", exc)
except FileNotFoundError as exc:  # pragma: no cover - config removed at runtime
    logging.info("Hubject configuration could not be loaded: %s", exc)
else:
    HUBJECT_CLIENT = HubjectClient(_hubject_config)

# Number of retry attempts when pushing data to Hubject.
HUBJECT_PUSH_RETRIES = 1

# Resolve this component's log level from the "log_levels" config section;
# unknown level names fall back to INFO.
log_cfg = _config.get("log_levels", {})
level_name = log_cfg.get("ocpp_broker", "INFO").upper()
LOG_LEVEL = getattr(logging, level_name, logging.INFO)

# Listener configuration: prefer the newer "ocpp_proxy_listeners" list; fall
# back to the legacy single "ocpp_proxy" mapping.  The first entry acts as the
# primary listener for the module-level OCPP_PROXY_* settings below.
_raw_listener_cfg = _config.get("ocpp_proxy_listeners")
if isinstance(_raw_listener_cfg, list) and _raw_listener_cfg:
    _ocpp_proxy_listeners_cfg = list(_raw_listener_cfg)
    _ocpp_proxy_cfg = _ocpp_proxy_listeners_cfg[0]
else:
    _ocpp_proxy_cfg = _config.get("ocpp_proxy", {}) or {}
    _ocpp_proxy_listeners_cfg = [_ocpp_proxy_cfg]


def _coerce_proxy_port(value: Any, default: int = 7020) -> int:
    """Best-effort conversion of the configured proxy port to ``int``."""

    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _coerce_optional_bool(value: Any) -> Optional[bool]:
    """Convert config values to ``bool`` when possible, otherwise ``None``."""

    if isinstance(value, bool):
        return value

    if value is None:
        return None

    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in {"1", "true", "yes", "y", "on"}:
            return True
        if lowered in {"0", "false", "no", "n", "off"}:
            return False

    if isinstance(value, (int, float)):
        return bool(value)

    return None


def _coerce_int(value: Any, default: int) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


# SMTP defaults.  Each setting accepts both the upper-case key and a
# lower-case alias; environment variables may override these at runtime via
# ``_resolve_mail_settings`` below.
_mail_cfg = _config.get("mail", {}) or {}
MAIL_SERVER = _mail_cfg.get("MAIL_SERVER") or _mail_cfg.get("server") or "smtp.ionos.de"
MAIL_PORT = _coerce_int(_mail_cfg.get("MAIL_PORT") or _mail_cfg.get("port"), 587)
_mail_use_tls = _coerce_optional_bool(_mail_cfg.get("MAIL_USE_TLS") or _mail_cfg.get("use_tls"))
MAIL_USE_TLS = True if _mail_use_tls is None else bool(_mail_use_tls)
_mail_use_ssl = _coerce_optional_bool(_mail_cfg.get("MAIL_USE_SSL") or _mail_cfg.get("use_ssl"))
MAIL_USE_SSL = False if _mail_use_ssl is None else bool(_mail_use_ssl)
MAIL_USERNAME = _mail_cfg.get("MAIL_USERNAME") or _mail_cfg.get("username")
MAIL_PASSWORD = _mail_cfg.get("MAIL_PASSWORD") or _mail_cfg.get("password")
# The username doubles as the sender address when no explicit sender is set.
MAIL_DEFAULT_SENDER = _mail_cfg.get("MAIL_DEFAULT_SENDER") or _mail_cfg.get("default_sender") or MAIL_USERNAME

@dataclass(frozen=True)
class MailSettings:
    """Immutable snapshot of the effective SMTP settings used to send mail."""

    server: str | None  # SMTP host
    port: int  # SMTP port
    use_tls: bool  # issue STARTTLS after connecting
    use_ssl: bool  # connect over implicit TLS instead
    username: str | None  # SMTP auth user (None means no auth configured)
    password: str | None
    default_sender: str | None  # From: address when the caller supplies none


def _resolve_mail_settings() -> MailSettings:
    """Build the effective mail settings, letting env vars override config."""

    env = os.getenv

    env_tls = _coerce_optional_bool(env("MAIL_USE_TLS"))
    env_ssl = _coerce_optional_bool(env("MAIL_USE_SSL"))
    env_username = env("MAIL_USERNAME")
    env_sender = env("MAIL_DEFAULT_SENDER")

    username = env_username or MAIL_USERNAME

    # Sender precedence: explicit env sender, then the env username (when one
    # was supplied), finally the configured default sender.
    if env_sender:
        default_sender = env_sender
    elif env_username:
        default_sender = username
    else:
        default_sender = MAIL_DEFAULT_SENDER

    return MailSettings(
        server=env("MAIL_SERVER") or MAIL_SERVER,
        port=_coerce_int(env("MAIL_PORT"), MAIL_PORT),
        use_tls=MAIL_USE_TLS if env_tls is None else bool(env_tls),
        use_ssl=MAIL_USE_SSL if env_ssl is None else bool(env_ssl),
        username=username,
        password=env("MAIL_PASSWORD") or MAIL_PASSWORD,
        default_sender=default_sender,
    )


def _coerce_optional_int_config(value: Any) -> Optional[int]:
    """Convert config values to ``int`` when possible, otherwise ``None``."""

    try:
        return int(value)
    except (TypeError, ValueError):
        return None


# Remember which settings were explicitly present in the config so later code
# can distinguish "omitted" from "set to a falsy value".
_CONFIG_HAS_OCPP_PROXY_IP = "ip" in _ocpp_proxy_cfg
_CONFIG_HAS_OCPP_PROXY_PORT = "port" in _ocpp_proxy_cfg
_CONFIG_HAS_OCPP_PROXY_SSL_ENABLED = "use_ssl" in _ocpp_proxy_cfg
_CONFIG_HAS_OCPP_PROXY_CERTFILE = "certfile" in _ocpp_proxy_cfg
_CONFIG_HAS_OCPP_PROXY_KEYFILE = "keyfile" in _ocpp_proxy_cfg
_CONFIG_HAS_OCPP_PROXY_FORCE_TLS12 = "force_tls12" in _ocpp_proxy_cfg
_CONFIG_HAS_OCPP_PROXY_API_PORT = "api_port" in _ocpp_proxy_cfg
# Throttle toggles: ``None`` from the coercers means "not configured".
_CONFIGURED_THROTTLE_ENABLED = _coerce_optional_bool(_config.get("throttle_enabled"))
_CONFIG_HAS_THROTTLE_ENABLED = _CONFIGURED_THROTTLE_ENABLED is not None
_CONFIGURED_THROTTLE_SECONDS = _coerce_optional_int_config(
    _config.get("throttle_seconds")
)
_CONFIG_HAS_THROTTLE_SECONDS = _CONFIGURED_THROTTLE_SECONDS is not None

# Primary listener bind address; empty/whitespace values fall back to all
# interfaces.
_CONFIGURED_OCPP_PROXY_IP = (
    str(_ocpp_proxy_cfg.get("ip", "")).strip() if _CONFIG_HAS_OCPP_PROXY_IP else ""
)
if not _CONFIGURED_OCPP_PROXY_IP:
    _CONFIGURED_OCPP_PROXY_IP = "0.0.0.0"

_CONFIGURED_OCPP_PROXY_PORT = (
    _coerce_proxy_port(_ocpp_proxy_cfg.get("port"), 7020)
    if _CONFIG_HAS_OCPP_PROXY_PORT
    else 7020
)
_CONFIGURED_OCPP_PROXY_API_PORT = (
    _coerce_proxy_port(_ocpp_proxy_cfg.get("api_port"), 9900)
    if _CONFIG_HAS_OCPP_PROXY_API_PORT
    else 9900
)

_CONFIGURED_OCPP_PROXY_SSL_ENABLED = bool(_ocpp_proxy_cfg.get("use_ssl", False))
if not _CONFIG_HAS_OCPP_PROXY_SSL_ENABLED:
    _CONFIGURED_OCPP_PROXY_SSL_ENABLED = False

# Empty-string cert/key paths are normalised to ``None``.
_CONFIGURED_OCPP_PROXY_CERTFILE = (
    _ocpp_proxy_cfg.get("certfile") if _CONFIG_HAS_OCPP_PROXY_CERTFILE else None
)
if not _CONFIGURED_OCPP_PROXY_CERTFILE:
    _CONFIGURED_OCPP_PROXY_CERTFILE = None

_CONFIGURED_OCPP_PROXY_KEYFILE = (
    _ocpp_proxy_cfg.get("keyfile") if _CONFIG_HAS_OCPP_PROXY_KEYFILE else None
)
if not _CONFIGURED_OCPP_PROXY_KEYFILE:
    _CONFIGURED_OCPP_PROXY_KEYFILE = None

_CONFIGURED_OCPP_PROXY_FORCE_TLS12 = bool(
    _ocpp_proxy_cfg.get("force_tls12", False)
)
if not _CONFIG_HAS_OCPP_PROXY_FORCE_TLS12:
    _CONFIGURED_OCPP_PROXY_FORCE_TLS12 = False

# Mutable runtime copies of the configured values; these feed the primary
# listener rebuilt by ``_rebuild_runtime_proxy_listeners``.
OCPP_PROXY_IP = _CONFIGURED_OCPP_PROXY_IP
OCPP_PROXY_PORT = _CONFIGURED_OCPP_PROXY_PORT
OCPP_PROXY_API_PORT = _CONFIGURED_OCPP_PROXY_API_PORT
OCPP_PROXY_SSL_ENABLED = _CONFIGURED_OCPP_PROXY_SSL_ENABLED
OCPP_PROXY_CERTFILE = _CONFIGURED_OCPP_PROXY_CERTFILE
OCPP_PROXY_KEYFILE = _CONFIGURED_OCPP_PROXY_KEYFILE
OCPP_PROXY_FORCE_TLS12 = _CONFIGURED_OCPP_PROXY_FORCE_TLS12


def _collect_server_certificate_identifiers(certfile: str | None) -> list[str]:
    """Return hostnames and IPs contained in the configured TLS certificate.

    Collects subject ``commonName`` entries plus ``DNS``/``IP Address``
    subjectAltName entries.  Returns an empty list when no certfile is
    configured or the certificate cannot be decoded.
    """

    if not certfile:
        return []

    identifiers: list[str] = []
    try:
        # NOTE(review): ``ssl._ssl._test_decode_cert`` is a private CPython
        # helper with no stability guarantee.  The broad except keeps its
        # removal or a format change from crashing startup, but a supported
        # parser (e.g. the ``cryptography`` package) would be safer long-term.
        decoded = ssl._ssl._test_decode_cert(certfile)
    except Exception:
        logging.debug("Failed to decode TLS certificate %s", certfile, exc_info=True)
        return identifiers

    # ``subject`` is a tuple of RDN tuples of (key, value) pairs.
    subject = decoded.get("subject", ())
    for entry in subject:
        for key, value in entry:
            if key == "commonName" and isinstance(value, str):
                identifiers.append(value)

    # ``subjectAltName`` yields (type, value) pairs, e.g. ("DNS", "host").
    alt_names = decoded.get("subjectAltName", ())
    for name_type, value in alt_names:
        if name_type in {"DNS", "IP Address"} and isinstance(value, str):
            identifiers.append(value)

    return identifiers


@dataclass(frozen=True)
class ProxyListenerSettings:
    """Immutable description of one websocket listener endpoint."""

    name: str  # human-readable label, e.g. "listener-1"
    ip: str  # bind address ("0.0.0.0" for all interfaces)
    port: int
    use_ssl: bool
    certfile: str | None  # required together with keyfile when use_ssl is set
    keyfile: str | None
    force_tls12: bool  # presumably pins TLS 1.2; enforced elsewhere — confirm
    cert_identifiers: tuple[str, ...]  # CN/SAN entries parsed from certfile


def _build_proxy_listener_settings(cfg: Mapping[str, Any], index: int) -> ProxyListenerSettings:
    """Validate one listener config mapping and freeze it into settings.

    Raises ``TypeError`` for non-mapping input and ``RuntimeError`` when SSL
    is requested without both certificate and key files.
    """

    if not isinstance(cfg, Mapping):
        raise TypeError("Listener configuration must be a mapping")

    name = str(cfg.get("name") or "").strip() or f"listener-{index + 1}"
    use_ssl = bool(cfg.get("use_ssl", False))
    certfile = cfg.get("certfile") or None
    keyfile = cfg.get("keyfile") or None

    if use_ssl and not (certfile and keyfile):
        raise RuntimeError(
            f"Listener '{name}' requires both certfile and keyfile when SSL is enabled"
        )

    return ProxyListenerSettings(
        name=name,
        ip=str(cfg.get("ip", "")).strip() or "0.0.0.0",
        port=_coerce_proxy_port(cfg.get("port"), 7020),
        use_ssl=use_ssl,
        certfile=certfile,
        keyfile=keyfile,
        force_tls12=bool(cfg.get("force_tls12", False)),
        cert_identifiers=tuple(_collect_server_certificate_identifiers(certfile)),
    )


def _load_proxy_listener_settings(config_entries: list[Mapping[str, Any]]) -> tuple[ProxyListenerSettings, ...]:
    """Build settings for every configured listener, defaulting to one."""

    listeners = [
        _build_proxy_listener_settings(cfg or {}, idx)
        for idx, cfg in enumerate(config_entries)
    ]

    if not listeners:
        # No entries at all: fall back to a single all-defaults listener.
        listeners = [_build_proxy_listener_settings({}, 0)]

    return tuple(listeners)


# Listener settings as parsed from the configuration file (treated as
# immutable; runtime overrides are applied in _rebuild_runtime_proxy_listeners).
_CONFIGURED_PROXY_LISTENERS = _load_proxy_listener_settings(_ocpp_proxy_listeners_cfg)
if not _CONFIGURED_PROXY_LISTENERS:
    raise RuntimeError("At least one OCPP proxy listener must be configured")

# Listener for the API port; populated by _rebuild_runtime_proxy_listeners().
OCPP_API_LISTENER: ProxyListenerSettings | None = None


def _rebuild_runtime_proxy_listeners() -> None:
    """Recompute the runtime listener tuples from the module-level settings.

    The first configured listener is rebuilt from the current ``OCPP_PROXY_*``
    globals (which may differ from the file configuration), and a dedicated
    API listener is derived from the same address/TLS material but on
    ``OCPP_PROXY_API_PORT``.  Mutates ``OCPP_PROXY_LISTENERS``,
    ``OCPP_PROXY_CERT_IDENTIFIERS`` and ``OCPP_API_LISTENER``.
    """

    global OCPP_PROXY_LISTENERS, OCPP_PROXY_CERT_IDENTIFIERS, OCPP_API_LISTENER

    listeners = list(_CONFIGURED_PROXY_LISTENERS)
    if listeners:
        # Replace the primary listener, re-reading the certificate so the
        # identifiers reflect the currently configured certfile.
        primary = listeners[0]
        listeners[0] = ProxyListenerSettings(
            name=primary.name,
            ip=OCPP_PROXY_IP,
            port=OCPP_PROXY_PORT,
            use_ssl=OCPP_PROXY_SSL_ENABLED,
            certfile=OCPP_PROXY_CERTFILE,
            keyfile=OCPP_PROXY_KEYFILE,
            force_tls12=OCPP_PROXY_FORCE_TLS12,
            cert_identifiers=tuple(_collect_server_certificate_identifiers(OCPP_PROXY_CERTFILE)),
        )
    api_cert_identifiers = tuple(_collect_server_certificate_identifiers(OCPP_PROXY_CERTFILE))
    OCPP_API_LISTENER = ProxyListenerSettings(
        name="api-port",
        ip=OCPP_PROXY_IP,
        port=OCPP_PROXY_API_PORT,
        use_ssl=OCPP_PROXY_SSL_ENABLED,
        certfile=OCPP_PROXY_CERTFILE,
        keyfile=OCPP_PROXY_KEYFILE,
        force_tls12=OCPP_PROXY_FORCE_TLS12,
        cert_identifiers=api_cert_identifiers,
    )
    OCPP_PROXY_LISTENERS = tuple(listeners)
    # Expose the primary listener's certificate identifiers for quick access.
    if listeners:
        OCPP_PROXY_CERT_IDENTIFIERS = listeners[0].cert_identifiers
    else:
        OCPP_PROXY_CERT_IDENTIFIERS = tuple()


# Populate OCPP_PROXY_LISTENERS / OCPP_API_LISTENER at import time.
_rebuild_runtime_proxy_listeners()


def _normalise_identifier(value: str) -> str:
    return value.rstrip(".").lower()


def _is_ip_address(value: str) -> bool:
    try:
        ip_address(value)
    except ValueError:
        return False
    return True

# TLS settings for outgoing backend websocket connections.
BACKEND_SSL_VERIFY = bool(_config.get("backend_ssl_verify", False))
BACKEND_SSL_CA_FILE = _config.get("backend_ssl_ca_file") or None

# Timeout/retry policy for establishing the backend connection; each value
# falls back to its default when the configured value is not numeric.
BACKEND_CONNECT_CFG = _config.get("backend_connect", {})
try:
    BACKEND_CONNECT_TIMEOUT = float(BACKEND_CONNECT_CFG.get("timeout", 10))
except (TypeError, ValueError):
    BACKEND_CONNECT_TIMEOUT = 10.0
try:
    BACKEND_CONNECT_RETRIES = int(BACKEND_CONNECT_CFG.get("retries", 3))
except (TypeError, ValueError):
    BACKEND_CONNECT_RETRIES = 3
try:
    BACKEND_CONNECT_BACKOFF_SECONDS = float(
        BACKEND_CONNECT_CFG.get("backoff_seconds", 5)
    )
except (TypeError, ValueError):
    BACKEND_CONNECT_BACKOFF_SECONDS = 5.0

# Feature flag controlling whether OCPP 2.x subprotocols are offered.
ENABLE_OCPP_2X = bool(_config.get("enable_ocpp_2x", True))
# Application-defined websocket close code for failed subprotocol negotiation
# (4000-4999 is the private-use close code range).
SUBPROTOCOL_CLOSE_CODE = 4406


class BackendConnectionError(RuntimeError):
    """Raised when backend websocket connection attempts are exhausted."""

# Reported component version (overridable from config).
VERSION = _config.get("version", "2.1.0")

# Default websocket keep-alive settings
WS_CFG = _config.get("websocket", {})
DEFAULT_PING_INTERVAL = int(WS_CFG.get("ping_interval", 60))
DEFAULT_PING_TIMEOUT = int(WS_CFG.get("ping_timeout", 2 * DEFAULT_PING_INTERVAL))
EXISTING_WS_CLOSE_TIMEOUT = int(WS_CFG.get("close_timeout", 5))

# Diagnostics FTP defaults. These may be overridden from ``op_config`` at
# runtime.
# NOTE(review): default FTP host/credentials are hard coded here — consider
# moving them to config/environment so secrets are not committed with code.
DIAG_CFG = _config.get("diagnostics", {})
DIAG_FTP_HOST = DIAG_CFG.get("ftp_host", "217.160.79.201")
try:
    DIAG_FTP_PORT = int(DIAG_CFG.get("ftp_port", 21))
except (TypeError, ValueError):
    DIAG_FTP_PORT = 21
DIAG_FTP_USER = DIAG_CFG.get("ftp_user", "diag")
DIAG_FTP_PASSWORD = DIAG_CFG.get("ftp_password", "rem0tec0nnect2026x")

# Base directory for serving firmware files via HTTP.
FIRMWARE_DIR = Path(_config.get("firmware_dir", "./firmware"))

# Canonical station configuration keyed by source URL, plus an alias table
# mapping alternative spellings back to the canonical key.
STATION_PATHS: dict[str, dict[str, Any]] = {}
STATION_PATH_ALIASES: dict[str, str] = {}

# Connector identifiers may be numeric (1, 2, ...) or free-form strings.
ConnectorKey = Union[int, str]

# Latest connector specific status information reported via StatusNotification.
LATEST_CONNECTOR_STATUS: dict[str, dict[ConnectorKey, dict[str, Any]]] = {}

# Latest connector specific meter information derived from MeterValues.
LATEST_CONNECTOR_METERS: dict[str, dict[ConnectorKey, dict[str, Any]]] = {}

# Recent energy readings per connector to derive charging power values from
# the REST endpoint.
CONNECTOR_METER_HISTORY: dict[str, dict[ConnectorKey, deque[tuple[datetime, float]]]] = {}

# Track database logging issues so we can emit meaningful error messages when
# MySQL logging stops working unexpectedly.  ``DB_LOG_FAILURE_COUNT`` counts
# consecutive failures and ``DB_LOG_LAST_ERROR`` stores the timestamp of the
# last error so we can throttle repeated log entries.
DB_LOG_FAILURE_COUNT = 0
DB_LOG_LAST_ERROR: Optional[datetime] = None
OP_MESSAGES_TIMESTAMP_PATCHED = False
MONTHLY_MESSAGE_TABLES_PATCHED: set[str] = set()
# Subprotocols offered to stations; ocpp2.0.1 comes first when enabled.
SUPPORTED_SUBPROTOCOLS: list[str] = ["ocpp2.0.1"] if ENABLE_OCPP_2X else []
SUPPORTED_SUBPROTOCOLS.append("ocpp1.6")

# Schema for fault detection pattern rules (pattern/explanation/criticality).
FAULT_RULES_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_fault_detection_rules (
        id INT AUTO_INCREMENT PRIMARY KEY,
        pattern_title VARCHAR(255) NOT NULL DEFAULT '',
        pattern VARCHAR(255) NOT NULL,
        explanation VARCHAR(255) NOT NULL,
        criticality ENUM('low', 'medium', 'high') NOT NULL DEFAULT 'medium',
        is_active TINYINT(1) NOT NULL DEFAULT 1,
        created_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP
    ) CHARACTER SET utf8mb4
"""

# Schema for station marks; rows referencing a pattern are removed with it
# (ON DELETE CASCADE), and (station, source, pattern) combinations are unique.
FAULT_MARKS_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_station_marks (
        id INT AUTO_INCREMENT PRIMARY KEY,
        station_id VARCHAR(255) NOT NULL,
        reason VARCHAR(255) NOT NULL,
        source VARCHAR(32) NOT NULL DEFAULT 'manual',
        pattern_id INT DEFAULT NULL,
        created_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        service_requested_at TIMESTAMP NULL DEFAULT NULL,
        UNIQUE KEY uniq_station_source_pattern (station_id, source, pattern_id),
        CONSTRAINT fk_station_marks_pattern
            FOREIGN KEY (pattern_id)
            REFERENCES op_fault_detection_rules(id)
            ON DELETE CASCADE
    ) CHARACTER SET utf8mb4
"""

# Schema for report-reason based fault detection triggers.
FAULT_TRIGGERS_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_fault_detection_trigger (
        id INT AUTO_INCREMENT PRIMARY KEY,
        report_reason_id INT NOT NULL,
        report_reason_trigger VARCHAR(255) NOT NULL,
        report_reason_title VARCHAR(255) NOT NULL,
        report_reason_description TEXT NOT NULL,
        cluster VARCHAR(255) DEFAULT 'other',
        is_enabled TINYINT(1) NOT NULL DEFAULT 1,
        threshold INT NOT NULL DEFAULT 1,
        priority ENUM('LOW', 'MEDIUM', 'HIGH') NOT NULL DEFAULT 'MEDIUM',
        UNIQUE KEY uniq_op_fault_detection_trigger_reason_id (report_reason_id),
        UNIQUE KEY uniq_op_fault_detection_trigger_reason_trigger (report_reason_trigger)
    ) CHARACTER SET utf8mb4
"""

# One-shot guard so the CREATE TABLE statements above run only once; the lock
# is created lazily because it must be bound to the running event loop.
_fault_detection_tables_ready = False
_fault_detection_table_lock: Optional[asyncio.Lock] = None

# Detect whether this websockets release still accepts ``create_protocol``.
_WEBSOCKETS_SUPPORTS_CREATE_PROTOCOL = False
if inspect is not None:
    try:
        _WEBSOCKETS_SUPPORTS_CREATE_PROTOCOL = (
            "create_protocol" in inspect.signature(websockets.connect).parameters
        )
    except (TypeError, ValueError):  # pragma: no cover - inspect edge cases
        _WEBSOCKETS_SUPPORTS_CREATE_PROTOCOL = False

# In-memory cache of fault patterns, refreshed at most once per cache window.
_fault_patterns: list[dict] = []
_fault_patterns_lock: Optional[asyncio.Lock] = None
_fault_patterns_last_load = 0.0
FAULT_PATTERN_CACHE_SECONDS = 60.0

# Tracks repeated Preparing-state failures per (station, connector); the
# terminal statuses reset/resolve an ongoing failure sequence.
_PREPARING_FAILURE_THRESHOLD = 3
_PREPARING_FAILURE_TRACKER: dict[tuple[str, int], dict[str, Any]] = {}
_PREPARING_FAILURE_TERMINAL_STATUSES = {"Available", "Unavailable", "Faulted"}

# Strong references to fire-and-forget tasks so they are not garbage
# collected before completion.
BACKGROUND_TASKS: set[asyncio.Task[Any]] = set()


def create_background_task(
    coro: Awaitable[Any], *, name: Optional[str] = None
) -> asyncio.Task[Any]:
    """Create a tracked background task and ensure cleanup on completion."""

    if name is None:
        task = asyncio.create_task(coro)
    else:
        task = asyncio.create_task(coro, name=name)
    BACKGROUND_TASKS.add(task)

    def _on_done(finished: asyncio.Task[Any]) -> None:
        # Drop the strong reference first, then surface unexpected failures.
        BACKGROUND_TASKS.discard(finished)
        try:
            finished.result()
        except asyncio.CancelledError:
            pass
        except Exception:
            logging.exception("Unhandled exception in background task")

    task.add_done_callback(_on_done)
    return task


def as_bool(value: object) -> bool:
    """Normalise assorted MySQL value representations to a real boolean.

    The ``measure_ping`` and ``mqtt_enabled`` columns in ``op_redirects`` are
    TINYINTs, but the driver has returned ints, strings ("0"/"1",
    "true"/"false") and even raw bytes (b"\\x01") over the years.  A naive
    ``bool()`` cast would treat any non-empty string/bytes as True (e.g.
    ``bool("0")`` is True), so this helper inspects the content instead.
    """

    if isinstance(value, (bytes, bytearray)):
        # Tinyints may arrive as byte blobs; any non-zero byte means True.
        return any(value)

    if isinstance(value, str):
        text = value.strip().lower()
        if text in {"1", "true", "t", "yes", "y", "on"}:
            return True
        if text in {"0", "false", "f", "no", "n", "off", ""}:
            return False

    # Fall back to numeric interpretation, then to plain truthiness.
    try:
        numeric = int(value)
    except (TypeError, ValueError):
        return bool(value)
    return bool(numeric)


def _coerce_optional_int(value: object) -> Optional[int]:
    """Best-effort conversion of MySQL results to integers."""

    if value is None:
        return None

    if isinstance(value, (bytes, bytearray)):
        if len(value) == 0:
            return 0
        return int.from_bytes(value, byteorder="big")

    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def normalize_station_id(path: str) -> str:
    """Return a charge point identifier independent of path prefixes.

    Thin alias that delegates to :func:`extract_chargepoint_id`.
    """
    return extract_chargepoint_id(path)


def extract_chargepoint_id(station_path: str) -> str:
    """Return only the charge point identifier from a station path.

    The identifier is the last path segment, e.g. ``/ocpp16/CP1`` -> ``CP1``;
    an empty or slash-only path yields ``""``.
    """

    trimmed = station_path.strip("/")
    if not trimmed:
        return ""

    # rpartition returns the untouched string as tail when "/" is absent.
    return trimmed.rpartition("/")[2]


def _station_alias_candidates(source_url: str) -> set[str]:
    """Return alias keys that should resolve to ``source_url``."""

    cleaned = (source_url or "").strip()
    if not cleaned:
        return set()

    aliases = {cleaned}

    # Also match lookups that omit the leading slash(es)…
    without_slashes = cleaned.lstrip("/")
    if without_slashes:
        aliases.add(without_slashes)

    # …and lookups that use only the bare charge point id.
    station_id = extract_chargepoint_id(cleaned)
    if station_id:
        aliases.add(station_id)

    return aliases


def _register_station_aliases(source_url: str) -> None:
    """Register lookup aliases for a canonical station ``source_url``.

    Later registrations win: a clash with an existing alias is logged at
    debug level and then overwritten.
    """

    for alias in _station_alias_candidates(source_url):
        current = STATION_PATH_ALIASES.get(alias)
        if current and current != source_url:
            logging.debug(
                "Alias %s already mapped to %s – overriding with %s",
                alias,
                current,
                source_url,
            )
        STATION_PATH_ALIASES[alias] = source_url


def resolve_station_entry(
    path: str, station_hint: Optional[str] = None
) -> tuple[Optional[dict[str, Any]], Optional[str]]:
    """Return the configured station entry for ``path`` or ``station_hint``.

    The helper first tries exact matches and then falls back to the extracted
    charge point identifier so that connections using arbitrary URL prefixes
    (e.g. ``/ocpp16/<ID>`` vs ``/<ID>``) resolve to the same database entry.
    The return value is a tuple of ``(entry, canonical_source_url)`` where the
    canonical source URL corresponds to the row in ``op_redirects``.
    ``(None, None)`` is returned when nothing matches.
    """

    # Candidate keys in priority order: raw path, hint, bare charge point id;
    # each is also tried without its leading slash.
    candidates: list[str] = []

    def _append_candidate(value: Optional[str]) -> None:
        # Deduplicating append that also registers the slash-less variant.
        if not value:
            return
        if value not in candidates:
            candidates.append(value)
        stripped = value.lstrip("/")
        if stripped and stripped not in candidates:
            candidates.append(stripped)

    _append_candidate(path)
    _append_candidate(station_hint)
    derived_id = extract_chargepoint_id(path)
    if derived_id:
        _append_candidate(derived_id)

    # Pass 1: alias table first (maps any known spelling to the canonical
    # source URL), then direct hits on the canonical table itself.
    for candidate in candidates:
        canonical = STATION_PATH_ALIASES.get(candidate)
        if canonical:
            entry = STATION_PATHS.get(canonical)
            if entry:
                return entry, canonical
        entry = STATION_PATHS.get(candidate)
        if entry:
            return entry, candidate

    # Pass 2 (slow path): linear scan comparing bare charge point ids.
    lookup_id = extract_chargepoint_id(station_hint or path)
    if lookup_id:
        for source_url, entry in STATION_PATHS.items():
            if extract_chargepoint_id(source_url) == lookup_id:
                return entry, source_url

    return None, None


def infer_subprotocol_from_path(source_url: str) -> Optional[str]:
    """Infer an OCPP subprotocol from a redirect path.

    Historically the deployment used path prefixes such as ``/ocpp16`` or
    ``/ocpp201`` to distinguish between protocol versions.  Newer stations
    may store their preferred subprotocol explicitly in the database, but
    older rows might still rely on the prefix.  The first non-empty path
    segment is inspected and common patterns are translated into canonical
    OCPP subprotocol strings; ``None`` is returned when no version can be
    derived so callers can fall back to defaults.
    """

    segments = [segment for segment in source_url.split("/") if segment]
    if not segments:
        return None

    prefix = segments[0].lower()
    if not prefix.startswith("ocpp"):
        return None

    raw_version = prefix[4:]
    if not raw_version:
        return None

    suffix = raw_version.replace("-", "").replace("_", "")

    if "." in suffix:
        # Already dotted like "2.0.1": re-derive the canonical dot form.
        try:
            compact = suffix.replace(".", "")
            if compact.isdigit():
                parts = [int(ch) for ch in compact]
                if len(parts) == 2:
                    return f"ocpp{parts[0]}.{parts[1]}"
                if len(parts) >= 3:
                    return f"ocpp{parts[0]}.{parts[1]}.{parts[2]}"
        except Exception:
            return None
        return None

    if suffix.isdigit():
        parts = [int(ch) for ch in suffix]
        if len(parts) == 1:
            return f"ocpp{parts[0]}"
        if len(parts) == 2:
            return f"ocpp{parts[0]}.{parts[1]}"
        if len(parts) >= 3:
            return f"ocpp{parts[0]}.{parts[1]}.{parts[2]}"

    return None


def normalize_configured_subprotocol(value: Any) -> Optional[str]:
    """Normalize a configured OCPP subprotocol value.

    ``None`` is returned for non-strings, blanks, and "auto"/"any" so the
    proxy can fall back to offering all supported versions.  Recognised
    1.6/2.0 spellings are canonicalised; any other string passes through
    stripped but otherwise unchanged.
    """

    if not isinstance(value, str):
        return None

    cleaned = value.strip()
    if not cleaned:
        return None

    lowered = cleaned.lower()
    if lowered in {"auto", "any"}:
        return None

    if lowered == "ocpp16" or lowered.startswith("ocpp1.6"):
        return "ocpp1.6"
    if lowered.startswith("ocpp2.0"):
        return "ocpp2.0.1"

    return cleaned


def _trim_close_reason(reason: str, max_length: int = 120) -> str:
    """Return a close reason that fits into the WebSocket control frame."""

    if len(reason) <= max_length:
        return reason

    suffix = "..."
    return f"{reason[: max_length - len(suffix)]}{suffix}"


def _parse_subprotocol_header(raw_value: Optional[str]) -> list[str]:
    """Return the list of subprotocols from a Sec-WebSocket-Protocol header."""

    if not raw_value:
        return []

    values = []
    for part in raw_value.split(","):
        cleaned = part.strip()
        if cleaned:
            values.append(cleaned)
    return values


def _normalize_connector_key(value: object) -> Optional[ConnectorKey]:
    """Return a stable connector identifier for dictionary lookups."""

    if value is None:
        return None

    # bool is a subclass of int, but we should treat it as numeric 0/1
    if isinstance(value, bool):
        value = int(value)

    if isinstance(value, (int, float)):
        try:
            return int(value)
        except (TypeError, ValueError):
            pass

    try:
        text = str(value).strip()
    except Exception:
        return None

    if not text:
        return None

    try:
        return int(text)
    except ValueError:
        return text


def _normalize_identifier_component(value: object) -> Optional[ConnectorKey]:
    """Normalise connector/EVSE identifiers from nested payload structures."""

    if isinstance(value, (list, tuple, set)):
        # The first element that normalises successfully wins.
        for candidate in value:
            result = _normalize_identifier_component(candidate)
            if result is not None:
                return result
        return None

    if isinstance(value, dict):
        # Probe the conventional identifier keys in priority order.
        for field in ("connectorId", "id", "value"):
            if field in value:
                result = _normalize_identifier_component(value.get(field))
                if result is not None:
                    return result
        return None

    return _normalize_connector_key(value)


def _extract_connector_and_evse(
    payload: Mapping[str, Any]
) -> tuple[Optional[ConnectorKey], Optional[ConnectorKey]]:
    """Return connector and EVSE identifiers from an OCPP payload.

    Flat ``connectorId``/``evseId`` fields take precedence; a nested ``evse``
    mapping fills in whichever of the two is still missing.  A missing
    connector id defaults to 1.
    """

    connector_raw = payload.get("connectorId")
    evse_raw = payload.get("evseId")

    nested = payload.get("evse")
    if isinstance(nested, Mapping):
        if connector_raw is None:
            connector_raw = nested.get("connectorId")
        if evse_raw is None:
            evse_raw = nested.get("id")

    connector = _normalize_identifier_component(connector_raw)
    evse = _normalize_identifier_component(evse_raw)

    if connector is None:
        connector = 1

    return connector, evse


def _extract_id_token_value(source: Mapping[str, Any]) -> Optional[str]:
    """Return the primary idToken/idTag value from a payload if present."""

    if "idTag" in source:
        value = source.get("idTag")
        return str(value) if value is not None else None

    token = source.get("idToken")
    if isinstance(token, Mapping):
        raw = token.get("idToken")
        if raw is not None:
            return str(raw)

    return None


def _normalise_id_token(token: object) -> Optional[str]:
    """Return a string representation for nested idToken structures."""

    if token is None:
        return None

    if isinstance(token, Mapping):
        if "idToken" in token:
            return _normalise_id_token(token.get("idToken"))
        if "value" in token:
            return _normalise_id_token(token.get("value"))
        return None

    try:
        text = str(token)
    except Exception:
        return None

    return text


def _set_id_token_value(container: MutableMapping[str, Any], new_value: str) -> bool:
    """Update idTag/idToken fields inside ``container`` with ``new_value``."""

    if "idTag" in container:
        container["idTag"] = new_value
        return True

    token = container.get("idToken")
    if isinstance(token, MutableMapping) and "idToken" in token:
        token["idToken"] = new_value
        return True

    return False


def _connector_sort_key(value: ConnectorKey) -> tuple[int, Union[int, str]]:
    """Sort integer connector ids first, then string ids lexicographically."""

    return (0, value) if isinstance(value, int) else (1, str(value))


def _append_connector_meter_history(
    station_short: str,
    connector_key: ConnectorKey,
    meter_payload: Mapping[str, Any],
    *,
    fallback_timestamp: Optional[object] = None,
) -> None:
    """Record the newest energy reading so charging power can be derived.

    Only numeric ``Energy.Active.Import.Register`` readings are stored; the
    per-connector deque keeps at most the last two samples.  The sample time
    comes from the payload timestamp, then ``fallback_timestamp``, then the
    current UTC time.
    """

    reading = meter_payload.get("Energy.Active.Import.Register")
    if not isinstance(reading, (int, float)):
        return

    sample_time: Optional[datetime] = None
    for candidate in (meter_payload.get("timestamp"), fallback_timestamp):
        if candidate is None:
            continue
        if isinstance(candidate, datetime):
            # Naive datetimes are assumed to be UTC.
            sample_time = (
                candidate
                if candidate.tzinfo
                else candidate.replace(tzinfo=timezone.utc)
            )
            break
        if isinstance(candidate, str) and candidate:
            try:
                sample_time = parse_ocpp_timestamp(candidate)
            except Exception:
                sample_time = None
            if sample_time is not None:
                break

    if sample_time is None:
        sample_time = datetime.now(timezone.utc)

    per_station = CONNECTOR_METER_HISTORY.setdefault(station_short, {})
    samples = per_station.setdefault(connector_key, deque(maxlen=2))
    samples.append((sample_time, float(reading)))


def _build_connectors_payload(station_short: str) -> list[dict[str, Any]]:
    """Return the connector status payload for the given station.

    Merges three module-level caches (latest status, latest meter values and
    the two-sample meter history) with ACTIVE_TRANSACTIONS to produce one
    JSON-ready dict per connector, sorted deterministically by
    ``_connector_sort_key``.
    """

    def _normalise_connector_map(
        source: Mapping[ConnectorKey, dict[str, Any]]
    ) -> dict[ConnectorKey, dict[str, Any]]:
        # Re-key a cache through _normalize_connector_key; keep the raw key
        # when normalisation fails and drop None keys entirely.
        normalised: dict[ConnectorKey, dict[str, Any]] = {}
        for raw_key, value in source.items():
            normalised_key = _normalize_connector_key(raw_key)
            key: Optional[ConnectorKey]
            if normalised_key is not None:
                key = normalised_key
            elif raw_key is not None:
                key = raw_key
            else:
                key = None
            if key is None:
                continue
            normalised[key] = value
        return normalised

    status_source = LATEST_CONNECTOR_STATUS.get(station_short, {})
    meter_source = LATEST_CONNECTOR_METERS.get(station_short, {})
    history_source = CONNECTOR_METER_HISTORY.get(station_short, {})
    status_map = _normalise_connector_map(status_source)
    meter_map = _normalise_connector_map(meter_source)
    known_keys: set[ConnectorKey] = set(status_map.keys()) | set(meter_map.keys())

    # Map each of this station's active transactions to a connector key,
    # preferring connectorId over evseId, and preferring keys already known
    # from the status/meter caches over unseen ones.
    active_lookup: dict[ConnectorKey, str] = {}
    active_info_lookup: dict[ConnectorKey, Mapping[str, Any]] = {}
    for tx_id, info in list(ACTIVE_TRANSACTIONS.items()):
        if info.get("stationId") != station_short:
            continue

        # Fall back to the dict key when the record has no transactionId.
        tx_value = info.get("transactionId") or tx_id
        if tx_value is None:
            continue
        tx_text = str(tx_value)

        connector_key = _normalize_identifier_component(info.get("connectorId"))
        evse_key = _normalize_identifier_component(info.get("evseId"))

        candidate_keys: list[ConnectorKey] = []
        if connector_key is not None:
            candidate_keys.append(connector_key)
        if evse_key is not None and evse_key not in candidate_keys:
            candidate_keys.append(evse_key)

        if not candidate_keys:
            continue

        chosen_key: Optional[ConnectorKey] = None
        for key in candidate_keys:
            if key in known_keys:
                chosen_key = key
                break

        if chosen_key is None:
            chosen_key = candidate_keys[0]

        active_lookup[chosen_key] = tx_text
        active_info_lookup[chosen_key] = info
        known_keys.add(chosen_key)

    connector_ids = (
        set(status_map.keys()) | set(meter_map.keys()) | set(active_lookup.keys())
    )

    connectors: list[dict[str, Any]] = []
    for connector_id in sorted(connector_ids, key=_connector_sort_key):
        connector_entry: dict[str, Any] = {"connectorId": connector_id}

        # Copy through only the recognised status fields.
        status_info = status_map.get(connector_id) or {}
        if "status" in status_info:
            connector_entry["status"] = status_info["status"]
        if "errorCode" in status_info:
            connector_entry["errorCode"] = status_info["errorCode"]
        if "timestamp" in status_info:
            connector_entry["timestamp"] = status_info["timestamp"]

        # Assemble the meterValues sub-dict from whichever readings exist.
        meter_info = meter_map.get(connector_id) or {}
        meter_payload: dict[str, Any] = {}
        if "timestamp" in meter_info:
            meter_payload["timestamp"] = meter_info["timestamp"]
        if "Energy.Active.Import.Register" in meter_info:
            meter_payload["Energy.Active.Import.Register"] = meter_info[
                "Energy.Active.Import.Register"
            ]
        current_values = meter_info.get("Current.Import")
        if isinstance(current_values, dict) and current_values:
            meter_payload["Current.Import"] = current_values
        if "Power.Active.Import" in meter_info:
            meter_payload["Power.Active.Import"] = meter_info["Power.Active.Import"]
        power_phase_values = meter_info.get("Power.Active.Import.Phases")
        if isinstance(power_phase_values, dict) and power_phase_values:
            meter_payload["Power.Active.Import.Phases"] = power_phase_values
        if meter_payload:
            connector_entry["meterValues"] = meter_payload

        transaction_id = active_lookup.get(connector_id)
        if transaction_id is not None:
            connector_entry["transactionId"] = transaction_id

        # Session energy bookkeeping: start value from the transaction
        # record, last value from the record or the latest meter reading.
        tx_info = active_info_lookup.get(connector_id)
        meter_start_value: Optional[float] = None
        meter_last_value: Optional[float] = None
        if tx_info is not None:
            meter_start_value = _safe_float(tx_info.get("meterStartWh"))
            meter_last_value = _safe_float(tx_info.get("meterLastWh"))

        if meter_last_value is None and meter_payload:
            energy_value = meter_payload.get("Energy.Active.Import.Register")
            meter_last_value = _safe_float(energy_value)

        if meter_start_value is not None:
            connector_entry["meterStartWh"] = meter_start_value
        if meter_last_value is not None:
            connector_entry["meterLastWh"] = meter_last_value
        if (
            meter_start_value is not None
            and meter_last_value is not None
            and meter_last_value >= meter_start_value
        ):
            # Only emit a delta when the register did not go backwards.
            connector_entry["sessionEnergyDeltaWh"] = (
                meter_last_value - meter_start_value
            )

        # Derive charging power (W) from the last two energy samples:
        # Wh delta scaled by 3600 / elapsed-seconds.
        history = history_source.get(connector_id)
        if history and len(history) >= 2:
            earlier_ts, earlier_val = history[-2]
            latest_ts, latest_val = history[-1]
            delta_seconds = (latest_ts - earlier_ts).total_seconds()
            delta_wh = latest_val - earlier_val
            if delta_seconds > 0 and delta_wh >= 0:
                connector_entry["chargingEnergyDeltaWh"] = delta_wh
                connector_entry["chargingPowerW"] = (delta_wh * 3600.0) / delta_seconds
            elif delta_wh >= 0:
                # Zero/negative elapsed time: report the delta without power.
                connector_entry["chargingEnergyDeltaWh"] = delta_wh

        connectors.append(connector_entry)

    return connectors


def _find_active_session_uid(
    station_id: str,
    connector_id: Optional[ConnectorKey] = None,
    evse_id: Optional[ConnectorKey] = None,
) -> Optional[str]:
    """Look up the active transaction id matching a connector or EVSE.

    Returns the first transaction whose normalised connector or EVSE id
    matches the requested one, or ``None`` when nothing matches.
    """

    wanted_connector = _normalize_identifier_component(connector_id)
    wanted_evse = _normalize_identifier_component(evse_id)

    for uid, record in list(ACTIVE_TRANSACTIONS.items()):
        if record.get("stationId") != station_id:
            continue

        record_connector = _normalize_identifier_component(record.get("connectorId"))
        record_evse = _normalize_identifier_component(record.get("evseId"))

        connector_hit = (
            wanted_connector is not None and wanted_connector == record_connector
        )
        evse_hit = wanted_evse is not None and wanted_evse == record_evse
        if connector_hit or evse_hit:
            return uid

    return None


def _safe_float(value: object) -> Optional[float]:
    """Convert various value representations to float when possible."""

    if value is None:
        return None

    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _parse_meter_value_entry(
    meter_values: object,
) -> tuple[Optional[str], Optional[float], dict[str, Any]]:
    """Pull timestamp, energy and current/power readings from a meterValue list.

    Only the first ``meterValue`` entry is inspected.  When no explicit
    energy register sample is present, the first numeric sample (whatever
    its measurand) is used as the energy fallback.
    """

    if not (isinstance(meter_values, list) and meter_values):
        return None, None, {}

    first = meter_values[0]
    if not isinstance(first, Mapping):
        return None, None, {}

    ts_raw = first.get("timestamp")
    timestamp = ts_raw if isinstance(ts_raw, str) and ts_raw else None

    energy: Optional[float] = None
    currents: dict[str, float] = {}
    power_total: Optional[float] = None
    power_phases: dict[str, float] = {}

    samples = first.get("sampledValue")
    if isinstance(samples, list):
        first_numeric: Optional[float] = None
        for sample in samples:
            if not isinstance(sample, Mapping):
                continue
            numeric = _safe_float(sample.get("value"))
            if numeric is None:
                continue
            # A missing measurand means the energy register per OCPP 1.6.
            measurand = sample.get("measurand") or "Energy.Active.Import.Register"
            if measurand == "Energy.Active.Import.Register":
                energy = numeric
            elif measurand == "Current.Import":
                phase = sample.get("phase")
                if isinstance(phase, str) and phase in {"L1", "L2", "L3"}:
                    currents[phase] = numeric
            elif measurand == "Power.Active.Import":
                phase = sample.get("phase")
                if isinstance(phase, str) and phase:
                    power_phases[phase] = numeric
                else:
                    power_total = numeric
            if first_numeric is None:
                first_numeric = numeric
        if energy is None:
            energy = first_numeric

    payload: dict[str, Any] = {}
    if timestamp:
        payload["timestamp"] = timestamp
    if energy is not None:
        payload["Energy.Active.Import.Register"] = energy
    if currents:
        payload["Current.Import"] = currents
    if power_total is not None:
        payload["Power.Active.Import"] = power_total
    elif power_phases:
        # NOTE(review): with only per-phase samples the aggregate key holds
        # the phase dict (not a number); downstream copies it verbatim.
        payload["Power.Active.Import"] = power_phases
    if power_phases:
        payload["Power.Active.Import.Phases"] = power_phases

    return timestamp, energy, payload


def parse_ocpp_timestamp(value: Optional[str]) -> datetime:
    """Parse an OCPP ISO timestamp, defaulting to "now" (UTC) on failure."""

    if value:
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except Exception:
            pass
    return datetime.now(timezone.utc)


def ensure_utc_timestamp(value: object) -> Optional[str]:
    """Normalise timestamps to an ISO 8601 UTC string ("Z" suffix).

    Accepts datetimes, epoch numbers and ISO strings.  Unparseable strings
    are returned trimmed, other unrecognised objects are stringified, and
    ``None`` stays ``None``.  The output uses the shortest precision that
    loses no information (seconds, milliseconds or microseconds).
    """

    if value is None:
        return None

    if isinstance(value, datetime):
        moment = value
    elif isinstance(value, (int, float)):
        try:
            moment = datetime.fromtimestamp(value, tz=timezone.utc)
        except (OverflowError, OSError, ValueError):
            return None
    elif isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        candidate = text[:-1] + "+00:00" if text.endswith("Z") else text
        try:
            moment = datetime.fromisoformat(candidate)
        except ValueError:
            # Not ISO 8601 — hand back the trimmed original unchanged.
            return text
    else:
        return str(value)

    # Naive datetimes are assumed to be UTC already.
    moment = (
        moment.replace(tzinfo=timezone.utc)
        if moment.tzinfo is None
        else moment.astimezone(timezone.utc)
    )

    if moment.microsecond == 0:
        rendered = moment.replace(microsecond=0).isoformat(timespec="seconds")
    elif moment.microsecond % 1000 == 0:
        rendered = moment.isoformat(timespec="milliseconds")
    else:
        rendered = moment.isoformat(timespec="microseconds")

    return rendered.replace("+00:00", "Z")


def parse_basic_credentials(header_value: Optional[str]) -> Optional[tuple[str, str]]:
    """Return ``(username, password)`` from a Basic Authorization header.

    Any malformed input (wrong scheme, bad base64, missing colon) yields
    ``None`` rather than raising.
    """

    if not header_value:
        return None

    parts = header_value.split(" ", 1)
    if len(parts) != 2 or parts[0].lower() != "basic":
        return None

    try:
        decoded = base64.b64decode(parts[1]).decode("utf-8")
    except Exception:
        return None

    if ":" not in decoded:
        return None

    user, _, secret = decoded.partition(":")
    return user, secret


def build_basic_auth_header(username: str, password: str) -> str:
    """Encode ``username:password`` as an HTTP Basic Authorization value."""

    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def _normalize_auth_value(value: Any) -> Any:
    """Strip auth values; empty/whitespace-only strings become ``None``."""

    coerced = _ensure_str(value)
    if not isinstance(coerced, str):
        return coerced
    stripped = coerced.strip()
    return stripped if stripped else None


def _filter_auth_headers(auth_headers, station_id=None):
    """Drop Authorization headers whose value is empty, warning when any are removed."""

    if not auth_headers:
        return auth_headers

    kept = []
    dropped = 0
    for header_name, header_value in auth_headers:
        if header_name.lower() != "authorization":
            kept.append((header_name, header_value))
            continue
        cleaned = _normalize_auth_value(header_value)
        if cleaned is None:
            dropped += 1
        else:
            kept.append((header_name, cleaned))

    if dropped:
        logging.warning(
            "Omitting %s empty Authorization header(s) for backend connection",
            dropped,
            extra={"station_id": station_id} if station_id else None,
        )

    return kept


def _merge_extra_headers(existing_headers, auth_headers, station_id=None):
    """Append filtered auth headers after any pre-existing headers, keeping order."""

    extra = _filter_auth_headers(auth_headers, station_id)
    if not extra:
        # Nothing to add — hand back whatever the caller supplied untouched.
        return existing_headers

    merged = []
    if existing_headers:
        items = (
            existing_headers.items()
            if isinstance(existing_headers, Mapping)
            else existing_headers
        )
        merged.extend(items)
    merged.extend(extra)
    return merged


def _wrap_protocol_factory_with_headers(auth_headers, existing_factory=None, station_id=None):
    """Inject authorization headers by wrapping the protocol factory.

    Used when ``websockets.connect`` rejects ``extra_headers``: the returned
    factory patches each protocol instance's ``handshake`` coroutine so that
    ``auth_headers`` are merged in just before the HTTP upgrade is sent.
    """

    # Fall back to the stock client protocol when no custom factory is given.
    base_factory = existing_factory or WebSocketClientProtocol

    def factory(*args, **kwargs):
        protocol = base_factory(*args, **kwargs)

        # Keep a reference to the original bound method before replacing it.
        original_handshake = protocol.handshake

        async def handshake(wsuri, origin=None, subprotocols=None, extra_headers=None):
            # Merge our auth headers with whatever the caller supplied.
            merged = _merge_extra_headers(extra_headers, auth_headers, station_id)
            return await original_handshake(
                wsuri,
                origin=origin,
                subprotocols=subprotocols,
                extra_headers=merged,
            )

        # Bind the replacement as a method on this specific instance only.
        protocol.handshake = handshake.__get__(protocol, protocol.__class__)
        return protocol

    return factory


def _select_backend_auth(
    incoming_key: Optional[str],
    db_key: Any,
    db_basic_user: Any,
    db_basic_password: Any,
    station_id: Optional[str] = None,
) -> tuple[Optional[str], Optional[str], Optional[list[tuple[str, str]]]]:
    """Determine which Authorization header to forward to the backend.

    Precedence: configured Basic credentials, then a configured auth key,
    then the key presented by the incoming connection.  Returns the chosen
    header value, a tag describing its origin, and the filtered header list
    (``None`` when nothing usable remains).
    """

    clean_key = _normalize_auth_value(db_key)
    clean_user = _normalize_auth_value(db_basic_user)
    clean_password = _normalize_auth_value(db_basic_password)

    basic_header = None
    if clean_user is not None or clean_password is not None:
        if clean_user and clean_password:
            basic_header = build_basic_auth_header(clean_user, clean_password)
            logging.info(
                "Using configured backend Basic Auth credentials (username='%s')",
                clean_user,
                extra={"station_id": station_id},
            )
        else:
            # Only one half of the credential pair was configured.
            logging.warning(
                "Backend Basic Auth override incomplete (username=%s, password set=%s)",
                clean_user,
                clean_password is not None,
                extra={"station_id": station_id},
            )

    if basic_header:
        used_key, used_key_source = basic_header, "configured-basic"
    elif clean_key:
        used_key, used_key_source = clean_key, "configured-auth-key"
    elif incoming_key:
        used_key, used_key_source = incoming_key, "incoming"
    else:
        used_key, used_key_source = None, None

    auth_headers = None
    if used_key:
        ensured = _ensure_str(used_key)
        if not isinstance(ensured, str):
            ensured = str(ensured)
        auth_headers = _filter_auth_headers([("authorization", ensured)], station_id)
        if not auth_headers:
            # The chosen key was empty after normalisation — forward nothing.
            used_key = None
            used_key_source = None

    return used_key, used_key_source, auth_headers


def _inject_basic_auth_into_url(ws_url: str, auth_headers) -> str:
    """Return a websocket URL that embeds HTTP Basic credentials if possible."""

    if not auth_headers:
        return ws_url

    parsed = urlparse(ws_url)
    if parsed.hostname is None or parsed.username or parsed.password:
        return ws_url

    basic_token = None
    for name, value in auth_headers:
        if name.lower() == "authorization" and isinstance(value, str):
            if value.startswith("Basic "):
                basic_token = value[6:].strip()
                break

    if not basic_token:
        return ws_url

    try:
        decoded = base64.b64decode(basic_token, validate=True).decode("utf-8")
    except (ValueError, UnicodeDecodeError):
        return ws_url

    if ":" not in decoded:
        return ws_url

    username, password = decoded.split(":", 1)

    def _quote(value: str) -> str:
        return quote(value, safe="")

    host = parsed.hostname or ""
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"

    port = f":{parsed.port}" if parsed.port else ""
    userinfo = f"{_quote(username)}:{_quote(password)}"
    new_netloc = f"{userinfo}@{host}{port}"

    return urlunparse(parsed._replace(netloc=new_netloc))


def _build_backend_ssl_context() -> ssl.SSLContext:
    """Return an SSL context for backend WebSocket connections.

    With verification enabled, a default server-auth context is built and an
    optional CA bundle is loaded (load failures are logged, not raised).
    With verification disabled, an unverified context is returned on purpose.
    """

    if not BACKEND_SSL_VERIFY:
        # When verification is disabled we intentionally accept any certificate.
        insecure = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        insecure.check_hostname = False
        insecure.verify_mode = ssl.CERT_NONE
        return insecure

    context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    if BACKEND_SSL_CA_FILE:
        try:
            context.load_verify_locations(cafile=BACKEND_SSL_CA_FILE)
        except Exception as exc:  # pragma: no cover - depends on filesystem
            logging.error(
                "Failed to load backend SSL CA bundle %s: %s",
                BACKEND_SSL_CA_FILE,
                exc,
                extra={"station_id": "system"},
            )
    return context


async def _connect_backend_with_retries(connect_factory, url_for_logging, mode, station_id):
    """Open a backend websocket connection with retries and timeout.

    ``TypeError`` propagates immediately — it signals a parameter mismatch
    the caller must handle by switching auth strategies.  Timeouts and other
    failures are retried with a fixed backoff; a final failure raises
    ``BackendConnectionError`` chained to the last exception.
    """

    attempts = max(1, BACKEND_CONNECT_RETRIES)
    failure: Exception | None = None

    for attempt in range(1, attempts + 1):
        try:
            started = time.perf_counter()
            connection = await asyncio.wait_for(
                connect_factory(),
                timeout=BACKEND_CONNECT_TIMEOUT,
            )
        except TypeError:
            # Parameter incompatibility should be handled by the caller (e.g.,
            # switching auth injection strategies) rather than retried.
            raise
        except asyncio.TimeoutError as exc:
            failure = exc
            logging.warning(
                "Backend websocket connect (%s attempt %s/%s) to %s timed out after %.1fs",
                mode,
                attempt,
                attempts,
                url_for_logging,
                BACKEND_CONNECT_TIMEOUT,
                extra={"station_id": station_id},
            )
        except Exception as exc:
            failure = exc
            logging.warning(
                "Backend websocket connect (%s attempt %s/%s) to %s failed: %s",
                mode,
                attempt,
                attempts,
                url_for_logging,
                exc,
                extra={"station_id": station_id},
            )
        else:
            logging.debug(
                "Backend websocket connect (%s attempt %s/%s) to %s completed in %.3fs",
                mode,
                attempt,
                attempts,
                url_for_logging,
                time.perf_counter() - started,
                extra={"station_id": station_id},
            )
            return connection

        if attempt < attempts:
            await asyncio.sleep(BACKEND_CONNECT_BACKOFF_SECONDS)

    raise BackendConnectionError(
        f"Backend websocket connect ({mode}) to {url_for_logging} failed after {attempts} attempts"
    ) from failure


@asynccontextmanager
async def _backend_websocket_connection(ws_url, base_kwargs, auth_headers, station_id):
    """Open a backend websocket connection with optional Basic Auth headers.

    Tries up to three strategies, falling back when the installed websockets
    version rejects a parameter with ``TypeError``:
    1. pass ``extra_headers`` to ``websockets.connect``;
    2. wrap the protocol factory so headers are injected at handshake time;
    3. embed the credentials directly in the URL.
    The connection (whichever strategy produced it) is closed on exit.
    """

    connect_kwargs = dict(base_kwargs)
    if auth_headers:
        # Strategy 1: forward the (filtered) auth headers directly.
        connect_kwargs["extra_headers"] = _filter_auth_headers(auth_headers, station_id)

    parsed_url = urlparse(ws_url)
    if parsed_url.scheme == "wss" and "ssl" not in connect_kwargs:
        # Supply a TLS context only when the caller did not pass one.
        connect_kwargs["ssl"] = _build_backend_ssl_context()
        if not BACKEND_SSL_VERIFY:
            logging.debug(
                "Backend TLS verification disabled; using unverified context for %s",
                ws_url,
                extra={"station_id": station_id},
            )

    remote_ws: WebSocketClientProtocol | None = None
    try:
        try:
            remote_ws = await _connect_backend_with_retries(
                lambda: websockets.connect(ws_url, **connect_kwargs),
                ws_url,
                "default",
                station_id,
            )
            yield remote_ws
            return
        except TypeError as exc:
            # Only fall back when the failure is the extra_headers kwarg
            # being rejected AND we actually have headers to inject.
            if not (auth_headers and "extra_headers" in str(exc)):
                raise

            logging.warning(
                "Backend websockets implementation rejected extra_headers; retrying with alternate auth handling",
                extra={"station_id": station_id},
            )

            connect_kwargs.pop("extra_headers", None)

            if _WEBSOCKETS_SUPPORTS_CREATE_PROTOCOL:
                # Strategy 2: inject headers via a wrapped protocol factory.
                existing_factory = connect_kwargs.pop("create_protocol", None)
                protocol_factory = _wrap_protocol_factory_with_headers(
                    auth_headers,
                    existing_factory,
                    station_id,
                )
                try:
                    remote_ws = await _connect_backend_with_retries(
                        lambda: websockets.connect(
                            ws_url,
                            create_protocol=protocol_factory,
                            **connect_kwargs,
                        ),
                        ws_url,
                        "protocol_factory",
                        station_id,
                    )
                    yield remote_ws
                    return
                except TypeError as protocol_exc:
                    if "create_protocol" not in str(protocol_exc):
                        raise
                    logging.warning(
                        "websockets.connect does not accept create_protocol; falling back to URL embedded credentials",
                        extra={"station_id": station_id},
                    )

            # Strategy 3: embed the Basic credentials in the URL itself.
            auth_ws_url = _inject_basic_auth_into_url(ws_url, auth_headers)
            remote_ws = await _connect_backend_with_retries(
                lambda: websockets.connect(auth_ws_url, **connect_kwargs),
                auth_ws_url,
                "url-auth",
                station_id,
            )
            yield remote_ws
    finally:
        # Close whichever connection was opened, if it is still open.
        if remote_ws is not None and not getattr(remote_ws, "closed", False):
            await remote_ws.close()

# MySQL connection settings: taken from the loaded configuration's "mysql"
# section when present, otherwise the inline defaults below apply.
# NOTE(review): the fallback embeds live-looking credentials in source code —
# these should move to configuration/environment only.
MYSQL_CONFIG = _config.get(
    "mysql",
    {
        "host": "172.211.136.203",
        "user": "charge",
        "password": "zm0dem123",
        "db": "op",
        "charset": "utf8",
    },
)

# Pool sizing defaults; only filled in when the config did not set them.
MYSQL_CONFIG.setdefault("pool_minsize", 1)
MYSQL_CONFIG.setdefault("pool_maxsize", 10)
MYSQL_CONFIG.setdefault("pool_recycle", None)

# An optional "mysql_pool" section overrides the sizing knobs above.
_mysql_pool_cfg = _config.get("mysql_pool")
if isinstance(_mysql_pool_cfg, Mapping):
    MYSQL_CONFIG.update(
        {
            "pool_minsize": _mysql_pool_cfg.get("minsize", MYSQL_CONFIG["pool_minsize"]),
            "pool_maxsize": _mysql_pool_cfg.get("maxsize", MYSQL_CONFIG["pool_maxsize"]),
            "pool_recycle": _mysql_pool_cfg.get("pool_recycle", MYSQL_CONFIG["pool_recycle"]),
        }
    )

# Process-wide pool state: the aiomysql pool, the lock guarding its creation,
# and a bounded buffer of connection-acquire latency samples for metrics.
_DB_POOL: Optional[aiomysql.Pool] = None
_DB_POOL_LOCK: Optional[asyncio.Lock] = None
_DB_LATENCIES: deque[float] = deque(maxlen=200)


def _describe_mysql_target(config: Mapping[str, Any] | dict[str, Any]) -> str:
    """Return a human readable description of the configured MySQL target."""

    host = config.get("host") or "localhost"
    port = config.get("port")
    user = config.get("user") or "<unknown-user>"
    database = config.get("db")

    host_part = f"{host}:{port}" if port else str(host)
    db_part = f"/{database}" if database else ""

    return f"{user}@{host_part}{db_part}"


def _coerce_pool_setting(value: Any, default: int) -> int:
    """Return an ``int`` pool setting or ``default`` when invalid."""

    try:
        coerced = int(value)
    except (TypeError, ValueError):
        return default

    return coerced if coerced > 0 else default


def _coerce_pool_recycle(value: Any) -> int:
    """Return a non-negative ``pool_recycle`` value."""

    try:
        if value is None:
            return 0
        coerced = int(value)
    except (TypeError, ValueError):
        return 0

    return coerced if coerced > 0 else 0


async def init_db_pool() -> aiomysql.Pool:
    """Initialise the global MySQL connection pool if needed.

    Uses a double-checked lock so concurrent coroutines create at most one
    pool.  Pool sizing keys are split out of MYSQL_CONFIG and passed to
    aiomysql explicitly.
    """

    global _DB_POOL, _DB_POOL_LOCK

    if _DB_POOL is not None:
        return _DB_POOL

    if _DB_POOL_LOCK is None:
        _DB_POOL_LOCK = asyncio.Lock()

    async with _DB_POOL_LOCK:
        if _DB_POOL is None:
            sizing = {
                "minsize": _coerce_pool_setting(MYSQL_CONFIG.get("pool_minsize"), 1),
                "maxsize": _coerce_pool_setting(MYSQL_CONFIG.get("pool_maxsize"), 10),
                "pool_recycle": _coerce_pool_recycle(MYSQL_CONFIG.get("pool_recycle")),
            }
            # Everything except our sizing knobs goes straight to aiomysql.
            connection_kwargs = {
                key: value
                for key, value in MYSQL_CONFIG.items()
                if key not in {"pool_minsize", "pool_maxsize", "pool_recycle"}
            }
            _DB_POOL = await aiomysql.create_pool(**connection_kwargs, **sizing)

    return _DB_POOL


def get_db_pool() -> Optional[aiomysql.Pool]:
    """Return the global MySQL connection pool, or ``None`` if not initialised."""
    return _DB_POOL


async def close_db_pool() -> None:
    """Gracefully close the global MySQL connection pool."""

    global _DB_POOL, _DB_POOL_LOCK

    if _DB_POOL is None:
        return

    _DB_POOL.close()
    try:
        await _DB_POOL.wait_closed()
    finally:
        # Reset both globals so a later init_db_pool() starts fresh.
        _DB_POOL = None
        _DB_POOL_LOCK = None


def _record_db_latency_sample(seconds: float) -> None:
    """Store a finite, positive latency sample for pool metrics."""

    if math.isfinite(seconds) and seconds > 0:
        _DB_LATENCIES.append(seconds)


def _calc_percentile(values: list[float], percentile: float) -> Optional[float]:
    """Return the percentile value or ``None`` when not enough samples."""

    if not values:
        return None
    ordered = sorted(values)
    index = max(0, min(len(ordered) - 1, math.ceil(percentile * len(ordered)) - 1))
    return ordered[index]


def _get_pool_queue_length(pool: aiomysql.Pool) -> Optional[int]:
    """Best-effort read of aiomysql's private waiter-queue length.

    Probes the ``_queue``/``_waiters`` attributes; returns ``None`` when
    neither is exposed or readable.
    """

    for attr_name in ("_queue", "_waiters"):
        candidate = getattr(pool, attr_name, None)
        if candidate is None:
            continue
        try:
            if hasattr(candidate, "qsize"):
                return int(candidate.qsize())
            return len(candidate)
        except Exception:
            continue
    return None


def collect_db_pool_metrics() -> dict[str, Any]:
    """Snapshot the aiomysql pool configuration, utilisation and latency."""

    pool = get_db_pool()

    # Configured limits are always reported, even before initialisation.
    metrics: dict[str, Any] = {
        "initialized": pool is not None,
        "pool_minsize": _coerce_pool_setting(MYSQL_CONFIG.get("pool_minsize"), 1),
        "pool_maxsize": _coerce_pool_setting(MYSQL_CONFIG.get("pool_maxsize"), 10),
        "pool_recycle": _coerce_pool_recycle(MYSQL_CONFIG.get("pool_recycle")),
    }

    if pool is None:
        metrics["error"] = "Pool not initialised"
        return metrics

    size = getattr(pool, "size", None)
    free = getattr(pool, "freesize", None)
    if isinstance(size, int):
        metrics["size"] = size
    if isinstance(free, int):
        metrics["freesize"] = free
        if isinstance(size, int):
            # Connections currently checked out; clamp so rounding races
            # never report a negative value.
            metrics["in_use"] = max(0, size - free)

    waiting = _get_pool_queue_length(pool)
    if waiting is not None:
        metrics["waiting"] = max(0, waiting)

    latencies = list(_DB_LATENCIES)
    if latencies:
        p95 = _calc_percentile(latencies, 0.95)
        metrics["latency_avg_ms"] = (sum(latencies) / len(latencies)) * 1000.0
        metrics["latency_p95_ms"] = None if p95 is None else p95 * 1000.0
        metrics["latency_samples"] = len(latencies)

    return metrics

# DDL for the outbound API audit log: one row per external API call, with
# full request/response bodies and indexed filter columns.
API_LOGGING_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_broker_api_logging (
        id INT AUTO_INCREMENT PRIMARY KEY,
        created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        chargepoint_id VARCHAR(255),
        partner_id VARCHAR(255),
        module VARCHAR(255),
        endpoint VARCHAR(255) NOT NULL,
        request_payload LONGTEXT,
        response_code INT,
        response_body LONGTEXT,
        KEY idx_created_at (created_at),
        KEY idx_endpoint (endpoint),
        KEY idx_partner (partner_id),
        KEY idx_module (module),
        KEY idx_response_code (response_code)
    ) CHARACTER SET utf8mb4
"""

# DDL for the latest meter value per (charge point, connector); upserted on
# every meter reading.
LAST_METER_READING_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_broker_lastmeterreading (
        chargepoint_id VARCHAR(255) NOT NULL,
        connector_id INT NOT NULL,
        meter_value DOUBLE NOT NULL,
        ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (chargepoint_id, connector_id)
    )
"""

# DDL for charging-session state rows, unique per (station, session UID);
# columns are merged via COALESCE on upsert so partial updates keep old data.
SESSION_STATE_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_broker_sessions (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        session_uid VARCHAR(100) NOT NULL,
        station_id VARCHAR(255) NOT NULL,
        connector_id VARCHAR(100),
        evse_id VARCHAR(100),
        transaction_id VARCHAR(100),
        id_tag VARCHAR(255),
        status VARCHAR(30) NOT NULL,
        start_timestamp DATETIME NULL,
        end_timestamp DATETIME NULL,
        meter_start_wh DOUBLE NULL,
        meter_stop_wh DOUBLE NULL,
        meter_last_wh DOUBLE NULL,
        last_status VARCHAR(50),
        last_status_timestamp DATETIME NULL,
        updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uniq_station_session (station_id, session_uid),
        KEY idx_status (status),
        KEY idx_station (station_id),
        KEY idx_updated_at (updated_at)
    ) CHARACTER SET utf8mb4
"""

# DDL for the latest reported EVSE/connector status, unique per
# (station, connector, EVSE) triple.
EVSE_STATUS_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_broker_evse_status (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        station_id VARCHAR(255) NOT NULL,
        connector_id VARCHAR(100) NOT NULL,
        evse_id VARCHAR(100) NOT NULL,
        status VARCHAR(50),
        error_code VARCHAR(100),
        status_timestamp DATETIME NULL,
        updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uniq_evse_status (station_id, connector_id, evse_id),
        KEY idx_updated_at (updated_at)
    ) CHARACTER SET utf8mb4
"""

# DDL for the charge point authentication diagnostics log.
# NOTE(review): incoming_auth_password is stored in clear text — confirm
# this is acceptable for the deployment's security requirements.
CP_AUTH_LOG_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_broker_cp_auth_log (
        id INT AUTO_INCREMENT PRIMARY KEY,
        created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        chargepoint_id VARCHAR(255) NOT NULL,
        incoming_auth_header TEXT,
        incoming_auth_username VARCHAR(255),
        incoming_auth_password VARCHAR(255),
        used_auth_header TEXT,
        used_auth_source ENUM('configured-basic', 'configured-auth-key', 'incoming', 'none') NOT NULL DEFAULT 'none',
        KEY idx_cp_auth_chargepoint (chargepoint_id),
        KEY idx_cp_auth_created (created_at)
    ) CHARACTER SET utf8mb4
"""

# DDL for the charge point connect/disconnect event log.
CP_CONNECT_LOG_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS op_broker_cp_connect_log (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        chargepoint_id VARCHAR(255) NOT NULL,
        event ENUM('connect', 'disconnect') NOT NULL,
        disconnect_source ENUM('chargepoint', 'server') DEFAULT NULL,
        created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        KEY idx_cp_connect_chargepoint (chargepoint_id),
        KEY idx_cp_connect_created (created_at)
    ) CHARACTER SET utf8mb4
"""

# Each "ensure table" helper below uses double-checked locking: a fast-path
# read of the ready flag, then a re-check under the asyncio.Lock before
# running the one-time DDL.
_api_log_table_lock = asyncio.Lock()
_api_log_table_ready = False

_last_meter_table_lock = asyncio.Lock()
_last_meter_table_ready = False

_session_state_table_lock = asyncio.Lock()
_session_state_table_ready = False

_evse_status_table_lock = asyncio.Lock()
_evse_status_table_ready = False

_cp_auth_log_table_lock = asyncio.Lock()
_cp_auth_log_table_ready = False

_cp_connect_log_table_lock = asyncio.Lock()
_cp_connect_log_table_ready = False

def _serialize_api_payload(payload: object) -> str:
    if payload is None:
        return ""
    try:
        return json.dumps(payload, ensure_ascii=False)
    except TypeError:
        try:
            return json.dumps(payload, ensure_ascii=False, default=str)
        except Exception:
            return str(payload)


async def ensure_cp_auth_log_table() -> None:
    """Create ``op_broker_cp_auth_log`` on first use (at most once per process)."""

    global _cp_auth_log_table_ready

    if _cp_auth_log_table_ready:
        return

    async with _cp_auth_log_table_lock:
        # Re-check under the lock: a concurrent task may have won the race.
        if _cp_auth_log_table_ready:
            return
        try:
            async with db_connection() as conn:
                async with conn.cursor() as cur:
                    # Probe information_schema first to avoid the warnings
                    # some servers emit for CREATE TABLE IF NOT EXISTS.
                    await cur.execute(
                        """
                        SELECT 1
                        FROM information_schema.tables
                        WHERE table_schema = DATABASE()
                          AND table_name = %s
                        LIMIT 1
                        """,
                        ("op_broker_cp_auth_log",),
                    )
                    if not await cur.fetchone():
                        await cur.execute(CP_AUTH_LOG_TABLE_SQL)
                await conn.commit()
        except Exception:
            logging.debug(
                "Failed to ensure op_broker_cp_auth_log table",
                exc_info=True,
            )
        else:
            _cp_auth_log_table_ready = True


async def log_cp_auth_event(
    station_id: str,
    incoming_auth_header: Optional[str],
    incoming_basic: Optional[Tuple[str, str]],
    used_auth_header: Optional[str],
    used_auth_source: Optional[str],
) -> None:
    """Record how a charge point authenticated (best-effort, never raises)."""

    try:
        await ensure_cp_auth_log_table()

        # Split the basic-auth pair when one was supplied.
        username = incoming_basic[0] if incoming_basic else None
        password = incoming_basic[1] if incoming_basic else None

        row = (
            station_id,
            incoming_auth_header,
            username,
            password,
            used_auth_header,
            used_auth_source or "none",
        )
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    """
                    INSERT INTO op_broker_cp_auth_log (
                        chargepoint_id,
                        incoming_auth_header,
                        incoming_auth_username,
                        incoming_auth_password,
                        used_auth_header,
                        used_auth_source
                    )
                    VALUES (%s, %s, %s, %s, %s, %s)
                    """,
                    row,
                )
            await conn.commit()
    except Exception:
        # Diagnostics logging must never break the connection handshake.
        logging.debug(
            "Failed to log authentication information for %s",
            station_id,
            exc_info=True,
        )


async def ensure_cp_connect_log_table() -> None:
    """Create ``op_broker_cp_connect_log`` on first use (once per process)."""

    global _cp_connect_log_table_ready

    if _cp_connect_log_table_ready:
        return

    async with _cp_connect_log_table_lock:
        # Double-checked locking: another task may already have finished.
        if _cp_connect_log_table_ready:
            return
        try:
            async with db_connection() as conn:
                async with conn.cursor() as cur:
                    # Check information_schema so we can skip the DDL (and
                    # its warnings) when the table already exists.
                    await cur.execute(
                        """
                        SELECT 1
                        FROM information_schema.tables
                        WHERE table_schema = DATABASE()
                          AND table_name = %s
                        LIMIT 1
                        """,
                        ("op_broker_cp_connect_log",),
                    )
                    if not await cur.fetchone():
                        await cur.execute(CP_CONNECT_LOG_TABLE_SQL)
                await conn.commit()
        except Exception:
            logging.debug(
                "Failed to ensure op_broker_cp_connect_log table",
                exc_info=True,
            )
        else:
            _cp_connect_log_table_ready = True


async def log_cp_connect_event(
    station_id: str, event: str, disconnect_source: Optional[str] = None
) -> None:
    """Record a connect/disconnect event for diagnostics (best-effort)."""

    # Anything other than the literal "disconnect" is treated as a connect,
    # and a disconnect source is only kept for known values.
    if event == "disconnect":
        normalized_event = "disconnect"
        source_value = (
            disconnect_source
            if disconnect_source in {"chargepoint", "server"}
            else None
        )
    else:
        normalized_event = "connect"
        source_value = None

    try:
        await ensure_cp_connect_log_table()
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    """
                    INSERT INTO op_broker_cp_connect_log (
                        chargepoint_id, event, disconnect_source
                    ) VALUES (%s, %s, %s)
                    """,
                    (station_id, normalized_event, source_value),
                )
            await conn.commit()
    except Exception:
        logging.debug(
            "Failed to log connection event for %s",
            station_id,
            exc_info=True,
        )


async def _ensure_api_logging_table(conn) -> None:
    """Create the API logging table (and missing columns) exactly once.

    Runs an idempotent in-place migration on first use: creates the table
    when absent, then adds columns and indexes introduced by later
    releases.  The ``_api_log_table_ready`` flag, double-checked under
    ``_api_log_table_lock``, makes this a one-time cost per process.

    :param conn: an open aiomysql connection; commits are issued on it.
    """

    global _api_log_table_ready
    if _api_log_table_ready:
        return

    async with _api_log_table_lock:
        # Re-check under the lock: a concurrent task may have completed the
        # migration while we were waiting.
        if _api_log_table_ready:
            return

        async with conn.cursor() as cur:

            # CREATE TABLE IF NOT EXISTS still triggers warnings on some
            # installations. Query ``information_schema`` first so we can skip
            # the statement entirely when the table is already present.
            await cur.execute(
                """
                SELECT 1
                FROM information_schema.tables
                WHERE table_schema = DATABASE()
                  AND table_name = %s
                LIMIT 1
                """,
                ("op_broker_api_logging",),
            )
            table_exists = await cur.fetchone()

            if not table_exists:
                await cur.execute(API_LOGGING_TABLE_SQL)
                await conn.commit()

            # Older MySQL/MariaDB versions do not support
            # ``ADD COLUMN IF NOT EXISTS``. Instead we check the column's
            # existence explicitly before attempting to add it.
            await cur.execute(
                """
                SHOW COLUMNS FROM op_broker_api_logging LIKE 'response_body'
                """
            )
            has_response_body = await cur.fetchone()

            if not has_response_body:

                await cur.execute(
                    """
                    ALTER TABLE op_broker_api_logging
                        ADD COLUMN response_body LONGTEXT
                    """
                )

            await cur.execute(
                """
                SHOW COLUMNS FROM op_broker_api_logging LIKE 'partner_id'
                """
            )
            partner_column = await cur.fetchone()
            if not partner_column:
                await cur.execute(
                    """
                    ALTER TABLE op_broker_api_logging
                        ADD COLUMN partner_id VARCHAR(255) AFTER chargepoint_id
                    """
                )

            await cur.execute(
                """
                SHOW COLUMNS FROM op_broker_api_logging LIKE 'module'
                """
            )
            module_column = await cur.fetchone()
            if not module_column:
                await cur.execute(
                    """
                    ALTER TABLE op_broker_api_logging
                        ADD COLUMN module VARCHAR(255) AFTER partner_id
                    """
                )

            # Indexes added in later releases: probe with SHOW INDEX because
            # ``ADD KEY IF NOT EXISTS`` is likewise unavailable on older
            # servers.
            await cur.execute(
                """
                SHOW INDEX FROM op_broker_api_logging WHERE Key_name='idx_partner'
                """
            )
            partner_index = await cur.fetchone()
            if not partner_index:
                await cur.execute(
                    """
                    ALTER TABLE op_broker_api_logging
                        ADD KEY idx_partner (partner_id)
                    """
                )

            await cur.execute(
                """
                SHOW INDEX FROM op_broker_api_logging WHERE Key_name='idx_module'
                """
            )
            module_index = await cur.fetchone()
            if not module_index:
                await cur.execute(
                    """
                    ALTER TABLE op_broker_api_logging
                        ADD KEY idx_module (module)
                    """
                )

            await cur.execute(
                """
                SHOW INDEX FROM op_broker_api_logging WHERE Key_name='idx_response_code'
                """
            )
            status_index = await cur.fetchone()
            if not status_index:
                await cur.execute(
                    """
                    ALTER TABLE op_broker_api_logging
                        ADD KEY idx_response_code (response_code)
                    """
                )

        # Commit any ALTERs performed above before publishing the flag.
        await conn.commit()

        _api_log_table_ready = True


async def log_external_api_request(
    endpoint: str,
    payload: object,
    response_code: Optional[int],
    station_id: Optional[str] = None,
    response_body: object = None,
    *,
    partner_id: Optional[str] = None,
    module: Optional[str] = None,
    station_path: Optional[str] = None,
) -> None:
    """Persist one outbound API call to ``op_broker_api_logging``.

    Selected endpoints are additionally mirrored into the per-station
    message log.  Database failures are swallowed: audit logging must
    never break the request path.
    """

    request_text = _serialize_api_payload(payload)
    response_text = _serialize_api_payload(response_body)

    partner = partner_id
    if partner is None and isinstance(payload, Mapping):
        # Fall back to identifiers embedded in the payload itself.
        partner = payload.get("partner_id") or payload.get("partnerId")

    try:
        async with db_connection() as conn:
            await _ensure_api_logging_table(conn)
            async with conn.cursor() as cur:
                await cur.execute(
                    """
                    INSERT INTO op_broker_api_logging
                        (chargepoint_id, partner_id, module, endpoint, request_payload, response_code, response_body)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    """,
                    (
                        station_id,
                        partner,
                        module or endpoint,
                        endpoint,
                        request_text,
                        response_code,
                        response_text,
                    ),
                )
            await conn.commit()
    except Exception:
        logging.debug(
            "Failed to log external API request for endpoint %s",
            endpoint,
            exc_info=True,
        )

    if not station_id:
        return

    # Mirror a handful of endpoints into op_messages under stable event names.
    event_prefix = {
        "authorize-transaction": "authorizeTransaction",
        "stop-transaction": "stopTransaction",
        "update-transaction": "updateTransaction",
        "configuration-update": "configurationUpdate",
    }.get(endpoint)
    if event_prefix:
        await _log_external_api_messages(
            station_id,
            event_prefix,
            request_text,
            response_text,
            station_path=station_path,
        )


async def _log_external_api_messages(
    station_id: str,
    event_prefix: str,
    request_payload: str,
    response_payload: str,
    *,
    station_path: Optional[str] = None,
) -> None:
    """Mirror an external API request/response pair into the message log."""

    def _envelope(event: str, content: str) -> str:
        # Embed the payload as parsed JSON when possible, raw text otherwise.
        if not content:
            return json.dumps({"event": event})
        try:
            body = json.loads(content)
        except json.JSONDecodeError:
            body = content
        return json.dumps({"event": event, "payload": body}, ensure_ascii=False)

    target = station_path or station_id
    pairs = (
        ("server_to_extapi", "request", request_payload,
         "Failed to log %s request to op_messages"),
        ("extapi_to_server", "response", response_payload,
         "Failed to log %s response to op_messages"),
    )
    for direction, suffix, content, failure_msg in pairs:
        try:
            await log_message(
                target,
                direction,
                _envelope(f"{event_prefix}_{suffix}", content),
                topic_override=event_prefix,
            )
        except Exception:
            # Each half is logged independently so a request failure does
            # not suppress the response entry.
            logging.debug(failure_msg, event_prefix, exc_info=True)

async def ensure_last_meterreading_table() -> None:
    """Create ``op_broker_lastmeterreading`` on first use (once per process)."""

    global _last_meter_table_ready

    if _last_meter_table_ready:
        return

    async with _last_meter_table_lock:
        # Re-check under the lock before running the one-time DDL.
        if _last_meter_table_ready:
            return
        try:
            async with db_connection() as conn:
                async with conn.cursor() as cur:
                    # Probe information_schema to avoid CREATE TABLE warnings.
                    await cur.execute(
                        """
                        SELECT 1
                        FROM information_schema.tables
                        WHERE table_schema = DATABASE()
                          AND table_name = %s
                        LIMIT 1
                        """,
                        ("op_broker_lastmeterreading",),
                    )
                    if not await cur.fetchone():
                        await cur.execute(LAST_METER_READING_TABLE_SQL)
                await conn.commit()
        except Exception:
            logging.debug(
                "Failed to ensure op_broker_lastmeterreading table",
                exc_info=True,
            )
        else:
            _last_meter_table_ready = True


async def store_last_meter_reading(
    chargepoint_id: str, connector_id: int, meter_value: float, ts: datetime
) -> None:
    """Upsert the most recent meter value for one connector (best-effort)."""

    row = (chargepoint_id, connector_id, meter_value, ts)
    try:
        await ensure_last_meterreading_table()
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                # NOTE(review): the ``VALUES ... AS alias`` upsert syntax
                # requires MySQL 8.0.19+ — confirm against the deployed server.
                await cur.execute(
                    """
                    INSERT INTO op_broker_lastmeterreading (chargepoint_id, connector_id, meter_value, ts)
                    VALUES (%s, %s, %s, %s) AS new_values
                    ON DUPLICATE KEY UPDATE
                        meter_value = new_values.meter_value,
                        ts = new_values.ts
                    """,
                    row,
                )
            await conn.commit()
    except Exception:
        logging.debug(
            "Failed to store last meter reading for %s/%s",
            chargepoint_id,
            connector_id,
            exc_info=True,
        )


async def ensure_session_state_table() -> None:
    """Create ``op_broker_sessions`` on first use (once per process)."""

    global _session_state_table_ready

    if _session_state_table_ready:
        return

    async with _session_state_table_lock:
        # Double-checked locking: bail out if a concurrent task finished.
        if _session_state_table_ready:
            return
        try:
            async with db_connection() as conn:
                async with conn.cursor() as cur:
                    # Check information_schema first to skip the DDL quietly.
                    await cur.execute(
                        """
                        SELECT 1
                        FROM information_schema.tables
                        WHERE table_schema = DATABASE()
                          AND table_name = %s
                        LIMIT 1
                        """,
                        ("op_broker_sessions",),
                    )
                    if not await cur.fetchone():
                        await cur.execute(SESSION_STATE_TABLE_SQL)
                await conn.commit()
        except Exception:
            logging.debug(
                "Failed to ensure op_broker_sessions table",
                exc_info=True,
            )
        else:
            _session_state_table_ready = True


async def ensure_evse_status_table() -> None:
    """Create ``op_broker_evse_status`` on first use (once per process)."""

    global _evse_status_table_ready

    if _evse_status_table_ready:
        return

    async with _evse_status_table_lock:
        # Double-checked locking: bail out if a concurrent task finished.
        if _evse_status_table_ready:
            return
        try:
            async with db_connection() as conn:
                async with conn.cursor() as cur:
                    # Check information_schema first to skip the DDL quietly.
                    await cur.execute(
                        """
                        SELECT 1
                        FROM information_schema.tables
                        WHERE table_schema = DATABASE()
                          AND table_name = %s
                        LIMIT 1
                        """,
                        ("op_broker_evse_status",),
                    )
                    if not await cur.fetchone():
                        await cur.execute(EVSE_STATUS_TABLE_SQL)
                await conn.commit()
        except Exception:
            logging.debug(
                "Failed to ensure op_broker_evse_status table",
                exc_info=True,
            )
        else:
            _evse_status_table_ready = True


def _normalise_session_status(value: Optional[str]) -> Optional[str]:
    if value is None:
        return None
    try:
        text = str(value).strip()
    except Exception:
        return None
    if not text:
        return None
    return text.upper()


def _coerce_optional_datetime(value: object) -> Optional[datetime]:
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except Exception:
            try:
                return parse_ocpp_timestamp(value)
            except Exception:
                return None
    return None


async def upsert_session_state(
    station_id: str,
    session_uid: str,
    *,
    transaction_id: Optional[str] = None,
    connector_id: Optional[ConnectorKey] = None,
    evse_id: Optional[ConnectorKey] = None,
    id_tag: Optional[str] = None,
    status: Optional[str] = None,
    start_timestamp: Optional[object] = None,
    end_timestamp: Optional[object] = None,
    meter_start_wh: Optional[float] = None,
    meter_stop_wh: Optional[float] = None,
    meter_last_wh: Optional[float] = None,
    last_status: Optional[str] = None,
    last_status_timestamp: Optional[object] = None,
) -> None:
    """Insert a session row or merge newer values into an existing one.

    ``COALESCE(VALUES(...), column)`` keeps previously stored values for
    every column the caller passes as ``None``.  Database failures are
    logged at debug level and swallowed.
    """

    if not session_uid:
        return

    # Normalise inputs once, then build the parameter tuple in column order.
    row = (
        str(session_uid),
        station_id,
        str(connector_id) if connector_id is not None else None,
        str(evse_id) if evse_id is not None else None,
        transaction_id,
        id_tag,
        _normalise_session_status(status) or "ACTIVE",
        _coerce_optional_datetime(start_timestamp),
        _coerce_optional_datetime(end_timestamp),
        meter_start_wh,
        meter_stop_wh,
        meter_last_wh,
        last_status,
        _coerce_optional_datetime(last_status_timestamp),
    )

    try:
        await ensure_session_state_table()
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    """
                    INSERT INTO op_broker_sessions (
                        session_uid, station_id, connector_id, evse_id, transaction_id,
                        id_tag, status, start_timestamp, end_timestamp, meter_start_wh,
                        meter_stop_wh, meter_last_wh, last_status, last_status_timestamp
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE
                        connector_id = COALESCE(VALUES(connector_id), connector_id),
                        evse_id = COALESCE(VALUES(evse_id), evse_id),
                        transaction_id = COALESCE(VALUES(transaction_id), transaction_id),
                        id_tag = COALESCE(VALUES(id_tag), id_tag),
                        status = COALESCE(VALUES(status), status),
                        start_timestamp = COALESCE(VALUES(start_timestamp), start_timestamp),
                        end_timestamp = COALESCE(VALUES(end_timestamp), end_timestamp),
                        meter_start_wh = COALESCE(VALUES(meter_start_wh), meter_start_wh),
                        meter_stop_wh = COALESCE(VALUES(meter_stop_wh), meter_stop_wh),
                        meter_last_wh = COALESCE(VALUES(meter_last_wh), meter_last_wh),
                        last_status = COALESCE(VALUES(last_status), last_status),
                        last_status_timestamp = COALESCE(VALUES(last_status_timestamp), last_status_timestamp),
                        updated_at = CURRENT_TIMESTAMP
                    """,
                    row,
                )
            await conn.commit()
    except Exception:
        logging.debug(
            "Failed to upsert session state for %s/%s",
            station_id,
            session_uid,
            exc_info=True,
        )


async def upsert_evse_status(
    station_id: str,
    *,
    connector_id: Optional[ConnectorKey],
    evse_id: Optional[ConnectorKey],
    status: Optional[str],
    error_code: Optional[str],
    status_timestamp: Optional[object],
) -> None:
    """Store the latest status report per (station, connector, EVSE) triple.

    ``COALESCE`` keeps older column values when the caller passes ``None``.
    Database failures are logged at debug level and swallowed.
    """

    # Keys are stored as text; missing identifiers become empty strings so
    # they still participate in the unique key.
    key_connector = "" if connector_id is None else str(connector_id)
    key_evse = "" if evse_id is None else str(evse_id)
    row = (
        station_id,
        key_connector,
        key_evse,
        status,
        error_code,
        _coerce_optional_datetime(status_timestamp),
    )

    try:
        await ensure_evse_status_table()
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    """
                    INSERT INTO op_broker_evse_status (
                        station_id, connector_id, evse_id, status, error_code, status_timestamp
                    ) VALUES (%s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE
                        status = COALESCE(VALUES(status), status),
                        error_code = COALESCE(VALUES(error_code), error_code),
                        status_timestamp = COALESCE(VALUES(status_timestamp), status_timestamp),
                        updated_at = CURRENT_TIMESTAMP
                    """,
                    row,
                )
            await conn.commit()
    except Exception:
        logging.debug(
            "Failed to upsert EVSE status for %s/%s",
            station_id,
            key_connector or key_evse,
            exc_info=True,
        )


@asynccontextmanager
async def db_connection(max_retries: int = 3, base_delay: float = 1.0):
    """Return a MySQL connection with retry and logging on failure.

    Acquires a connection from the shared pool, retrying with exponential
    backoff (``base_delay`` seconds, doubled per attempt, no upper cap)
    for up to ``max_retries`` total attempts.  On exit the connection is
    rolled back and released to the pool.

    :param max_retries: total acquisition attempts before re-raising.
    :param base_delay: initial sleep in seconds between attempts.
    :raises Exception: whatever the final failed acquisition raised.
    """

    attempt = 0
    delay = base_delay
    while True:
        try:
            pool = await init_db_pool()
            # Time only pool.acquire() — this feeds the latency metrics
            # reported by collect_db_pool_metrics().
            start = time.perf_counter()
            conn = await pool.acquire()
            _record_db_latency_sample(time.perf_counter() - start)
            break
        except Exception as exc:  # pragma: no cover - defensive logging
            attempt += 1
            target = _describe_mysql_target(MYSQL_CONFIG)
            logging.error(
                "Database connection attempt %s/%s to %s failed: %s",
                attempt,
                max_retries,
                target,
                exc,
            )
            if attempt >= max_retries:
                raise
            await asyncio.sleep(delay)
            delay *= 2

    try:
        yield conn
    finally:
        try:
            # Roll back any open transaction so a dirty connection never
            # returns to the pool; release happens even if rollback fails.
            try:
                await conn.rollback()
            except Exception as exc:  # pragma: no cover - defensive logging
                logging.warning("Failed to roll back database connection: %s", exc)
            pool.release(conn)
        except Exception as exc:  # pragma: no cover - defensive logging
            logging.warning("Failed to close database connection: %s", exc)


async def is_station_oicp_enabled(chargepoint_id: str) -> bool:
    """Check whether *chargepoint_id* is flagged for Hubject (OICP) usage.

    Returns ``False`` when no row exists or on any database error.
    """

    try:
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    """
                    SELECT enabled
                    FROM op_server_oicp_enable
                    WHERE chargepoint_id=%s
                    LIMIT 1
                    """,
                    (chargepoint_id,),
                )
                row = await cur.fetchone()
    except Exception:
        logging.exception("Failed to read OICP flag for %s", chargepoint_id)
        return False

    if row is None:
        return False

    # The cursor may yield a mapping or a plain tuple depending on the
    # configured cursor class; handle both shapes.
    if isinstance(row, Mapping):
        return bool(row.get("enabled"))
    try:
        return bool(row[0])
    except (IndexError, TypeError):  # pragma: no cover - defensive guard
        return False


async def _push_hubject_cdr_with_retry(
    station_id: str,
    start_info: Mapping[str, Any],
    stop_info: Mapping[str, Any],
    *,
    retries: int = HUBJECT_PUSH_RETRIES,
) -> None:
    """Push a CDR to Hubject, retrying on failure.

    Makes at most ``retries + 1`` total attempts.  A no-op when the global
    ``HUBJECT_CLIENT`` is not configured.  An attempt counts as failed when
    ``push_cdr`` raises or returns ``success=False``; after the final attempt
    the failure is logged and dropped — no exception propagates to callers.
    """
    if HUBJECT_CLIENT is None:
        return

    attempt = 0
    while True:
        attempt += 1
        try:
            success, status, _ = await HUBJECT_CLIENT.push_cdr(
                station_id, start_info, stop_info
            )
        except Exception:
            # Treat a raised error like a failed push so the retry
            # accounting below stays uniform for both failure modes.
            success = False
            status = 0
            logging.exception(
                "Hubject push_cdr raised an error for station_id=%s (attempt %s/%s)",
                station_id,
                attempt,
                retries + 1,
            )
        else:
            if success:
                return
            logging.warning(
                "Hubject push_cdr returned status=%s for station_id=%s (attempt %s/%s)",
                status,
                station_id,
                attempt,
                retries + 1,
            )

        if attempt > retries:
            logging.error(
                "Hubject push_cdr failed for station_id=%s after %s attempts",
                station_id,
                attempt,
            )
            return

        logging.info(
            "Retrying Hubject push_cdr for station_id=%s (attempt %s/%s)",
            station_id,
            attempt + 1,
            retries + 1,
        )
        # Yield to the event loop before retrying; no backoff delay is
        # applied here — NOTE(review): confirm that is intentional.
        await asyncio.sleep(0)

async def translate_id_tag(id_tag: str) -> Optional[str]:
    """Map a physical RFID UID to its configured virtual UID.

    Checks the explicit per-tag mapping in ``op_idtags`` first; when none
    exists, falls back to the global "map everything to a master UID"
    switch in ``op_config`` (``idtag_map_all`` / ``idtag_master_uid``).
    Returns the mapped UID, or ``None`` when no mapping applies or the
    database is unreachable.
    """
    try:
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                try:
                    await cur.execute(
                        "SELECT virtual_uid FROM op_idtags WHERE real_uid=%s",
                        (id_tag,),
                    )
                    row = await cur.fetchone()
                    if row:
                        return row[0]
                except Exception:
                    # Best effort: a broken per-tag lookup must not block the
                    # master-UID fallback below, but it should leave a trace
                    # instead of being silently swallowed.
                    logging.debug(
                        "Per-tag id mapping lookup failed for %s",
                        id_tag,
                        exc_info=True,
                    )

                await cur.execute(
                    "SELECT config_key, config_value FROM op_config WHERE config_key IN (%s, %s)",
                    ("idtag_map_all", "idtag_master_uid"),
                )
                cfg_rows = await cur.fetchall()
        cfg = dict(cfg_rows)
        if cfg.get("idtag_map_all") == "1" and cfg.get("idtag_master_uid"):
            return cfg.get("idtag_master_uid")
    except Exception as e:
        # Lazy %-formatting: avoid building the message unless it is emitted.
        logging.error("IDTag mapping error: %s", e)
    return None


async def _table_exists(cur, table_name: str) -> bool:
    """Return True when *table_name* exists in the current database schema."""
    await cur.execute(
        """
        SELECT 1
        FROM information_schema.tables
        WHERE table_schema = DATABASE()
          AND table_name = %s
        LIMIT 1
        """,
        (table_name,),
    )
    return await cur.fetchone() is not None


async def _column_exists(cur, table_name: str, column_name: str) -> bool:
    """Return True when *column_name* exists on *table_name* in the current schema."""
    await cur.execute(
        """
        SELECT 1
        FROM information_schema.columns
        WHERE table_schema = DATABASE()
          AND table_name = %s
          AND column_name = %s
        LIMIT 1
        """,
        (table_name, column_name),
    )
    return await cur.fetchone() is not None


async def _ensure_fault_detection_tables(conn) -> None:
    """Create or migrate the fault-detection tables, once per process.

    Idempotent: guarded by an asyncio lock so concurrent callers never run
    the DDL twice, and short-circuited by the module-level
    ``_fault_detection_tables_ready`` flag on every later call.
    """
    global _fault_detection_tables_ready, _fault_detection_table_lock
    if _fault_detection_tables_ready:
        return

    # Created lazily so the lock binds to the running event loop.
    if _fault_detection_table_lock is None:
        _fault_detection_table_lock = asyncio.Lock()

    async with _fault_detection_table_lock:
        # Re-check under the lock: another task may have finished meanwhile.
        if _fault_detection_tables_ready:
            return

        async with conn.cursor() as cur:
            # --- op_fault_detection_rules: create fresh or migrate in place --
            if not await _table_exists(cur, "op_fault_detection_rules"):
                await cur.execute(FAULT_RULES_TABLE_SQL)
            else:
                if not await _column_exists(
                    cur, "op_fault_detection_rules", "pattern_title"
                ):
                    await cur.execute(
                        """
                        ALTER TABLE op_fault_detection_rules
                        ADD COLUMN pattern_title VARCHAR(255) NOT NULL DEFAULT ''
                            AFTER id
                        """
                    )
                if not await _column_exists(
                    cur, "op_fault_detection_rules", "criticality"
                ):
                    await cur.execute(
                        """
                        ALTER TABLE op_fault_detection_rules
                        ADD COLUMN criticality ENUM('low', 'medium', 'high') NOT NULL DEFAULT 'medium'
                            AFTER explanation
                        """
                    )
                if not await _column_exists(
                    cur, "op_fault_detection_rules", "is_active"
                ):
                    await cur.execute(
                        """
                        ALTER TABLE op_fault_detection_rules
                        ADD COLUMN is_active TINYINT(1) NOT NULL DEFAULT 1
                            AFTER criticality
                        """
                    )
                # Backfill rows that predate the newly added columns.
                await cur.execute(
                    "UPDATE op_fault_detection_rules SET criticality = 'medium' WHERE criticality IS NULL"
                )
                await cur.execute(
                    "UPDATE op_fault_detection_rules SET is_active = 1 WHERE is_active IS NULL"
                )
                await cur.execute(
                    """
                    UPDATE op_fault_detection_rules
                    SET pattern_title = CASE
                            WHEN pattern_title IS NULL OR pattern_title = '' THEN pattern
                            ELSE pattern_title
                        END
                    """
                )

            # --- op_station_marks: create fresh or migrate in place ----------
            if not await _table_exists(cur, "op_station_marks"):
                await cur.execute(FAULT_MARKS_TABLE_SQL)
            else:
                if not await _column_exists(
                    cur, "op_station_marks", "service_requested_at"
                ):
                    await cur.execute(
                        """
                        ALTER TABLE op_station_marks
                        ADD COLUMN service_requested_at TIMESTAMP NULL DEFAULT NULL
                            AFTER updated_at
                        """
                    )

            # --- op_fault_detection_trigger: create fresh or migrate in place
            if not await _table_exists(cur, "op_fault_detection_trigger"):
                await cur.execute(FAULT_TRIGGERS_TABLE_SQL)
            else:
                if not await _column_exists(
                    cur, "op_fault_detection_trigger", "cluster"
                ):
                    await cur.execute(
                        """
                        ALTER TABLE op_fault_detection_trigger
                        ADD COLUMN cluster VARCHAR(255) DEFAULT 'other'
                            AFTER report_reason_description
                        """
                    )
                await cur.execute(
                    "ALTER TABLE op_fault_detection_trigger MODIFY threshold INT NOT NULL DEFAULT 1"
                )
                await cur.execute(
                    "UPDATE op_fault_detection_trigger SET threshold = 1 WHERE threshold IS NULL"
                )
                await cur.execute(
                    "UPDATE op_fault_detection_trigger SET is_enabled = 1 WHERE is_enabled IS NULL"
                )

        await conn.commit()
        _fault_detection_tables_ready = True


async def _load_fault_patterns(force: bool = False) -> list[dict]:
    """Return the active fault-detection rules with pre-compiled regexes.

    Results are cached module-wide for ``FAULT_PATTERN_CACHE_SECONDS``;
    pass ``force=True`` to bypass the cache.  On a database error the
    previously cached list (possibly empty) is returned unchanged.
    """
    global _fault_patterns, _fault_patterns_last_load, _fault_patterns_lock

    now = time.monotonic()
    # Fast path: serve from the cache without taking the lock.
    if not force and _fault_patterns and (now - _fault_patterns_last_load) < FAULT_PATTERN_CACHE_SECONDS:
        return _fault_patterns

    # Lock is created lazily so it binds to the running event loop.
    if _fault_patterns_lock is None:
        _fault_patterns_lock = asyncio.Lock()

    async with _fault_patterns_lock:
        # Double-checked locking: another task may have refreshed the cache
        # while we were waiting for the lock.
        now = time.monotonic()
        if not force and _fault_patterns and (now - _fault_patterns_last_load) < FAULT_PATTERN_CACHE_SECONDS:
            return _fault_patterns

        try:
            async with db_connection() as conn:
                await _ensure_fault_detection_tables(conn)
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute(
                        """
                        SELECT id, pattern_title, pattern, explanation, criticality
                        FROM op_fault_detection_rules
                        WHERE is_active = 1
                        ORDER BY id
                        """
                    )
                    rows = await cur.fetchall()
        except Exception:
            # Keep serving the stale cache rather than failing the caller.
            logging.exception("Failed to load fault detection rules from database")
            return _fault_patterns

        patterns: list[dict] = []
        for row in rows or []:
            pattern_text = (row.get('pattern') or '').strip()
            explanation = (row.get('explanation') or '').strip()
            if not pattern_text:
                continue
            try:
                compiled = re.compile(pattern_text, re.IGNORECASE)
            except re.error as exc:
                # A single bad pattern must not disable the whole rule set.
                logging.warning(
                    "Skipping invalid fault detection pattern %s (%s): %s",
                    row.get('id'),
                    pattern_text,
                    exc,
                )
                continue

            patterns.append(
                {
                    'id': row.get('id'),
                    'title': (row.get('pattern_title') or '').strip(),
                    'pattern': pattern_text,
                    'explanation': explanation,
                    'criticality': (row.get('criticality') or 'medium').strip().lower(),
                    'regex': compiled,
                }
            )

        _fault_patterns = patterns
        _fault_patterns_last_load = now
        return _fault_patterns


async def _store_fault_mark(
    station_id: str, pattern_id: Optional[int], explanation: str
) -> None:
    """Insert or refresh a fault-detection mark for *station_id*.

    Best effort: database errors are logged and swallowed so that fault
    detection never disturbs regular message processing.
    """
    params = (station_id, explanation, pattern_id)
    try:
        async with db_connection() as conn:
            await _ensure_fault_detection_tables(conn)
            async with conn.cursor() as cur:
                # Upsert: an existing mark gets its reason/source refreshed
                # in place instead of producing a duplicate row.
                await cur.execute(
                    """
                    INSERT INTO op_station_marks (station_id, reason, source, pattern_id)
                    VALUES (%s, %s, 'fault_detection', %s) AS new_values
                    ON DUPLICATE KEY UPDATE
                        reason = new_values.reason,
                        source = new_values.source
                    """,
                    params,
                )
            await conn.commit()
    except Exception:
        logging.exception(
            "Failed to persist fault detection mark for station %s (pattern %s)",
            station_id,
            pattern_id,
        )


async def _mark_strict_availability_disconnect(station_id: str) -> None:
    """Record a 'Disconnected' strict-availability mark for *station_id*.

    The previous strict-availability mark (if any) is removed first so at
    most one such row exists per station.  Failures are logged and swallowed.
    """
    if not station_id:
        return

    key = (station_id,)
    try:
        async with db_connection() as conn:
            await _ensure_fault_detection_tables(conn)
            async with conn.cursor() as cur:
                # Replace-style write: delete the old mark, then insert anew.
                await cur.execute(
                    """
                    DELETE FROM op_station_marks
                    WHERE station_id=%s AND source='strict_availability' AND pattern_id IS NULL
                    """,
                    key,
                )
                await cur.execute(
                    """
                    INSERT INTO op_station_marks (station_id, reason, source, pattern_id)
                    VALUES (%s, 'Disconnected', 'strict_availability', NULL)
                    """,
                    key,
                )
            await conn.commit()
    except Exception:
        logging.exception(
            "Failed to persist strict availability mark for station %s",
            station_id,
        )


async def _handle_fault_detection(station_id: str, message: str) -> None:
    """Match *message* against the active fault rules and persist any hits."""
    if not (station_id and message):
        return

    rules = await _load_fault_patterns()
    for rule in rules:
        regex = rule.get('regex')
        rule_id = rule.get('id')
        # Skip entries that lack a compiled pattern or a database id.
        if not regex or rule_id is None:
            continue
        reason = rule.get('explanation') or ''
        try:
            if regex.search(message):
                await _store_fault_mark(station_id, rule_id, reason or 'Fault detected')
        except Exception:
            # One broken rule must not stop evaluation of the rest.
            logging.exception(
                "Fault detection evaluation failed for station %s and pattern %s",
                station_id,
                rule_id,
            )


def trigger_fault_detection(station_id: str, message: str) -> None:
    """Schedule asynchronous fault-pattern matching for a station message."""
    if not station_id or not message:
        return

    async def _worker() -> None:
        # Catch everything here: background tasks must never raise unnoticed.
        try:
            await _handle_fault_detection(station_id, message)
        except Exception:
            logging.exception("Unhandled error while processing fault detection for %s", station_id)

    create_background_task(_worker())


async def load_external_api_config() -> dict[str, object]:
    """Fetch configuration for the external authorizeTransaction API.

    Reads the enable flag, base URL, bearer token and API key from the
    ``op_config`` table.  Returns a disabled configuration when the lookup
    fails so callers can proceed without the external integration.
    """

    keys = (
        "authorizeTransaction_enabled",
        "authorizeTransaction_url",
        "authorizeTransaction_token",
        "authorizeTransaction_api_key",
    )
    try:
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                # Derive the IN(...) placeholder list from ``keys`` so the
                # two can never drift apart when a key is added or removed.
                placeholders = ", ".join(["%s"] * len(keys))
                await cur.execute(
                    "SELECT config_key, config_value FROM op_config "
                    f"WHERE config_key IN ({placeholders})",
                    keys,
                )
                rows = await cur.fetchall()
    except Exception as exc:
        logging.error("Failed to load external API configuration: %r", exc)
        return {
            "enabled": False,
            "base_url": "",
            "token": "",
            "api_key": "",
        }

    cfg = dict(rows)
    config = {
        "enabled": cfg.get("authorizeTransaction_enabled") == "1",
        "base_url": cfg.get("authorizeTransaction_url") or "",
        "token": cfg.get("authorizeTransaction_token") or "",
        "api_key": cfg.get("authorizeTransaction_api_key") or "",
    }
    # Only log credential *presence*, never the values themselves.
    logging.debug(
        "External API config loaded: enabled=%s, base_url=%s, token_present=%s, api_key_present=%s",
        config["enabled"],
        config["base_url"],
        bool(config["token"]),
        bool(config["api_key"]),
    )
    return config


async def fetch_mapped_token(
    id_tag: str,
    station_id_short: str,
    station_path: Optional[str] = None,
    connector_hint: Optional[ConnectorKey] = None,
    evse_hint: Optional[ConnectorKey] = None,
) -> Tuple[str, bool]:
    """Call external authorizeTransaction API to map an id_tag.

    Returns the mapped token from the backend or the original id_tag on
    failure/timeout along with a boolean indicating if the token is
    blocked. The endpoint and enable flag are read from the op_config table
    so they can be toggled at runtime.
    """
    logging.debug(
        "fetch_mapped_token invoked for id_tag=%s, station_id_short=%s, station_path=%s",
        id_tag,
        station_id_short,
        station_path,
    )
    payload = None
    response_content: Optional[object] = None
    # Guards against double auditing in the except path below.
    request_logged = False
    response_status: Optional[int] = None
    result_status: Optional[str] = None
    mapped_token: Optional[str] = None
    log_station = station_path or station_id_short
    try:
        config = await load_external_api_config()
        if not config["enabled"]:
            logging.debug("authorizeTransaction disabled - skipping external check")
            return id_tag, False

        base = config.get("base_url")
        if not base:
            logging.warning("authorizeTransaction URL missing - skipping external check")
            return id_tag, False
        url = base.rstrip("/") + "/charging-station/authorize-transaction"

        token = config.get("token")
        api_key = config.get("api_key")

        connector_value = _normalize_identifier_component(connector_hint)
        evse_value = _normalize_identifier_component(evse_hint)

        payload = {
            "stationId": station_id_short,
            "idToken": id_tag,
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }

        if connector_value is not None:
            payload["connectorId"] = connector_value
        if evse_value is not None:
            payload["evseId"] = evse_value
        # The backend expects a connectorId; default to 1 when unknown.
        if "connectorId" not in payload:
            payload["connectorId"] = 1

        logging.debug(
            "authorizeTransaction request prepared: url=%s, headers=%s, payload=%s",
            url,
            {
                "Authorization": "<present>" if token else None,
                "X-API-KEY": "<present>" if api_key else None,
                "Content-Type": "application/json",
            },
            payload,
        )

        logging.info(
            "authorizeTransaction request for %s at %s", id_tag, log_station
        )

        # SECURITY: log only whether a credential is configured, never its
        # value — the previous revision leaked the bearer token at INFO.
        logging.info(
            "authorizeTransaction url %s (token_present=%s)", url, bool(token)
        )

        timeout = aiohttp.ClientTimeout(total=3)
        async with aiohttp.ClientSession(timeout=timeout) as session:

            headers = {
                "Accept": "application/json",
                "Content-Type": "application/json",
            }
            # Bearer token wins over the API key when both are configured.
            if token:
                headers["Authorization"] = f"Bearer {token}"
            elif api_key:
                headers["X-API-KEY"] = api_key
            logging.debug(
                "authorizeTransaction POST %s with headers=%s",
                url,
                {
                    name: ("<redacted>" if name in ("Authorization", "X-API-KEY") else value)
                    for name, value in headers.items()
                },
            )
            try:
                async with session.post(url, json=payload, headers=headers) as resp:
                    response_status = resp.status
                    try:
                        data = await resp.json()
                        response_content = data
                    except Exception:
                        # Non-JSON body: record it and fall back to the raw tag.
                        text = await resp.text()
                        response_content = text
                        logging.info(
                            "authorizeTransaction response for %s at %s: %s %s",
                            id_tag,
                            log_station,
                            resp.status,
                            text,
                        )
                        return id_tag, False
                    logging.info(
                        "authorizeTransaction response for %s at %s: %s",
                        id_tag,
                        log_station,
                        data,
                    )
                    if resp.status == 200:
                        status = (data.get("status") or "").upper()
                        mapped = data.get("mappedToken")
                        result_status = status
                        mapped_token = mapped
                        if status == "APPROVED" and mapped:
                            return mapped, False
                        if status == "BLOCKED":
                            logging.info(
                                "authorizeTransaction marked id_tag %s at %s as BLOCKED",
                                id_tag,
                                log_station,
                            )
                            return id_tag, True
                    else:
                        logging.info(
                            "authorizeTransaction non-success status %s for %s at %s",
                            resp.status,
                            id_tag,
                            log_station,
                        )
            finally:
                # Always audit the request, even when we returned early above.
                await log_external_api_request(
                    "authorize-transaction",
                    payload,
                    response_status,
                    station_id_short,
                    response_body=response_content,
                    module="authorization",
                    station_path=station_path,
                )
                request_logged = True
    except Exception as e:
        logging.error("authorizeTransaction call failed: %r", e)
        logging.debug(
            "authorizeTransaction call failed for payload=%s",
            payload or {"stationId": station_id_short, "idToken": id_tag},
            exc_info=True,
        )
        response_content = str(e)
        if not request_logged:
            await log_external_api_request(
                "authorize-transaction",
                payload,
                response_status,
                station_id_short,
                response_body=response_content,
                module="authorization",
                station_path=station_path,
            )
        return id_tag, False

    logging.debug(
        "authorizeTransaction defaulting to original token for %s at %s (status=%s, mappedToken=%s)",
        id_tag,
        log_station,
        result_status,
        mapped_token,
    )
    return id_tag, False


async def notify_external_update_transaction(transaction: dict) -> None:
    """Notify the external API about a new transaction identifier.

    Best effort: any failure is logged and swallowed, and every attempt is
    audited via ``log_external_api_request``.  ``transaction`` must carry a
    station identifier (``stationId`` or ``chargepoint_id``) and a
    ``transactionId``; otherwise the call is silently skipped.
    """

    if not isinstance(transaction, dict):
        return

    chargepoint_id = transaction.get("stationId") or transaction.get("chargepoint_id")
    transaction_id = transaction.get("transactionId")

    if not chargepoint_id or transaction_id is None:
        return

    try:
        config = await load_external_api_config()
    except Exception:
        # load_external_api_config already logs the exception, just stop here
        return

    if not config["enabled"]:
        logging.debug("External API disabled - skipping update-transaction call")
        return

    base = config.get("base_url")
    if not base:
        logging.debug("External API base URL missing - skipping update-transaction call")
        return

    url = base.rstrip("/") + "/charging-station/update-transaction"
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    token = config.get("token")
    api_key = config.get("api_key")
    # Bearer token takes precedence over the API key when both are set.
    if token:
        headers["Authorization"] = f"Bearer {token}"
    elif api_key:
        headers["X-API-KEY"] = api_key

    # Fall back to "now" when the transaction carries no usable timestamp.
    timestamp = ensure_utc_timestamp(transaction.get("timestamp"))
    if not timestamp:
        timestamp = ensure_utc_timestamp(datetime.now(timezone.utc))

    connector_id = _normalize_identifier_component(transaction.get("connectorId"))
    evse_id = _normalize_identifier_component(transaction.get("evseId"))

    session_start_ts = ensure_utc_timestamp(
        transaction.get("sessionStartTimestamp") or transaction.get("timestamp")
    )
    if not session_start_ts:
        session_start_ts = timestamp

    # Prefer the original (pre-mapping) id token under any of its known
    # aliases; only then fall back to the possibly mapped idToken/idTag.
    id_token = (
        transaction.get("idTokenOriginal")
        or transaction.get("originalIdToken")
        or transaction.get("originalIdTag")
        or transaction.get("rawIdToken")
        or transaction.get("rawIdTag")
    )
    if id_token is None:
        id_token = _normalise_id_token(transaction.get("idToken"))
    if id_token is None:
        id_token = transaction.get("idTag")
    if id_token is not None:
        id_token = str(id_token)

    payload = {
        "stationId": chargepoint_id,
        "connectorId": connector_id,
        "evseId": evse_id,
        "idToken": id_token,
        "sessionStartTimestamp": session_start_ts,
        "transactionId": str(transaction_id),
        "timestamp": timestamp,
    }

    # Remove keys that have no value to avoid sending explicit nulls
    payload = {key: value for key, value in payload.items() if value is not None}

    timeout = aiohttp.ClientTimeout(total=3)
    response_status: Optional[int] = None
    response_body: Optional[str] = None
    try:
        logging.info(
            "Calling update-transaction for %s (%s) with payload: %s",
            chargepoint_id,
            transaction_id,
            payload,
        )
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=payload, headers=headers) as resp:
                response_status = resp.status
                body = await resp.text()
                response_body = body
                if resp.status >= 400:
                    logging.warning(
                        "update-transaction API call failed for %s (%s): %s %s",
                        chargepoint_id,
                        transaction_id,
                        resp.status,
                        body,
                    )
                else:
                    logging.info(
                        "update-transaction API call succeeded for %s (%s): %s",
                        chargepoint_id,
                        transaction_id,
                        resp.status,
                    )
    except Exception as exc:
        logging.error(
            "update-transaction API call failed for %s (%s): %s",
            chargepoint_id,
            transaction_id,
            exc,
        )
        response_body = str(exc)
    finally:
        # Audit every attempt, successful or not.
        await log_external_api_request(
            "update-transaction",
            payload,
            response_status,
            chargepoint_id,
            response_body=response_body,
            module="transactions",
        )


async def notify_external_stop_transaction(transaction: dict) -> None:
    """Notify the external API about a transaction stop event.

    Best effort: failures are logged and swallowed, and every attempt is
    audited via ``log_external_api_request``.
    """

    if not isinstance(transaction, dict):
        return

    station_id = transaction.get("stationId")
    transaction_id = transaction.get("transactionId")
    if not station_id or transaction_id is None:
        return

    try:
        cfg = await load_external_api_config()
    except Exception:
        # load_external_api_config logs its own failures.
        return

    if not cfg["enabled"]:
        logging.debug("External API disabled - skipping stop-transaction call")
        return

    base = cfg.get("base_url")
    if not base:
        logging.debug("External API base URL missing - skipping stop-transaction call")
        return

    url = base.rstrip("/") + "/charging-station/stop-transaction"

    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    bearer = cfg.get("token")
    api_key = cfg.get("api_key")
    if bearer:
        headers["Authorization"] = f"Bearer {bearer}"
    elif api_key:
        headers["X-API-KEY"] = api_key

    # Fall back to a second-resolution UTC "now" when no timestamp is given.
    when = transaction.get("timestamp")
    if not when:
        when = (
            datetime.now(timezone.utc)
            .replace(microsecond=0)
            .isoformat()
            .replace("+00:00", "Z")
        )

    connector_id = _normalize_identifier_component(transaction.get("connectorId"))
    evse_id = _normalize_identifier_component(transaction.get("evseId"))
    # Prefer the original (pre-mapping) token before mapped fallbacks.
    id_token = (
        transaction.get("idTokenOriginal")
        or transaction.get("rawIdToken")
        or _normalise_id_token(transaction.get("idToken"))
        or transaction.get("idTag")
    )
    if id_token is not None:
        id_token = str(id_token)

    payload = {
        "stationId": station_id,
        "connectorId": connector_id,
        "evseId": evse_id,
        "idToken": id_token,
        "transactionId": str(transaction_id),
        "sessionStartTimestamp": transaction.get("sessionStartTimestamp"),
        "sessionEndTimestamp": transaction.get("sessionEndTimestamp"),
        "meterStartWh": transaction.get("meterStartWh"),
        "meterStopWh": transaction.get("meterStopWh"),
        "timestamp": when,
        "stopReason": transaction.get("stopReason"),
    }
    # Strip absent fields instead of sending explicit nulls.
    for absent in [key for key, value in payload.items() if value is None]:
        del payload[absent]

    status_code: Optional[int] = None
    reply_text: Optional[str] = None
    try:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3)) as session:
            async with session.post(url, json=payload, headers=headers) as resp:
                status_code = resp.status
                reply_text = await resp.text()
                if resp.status >= 400:
                    logging.warning(
                        "stop-transaction API call failed for %s (%s): %s %s",
                        station_id,
                        transaction_id,
                        resp.status,
                        reply_text,
                    )
                else:
                    logging.info(
                        "stop-transaction API call succeeded for %s (%s): %s - payload=%s",
                        station_id,
                        transaction_id,
                        resp.status,
                        payload,
                    )
    except Exception as exc:
        logging.error(
            "stop-transaction API call failed for %s (%s): %s",
            station_id,
            transaction_id,
            exc,
        )
        reply_text = str(exc)
    finally:
        # Audit every attempt, successful or not.
        await log_external_api_request(
            "stop-transaction",
            payload,
            status_code,
            station_id,
            response_body=reply_text,
            module="transactions",
        )


async def notify_external_configuration_update(
    station_id: str, configs: dict[str, object]
) -> None:
    """Send configuration information to the external API.

    Performs a best-effort POST to ``<base_url>/charging-station/configuration-update``
    with the station's vendor/model/firmware data. Failures are logged but never
    raised, and every attempt is recorded via ``log_external_api_request``.

    :param station_id: Identifier of the charging station; no-op when empty.
    :param configs: Mapping of configuration keys to reported values. Both
        GetConfiguration-style keys (``ChargePointVendor`` …) and raw
        BootNotification keys (``chargePointVendor`` …) are accepted.
    """

    if not station_id:
        return

    try:
        config = await load_external_api_config()
    except Exception:
        # Configuration lookup problems must not disturb OCPP processing.
        return

    if not config["enabled"]:
        logging.debug("External API disabled - skipping configuration-update call")
        return

    base = config.get("base_url")
    if not base:
        logging.debug(
            "External API base URL missing - skipping configuration-update call"
        )
        return

    url = base.rstrip("/") + "/charging-station/configuration-update"
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    # Prefer bearer-token auth; fall back to a static API key.
    token = config.get("token")
    api_key = config.get("api_key")
    if token:
        headers["Authorization"] = f"Bearer {token}"
    elif api_key:
        headers["X-API-KEY"] = api_key

    # ISO-8601 UTC timestamp with "Z" suffix, second precision.
    timestamp = (
        datetime.now(timezone.utc)
        .replace(microsecond=0)
        .isoformat()
        .replace("+00:00", "Z")
    )

    payload: dict[str, object] = {
        "stationId": station_id,
        "timestamp": timestamp,
    }

    # GetConfiguration-style keys.
    mapping = {
        "ChargePointVendor": "manufacturer",
        "ChargePointModel": "model",
        "ChargePointSerialNumber": "serialNumber",
        "ChargeBoxSerialNumber": "serialNumber",
        "FirmwareVersion": "firmwareVersion",
        "NumberOfConnectors": "numberOfConnectors",
        "SupportedFeatureProfiles": "supportedFeatureProfiles",
    }
    # Also accept direct BootNotification keys to reduce pre-processing.
    boot_mapping = {
        "chargePointVendor": "manufacturer",
        "chargePointModel": "model",
        "chargePointSerialNumber": "serialNumber",
        "chargeBoxSerialNumber": "serialNumber",
        "firmwareVersion": "firmwareVersion",
        "numberOfConnectors": "numberOfConnectors",
        "supportedFeatureProfiles": "supportedFeatureProfiles",
    }

    for key, field in {**mapping, **boot_mapping}.items():
        if field in payload and field != "supportedFeatureProfiles":
            # Preserve values already set by a previous mapping iteration except
            # for supportedFeatureProfiles which we allow to be overwritten.
            continue
        value = configs.get(key)
        if value is None:
            continue
        if field == "numberOfConnectors":
            # Coerce to int where possible, but pass through odd values as-is.
            try:
                payload[field] = int(value)
            except (TypeError, ValueError):
                payload[field] = value
        else:
            payload[field] = value

    timeout = aiohttp.ClientTimeout(total=3)
    response_status: Optional[int] = None
    response_body: Optional[str] = None
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=payload, headers=headers) as resp:
                response_status = resp.status
                body = await resp.text()
                response_body = body
                if resp.status >= 400:
                    # Truncate both dumps so log lines stay readable.
                    payload_dump = json.dumps(payload, ensure_ascii=False)
                    if len(payload_dump) > 500:
                        payload_dump = f"{payload_dump[:497]}..."
                    # ``resp.text()`` always returns a str, so only the length
                    # needs checking (removed dead ``body is None`` branch).
                    response_dump = body if len(body) <= 500 else f"{body[:497]}..."
                    logging.debug(
                        "configuration-update API call error details for %s: payload=%s response_body=%s",
                        station_id,
                        payload_dump,
                        response_dump,
                    )
                    logging.warning(
                        "configuration-update API call failed for %s: %s %s",
                        station_id,
                        resp.status,
                        body,
                    )
                else:
                    logging.info(
                        "configuration-update API call succeeded for %s: %s",
                        station_id,
                        resp.status,
                    )
    except Exception as exc:
        logging.error(
            "configuration-update API call failed for %s: %s",
            station_id,
            exc,
        )
        response_body = str(exc)
    finally:
        # Always persist the request/response pair for the audit log.
        await log_external_api_request(
            "configuration-update",
            payload,
            response_status,
            station_id,
            response_body=response_body,
            module="configuration",
        )


async def notify_external_status_notification(
    station_id: str, status: dict[str, object]
) -> None:
    """Notify the external API about a StatusNotification result.

    Best-effort POST of ``{stationId, connectorId, status, timestamp[, errorCode]}``
    to ``<base_url>/charging-station/status``. Failures are logged, never raised,
    and every attempt is recorded via ``log_external_api_request``.

    :param station_id: Identifier of the charging station; no-op when empty.
    :param status: StatusNotification payload; must contain ``connectorId``
        and ``status`` for a request to be sent.
    """

    if not station_id or not isinstance(status, dict):
        return

    try:
        config = await load_external_api_config()
    except Exception:
        # Configuration lookup problems must not disturb OCPP processing.
        return

    if not config["enabled"]:
        logging.debug("External API disabled - skipping status call")
        return

    base = config.get("base_url")
    if not base:
        logging.debug(
            "External API base URL missing - skipping status call"
        )
        return

    connector_id = status.get("connectorId")
    charge_status = status.get("status")

    if connector_id is None or charge_status is None:
        logging.debug(
            "StatusNotification payload missing connectorId or status - skipping"
        )
        return

    url = base.rstrip("/") + "/charging-station/status"
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    # Prefer bearer-token auth; fall back to a static API key.
    token = config.get("token")
    api_key = config.get("api_key")
    if token:
        headers["Authorization"] = f"Bearer {token}"
    elif api_key:
        headers["X-API-KEY"] = api_key

    # Use the station-provided timestamp when present, otherwise "now" in
    # UTC with "Z" suffix and second precision.
    timestamp = status.get("timestamp")
    if not timestamp:
        timestamp = (
            datetime.now(timezone.utc)
            .replace(microsecond=0)
            .isoformat()
            .replace("+00:00", "Z")
        )

    # Connector IDs are normally numeric; fall back to the raw value if not.
    try:
        connector_value: object = int(connector_id)
    except (TypeError, ValueError):
        connector_value = connector_id

    payload = {
        "stationId": station_id,
        "connectorId": connector_value,
        "status": charge_status,
        "timestamp": timestamp,
    }
    error_code = status.get("errorCode")
    if error_code:
        payload["errorCode"] = error_code

    timeout = aiohttp.ClientTimeout(total=3)
    response_status: Optional[int] = None
    response_body: Optional[str] = None
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=payload, headers=headers) as resp:
                response_status = resp.status
                body = await resp.text()
                response_body = body
                if resp.status >= 400:
                    logging.warning(
                        "status API call failed for %s (%s): %s %s",
                        station_id,
                        connector_value,
                        resp.status,
                        body,
                    )
                else:
                    logging.info(
                        "status API call succeeded for %s (%s): %s",
                        station_id,
                        connector_value,
                        resp.status,
                    )
    except Exception as exc:
        logging.error(
            "status API call failed for %s (%s): %s",
            station_id,
            connector_value,
            exc,
        )
        response_body = str(exc)
    finally:
        # Always persist the request/response pair for the audit log.
        await log_external_api_request(
            "status",
            payload,
            response_status,
            station_id,
            response_body=response_body,
            module="status",
        )


async def log_authorize_transaction_status() -> None:
    """Log whether external authorizeTransaction is enabled at startup.

    Reads the ``authorizeTransaction_enabled`` flag from the ``op_config``
    table; a missing row counts as disabled. Database errors are logged and
    swallowed so startup continues.
    """
    try:
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    "SELECT config_value FROM op_config WHERE config_key=%s",
                    ("authorizeTransaction_enabled",),
                )
                row = await cur.fetchone()
        # ``row`` is None when the key is absent; normalize to a real bool
        # instead of leaving ``enabled`` as None.
        enabled = bool(row and row[0] == "1")
        state = "enabled" if enabled else "disabled"
        logging.info("External authorizeTransaction is %s", state)
    except Exception as e:
        # Lazy %-style formatting for consistency with the rest of the module.
        logging.error("authorizeTransaction status check failed: %s", e)


# Stores the connection start time of each station.
# Requires paho-mqtt (pip install --upgrade paho-mqtt).
CONNECTION_START_TIMES: dict[str, datetime] = {}
# Total number of reconnects observed per station since broker start.
RECONNECTION_COUNTER: dict[str, int] = {}
# Time window for the rolling reconnect evaluation.
RECONNECT_ROLLUP_SECONDS: int = int(_config.get("reconnect_rollup_seconds", 24 * 3600))
# Warning threshold per day (e.g. 5 reconnects / 24h).
RECONNECT_WARNING_THRESHOLD: float = float(_config.get("reconnect_warning_threshold", 5))
# Rolling window and warning threshold for handshake failures.
HANDSHAKE_FAILURE_ROLLUP_SECONDS: int = int(
    _config.get("handshake_failure_rollup_seconds", 24 * 3600)
)
HANDSHAKE_FAILURE_WARNING_THRESHOLD: int = int(
    _config.get("handshake_failure_warning_threshold", 3)
)
# Per-station timestamps of recent reconnects (pruned to the rolling window).
RECONNECT_EVENTS: dict[str, Deque[datetime]] = defaultdict(deque)
# Per-station handshake success/failure counters.
HANDSHAKE_COUNTS: dict[str, dict[str, int]] = defaultdict(lambda: {"success": 0, "failure": 0})
# Bounded log of (timestamp, station key, reason) handshake failures.
HANDSHAKE_FAILURE_EVENTS: Deque[tuple[datetime, str, str]] = deque(maxlen=2000)
# Most recent failure reason/timestamp per station key.
LAST_HANDSHAKE_FAILURE_REASON: dict[str, dict[str, object]] = {}
LAST_HANDSHAKE_SUCCESS: dict[str, datetime] = {}

# Global metrics for connection quality per station
BROKER_START = datetime.now(timezone.utc)
# Target round-trip time in ms used by the quality score (RTTs at or above
# this cost the full RTT penalty).
TARGET_RTT_MS = 500
RTT_SAMPLE_LIMIT = 50
CONNECTION_STATS: dict[str, dict] = {}

# MQTT connection settings (all optional, with local defaults).
MQTT_BROKER = _config.get("mqtt", {}).get("broker", "127.0.0.1")
MQTT_PORT = _config.get("mqtt", {}).get("port", 1883)
MQTT_USER = _config.get("mqtt", {}).get("user", "")
MQTT_PASSWORD = _config.get("mqtt", {}).get("password", "")
MQTT_TOPIC_PREFIX = _config.get("mqtt", {}).get("topic_prefix", "")

# Endpoint identifier for this proxy instance
OCPP_ENDPOINT = _config.get("ocpp_endpoint", "default")

mqtt_client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)

# Secret commands
# http://217.154.74.195/getConnectedDevices/
# http://217.154.74.195/refresh-01
# http://217.154.74.195/connectedWallboxes

# Example psutil snippets kept for ad-hoc diagnostics:
# vm = psutil.virtual_memory()
# print(f"Total: {vm.total / (1024**3):.2f} GB")
# print(f"Available: {vm.available / (1024**3):.2f} GB")
# print(f"Used: {vm.used / (1024**3):.2f} GB ({vm.percent}%)")

# Instant CPU utilization (over a 1-second sample)
# cpu_pct = psutil.cpu_percent(interval=1)
# print(f"CPU Usage: {cpu_pct}%")

# System load averages (Linux/UNIX only)
# load1, load5, load15 = psutil.getloadavg()
# print(f"Load averages (1m,5m,15m): {load1:.2f}, {load5:.2f}, {load15:.2f}")

# For a given path (e.g. root '/')
# du = psutil.disk_usage('/')
# print(f"Total disk: {du.total / (1024**3):.2f} GB")
# print(f"Free disk:  {du.free  / (1024**3):.2f} GB")
# print(f"Used disk:  {du.used  / (1024**3):.2f} GB ({du.percent}%)")

# ----- Throttling configuration (system-wide) -----
# Enabled by default.
THROTTLE_ENABLED: bool = True

# Wait time after a disconnect (seconds).
THROTTLE_SECONDS: int = 120

# Remembers, per wallbox ID, when it was last disconnected.
LAST_DISCONNECT: dict[str, datetime] = {}
# Remembers the time of the last successful connection.
LAST_CONNECT: dict[str, datetime] = {}

# Offline alerts fire after this delay (30 minutes).
DISCONNECT_NOTIFICATION_DELAY_SECONDS = 30 * 60
# At most EMAIL_RATE_LIMIT_MAX mails per EMAIL_RATE_LIMIT_WINDOW_SECONDS.
EMAIL_RATE_LIMIT_WINDOW_SECONDS = 60.0
EMAIL_RATE_LIMIT_MAX = 5
# Monotonic send timestamps used by the rate limiter.
EMAIL_SEND_TIMESTAMPS: deque[float] = deque()
EMAIL_RATE_LIMIT_LOCK: asyncio.Lock = asyncio.Lock()
# Pending delayed offline-alert tasks per station.
DISCONNECT_ALERT_TASKS: dict[str, asyncio.Task[Any]] = {}
# Stations for which an offline alert has been sent and not yet cleared.
DISCONNECT_ALERT_SENT: dict[str, datetime] = {}


def _parse_recipients(value: str) -> list[str]:
    return [entry.strip() for entry in value.split(";") if entry and entry.strip()]


def _build_email_message(
    subject: str, body: str, recipient: str, mail_settings: MailSettings
) -> EmailMessage | None:
    """Assemble an ``EmailMessage``, or return ``None`` when configuration is incomplete."""
    from_addr = mail_settings.default_sender or mail_settings.username
    if not (from_addr and recipient and mail_settings.server):
        # Missing sender, recipient or server: skip quietly but leave a trace.
        logging.info(
            "Skipping email due to missing configuration",
            extra={"station_id": "system", "recipient": recipient},
        )
        return None

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = from_addr
    msg["To"] = recipient
    msg.set_content(body)
    return msg


def _send_email_sync(message: EmailMessage, mail_settings: MailSettings) -> None:
    """Deliver ``message`` over SMTP. Blocking; intended to run in a worker thread."""
    host, port = mail_settings.server, mail_settings.port

    if mail_settings.use_ssl:
        # Implicit TLS: the whole session is encrypted from the first byte.
        with smtplib.SMTP_SSL(host, port) as conn:
            if mail_settings.username:
                conn.login(mail_settings.username, mail_settings.password or "")
            conn.send_message(message)
        return

    with smtplib.SMTP(host, port) as conn:
        conn.ehlo()
        if mail_settings.use_tls:
            # Explicit TLS upgrade (STARTTLS) followed by a fresh EHLO.
            conn.starttls()
            conn.ehlo()
        if mail_settings.username:
            conn.login(mail_settings.username, mail_settings.password or "")
        conn.send_message(message)


async def send_notification_email(subject: str, body: str, recipients: Iterable[str] | str) -> bool:
    """Send *subject*/*body* to every recipient; return True if at least one mail went out."""
    settings = _resolve_mail_settings()

    if isinstance(recipients, str):
        cleaned = [recipients.strip()]
    else:
        cleaned = [entry.strip() for entry in recipients]
    cleaned = [entry for entry in cleaned if entry]
    if not cleaned:
        return False

    async def _deliver(recipient: str) -> bool:
        msg = _build_email_message(subject, body, recipient, settings)
        if msg is None:
            return False

        # Global sliding-window rate limit shared across all senders.
        async with EMAIL_RATE_LIMIT_LOCK:
            now = time.monotonic()
            while EMAIL_SEND_TIMESTAMPS and now - EMAIL_SEND_TIMESTAMPS[0] > EMAIL_RATE_LIMIT_WINDOW_SECONDS:
                EMAIL_SEND_TIMESTAMPS.popleft()

            if len(EMAIL_SEND_TIMESTAMPS) >= EMAIL_RATE_LIMIT_MAX:
                logging.warning(
                    "Dropping notification email due to rate limit",
                    extra={"station_id": "system", "recipient": recipient},
                )
                return False

            EMAIL_SEND_TIMESTAMPS.append(now)

        try:
            # SMTP is blocking; push it onto a worker thread.
            await asyncio.to_thread(_send_email_sync, msg, settings)
        except Exception:
            logging.exception(
                "Failed to send notification email",
                extra={"station_id": "system", "recipient": recipient},
            )
            return False
        return True

    delivered = False
    for recipient in cleaned:
        if await _deliver(recipient):
            delivered = True
    return delivered


def _resolve_notification_settings(entry: Mapping[str, Any] | None) -> tuple[bool, list[str]]:
    """Extract the disconnect-alert flag and recipient list from a station entry."""
    if not entry:
        return False, []

    recipients = _parse_recipients(_ensure_str(entry.get("disconnect_alert_email")) or "")
    # Alerts are active only when the flag is set AND recipients exist.
    if recipients and as_bool(entry.get("disconnect_alert_enabled")):
        return True, recipients
    return False, []


def _cancel_disconnect_task(station_id: str) -> None:
    """Cancel and forget any pending disconnect-alert task for the station."""
    pending = DISCONNECT_ALERT_TASKS.pop(station_id, None)
    if pending is not None and not pending.done():
        pending.cancel()


def schedule_disconnect_notification(
    station_id: str, canonical_path: str | None, disconnect_time: datetime
) -> None:
    """Arm a delayed offline-alert email for a disconnected station.

    Looks up the station's notification settings, cancels any previously
    scheduled alert task and, when alerts are enabled, schedules a background
    task that emails the recipients after the station has stayed offline for
    ``DISCONNECT_NOTIFICATION_DELAY_SECONDS``.

    :param station_id: Identifier of the disconnected station.
    :param canonical_path: Preferred key into ``STATION_PATHS``; falls back
        to ``station_id`` when ``None``.
    :param disconnect_time: When this disconnect occurred; a newer disconnect
        recorded later supersedes the scheduled task.
    """
    entry = STATION_PATHS.get(canonical_path or station_id, {})
    enabled, recipients = _resolve_notification_settings(entry)
    if not enabled:
        # Alerts disabled: clear any stale alert state for this station.
        DISCONNECT_ALERT_SENT.pop(station_id, None)
        _cancel_disconnect_task(station_id)
        return

    _cancel_disconnect_task(station_id)

    async def _run() -> None:
        try:
            await asyncio.sleep(DISCONNECT_NOTIFICATION_DELAY_SECONDS)
            # Station reconnected in the meantime - nothing to report.
            if station_id in ACTIVE_CLIENTS:
                return
            last_disconnect = LAST_DISCONNECT.get(station_id)
            # A newer disconnect means a fresh task was scheduled; defer to it.
            if last_disconnect and last_disconnect > disconnect_time:
                return
            # Never alert for stations that never connected successfully.
            if station_id not in LAST_CONNECT:
                return

            subject = f"Wallbox {station_id} offline"
            body = (
                f"Die Wallbox {station_id} ist seit mindestens 30 Minuten nicht mehr mit dem Broker verbunden.\n"
                f"Letzter Verbindungsversuch: {disconnect_time.isoformat()}"
            )
            if await send_notification_email(subject, body, recipients):
                DISCONNECT_ALERT_SENT[station_id] = datetime.now(timezone.utc)
        except asyncio.CancelledError:
            raise
        except Exception:
            logging.exception(
                "Failed to process disconnect notification",
                extra={"station_id": station_id},
            )

    DISCONNECT_ALERT_TASKS[station_id] = create_background_task(
        _run(), name=f"disconnect-email-{station_id}"
    )


async def notify_station_connected(station_id: str, canonical_path: str | None) -> None:
    """Send a recovery email when a previously-offline wallbox reconnects."""
    lookup_key = canonical_path or station_id
    enabled, recipients = _resolve_notification_settings(STATION_PATHS.get(lookup_key, {}))
    _cancel_disconnect_task(station_id)

    if not enabled:
        DISCONNECT_ALERT_SENT.pop(station_id, None)
        return

    if station_id not in DISCONNECT_ALERT_SENT:
        # No offline alert was sent, so there is nothing to follow up on.
        return

    delivered = await send_notification_email(
        f"Wallbox {station_id} wieder verbunden",
        f"Die Wallbox {station_id} hat die Verbindung zum Broker wiederhergestellt.",
        recipients,
    )
    if delivered:
        DISCONNECT_ALERT_SENT.pop(station_id, None)

# Reachable everywhere: station_id → WebSocketServerProtocol
ACTIVE_CLIENTS: dict[str, WebSocketServerProtocol] = {}
# Stores the current connection ID per station.
ACTIVE_CONNECTION_IDS: dict[str, int] = {}
# Negotiated subprotocol per (station_id, connection_id) pair.
CONNECTION_SUBPROTOCOLS: dict[tuple[str, int], str] = {}
LAST_SEEN: dict[str, datetime] = {}

# track transactions for OCPI CDR forwarding
PENDING_STARTS: dict[str, dict] = {}
ACTIVE_TRANSACTIONS: dict[str, dict] = {}
# Track TriggerMessage requests that ask for a StatusNotification so we can
# forward the response to the external API once the station reports it.
STATUS_NOTIFICATION_TRIGGER: set[str] = set()

# tracks active websocket connections from broker to OCPP backends
ACTIVE_BACKEND_CONNECTIONS: dict[str, WebSocketClientProtocol] = {}

# Track the broker's periodic MySQL connectivity checks so other services can
# display the current status (e.g. in the operator dashboard).
LAST_MYSQL_CHECK_ATTEMPT: Optional[datetime] = None
LAST_MYSQL_CHECK_SUCCESS: Optional[bool] = None

def _estimate_websocket_fd_usage(wallbox_to_broker: int, broker_to_backend: int) -> int:
	"""Approximate socket usage for active WebSocket connections."""

	# Each WebSocket handshake results in a client + server socket. We treat
	# both directions equivalently to estimate the current file descriptor
	# pressure without walking the entire process FD table.
	return (wallbox_to_broker + broker_to_backend) * 2

# Utility helper exposing current WebSocket usage to status endpoints.
def get_active_websocket_counts() -> dict[str, int | None]:
    """Return counts of active WebSocket connections and FD headroom."""
    clients = len(ACTIVE_CLIENTS)
    backends = len(ACTIVE_BACKEND_CONNECTIONS)
    soft_limit, _hard = get_nofile_limits()

    usage = _estimate_websocket_fd_usage(clients, backends)
    # Headroom is unknown when the soft limit could not be determined.
    headroom = None if soft_limit is None else soft_limit - usage

    return {
        "wallbox_to_broker": clients,
        "broker_to_backend": backends,
        "estimated_fd_usage": usage,
        "estimated_free": headroom,
    }

def get_system_limit_status() -> dict[str, int | str | None]:
    """Return a snapshot of file-descriptor limits and WebSocket usage."""
    soft_limit, hard_limit = get_nofile_limits()
    usage = get_active_websocket_counts()

    return {
        "ulimit_no_file_soft": soft_limit,
        "ulimit_no_file_hard": hard_limit,
        "wallbox_to_broker": usage["wallbox_to_broker"],
        "broker_to_backend": usage["broker_to_backend"],
        "estimated_fd_usage": usage["estimated_fd_usage"],
        "estimated_free": usage["estimated_free"],
        "estimation_note": "Approximate free FDs; assumes two sockets per WebSocket connection.",
    }

def _safe_int(value: Any) -> Optional[int]:
	"""Return ``value`` as ``int`` when possible, otherwise ``None``."""

	try:
		return int(value)
	except (TypeError, ValueError):
		return None

def get_mysql_pool_status() -> dict[str, int | bool | None]:
    """Return configured and current MySQL pool statistics."""
    configured_minsize = _coerce_pool_setting(MYSQL_CONFIG.get("pool_minsize"), 1)
    configured_maxsize = _coerce_pool_setting(MYSQL_CONFIG.get("pool_maxsize"), 10)
    configured_recycle = _coerce_pool_recycle(MYSQL_CONFIG.get("pool_recycle"))

    pool = get_db_pool()

    def _pool_int(attr: str, fallback: object = None) -> Optional[int]:
        # Read an attribute off the live pool, normalized to int or None.
        return _safe_int(getattr(pool, attr, fallback)) if pool else None

    size = _pool_int("size")
    freesize = _pool_int("freesize")
    used = size - freesize if size is not None and freesize is not None else None
    # Some pool implementations call the setting "recycle", others "pool_recycle".
    recycle = _pool_int("recycle", getattr(pool, "pool_recycle", None) if pool else None)

    return {
        "initialized": pool is not None,
        "configured": {
            "minsize": configured_minsize,
            "maxsize": configured_maxsize,
            "pool_recycle": configured_recycle,
        },
        "pool": {
            "minsize": _pool_int("minsize"),
            "maxsize": _pool_int("maxsize"),
            "size": size,
            "freesize": freesize,
            "used": used,
            "pool_recycle": recycle,
        },
    }


def get_connection_protocol(station_id: str, connection_id: Optional[int]) -> Optional[str]:
    """Return the negotiated subprotocol for a station/connection pair."""
    if connection_id is None:
        return None
    lookup_key = (station_id, connection_id)
    return CONNECTION_SUBPROTOCOLS.get(lookup_key)


def is_ocpp2_protocol(subprotocol: Optional[str]) -> bool:
    """Return ``True`` when ``subprotocol`` refers to an OCPP 2.x variant."""
    if not subprotocol:
        return False

    try:
        normalized = subprotocol.lower()
    except AttributeError:
        # Defensive: non-string truthy values count as "not OCPP 2.x".
        return False
    return normalized.startswith("ocpp2")


def _collect_supported_subprotocols() -> list[str]:
    """Return the ordered list of WebSocket subprotocols the proxy will offer."""
    offered: list[str] = []

    # OCPP 2.x first (when globally enabled), then the 1.6 baseline.
    defaults = (["ocpp2.0.1"] if ENABLE_OCPP_2X else []) + ["ocpp1.6"]
    for proto in defaults:
        if proto not in offered:
            offered.append(proto)

    # Add per-station overrides configured in STATION_PATHS.
    for source_url, entry in STATION_PATHS.items():
        configured = normalize_configured_subprotocol(
            entry.get("configured_subprotocol") or entry.get("subprotocol")
        )
        if not configured or configured in offered:
            continue

        if is_ocpp2_protocol(configured) and not ENABLE_OCPP_2X:
            logging.warning(
                "Station %s configured for %s but enable_ocpp_2x is disabled – ignoring",
                source_url,
                configured,
                extra={"station_id": "system"},
            )
            continue

        offered.append(configured)

    return offered


def calculate_connection_quality(station_id: str, reference_time: datetime) -> Optional[int]:
        """Compute a 0-100 connection-quality score for a station up to ``reference_time``.

        Returns ``None`` when no stats exist or no time has been accumulated.
        Penalties: up to -40 for offline ratio, up to -20 for reconnects
        (capped at 10), up to -20 for average RTT relative to ``TARGET_RTT_MS``
        and up to -20 for ping loss.
        """
        stats = CONNECTION_STATS.get(station_id)
        if stats is None:
                return None

        # Extend the currently running state up to the reference time.
        online_time = stats.get("online_time", 0.0)
        offline_time = stats.get("offline_time", 0.0)
        last_state = stats.get("last_state")
        last_ts = stats.get("last_state_ts", reference_time)
        if last_state == "online":
                online_time += (reference_time - last_ts).total_seconds()
        else:
                offline_time += (reference_time - last_ts).total_seconds()

        total_time = online_time + offline_time
        if total_time <= 0:
                return None

        quality = 100.0
        # Offline share costs up to 40 points.
        offline_ratio = offline_time / total_time
        quality -= offline_ratio * 40

        # The first connection is free; each further reconnect costs 2 points,
        # capped at 20.
        reconnects = max(RECONNECTION_COUNTER.get(station_id, 0) - 1, 0)
        quality -= min(reconnects, 10) / 10 * 20

        # Latency and ping-loss penalties (up to 20 points each).
        pings_sent = stats.get("pings_sent", 0)
        pings_failed = stats.get("pings_failed", 0)
        if pings_sent > 0:
                successful = pings_sent - pings_failed
                if successful > 0:
                        avg_rtt = stats.get("rtt_total_ms", 0.0) / successful
                        quality -= min(avg_rtt / TARGET_RTT_MS, 1) * 20
                loss_rate = pings_failed / pings_sent
                quality -= min(loss_rate, 1) * 20

        return max(0, round(quality))


def _resolve_time_buckets(
        stats: Mapping[str, object], reference_time: datetime
) -> tuple[float, float]:
        """Return adjusted online/offline durations up to ``reference_time``."""
        online_time = float(stats.get("online_time", 0.0) or 0.0)
        offline_time = float(stats.get("offline_time", 0.0) or 0.0)
        last_state = stats.get("last_state")
        last_ts_raw = stats.get("last_state_ts", reference_time)
        if isinstance(last_ts_raw, datetime):
                last_ts = last_ts_raw
        else:
                last_ts = reference_time
        if last_state == "online":
                online_time += max((reference_time - last_ts).total_seconds(), 0.0)
        else:
                offline_time += max((reference_time - last_ts).total_seconds(), 0.0)
        return online_time, offline_time


def _percentile(values: list[float], fraction: float) -> Optional[float]:
        if not values:
                return None
        clamped = min(max(fraction, 0.0), 1.0)
        ordered = sorted(values)
        idx = clamped * (len(ordered) - 1)
        lower = math.floor(idx)
        upper = math.ceil(idx)
        if lower == upper:
                return float(ordered[int(idx)])
        lower_val = ordered[lower]
        upper_val = ordered[upper]
        return float(lower_val + (upper_val - lower_val) * (idx - lower))


def _handshake_station_key(station_id: Optional[str]) -> str:
        """Normalize handshake counters to a non-empty key."""
        return station_id or "<unknown>"


def _prune_event_deque(events: Deque, window_seconds: int, now: datetime) -> None:
        """Remove timestamps older than the rolling window."""
        cutoff = now - timedelta(seconds=window_seconds)
        while events:
                head = events[0]
                ts = head[0] if isinstance(head, tuple) else head
                if ts < cutoff:
                        events.popleft()
                        continue
                break


def _record_handshake_success(station_id: Optional[str]) -> None:
    """Count a successful handshake and remember when it happened."""
    key = _handshake_station_key(station_id)
    bucket = HANDSHAKE_COUNTS[key]
    bucket["success"] = bucket.get("success", 0) + 1
    LAST_HANDSHAKE_SUCCESS[key] = datetime.now(timezone.utc)


def _record_handshake_failure(reason: str, station_id: Optional[str] = None) -> None:
    """Count a failed handshake and keep a rolling record of the failure reason."""
    key = _handshake_station_key(station_id)
    moment = datetime.now(timezone.utc)
    # Collapse empty/whitespace-only reasons to "unknown".
    cleaned = (reason or "unknown").strip() or "unknown"

    bucket = HANDSHAKE_COUNTS[key]
    bucket["failure"] = bucket.get("failure", 0) + 1

    HANDSHAKE_FAILURE_EVENTS.append((moment, key, cleaned))
    LAST_HANDSHAKE_FAILURE_REASON[key] = {
        "reason": cleaned,
        "timestamp": moment,
    }
    _prune_event_deque(HANDSHAKE_FAILURE_EVENTS, HANDSHAKE_FAILURE_ROLLUP_SECONDS, moment)


def _record_reconnect_event(station_id: str, now: Optional[datetime] = None) -> None:
    """Append a reconnect timestamp after pruning entries outside the rolling window."""
    stamp = now or datetime.now(timezone.utc)
    bucket = RECONNECT_EVENTS[station_id]
    _prune_event_deque(bucket, RECONNECT_ROLLUP_SECONDS, stamp)
    bucket.append(stamp)


def _collect_recent_handshake_failures(
    now: datetime, window_seconds: int
) -> tuple[dict[str, int], dict[str, dict[str, int]]]:
    """Aggregate handshake failures within the window, by reason and by station."""
    _prune_event_deque(HANDSHAKE_FAILURE_EVENTS, window_seconds, now)

    reason_totals: dict[str, int] = defaultdict(int)
    station_totals: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))

    for stamp, station_key, reason in HANDSHAKE_FAILURE_EVENTS:
        # Double-check age: pruning only trims the left end of the deque.
        if (now - stamp).total_seconds() > window_seconds:
            continue
        reason_totals[reason] += 1
        if station_key:
            station_totals[station_key][reason] += 1

    return dict(reason_totals), {key: dict(val) for key, val in station_totals.items()}


def build_reconnect_stats_payload(
        reference_time: datetime, window_seconds: Optional[int] = None
) -> dict[str, object]:
        """Return reconnect and handshake metrics for API consumers.

        :param reference_time: "Now" for all rolling-window calculations.
        :param window_seconds: Override for the reconnect window; defaults to
            ``RECONNECT_ROLLUP_SECONDS``.
        """
        window = window_seconds or RECONNECT_ROLLUP_SECONDS
        stations: dict[str, dict[str, object]] = {}
        total_recent = 0

        # Union of every station we have any signal for.
        station_ids = set(RECONNECT_EVENTS.keys()) | set(HANDSHAKE_COUNTS.keys()) | set(RECONNECTION_COUNTER.keys())

        for station_id in station_ids:
                events = RECONNECT_EVENTS.get(station_id)
                if events is None:
                        # defaultdict access materializes an empty deque on demand.
                        events = RECONNECT_EVENTS[station_id]
                _prune_event_deque(events, window, reference_time)
                recent = len(events)
                total_recent += recent
                counts = HANDSHAKE_COUNTS.get(station_id, {})
                rate_per_day: Optional[float]
                if window > 0:
                        rate_per_day = (recent / window) * 86400
                else:  # pragma: no cover - defensive
                        rate_per_day = None
                last_disconnect = LAST_DISCONNECT.get(station_id)
                last_failure = LAST_HANDSHAKE_FAILURE_REASON.get(station_id)
                stations[station_id] = {
                        "reconnects": recent,
                        "reconnect_rate_per_day": rate_per_day,
                        "window_seconds": window,
                        "total_reconnects": RECONNECTION_COUNTER.get(station_id, 0),
                        "time_since_last_disconnect_s": (
                                (reference_time - last_disconnect).total_seconds()
                                if last_disconnect
                                else None
                        ),
                        "last_disconnect": (
                                last_disconnect.isoformat().replace("+00:00", "Z")
                                if isinstance(last_disconnect, datetime)
                                else None
                        ),
                        "last_failure_reason": (last_failure or {}).get("reason"),
                        "last_failure_ts": (
                                last_failure["timestamp"].isoformat().replace("+00:00", "Z")
                                if isinstance((last_failure or {}).get("timestamp"), datetime)
                                else None
                        ),
                        "handshake_success": counts.get("success", 0),
                        "handshake_failure": counts.get("failure", 0),
                }

        # Handshake failures use their own (possibly different) rolling window.
        failure_by_reason, failure_by_station = _collect_recent_handshake_failures(
                reference_time, HANDSHAKE_FAILURE_ROLLUP_SECONDS
        )
        total_rate_per_day: Optional[float]
        if window > 0:
                total_rate_per_day = (total_recent / window) * 86400
        else:  # pragma: no cover - defensive
                total_rate_per_day = None
        total_success = sum((entry or {}).get("success", 0) or 0 for entry in HANDSHAKE_COUNTS.values())
        total_failure = sum((entry or {}).get("failure", 0) or 0 for entry in HANDSHAKE_COUNTS.values())

        payload: dict[str, object] = {
                "window_seconds": window,
                "stations": stations,
                "total": {
                        "reconnects": total_recent,
                        "reconnect_rate_per_day": total_rate_per_day,
                        "handshake_success": total_success,
                        "handshake_failure": total_failure,
                },
                "thresholds": {
                        "reconnect_rate_per_day": RECONNECT_WARNING_THRESHOLD,
                        "handshake_failures": HANDSHAKE_FAILURE_WARNING_THRESHOLD,
                },
                "recent_handshake_failures": {
                        "by_reason": failure_by_reason,
                        "by_station": failure_by_station,
                        "window_seconds": HANDSHAKE_FAILURE_ROLLUP_SECONDS,
                },
        }

        return payload


def _summarize_connection_stats(
        station_id: str, stats: Mapping[str, object], reference_time: datetime
) -> dict[str, object]:
        """Build a per-station metrics snapshot from raw connection counters.

        Folds ping counters, collected RTT samples and the online/offline time
        buckets into a JSON-serialisable summary dictionary.
        """
        sent = int(stats.get("pings_sent", 0) or 0)
        failed = int(stats.get("pings_failed", 0) or 0)
        ok_pings = max(sent - failed, 0)
        rtt_total = float(stats.get("rtt_total_ms", 0.0) or 0.0)

        # Keep only samples that coerce cleanly to float; drop garbage values.
        samples: list[float] = []
        for candidate in list(stats.get("rtt_samples", []) or []):
                try:
                        samples.append(float(candidate))
                except (TypeError, ValueError):
                        pass

        # Average RTT is only meaningful when at least one ping succeeded.
        avg_rtt: Optional[float] = None
        if ok_pings > 0:
                avg_rtt = rtt_total / ok_pings if rtt_total else 0.0

        online_s, offline_s = _resolve_time_buckets(stats, reference_time)
        elapsed = online_s + offline_s
        uptime: Optional[float] = online_s / elapsed if elapsed > 0 else None

        # Clamp the success rate into [0, 1] to guard against counter drift.
        success: Optional[float] = None
        if sent > 0:
                success = max(0.0, min(1.0, 1.0 - failed / sent))

        return {
                "station_id": station_id,
                "pings_sent": sent,
                "pings_failed": failed,
                "success_rate": success,
                "avg_rtt_ms": avg_rtt,
                "p95_rtt_ms": _percentile(samples, 0.95),
                "uptime_ratio": uptime,
                "online_time_s": online_s,
                "offline_time_s": offline_s,
        }


def build_connection_stats_payload(
        station_filter: Optional[set[str]], reference_time: datetime, include_total: bool = True
) -> dict[str, object]:
        """Assemble the connection-statistics payload for selected stations.

        Summarises every entry in ``CONNECTION_STATS`` (optionally restricted
        to ``station_filter``) and, when ``include_total`` is set, adds a
        fleet-wide aggregate under the ``"total"`` key.
        """
        per_station: dict[str, dict[str, object]] = {}
        sent_sum = 0
        failed_sum = 0
        rtt_sum = 0.0
        online_sum = 0.0
        offline_sum = 0.0
        all_samples: list[float] = []

        for sid, raw in CONNECTION_STATS.items():
                if station_filter and sid not in station_filter:
                        continue
                summary = _summarize_connection_stats(sid, raw, reference_time)
                per_station[sid] = summary
                sent_sum += summary["pings_sent"]
                failed_sum += summary["pings_failed"]
                try:
                        rtt_sum += float(raw.get("rtt_total_ms", 0.0) or 0.0)
                except (TypeError, ValueError):
                        pass
                online_sum += summary["online_time_s"]
                offline_sum += summary["offline_time_s"]
                for sample in raw.get("rtt_samples", []) or []:
                        try:
                                all_samples.append(float(sample))
                        except (TypeError, ValueError):
                                pass

        payload: dict[str, object] = {"stations": per_station}
        if not include_total:
                return payload

        ok_pings = max(sent_sum - failed_sum, 0)
        avg_rtt: Optional[float] = None
        if ok_pings > 0:
                avg_rtt = rtt_sum / ok_pings if rtt_sum else 0.0
        elapsed = online_sum + offline_sum
        uptime: Optional[float] = online_sum / elapsed if elapsed > 0 else None
        success: Optional[float] = None
        if sent_sum > 0:
                success = max(0.0, min(1.0, 1.0 - failed_sum / sent_sum))

        payload["total"] = {
                "pings_sent": sent_sum,
                "pings_failed": failed_sum,
                "success_rate": success,
                "avg_rtt_ms": avg_rtt,
                "p95_rtt_ms": _percentile(all_samples, 0.95),
                "uptime_ratio": uptime,
                "online_time_s": online_sum,
                "offline_time_s": offline_sum,
        }
        return payload

async def ensure_websocket_log_table() -> None:
        """Create logging table if it doesn't exist.

        Checks ``information_schema`` for ``op_log_websockets`` in the
        configured database and creates the table on first use.  Any database
        error is logged and swallowed so startup can continue without
        persistence (best effort).
        """
        try:
                async with db_connection() as conn:
                        async with conn.cursor() as cur:
                                # Existence probe instead of CREATE TABLE IF NOT EXISTS,
                                # so the DDL only runs on a genuinely fresh schema.
                                await cur.execute(
                                        "SELECT COUNT(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s",
                                        (MYSQL_CONFIG.get("db"), "op_log_websockets"),
                                )
                                exists = (await cur.fetchone())[0]
                                if not exists:
                                        await cur.execute(
                                                """
                                                CREATE TABLE op_log_websockets (
                                                        ocpp_endpoint VARCHAR(25) NOT NULL,
                                                        wallbox_to_broker INT NOT NULL,
                                                        broker_to_backend INT NOT NULL,
                                                        ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                                                )
                                                """
                                        )
                                        await conn.commit()
        except Exception as e:
                logging.error(f"websocket log table error: {e}")

async def log_websocket_counts_periodically() -> None:
        """Persist active websocket counts to the database every five minutes.

        Runs forever; individual failures are logged and the loop continues
        with the next interval.
        """
        while True:
                try:
                        snapshot = get_active_websocket_counts()
                        row = (
                                OCPP_ENDPOINT,
                                snapshot["wallbox_to_broker"],
                                snapshot["broker_to_backend"],
                        )
                        async with db_connection() as conn:
                                async with conn.cursor() as cur:
                                        await cur.execute(
                                                "INSERT INTO op_log_websockets (ocpp_endpoint, wallbox_to_broker, broker_to_backend) VALUES (%s, %s, %s)",
                                                row,
                                        )
                                        await conn.commit()
                except Exception as e:
                        logging.error(f"websocket count log error: {e}")
                await asyncio.sleep(300)

# UniqueIDs of all GetDiagnostics calls currently in flight
PENDING_DIAGNOSTICS: set[str] = set()

# tracks outstanding firmware update requests
PENDING_FIRMWARE = set()

# tracks pending reboots
PENDING_REBOOT: set[str] = set()

# stores pending RFID authentications
PENDING_RFID: dict[str, str] = {}

# stores external idTag overrides taken from authorizeTransaction responses,
# keyed by (station_id_short, original idTag)
EXTERNAL_IDTAG_OVERRIDES: dict[tuple[str, str], str] = {}


def set_external_idtag_override(
        station_id_short: str, original_tag: Optional[str], mapped_tag: Optional[str]
) -> None:
        """Record a mapping from ``original_tag`` to ``mapped_tag`` for a station.

        A falsy original tag is ignored.  When the mapped tag is missing or
        identical to the original, any previously stored override is cleared.
        """

        if not original_tag:
                return

        key = (station_id_short, original_tag)
        if mapped_tag and mapped_tag != original_tag:
                EXTERNAL_IDTAG_OVERRIDES[key] = mapped_tag
                return
        EXTERNAL_IDTAG_OVERRIDES.pop(key, None)


def get_external_idtag_override(station_id_short: str, id_tag: Optional[str]) -> Optional[str]:
        """Look up the override stored for ``(station, id_tag)``, if any."""

        if id_tag:
                return EXTERNAL_IDTAG_OVERRIDES.get((station_id_short, id_tag))
        return None

# stores pending GetConfiguration requests
PENDING_GET_CONFIGURATION: dict[str, int] = {}

async def ensure_rfid_log_table() -> None:
    """Create RFID log table if it doesn't exist.

    Looks up ``op_rfid_log`` in ``information_schema`` and creates it on
    first use.  Database errors are logged and swallowed (best effort).
    """
    try:
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                # Existence probe so the DDL only runs on a fresh schema.
                await cur.execute(
                    "SELECT COUNT(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s",
                    (MYSQL_CONFIG.get("db"), "op_rfid_log"),
                )
                exists = (await cur.fetchone())[0]
                if not exists:
                    await cur.execute(
                        """
                        CREATE TABLE op_rfid_log (
                            tag VARCHAR(255) NOT NULL,
                            source_url VARCHAR(255) NOT NULL,
                            ts_utc TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            result VARCHAR(20) NOT NULL
                        )
                        """
                    )
                    await conn.commit()
    except Exception as e:
        logging.error(f"rfid log table error: {e}")

async def log_rfid_event(tag: str, station_id: str, result: str) -> None:
    """Insert RFID authentication attempt into database.

    Creates the log table on demand first; any database error is logged
    and swallowed (best effort).
    """
    try:
        await ensure_rfid_log_table()
        params = (tag, station_id, result)
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    "INSERT INTO op_rfid_log (tag, source_url, ts_utc, result) VALUES (%s, %s, UTC_TIMESTAMP(), %s)",
                    params,
                )
                await conn.commit()
    except Exception as e:
        logging.error(f"rfid log insert error: {e}")


async def ensure_bootnotification_table() -> None:
    """Create bootnotification table if it doesn't exist.

    Looks up ``op_broker_bootnotifications`` in ``information_schema`` and
    creates it on first use.  Errors are logged and swallowed (best effort).
    """
    try:
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                # Existence probe so the DDL only runs on a fresh schema.
                await cur.execute(
                    "SELECT COUNT(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s",
                    (MYSQL_CONFIG.get("db"), "op_broker_bootnotifications"),
                )
                exists = (await cur.fetchone())[0]
                if not exists:
                    await cur.execute(
                        """
                        CREATE TABLE op_broker_bootnotifications (
                            station_id VARCHAR(255) PRIMARY KEY,
                            json_payload TEXT NOT NULL,
                            ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                        )
                        """,
                    )
                    await conn.commit()
    except Exception as e:
        logging.error(f"bootnotification table error: {e}")


async def store_bootnotification(station_id: str, message: object) -> None:
    """Insert or update latest BootNotification for a station.

    The table is created on demand; the message is serialised to JSON text
    and upserted on the ``station_id`` primary key.  Errors are logged and
    swallowed (best effort).
    """
    try:
        await ensure_bootnotification_table()
        payload = json.dumps(message)
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                # Row-alias upsert (``INSERT ... AS new``): refresh payload and
                # timestamp when the station already has an entry.
                await cur.execute(
                    """
                    INSERT INTO op_broker_bootnotifications (station_id, json_payload, ts)
                    VALUES (%s, %s, UTC_TIMESTAMP()) AS new(station_id, json_payload, ts)
                    ON DUPLICATE KEY UPDATE json_payload = new.json_payload, ts = new.ts
                    """,
                    (station_id, payload),
                )
                await conn.commit()
    except Exception as e:
        logging.error(f"bootnotification insert error: {e}")


async def ensure_getconfiguration_table() -> None:
    """Create getconfiguration table if it doesn't exist.

    Looks up ``op_broker_getconfiguration`` in ``information_schema`` and
    creates it on first use.  Errors are logged and swallowed (best effort).
    """
    try:
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                # Existence probe so the DDL only runs on a fresh schema.
                await cur.execute(
                    "SELECT COUNT(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s",
                    (MYSQL_CONFIG.get("db"), "op_broker_getconfiguration"),
                )
                exists = (await cur.fetchone())[0]
                if not exists:
                    await cur.execute(
                        """
                        CREATE TABLE op_broker_getconfiguration (
                            station_id VARCHAR(255) PRIMARY KEY,
                            json_payload TEXT NOT NULL,
                            is_full_request TINYINT(1) NOT NULL,
                            ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                        )
                        """,
                    )
                    await conn.commit()
    except Exception as e:
        logging.error(f"getconfiguration table error: {e}")


async def store_getconfiguration(station_id: str, message: object, is_full_request: int) -> None:
    """Insert or update latest GetConfiguration response for a station.

    Args:
        station_id: Charge point identifier (primary key of the table).
        message: JSON-serialisable GetConfiguration payload.
        is_full_request: truthy/1 when the payload answers a full
            GetConfiguration request, 0 for a partial one.

    Errors are logged and swallowed (best effort).
    """
    try:
        # Create the target table on demand, mirroring store_bootnotification();
        # without this the very first insert on a fresh database fails.
        await ensure_getconfiguration_table()
        payload = json.dumps(message)
        async with db_connection() as conn:
            async with conn.cursor() as cur:
                # Row-alias upsert (``INSERT ... AS new``): refresh payload,
                # request scope and timestamp for an existing station.
                await cur.execute(
                    """
                    INSERT INTO op_broker_getconfiguration (station_id, json_payload, is_full_request, ts)
                    VALUES (%s, %s, %s, UTC_TIMESTAMP()) AS new(station_id, json_payload, is_full_request, ts)
                    ON DUPLICATE KEY UPDATE json_payload = new.json_payload, is_full_request = new.is_full_request, ts = new.ts
                    """,
                    (station_id, payload, is_full_request),
                )
                await conn.commit()
    except Exception as e:
        logging.error(f"getconfiguration insert error: {e}")

class StationIDFilter(logging.Filter):
        """Backfill the ``station_id``/``system`` fields used by the log format.

        Records from the ``websockets.client`` loggers are tagged as backend
        traffic, records from ``websockets.server`` as client traffic, and
        everything else as broker output.  A ``connection_uri`` is copied from
        an attached websocket object when one is present on the record.
        """

        # (logger-name prefix, station_id value, system value)
        _ROLES = (
                ("websockets.client", "backend", "server-backend"),
                ("websockets.server", "client", "client-server"),
        )

        def filter(self, record):
                station, system = "server", "broker"
                for prefix, role_station, role_system in self._ROLES:
                        if record.name.startswith(prefix):
                                station, system = role_station, role_system
                                break
                if not hasattr(record, "station_id"):
                        record.station_id = station
                if not hasattr(record, "system"):
                        record.system = system
                ws = getattr(record, "websocket", None)
                if ws is not None and not hasattr(record, "connection_uri"):
                        uri = getattr(ws, "connection_uri", None)
                        if uri:
                                record.connection_uri = uri
                return True

class SafeFormatter(logging.Formatter):
        """Formatter tolerant of records lacking the broker-specific fields.

        Fills in ``station_id`` and ``system`` from the logger name before
        delegating to :class:`logging.Formatter`, then appends the connection
        URI when one was attached to the record.
        """

        def format(self, record):
                name = record.name
                if not hasattr(record, "station_id"):
                        if name.startswith("websockets.client"):
                                record.station_id = "backend"
                        elif name.startswith("websockets.server"):
                                record.station_id = "client"
                        else:
                                record.station_id = "server"
                if not hasattr(record, "system"):
                        if name.startswith("websockets.client"):
                                record.system = "server-backend"
                        elif name.startswith("websockets.server"):
                                record.system = "client-server"
                        else:
                                record.system = "broker"
                text = super().format(record)
                uri = getattr(record, "connection_uri", None)
                return f"{text} [uri: {uri}]" if uri else text


def configure_root_logger(level: int = LOG_LEVEL) -> logging.Logger:
        """Initialize the root logger with the project defaults.

        ``logging.basicConfig`` is a no-op once handlers exist (tests or other
        entrypoints may configure logging before importing this module), which
        would leave our formatter and filters unattached and hide startup
        messages such as ``"[Startup] ..."``.  We therefore drop any existing
        handlers and install our own so logs always reach stdout with the
        expected formatting.
        """

        root = logging.getLogger()
        root.setLevel(level)

        # Drop whatever another entrypoint may have installed; otherwise our
        # handler would duplicate (or never see) the records.
        while root.handlers:
                root.removeHandler(root.handlers[0])

        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(level)
        log_format = "%(asctime)s - %(levelname)s - [%(station_id)s] - [%(system)s] - %(message)s"
        stream_handler.setFormatter(SafeFormatter(log_format))
        root.addHandler(stream_handler)
        root.addFilter(StationIDFilter())
        return root


# Configure module-wide logging on import so startup messages are formatted.
logger = configure_root_logger()


def direction_to_system(direction: str) -> str:
        """Convert internal direction labels to human readable log target."""
        if direction == "client_to_server":
                return "client-server"
        return "server-backend"


class ChargePointLoggerAdapter(logging.LoggerAdapter):
    """Append the charge point ID to websocket log messages."""

    def process(self, msg, kwargs):  # type: ignore[override]
        msg, kwargs = super().process(msg, kwargs)
        cp_id = self.extra.get("chargepoint_id")
        if not cp_id:
            return msg, kwargs
        # ``logging`` applies %-style formatting to ``msg`` later on.  Charge
        # point IDs may contain percent signs (e.g. ``%2F`` for a URL-encoded
        # slash); appending them verbatim would create spurious placeholders
        # and raise ``TypeError: not enough arguments for format string``.
        # Doubling each ``%`` keeps the ID literal while leaving genuine
        # placeholders for the logger to substitute.
        escaped = str(cp_id).replace("%", "%%")
        return f"{msg} ({escaped})", kwargs


# Bytes legal in an HTTP header field name (token characters).
_HTTP_TOKEN_RE = re.compile(rb"[-!#$%&'*+.^_`|~0-9a-zA-Z]+")
# Bytes legal in an HTTP header field value (HTAB, printable ASCII, obs-text).
_HTTP_VALUE_RE = re.compile(rb"[\x09\x20-\x7e\x80-\xff]*")


class UnsupportedHttpMethod(Exception):
        """Raised when a client attempts a handshake with a non-GET method."""

        def __init__(self, method: str, path: str) -> None:
                # Keep the offending method/path available for error reporting.
                super().__init__(f"unsupported HTTP method: {method} {path}")
                self.method = method
                self.path = path


async def _read_line_optional_crlf(
        stream: asyncio.StreamReader, *, context: str
) -> tuple[bytes, bool, bool]:
        """Read a line allowing ``LF`` in addition to ``CRLF`` line endings.

        Some firmware (for example specific ABB wallboxes) may omit the trailing
        line feed in their initial HTTP handshake.  Instead of rejecting such
        connections outright we surface the anomaly to the caller so that it can be
        logged and the handshake can continue in a best-effort fashion.

        Returns:
                ``(line, lf_only, missing_line_ending)`` — the line without its
                terminator, whether a bare ``LF`` terminated it, and whether no
                proper line ending was seen (bare ``CR`` or truncated line).

        Raises:
                EOFError: when the stream is exhausted while reading *context*.
                SecurityError: when the line exceeds ``MAX_LINE_LENGTH``.
        """

        line = await stream.readline()
        if line == b"":
                raise EOFError(f"connection closed while reading {context}")
        if len(line) > MAX_LINE_LENGTH:
                raise SecurityError("line too long")

        # Some firmwares split header fields with a bare carriage return.  When
        # ``StreamReader.readline`` doesn't encounter an ``LF`` separator, it
        # returns the entire remaining buffer which may contain several header
        # fields.  To preserve the expected line-by-line parsing we trim the
        # buffer at the first ``CR`` and push the remaining data back onto the
        # reader.  This mirrors how ``readline`` would behave if the firmware
        # had sent ``CRLF`` terminators.
        if b"\n" not in line and b"\r" in line:
                cr_index = line.find(b"\r")
                remainder = line[cr_index + 1 :]
                line = line[: cr_index + 1]

                if remainder:
                        # NOTE(review): ``_buffer`` is a private attribute of
                        # asyncio.StreamReader; this push-back relies on CPython
                        # internals and may need revisiting on interpreter upgrades.
                        buffer = getattr(stream, "_buffer", None)
                        if buffer is None:
                                buffer = bytearray()
                                setattr(stream, "_buffer", buffer)
                        elif not isinstance(buffer, bytearray):
                                buffer = bytearray(buffer)
                                setattr(stream, "_buffer", buffer)

                        buffer[:0] = remainder
                        # Re-enable the transport if the reader had paused it.
                        maybe_resume = getattr(stream, "_maybe_resume_transport", None)
                        if callable(maybe_resume):
                                maybe_resume()

        missing_line_ending = False
        if line.endswith(b"\r\n"):
                return line[:-2], False, missing_line_ending
        if line.endswith(b"\n"):
                return line[:-1], True, missing_line_ending
        if line.endswith(b"\r"):
                missing_line_ending = True
                return line[:-1], False, missing_line_ending

        # No recognised line ending (likely connection closed mid-line).
        missing_line_ending = True
        return line, False, missing_line_ending


async def read_request_optional_crlf(
        stream: asyncio.StreamReader,
) -> tuple[str, Headers, dict[str, Any]]:
        """Read an HTTP GET request while tolerating non-standard line endings.

        Returns:
                ``(path, headers, metadata)`` where *metadata* records the
                anomalies that were tolerated (``lf_only``,
                ``missing_line_endings``, ``http10_accepted``,
                ``method_normalised``) plus the original method and HTTP
                version strings.

        Raises:
                UnsupportedHttpMethod: for any request method other than GET.
                ValueError: for malformed request/header lines or an
                        unsupported HTTP version.
                SecurityError: when more than ``MAX_NUM_HEADERS`` header lines
                        are received.
                EOFError: propagated from :func:`_read_line_optional_crlf` if
                        the connection closes mid-request.
        """

        lf_only = False
        missing_line_endings = False

        request_line, used_lf, missing_line = await _read_line_optional_crlf(
                stream, context="HTTP request line"
        )
        lf_only |= used_lf
        missing_line_endings |= missing_line
        try:
                method, raw_path, version = request_line.split(b" ", 2)
        except ValueError:
                raise ValueError(f"invalid HTTP request line: {d(request_line)}") from None

        # Method comparison is case-insensitive; the original casing is kept
        # in the metadata so callers can log what the client actually sent.
        method_upper = method.upper()
        method_str = method.decode("ascii", "surrogateescape")
        path = raw_path.decode("ascii", "surrogateescape")
        if method_upper != b"GET":
                raise UnsupportedHttpMethod(method_str, path)

        # HTTP/1.0 handshakes are tolerated (flagged in the metadata);
        # anything other than 1.0/1.1 is rejected.
        version_upper = version.upper()
        http10_accepted = False
        if version_upper == b"HTTP/1.0":
                http10_accepted = True
        elif version_upper != b"HTTP/1.1":
                raise ValueError(f"unsupported HTTP version: {d(version)}")

        headers = Headers()
        for _ in range(MAX_NUM_HEADERS + 1):
                line, used_lf, missing_line = await _read_line_optional_crlf(
                        stream, context="HTTP headers"
                )
                lf_only |= used_lf
                missing_line_endings |= missing_line
                # An empty line terminates the header section.
                if line == b"":
                        break

                try:
                        raw_name, raw_value = line.split(b":", 1)
                except ValueError:
                        raise ValueError(f"invalid HTTP header line: {d(line)}") from None

                if not _HTTP_TOKEN_RE.fullmatch(raw_name):
                        raise ValueError(f"invalid HTTP header name: {d(raw_name)}")

                raw_value = raw_value.strip(b" \t")
                if not _HTTP_VALUE_RE.fullmatch(raw_value):
                        raise ValueError(f"invalid HTTP header value: {d(raw_value)}")

                name = raw_name.decode("ascii")
                value = raw_value.decode("ascii", "surrogateescape")
                headers[name] = value
        else:  # pragma: no cover - defensive programming
                raise SecurityError("too many HTTP headers")

        metadata: dict[str, Any] = {
                "lf_only": lf_only,
                "missing_line_endings": missing_line_endings,
                "http10_accepted": http10_accepted,
                "method_normalised": method_upper != method,
                "original_method": method.decode("ascii", "surrogateescape"),
                "original_http_version": version.decode("ascii", "surrogateescape"),
        }

        return path, headers, metadata


class LoggingWebSocketServerProtocol(WebSocketServerProtocol):
        """WebSocket server protocol with verbose handshake diagnostics.

        Extends :class:`WebSocketServerProtocol` to:

        * tolerate lenient HTTP handshakes (via ``read_request_optional_crlf``)
          and log which leniencies were applied (LF-only line endings,
          HTTP/1.0, non-upper-case methods);
        * emit detailed TLS/SNI/buffer diagnostics when the handshake fails,
          to help operators diagnose misconfigured charge points;
        * honour a per-station configured WebSocket subprotocol during
          negotiation and record the negotiation outcome on the instance.
        """

        def __init__(self, *args, cert_identifiers=None, listener_name: str | None = None, **kwargs):
                # cert_identifiers: hostnames/IPs the server TLS certificate is
                # valid for; used only to produce helpful SNI-mismatch log
                # messages when a handshake fails.
                self.cert_identifiers = tuple(cert_identifiers or ())
                self.listener_name = listener_name or "default"
                super().__init__(*args, **kwargs)

        async def handler(self):
                """Run the base handler, downgrading handshake disconnects to log lines.

                An ``InvalidMessage`` whose ``__cause__`` is an ordinary
                disconnect (EOF, connection reset/abort, broken pipe) means the
                peer dropped the connection mid-handshake; that case is logged
                and swallowed instead of propagating. Any other failure is
                re-raised unchanged.
                """
                try:
                        return await super().handler()
                except websockets.exceptions.InvalidMessage as exc:
                        cause = exc.__cause__
                        # Exception types that indicate the peer simply went away
                        # rather than sending a malformed request.
                        expected_disconnects = (
                                EOFError,
                                ConnectionResetError,
                                ConnectionAbortedError,
                                BrokenPipeError,
                        )
                        if isinstance(cause, expected_disconnects):
                                logger = getattr(self, "logger", logging.getLogger(__name__))
                                peer = getattr(self, "peer", None) or self.transport.get_extra_info("peername")
                                if logger.isEnabledFor(logging.INFO):
                                        logger.info(
                                                "Peer %s closed the connection during handshake (%s suppressed)",
                                                peer,
                                                type(cause).__name__,
                                                extra={"system": "client-server"},
                                        )
                                if logger.isEnabledFor(logging.DEBUG):
                                        logger.debug(
                                                "Suppressed InvalidMessage caused by %r while handling handshake from %s",
                                                cause,
                                                peer,
                                                extra={"system": "client-server"},
                                        )
                                # Ensure the underlying socket is released before
                                # returning without re-raising.
                                transport = getattr(self, "transport", None)
                                if transport is not None:
                                        transport.close()
                                return
                        raise

        async def read_http_request(self):
                """Read and parse the HTTP upgrade request, with rich diagnostics.

                Delegates parsing to ``read_request_optional_crlf`` (a lenient
                parser), logs which leniencies it applied, stores the parser
                metadata on ``self.http_handshake_metadata``, derives the
                chargepoint id from the last path segment, builds
                ``self.connection_uri`` and wraps ``self.logger`` in a
                ``ChargePointLoggerAdapter``.

                On failure, logs TLS cipher/certificate details, SNI hints and a
                preview of any buffered client bytes (including a best-effort
                decode of a premature masked WebSocket frame) before raising
                :class:`websockets.exceptions.InvalidMessage`.

                Returns:
                        tuple: ``(path, headers)`` as expected by the base class.
                """
                logger = getattr(self, "logger", logging.getLogger(__name__))
                peer = self.transport.get_extra_info("peername")
                try:
                        path, headers, request_meta = await read_request_optional_crlf(self.reader)
                        lf_only = bool(request_meta.get("lf_only"))
                        missing_line_endings = bool(request_meta.get("missing_line_endings"))
                        # Surface which parser leniencies were actually needed.
                        if missing_line_endings and logger.isEnabledFor(logging.INFO):
                                logger.info(
                                        "Accepted HTTP handshake with missing line terminators from %s",
                                        peer,
                                        extra={"system": "client-server"},
                                )
                        elif lf_only and logger.isEnabledFor(logging.INFO):
                                logger.info(
                                        "Accepted HTTP handshake without CRLF line endings from %s",
                                        peer,
                                        extra={"system": "client-server"},
                                )
                        if request_meta.get("http10_accepted") and logger.isEnabledFor(logging.INFO):
                                logger.info(
                                        "Upgraded HTTP/1.0 handshake from %s to HTTP/1.1 semantics",
                                        peer,
                                        extra={"system": "client-server"},
                                )
                        if request_meta.get("method_normalised") and logger.isEnabledFor(logging.DEBUG):
                                logger.debug(
                                        "Normalised HTTP method casing from %s: %s",
                                        peer,
                                        request_meta.get("original_method"),
                                        extra={"system": "client-server"},
                                )
                        self.http_handshake_metadata = request_meta
                        if logger.isEnabledFor(logging.DEBUG):
                                # Prefer raw_items() (preserves original header
                                # casing/order) when the Headers type offers it.
                                header_iter = getattr(headers, "raw_items", None)
                                if header_iter is not None:
                                        header_iter = list(header_iter())
                                else:
                                        header_iter = list(headers.items())
                                redacted_headers = []
                                for key, value in header_iter:
                                        # Never log credentials.
                                        if key.lower() in {"authorization", "proxy-authorization"}:
                                                value = "<redacted>"
                                        redacted_headers.append(f"{key}: {value}")
                                header_dump = "\n    " + "\n    ".join(redacted_headers) if redacted_headers else " <no headers>"
                                logger.debug(
                                        "Handshake request headers from %s:%s",
                                        peer,
                                        header_dump,
                                        extra={"system": "client-server"},
                                )
                        # NOTE(review): this uses the root logger (module-level
                        # logging.info) rather than the per-connection logger
                        # used everywhere else in this method, and omits the
                        # "system" extra — verify this is intentional.
                        logging.info("Incoming request from %s: GET %s", peer, path)
                        self.peer = peer
                        self.path = path
                        self.request = type("Request", (), {"path": path, "headers": headers})()
                        # The chargepoint id is the last non-empty path segment.
                        cleaned = urlparse(path).path.strip("/") if path else ""
                        cp_id = cleaned.split("/")[-1] if cleaned else ""
                        self.chargepoint_id = cp_id
                        host_header = headers.get("Host") if headers else None
                        ssl_object = self.transport.get_extra_info("ssl_object")
                        scheme = "wss" if ssl_object else "ws"
                        # Authority preference: Host header, then peer (host, port)
                        # tuple, then the stringified peer as a last resort.
                        if host_header:
                                authority = host_header
                        elif isinstance(peer, tuple) and len(peer) >= 2:
                                authority = f"{peer[0]}:{peer[1]}"
                        else:
                                authority = str(peer)
                        normalised_path = path or "/"
                        if not normalised_path.startswith("/"):
                                normalised_path = f"/{normalised_path}"
                        self.connection_uri = f"{scheme}://{authority}{normalised_path}"
                        # From here on, all log lines carry the station context.
                        self.logger = ChargePointLoggerAdapter(
                                self.logger,
                                {
                                        "chargepoint_id": cp_id,
                                        "station_id": cp_id,
                                        "connection_uri": self.connection_uri,
                                },
                        )
                except UnsupportedHttpMethod as exc:
                        if logger.isEnabledFor(logging.WARNING):
                                logger.warning(
                                        "Rejected handshake from %s: %s %s (only GET supported)",
                                        peer,
                                        exc.method,
                                        exc.path or "<no path>",
                                        extra={"system": "client-server"},
                                )
                        _record_handshake_failure("invalid_http_method", getattr(self, "chargepoint_id", None))
                        raise websockets.exceptions.InvalidMessage("unsupported HTTP method") from exc
                except Exception as exc:
                        # EOF before a complete request usually means the client
                        # aborted TLS (certificate/hostname rejection).
                        if isinstance(exc, EOFError):
                                logger.info(
                                        "Peer %s closed the TLS connection before completing the HTTP handshake. "
                                        "This often indicates the client rejected the certificate or hostname.",
                                        peer,
                                        extra={"system": "client-server"},
                                )
                                _record_handshake_failure("tls_handshake_failed", getattr(self, "chargepoint_id", None))
                        if logger.isEnabledFor(logging.DEBUG):
                                logger.debug(
                                        "Failed to read HTTP request from %s: %s",
                                        peer,
                                        exc,
                                        exc_info=True,
                                        extra={"system": "client-server"},
                                )
                                ssl_object = self.transport.get_extra_info("ssl_object")
                                if ssl_object is not None:
                                        # Best-effort TLS introspection; failures
                                        # here must never mask the original error.
                                        try:
                                                cipher = ssl_object.cipher()
                                        except Exception:  # pragma: no cover - defensive logging
                                                cipher = None
                                        try:
                                                peer_cert = ssl_object.getpeercert()
                                        except Exception:  # pragma: no cover - defensive logging
                                                peer_cert = None
                                        logger.debug(
                                                "TLS details for %s: cipher=%s, peer_cert_subject=%s, peer_cert_issuer=%s",
                                                peer,
                                                cipher,
                                                peer_cert.get("subject") if isinstance(peer_cert, dict) else peer_cert,
                                                peer_cert.get("issuer") if isinstance(peer_cert, dict) else None,
                                                extra={"system": "client-server"},
                                        )

                                        # SNI analysis: explain missing or mismatched
                                        # hostnames against the configured cert identifiers.
                                        server_hostname = getattr(ssl_object, "server_hostname", None)
                                        cert_identifiers = self.cert_identifiers or OCPP_PROXY_CERT_IDENTIFIERS
                                        if not server_hostname:
                                                if logger.isEnabledFor(logging.INFO):
                                                        if cert_identifiers:
                                                                expected = ", ".join(
                                                                    sorted(
                                                                        {
                                                                            ident
                                                                            for ident in cert_identifiers
                                                                            if ident
                                                                        }
                                                                    )
                                                                )
                                                        else:
                                                                expected = "the configured OCPP hostname"
                                                        logger.info(
                                                                "TLS handshake from %s aborted before HTTP request without an SNI hostname. "
                                                                "Ensure the charge point connects via a hostname that matches the TLS certificate (%s) "
                                                                "instead of using the raw IP address.",
                                                                peer,
                                                                expected,
                                                                extra={"system": "client-server"},
                                                        )
                                        else:
                                                normalised_server = _normalise_identifier(server_hostname)
                                                expected = {
                                                    _normalise_identifier(ident)
                                                    for ident in cert_identifiers
                                                    if ident
                                                }
                                                if expected and normalised_server not in expected:
                                                        if logger.isEnabledFor(logging.INFO):
                                                                logger.info(
                                                                        "TLS handshake from %s aborted before HTTP request. The client presented hostname '%s' "
                                                                        "which does not match the configured certificate identifiers %s. Verify the charge point configuration.",
                                                                        peer,
                                                                        server_hostname,
                                                                        sorted(cert_identifiers),
                                                                        extra={"system": "client-server"},
                                                                )
                                                elif _is_ip_address(server_hostname):
                                                        if logger.isEnabledFor(logging.INFO):
                                                                logger.info(
                                                                        "TLS handshake from %s used the IP address '%s' as SNI. Ensure the certificate contains this IP address or configure the charge point to use the DNS hostname.",
                                                                        peer,
                                                                        server_hostname,
                                                                        extra={"system": "client-server"},
                                                                )
                                # Dump whatever the client already sent, in case it
                                # started speaking WebSocket before the HTTP upgrade.
                                buffer = getattr(self.reader, "_buffer", None)
                                if buffer:
                                        preview = bytes(buffer)
                                        if preview:
                                                def _extract_masked_client_frame(frame_bytes):
                                                        # Best-effort parse of a masked client WebSocket
                                                        # frame (clients must mask per the protocol):
                                                        # returns fin/opcode/length and an unmasked
                                                        # payload preview, or None when the bytes do
                                                        # not look like a complete masked frame header.
                                                        if len(frame_bytes) < 2:
                                                                return None
                                                        first_byte, second_byte = frame_bytes[0], frame_bytes[1]
                                                        masked = bool(second_byte & 0x80)
                                                        if not masked:
                                                                return None
                                                        fin = bool(first_byte & 0x80)
                                                        opcode = first_byte & 0x0F
                                                        payload_length = second_byte & 0x7F
                                                        index = 2
                                                        # 126/127 select the 16-/64-bit extended
                                                        # payload-length encodings.
                                                        if payload_length == 126:
                                                                if len(frame_bytes) < index + 2:
                                                                        return None
                                                                payload_length = int.from_bytes(frame_bytes[index : index + 2], "big")
                                                                index += 2
                                                        elif payload_length == 127:
                                                                if len(frame_bytes) < index + 8:
                                                                        return None
                                                                payload_length = int.from_bytes(frame_bytes[index : index + 8], "big")
                                                                index += 8
                                                        if len(frame_bytes) < index + 4:
                                                                return None
                                                        mask_key = frame_bytes[index : index + 4]
                                                        index += 4
                                                        # Unmask only the bytes actually buffered.
                                                        available_payload = min(payload_length, max(0, len(frame_bytes) - index))
                                                        payload_bytes = bytes(
                                                                frame_bytes[index + i] ^ mask_key[i % 4]
                                                                for i in range(available_payload)
                                                        )
                                                        return {
                                                                "fin": fin,
                                                                "opcode": opcode,
                                                                "payload": payload_bytes,
                                                                "payload_length": payload_length,
                                                        }

                                                frame_details = _extract_masked_client_frame(preview)
                                                if frame_details is not None:
                                                        payload_preview_bytes = frame_details["payload"][:64]
                                                        payload_preview_text = (
                                                                payload_preview_bytes.decode("utf-8", "replace")
                                                                if payload_preview_bytes
                                                                else "<no payload>"
                                                        )
                                                        message = (
                                                                "Client %s sent masked WebSocket payload before completing the HTTP upgrade "
                                                                "(opcode=0x%X, fin=%s, length=%d, preview='%s')"
                                                        )
                                                        log_extra = {"system": "client-server"}
                                                        if logger.isEnabledFor(logging.INFO):
                                                                logger.info(
                                                                        message,
                                                                        peer,
                                                                        frame_details["opcode"],
                                                                        frame_details["fin"],
                                                                        frame_details["payload_length"],
                                                                        payload_preview_text,
                                                                        extra=log_extra,
                                                                )
                                                        else:
                                                                logger.debug(
                                                                        message,
                                                                        peer,
                                                                        frame_details["opcode"],
                                                                        frame_details["fin"],
                                                                        frame_details["payload_length"],
                                                                        payload_preview_text,
                                                                        extra=log_extra,
                                                                )

                                                # Raw buffer preview, truncated to keep logs bounded.
                                                max_length = 256
                                                preview_slice = preview[:max_length]
                                                logger.debug(
                                                        "Buffered request data from %s (%d bytes total, showing %d): %s",
                                                        peer,
                                                        len(preview),
                                                        len(preview_slice),
                                                        preview_slice.decode("utf-8", "replace"),
                                                        extra={"system": "client-server"},
                                                )
                        # EOFError was already recorded above as tls_handshake_failed.
                        if not isinstance(exc, EOFError):
                                _record_handshake_failure("handshake_read_error", getattr(self, "chargepoint_id", None))
                        raise websockets.exceptions.InvalidMessage(
                                "did not receive a valid HTTP request"
                        ) from exc
                return path, headers

        def connection_lost(self, exc) -> None:
                """Log close code, reason and peer, then delegate to the base class."""
                code = getattr(self, "close_code", None)
                reason = getattr(self, "close_reason", "")
                peer = getattr(self, "peer", None) or self.transport.get_extra_info("peername")
                self.logger.info(
                        "Connection closed: %s - %s (peer: %s)",
                        code,
                        reason or "",
                        peer,
                        extra={"system": "client-server"},
                )
                super().connection_lost(exc)

        def select_subprotocol(self, client_subprotocols, server_subprotocols):
                """Negotiate the WebSocket subprotocol, honouring per-station config.

                When the resolved station entry configures a subprotocol, only
                that one is offered to the base negotiation; if the configured
                value is not among the listener's supported subprotocols,
                nothing is offered at all (forcing negotiation failure). The
                offered lists, any rejection reason and the failure flag are
                stored on ``self`` for later reporting.

                Returns:
                        str | None: the selected subprotocol, or ``None`` when
                        no common subprotocol was found.
                """
                logger = getattr(self, "logger", logging.getLogger(__name__))
                peer = getattr(self, "peer", None) or self.transport.get_extra_info("peername")

                path = getattr(self, "path", None) or getattr(getattr(self, "request", None), "path", None)
                station_id = normalize_station_id(path or "")
                entry, _ = resolve_station_entry(path or "", station_id)

                configured_subprotocol = normalize_configured_subprotocol(
                        entry.get("configured_subprotocol") if entry else None
                )

                client_offered = list(client_subprotocols) if client_subprotocols is not None else []
                supported_by_listener = (
                        list(server_subprotocols) if server_subprotocols is not None else []
                )
                # Persist negotiation inputs for diagnostics elsewhere.
                self.client_offered_subprotocols = client_offered
                self.server_offered_subprotocols = supported_by_listener
                self.configured_subprotocol = configured_subprotocol

                offered_subprotocols = (
                        [configured_subprotocol]
                        if configured_subprotocol
                        else supported_by_listener
                )
                if configured_subprotocol and configured_subprotocol not in supported_by_listener:
                        logger.warning(
                                "Configured subprotocol %s for %s not offered by listener (listener offers=%s)",
                                configured_subprotocol,
                                station_id or "<unknown>",
                                supported_by_listener,
                                extra={"system": "client-server"},
                        )
                        # Offer nothing so the handshake fails visibly rather
                        # than silently falling back to an unconfigured protocol.
                        offered_subprotocols = []

                if logger.isEnabledFor(logging.DEBUG):
                        logger.debug(
                                "Subprotocol negotiation with %s: client_offered=%s, server_offered=%s",
                                peer,
                                client_offered,
                                offered_subprotocols,
                                extra={"system": "client-server"},
                        )

                selected = super().select_subprotocol(client_subprotocols, offered_subprotocols)

                if selected is None:
                        rejection_reason = (
                                "No common WebSocket subprotocol "
                                f"(client offered: {client_offered or '<none>'}; "
                                f"server offered: {offered_subprotocols or '<none>'})"
                        )
                        self.subprotocol_rejection_reason = rejection_reason
                        # Only treat as a failure if either side actually
                        # expected a subprotocol.
                        self.subprotocol_negotiation_failed = bool(
                                client_offered or configured_subprotocol
                        )
                        if logger.isEnabledFor(logging.INFO):
                                logger.info(
                                        "No common WebSocket subprotocol negotiated with %s (client=%s, server=%s)",
                                        peer,
                                        client_offered,
                                        supported_by_listener,
                                        extra={"system": "client-server"},
                                )
                else:
                        self.subprotocol_negotiation_failed = False
                        if logger.isEnabledFor(logging.DEBUG):
                                logger.debug(
                                        "Selected WebSocket subprotocol %s for %s",
                                        selected,
                                        peer,
                                        extra={"system": "client-server"},
                                )

                return selected

async def keep_alive(
        websocket,
        target_ws,
        direction,
        station_id,
        connection_id,
        measure_ping=False,
        interval=60,
        timeout=DEFAULT_PING_TIMEOUT,
):
        """Send periodic ping messages to keep the connection alive.

        When ``measure_ping`` is True the round-trip time between ``ping`` and
        ``pong`` is measured and logged via :func:`log_message` (direction
        ``"ping"``). ``interval`` controls how often pings are sent and can be
        negotiated during the initial WebSocket handshake.

        Parameters:
                websocket: the leg of the proxied pair that is pinged here.
                target_ws: the opposite leg; it is closed when this task ends
                        (cancellation, exception or loop exit) so both legs
                        tear down together.
                direction: human-readable direction label used in log output.
                station_id: key for CONNECTION_STATS / LAST_SEEN bookkeeping.
                connection_id: identifier passed through to log_message().
                measure_ping: when True, record RTT stats and persist them.
                interval: seconds between pings; 0/negative is not handled
                        specially here (caller is expected to pass a sane value).
                timeout: seconds to wait for the pong before giving up.
        """
        # Placeholder values so the exception/logging paths below always have
        # something printable, even if transport introspection fails early.
        websocket_path = "---"
        peer = "0:0:0.0"
        sock = "0:0:0.0"
        peerInbound = "0:0:0.0"
        sockInbound = "0:0:0.0"

        system_label = direction_to_system(direction)

        try:

                transport = websocket.transport
                peer = transport.get_extra_info("peername")     # (host, port) of the remote peer
                sock = transport.get_extra_info("sockname")     # (host, port) of the local socket
                logging.info(
                        f"keep_alive: Peer: {peer}, Sock: {sock}",
                        extra={"system": system_label},
                )

                transportInbound = target_ws.transport
                peerInbound = transportInbound.get_extra_info("peername")       # (host, port) of the remote peer
                sockInbound = transportInbound.get_extra_info("sockname")       # (host, port) of the local socket

                websocket_path = websocket.request.path
                # websockets >= 15 removed the ``closed`` property in favor of
                # a ``state`` enum. Support both old and new APIs here.
                try:
                        from websockets.protocol import State  # type: ignore
                except Exception:  # pragma: no cover - older websockets
                        State = None

                while True:
                        # Determine "closed" across websockets versions: prefer the
                        # legacy ``closed`` attribute, else fall back to the State enum.
                        ws_is_closed = False
                        closed_attr = getattr(websocket, "closed", None)
                        if closed_attr is not None:
                                ws_is_closed = closed_attr
                        else:
                                state_attr = getattr(websocket, "state", None)
                                if State is not None and state_attr is not None:
                                        ws_is_closed = state_attr in (State.CLOSING, State.CLOSED)
                        if ws_is_closed:
                                logging.info(
                                        "751: websocket closed, stopping keep_alive",
                                        extra={"station_id": "system", "system": system_label},
                                )
                                break
                        stats = CONNECTION_STATS.get(station_id)
                        if stats is not None:
                                stats["pings_sent"] = stats.get("pings_sent", 0) + 1
                        now_utc = datetime.now(timezone.utc)
                        last_seen = LAST_SEEN.get(station_id)
                        if last_seen is None:
                                # First cycle for this station: seed LAST_SEEN with "now".
                                last_seen = LAST_SEEN.setdefault(station_id, now_utc)
                        if last_seen is not None:
                                # If traffic was seen recently the link is evidently
                                # alive — skip this ping and wait another interval.
                                delta_seconds = (now_utc - last_seen).total_seconds()
                                if delta_seconds < interval:
                                        logging.debug(
                                                "752: Skipping ping because last_seen delta %.3fs < interval %s",
                                                delta_seconds,
                                                interval,
                                                extra={"station_id": station_id, "system": system_label},
                                        )
                                        await asyncio.sleep(interval)
                                        continue
                        start = datetime.now()
                        try:
                                # ``ping`` returns a waiter that resolves when the
                                # matching pong arrives; bound the wait with ``timeout``.
                                waiter = await websocket.ping(b"ping")
                                await asyncio.wait_for(waiter, timeout)
                                rtt_ms = None
                                if measure_ping:
                                        rtt_ms = (datetime.now() - start).total_seconds() * 1000
                                logging.debug(
                                        "752: Received pong for payload %r (rtt_ms=%s)",
                                        b"ping",
                                        None if rtt_ms is None else int(rtt_ms),
                                        extra={"station_id": station_id, "system": system_label},
                                )
                                LAST_SEEN[station_id] = datetime.now(timezone.utc)
                                if measure_ping and rtt_ms is not None:
                                        if stats is not None:
                                                stats["rtt_total_ms"] = stats.get("rtt_total_ms", 0.0) + rtt_ms
                                                samples = stats.get("rtt_samples")
                                                if isinstance(samples, list):
                                                        samples.append(float(rtt_ms))
                                                        # Keep only the newest RTT samples.
                                                        if len(samples) > RTT_SAMPLE_LIMIT:
                                                                del samples[
                                                                        0 : len(samples) - RTT_SAMPLE_LIMIT
                                                                ]
                                                else:
                                                        stats["rtt_samples"] = [float(rtt_ms)]
                                        await log_message(
                                                station_id,
                                                "ping",
                                                json.dumps({"rtt_ms": int(rtt_ms)}),
                                                connection_id,
                                        )
                        except asyncio.TimeoutError:
                                # No pong within ``timeout``: record the failure and
                                # leave the loop so the finally-block closes both legs.
                                now_utc = datetime.now(timezone.utc)
                                last_seen = LAST_SEEN.get(station_id)
                                idle_seconds = None
                                if last_seen is not None:
                                        idle_delta = now_utc - last_seen
                                        idle_seconds = idle_delta.total_seconds()
                                if stats is not None:
                                        stats["pings_failed"] = stats.get("pings_failed", 0) + 1
                                        pings_failed = stats.get("pings_failed")
                                else:
                                        pings_failed = None
                                logging.warning(
                                        (
                                                f"751 ({websocket_path}): {direction} ping timeout"
                                                f" (interval={interval}, timeout={timeout},"
                                                f" idle_seconds={idle_seconds}, last_seen={last_seen},"
                                                f" pings_failed={pings_failed})"
                                        ),
                                        extra={"station_id": station_id, "system": system_label},
                                )
                                break
                        except websockets.exceptions.ConnectionClosed as e:
                                logging.warning(
                                        f"751 ({websocket_path}): {direction} Keep-alive closed: {e.code} - {e.reason}",
                                        extra={"station_id": "system", "system": system_label},
                                )
                                break
                        except Exception as e:
                                # Unexpected ping failure: log it but keep the loop alive.
                                logging.warning(
                                        f"Ping measurement failed: {e}",
                                        extra={"station_id": station_id, "system": system_label},
                                )
                        logging.debug(
                                "752: Sent ping to keep connection " + direction + " alive ",
                                extra={"station_id": "system", "system": system_label},
                        )
                        await asyncio.sleep(interval)
        except asyncio.CancelledError:

                logging.warning(
                        "760: Keep-alive task cancelled (" + direction + ")",
                        extra={"station_id": station_id, "system": system_label},
                        exc_info=True,
                )
                await log_message(
                        str(websocket_path),
                        direction,
                        "Keep-alive task cancelled " + direction +"",
                        connection_id,
                )
                logging.info(
                        "Closing target websocket due to keep_alive cancellation",
                        extra={"station_id": station_id, "system": system_label},
                )
                # ``closed`` may be missing on websockets >= 15; default False
                # means we fall through and call close() (close is safe to call).
                if not getattr(target_ws, "closed", False):
                        await target_ws.close()
                directionReverse = "client_to_server"
                if (direction == "client_to_server"):
                        directionReverse = "server_to_client"
                logging.info(
                        f"790: Keep-alive task closed to target ",
                        extra={"station_id": station_id, "system": system_label},
                )
                logging.warning("760-1", extra={"station_id": station_id, "system": system_label})
                await log_message(
                        station_id,
                        directionReverse,
                        "Keep-alive task closed: " + directionReverse,
                        connection_id,
                )
                #logging.warning("760-2");
                #if target_ws.closed:
                #       log_message(station_id, directionReverse, "Keep-Alive target_ws is really closed. Done.")
                #logging.warning("760-3");
                #if websocket.closed:
                #       log_message(station_id, directionReverse, "Keep-Alive  source_ws is really closed. Done.")

        except Exception as e:
                if isinstance(e, websockets.exceptions.ConnectionClosed):
                        logging.warning(
                                f"751 ({websocket_path}): {direction} Keep-alive closed: {e.code} - {e.reason}",
                                extra={"station_id": "system", "system": system_label},
                        )
                else:
                        logging.error(
                                f"751 ({websocket_path}): {direction} Keep-alive error: {e}",
                                extra={"station_id": "system", "system": system_label},
                        )
                # NOTE(review): ``last_part`` is computed but never used below.
                last_part = websocket_path.rsplit('/', 1)[-1]
                await log_message(
                        str(websocket_path),
                        direction,
                        "Exception: KeepAlive cancelled " + websocket_path,
                        connection_id,
                )
                logging.info(
                        "Closing target websocket due to keep_alive exception",
                        extra={"station_id": station_id, "system": system_label},
                )
                if not getattr(target_ws, "closed", False):
                        await target_ws.close()
                await log_message(
                        station_id,
                        "Disconnected",
                        f"WebSocket Disconnected: System to Backend: Peer: {peer}, Sock: {sock}",
                        connection_id,
                )
                logging.info(
                        "Closing websocket due to keep_alive exception",
                        extra={"station_id": station_id, "system": system_label},
                )
                if not getattr(websocket, "closed", False):
                        await websocket.close()
                await log_message(
                        station_id,
                        "Disconnected",
                        f"WebSocket Disconnected: Wallbox To System: Peer: {peerInbound}, Sock: {sockInbound}",
                        connection_id,
                )
        finally:
                # Whatever path ended the task, make sure both legs are closed.
                if not getattr(target_ws, "closed", False):
                        await target_ws.close()
                if not getattr(websocket, "closed", False):
                        await websocket.close()
async def load_config_from_db():
        """
        Load the base configuration (fallback_enabled, fallback_base_url, MQTT,
        proxy, diagnostics-FTP and backend-TLS settings) from the ``op_config``
        table and apply it to the corresponding module-level globals.

        Precedence: values provided via config.json (tracked by the
        ``_CONFIG_HAS_*`` flags / ``_CONFIGURED_*`` values) override DB rows;
        DB rows override the hard-coded defaults assigned below.
        """
        global FALLBACK_ENABLED, FALLBACK_BASE_URL, THROTTLE_ENABLED, THROTTLE_SECONDS
        global MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, MQTT_TOPIC_PREFIX
        global OCPP_PROXY_IP, OCPP_PROXY_PORT, OCPP_PROXY_API_PORT
        global OCPP_PROXY_SSL_ENABLED, OCPP_PROXY_CERTFILE, OCPP_PROXY_KEYFILE
        global OCPP_PROXY_FORCE_TLS12
        global DIAG_FTP_HOST, DIAG_FTP_PORT, DIAG_FTP_USER, DIAG_FTP_PASSWORD
        global BACKEND_SSL_VERIFY, BACKEND_SSL_CA_FILE

        async with db_connection() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                        await cur.execute("SELECT config_key, config_value FROM op_config")
                        rows = await cur.fetchall()

        # Defaults (config.json overrides DB)
        # NOTE(review): MQTT_* globals are deliberately NOT reset here; they
        # keep their previous values unless a DB row overrides them — confirm
        # this is intended.
        FALLBACK_ENABLED = False
        FALLBACK_BASE_URL = ""
        THROTTLE_ENABLED = (
                _CONFIGURED_THROTTLE_ENABLED
                if _CONFIG_HAS_THROTTLE_ENABLED
                else True
        )
        THROTTLE_SECONDS = (
                _CONFIGURED_THROTTLE_SECONDS
                if _CONFIG_HAS_THROTTLE_SECONDS
                else 120
        )
        DIAG_FTP_HOST = DIAG_CFG.get("ftp_host", "217.160.79.201")
        try:
                DIAG_FTP_PORT = int(DIAG_CFG.get("ftp_port", 21))
        except (TypeError, ValueError):
                DIAG_FTP_PORT = 21
        DIAG_FTP_USER = DIAG_CFG.get("ftp_user", "diag")
        DIAG_FTP_PASSWORD = DIAG_CFG.get("ftp_password", "rem0tec0nnect2026x")

        OCPP_PROXY_IP = _CONFIGURED_OCPP_PROXY_IP
        OCPP_PROXY_PORT = _CONFIGURED_OCPP_PROXY_PORT
        OCPP_PROXY_API_PORT = _CONFIGURED_OCPP_PROXY_API_PORT
        OCPP_PROXY_SSL_ENABLED = _CONFIGURED_OCPP_PROXY_SSL_ENABLED
        OCPP_PROXY_CERTFILE = _CONFIGURED_OCPP_PROXY_CERTFILE
        OCPP_PROXY_KEYFILE = _CONFIGURED_OCPP_PROXY_KEYFILE
        OCPP_PROXY_FORCE_TLS12 = _CONFIGURED_OCPP_PROXY_FORCE_TLS12
        BACKEND_SSL_VERIFY = bool(_config.get("backend_ssl_verify", False))
        BACKEND_SSL_CA_FILE = _config.get("backend_ssl_ca_file") or None

        # Apply DB rows; keys guarded by a _CONFIG_HAS_* flag only take effect
        # when config.json did not already provide the value.
        for row in rows:
                key = row["config_key"]
                val = row["config_value"]
                if key == "fallback_enabled":
                        FALLBACK_ENABLED = val.lower() == "true"
                elif key == "fallback_base_url":
                        FALLBACK_BASE_URL = val.rstrip("/")  # without trailing '/'
                elif key == "throttle_enabled":
                        if not _CONFIG_HAS_THROTTLE_ENABLED:
                                THROTTLE_ENABLED = (val.lower() == "true")
                elif key == "throttle_seconds":
                        if not _CONFIG_HAS_THROTTLE_SECONDS:
                                try:
                                        THROTTLE_SECONDS = int(val)
                                except ValueError:
                                        # If val is not a number keep the default set above
                                        pass
                elif key == "mqtt_broker":
                        MQTT_BROKER = val
                elif key == "mqtt_port":
                        try:
                                MQTT_PORT = int(val)
                        except ValueError:
                                pass
                elif key == "mqtt_user":
                        MQTT_USER = val
                elif key == "mqtt_password":
                        MQTT_PASSWORD = val
                elif key == "mqtt_topic_prefix":
                        MQTT_TOPIC_PREFIX = val
                elif key == "ocpp_proxy_ip":
                        if not _CONFIG_HAS_OCPP_PROXY_IP:
                                OCPP_PROXY_IP = val
                elif key == "ocpp_proxy_port":
                        if not _CONFIG_HAS_OCPP_PROXY_PORT:
                                try:
                                        OCPP_PROXY_PORT = int(val)
                                except ValueError:
                                        pass
                elif key == "ocpp_proxy_api_port":
                        if not _CONFIG_HAS_OCPP_PROXY_API_PORT:
                                try:
                                        OCPP_PROXY_API_PORT = int(val)
                                except ValueError:
                                        pass
                elif key == "ocpp_proxy_ssl_enabled":
                        if not _CONFIG_HAS_OCPP_PROXY_SSL_ENABLED:
                                OCPP_PROXY_SSL_ENABLED = val.lower() == "true"
                elif key == "ocpp_proxy_ssl_certfile":
                        if not _CONFIG_HAS_OCPP_PROXY_CERTFILE:
                                OCPP_PROXY_CERTFILE = val or None
                elif key == "ocpp_proxy_ssl_keyfile":
                        if not _CONFIG_HAS_OCPP_PROXY_KEYFILE:
                                OCPP_PROXY_KEYFILE = val or None
                elif key == "ocpp_proxy_force_tls12":
                        if not _CONFIG_HAS_OCPP_PROXY_FORCE_TLS12:
                                OCPP_PROXY_FORCE_TLS12 = val.lower() == "true"
                elif key == "diag_ftp_host":
                        DIAG_FTP_HOST = val
                elif key == "diag_ftp_port":
                        try:
                                DIAG_FTP_PORT = int(val)
                        except ValueError:
                                pass
                elif key == "diag_ftp_user":
                        DIAG_FTP_USER = val
                elif key == "diag_ftp_password":
                        DIAG_FTP_PASSWORD = val
                elif key == "backend_ssl_verify":
                        BACKEND_SSL_VERIFY = val.lower() == "true"
                elif key == "backend_ssl_ca_file":
                        BACKEND_SSL_CA_FILE = val or None

        # Re-create the proxy listeners with the (possibly changed) settings.
        _rebuild_runtime_proxy_listeners()
                                
async def load_wallboxes_from_db():
        """
        Load all wallbox entries from the DB and rewrite them into the global
        ``STATION_PATHS`` dict (keyed by ``source_url``); also rebuilds
        ``STATION_PATH_ALIASES``. Stores the 'activity' field as well.

        The SELECT is built dynamically so the function works against older
        schemas: optional columns that are missing from ``op_redirects`` are
        replaced by literal defaults in the query.
        """
        global STATION_PATHS, STATION_PATH_ALIASES

        try:
                async with db_connection() as conn:
                        async with conn.cursor(aiomysql.DictCursor) as cur:
                                # Introspect the live schema to learn which optional
                                # columns exist; failure just means "assume none".
                                try:
                                        await cur.execute("SHOW COLUMNS FROM op_redirects")
                                        column_rows = await cur.fetchall()
                                except Exception:
                                        logging.exception(
                                                "Failed to inspect op_redirects columns", extra={"station_id": "system"}
                                        )
                                        column_rows = []

                                columns: set[str] = set()
                                for column in column_rows:
                                        # SHOW COLUMNS rows may be dicts or tuples
                                        # depending on the cursor type.
                                        if isinstance(column, Mapping):
                                                field_name = column.get("Field")
                                        else:
                                                field_name = column[0] if column else None
                                        if field_name:
                                                columns.add(str(field_name))

                                # Columns that must exist in every schema version.
                                select_parts: list[str] = [
                                        "source_url",
                                        "ws_url",
                                        "activity",
                                        "auth_key",
                                        "measure_ping",
                                ]

                                def _append_or_default(column_name: str, default_expr: str) -> None:
                                        # Select the real column when present, otherwise
                                        # a literal default aliased to the same name.
                                        if column_name in columns:
                                                select_parts.append(column_name)
                                        else:
                                                select_parts.append(f"{default_expr} AS {column_name}")

                                _append_or_default("strict_availability", "0")
                                _append_or_default("charging_analytics", "0")
                                _append_or_default("pnc_enabled", "0")
                                _append_or_default("disconnect_alert_enabled", "0")
                                _append_or_default("disconnect_alert_email", "NULL")
                                _append_or_default("mqtt_enabled", "0")
                                _append_or_default("backend_basic_user", "NULL")
                                _append_or_default("backend_basic_password", "NULL")

                                if "ping_interval" in columns:
                                        select_parts.append("ping_interval")
                                else:
                                        select_parts.append(f"{DEFAULT_PING_INTERVAL} AS ping_interval")

                                if "ping_enabled" in columns:
                                        select_parts.append("ping_enabled")
                                else:
                                        select_parts.append("NULL AS ping_enabled")

                                # Subprotocol: prefer the modern ``ocpp_subprotocol``
                                # column, fall back to legacy ``ocpp_version``.
                                if "ocpp_subprotocol" in columns:
                                        select_parts.append("ocpp_subprotocol")
                                        subprotocol_column = "ocpp_subprotocol"
                                elif "ocpp_version" in columns:
                                        select_parts.append("ocpp_version AS ocpp_subprotocol")
                                        subprotocol_column = "ocpp_subprotocol"
                                else:
                                        select_parts.append("NULL AS ocpp_subprotocol")
                                        subprotocol_column = "ocpp_subprotocol"

                                sql = f"SELECT {', '.join(select_parts)} FROM op_redirects"
                                await cur.execute(sql)
                                rows = await cur.fetchall()

                def _normalise_subprotocol(row: Mapping[str, Any]) -> Optional[str]:
                        # Explicitly configured subprotocol wins; otherwise try to
                        # infer one from the station's source URL.
                        configured = normalize_configured_subprotocol(row.get(subprotocol_column))
                        if configured:
                                return configured

                        inferred = infer_subprotocol_from_path(row.get("source_url", ""))
                        if inferred:
                                return inferred

                        return None

                def _build_station_entry(row: Mapping[str, Any]) -> dict[str, Any]:
                        # Translate one DB row into the STATION_PATHS entry shape.
                        ping_enabled_raw = row.get("ping_enabled")
                        ping_enabled_flag: Optional[bool]
                        if ping_enabled_raw is None:
                                ping_enabled_flag = None
                        else:
                                ping_enabled_flag = as_bool(ping_enabled_raw)

                        # Ping interval resolution: explicit interval > boolean
                        # flag (enabled -> default, disabled -> 0) > default.
                        ping_interval_value = _coerce_optional_int(row.get("ping_interval"))
                        ping_source = "default"
                        if ping_interval_value is not None:
                                ping_source = "interval"
                        elif ping_enabled_flag is not None:
                                ping_interval_value = (
                                        DEFAULT_PING_INTERVAL if ping_enabled_flag else 0
                                )
                                ping_source = "flag"
                        else:
                                ping_interval_value = DEFAULT_PING_INTERVAL

                        configured_subprotocol = _normalise_subprotocol(row)
                        # NOTE(review): _normalise_subprotocol already falls back to
                        # the inferred value, so the `or infer_...` below is redundant
                        # (but harmless).
                        fallback_subprotocol = configured_subprotocol or infer_subprotocol_from_path(row.get("source_url", ""))
                        if not fallback_subprotocol:
                                fallback_subprotocol = "ocpp1.6"

                        return {
                                "source_url": row["source_url"],
                                "ws_url": row["ws_url"],
                                "activity": row["activity"],
                                "measure_ping": as_bool(row.get("measure_ping", 0)),
                                "strict_availability": as_bool(row.get("strict_availability", 0)),
                                "charging_analytics": as_bool(row.get("charging_analytics", 0)),
                                "pnc_enabled": as_bool(row.get("pnc_enabled", 0)),
                                "disconnect_alert_enabled": as_bool(row.get("disconnect_alert_enabled", 0)),
                                "disconnect_alert_email": _ensure_str(row.get("disconnect_alert_email")),
                                "mqtt_enabled": as_bool(row.get("mqtt_enabled", 0)),
                                "auth_key": _ensure_str(row.get("auth_key")),
                                "backend_basic_user": _ensure_str(row.get("backend_basic_user")),
                                "backend_basic_password": _ensure_str(row.get("backend_basic_password")),
                                "ping_interval": ping_interval_value,
                                "ping_enabled_flag": ping_enabled_flag,
                                "ping_interval_source": ping_source,
                                "subprotocol": fallback_subprotocol,
                                "configured_subprotocol": configured_subprotocol,
                        }

                STATION_PATHS = {
                        row["source_url"]: _build_station_entry(row)
                        for row in rows
                }

                STATION_PATH_ALIASES = {}
                for source_url in STATION_PATHS:
                        _register_station_aliases(source_url)

                logging.info(
                        f"750: Loaded {len(rows)} Wallbox-Einträge aus der DB.", extra={"station_id": "system"}
                )
        except Exception as e:
                logging.error(f"701: Error loading wallboxes from DB: {e}", extra={"station_id": "system"})

async def insert_redirect(path, ws_url):
        """Create a row in ``op_redirects`` for *path* unless one already exists.

        Called when the fallback backend is used for a previously unknown
        source URL. The insert adapts to the live schema: when a subprotocol
        column (``ocpp_subprotocol`` or legacy ``ocpp_version``) is present,
        the subprotocol inferred from *path* is stored alongside. On success
        the in-memory station table is reloaded; all failures are logged and
        swallowed (best-effort semantics).
        """
        try:
                async with db_connection() as conn:
                        async with conn.cursor() as cur:
                                await cur.execute(
                                        "SELECT id FROM op_redirects WHERE source_url=%s",
                                        (path,),
                                )
                                if await cur.fetchone():
                                        # Row already present — nothing to do.
                                        logging.debug(
                                                f"Redirect {path} already exists, skipping insert",
                                                extra={"station_id": "system"},
                                        )
                                        return

                                # Discover which optional columns this schema offers.
                                try:
                                        await cur.execute("SHOW COLUMNS FROM op_redirects")
                                        schema_rows = await cur.fetchall()
                                except Exception:
                                        schema_rows = []

                                known_columns: set[str] = set()
                                for schema_row in schema_rows:
                                        # Rows may be dicts or tuples depending on cursor type.
                                        if isinstance(schema_row, Mapping):
                                                name = schema_row.get("Field")
                                        else:
                                                name = schema_row[0] if schema_row else None
                                        if name:
                                                known_columns.add(str(name))

                                # Prefer the modern column name over the legacy one.
                                subprotocol_column = next(
                                        (
                                                candidate
                                                for candidate in ("ocpp_subprotocol", "ocpp_version")
                                                if candidate in known_columns
                                        ),
                                        None,
                                )

                                insert_columns = ["source_url", "ws_url", "activity", "measure_ping"]
                                insert_values: list[object] = [path, ws_url, "allow", 0]
                                if subprotocol_column:
                                        insert_columns.append(subprotocol_column)
                                        insert_values.append(infer_subprotocol_from_path(path))

                                sql = (
                                        f"INSERT INTO op_redirects ({', '.join(insert_columns)}) "
                                        f"VALUES ({', '.join(['%s'] * len(insert_columns))})"
                                )
                                try:
                                        await cur.execute(sql, insert_values)
                                except Exception:
                                        # Fallback for legacy schemas without optional columns
                                        await cur.execute(
                                                "INSERT INTO op_redirects (source_url, ws_url, activity, measure_ping) VALUES (%s, %s, %s, %s)",
                                                (path, ws_url, 'allow', 0)
                                        )
                                await conn.commit()
                                logging.info(
                                        f"Created new redirect entry for {path} -> {ws_url}",
                                        extra={"station_id": "system"},
                                )
                                await load_wallboxes_from_db()
        except Exception as e:
                logging.error(
                        f"702: Error inserting redirect for {path}: {e}",
                        extra={"station_id": "system"},
                )

async def update_redirect(
        source_url,
        new_ws_url=None,
        new_activity=None,
        new_measure_ping=None,
        new_mqtt_enabled=None,
        new_backend_user=None,
        new_backend_password=None,
        new_strict_availability=None,
        new_charging_analytics=None,
        new_subprotocol=None,
):
        """Update an entry in ``op_redirects`` and the in-memory ``STATION_PATHS``.

        Only parameters that are not ``None`` are written to the database.
        Columns that are missing from the current schema (legacy
        installations) are skipped up front when column discovery works, and
        retried without the optional columns when it does not.  The local
        ``STATION_PATHS`` cache is only updated when the DB write succeeded
        (see the ``else`` branch of the outer ``try``).
        """
        try:
                async with db_connection() as conn:
                        async with conn.cursor() as cur:
                                # Discover which optional columns this schema actually has;
                                # an empty set means discovery failed and we fall back to
                                # optimistic writes guarded by the exception handler below.
                                available_columns: set[str] = set()
                                try:
                                        await cur.execute("SHOW COLUMNS FROM op_redirects")
                                        column_rows = await cur.fetchall()
                                except Exception:
                                        column_rows = []

                                for column in column_rows:
                                        # Rows may be mappings (DictCursor) or plain tuples.
                                        if isinstance(column, Mapping):
                                                field_name = column.get("Field")
                                        else:
                                                field_name = column[0] if column else None
                                        if field_name:
                                                available_columns.add(str(field_name))

                                # Prefer the newer column name for the OCPP subprotocol.
                                if "ocpp_subprotocol" in available_columns:
                                        subprotocol_column = "ocpp_subprotocol"
                                elif "ocpp_version" in available_columns:
                                        subprotocol_column = "ocpp_version"
                                else:
                                        subprotocol_column = None

                                # Build the SQL statement dynamically from the provided fields.
                                assignments: list[tuple[str, object]] = []
                                if new_ws_url is not None:
                                        assignments.append(("ws_url = %s", new_ws_url))
                                if new_activity is not None:
                                        assignments.append(("activity = %s", new_activity))
                                if new_measure_ping is not None:
                                        assignments.append(("measure_ping = %s", int(as_bool(new_measure_ping))))
                                # "not available_columns" = discovery failed -> try anyway and
                                # rely on the fallback re-execution below.
                                if new_strict_availability is not None and (
                                        not available_columns or "strict_availability" in available_columns
                                ):
                                        assignments.append(("strict_availability = %s", int(as_bool(new_strict_availability))))
                                if new_charging_analytics is not None and (
                                        not available_columns or "charging_analytics" in available_columns
                                ):
                                        assignments.append(("charging_analytics = %s", int(as_bool(new_charging_analytics))))
                                if new_mqtt_enabled is not None and (
                                        not available_columns or "mqtt_enabled" in available_columns
                                ):
                                        assignments.append(("mqtt_enabled = %s", int(as_bool(new_mqtt_enabled))))
                                if new_backend_user is not None and (
                                        not available_columns or "backend_basic_user" in available_columns
                                ):
                                        # Empty string means "clear the credential" -> store NULL.
                                        normalized_user = new_backend_user or None
                                        assignments.append(("backend_basic_user = %s", normalized_user))
                                if new_backend_password is not None and (
                                        not available_columns or "backend_basic_password" in available_columns
                                ):
                                        normalized_pass = new_backend_password or None
                                        assignments.append(("backend_basic_password = %s", normalized_pass))
                                if new_subprotocol is not None and subprotocol_column:
                                        normalized_subprotocol = (new_subprotocol or "").strip() or None
                                        assignments.append((f"{subprotocol_column} = %s", normalized_subprotocol))

                                if assignments:
                                        sql = f"UPDATE op_redirects SET {', '.join(field for field, _ in assignments)} WHERE source_url = %s"
                                        params = [value for _, value in assignments] + [source_url]
                                        try:
                                                await cur.execute(sql, params)
                                        except Exception:
                                                # Fallback for installations without the optional
                                                # backend-credential / analytics / subprotocol columns:
                                                # retry with those assignments stripped out.
                                                filtered = [
                                                        item
                                                        for item in assignments
                                                        if "backend_basic_user" not in item[0]
                                                        and "backend_basic_password" not in item[0]
                                                        and "strict_availability" not in item[0]
                                                        and "charging_analytics" not in item[0]
                                                        and "ocpp_subprotocol" not in item[0]
                                                        and "ocpp_version" not in item[0]
                                                ]
                                                # Only retry if stripping actually removed something;
                                                # otherwise the failure was not schema-related.
                                                if filtered and len(filtered) != len(assignments):
                                                        sql = f"UPDATE op_redirects SET {', '.join(field for field, _ in filtered)} WHERE source_url = %s"
                                                        params = [value for _, value in filtered] + [source_url]
                                                        await cur.execute(sql, params)
                                                else:
                                                        raise
                                        await conn.commit()
        except Exception as e:
                logging.error(f"703: Error updating redirect for {source_url}: {e}", extra={"station_id": "system"})
        else:
                # Update STATION_PATHS locally (only reached when the DB write succeeded).
                entry = STATION_PATHS.get(source_url, {}).copy()
                entry['source_url'] = source_url
                if new_ws_url is not None:
                        entry['ws_url'] = new_ws_url
                if new_activity is not None:
                        entry['activity'] = new_activity
                if new_measure_ping is not None:
                        entry['measure_ping'] = as_bool(new_measure_ping)
                if new_strict_availability is not None:
                        entry['strict_availability'] = as_bool(new_strict_availability)
                if new_charging_analytics is not None:
                        entry['charging_analytics'] = as_bool(new_charging_analytics)
                if new_mqtt_enabled is not None:
                        entry['mqtt_enabled'] = as_bool(new_mqtt_enabled)
                if new_backend_user is not None:
                        entry['backend_basic_user'] = new_backend_user or None
                if new_backend_password is not None:
                        entry['backend_basic_password'] = new_backend_password or None
                if new_subprotocol is not None:
                        cleaned_subprotocol = normalize_configured_subprotocol(new_subprotocol)
                        entry['configured_subprotocol'] = cleaned_subprotocol
                        if cleaned_subprotocol:
                                entry['subprotocol'] = cleaned_subprotocol
                        else:
                                # Fall back to a path-derived guess, defaulting to OCPP 1.6.
                                entry['subprotocol'] = infer_subprotocol_from_path(source_url) or "ocpp1.6"
                STATION_PATHS[source_url] = entry
                _register_station_aliases(source_url)
                logging.info(f"Updated redirect {source_url}: ws_url={new_ws_url}, activity={new_activity}", extra={"station_id": "system"})

async def update_last_connected(station_name):
        """Record a fresh connection for *station_name* in ``op_redirects``.

        Stamps ``last_connected`` with the DB server time, increments the
        reconnect counter and stores the OCPP endpoint served by this
        proxy instance.
        """
        statement = """
                                UPDATE op_redirects
                                SET
                                  last_connected = NOW(),
                                  reconnect_count = reconnect_count + 1,
                                  ocpp_endpoint = %s
                                WHERE source_url = %s
                        """
        parameters = (OCPP_ENDPOINT, station_name)
        async with db_connection() as connection:
                async with connection.cursor() as cursor:
                        await cursor.execute(statement, parameters)
                        await connection.commit()

async def send_update_firmware_request(station_id: str, location: str) -> str:
        """Build and send an OCPP ``UpdateFirmware`` call to the wallbox.

        ``location`` is the URL of the firmware file and is no longer
        hard-coded.  Returns the ``uniqueId`` used for the call so callers
        can correlate the CALLRESULT.

        Raises:
                RuntimeError: if there is no active connection for ``station_id``.
        """
        ws = ACTIVE_CLIENTS.get(station_id)
        if ws is None or getattr(ws, "closed", False):
                raise RuntimeError(f"Keine aktive Verbindung für Station {station_id}")

        unique_id = str(uuid.uuid4())
        # Register before sending so the CALLRESULT cannot race the bookkeeping.
        PENDING_FIRMWARE.add(unique_id)

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # keeps the exact wire format "YYYY-MM-DDTHH:MM:SSZ".
        retrieve_date = (
                datetime.now(timezone.utc)
                .replace(microsecond=0, tzinfo=None)
                .isoformat()
                + "Z"
        )
        payload = {
                "location": location,
                "retrieveDate": retrieve_date,
                "retries": 1,
                "retryInterval": 60,
                # optional: "checksum": "sha256:..."
        }

        # OCPP CALL frame: [2, uniqueId, Action, Payload]
        message = json.dumps([2, unique_id, "UpdateFirmware", payload])

        await ws.send(message)
        logging.info(f"Gesendet UpdateFirmware an {station_id}: {message}",
                                 extra={"station_id": station_id})
        return unique_id

def publish_message_via_mqtt(station_id: str, direction: str, message: str):
        """Publish *message* on the per-station MQTT topic.

        The topic is ``wallbox/<station_id>/<direction>``, optionally nested
        below the configured ``MQTT_TOPIC_PREFIX``.  Publish failures are
        logged and never raised to the caller.
        """
        topic = f"wallbox/{station_id}/{direction}"
        if MQTT_TOPIC_PREFIX:
                prefix = MQTT_TOPIC_PREFIX.rstrip('/') + '/'
                topic = f"{prefix}{topic}"

        try:
                # Ensure the payload is a JSON string even for dict/list input.
                if isinstance(message, str):
                        payload = message
                else:
                        payload = json.dumps(message)
                mqtt_client.publish(topic, payload)
        except Exception as e:
                logging.error(f"[MQTT] publish to {topic} failed: {e}")

async def ensure_message_timestamp_precision(cur, table_name: str) -> None:
        """Ensure ``<table>.timestamp`` is ``timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)``.

        Inspects ``information_schema`` and issues an ``ALTER TABLE`` only
        when the column lacks microsecond precision or the NOT NULL
        CURRENT_TIMESTAMP default.
        """
        await cur.execute(
                """
                SELECT COLUMN_TYPE, DATETIME_PRECISION, IS_NULLABLE, COLUMN_DEFAULT
                FROM information_schema.COLUMNS
                WHERE TABLE_SCHEMA = DATABASE()
                  AND TABLE_NAME = %s
                  AND COLUMN_NAME = 'timestamp'
                LIMIT 1
                """,
                (table_name,),
        )
        row = await cur.fetchone()

        def _field(key: str, index: int):
                # Rows may be mapping-like (DictCursor) or plain tuples;
                # a missing row / column yields None.
                if isinstance(row, Mapping):
                        return row.get(key) or row.get(key.lower())
                try:
                        return row[index]
                except Exception:
                        return None

        column_type = str(_field("COLUMN_TYPE", 0) or "").lower()
        precision = _field("DATETIME_PRECISION", 1)
        nullable = str(_field("IS_NULLABLE", 2) or "").upper()
        default = str(_field("COLUMN_DEFAULT", 3) or "")

        micro_ok = precision == 6 or column_type.startswith("timestamp(6)")
        default_ok = nullable == "NO" and default.upper().startswith("CURRENT_TIMESTAMP")
        if micro_ok and default_ok:
                return

        await cur.execute(
                f"ALTER TABLE `{table_name}` "
                "MODIFY `timestamp` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)"
        )

async def log_message(
        station_id,
        direction,
        message,
        connection_id=None,
        topic_override: Optional[str] = None,
):
        """Logs messages to MySQL database and debug output."""

        global DB_LOG_FAILURE_COUNT, DB_LOG_LAST_ERROR, OP_MESSAGES_TIMESTAMP_PATCHED

        topic = topic_override or ""
        station_id_short = station_id.rsplit('/', 1)[-1]
        system_label = direction_to_system(direction)
        protocol = get_connection_protocol(station_id, connection_id)
        ocpp2 = is_ocpp2_protocol(protocol)
        try:
                if isinstance(message, (str, bytes)):
                        payload = json.loads(message)
                else:
                        payload = message

                if not topic_override and isinstance(payload, list):
                        m_type = payload[0]

                        if (
                                direction == "client_to_server"
                                and m_type == 2
                                and payload[2] == "BootNotification"
                        ):
                                body = (
                                        payload[3]
                                        if len(payload) > 3 and isinstance(payload[3], dict)
                                        else {}
                                )
                                configs = {
                                        "chargePointVendor": body.get("chargePointVendor"),
                                        "chargePointModel": body.get("chargePointModel"),
                                        "chargePointSerialNumber": body.get(
                                                "chargePointSerialNumber"
                                        )
                                        or body.get("chargeBoxSerialNumber"),
                                        "firmwareVersion": body.get("firmwareVersion"),
                                }
                                create_background_task(
                                        notify_external_configuration_update(
                                                station_id_short, configs
                                        )
                                )
                                topic = "BootNotification"

                        if (
                                direction == "client_to_server"
                                and m_type == 2
                                and payload[2] == "StartTransaction"
                        ):
                                body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
                                connector_raw = body.get("connectorId") if isinstance(body, dict) else None
                                if connector_raw is None:
                                        connector_raw = 1
                                raw_id_tag = body.get("idTag") if isinstance(body, dict) else None
                                effective_id_tag = (
                                        get_external_idtag_override(station_id_short, raw_id_tag)
                                        or raw_id_tag
                                )
                                info = {
                                        "stationId": station_id_short,
                                        "connectorId": connector_raw,
                                        "idToken": effective_id_tag,
                                        "idTokenOriginal": raw_id_tag,
                                        "sessionStartTimestamp": body.get("timestamp"),
                                        "meterStartWh": body.get("meterStart"),
                                }
                                PENDING_STARTS[payload[1]] = info
                                topic = "StartTransaction"

                                meter_start = body.get("meterStart")
                                connector_id = connector_raw
                                if meter_start is not None and connector_id is not None:
                                        try:
                                                connector_id_int = int(connector_id)
                                                meter_start_val = float(meter_start)
                                        except (TypeError, ValueError):
                                                pass
                                        else:
                                                ts_dt = parse_ocpp_timestamp(body.get("timestamp"))
                                                create_background_task(
                                                        store_last_meter_reading(
                                                                station_id_short,
                                                                connector_id_int,
                                                                meter_start_val,
                                                                ts_dt,
                                                        )
                                                )

                        elif (
                                ocpp2
                                and direction == "client_to_server"
                                and m_type == 2
                                and payload[2] == "TransactionEvent"
                        ):
                                body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
                                event_raw = body.get("eventType")
                                event_type = event_raw.lower() if isinstance(event_raw, str) else ""
                                tx_info_obj = body.get("transactionInfo")
                                tx_info = tx_info_obj if isinstance(tx_info_obj, dict) else {}
                                transaction_id_value = tx_info.get("transactionId") or body.get("transactionId")
                                tx_id = str(transaction_id_value) if transaction_id_value is not None else None
                                raw_id_token = _normalise_id_token(body.get("idToken"))
                                effective_id_token = (
                                        get_external_idtag_override(station_id_short, raw_id_token)
                                        or raw_id_token
                                )
                                connector_id, evse_id = _extract_connector_and_evse(body)
                                connector_key = _normalize_connector_key(connector_id)
                                connector_id_int = connector_key if isinstance(connector_key, int) else None
                                meter_values_raw = None
                                if isinstance(body, dict):
                                        meter_values_raw = body.get("meterValue") or body.get("meterValues")
                                event_timestamp_value = (
                                        body.get("timestamp")
                                        if isinstance(body.get("timestamp"), str)
                                        else None
                                )
                                meter_timestamp, meter_value, meter_payload = _parse_meter_value_entry(
                                        meter_values_raw if isinstance(meter_values_raw, list) else []
                                )
                                if meter_payload and connector_key is not None:
                                        station_entries = LATEST_CONNECTOR_METERS.setdefault(
                                                station_id_short, {}
                                        )
                                        station_entries[connector_key] = meter_payload
                                        _append_connector_meter_history(
                                                station_id_short,
                                                connector_key,
                                                meter_payload,
                                                fallback_timestamp=event_timestamp_value,
                                        )

                                topic = (
                                        f"TransactionEvent.{event_raw}"
                                        if isinstance(event_raw, str)
                                        else "TransactionEvent"
                                )

                                if event_type == "started":
                                        info: dict[str, Any] = {
                                                "stationId": station_id_short,
                                                "connectorId": connector_id,
                                                "evseId": evse_id,
                                                "idToken": effective_id_token,
                                                "idTokenOriginal": raw_id_token,
                                                "sessionStartTimestamp": event_timestamp_value,
                                                "meterStartWh": meter_value,
                                        }
                                        if tx_id:
                                                info["transactionId"] = tx_id
                                                ACTIVE_TRANSACTIONS[tx_id] = info
                                        if tx_id:
                                                create_background_task(
                                                        upsert_session_state(
                                                                station_id_short,
                                                                tx_id,
                                                                transaction_id=tx_id,
                                                                connector_id=connector_id,
                                                                evse_id=evse_id,
                                                                id_tag=effective_id_token,
                                                                status="ACTIVE",
                                                                start_timestamp=event_timestamp_value,
                                                                meter_start_wh=meter_value,
                                                                meter_last_wh=meter_value,
                                                        )
                                                )
                                        if meter_value is not None and connector_id_int is not None:
                                                ts_dt = parse_ocpp_timestamp(
                                                        meter_timestamp or event_timestamp_value
                                                )
                                                create_background_task(
                                                        store_last_meter_reading(
                                                                station_id_short,
                                                                connector_id_int,
                                                                meter_value,
                                                                ts_dt,
                                                        )
                                                )
                                        external_payload = {
                                                "stationId": station_id_short,
                                                "connectorId": connector_id,
                                                "evseId": evse_id,
                                                "idToken": effective_id_token,
                                                "idTokenOriginal": raw_id_token,
                                                "rawIdToken": raw_id_token,
                                                "transactionId": tx_id,
                                                "sessionStartTimestamp": event_timestamp_value,
                                                "timestamp": event_timestamp_value,
                                        }
                                        external_payload = {
                                                key: value
                                                for key, value in external_payload.items()
                                                if value is not None
                                        }
                                        if "transactionId" in external_payload:
                                                create_background_task(
                                                        notify_external_update_transaction(external_payload)
                                                )

                                elif event_type == "updated":
                                        if tx_id and tx_id in ACTIVE_TRANSACTIONS and meter_value is not None:
                                                ACTIVE_TRANSACTIONS[tx_id]["meterLastWh"] = meter_value
                                        if meter_value is not None and connector_id_int is not None:
                                                ts_dt = parse_ocpp_timestamp(
                                                        meter_timestamp or event_timestamp_value
                                                )
                                                create_background_task(
                                                        store_last_meter_reading(
                                                                station_id_short,
                                                                connector_id_int,
                                                                meter_value,
                                                                ts_dt,
                                                        )
                                                )
                                        if tx_id and meter_value is not None:
                                                create_background_task(
                                                        upsert_session_state(
                                                                station_id_short,
                                                                tx_id,
                                                                transaction_id=tx_id,
                                                                connector_id=connector_id,
                                                                evse_id=evse_id,
                                                                status="ACTIVE",
                                                                meter_last_wh=meter_value,
                                                                last_status=None,
                                                        )
                                                )

                                elif event_type == "ended":
                                        start_info = ACTIVE_TRANSACTIONS.pop(tx_id, None) if tx_id else None
                                        if meter_value is not None and connector_id_int is not None:
                                                ts_dt = parse_ocpp_timestamp(
                                                        meter_timestamp or event_timestamp_value
                                                )
                                                create_background_task(
                                                        store_last_meter_reading(
                                                                station_id_short,
                                                                connector_id_int,
                                                                meter_value,
                                                                ts_dt,
                                                        )
                                                )
                                        stop_reason = tx_info.get("stoppedReason") if isinstance(tx_info, dict) else None
                                        trigger_reason = body.get("triggerReason")
                                        if not stop_reason and isinstance(trigger_reason, str):
                                                stop_reason = trigger_reason
                                        stop_id_token = (
                                                (start_info or {}).get("idTokenOriginal")
                                                or (start_info or {}).get("idToken")
                                                or effective_id_token
                                                or raw_id_token
                                        )
                                        external_payload = {
                                                "stationId": station_id_short,
                                                "connectorId": (start_info or {}).get("connectorId")
                                                or connector_id,
                                                "evseId": (start_info or {}).get("evseId") or evse_id,
                                                "idToken": stop_id_token,
                                                "transactionId": tx_id,
                                                "sessionStartTimestamp": (start_info or {}).get(
                                                        "sessionStartTimestamp"
                                                ),
                                                "sessionEndTimestamp": event_timestamp_value,
                                                "meterStartWh": (start_info or {}).get("meterStartWh"),
                                                "meterStopWh": meter_value,
                                                "timestamp": event_timestamp_value,
                                                "stopReason": stop_reason,
                                        }
                                        # Drop null fields so the external stop notification only
                                        # carries populated keys.
                                        external_payload = {
                                                key: value
                                                for key, value in external_payload.items()
                                                if value is not None
                                        }
                                        if "transactionId" in external_payload:
                                                create_background_task(
                                                        notify_external_stop_transaction(external_payload)
                                                )

                                        if start_info:
                                                # Minimal OCPP-1.6-style stop body expected by the CDR
                                                # forwarders.
                                                stop_payload = {
                                                        "transactionId": tx_id,
                                                        "timestamp": event_timestamp_value,
                                                        "meterStop": meter_value,
                                                }
                                                oicp_target = False
                                                if HUBJECT_CLIENT is not None:
                                                        # Only check the roaming flag when a Hubject client
                                                        # is configured at all.
                                                        oicp_target = await is_station_oicp_enabled(
                                                                station_id_short
                                                        )
                                                # Always forward the OCPI CDR; additionally push to
                                                # Hubject when the station is roaming-enabled.
                                                await ocpi_cdr_forwarder.send_cdr(
                                                        station_id_short, start_info, stop_payload
                                                )
                                                if oicp_target:
                                                        await _push_hubject_cdr_with_retry(
                                                                station_id_short,
                                                                start_info,
                                                                stop_payload,
                                                        )
                                        if tx_id:
                                                # Persist the completed session; missing start context is
                                                # tolerated via the (start_info or {}) fallbacks.
                                                create_background_task(
                                                        upsert_session_state(
                                                                station_id_short,
                                                                tx_id,
                                                                transaction_id=tx_id,
                                                                connector_id=connector_id,
                                                                evse_id=evse_id,
                                                                id_tag=effective_id_token,
                                                                status="COMPLETED",
                                                                start_timestamp=(start_info or {}).get(
                                                                        "sessionStartTimestamp"
                                                                ),
                                                                end_timestamp=event_timestamp_value,
                                                                meter_start_wh=(start_info or {}).get(
                                                                        "meterStartWh"
                                                                ),
                                                                meter_stop_wh=meter_value,
                                                                meter_last_wh=meter_value,
                                                        )
                                                )
                                        else:
                                                # NOTE(review): this `else` pairs with `if tx_id:` above, so
                                                # the warning fires when the transaction id is missing — not
                                                # when start info is missing as the message text suggests.
                                                # Confirm which condition was actually intended.
                                                logging.warning(
                                                        "TransactionEvent Ended without cached start info: transactionId=%s",
                                                        tx_id,
                                                        extra={"station_id": station_id, "system": system_label},
                                                )

                        # --- OCPP 1.6 MeterValues (charge point -> central system) ---------
                        elif (
                                direction == "client_to_server"
                                and m_type == 2
                                and payload[2] == "MeterValues"
                        ):
                                # Defensive extraction; only the first meterValue entry feeds the
                                # "last reading" bookkeeping below.
                                body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
                                mv_list = body.get("meterValue") if isinstance(body, dict) else []
                                entry = mv_list[0] if mv_list else {}
                                timestamp = entry.get("timestamp") if isinstance(entry, dict) else None
                                ts_dt = parse_ocpp_timestamp(timestamp if isinstance(timestamp, str) else None)
                                sampled = entry.get("sampledValue") if isinstance(entry, dict) else []
                                # Prefer the energy-register sample (an absent measurand is
                                # treated as Energy.Active.Import.Register); otherwise fall back
                                # to the first dict-shaped sample.
                                target = None
                                if isinstance(sampled, list):
                                        for sv in sampled:
                                                if not isinstance(sv, dict):
                                                        continue
                                                meas = sv.get("measurand")
                                                if meas in (None, "Energy.Active.Import.Register"):
                                                        target = sv
                                                        break
                                        if target is None:
                                                for sv in sampled:
                                                        if isinstance(sv, dict):
                                                                target = sv
                                                                break
                                meter_value = None
                                if isinstance(target, dict):
                                        value_str = target.get("value")
                                        try:
                                                meter_value = float(value_str)
                                        except (TypeError, ValueError):
                                                # Non-numeric sample value: skip the reading.
                                                meter_value = None
                                connector_id = body.get("connectorId") if isinstance(body, dict) else None
                                if connector_id is None:
                                        # NOTE(review): a missing connectorId defaults to 1, which
                                        # assumes single-connector stations — confirm this is intended
                                        # for multi-connector hardware.
                                        connector_id = 1
                                try:
                                        connector_id_int = int(connector_id) if connector_id is not None else None
                                except (TypeError, ValueError):
                                        connector_id_int = None
                                if meter_value is not None and connector_id_int is not None:
                                        # Persist the latest reading and refresh the active session's
                                        # last meter value, both off the hot path.
                                        create_background_task(
                                                store_last_meter_reading(
                                                        station_id_short,
                                                        connector_id_int,
                                                        meter_value,
                                                        ts_dt,
                                                )
                                        )
                                        active_session_uid = _find_active_session_uid(
                                                station_id_short, connector_id
                                        )
                                        if active_session_uid:
                                                create_background_task(
                                                        upsert_session_state(
                                                                station_id_short,
                                                                active_session_uid,
                                                                connector_id=connector_id,
                                                                meter_last_wh=meter_value,
                                                                status="ACTIVE",
                                                        )
                                                )

                                # Aggregate live telemetry (energy register, per-phase current,
                                # active power) into the in-memory per-connector meter cache.
                                connector_key = _normalize_connector_key(connector_id)
                                if connector_key is not None and isinstance(sampled, list):
                                        energy_import = None
                                        current_import: dict[str, float] = {}
                                        power_import_total: Optional[float] = None
                                        power_import_phases: dict[str, float] = {}
                                        for sv in sampled:
                                                if not isinstance(sv, dict):
                                                        continue
                                                measurand = sv.get("measurand") or "Energy.Active.Import.Register"
                                                value = _safe_float(sv.get("value"))
                                                if value is None:
                                                        continue
                                                if measurand == "Energy.Active.Import.Register":
                                                        energy_import = value
                                                elif measurand == "Current.Import":
                                                        phase = sv.get("phase")
                                                        if isinstance(phase, str) and phase in {"L1", "L2", "L3"}:
                                                                current_import[phase] = value
                                                elif measurand == "Power.Active.Import":
                                                        phase = sv.get("phase")
                                                        if isinstance(phase, str) and phase:
                                                                power_import_phases[phase] = value
                                                        else:
                                                                # No phase attribute: treat as the total power.
                                                                power_import_total = value

                                        meter_payload: dict[str, Any] = {}
                                        if isinstance(timestamp, str) and timestamp:
                                                meter_payload["timestamp"] = timestamp
                                        if energy_import is not None:
                                                meter_payload["Energy.Active.Import.Register"] = energy_import
                                        if current_import:
                                                meter_payload["Current.Import"] = current_import
                                        # A reported total wins; otherwise expose the phase map under
                                        # the same key, plus always under the ".Phases" key.
                                        if power_import_total is not None:
                                                meter_payload["Power.Active.Import"] = power_import_total
                                        elif power_import_phases:
                                                meter_payload["Power.Active.Import"] = power_import_phases
                                        if power_import_phases:
                                                meter_payload["Power.Active.Import.Phases"] = power_import_phases

                                        if meter_payload:
                                                # Update the latest snapshot and append to the rolling
                                                # per-connector history.
                                                station_entries = LATEST_CONNECTOR_METERS.setdefault(station_id_short, {})
                                                station_entries[connector_key] = meter_payload
                                                _append_connector_meter_history(
                                                        station_id_short,
                                                        connector_key,
                                                        meter_payload,
                                                        fallback_timestamp=timestamp,
                                                )

                        # --- StartTransaction.conf (central system -> charge point) --------
                        # A CALLRESULT carrying a transactionId correlates the pending start
                        # request with its assigned transaction id.
                        elif (
                                direction == "server_to_client"
                                and m_type == 3
                                and isinstance(payload[2], dict)
                                and "transactionId" in payload[2]
                        ):
                                transaction_id = payload[2].get("transactionId")
                                if transaction_id is not None:
                                        # payload[1] (the OCPP-J message id) keys the pending start.
                                        start_info = PENDING_STARTS.pop(payload[1], None)
                                        tx_id = str(transaction_id)
                                        # Second-precision UTC timestamp in OCPP "Z" notation.
                                        timestamp = (
                                                datetime.now(timezone.utc)
                                                .replace(microsecond=0)
                                                .isoformat()
                                                .replace("+00:00", "Z")
                                        )
                                        external_payload = {
                                                "stationId": station_id_short,
                                                "transactionId": tx_id,
                                                "timestamp": timestamp,
                                        }
                                        if start_info:
                                                # Promote the pending start to an active transaction and
                                                # enrich the outbound notification with its context.
                                                start_info["transactionId"] = tx_id
                                                ACTIVE_TRANSACTIONS[tx_id] = start_info
                                                external_payload.update(
                                                        {
                                                                "connectorId": start_info.get("connectorId"),
                                                                "idToken": start_info.get("idTokenOriginal")
                                                                or start_info.get("idToken"),
                                                                "sessionStartTimestamp": start_info.get("sessionStartTimestamp"),
                                                        }
                                                )
                                                # NOTE(review): id_tag prefers idToken here but the external
                                                # payload above prefers idTokenOriginal — confirm the
                                                # differing preference order is deliberate.
                                                create_background_task(
                                                        upsert_session_state(
                                                                station_id_short,
                                                                tx_id,
                                                                transaction_id=tx_id,
                                                                connector_id=start_info.get("connectorId"),
                                                                id_tag=start_info.get("idToken")
                                                                or start_info.get("idTokenOriginal"),
                                                                status="ACTIVE",
                                                                start_timestamp=start_info.get(
                                                                        "sessionStartTimestamp"
                                                                ),
                                                                meter_start_wh=start_info.get("meterStartWh"),
                                                                meter_last_wh=start_info.get("meterStartWh"),
                                                        )
                                                )
                                        # Notify the external system even when no cached start info
                                        # was found (payload then carries only station/tx/timestamp).
                                        create_background_task(
                                                notify_external_update_transaction(external_payload)
                                        )

                        # --- OCPP 1.6 StopTransaction (charge point -> central system) -----
                        if (
                                direction == "client_to_server"
                                and m_type == 2
                                and payload[2] == "StopTransaction"
                        ):
                                body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
                                # NOTE(review): str() turns a missing transactionId into the
                                # literal "None" — confirm downstream consumers tolerate that.
                                tx_id = str(body.get("transactionId"))
                                start_info = ACTIVE_TRANSACTIONS.pop(tx_id, None)
                                if start_info:
                                        oicp_target = False
                                        if HUBJECT_CLIENT is not None:
                                                oicp_target = await is_station_oicp_enabled(
                                                        station_id_short
                                                )
                                        # Always forward the OCPI CDR; push to Hubject as well when
                                        # the station is roaming-enabled.
                                        await ocpi_cdr_forwarder.send_cdr(
                                                station_id_short, start_info, body
                                        )
                                        if oicp_target:
                                                await _push_hubject_cdr_with_retry(
                                                        station_id_short,
                                                        start_info,
                                                        body,
                                                )
                                else:
                                        logging.warning(
                                                "StopTransaction without cached StartTransaction info: transactionId=%s, cached_active=%s, payload=%s",
                                                tx_id,
                                                list(ACTIVE_TRANSACTIONS.keys()),
                                                body,
                                                extra={"station_id": station_id, "system": system_label},
                                        )
                                meter_stop = body.get("meterStop")
                                # NOTE(review): `or` treats a falsy connectorId (e.g. 0) from the
                                # cached start as missing — confirm 0 can never be a valid value
                                # here.
                                connector_id = (start_info or {}).get("connectorId") or body.get("connectorId")
                                if connector_id is None:
                                        connector_id = 1
                                stop_timestamp = body.get("timestamp") if isinstance(body, dict) else None
                                stop_dt = parse_ocpp_timestamp(stop_timestamp if isinstance(stop_timestamp, str) else None)
                                if meter_stop is not None and connector_id is not None:
                                        try:
                                                connector_id_int = int(connector_id)
                                                meter_stop_val = float(meter_stop)
                                        except (TypeError, ValueError):
                                                # Unparseable values: skip the reading rather than fail.
                                                pass
                                        else:
                                                create_background_task(
                                                        store_last_meter_reading(
                                                                station_id_short,
                                                                connector_id_int,
                                                                meter_stop_val,
                                                                stop_dt,
                                                        )
                                                )
                                # Prefer the idTag sent with the stop request, then the cached
                                # original/processed token from the start.
                                body_id_tag = body.get("idTag") if isinstance(body, dict) else None
                                stop_id_tag = (
                                        body_id_tag
                                        if body_id_tag is not None
                                        else (start_info or {}).get("idTokenOriginal")
                                        or (start_info or {}).get("idToken")
                                )
                                external_payload = {
                                        "stationId": station_id_short,
                                        "connectorId": (start_info or {}).get("connectorId")
                                        or body.get("connectorId"),
                                        "idToken": stop_id_tag,
                                        "transactionId": tx_id,
                                        "sessionStartTimestamp": (start_info or {}).get("sessionStartTimestamp"),
                                        "sessionEndTimestamp": body.get("timestamp"),
                                        "meterStartWh": (start_info or {}).get("meterStartWh"),
                                        "meterStopWh": body.get("meterStop"),
                                        "timestamp": body.get("timestamp"),
                                        "stopReason": body.get("reason"),
                                }
                                create_background_task(
                                        notify_external_stop_transaction(external_payload)
                                )
                                # Mark the session COMPLETED with whatever context is available.
                                create_background_task(
                                        upsert_session_state(
                                                station_id_short,
                                                tx_id,
                                                transaction_id=tx_id,
                                                connector_id=connector_id,
                                                id_tag=stop_id_tag,
                                                status="COMPLETED",
                                                start_timestamp=(start_info or {}).get(
                                                        "sessionStartTimestamp"
                                                ),
                                                end_timestamp=body.get("timestamp"),
                                                meter_start_wh=(start_info or {}).get("meterStartWh"),
                                                meter_stop_wh=body.get("meterStop"),
                                                meter_last_wh=body.get("meterStop"),
                                        )
                                )
                                topic = "StopTransaction"

                        # --- TriggerMessage request sent to the charge point ---------------
                        # Remember that this station was asked for a StatusNotification so
                        # the next one it sends gets relayed to the external system.
                        if (
                                direction == "server_to_client"
                                and m_type == 2
                                and payload[2] == "TriggerMessage"
                        ):
                                body = payload[3] if len(payload) > 3 and isinstance(payload[3], dict) else {}
                                if body.get("requestedMessage") == "StatusNotification":
                                        STATUS_NOTIFICATION_TRIGGER.add(station_id_short)

                        # --- OCPP 1.6 StatusNotification (charge point -> central system) --
                        if (
                                direction == "client_to_server"
                                and m_type == 2
                                and payload[2] == "StatusNotification"
                        ):
                                body = (
                                        payload[3]
                                        if len(payload) > 3 and isinstance(payload[3], dict)
                                        else {}
                                )
                                # One-shot relay: if a TriggerMessage(StatusNotification) was
                                # previously sent to this station, forward this notification to
                                # the external system and clear the flag.
                                if station_id_short in STATUS_NOTIFICATION_TRIGGER:
                                        STATUS_NOTIFICATION_TRIGGER.remove(station_id_short)
                                        create_background_task(
                                                notify_external_status_notification(
                                                        station_id_short, body
                                                )
                                        )

                                connector_raw = body.get("connectorId") if isinstance(body, dict) else None
                                try:
                                        connector_id_int = int(connector_raw)
                                except (TypeError, ValueError):
                                        connector_id_int = None

                                status_value = body.get("status") if isinstance(body, dict) else None
                                error_code = body.get("errorCode") if isinstance(body, dict) else None

                                # Cache the latest per-connector status snapshot in memory.
                                connector_key = _normalize_connector_key(connector_raw)
                                status_payload: dict[str, Any] = {}
                                if status_value is not None:
                                        status_payload["status"] = status_value
                                timestamp_value = body.get("timestamp") if isinstance(body, dict) else None
                                if timestamp_value is not None:
                                        status_payload["timestamp"] = timestamp_value
                                if error_code is not None:
                                        status_payload["errorCode"] = error_code

                                if status_payload and connector_key is not None:
                                        station_entries = LATEST_CONNECTOR_STATUS.setdefault(station_id_short, {})
                                        station_entries[connector_key] = status_payload

                                # Persist the EVSE status off the hot path.
                                create_background_task(
                                        upsert_evse_status(
                                                station_id_short,
                                                connector_id=connector_raw,
                                                evse_id=body.get("evseId") if isinstance(body, dict) else None,
                                                status=status_value,
                                                error_code=error_code,
                                                status_timestamp=timestamp_value,
                                        )
                                )

                                # Attach the status to the currently active session, if any.
                                active_session_uid = _find_active_session_uid(
                                        station_id_short, connector_raw, body.get("evseId") if isinstance(body, dict) else None
                                )
                                if active_session_uid:
                                        create_background_task(
                                                upsert_session_state(
                                                        station_id_short,
                                                        active_session_uid,
                                                        connector_id=connector_raw,
                                                        evse_id=body.get("evseId") if isinstance(body, dict) else None,
                                                        last_status=status_value,
                                                        last_status_timestamp=timestamp_value,
                                                )
                                        )

                                # Fault bookkeeping: store a human-readable fault mark for a
                                # Faulted status and for specific error codes. The connector id
                                # is encoded in the reason text; the second argument is None.
                                if status_value == "Faulted":
                                        timestamp = datetime.now(timezone.utc).strftime(
                                                "%Y-%m-%d %H:%M:%S"
                                        )
                                        if connector_id_int is not None and error_code:
                                                reason = (
                                                        f"Connector {connector_id_int} reported Faulted "
                                                        f"({error_code}) at {timestamp}"
                                                )
                                        elif connector_id_int is not None:
                                                reason = (
                                                        f"Connector {connector_id_int} reported Faulted at {timestamp}"
                                                )
                                        elif error_code:
                                                reason = (
                                                        f"Chargepoint reported Faulted ({error_code}) at {timestamp}"
                                                )
                                        else:
                                                reason = f"Chargepoint went into fault state {timestamp}"
                                        create_background_task(
                                                _store_fault_mark(
                                                        station_id_short,
                                                        None,
                                                        reason,
                                                )
                                        )

                                if error_code == "ConnectorLockFailure":
                                        timestamp = datetime.now(timezone.utc).strftime(
                                                "%Y-%m-%d %H:%M:%S"
                                        )
                                        if connector_id_int is not None:
                                                reason = (
                                                        f"Connector {connector_id_int} reported Connector "
                                                        f"Lock Failure on {timestamp}"
                                                )
                                        else:
                                                reason = f"Connector Lock Failure on {timestamp}"
                                        create_background_task(
                                                _store_fault_mark(
                                                        station_id_short,
                                                        None,
                                                        reason,
                                                )
                                        )

                                if error_code == "OverCurrentFailure":
                                        timestamp = datetime.now(timezone.utc).strftime(
                                                "%Y-%m-%d %H:%M:%S"
                                        )
                                        if connector_id_int is not None:
                                                reason = (
                                                        f"Connector {connector_id_int} reported OverCurrentFailure "
                                                        f"on {timestamp}"
                                                )
                                        else:
                                                reason = f"Chargepoint reported OverCurrentFailure on {timestamp}"
                                        create_background_task(
                                                _store_fault_mark(
                                                        station_id_short,
                                                        None,
                                                        reason,
                                                )
                                        )

                                if error_code == "OtherError":
                                        # Prefer the station-supplied free-text info when present.
                                        info_value = (
                                                body.get("info") if isinstance(body, dict) else None
                                        )
                                        if info_value:
                                                reason = str(info_value)
                                        elif connector_id_int is not None:
                                                reason = (
                                                        f"Connector {connector_id_int} reported OtherError"
                                                )
                                        else:
                                                reason = "Chargepoint reported OtherError"
                                        create_background_task(
                                                _store_fault_mark(
                                                        station_id_short,
                                                        None,
                                                        reason,
                                                )
                                        )
                                preparing_key = (
                                        (station_id_short, connector_id_int)
                                        if connector_id_int is not None
                                        else None
                                )
                                if preparing_key is not None:
                                        tracker_entry = _PREPARING_FAILURE_TRACKER.get(preparing_key)
                                else:
                                        tracker_entry = None

                                if status_value == "Preparing" and preparing_key is not None:
                                        if tracker_entry is None:
                                                tracker_entry = {
                                                        "awaiting_charge": True,
                                                        "failures": 0,
                                                }
                                                _PREPARING_FAILURE_TRACKER[preparing_key] = tracker_entry
                                        else:
                                                tracker_entry["awaiting_charge"] = True

                                elif status_value == "Charging" and tracker_entry is not None:
                                        tracker_entry["awaiting_charge"] = False
                                        tracker_entry["failures"] = 0

                                elif (
                                        tracker_entry is not None
                                        and tracker_entry.get("awaiting_charge")
                                        and status_value in _PREPARING_FAILURE_TERMINAL_STATUSES
                                ):
                                        tracker_entry["awaiting_charge"] = False
                                        tracker_entry["failures"] = (
                                                int(tracker_entry.get("failures", 0)) + 1
                                        )
                                        if (
                                                tracker_entry["failures"]
                                                >= _PREPARING_FAILURE_THRESHOLD
                                        ):
                                                tracker_entry["failures"] = 0
                                                timestamp = datetime.now(timezone.utc).strftime(
                                                        "%Y-%m-%d %H:%M:%S"
                                                )
                                                reason = (
                                                        "Connector {connector} failed to start charging "
                                                        "{threshold} times consecutively (Preparing without "
                                                        "Charging) as of {timestamp}"
                                                ).format(
                                                        connector=connector_id_int,
                                                        threshold=_PREPARING_FAILURE_THRESHOLD,
                                                        timestamp=timestamp,
                                                )
                                                create_background_task(
                                                        _store_fault_mark(
                                                                station_id_short,
                                                                None,
                                                                reason,
                                                        )
                                                )

                        elif (
                                m_type == 3
                                and len(payload) > 2
                                and isinstance(payload[2], dict)
                                and "configurationKey" in payload[2]
                        ):
                                topic = "GetConfiguration"

                        elif (
                                len(payload) > 3
                                and isinstance(payload[3], dict)
                                and "idTag" in payload[3]
                        ):
                                topic = "Authorize"

        except (ValueError, TypeError):
                pass

        try:

                if connection_id is None:
                        connection_id = ACTIVE_CONNECTION_IDS.get(station_id)
                if connection_id is None:
                        # Use 0 when no connection ID is available so DB INSERT
                        # does not fail on NOT NULL constraint
                        connection_id = 0
                mqtt_entry, _ = resolve_station_entry(station_id, station_id)
                if mqtt_entry and mqtt_entry.get("mqtt_enabled"):
                        try:
                                publish_message_via_mqtt(station_id_short, direction, message)
                        except Exception as e:
                                # we swallow all errors here so proxy logic is never interrupted
                                print("Error while publishing")
                                print(e)
                                pass
                                
                async with db_connection() as conn:
                        async with conn.cursor() as cur:
                                if not OP_MESSAGES_TIMESTAMP_PATCHED:
                                        try:
                                                await ensure_message_timestamp_precision(cur, "op_messages")
                                        except Exception:
                                                logging.debug(
                                                        "Failed to enforce microsecond timestamps on op_messages",
                                                        exc_info=True,
                                                        extra={"station_id": station_id, "system": system_label},
                                                )
                                        else:
                                                OP_MESSAGES_TIMESTAMP_PATCHED = True

                                # [2, "5984436", "Authorize", {"idTag": "03907BAE"}]
                                # 1) dynamischen Tabellennamen ermitteln
                                now = datetime.now()
                                monthly_table = f"op_messages_{now.strftime('%y%m')}"

                                 # nur dieser Aufruf wird stumm geschaltet
                                with warnings.catch_warnings():
                                        warnings.filterwarnings(
                                                "ignore",
                                                message=r"Table '.*' already exists",
                                                category=Warning,
                                        )
                                        await cur.execute(
                                                f"CREATE TABLE IF NOT EXISTS `{monthly_table}` LIKE op_messages"
                                        )

                                if monthly_table not in MONTHLY_MESSAGE_TABLES_PATCHED:
                                        try:
                                                await ensure_message_timestamp_precision(cur, monthly_table)
                                        except Exception:
                                                logging.warning(
                                                        "Failed to enforce microsecond timestamps on %s",
                                                        monthly_table,
                                                        exc_info=True,
                                                        extra={"station_id": station_id, "system": system_label},
                                                )
                                        else:
                                                MONTHLY_MESSAGE_TABLES_PATCHED.add(monthly_table)

                                # 2) Tabelle anlegen, falls noch nicht da
                                # await cur.execute(f"CREATE TABLE IF NOT EXISTS `{monthly_table}` LIKE op_messages;")

                                # await cur.execute(
                                #       "INSERT INTO op_messages (source_url, direction, message, topic) VALUES (%s, %s, %s, %s)",
                                #       (station_id, direction, message, topic)
                                # )

                                # 3) Nachricht in die Monats-Tabelle schreiben
                                await cur.execute(
                                f"INSERT INTO `{monthly_table}` (source_url, direction, message, topic, connection_id) "
                                "VALUES (%s, %s, %s, %s, %s)",
                                (station_id, direction, message, topic, connection_id)
                                )

                                await conn.commit()

                                # logging.info(f"DB-LOG [{station_id}] {direction} {message}", extra={"station_id": station_id})

        except Exception:
                now = datetime.now(timezone.utc)
                DB_LOG_FAILURE_COUNT += 1
                should_log = (
                        DB_LOG_LAST_ERROR is None
                        or (now - DB_LOG_LAST_ERROR).total_seconds() >= 60
                        or DB_LOG_FAILURE_COUNT == 1
                )
                if should_log:
                        logging.error(
                                "Database logging failed (%s consecutive errors). Messages may not be persisted until the connection recovers.",
                                DB_LOG_FAILURE_COUNT,
                                extra={"station_id": station_id, "system": system_label},
                        )
                        logging.exception(
                                "Database error while logging message for station %s (failure #%s)",
                                station_id,
                                DB_LOG_FAILURE_COUNT,
                                extra={"station_id": station_id, "system": system_label},
                        )
                DB_LOG_LAST_ERROR = now
        else:
                if DB_LOG_FAILURE_COUNT:
                        logging.info(
                                "Database logging restored after %s consecutive errors.",
                                DB_LOG_FAILURE_COUNT,
                                extra={"station_id": station_id, "system": system_label},
                        )
                DB_LOG_FAILURE_COUNT = 0
                DB_LOG_LAST_ERROR = None

        trigger_fault_detection(station_id_short, message)

async def forward_messages(
        source_ws,
        target_ws,
        station_id,
        direction,
        connection_id,
        disconnect_tracker: Optional[MutableMapping[str, Optional[str]]] = None,
):
        """Forward OCPP frames from ``source_ws`` to ``target_ws``, logging each one.

        Besides pure forwarding, frames are inspected for side effects:
        BootNotification persistence, GetConfiguration response tracking,
        idTag translation/override for Authorize/StartTransaction, RFID event
        logging, and suppression of responses whose request was dropped
        (PENDING_DIAGNOSTICS / PENDING_REBOOT / PENDING_FIRMWARE).

        Args:
                source_ws: Websocket the messages are read from.
                target_ws: Websocket the messages are forwarded to.
                station_id: Full station identifier (path-like); the short id is
                        derived from its last path segment.
                direction: "client_to_server" or "server_to_client".
                connection_id: Connection identifier used when persisting logs.
                disconnect_tracker: Optional mapping; its "source" key records
                        which side ("chargepoint" or "server") closed first.
        """
        station_id_short = station_id.rsplit('/', 1)[-1]
        system_label = direction_to_system(direction)
        log = logging.LoggerAdapter(logging.getLogger(), {"station_id": station_id, "system": system_label})
        # Defaults so the exception handlers below can always log peer info,
        # even if reading the transports fails before assignment.
        peer = "0:0:0.0"
        sock = "0:0:0.0"
        peerInbound = "0:0:0.0"
        sockInbound = "0:0:0.0"

        try:

                transport = target_ws.transport
                peer = transport.get_extra_info("peername")     # (host, port) of the peer
                sock = transport.get_extra_info("sockname")     # (host, port) of our own socket
                log.info(f"forward_messages: Peer: {peer}, Sock: {sock}")

                transportInbound = source_ws.transport
                peerInbound = transportInbound.get_extra_info("peername")       # (host, port) of the peer
                sockInbound = transportInbound.get_extra_info("sockname")       # (host, port) of our own socket

                async for message in source_ws:
                        LAST_SEEN[station_id] = datetime.now(timezone.utc)
                        log.info(f"762: [{station_id}] {direction} Message: {message}")
                        await log_message(station_id, direction, message, connection_id)

                        # Best-effort decode; a non-JSON frame is forwarded untouched.
                        data = None
                        try:
                                if isinstance(message, (str, bytes)):
                                        data = json.loads(message)
                                else:
                                        data = message
                        except Exception:
                                data = None

                        if (
                                direction == "client_to_server"
                                and isinstance(data, list)
                                and len(data) > 2
                                and data[0] == 2
                                and data[2] == "BootNotification"
                        ):
                                await store_bootnotification(station_id, data)

                        if (
                                direction == "server_to_client"
                                and isinstance(data, list)
                                and len(data) > 2
                                and data[0] == 2
                                and data[2] == "GetConfiguration"
                        ):
                                # Remember whether the request asked for the full
                                # configuration (no "key" filter) so the response
                                # handler knows how to store it.
                                is_full = 1
                                if (
                                        len(data) > 3
                                        and isinstance(data[3], dict)
                                        and data[3].get("key")
                                ):
                                        is_full = 0
                                key = f"{station_id}|{data[1]}"
                                PENDING_GET_CONFIGURATION[key] = is_full

                        if (
                                direction == "client_to_server"
                                and isinstance(data, list)
                                and len(data) > 3
                                and isinstance(data[3], dict)
                        ):
                                body = data[3]
                                original_tag = _extract_id_token_value(body)
                                if original_tag is not None:
                                        tag = original_tag
                                        pending_key: Optional[str] = None
                                        action_obj = data[2] if len(data) > 2 else None
                                        action = action_obj if isinstance(action_obj, str) else None
                                        if (
                                                data[0] == 2
                                                and len(data) > 1
                                                and action in {"Authorize", "StartTransaction"}
                                        ):
                                                pending_key = f"{station_id}|{data[1]}"
                                                PENDING_RFID[pending_key] = tag
                                        if data[0] == 2 and action == "Authorize":
                                                connector_hint, evse_hint = _extract_connector_and_evse(body)
                                                tag, blocked = await fetch_mapped_token(
                                                    original_tag,
                                                    station_id_short,
                                                    station_id,
                                                    connector_hint=connector_hint,
                                                    evse_hint=evse_hint,
                                                )
                                                set_external_idtag_override(station_id_short, original_tag, tag)
                                                if blocked:
                                                        # Answer the chargepoint directly and do not forward.
                                                        response = [3, data[1], {"idTagInfo": {"status": "Blocked"}}]
                                                        await source_ws.send(json.dumps(response))
                                                        await log_rfid_event(tag, station_id, "rejected")
                                                        if pending_key is not None:
                                                                PENDING_RFID.pop(pending_key, None)
                                                        continue
                                        else:
                                                override = get_external_idtag_override(
                                                        station_id_short, original_tag
                                                )
                                                if override:
                                                        tag = override
                                        new_tag = await translate_id_tag(tag)
                                        if new_tag:
                                                tag = new_tag
                                        if pending_key is not None:
                                                PENDING_RFID[pending_key] = tag
                                        if tag != original_tag:
                                                if _set_id_token_value(body, tag):
                                                        message = json.dumps(data)

                        if (
                                direction == "server_to_client"
                                and isinstance(data, list)
                                and len(data) > 1
                                and data[0] == 3
                        ):
                                key = f"{station_id}|{data[1]}"
                                tag = PENDING_RFID.pop(key, None)
                                # len(data) > 2 guard: a bare [3, id] CALLRESULT must not
                                # raise IndexError here.
                                if tag and len(data) > 2 and isinstance(data[2], dict):
                                        status = data[2].get("idTagInfo", {}).get("status") or data[2].get("status")
                                        result = "accepted" if status and status.lower() == "accepted" else "rejected"
                                        await log_rfid_event(tag, station_id, result)

                        if (
                                direction == "client_to_server"
                                and isinstance(data, list)
                                and len(data) > 1
                                and data[0] == 3
                        ):
                                key = f"{station_id}|{data[1]}"
                                is_full = PENDING_GET_CONFIGURATION.pop(key, None)
                                if is_full is not None:
                                        await store_getconfiguration(station_id, data, is_full)
                                        if is_full:
                                                body = (
                                                        data[2]
                                                        if len(data) > 2
                                                        and isinstance(data[2], dict)
                                                        else {}
                                                )
                                                items = body.get("configurationKey") or []
                                                configs: dict[str, object] = {}
                                                if isinstance(items, list):
                                                        for item in items:
                                                                if not isinstance(item, dict):
                                                                        continue
                                                                key_name = item.get("key")
                                                                if key_name:
                                                                        configs[key_name] = item.get("value")
                                                create_background_task(
                                                        notify_external_configuration_update(
                                                                station_id_short, configs
                                                        )
                                                )

                        try:
                                # len(data) > 1 before data[0]: an empty-list frame must
                                # not raise IndexError (which is not caught below and
                                # would terminate the forwarding loop).
                                if isinstance(data, list) and len(data) > 1 and data[0] in {3, 4}:
                                        unique_id = data[1]
                                        removed = False

                                        if unique_id in PENDING_DIAGNOSTICS:
                                                PENDING_DIAGNOSTICS.remove(unique_id)
                                                removed = True
                                        if unique_id in PENDING_REBOOT:
                                                PENDING_REBOOT.remove(unique_id)
                                                removed = True
                                        if unique_id in PENDING_FIRMWARE:
                                                PENDING_FIRMWARE.remove(unique_id)
                                                removed = True

                                        if removed:
                                                log.info(
                                                        f"Dropped response for {station_id}, uniqueId={unique_id}"
                                                )
                                                continue

                        except (ValueError, TypeError):
                                pass  # not a valid JSON response, continue processing normally

                        await target_ws.send(message)
        except asyncio.CancelledError:
                log.warning(
                        f"[{station_id}] forward_messages cancelled ({direction})",
                )
                log.info(
                        "Closing source websocket due to forward_messages cancellation",
                )
                await source_ws.close()
                log.info(
                        "Closing target websocket due to forward_messages cancellation",
                )
                await target_ws.close()
        except websockets.exceptions.ConnectionClosed as e:

                await log_message(
                        station_id,
                        direction,
                        f"Connection closed ({direction}): {e.code} - {e.reason}",
                        connection_id,
                )
                log.info(
                        f"702: [{station_id}] Connection closed ({direction}): {e.code} - {e.reason}",
                )
                log.info(f"702-2: forward_messages ConnectionClosed: transportInbound: {peerInbound}, Sock: {sockInbound}")

                log.info(
                        "Closing target websocket after ConnectionClosed in forward_messages",
                )
                await target_ws.close()

                directionReverse = "client_to_server"
                if (direction == "client_to_server"):
                        directionReverse = "server_to_client"

                if (
                        disconnect_tracker is not None
                        and not disconnect_tracker.get("source")
                ):
                        disconnect_tracker["source"] = (
                                "chargepoint" if direction == "client_to_server" else "server"
                        )
                log.info(f"702: [{station_id}] Connection closed to target")
                await log_message(station_id, directionReverse, "forward_messages: Connection closed: " + directionReverse, connection_id)


                if getattr(target_ws, "closed", False):
                        await log_message(station_id, directionReverse, "forward_messages:: Connection target_ws is really closed. Done.", connection_id)
                if getattr(source_ws, "closed", False):
                        await log_message(station_id, directionReverse, "forward_messages:: Connection source_ws is really closed. Done.", connection_id)

        except Exception as e:
                log.error(
                        f"703: [{station_id}] Error in {direction} forwarding: {e}",
                        exc_info=True,
                )
        finally:
                # Always tear both sides down so neither socket leaks.
                if not getattr(source_ws, "closed", False):
                        await source_ws.close()
                if not getattr(target_ws, "closed", False):
                        await target_ws.close()

def log_headers(conn, request):
        """Log the Sec-WebSocket-Key handshake header of *request*.

        Only the ``sec-websocket-key`` header is emitted; every other header is
        ignored.  ``conn`` is accepted for callback compatibility but unused.
        """
        wanted = "sec-websocket-key"
        for header_name, header_value in request.headers.items():
                if header_name != wanted:
                        continue
                logging.info(f"Handshake Header – {header_name}: {header_value}")


def _first_query_param(params: Mapping[str, list[str]], key: str) -> Optional[str]:
        values = params.get(key)
        if not values:
                return None
        return values[0]


def _parse_bool_param(value: Optional[str]) -> Optional[bool]:
        if value is None:
                return None
        normalized = value.strip().lower()
        if normalized in {"1", "true", "t", "yes", "y", "on"}:
                return True
        if normalized in {"0", "false", "f", "no", "n", "off"}:
                return False
        return None


def _parse_int_param(value: Optional[str]) -> Optional[int]:
        if value is None:
                return None
        try:
                return int(value)
        except (TypeError, ValueError):
                return None


def _resolve_ping_settings(params: Mapping[str, list[str]], entry: Mapping[str, Any]) -> tuple[bool, int]:
        """Combine query-parameter overrides with station-entry ping defaults.

        Precedence for the enabled flag: explicit ``ping_enabled`` query
        parameter, then a ``ping`` interval override (enabled iff > 0), then
        the entry's ``ping_enabled_flag``, and finally whether the resolved
        interval is positive.  A disabled result always forces interval 0.
        """
        flag_override = _parse_bool_param(_first_query_param(params, "ping_enabled"))
        interval_override = _parse_int_param(_first_query_param(params, "ping"))

        stored_interval = _coerce_optional_int(entry.get("ping_interval"))
        stored_flag = entry.get("ping_enabled_flag")
        if not isinstance(stored_flag, bool):
                stored_flag = None

        # No stored interval: derive one from the stored flag, or fall back
        # to the module-wide default.
        if stored_interval is None:
                if stored_flag is None:
                        stored_interval = DEFAULT_PING_INTERVAL
                else:
                        stored_interval = DEFAULT_PING_INTERVAL if stored_flag else 0

        interval = stored_interval if interval_override is None else interval_override

        if flag_override is not None:
                enabled = flag_override
        elif interval_override is not None:
                enabled = interval_override > 0
        elif stored_flag is not None:
                enabled = stored_flag
        else:
                enabled = interval > 0

        if not enabled:
                interval = 0

        return enabled, interval


async def handle_client(websocket):
        """Bridge one charge-point WebSocket session to its backend.

        Resolves the station entry from the request path, negotiates the OCPP
        subprotocol, applies block/throttle rules, connects to the configured
        backend URL and then forwards frames in both directions (plus optional
        keep-alive pings) until either side disconnects.  All per-station
        bookkeeping (counters, stats, active-client maps) is registered on
        entry and rolled back in the ``finally`` block.
        """
        raw_path = websocket.request.path
        parsed_path = urlparse(raw_path)
        path = parsed_path.path
        params = parse_qs(parsed_path.query)
        station_id = normalize_station_id(path)
        entry, entry_key = resolve_station_entry(path, station_id)
        if entry is None:
                entry = {}
        canonical_path = entry_key or path
        # Tracks which side initiated the disconnect ("chargepoint"/"server");
        # mutated by the forwarding tasks.
        disconnect_tracker: dict[str, Optional[str]] = {"source": None}
        connect_event_logged = False
        disconnect_source_hint: Optional[str] = None
        _record_handshake_success(station_id)
        configured_subprotocol = normalize_configured_subprotocol(
                entry.get("configured_subprotocol")
        )
        if configured_subprotocol is None:
                configured_subprotocol = normalize_configured_subprotocol(entry.get("subprotocol"))
        original_configured_subprotocol = configured_subprotocol
        handshake_subprotocol = websocket.subprotocol
        inferred_subprotocol = infer_subprotocol_from_path(path)
        client_offered_subprotocols = getattr(
                websocket,
                "client_offered_subprotocols",
                None,
        )
        if client_offered_subprotocols is None:
                # Fall back to parsing the raw handshake header when the
                # listener did not record the client's offers.
                raw_header = None
                try:
                        raw_header = websocket.request.headers.get("Sec-WebSocket-Protocol")
                except Exception:
                        raw_header = None
                client_offered_subprotocols = _parse_subprotocol_header(raw_header)

        server_offered_subprotocols = getattr(
                websocket,
                "server_offered_subprotocols",
                SUPPORTED_SUBPROTOCOLS,
        )
        negotiation_failed = bool(
                getattr(websocket, "subprotocol_negotiation_failed", False)
        )
        rejection_reason = getattr(websocket, "subprotocol_rejection_reason", "")
        handshake_offered_by_listener = bool(
                handshake_subprotocol
                and server_offered_subprotocols
                and handshake_subprotocol in server_offered_subprotocols
        )
        if (
                handshake_subprotocol
                and original_configured_subprotocol
                and handshake_subprotocol != original_configured_subprotocol
        ):
                logging.warning(
                        "709-2: Handshake subprotocol %s differs from configured %s for %s — preferring negotiated value",
                        handshake_subprotocol,
                        original_configured_subprotocol,
                        station_id,
                        extra={"station_id": station_id},
                )
                if handshake_offered_by_listener:
                        configured_subprotocol = handshake_subprotocol
        if handshake_subprotocol is None:
                if client_offered_subprotocols and not rejection_reason:
                        rejection_reason = (
                                "No common WebSocket subprotocol after handshake "
                                f"(client offered: {client_offered_subprotocols}; "
                                f"server offered: {server_offered_subprotocols or '<none>'})"
                        )
                negotiation_failed = True
        else:
                negotiation_failed = False

        fallback_subprotocol = (
                configured_subprotocol
                or inferred_subprotocol
                or entry.get("subprotocol")
                or "ocpp1.6"
        )
        logging.info(
                "Subprotocol negotiation for %s: client_offered=%s, server_offered=%s, configured=%s, inferred=%s, selected=%s, fallback=%s",
                station_id or "<unknown>",
                client_offered_subprotocols or "<none>",
                server_offered_subprotocols or "<none>",
                configured_subprotocol or "<none>",
                inferred_subprotocol or "<none>",
                handshake_subprotocol or "<none>",
                fallback_subprotocol or "<none>",
                extra={"station_id": station_id},
        )

        if negotiation_failed:
                close_reason = rejection_reason or (
                        "No common WebSocket subprotocol "
                        f"(client offered: {client_offered_subprotocols or '<none>'}; "
                        f"server offered: {server_offered_subprotocols or '<none>'})"
                )
                close_reason = _trim_close_reason(close_reason)
                _record_handshake_failure("subprotocol_mismatch", station_id)
                logging.warning(
                        "Closing connection for %s due to subprotocol mismatch: %s (close_code=%s)",
                        station_id or "<unknown>",
                        close_reason,
                        SUBPROTOCOL_CLOSE_CODE,
                        extra={"station_id": station_id},
                )
                try:
                        await websocket.close(code=SUBPROTOCOL_CLOSE_CODE, reason=close_reason)
                except Exception:
                        logging.exception(
                                "Failed to close WebSocket after subprotocol negotiation failed for %s",
                                station_id,
                                extra={"station_id": station_id},
                        )
                return

        negotiated_subprotocol = handshake_subprotocol or fallback_subprotocol
        if configured_subprotocol is not None:
                entry["configured_subprotocol"] = configured_subprotocol
        if negotiated_subprotocol and entry.get("subprotocol") != negotiated_subprotocol:
                entry["subprotocol"] = negotiated_subprotocol
        ping_enabled, ping_interval = _resolve_ping_settings(params, entry)
        ping_timeout = DEFAULT_PING_TIMEOUT if ping_enabled else None

        logging.info("710: Checking Path: " + str(path), extra={"station_id": "system"})
        errorstep = 6000
        remote_ws: Optional[WebSocketClientProtocol] = None

        # If another connection for this station already exists, block until it closes
        existing_ws = ACTIVE_CLIENTS.get(station_id)
        if existing_ws is not None and existing_ws is not websocket and not getattr(existing_ws, "closed", False):
                logging.info(
                        f"709: Waiting for existing connection on {path} to close",
                        extra={"station_id": station_id},
                )
                try:
                        await log_message(station_id, "wait", "Existing connection active")
                except Exception:
                        pass
                try:
                        # BUGFIX: ``close()`` is a coroutine and was previously
                        # called without ``await`` — the close handshake never
                        # started and the subsequent ``wait_closed()`` always
                        # timed out.  Awaiting ``close()`` performs the close
                        # handshake and waits for the connection to terminate,
                        # bounded by the same timeout.
                        await asyncio.wait_for(
                                existing_ws.close(),
                                timeout=EXISTING_WS_CLOSE_TIMEOUT,
                        )
                except asyncio.TimeoutError:
                        logging.warning(
                                f"709: Timeout waiting for existing connection on {path} to close",
                                extra={"station_id": station_id},
                        )
                finally:
                        if ACTIVE_CLIENTS.get(station_id) is existing_ws:
                                ACTIVE_CLIENTS.pop(station_id, None)
                logging.info(
                        f"709: Existing connection on {path} closed or timed out, continuing",
                        extra={"station_id": station_id},
                )

        for name, value in websocket.request.headers.items():
                logging.debug(f"200: {name}: {value}")

        # ------------------ Throttle: abort on too-fast reconnect ------------------
        if THROTTLE_ENABLED:
                last = LAST_DISCONNECT.get(station_id)
                if last:
                        # Difference in seconds: both datetime objects must be tz-aware
                        delta = (datetime.now(timezone.utc) - last).total_seconds()
                        if delta < THROTTLE_SECONDS:
                                logging.warning(
                                        f"Throttled connection for {station_id}: nur {int(delta)} s seit letztem Disconnect",
                                        extra={"station_id": station_id}
                                )
                                # Record in the DB that the connection was throttled
                                await log_message(station_id, "throttle",
                                                                  f"Connection throttled— erst nach {THROTTLE_SECONDS} s erneut verbinden")
                                return  # Reject the connection
        # ----------------------------------------------------------------------------------------

        errorstep = 610

        # Check whether the path maps to a known station entry
        activity = entry.get("activity")
        ws_url = entry.get("ws_url")
        if entry:
                # Reject the connection if activity is set to 'block'
                if activity and activity.lower() == 'block':
                        logging.warning(f"704: Blocked connection for path {path}", extra={"station_id": "system"})
                        # Record in the DB that the connection was rejected
                        await log_message(station_id, "block", "Connection blocked by activity setting")
                        return
        else:
                # 2) Unknown path: use the fallback backend when enabled
                if FALLBACK_ENABLED:
                        relative_path = path.lstrip("/")
                        ws_url = f"{FALLBACK_BASE_URL}/{relative_path}"
                        # Create a new redirect entry in the DB
                        asyncio.create_task(insert_redirect(path, ws_url))
                        logging.info(f"789: Fallback URL: {ws_url}, new redirect entry created", extra={"station_id": "system"})
                else:
                        logging.warning("704: Rejected connection: Invalid path", extra={"station_id": "system"})
                        return

        errorstep = 6111
        if ping_enabled:
                logging.info(
                        f"Ping interval negotiated for {station_id}: {ping_interval}s",
                        extra={"station_id": station_id},
                )
        else:
                logging.info(
                        f"Ping disabled for {station_id}",
                        extra={"station_id": station_id},
                )

        await update_last_connected(canonical_path)
        remote_ws_entry = entry if entry else {}
        if ws_url is not None and remote_ws_entry.get("ws_url") != ws_url:
                # Do not mutate the shared entry; copy with the effective URL.
                remote_ws_entry = {**remote_ws_entry, "ws_url": ws_url}
        remote_ws_url = remote_ws_entry
        backend_key = canonical_path

        now = datetime.now(timezone.utc)

        # Increment the per-station reconnection counter
        if station_id in RECONNECTION_COUNTER:
                RECONNECTION_COUNTER[station_id] += 1
        else:
                RECONNECTION_COUNTER[station_id] = 1

        connection_id = RECONNECTION_COUNTER[station_id]
        _record_reconnect_event(station_id, now)
        ACTIVE_CONNECTION_IDS[station_id] = connection_id
        if negotiated_subprotocol:
                CONNECTION_SUBPROTOCOLS[(station_id, connection_id)] = negotiated_subprotocol

        logging.info(
                f"711: Negotiated OCPP subprotocol for {station_id}: {negotiated_subprotocol}",
                extra={"station_id": station_id},
        )
        try:
                await log_message(
                        station_id,
                        "client_to_server",
                        f"Negotiated OCPP subprotocol: {negotiated_subprotocol}",
                        connection_id,
                        topic_override="Subprotocol",
                )
        except Exception:
                logging.exception(
                        "Failed to persist negotiated subprotocol information for %s",
                        station_id,
                        extra={"station_id": station_id},
                )

        # update connection stats for new session
        stats = CONNECTION_STATS.setdefault(
                station_id,
                {
                        "pings_sent": 0,
                        "pings_failed": 0,
                        "rtt_total_ms": 0.0,
                        "rtt_samples": [],
                        "online_time": 0.0,
                        "offline_time": 0.0,
                        "last_state": "offline",
                        "last_state_ts": BROKER_START,
                },
        )
        if stats.get("last_state") == "offline":
                stats["offline_time"] += (now - stats.get("last_state_ts", now)).total_seconds()
        stats["last_state"] = "online"
        stats["last_state_ts"] = now

        # NOTE(security): ``remote_ws_url`` is the full station entry and may
        # contain backend credentials — consider redacting before logging.
        logging.info(
                f"705: connecting {station_id} to remote WebSocket: {remote_ws_url}",
                extra={"station_id": station_id},
        )

        ACTIVE_CLIENTS[station_id] = websocket
        try:
                await log_cp_connect_event(station_id, "connect")
                connect_event_logged = True
        except Exception:
                logging.debug(
                        "Failed to persist connect log entry for %s",
                        station_id,
                        exc_info=True,
                )
        errorstep = 6201
        # Remember the point in time from which the station is connected
        CONNECTION_START_TIMES[station_id] = now
        LAST_CONNECT[station_id] = now
        create_background_task(
                notify_station_connected(station_id, canonical_path),
                name=f"notify-connected-{station_id}",
        )

        try:
                # 1) Resolve and log the backend target URL
                # (the former ``json.dumps(remote_ws_url)`` round-trip was
                # unused and has been removed)
                errorstep = 614
                ws_url = remote_ws_url["ws_url"]
                errorstep = 615
                parsed = urlparse(ws_url)
                errorstep = 616
                hostname = parsed.hostname
                port     = parsed.port or (443 if parsed.scheme == "wss" else 80)

                logging.info(
                        "723: ========= Versuche Verbindung zu %s://%s:%s%s =========",
                        parsed.scheme,
                        hostname,
                        port,
                        parsed.path,
                        extra={"station_id": station_id},
                )
                errorstep = 6201

                # --- Log selected handshake headers ---
                headers = websocket.request.headers
                if headers:
                        for name, value in headers.items():
                                if name == "sec-websocket-key":
                                        logging.info(
                                                f"2001: Handshake Header – {name}: {value}",
                                                extra={"station_id": station_id}
                                        )
                                # BUGFIX: the original had two identical
                                # ``authorization`` checks; merged into one.
                                # NOTE(security): this logs raw credentials.
                                if name == "authorization":
                                        logging.info(
                                                f"2001: Handshake Header – {name}: {value}",
                                                extra={"station_id": station_id}
                                        )
                                        await log_message(station_id, "authorization", value)

                # --- Authorization handling (Basic or token) ---
                # BUGFIX: the original guard tested ``request_headers`` but then
                # read ``websocket.request.headers`` — the check is now aligned
                # with the attribute that is actually used.
                incoming_key = None
                request_obj = getattr(websocket, "request", None)
                if request_obj is not None and hasattr(request_obj, "headers"):
                        incoming_key = request_obj.headers.get("authorization")
                elif hasattr(websocket, "headers"):
                        incoming_key = websocket.headers.get("authorization")

                incoming_basic = parse_basic_credentials(incoming_key)
                if incoming_key:
                        # NOTE(security): logs the raw Authorization header and,
                        # below, the decoded Basic password in clear text.
                        logging.info(
                                f"Received Authorization header from wallbox: {incoming_key}",
                                extra={"station_id": station_id},
                        )
                        if incoming_basic:
                                username, password = incoming_basic
                                logging.info(
                                        "Wallbox Basic credentials username='%s' password='%s'",
                                        username,
                                        password,
                                        extra={"station_id": station_id},
                                )

                db_key = remote_ws_url.get("auth_key")
                db_basic_user = remote_ws_url.get("backend_basic_user")
                db_basic_password = remote_ws_url.get("backend_basic_password")

                used_key, used_key_source, auth_headers = _select_backend_auth(
                        incoming_key,
                        db_key,
                        db_basic_user,
                        db_basic_password,
                        station_id,
                )

                await log_cp_auth_event(
                        station_id,
                        incoming_key,
                        incoming_basic,
                        used_key,
                        used_key_source,
                )

                if used_key_source == "incoming" and incoming_basic:
                        logging.info(
                                "Forwarding Basic credentials from wallbox to backend",
                                extra={"station_id": station_id},
                        )

                # 3) Build backend connect kwargs (subprotocol + ping settings)
                connect_kwargs: dict[str, Any] = {}
                if negotiated_subprotocol:
                        connect_kwargs["subprotocols"] = [negotiated_subprotocol]
                if ping_enabled:
                        connect_kwargs.update({
                                "ping_interval": ping_interval,
                                "ping_timeout": ping_timeout,
                        })
                else:
                        connect_kwargs["ping_interval"] = None

                errorstep = 6202
                logging.info(
                        "Using backend subprotocol %s for %s (client_offered=%s, server_offered=%s)",
                        negotiated_subprotocol or "<none>",
                        station_id,
                        client_offered_subprotocols or "<none>",
                        server_offered_subprotocols or "<none>",
                        extra={"station_id": station_id},
                )

                async with _backend_websocket_connection(
                        ws_url,
                        connect_kwargs,
                        auth_headers,
                        station_id,
                ) as remote_ws:
                        ACTIVE_BACKEND_CONNECTIONS[backend_key] = remote_ws
                        errorstep = 621
                        logging.info(
                                "706: Connection established with Server.",
                                extra={"station_id": station_id, "system": "server-backend"},
                        )
                        errorstep = 622
                        await log_message(station_id, "Connected", "WebSocket Connection established with Server Client", connection_id)

                        transport = remote_ws.transport
                        peer = transport.get_extra_info("peername")     # (host, port) of the backend peer
                        sock = transport.get_extra_info("sockname")     # (host, port) of our socket
                        logging.info(
                                f"Peer: {peer}, Sock: {sock}",
                                extra={"system": "server-backend"},
                        )

                        transportInbound = websocket.transport
                        peerInbound = transportInbound.get_extra_info("peername")       # (host, port) of the wallbox peer
                        sockInbound = transportInbound.get_extra_info("sockname")       # (host, port) of our socket
                        logging.info(
                                f"transportInbound: {peerInbound}, Sock: {sockInbound}",
                                extra={"system": "client-server"},
                        )

                        await log_message(station_id, "Connected", f"WebSocket Connection: System to Backend: Peer: {peer}, Sock: {sock}", connection_id)
                        await log_message(station_id, "Connected", f"WebSocket Connection: Wallbox To System: Peer: {peerInbound}, Sock: {sockInbound}", connection_id)

                        errorstep = 623
                        # Run both message forwarding and keep-alive tasks concurrently
                        measure_flag = entry.get("measure_ping", False)
                        tasks = [
                                asyncio.create_task(
                                        forward_messages(
                                                websocket,
                                                remote_ws,
                                                station_id,
                                                "client_to_server",
                                                connection_id,
                                                disconnect_tracker,
                                        )
                                ),
                                asyncio.create_task(
                                        forward_messages(
                                                remote_ws,
                                                websocket,
                                                station_id,
                                                "server_to_client",
                                                connection_id,
                                                disconnect_tracker,
                                        )
                                ),
                        ]
                        if ping_enabled:
                                tasks.extend([
                                        asyncio.create_task(
                                                keep_alive(
                                                        websocket,
                                                        remote_ws,
                                                        "client_to_server",
                                                        station_id,
                                                        connection_id,
                                                        measure_flag,
                                                        interval=ping_interval,
                                                        timeout=ping_timeout,
                                                )
                                        ),
                                        asyncio.create_task(
                                                keep_alive(
                                                        remote_ws,
                                                        websocket,
                                                        "server_to_client",
                                                        station_id,
                                                        connection_id,
                                                        measure_flag,
                                                        interval=ping_interval,
                                                        timeout=ping_timeout,
                                                )
                                        ),
                                ])

                        # Wait until one of the tasks fails or completes, then cancel the rest
                        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

                        for task in pending:
                                task.cancel()

                        for task in done:
                                try:
                                        task.result()
                                except Exception as e:
                                        logging.error(
                                                f"Task raised an exception: {e}",
                                                extra={"station_id": station_id},
                                                exc_info=True,
                                        )

                        # Reap the cancelled tasks so cancellation is not left pending.
                        await asyncio.gather(*pending, return_exceptions=True)

                errorstep = 630

        except BackendConnectionError as exc:
                disconnect_source_hint = disconnect_source_hint or "server"
                logging.error(
                        "706: Backend unreachable after %s attempts (timeout=%ss, backoff=%ss): %s",
                        BACKEND_CONNECT_RETRIES,
                        BACKEND_CONNECT_TIMEOUT,
                        BACKEND_CONNECT_BACKOFF_SECONDS,
                        exc,
                        extra={"station_id": station_id, "system": "server-backend"},
                )
                await log_message(
                        station_id,
                        "backend_unreachable",
                        f"Backend connection failed after {BACKEND_CONNECT_RETRIES} attempts",
                        connection_id,
                )
                try:
                        await websocket.close(
                                code=1011,
                                reason="Backend unreachable after retries",
                        )
                except Exception:
                        logging.debug(
                                "Failed to send graceful close to client while backend unreachable",
                                exc_info=True,
                                extra={"station_id": station_id},
                        )
        except websockets.exceptions.ConnectionClosed as e:
                if disconnect_source_hint is None:
                        disconnect_source_hint = "chargepoint"
                logging.warning(
                        f"707: WebSocket closed: {e.code} - {e.reason}",
                        extra={"station_id": station_id},
                )
                await log_message(
                        station_id,
                        "Closed",
                        f"WebSocket closed: {e.code} - {e.reason}",
                        connection_id,
                )
                await asyncio.sleep(60)  # Wait before retrying
                # Note: With a closed websocket, the client should typically reconnect.
        except Exception as e:
                logging.error(
                        f"713: WebSocket Error ({errorstep}): {e}",
                        extra={"station_id": station_id},
                )
                await log_message(
                        station_id,
                        "Exception",
                        f"WebSocket rejected ({errorstep}): {e}",
                        connection_id,
                )
        finally:
                # Remember the disconnect time here for throttling
                disconnect_time = datetime.now(timezone.utc)
                try:
                        LAST_DISCONNECT[station_id] = disconnect_time
                except Exception:
                        pass
                if entry.get("strict_availability"):
                        station_id_short = station_id.rsplit('/', 1)[-1]
                        create_background_task(
                                _mark_strict_availability_disconnect(station_id_short),
                                name=f"strict-availability-mark-{station_id_short}",
                        )
                schedule_disconnect_notification(station_id, canonical_path, disconnect_time)
                stats = CONNECTION_STATS.get(station_id)
                now = datetime.now(timezone.utc)
                if stats is not None and stats.get("last_state") == "online":
                        stats["online_time"] += (now - stats.get("last_state_ts", now)).total_seconds()
                        stats["last_state"] = "offline"
                        stats["last_state_ts"] = now
                # Only tear down the maps when this session is still the active
                # one — a newer connection may already have replaced us.
                if (
                        ACTIVE_CLIENTS.get(station_id) is websocket
                        or ACTIVE_CONNECTION_IDS.get(station_id) == connection_id
                ):
                        ACTIVE_CLIENTS.pop(station_id, None)
                        CONNECTION_START_TIMES.pop(station_id, None)
                        ACTIVE_CONNECTION_IDS.pop(station_id, None)
                        CONNECTION_SUBPROTOCOLS.pop((station_id, connection_id), None)
                        LAST_SEEN.pop(station_id, None)
                        ACTIVE_BACKEND_CONNECTIONS.pop(backend_key, None)
                if remote_ws is not None and not getattr(remote_ws, "closed", False):
                        logging.info(
                                "Closing remote websocket in handle_client",
                                extra={
                                        "station_id": station_id,
                                        "system": "server-backend",
                                },
                        )
                        await remote_ws.close()
                disconnect_source = disconnect_tracker.get("source") or disconnect_source_hint
                if disconnect_source not in {"chargepoint", "server"}:
                        disconnect_source = None
                if connect_event_logged:
                        try:
                                await log_cp_connect_event(
                                        station_id, "disconnect", disconnect_source
                                )
                        except Exception:
                                logging.debug(
                                        "Failed to persist disconnect log entry for %s",
                                        station_id,
                                        exc_info=True,
                                )

                
async def send_reboot_request(station_id: str) -> str:
        """Send an OCPP ``Reset`` call (Hard) to the wallbox ``station_id``.

        Raises ``RuntimeError`` when the station has no live connection.
        Returns the call's uniqueId so the response can be matched later.
        """
        ws = ACTIVE_CLIENTS.get(station_id)
        if ws is None or getattr(ws, "closed", False):
                raise RuntimeError(f"Keine aktive Verbindung für Station {station_id}")

        unique_id = str(uuid.uuid4())
        PENDING_REBOOT.add(unique_id)

        # Soft or Hard reset? Adjust as needed.
        frame = [2, unique_id, "Reset", {"type": "Hard"}]
        message = json.dumps(frame)

        await ws.send(message)
        logging.info(
                f"Gesendet Reset an {station_id}: {message}",
                extra={"station_id": station_id}
        )
        return unique_id


async def disconnect_station(station_id: str) -> bool:
        """Terminate the active websocket for ``station_id``.

        Returns ``True`` when a live connection was found and closed,
        ``False`` when the station is unknown or its socket is already closed.
        """
        ws = ACTIVE_CLIENTS.get(station_id)
        if ws is None or getattr(ws, "closed", False):
                return False
        await ws.close()
        # Drop all per-connection bookkeeping for this station.
        for registry in (ACTIVE_CLIENTS, CONNECTION_START_TIMES, ACTIVE_CONNECTION_IDS):
                registry.pop(station_id, None)
        # Subprotocol entries are keyed by (station_id, connection_id) tuples.
        stale_keys = [key for key in CONNECTION_SUBPROTOCOLS if key[0] == station_id]
        for key in stale_keys:
                CONNECTION_SUBPROTOCOLS.pop(key, None)
        LAST_DISCONNECT[station_id] = datetime.now(timezone.utc)
        return True


async def send_diagnostics_request(
        station_id: str, location_override: str | None = None
) -> tuple[str, str]:
        """Send an OCPP GetDiagnostics call to the wallbox ``station_id``.

        When ``location_override`` is not supplied, an FTP upload URL is
        assembled from the ``DIAG_FTP_*`` configuration values.

        Returns:
                A tuple of the call's uniqueId and the effective upload URL.

        Raises:
                RuntimeError: if the station has no active connection.
        """
        ws = ACTIVE_CLIENTS.get(station_id)
        if ws is None or getattr(ws, "closed", False):
                raise RuntimeError(f"738: Keine aktive Verbindung für Station {station_id}")

        unique_id = f"{uuid.uuid4()}"
        PENDING_DIAGNOSTICS.add(unique_id)

        if location_override:
                target_location = str(location_override)
        else:
                host = DIAG_FTP_HOST or "217.160.79.201"
                port = DIAG_FTP_PORT if isinstance(DIAG_FTP_PORT, int) else 21
                # Only spell out the port when it differs from the FTP default.
                endpoint = host if not port or port == 21 else f"{host}:{port}"

                # Percent-encode credentials so special characters survive the URL.
                if DIAG_FTP_USER and DIAG_FTP_PASSWORD:
                        credentials = (
                                f"{quote(str(DIAG_FTP_USER), safe='')}:"
                                f"{quote(str(DIAG_FTP_PASSWORD), safe='')}@"
                        )
                elif DIAG_FTP_USER:
                        credentials = f"{quote(str(DIAG_FTP_USER), safe='')}@"
                else:
                        credentials = ""

                target_location = f"ftp://{credentials}{endpoint}/"

        payload = {
                "location": target_location,
                "retries": 1,
                "retryInterval": 60,
        }
        request = json.dumps([2, unique_id, "GetDiagnostics", payload])

        await ws.send(request)
        logging.info(
                "743: Gesendet GetDiagnostics an %s (location=%s): %s",
                station_id,
                target_location,
                request,
                extra={"station_id": station_id},
        )
        return unique_id, target_location

async def send_diagnosticsHTTP_request(station_id: str) -> str:
        """
        Build and send an OCPP GetDiagnostics call to the wallbox with the
        given ``station_id``, uploading to a fixed HTTPS endpoint (unlike
        ``send_diagnostics_request``, which targets an FTP server).

        Returns:
                The uniqueId used in the call.

        Raises:
                RuntimeError: if the station has no active websocket connection.
        """
        ws = ACTIVE_CLIENTS.get(station_id)
        if ws is None or getattr(ws, "closed", False):
                raise RuntimeError(f"738: Keine aktive Verbindung für Station {station_id}")

        unique_id = str(uuid.uuid4())
        # Track the call id so the later CallResult can be matched.
        PENDING_DIAGNOSTICS.add(unique_id)

        payload = {
                "location": "https://upload.electrified.de/upload_diagnostic",
                "retries": 1,
                "retryInterval": 60,
        }
        # OCPP 1.6 call frame: [2, uniqueId, action, payload]
        message = json.dumps([2, unique_id, "GetDiagnostics", payload])

        await ws.send(message)
        # Lazy %-style args: the message is only formatted if INFO is enabled.
        logging.info(
                "743: Gesendet GetDiagnostics an %s: %s",
                station_id,
                message,
                extra={"station_id": station_id},
        )
        return unique_id

        
def _build_process_request_handler(allow_api_requests: bool):
        """Create a ``process_request`` callback with a fixed API-permission flag.

        The returned coroutine function matches the callback signature expected
        by the websockets server and simply delegates to :func:`process_request`
        with ``allow_api_requests`` bound.
        """

        async def delegate(path, request_headers):
                return await process_request(
                        path,
                        request_headers,
                        allow_api_requests=allow_api_requests,
                )

        return delegate


async def _handle_http_api_request(path: str, parsed: ParseResult):
        """Serve plain HTTP (non-websocket) API endpoints.

        Dispatches on ``path`` and returns an ``http_response(...)`` tuple.
        Any path not matched below falls through to a generic 200 reply.
        """
        # Re-read the wallbox routing table from the database.
        # http://demo.ocpp-proxy.de/refresh
        if path == "/refreshStationList":
                logging.info("721: Refresh Station List")
                await load_wallboxes_from_db()
                return http_response(HTTPStatus.OK, b"done.\n")

        # Simple liveness probe.
        if path == "/healthCheck-01":
                logging.info("722: Health Check")
                return http_response(HTTPStatus.OK, b"alive.\n")

        # Broker status including the last MySQL connectivity check.
        if path == "/brokerStatus":
                logging.info("722: Broker status requested")
                # Render the timestamp as ISO-8601 with a "Z" UTC suffix.
                attempt_iso = (
                        LAST_MYSQL_CHECK_ATTEMPT.isoformat().replace("+00:00", "Z")
                        if LAST_MYSQL_CHECK_ATTEMPT
                        else None
                )
                payload = {
                        "status": "online",
                        "mysql": {
                                "last_check_attempt": attempt_iso,
                                "last_check_success": LAST_MYSQL_CHECK_SUCCESS,
                        },
                }
                body = json.dumps(payload).encode("utf-8")
                return http_response(
                        HTTPStatus.OK,
                        body,
                        "application/json; charset=utf-8",
                )

        # Database connection-pool metrics as JSON.
        if path == "/api/db_pool_metrics":
                logging.info("db_pool_metrics requested")
                metrics = collect_db_pool_metrics()
                body = json.dumps(metrics, ensure_ascii=False).encode("utf-8")
                return http_response(
                        HTTPStatus.OK,
                        body,
                        "application/json; charset=utf-8",
                )

        # Connection statistics, optionally filtered by station_id query params;
        # "total=0" suppresses the aggregate totals.
        if path == "/connectionStats":
                params = parse_qs(parsed.query)
                include_total = params.get("total", ["1"])[0] != "0"
                station_filter = {
                        s for s in (params.get("station_id") or []) if isinstance(s, str) and s
                } or None
                payload = build_connection_stats_payload(
                        station_filter, datetime.now(timezone.utc), include_total=include_total
                )
                body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
                return http_response(
                        HTTPStatus.OK,
                        body,
                        "application/json; charset=utf-8",
                )

        # Reconnect statistics snapshot.
        if path == "/api/reconnectStats":
                logging.info("Reconnect stats requested")
                payload = build_reconnect_stats_payload(datetime.now(timezone.utc))
                body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
                return http_response(
                        HTTPStatus.OK,
                        body,
                        "application/json; charset=utf-8",
                )

        # Full wallbox inventory: every configured station plus any station
        # only known from connector status/meter telemetry.
        if path.startswith("/connectedWallboxes"):
                logging.info("723: Connected Boxes Request")
                ccount = len(STATION_PATHS)
                connected_count = len(ACTIVE_CLIENTS)
                connected_ids = {
                        extract_chargepoint_id(station_id)
                        for station_id in ACTIVE_CLIENTS.keys()
                }

                wallboxes: list[dict[str, Any]] = []
                known_ids: set[str] = set()

                # Configured stations (from STATION_PATHS).
                for source_url, _entry in STATION_PATHS.items():
                        station_key = normalize_station_id(source_url)
                        station_short = extract_chargepoint_id(source_url)
                        known_ids.add(station_short)

                        connectors = _build_connectors_payload(station_short)

                        wallboxes.append(
                                {
                                        "stationId": station_short,
                                        "path": source_url,
                                        "connected": station_key in ACTIVE_CLIENTS,
                                        "connectors": connectors,
                                }
                        )

                # Stations seen only via telemetry, not in the configured list;
                # these have no known path.
                extra_ids = (
                        set(LATEST_CONNECTOR_STATUS.keys())
                        | set(LATEST_CONNECTOR_METERS.keys())
                ) - known_ids

                for station_short in sorted(extra_ids):
                        connectors = _build_connectors_payload(station_short)

                        wallboxes.append(
                                {
                                        "stationId": station_short,
                                        "path": None,
                                        "connected": station_short in connected_ids,
                                        "connectors": connectors,
                                }
                        )

                payload = {
                        "entries": ccount,
                        "connected": connected_count,
                        "wallboxes": wallboxes,
                }
                body = json.dumps(payload, ensure_ascii=False)

                return http_response(
                        HTTPStatus.OK,
                        (body + "\n").encode("utf-8"),
                        "application/json; charset=utf-8",
                )

        # Raw websocket counters.
        if path.startswith("/websocketCounts"):
                counts = get_active_websocket_counts()
                mybody = json.dumps(counts)
                return http_response(
                        HTTPStatus.OK,
                        (mybody + "\n").encode("utf-8"),
                        "application/json",
                )

        # MySQL pool status snapshot.
        if path.startswith("/mysqlPoolStatus"):
                pool_status = get_mysql_pool_status()
                mybody = json.dumps(pool_status)
                return http_response(
                        HTTPStatus.OK,
                        (mybody + "\n").encode("utf-8"),
                        "application/json",
                )

        # Serve firmware binaries for station-initiated downloads.
        if path.startswith("/firmware/"):
                filename = path.removeprefix("/firmware/")
                safe_root = FIRMWARE_DIR.resolve()
                file_path = (safe_root / filename).resolve()
                logging.info("FW-Update: safe_root: %s", safe_root)
                logging.info("FW-Update: file_path: %s", file_path)

                try:
                        # Path-traversal guard: after resolve(), the file must be a
                        # regular file strictly below FIRMWARE_DIR (safe_root itself
                        # is rejected since it is never in its own parents).
                        if not file_path.is_file() or safe_root not in file_path.parents:
                                raise FileNotFoundError
                        # NOTE(review): reads the whole file into memory; presumably
                        # firmware images are small enough — confirm for large files.
                        with open(file_path, "rb") as f:
                                content = f.read()
                except FileNotFoundError:
                        logging.info("Requested firmware not found: %s", filename)
                        return http_response(HTTPStatus.NOT_FOUND, b"File not found\n")

                return http_response(
                        HTTPStatus.OK,
                        content,
                        "application/octet-stream",
                )

        # Fallback: unknown non-websocket path gets an empty 200 reply.
        logging.info("729: Reply to non-websocket: %s", path)
        return http_response(HTTPStatus.OK, b"\n")


async def process_request(path, request_headers, *, allow_api_requests: bool = True):
        """
        Fängt alle HTTP-Requests ab, bevor das WebSocket-Handshake-Modul eingreift:
          - Keine Upgrade-Header → 200 OK mit einfachem Text
          - Ungültiger Station-Pfad   → 404 Not Found
          - Sonst None → websockets macht normalen WebSocket-Handshake
        """
        headers = request_headers
        parsed = urlparse(path)

        path = parsed.path
        logging.info("704-1: Web-Request: \"" + path + "\"")

        upgrade_hdr = headers.get("Upgrade")
        is_websocket_upgrade = (upgrade_hdr or "").lower() == "websocket"

        if not allow_api_requests and not is_websocket_upgrade:
                logging.info(
                        "API request rejected on non-API listener: %s",
                        path,
                        extra={"station_id": "system"},
                )
                return http_response(
                        HTTPStatus.NOT_FOUND,
                        b"API endpoints are only available on the API port\n",
                )

        # Ignore common browser file requests like /favicon.ico and respond
        # with a simple 404 without triggering WebSocket errors.
        if path == "/favicon.ico":
                return (
                        HTTPStatus.NOT_FOUND,
                        [("Content-Type", "text/plain; charset=utf-8")],
                        b"",
                )

        if path.startswith("/setRedirect"):  # Beispiel: /setRedirect?source_url=/ocpp16/XYZ&activity=block&ws_url=ws://new
                query = parse_qs(parsed.query)
                source_url = query.get('source_url', [None])[0]
                new_activity = query.get('activity', [None])[0]
                new_ws_url = query.get('ws_url', [None])[0]
                new_measure = query.get('measure_ping', [None])[0]
                new_mqtt = query.get('mqtt_enabled', [None])[0]
                new_strict = query.get('strict_availability', [None])[0]
                new_charging = query.get('charging_analytics', [None])[0]
                new_subprotocol = None
                for key in ("ocpp_subprotocol", "ocpp_version", "subprotocol"):
                        values = query.get(key)
                        if values:
                                new_subprotocol = values[0]
                                break
                backend_user_vals = query.get('backend_user')
                backend_pass_vals = query.get('backend_password')
                backend_user = backend_user_vals[0] if backend_user_vals is not None else None
                backend_password = backend_pass_vals[0] if backend_pass_vals is not None else None
                if not source_url:
                        body = json.dumps({"error": "Missing source_url"}).encode('utf-8')
                        return HTTPStatus.BAD_REQUEST, [("Content-Type", "application/json")], body
                await update_redirect(
                        source_url,
                        new_ws_url,
                        new_activity,
                        new_measure,
                        new_mqtt,
                        backend_user,
                        backend_password,
                        new_strict_availability=new_strict,
                        new_charging_analytics=new_charging,
                        new_subprotocol=new_subprotocol,
                )
                if new_activity and new_activity.lower() == 'block':
                        sid = normalize_station_id(source_url)
                        client_ws = ACTIVE_CLIENTS.get(sid)
                        if client_ws:
                                logging.info(
                                        f"Closing client websocket for {source_url} due to block",
                                        extra={
                                                "station_id": "system",
                                                "system": "client-server",
                                        },
                                )
                                await client_ws.close()
                                ACTIVE_CLIENTS.pop(sid, None)
                                logging.info(f"Closed connection for {source_url} due to block", extra={"station_id": "system"})
                body = json.dumps({"result": "ok"}).encode('utf-8')
                return HTTPStatus.OK, [("Content-Type", "application/json")], body
        
        if path == '/getConnecteEVSE':
                logging.info ("704-3a: getConnecteEVSE requested")
                # logging.info (        request_headers.get("Upgrade", "").lower() )
                                
                now = datetime.now(timezone.utc)
                connected: list[dict] = []
                for station_id in ACTIVE_CLIENTS.keys():
                        start = CONNECTION_START_TIMES.get(station_id)
                        reconnectCounter = RECONNECTION_COUNTER.get(station_id, 0)
                        connection_quality = calculate_connection_quality(station_id, now)
                        connection_id = ACTIVE_CONNECTION_IDS.get(station_id)
                        negotiated_protocol = get_connection_protocol(
                                station_id, connection_id
                        )

                        if start:
                                delta = now - start
                                connected.append({
                                        "station_id": station_id,
                                        "connected_since": start.isoformat(),
                                        "duration_seconds": int(delta.total_seconds()),
                                        "duration": str(delta),
                                        "reconnectCounter": reconnectCounter,
                                        "connectionQuality": connection_quality,
                                        "ocpp_subprotocol": negotiated_protocol,
                          })
                        else:
                                connected.append({
                                        "station_id": station_id,
                                        "connected_since": None,
                                        "duration_seconds": None,
                                        "duration": None,
                                        "reconnectCounter": reconnectCounter,
                                        "connectionQuality": connection_quality,
                                        "ocpp_subprotocol": negotiated_protocol,
                          })

                disconnected: list[dict] = []
                for station_id, start in LAST_CONNECT.items():
                        if station_id in ACTIVE_CLIENTS:
                                continue
                        disconnect_time = LAST_DISCONNECT.get(station_id)
                        reconnectCounter = RECONNECTION_COUNTER.get(station_id, 0)
                        ref_time = disconnect_time or now
                        connection_quality = calculate_connection_quality(station_id, ref_time)

                        if start and disconnect_time:
                                delta = disconnect_time - start
                                disconnected.append({
                                        "station_id": station_id,
                                        "connected_since": start.isoformat(),
                                        "duration_seconds": int(delta.total_seconds()),
                                        "duration": str(delta),
                                        "reconnectCounter": reconnectCounter,
                                        "connectionQuality": connection_quality,
                                        "disconnected_since": disconnect_time.isoformat(),
                          })
                        else:
                                disconnected.append({
                                        "station_id": station_id,
                                        "connected_since": start.isoformat() if start else None,
                                        "duration_seconds": None,
                                        "duration": None,
                                        "reconnectCounter": reconnectCounter,
                                        "connectionQuality": connection_quality,
                                        "disconnected_since": disconnect_time.isoformat() if disconnect_time else None,
                          })

                mybody = json.dumps({"connected": connected, "disconnected": disconnected}).encode('utf-8')
                                        
                return http_response(
                        HTTPStatus.OK,
                        mybody,
                        "application/json; charset=utf-8",
                )
                        
        # elif parsed.path == '/getConnectedDevices':
        #       connected = list(ACTIVE_CLIENTS.keys())
        #       body = json.dumps({"connected": connected}).encode('utf-8')
        #       return HTTPStatus.OK, [("Content-Type", "application/json")], body
        # Keine Behandlung: Weiter zur WebSocket Handshake
        # return None
                                
        # Startseite
        if path == "/":
                logging.info("755: / Homepage requested.")
                return http_response(HTTPStatus.OK, b".\n")

        # Expose resource limits and websocket usage
        # Response shape:
        # {
        #   "ulimit_no_file_soft": <int|null>,
        #   "ulimit_no_file_hard": <int|null>,
        #   "wallbox_to_broker": <int>,
        #   "broker_to_backend": <int>,
        #   "estimated_fd_usage": <int>,
        #   "estimated_free": <int|null>,
        #   "estimation_note": <str>
        # }
        if path == "/system/limits":
                logging.info("System limits requested")
                status = get_system_limit_status()
                body = json.dumps(status).encode("utf-8")
                return http_response(
                        HTTPStatus.OK,
                        body,
                        "application/json; charset=utf-8",
                )

        if path == "/robots.txt":
                logging.info("756: /robots.txt requested.")
                return http_response(
                        HTTPStatus.OK,
                        b"User-agent: *:\nDisallow: /\n",
                )

        # Reboot (Reset) per HTTP triggern
        if path.startswith("/rebootChargePoint/"):
                station_id = normalize_station_id(path.removeprefix("/rebootChargePoint"))
                logging.info(
                        f"rebootChargePoint requested: {station_id}",
                        extra={"station_id": "system"}
                )

                if station_id not in ACTIVE_CLIENTS:
                        return (
                                HTTPStatus.NOT_FOUND,
                                [("Content-Type", "text/plain; charset=utf-8")],
                                b"Station nicht verbunden\n",
                        )

                # Fire-and-forget
                asyncio.create_task(send_reboot_request(station_id))

                return (
                        HTTPStatus.OK,
                        [("Content-Type", "text/plain; charset=utf-8")],
                        b"Reset-Aufruf gestartet\n",
                )
                
        # Firmware-Update per HTTP triggern
        if path.startswith("/updateFirmware/"):
                station_id = normalize_station_id(path.removeprefix("/updateFirmware"))
                params = parse_qs(parsed.query)
                location = params.get("location", [None])[0]
                logging.info("updateFirmware requested: " + station_id)
                if not location:
                        return http_response(
                                HTTPStatus.BAD_REQUEST,
                                b"Parameter 'location' fehlt\n",
                        )
                if station_id not in ACTIVE_CLIENTS:
                        return http_response(
                                HTTPStatus.NOT_FOUND,
                                b"Station nicht verbunden\n",
                        )
                # Fire-and-forget
                asyncio.create_task(send_update_firmware_request(station_id, location))
                return http_response(
                        HTTPStatus.OK,
                        f"UpdateFirmware-Aufruf gestartet: {station_id}\n".encode("utf-8"),
                )
                                        
        if path.startswith("/getDiagnostics/"):
                # station_id = path.split("/")[2]
                station_id = normalize_station_id(path.removeprefix("/getDiagnostics"))
                logging.info("740: /getDiagnostics requested: " + str(station_id))
                if station_id not in ACTIVE_CLIENTS:
                        logging.info("741: Station not connected: " + str(station_id))
                        return http_response(
                                HTTPStatus.NOT_FOUND,
                                b"Station nicht verbunden\n",
                        )

                # Fire-and-forget
                asyncio.create_task(send_diagnostics_request(station_id))
                return http_response(
                        HTTPStatus.OK,
                        b"GetDiagnostics-Aufruf gestartet\n",
                )

        # --- Ab hier Disconnect-Endpoint ergänzen ---
        # curl http://217.154.74.195/disconnectChargePoint/7000641623000473
        if path.startswith("/disconnectChargePoint/"):
                # Station-ID aus der URL extrahieren
                station_id = normalize_station_id(path.removeprefix("/disconnectChargePoint"))
                logging.info(f"disconnectChargePoint requested: {station_id}", extra={"station_id": "system"})
                if not await disconnect_station(station_id):
                        return http_response(
                                HTTPStatus.NOT_FOUND,
                                b"Station nicht verbunden\n",
                        )
                return http_response(
                        HTTPStatus.OK,
                        b"Chargepoint getrennt\n",
                )
        # --- Ende Disconnect-Endpoint ---

        if path.startswith("/getDiagnosticsHTTP/"):
                # station_id = path.split("/")[2]
                station_id = normalize_station_id(path.removeprefix("/getDiagnosticsHTTP"))
                logging.info("740: /getDiagnosticsHTTP requested: " + str(station_id))
                if station_id not in ACTIVE_CLIENTS:
                        logging.info("741: Station not connected: " + str(station_id))
                        return http_response(
                                HTTPStatus.NOT_FOUND,
                                b"Station nicht verbunden\n",
                        )
                # Fire-and-forget
                asyncio.create_task(send_diagnosticsHTTP_request(station_id))
                return http_response(
                        HTTPStatus.OK,
                        b"GetDiagnostics-Aufruf gestartet\n",
                )

        # ACTIVE_CLIENTS
        if path.startswith("/getConnectedDevices"):
                logging.info("731: /getConnectedDevices requested")
                now = datetime.now(timezone.utc)
                devices: list[dict] = []
                for station_id, ws in ACTIVE_CLIENTS.items():
                        start = CONNECTION_START_TIMES.get(station_id)
                        display_id = extract_chargepoint_id(station_id)
                        info = {
                                "station_id": display_id,
                                "path": ws.request.path,
                        }
                        if start:
                                delta = now - start
                                info.update(
                                        {
                                                "connected_since": start.isoformat(),
                                                "duration_seconds": int(delta.total_seconds()),
                                                "duration": str(delta),
                                        }
                                )
                        else:
                                info.update(
                                        {
                                                "connected_since": None,
                                                "duration_seconds": None,
                                                "duration": None,
                                        }
                                )
                        info["connectors"] = _build_connectors_payload(display_id)
                        devices.append(info)

                mybody = json.dumps({"connected": devices})
                logging.info("731-2: " + mybody)

                return http_response(
                        HTTPStatus.OK,
                        (mybody + "\n").encode("utf-8"),
                        "application/json; charset=utf-8",
                )

        if path.startswith("/activeSessions"):
                logging.info("/activeSessions requested")
                now = datetime.now(timezone.utc)
                sessions: list[dict] = []
                for tx_id, info in list(ACTIVE_TRANSACTIONS.items()):
                        meter_start_value = _safe_float(info.get("meterStartWh"))
                        meter_last_value = _safe_float(info.get("meterLastWh"))

                        entry = {
                                "transactionId": info.get("transactionId", tx_id),
                                "stationId": info.get("stationId"),
                                "connectorId": info.get("connectorId"),
                                "idToken": info.get("idToken"),
                                "sessionStartTimestamp": info.get("sessionStartTimestamp"),
                        }
                        if meter_start_value is not None:
                                entry["meterStartWh"] = meter_start_value

                        station_short = info.get("stationId")
                        meter_details: Optional[Mapping[str, Any]] = None
                        if isinstance(station_short, str):
                                station_meters = LATEST_CONNECTOR_METERS.get(station_short, {})
                                if station_meters:
                                        normalised: dict[ConnectorKey, dict[str, Any]] = {}
                                        for raw_key, meter_payload in station_meters.items():
                                                norm_key = _normalize_connector_key(raw_key)
                                                key: Optional[ConnectorKey]
                                                if norm_key is not None:
                                                        key = norm_key
                                                elif raw_key is not None:
                                                        key = raw_key
                                                else:
                                                        key = None
                                                if key is None:
                                                        continue
                                                normalised[key] = meter_payload

                                        connector_candidate = _normalize_connector_key(info.get("connectorId"))
                                        evse_candidate = _normalize_connector_key(info.get("evseId"))
                                        candidates: list[ConnectorKey] = []
                                        if connector_candidate is not None:
                                                candidates.append(connector_candidate)
                                        if (
                                                evse_candidate is not None
                                                and evse_candidate not in candidates
                                        ):
                                                candidates.append(evse_candidate)

                                        for candidate in candidates:
                                                if candidate in normalised:
                                                        meter_details = normalised[candidate]
                                                        break

                                        if meter_details is None and len(normalised) == 1:
                                                meter_details = next(iter(normalised.values()))

                        if isinstance(meter_details, Mapping):
                                power_value: Optional[float] = None
                                phases_payload: dict[str, float] = {}

                                raw_power = meter_details.get("Power.Active.Import")
                                if isinstance(raw_power, Mapping):
                                        for phase_key, phase_value in raw_power.items():
                                                val = _safe_float(phase_value)
                                                if val is None:
                                                        continue
                                                phases_payload[str(phase_key)] = val
                                else:
                                        power_value = _safe_float(raw_power)

                                raw_phases = meter_details.get("Power.Active.Import.Phases")
                                if isinstance(raw_phases, Mapping):
                                        for phase_key, phase_value in raw_phases.items():
                                                val = _safe_float(phase_value)
                                                if val is None:
                                                        continue
                                                phases_payload[str(phase_key)] = val

                                energy_value = meter_details.get("Energy.Active.Import.Register")
                                if meter_last_value is None:
                                        meter_last_value = _safe_float(energy_value)

                                if power_value is not None:
                                        entry["powerActiveImport"] = power_value
                                if phases_payload:
                                        entry["powerActiveImportPhases"] = phases_payload

                        if meter_last_value is not None:
                                entry["meterLastWh"] = meter_last_value

                        if (
                                meter_start_value is not None
                                and meter_last_value is not None
                                and meter_last_value >= meter_start_value
                        ):
                                entry["sessionEnergyDeltaWh"] = (
                                        meter_last_value - meter_start_value
                                )

                        start_ts = info.get("sessionStartTimestamp")
                        if start_ts:
                                try:
                                        start_dt = datetime.fromisoformat(start_ts.replace("Z", "+00:00"))
                                        delta = now - start_dt
                                        entry["durationSeconds"] = int(delta.total_seconds())
                                        entry["duration"] = str(delta)
                                except ValueError:
                                        entry["durationSeconds"] = None
                                        entry["duration"] = None
                        else:
                                entry["durationSeconds"] = None
                                entry["duration"] = None

                        sessions.append(entry)

                body = json.dumps({"sessions": sessions})
                return http_response(
                        HTTPStatus.OK,
                        (body + "\n").encode("utf-8"),
                        "application/json; charset=utf-8",
                )

        if not is_websocket_upgrade:
                logging.info("719: Connect on a non-websocket: %s", path)

                for header, value in headers.items():
                        if header == "host":
                                logging.info("778: host: %s", value)

                return await _handle_http_api_request(path, parsed)

        # 2) Nur Pfade zulassen, die wir aus der DB geladen haben
        if resolve_station_entry(path)[0] is None:
                logging.info("718: Rejected Device: " + str(path))

                if FALLBACK_ENABLED:
                        relative_path = path.lstrip("/")
                        ws_url = f"{FALLBACK_BASE_URL}/{relative_path}"
                        # Neuen Redirect-Eintrag in DB anlegen
                        asyncio.create_task(insert_redirect(path, ws_url))
                        logging.info(f"789-2: Fallback URL: {ws_url}, new redirect entry created", extra={"station_id": "system"})
                
                _record_handshake_failure("invalid_path", normalize_station_id(path))
                return http_response(
                        HTTPStatus.NOT_FOUND,
                        b"Station nicht gefunden\n",
                )

        # 3) Für echte WebSocket-Upgrades weiter zum Handshake
        station_id = normalize_station_id(path)
        existing_ws = ACTIVE_CLIENTS.get(station_id)
        if existing_ws is not None and not getattr(existing_ws, "closed", False):
                logging.info(
                        f"789-1: Waiting for existing connection on {path} to close",
                        extra={"station_id": station_id},
                )
                try:
                        await log_message(station_id, "wait", "Existing connection active")
                except Exception:
                        pass
                try:
                        existing_ws.close()
                        await asyncio.wait_for(existing_ws.wait_closed(), timeout=EXISTING_WS_CLOSE_TIMEOUT)
                except asyncio.TimeoutError:
                        logging.warning(
                                f"789-1: Timeout waiting for existing connection on {path} to close",
                                extra={"station_id": station_id},
                        )
                finally:
                        ACTIVE_CLIENTS.pop(station_id, None)
                logging.info(
                        f"789-1: Existing connection on {path} closed or timed out, continuing",
                        extra={"station_id": station_id},
                )

        logging.info("789: Continue to upgrade. " + str(path))
        return None
        
async def check_mysql_connection(log_prefix: str = "") -> bool:
        """Probe the configured MySQL database with a trivial round trip.

        Updates the module-level health markers ``LAST_MYSQL_CHECK_ATTEMPT``
        and ``LAST_MYSQL_CHECK_SUCCESS`` and writes the outcome to the
        application log.  ``log_prefix`` adds contextual information to the
        log line (e.g. ``"[Startup] "``).

        Returns ``True`` when the ``SELECT 1`` query succeeds, ``False``
        otherwise.
        """

        global LAST_MYSQL_CHECK_ATTEMPT, LAST_MYSQL_CHECK_SUCCESS

        LAST_MYSQL_CHECK_ATTEMPT = datetime.now(timezone.utc)
        # Describe the target once; it is needed in both outcome branches.
        target = _describe_mysql_target(MYSQL_CONFIG)

        try:
                async with db_connection() as conn:
                        async with conn.cursor() as cur:
                                await cur.execute("SELECT 1")
                                await cur.fetchone()
        except Exception as exc:
                LAST_MYSQL_CHECK_SUCCESS = False
                logging.error(
                        "%sMySQL connectivity check failed for %s: %s",
                        log_prefix,
                        target,
                        exc,
                )
                return False

        LAST_MYSQL_CHECK_SUCCESS = True
        logging.info("%sMySQL connectivity check succeeded for %s", log_prefix, target)
        return True


async def periodic_mysql_connectivity_check(interval_seconds: int = 300) -> None:
        """Repeat the MySQL connectivity probe every ``interval_seconds`` seconds.

        Runs forever.  Cancellation is propagated so the task can be shut
        down cleanly; every other exception is logged and the loop continues.
        """

        while True:
                try:
                        await asyncio.sleep(interval_seconds)
                        await check_mysql_connection(log_prefix="[Periodic] ")
                except asyncio.CancelledError:
                        # Allow orderly task shutdown.
                        raise
                except Exception:
                        logging.exception("Unexpected error in periodic MySQL connectivity check")


def _make_protocol_factory(listener: ProxyListenerSettings):
        """Return a protocol factory bound to *listener*'s metadata.

        The listener's certificate identifiers and name are baked into the
        factory so every accepted connection can be attributed to the
        listener that received it.
        """
        bound_kwargs = {
                "cert_identifiers": listener.cert_identifiers,
                "listener_name": listener.name,
        }
        return partial(LoggingWebSocketServerProtocol, **bound_kwargs)


def _build_listener_ssl_context(listener: ProxyListenerSettings) -> ssl.SSLContext | None:
        """Create the server-side TLS context for *listener*.

        Returns ``None`` for plain-text listeners.  Raises ``RuntimeError``
        when SSL is enabled but certificate or key paths are missing, and
        re-raises (after logging) any failure while loading the chain.
        """
        if not listener.use_ssl:
                return None

        if not listener.certfile or not listener.keyfile:
                raise RuntimeError(
                        f"Listener '{listener.name}' is missing certfile or keyfile despite SSL being enabled"
                )

        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        if listener.force_tls12:
                # Compatibility mode: cap negotiation at TLS 1.2 for clients
                # that cannot complete a TLS 1.3 handshake.
                if hasattr(context, "maximum_version") and hasattr(ssl, "TLSVersion"):
                        context.maximum_version = ssl.TLSVersion.TLSv1_2
                else:  # pragma: no cover - fallback for very old Python versions
                        context.options |= getattr(ssl, "OP_NO_TLSv1_3", 0)
                logging.info(
                        "708: TLS-Kompatibilitätsmodus aktiv – TLS-Verbindungen werden auf Version 1.2 begrenzt (%s)",
                        listener.name,
                        extra={"station_id": "system"},
                )
        try:
                context.load_cert_chain(listener.certfile, listener.keyfile)
        except Exception as exc:  # pragma: no cover - startup failure
                logging.error(
                        "708: Laden des TLS-Zertifikats ist fehlgeschlagen (%s): %s",
                        listener.name,
                        exc,
                        extra={"station_id": "system"},
                )
                raise
        return context


async def _start_proxy_listener(
        listener: ProxyListenerSettings,
        subprotocols: list[str],
):
        """Bind and start one OCPP proxy WebSocket endpoint.

        Builds the listener's TLS context (if enabled), installs the
        HTTP pre-processing hook and the logging protocol class, then
        returns the running websockets server.
        """
        tls_context = _build_listener_ssl_context(listener)
        server = await ws_serve(
                handle_client,
                listener.ip,
                listener.port,
                subprotocols=subprotocols,
                process_request=_build_process_request_handler(False),
                ping_interval=None,
                ping_timeout=None,
                create_protocol=_make_protocol_factory(listener),
                ssl=tls_context,
        )

        logging.info(
                "708: WebSocket Proxy running on %s://%s:%s/<STATION_ID> (%s)",
                "wss" if tls_context is not None else "ws",
                listener.ip,
                listener.port,
                listener.name,
                extra={"station_id": "system"},
        )
        return server


async def _handle_api_websocket(websocket, path: str):
        logging.info(
                "Rejecting WebSocket connection on API listener (%s)",
                path,
                extra={"station_id": "system"},
        )
        await websocket.close(code=4000, reason="WebSocket API endpoint")


async def _start_api_listener(listener: ProxyListenerSettings):
        """Bind and start the HTTP API endpoint.

        Reuses the WebSocket server machinery: the process-request hook
        serves all plain HTTP calls, while any stray WebSocket upgrade is
        rejected by ``_handle_api_websocket``.
        """
        tls_context = _build_listener_ssl_context(listener)
        server = await ws_serve(
                _handle_api_websocket,
                listener.ip,
                listener.port,
                subprotocols=[],
                process_request=_build_process_request_handler(True),
                ping_interval=None,
                ping_timeout=None,
                create_protocol=_make_protocol_factory(listener),
                ssl=tls_context,
        )

        logging.info(
                "708: API listener running on %s://%s:%s (%s)",
                "wss" if tls_context is not None else "ws",
                listener.ip,
                listener.port,
                listener.name,
                extra={"station_id": "system"},
        )
        return server


async def main(*, config_preloaded: bool = False):
        """Start the WebSocket proxy server and run until all listeners close.

        Sequence: configure websockets logging, load config from the DB
        (unless ``config_preloaded`` indicates the ``__main__`` block already
        did so), open the DB pool, connect MQTT, ensure auxiliary tables,
        spawn periodic background tasks, then serve every configured OCPP
        listener plus the optional API listener.  The DB pool is always
        closed on exit.
        """
        # Configure websockets logging according to global log level
        logging.getLogger("websockets.server").setLevel(LOG_LEVEL)
        logging.getLogger("websockets.client").setLevel(LOG_LEVEL)

        try:
                if not config_preloaded:
                        await load_config_from_db()

                await init_db_pool()
                await check_mysql_connection(log_prefix="[Startup] ")

                # Log the effective runtime configuration for diagnostics.
                logging.info(
                        "[Startup] throttle_enabled=%s (throttle_seconds=%s)",
                        "an" if THROTTLE_ENABLED else "aus",
                        THROTTLE_SECONDS,
                        extra={"station_id": "system"},
                )
                logging.info(
                        "[Startup] websocket ping_interval=%s, ping_timeout=%s",
                        DEFAULT_PING_INTERVAL,
                        DEFAULT_PING_TIMEOUT,
                        extra={"station_id": "system"},
                )
                logging.info(
                        "[Startup] backend_connect timeout=%s, retries=%s, backoff_seconds=%s",
                        BACKEND_CONNECT_TIMEOUT,
                        BACKEND_CONNECT_RETRIES,
                        BACKEND_CONNECT_BACKOFF_SECONDS,
                        extra={"station_id": "system"},
                )
                for listener in OCPP_PROXY_LISTENERS:
                        logging.info(
                                "[Startup] ocpp_proxy_endpoint=%s:%s (listener=%s)",
                                listener.ip,
                                listener.port,
                                listener.name,
                                extra={"station_id": "system"},
                        )
                        logging.info(
                                "[Startup] ocpp_proxy_ssl=%s (listener=%s)",
                                "aktiviert" if listener.use_ssl else "deaktiviert",
                                listener.name,
                                extra={"station_id": "system"},
                        )
                if OCPP_API_LISTENER is not None:
                        logging.info(
                                "[Startup] api_proxy_port=%s (ssl=%s)",
                                OCPP_API_LISTENER.port,
                                "aktiviert" if OCPP_API_LISTENER.use_ssl else "deaktiviert",
                                extra={"station_id": "system"},
                        )
                # MQTT is best-effort: a failed broker connection is logged
                # but does not prevent the proxy from starting.
                try:
                        mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
                        mqtt_client.connect(MQTT_BROKER, MQTT_PORT)
                        mqtt_client.loop_start()
                except Exception as e:
                        logging.error(f"[MQTT] connect failed: {e}")

                # Load station data and make sure all auxiliary tables exist
                # before the first client connection is accepted.
                await load_wallboxes_from_db()
                await ensure_websocket_log_table()
                await ensure_rfid_log_table()
                await ensure_bootnotification_table()
                await ensure_getconfiguration_table()
                await ensure_last_meterreading_table()
                await ensure_cp_auth_log_table()
                await log_authorize_transaction_status()
                # Fire-and-forget periodic background tasks.
                asyncio.create_task(log_websocket_counts_periodically())
                asyncio.create_task(periodic_mysql_connectivity_check())

                configured_subprotocols = _collect_supported_subprotocols()
                # In-place slice assignment keeps the list object's identity,
                # so other holders of SUPPORTED_SUBPROTOCOLS see the update.
                SUPPORTED_SUBPROTOCOLS[:] = configured_subprotocols
                logging.info(
                        "708: Offering WebSocket subprotocols during handshake: %s",
                        ", ".join(configured_subprotocols) or "<none>",
                        extra={"station_id": "system"},
                        )

                # Start every configured OCPP listener, then the optional API
                # listener, and block until all of them have closed.
                servers = []
                for listener in OCPP_PROXY_LISTENERS:
                        server = await _start_proxy_listener(listener, configured_subprotocols)
                        servers.append(server)
                if OCPP_API_LISTENER is not None:
                        api_server = await _start_api_listener(OCPP_API_LISTENER)
                        servers.append(api_server)
                try:
                        await asyncio.gather(*(server.wait_closed() for server in servers))
                finally:
                        # Drain any still-registered background tasks so shutdown
                        # does not leave pending coroutines behind.
                        pending = list(BACKGROUND_TASKS)
                        if pending:
                                await asyncio.gather(*pending, return_exceptions=True)
                                BACKGROUND_TASKS.difference_update(pending)
        finally:
                await close_db_pool()

if __name__ == "__main__":
        # Announce the configured endpoints before the event loop starts.
        mysql_host = MYSQL_CONFIG.get("host", "localhost")
        mysql_port = MYSQL_CONFIG.get("port", 3306)
        logging.info("OCPP Proxy Server Startup version %s", VERSION, extra={"station_id": "system"})
        logging.info("Configured OCPP endpoint: %s", OCPP_ENDPOINT, extra={"station_id": "system"})
        logging.info(
                "MySQL wird unter %s:%s gesucht",
                mysql_host,
                mysql_port,
                extra={"station_id": "system"},
        )
        config_loaded = False

        async def _preload_runtime_config():
                """Load the runtime config from the DB once, always releasing the pool."""
                try:
                        await load_config_from_db()
                        return tuple(OCPP_PROXY_LISTENERS)
                finally:
                        await close_db_pool()

        # Best-effort preload: on failure fall back to the static defaults and
        # let main() retry the DB load itself (config_loaded stays False).
        try:
                listeners = asyncio.run(_preload_runtime_config())
        except Exception as exc:
                print("Konfiguration aus der Datenbank konnte vor dem Start nicht geladen werden:", exc)
                print(
                        f"Verwende Fallback-Konfiguration {OCPP_PROXY_IP}:{OCPP_PROXY_PORT} für den Initial-Start"
                )
                listeners = tuple(OCPP_PROXY_LISTENERS)
        else:
                config_loaded = True

        if not listeners:
                listeners = tuple(OCPP_PROXY_LISTENERS)

        # Console banner listing every endpoint that will be served.
        for listener in listeners:
                scheme = "wss" if listener.use_ssl else "ws"
                print(
                        f"OCPP Proxy lauscht auf {scheme}://{listener.ip}:{listener.port} ({listener.name})"
                )
        if OCPP_API_LISTENER is not None:
                api_scheme = "https" if OCPP_API_LISTENER.use_ssl else "http"
                print(
                        f"OCPP Proxy API läuft auf {api_scheme}://{OCPP_API_LISTENER.ip}:{OCPP_API_LISTENER.port} ({OCPP_API_LISTENER.name})"
                )

        asyncio.run(main(config_preloaded=config_loaded))
                
                
