import asyncio
import hashlib
import json
import sys
import uuid
import secrets
from collections import defaultdict, deque
from datetime import date, datetime, timezone, timedelta
from decimal import Decimal, InvalidOperation
import base64
import smtplib
import logging
import os
from email.message import EmailMessage
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import quote, urlparse
import resource
import copy
import inspect
import queue
import ssl
import socket
import time
import threading
import math
from types import SimpleNamespace
import concurrent.futures
import importlib
import warnings

import pymysql
from pymysql.cursors import DictCursor
from aiohttp import ClientSession, ClientTimeout, web
from websockets.legacy.server import WebSocketServerProtocol, serve
from websockets.exceptions import (
    InvalidMessage,
    WebSocketException,
    ConnectionClosed,
)
from websockets.legacy.http import read_request
from typing import Any, Mapping, Optional, Sequence

# ``import importlib`` alone does not guarantee that the ``importlib.util``
# submodule is loaded, so import it explicitly before calling ``find_spec``.
import importlib.util

# Record whether the optional ``aiomysql`` package is discoverable.  The name
# is kept at module level in case other code reads it.  ``find_spec`` can
# raise for a half-initialised module, which must not abort server start-up.
try:
    aiomysql_spec = importlib.util.find_spec("aiomysql")
except (ImportError, ValueError):  # pragma: no cover - broken installation
    aiomysql_spec = None

# A single guarded import decides the value of ``aiomysql``: the module when
# it imports cleanly, otherwise ``None``.  The previous unguarded
# ``importlib.import_module`` call ran before this guard and would crash the
# whole module on an installed-but-broken package.
try:
    import aiomysql  # type: ignore[import]
except Exception:  # pragma: no cover - optional dependency
    aiomysql = None  # type: ignore[assignment]

# Silence two families of pymysql warnings, matched by message prefix:
# 1) a table that already exists when it is created again, and
# 2) the server notice that integer display widths are deprecated.
warnings.filterwarnings(
    "ignore",
    message=r"^Table '.*' already exists",
    category=pymysql.Warning,
)
warnings.filterwarnings(
    "ignore",
    message=r"^Integer display width is deprecated",
    category=pymysql.Warning,
)

# Name of the JSON configuration file; the relative path is resolved against
# the current working directory.
CONFIG_FILE = "config.json"

# Load the configuration once at import time.  A missing file yields an empty
# configuration.  NOTE(review): malformed JSON is not handled here and would
# propagate as an exception during import.
try:
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        CONFIG = json.load(f)
except FileNotFoundError:
    CONFIG = {}

# Remember the modification time of the configuration file (None when the
# file cannot be stat'ed).
# NOTE(review): presumably used elsewhere to detect config changes - confirm.
_CONFIG_PATH = Path(CONFIG_FILE)
_CONFIG_MTIME: float | None = None
try:
    _CONFIG_MTIME = _CONFIG_PATH.stat().st_mtime
except OSError:
    _CONFIG_MTIME = None

# Keep a reference to aiohttp's original ``json_response`` implementation.
_json_response = web.json_response

# The "diagnostics" section of the configuration (empty mapping when absent).
_DIAGNOSTICS_CONFIG = CONFIG.get("diagnostics", {})


def _build_default_diagnostic_location() -> str:
    """Return the default diagnostics upload location as a URL string.

    An explicit ``location`` entry in the diagnostics configuration wins.
    Otherwise an ``ftp://`` URL is assembled from the ``ftp_host``,
    ``ftp_port``, ``ftp_user`` and ``ftp_password`` entries (each with a
    built-in fallback).  User name and password are percent-encoded.
    """
    settings = _DIAGNOSTICS_CONFIG

    explicit = settings.get("location")
    if explicit:
        return str(explicit)

    host_name = str(settings.get("ftp_host", "217.160.79.201"))
    try:
        port_number = int(settings.get("ftp_port", 21))
    except (TypeError, ValueError):
        port_number = 21
    # Port 0 and the standard FTP port 21 are left out of the URL.
    authority = host_name if port_number in (0, 21) else f"{host_name}:{port_number}"

    # NOTE(review): the fallback credentials below are hard-coded in source;
    # consider requiring them from configuration instead.
    user_name = settings.get("ftp_user", "diag")
    secret = settings.get("ftp_password", "rem0tec0nnect2026x")
    credentials = ""
    if user_name:
        credentials = quote(str(user_name), safe="")
        # A password is only emitted together with a user name.
        if secret:
            credentials += ":" + quote(str(secret), safe="")
        credentials += "@"

    return f"ftp://{credentials}{authority}/"


# Diagnostics upload location, computed once at import time from the
# "diagnostics" configuration section.
DEFAULT_DIAGNOSTIC_LOCATION = _build_default_diagnostic_location()


def _json_dumps_with_newline(data: Any) -> str:
    text = json.dumps(data)
    if not text.endswith("\n"):
        text += "\n"
    return text


def _json_response_with_newline(data: Any, **kwargs: Any):
    """Call the original ``json_response`` with a newline-terminating serializer.

    A caller-supplied ``dumps`` keyword is respected; only when it is absent
    is ``_json_dumps_with_newline`` used.
    """
    if "dumps" not in kwargs:
        kwargs["dumps"] = _json_dumps_with_newline
    return _json_response(data, **kwargs)


def _mask_identifier_value(value: Any) -> Any:
    if not isinstance(value, str):
        return value

    normalized = value.strip()
    if len(normalized) <= 4:
        return "***"

    return f"{normalized[:2]}***{normalized[-2:]}"


def _mask_sensitive_identifiers(payload: Any) -> Any:
    if isinstance(payload, Mapping):
        masked: dict[Any, Any] = {}
        for key, value in payload.items():
            lowered = str(key).lower()
            if lowered in {"vin", "emaid", "evcoid", "evco_id", "evco-id"}:
                masked[key] = _mask_identifier_value(value)
            else:
                masked[key] = _mask_sensitive_identifiers(value)
        return masked
    if isinstance(payload, Sequence) and not isinstance(payload, (str, bytes, bytearray)):
        return [_mask_sensitive_identifiers(item) for item in payload]
    return payload


def _normalize_certificate_blob(value: Any, *, allow_empty: bool = False) -> str:
    text = "" if value is None else str(value)
    cleaned = text.strip()
    if not cleaned:
        if allow_empty:
            return ""
        raise ValueError("certificate content required")

    if "BEGIN CERTIFICATE" in cleaned:
        lines = [
            line.strip()
            for line in cleaned.splitlines()
            if "BEGIN CERTIFICATE" not in line and "END CERTIFICATE" not in line
        ]
        cleaned = "".join(lines)
    cleaned = cleaned.replace("\n", "").replace("\r", "").strip()

    try:
        base64.b64decode(cleaned, validate=True)
        return cleaned
    except Exception:
        encoded = base64.b64encode(cleaned.encode("utf-8"))
        return encoded.decode("utf-8")


def _format_timestamp_value(value: Any) -> str | None:
    if not isinstance(value, datetime):
        return None
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    else:
        value = value.astimezone(timezone.utc)
    return value.isoformat().replace("+00:00", "Z")


# Hubject client instance and, when it is unavailable, the reason as text.
# Both start unset; the import/initialisation code below fills them in.
HUBJECT_CLIENT: Optional["HubjectClient"] = None
HUBJECT_CLIENT_ERROR: Optional[str] = None
# Plug & Charge (PnC) related module state.
# NOTE(review): these look like lazily populated caches that are written by
# code outside this section - confirm before relying on their semantics.
_PNC_ADAPTER: Any | None = None
_PNC_ADAPTER_ERROR: str | None = None
_PNC_ADAPTER_CFG_FINGERPRINT: str | None = None
_PNC_COLUMN_PRESENT: bool | None = None
_PNC_FLAG_CACHE: dict[str, bool] = {}
# Absolute path of "debug/pnc_reload.flag" next to the configuration file.
_PNC_RELOAD_FLAG_PATH = (_CONFIG_PATH.parent / "debug" / "pnc_reload.flag").resolve()
# Measurand and unit names used for session logging.
SESSION_LOG_REGISTER = "Energy.Active.Import.Register"
SESSION_LOG_UNIT = "Wh"

# Session-log counters and the asyncio lock that accompanies them.
# NOTE(review): presumably the lock guards the two counters - confirm at the
# places where they are updated.
_SESSION_LOG_PROCESSED_COUNT = 0
_SESSION_LOG_SIGNED_DATA_COUNT = 0
_SESSION_LOG_COUNTER_LOCK = asyncio.Lock()
_SESSION_LOG_COLUMN_PRESENT: bool | None = None
_EXTENDED_SESSION_LOG_FLAG_CACHE: dict[str, bool] = {}

# Optional Hubject integration.  When the client module cannot be imported,
# every name it would have provided is bound to ``None`` so later code can
# test for availability, and the failure reason is kept for reporting.
try:
    from hubject_client import (
        HubjectClient,
        HubjectClientError,
        HubjectConfigurationError,
        load_hubject_config,
    )
except Exception as exc:  # pragma: no cover - optional dependency at import time
    logging.warning("Hubject client import failed: %s", exc)
    HubjectClient = None  # type: ignore[assignment]
    HubjectClientError = None  # type: ignore[assignment]
    # Previously left undefined on this path, unlike the other three names,
    # so any later reference raised NameError instead of seeing ``None``.
    HubjectConfigurationError = None  # type: ignore[assignment]
    load_hubject_config = None  # type: ignore[assignment]
    HUBJECT_CLIENT_ERROR = str(exc)

# Replace aiohttp's ``web.json_response`` with the wrapper defined above so
# responses created through it use the newline-terminating serializer.
web.json_response = _json_response_with_newline  # type: ignore[assignment]

# Create the Hubject client only when the optional import succeeded.
if HubjectClient is not None and load_hubject_config is not None:
    try:
        # The loader receives the directory of the configuration file and its
        # "certs" subdirectory.
        _hubject_config = load_hubject_config(
            CONFIG,
            config_dir=Path(CONFIG_FILE).resolve().parent,
            certs_dir=Path(CONFIG_FILE).resolve().parent / "certs",
        )
    except HubjectConfigurationError as exc:
        # Configuration problems disable the integration without aborting.
        logging.info("Hubject integration disabled: %s", exc)
        HUBJECT_CLIENT_ERROR = str(exc)
    except FileNotFoundError as exc:  # pragma: no cover - config removed at runtime
        logging.info("Hubject configuration could not be loaded: %s", exc)
        HUBJECT_CLIENT_ERROR = str(exc)
    else:
        # Success: build the client and clear any earlier error text.
        HUBJECT_CLIENT = HubjectClient(_hubject_config)
        HUBJECT_CLIENT_ERROR = None

# Default descriptive values for charging stations.
# NOTE(review): the names suggest these feed Hubject EVSE data records -
# confirm against the code that consumes them.
DEFAULT_ACCESSIBILITY = "Test station"
DEFAULT_ACCESSIBILITY_LOCATION = "ParkingLot"
DEFAULT_PLUGS = "Type 2 Connector (Cable Attached)"
DEFAULT_AUTHENTICATION_MODES = (
    "NFC RFID Classic",
    "REMOTE",
    "Direct Payment",
)
# Each tuple default also has a JSON-encoded list form (non-ASCII characters
# are kept verbatim because of ensure_ascii=False).
DEFAULT_AUTHENTICATION_MODES_JSON = json.dumps(
    list(DEFAULT_AUTHENTICATION_MODES), ensure_ascii=False
)
DEFAULT_HUBJECT_VALUE_ADDED_SERVICES = ("Reservation",)
DEFAULT_HUBJECT_VALUE_ADDED_SERVICES_JSON = json.dumps(
    list(DEFAULT_HUBJECT_VALUE_ADDED_SERVICES), ensure_ascii=False
)
# A single default charging facility entry.
DEFAULT_HUBJECT_CHARGING_FACILITIES = (
    {
        "ChargingFacilityStatus": "Available",
        "PowerType": "AC_3_PHASE",
        "Power": 22,
        "Voltage": 400,
        "Amperage": 32,
        "ChargingMode": "Mode_3",
    },
)
DEFAULT_HUBJECT_CHARGING_FACILITIES_JSON = json.dumps(
    list(DEFAULT_HUBJECT_CHARGING_FACILITIES), ensure_ascii=False
)

# The log level comes from CONFIG["log_levels"]["ocpp_server"] (default
# "INFO").  Names that the logging module does not define fall back to INFO.
log_cfg = CONFIG.get("log_levels", {})
level_name = log_cfg.get("ocpp_server", "INFO").upper()
LOG_LEVEL = getattr(logging, level_name, logging.INFO)
# Textual name of the effective level.
LOG_LEVEL_NAME = logging.getLevelName(LOG_LEVEL)


class StationIDFilter(logging.Filter):
    """Logging filter that fills in missing ``station_id``/``system`` fields.

    The defaults depend on the logger name: ``websockets.client*`` records
    belong to the backend link, ``websockets.server*`` records to the charge
    point link, everything else to the server itself.  Values already present
    on the record are never overwritten, and no record is ever rejected.
    """

    def filter(self, record: logging.LogRecord) -> bool:  # type: ignore[override]
        logger_name = record.name
        if logger_name.startswith("websockets.client"):
            station_default, system_default = "backend", "server-backend"
        elif logger_name.startswith("websockets.server"):
            station_default, system_default = "chargepoint", "client-server"
        else:
            station_default = system_default = "ocpp-server"

        if not hasattr(record, "station_id"):
            record.station_id = station_default
        if not hasattr(record, "system"):
            record.system = system_default
        return True


class SafeFormatter(logging.Formatter):
    """Formatter that tolerates records lacking ``station_id``/``system``.

    Missing fields are filled with a default derived from the logger name
    before the record is handed to the standard formatter, so a format
    string referencing them never fails.
    """

    def format(self, record: logging.LogRecord) -> str:  # type: ignore[override]
        logger_name = record.name
        if logger_name.startswith("websockets.client"):
            fallbacks = {"station_id": "backend", "system": "server-backend"}
        elif logger_name.startswith("websockets.server"):
            fallbacks = {"station_id": "chargepoint", "system": "client-server"}
        else:
            fallbacks = {"station_id": "ocpp-server", "system": "ocpp-server"}

        # Only fill gaps; values supplied via ``extra=`` stay untouched.
        for attribute, fallback in fallbacks.items():
            if not hasattr(record, attribute):
                setattr(record, attribute, fallback)
        return super().format(record)


# Every log line carries the station and system context in brackets.
LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(station_id)s] - [%(system)s] - %(message)s"
_root_logger = logging.getLogger()
# Remove handlers and filters that were installed before this module was
# imported.  ``logging.basicConfig`` only configures a root logger without
# handlers, so this guarantees the call below takes effect.
if _root_logger.handlers:
    _root_logger.handlers.clear()
    _root_logger.filters.clear()

logging.basicConfig(
    level=LOG_LEVEL,
    format=LOG_FORMAT,
    stream=sys.stdout,
)

_root_logger.setLevel(LOG_LEVEL)
# Swap in SafeFormatter on every root handler so records without
# station_id/system can still be rendered with LOG_FORMAT.
_formatter = SafeFormatter(LOG_FORMAT)
for handler in _root_logger.handlers:
    handler.setFormatter(_formatter)
# Logger-level filters only see records logged on that logger itself, not
# records propagated from child loggers; the formatter above covers those.
_root_logger.addFilter(StationIDFilter())

# Default timeout applied while waiting for an OCPP call result.  Individual
# actions may override it when they are known to take longer to acknowledge.
# NOTE(review): the unit is presumably seconds - confirm at the call sites.
DEFAULT_ACTION_TIMEOUT = int(CONFIG.get("ocpp_action_timeout", 30))
# Firmware updates are typically executed asynchronously by the charge point
# and can legitimately take longer than other commands to acknowledge.
UPDATE_FIRMWARE_TIMEOUT = int(CONFIG.get("ocpp_update_firmware_timeout", 120))


class OCPPCallError(Exception):
    """Raised when an OCPP CallError message is received for a pending call."""

    def __init__(
        self,
        action: str | None,
        error_code: str,
        description: str,
        details: Any | None = None,
    ) -> None:
        super().__init__(f"{action or 'unknown'} failed with {error_code}: {description}")
        self.action = action
        self.error_code = error_code
        self.description = description
        self.details = details

# MySQL connection settings taken from the "mysql" section of config.json.
mysql_cfg = CONFIG.get("mysql", {})
# Tunables for queued OCPP message logging (queue size, batch size and flush
# interval in seconds, as the configuration key names state).
# NOTE(review): unlike the two "drop" settings below, these conversions are
# not guarded, so a non-numeric configuration value raises at import time.
OCPP_LOG_QUEUE_MAXSIZE = int(CONFIG.get("ocpp_log_queue_maxsize", 1000))
OCPP_LOG_BATCH_SIZE = int(CONFIG.get("ocpp_log_batch_size", 50))
OCPP_LOG_FLUSH_INTERVAL = float(CONFIG.get("ocpp_log_flush_interval_seconds", 1.0))
# Invalid values for the drop-history settings fall back to their defaults.
try:
    OCPP_LOG_DROP_HISTORY_MINUTES = int(CONFIG.get("ocpp_log_drop_history_minutes", 120))
except (TypeError, ValueError):
    OCPP_LOG_DROP_HISTORY_MINUTES = 120
try:
    OCPP_LOG_DROP_DEFAULT_WINDOW_MINUTES = float(
        CONFIG.get("ocpp_log_drop_window_minutes", 15)
    )
except (TypeError, ValueError):
    OCPP_LOG_DROP_DEFAULT_WINDOW_MINUTES = 15.0
MYSQL_POOL_SIZE = int(CONFIG.get("mysql_pool_size", 10))
# Database pools, executors and logging-worker handles.  All start unset.
# NOTE(review): presumably created lazily by start-up code elsewhere - confirm.
_db_pool: Any | None = None
_db_sync_pool: Any | None = None
_db_executor: concurrent.futures.ThreadPoolExecutor | None = None
_ocpp_log_queue: asyncio.Queue[dict[str, Any]] | None = None
_ocpp_log_worker_task: asyncio.Task | None = None
_ocpp_log_stop_event: asyncio.Event | None = None
_ocpp_log_pool: Any | None = None
_ocpp_log_prepared_tables: set[str] = set()
_ocpp_log_executor: concurrent.futures.ThreadPoolExecutor | None = None
_sync_log_conn: Any | None = None
# Bookkeeping for log entries that were dropped.
_ocpp_log_drop_count = 0
_ocpp_log_last_drop_at: datetime | None = None
_ocpp_log_drop_events: deque[datetime] = deque()
# In-memory caches, each paired with a TTL read from the configuration (the
# key names state the unit as seconds).
# NOTE(review): the float in each cached tuple is presumably a timestamp used
# for expiry - confirm where the caches are read.
_RFID_STATUS_CACHE: dict[tuple[str, str], tuple[float, str]] = {}
_RFID_STATUS_CACHE_TTL = int(CONFIG.get("rfid_status_cache_ttl_seconds", 60))
_CONFIG_CACHE: dict[str, tuple[float, str]] = {}
_CONFIG_CACHE_TTL = int(CONFIG.get("configuration_cache_ttl_seconds", 120))
_OICP_FLAG_CACHE: dict[str, tuple[float, bool]] = {}
_OICP_FLAG_CACHE_TTL = int(CONFIG.get("oicp_flag_cache_ttl_seconds", 300))
_CP_METADATA_CACHE: dict[str, tuple[float, Optional[dict[str, Any]], list[dict[str, Any]]]] = {}
_CP_METADATA_CACHE_TTL = int(CONFIG.get("cp_metadata_cache_ttl_seconds", 300))

# Default websocket keep-alive settings.  Each default derives from the
# previous value: the ping timeout is twice the ping interval and the stale
# timeout twice the ping timeout.
ws_cfg = CONFIG.get("websocket", {})
PING_INTERVAL = int(ws_cfg.get("ping_interval", 60))
PING_TIMEOUT = int(ws_cfg.get("ping_timeout", 2 * PING_INTERVAL))
DEFAULT_STALE_TIMEOUT = int(ws_cfg.get("stale_timeout", 2 * PING_TIMEOUT))


def load_runtime_config():
    """Read all key/value pairs from the ``op_config`` table.

    The database is not guaranteed to be reachable (for example in test
    environments), so every failure is logged and answered with an empty
    dict.  This lets the server start with default settings instead of
    crashing.

    Returns:
        A dict mapping ``config_key`` to ``config_value``; empty when the
        connection or the query fails.
    """

    try:
        connection = pymysql.connect(
            **{
                "host": mysql_cfg.get("host"),
                "user": mysql_cfg.get("user"),
                "password": mysql_cfg.get("password"),
                "db": mysql_cfg.get("db"),
                "charset": mysql_cfg.get("charset", "utf8"),
            }
        )
    except Exception as exc:  # pragma: no cover - network environment
        logging.warning("MySQL unavailable, using default runtime config: %s", exc)
        return {}

    try:
        cursor = connection.cursor()
        cursor.execute("SELECT config_key, config_value FROM op_config")
        settings = {}
        for row in cursor.fetchall():
            settings[row[0]] = row[1]
        return settings
    except Exception as exc:  # pragma: no cover - network environment
        logging.error("Failed loading runtime config: %s", exc)
        return {}
    finally:
        # The connection is closed on success and on failure alike.
        connection.close()


# Database-backed settings, loaded once at import time (empty on any failure).
_runtime_cfg = load_runtime_config()
# Built-in fallbacks for the listen address and ports.
DEFAULT_SERVER_HOST = "0.0.0.0"
DEFAULT_SERVER_PORT = 8080
DEFAULT_API_PORT = 9751


def _sanitize_host(value: Any, setting_name: str) -> str | None:
    try:
        host = str(value).strip()
    except Exception:
        logging.warning("Ignoring %s due to invalid value: %r", setting_name, value)
        return None

    if not host:
        return None

    if any(c.isspace() for c in host):
        logging.warning("Ignoring %s containing whitespace: %r", setting_name, host)
        return None

    return host


def _sanitize_port(value: Any, setting_name: str) -> int | None:
    try:
        port = int(value)
    except (TypeError, ValueError):
        if value is not None:
            logging.warning("Ignoring %s due to non-integer value: %r", setting_name, value)
        return None

    if not (1 <= port <= 65535):
        logging.warning("Ignoring %s outside valid port range: %r", setting_name, value)
        return None

    return port


def _is_host_resolvable(host: str) -> bool:
    try:
        socket.getaddrinfo(host, None)
    except socket.gaierror:
        return False

    return True


@dataclass(frozen=True)
class OcppNetworkSettings:
    """Immutable record of the resolved host and ports and where each came from."""

    # Host name or IP address selected for the server.
    host: str
    # Port for the OCPP endpoint.
    ocpp_port: int
    # Port for the API endpoint.
    api_port: int
    # Origin of each value above; _resolve_ocpp_network_settings uses the
    # labels "config.json", "database" and "default".
    host_source: str
    ocpp_port_source: str
    api_port_source: str


def _resolve_ocpp_network_settings(
    config: Mapping[str, Any], runtime_cfg: Mapping[str, Any]
) -> OcppNetworkSettings:
    """Determine host, OCPP port and API port together with their origins.

    Each setting is searched in the same order: the ``ocpp_server`` section
    of *config*, the legacy ``ocpp_proxy`` section, the database-backed
    *runtime_cfg*, and finally the built-in default.  The first candidate
    that survives sanitizing wins.  A host that cannot be resolved is
    replaced by ``DEFAULT_SERVER_HOST`` and an error is logged.

    Args:
        config: Parsed contents of config.json.
        runtime_cfg: Key/value settings loaded from the database.

    Returns:
        The resolved settings, each paired with a source label
        (``"config.json"``, ``"database"`` or ``"default"``).
    """

    def _mapping_section(name: str) -> Mapping[str, Any]:
        # Sections that are missing or not mappings are treated as empty.
        section = config.get(name)
        return section if isinstance(section, Mapping) else {}

    def _first_usable(sanitize, candidates, fallback):
        # Candidates are sanitized in order; the search stops at the first hit
        # so later candidates never trigger warnings.
        for raw_value, setting_name, origin in candidates:
            cleaned = sanitize(raw_value, setting_name)
            if cleaned is not None:
                return cleaned, origin
        return fallback, "default"

    server_section = _mapping_section("ocpp_server")
    proxy_section = _mapping_section("ocpp_proxy")

    host, host_source = _first_usable(
        _sanitize_host,
        (
            (
                server_section.get("host") or server_section.get("ip"),
                "ocpp_server.host",
                "config.json",
            ),
            (proxy_section.get("host"), "ocpp_proxy.host", "config.json"),
            (runtime_cfg.get("ocpp_proxy_ip"), "ocpp_proxy_ip", "database"),
        ),
        DEFAULT_SERVER_HOST,
    )
    if not _is_host_resolvable(host):
        logging.error(
            "Configured OCPP host %r (source=%s) is not resolvable; using %s instead",
            host,
            host_source,
            DEFAULT_SERVER_HOST,
        )
        host, host_source = DEFAULT_SERVER_HOST, "default"

    ocpp_port, ocpp_port_source = _first_usable(
        _sanitize_port,
        (
            (server_section.get("ocpp_port"), "ocpp_server.ocpp_port", "config.json"),
            (proxy_section.get("ocpp_port"), "ocpp_proxy.ocpp_port", "config.json"),
            (runtime_cfg.get("ocpp_server_port"), "ocpp_server_port", "database"),
        ),
        DEFAULT_SERVER_PORT,
    )

    api_port, api_port_source = _first_usable(
        _sanitize_port,
        (
            (server_section.get("api_port"), "ocpp_server.api_port", "config.json"),
            (proxy_section.get("api_port"), "ocpp_proxy.api_port", "config.json"),
            (runtime_cfg.get("ocpp_api_port"), "ocpp_api_port", "database"),
        ),
        DEFAULT_API_PORT,
    )

    return OcppNetworkSettings(
        host=host,
        ocpp_port=ocpp_port,
        api_port=api_port,
        host_source=host_source,
        ocpp_port_source=ocpp_port_source,
        api_port_source=api_port_source,
    )


# Resolve the listen address and ports once at import time and expose them
# as module-level constants.
_network_settings = _resolve_ocpp_network_settings(CONFIG, _runtime_cfg)
SERVER_HOST = _network_settings.host
SERVER_PORT = _network_settings.ocpp_port
# The API port is resolved from config.json, then the database, and finally
# falls back to DEFAULT_API_PORT.
API_PORT = _network_settings.api_port

# Log the effective values together with the source each one came from.
logging.info(
    "[Startup] OCPP host=%s (source=%s), ocpp_port=%s (source=%s), api_port=%s (source=%s)",
    SERVER_HOST,
    _network_settings.host_source,
    SERVER_PORT,
    _network_settings.ocpp_port_source,
    API_PORT,
    _network_settings.api_port_source,
)


# Default server start-up behaviour toggles and heartbeat interval.
# The defaults contain a fixed number of ChangeConfiguration slots, all
# disabled and empty.
MAX_CHANGE_CONFIGURATION_ITEMS = 5
_DEFAULT_CHANGE_CONFIGURATION_ITEMS = tuple(
    {"enabled": False, "key": "", "value": ""}
    for _ in range(MAX_CHANGE_CONFIGURATION_ITEMS)
)

# Template only: use _default_server_startup_config() to obtain an
# independent, mutable copy.
DEFAULT_SERVER_STARTUP_CONFIG = {
    "local_authorize_offline": True,
    "authorize_remote_tx_requests": True,
    "local_auth_list_enabled": True,
    "authorization_cache_enabled": True,
    "change_availability_operative": True,
    "enforce_websocket_ping_interval": True,
    "send_heartbeat_change_on_boot": True,
    "trigger_meter_values_on_start": True,
    "trigger_status_notification_on_start": True,
    "trigger_boot_notification_on_message_before_boot": True,
    "heartbeat_interval": 300,
    "change_configuration_items": _DEFAULT_CHANGE_CONFIGURATION_ITEMS,
}


def _default_server_startup_config() -> dict[str, Any]:
    """Return an independent copy of the default start-up configuration.

    The result is a deep copy of ``DEFAULT_SERVER_STARTUP_CONFIG`` whose
    ``change_configuration_items`` entry is a list of dicts.
    """
    fresh = copy.deepcopy(DEFAULT_SERVER_STARTUP_CONFIG)
    slots = fresh.get("change_configuration_items", [])
    fresh["change_configuration_items"] = list(map(dict, slots))
    return fresh


# Configuration key names related to the local authorization list.
LOCAL_AUTH_CONFIGURATION_KEYS = [
    "LocalAuthListEnabled",
    "LocalAuthorizeOffline",
    "SendLocalListMaxLength",
]


# Number of latency samples kept per action; invalid values fall back to 1000.
try:
    ACTION_METRIC_SAMPLE_SIZE = int(CONFIG.get("ocpp_action_metric_history", 1000))
except (TypeError, ValueError):
    ACTION_METRIC_SAMPLE_SIZE = 1000
# Per-code CallError count at which a warning entry is produced; invalid
# values fall back to 10.
try:
    CALL_ERROR_WARNING_THRESHOLD = int(CONFIG.get("ocpp_call_error_warning_threshold", 10))
except (TypeError, ValueError):
    CALL_ERROR_WARNING_THRESHOLD = 10
# Error codes that are eligible for warnings.  The setting may be a
# list/tuple/set or a comma-separated string.
_warning_codes_config = CONFIG.get(
    "ocpp_call_error_warning_codes",
    ["NotSupported", "InternalError"],
)
if isinstance(_warning_codes_config, (list, tuple, set)):
    CALL_ERROR_WARNING_CODES = {str(code) for code in _warning_codes_config}
elif isinstance(_warning_codes_config, str):
    # Blank entries of the comma-separated form are ignored.
    CALL_ERROR_WARNING_CODES = {
        code.strip() for code in _warning_codes_config.split(",") if code.strip()
    }
else:  # pragma: no cover - misconfigured config file
    CALL_ERROR_WARNING_CODES = {"NotSupported", "InternalError"}


@dataclass
class _ActionMetric:
    success: int = 0
    call_errors: int = 0
    latencies_ms: deque[float] = None  # type: ignore[assignment]
    error_codes: dict[str, int] = None  # type: ignore[assignment]

    def __post_init__(self):
        if self.latencies_ms is None:
            self.latencies_ms = deque(maxlen=max(ACTION_METRIC_SAMPLE_SIZE, 1))
        if self.error_codes is None:
            self.error_codes = defaultdict(int)


def _default_action_metric() -> _ActionMetric:
    """Return a new, empty ``_ActionMetric``."""
    return _ActionMetric()


# Per-action metrics; an entry is created on first access of an action name.
ACTION_METRICS: dict[str, _ActionMetric] = defaultdict(_default_action_metric)
# Lock held while ACTION_METRICS is updated or snapshotted.
ACTION_METRICS_LOCK = threading.Lock()


# Connection bookkeeping.
# NOTE(review): presumably keyed by charge point / station id - confirm at
# the places where these dicts are written.
connected: dict[str, Any] = {}
connected_since: dict[str, datetime] = {}
last_seen: dict[str, datetime] = {}
stale_timeouts: dict[str, int] = {}
stale_timeout_events: dict[str, int] = {}
# Pending calls awaiting a reply.
# NOTE(review): the tuple looks like (future, action, start time) - confirm.
pending: dict[str, tuple[asyncio.Future, str, float]] = {}
# Track currently active transactions
current_transactions: dict[int, dict[str, Any]] = {}
# Lifetime of the Hubject caches below; the pruning helpers compare entry
# age in seconds against this value.
HUBJECT_REMOTE_START_TTL = int(CONFIG.get("hubject_remote_start_ttl", 900))
# Pending CDR metadata keyed by (charge point id, connector id).
HUBJECT_PENDING_CDRS: dict[tuple[str, int], tuple[datetime, dict[str, Any]]] = {}
# Active CDR context keyed by transaction id.
HUBJECT_ACTIVE_CDRS: dict[int, tuple[datetime, dict[str, Any]]] = {}
# Pending authorizations keyed by (station key, token).
HUBJECT_PENDING_AUTHORIZATIONS: dict[
    tuple[str, str], tuple[datetime, dict[str, Any]]
] = {}
# Track handshake state
boot_notifications: set[str] = set()
boot_requested: set[str] = set()
pending_meter_triggers: set[str] = set()

# Track how upcoming transactions were initiated (remote/local).
PENDING_START_METHODS: dict[tuple[str, int], tuple[datetime, str]] = {}

# Track API-triggered remote authorizations to auto-approve upcoming Authorize requests.
# Entries older than the TTL (in seconds) are pruned.
REMOTE_API_AUTHORIZATION_TTL = int(CONFIG.get("remote_api_authorization_ttl", 300))
# Special tag meaning "accept any RFID tag" for the station.
REMOTE_API_AUTHORIZATION_WILDCARD = "__ALLOW_ANY_RFID__"
# station key -> {normalized tag or wildcard: time of registration}
_remote_api_authorizations: dict[str, dict[str, datetime]] = {}

# Track the last known availability mode (Operative/Inoperative) per station.
AVAILABILITY_OPERATIVE = "Operative"
AVAILABILITY_INOPERATIVE = "Inoperative"
AVAILABILITY_UNKNOWN = "Unknown"
availability_state: dict[str, str] = {}

# Track last published Hubject EVSE statuses and configured EVSE identifiers.
_evse_status_cache: dict[tuple[str, int], str] = {}
_evse_id_cache: dict[str, str] = {}


def _prune_remote_api_authorizations(station_id: str, now: datetime | None = None) -> None:
    """Drop expired API authorizations for one station.

    Entries older than ``REMOTE_API_AUTHORIZATION_TTL`` seconds relative to
    *now* (default: current UTC time) are removed.  A station left without
    entries is removed from the registry altogether.
    """
    station_key = _normalize_chargepoint_id(station_id)
    if not station_key:
        return
    grants = _remote_api_authorizations.get(station_key)
    if not grants:
        return

    reference = now or datetime.now(timezone.utc)
    # Iterate over a snapshot of the keys because entries are deleted.
    for tag in list(grants):
        if (reference - grants[tag]).total_seconds() > REMOTE_API_AUTHORIZATION_TTL:
            del grants[tag]

    if not grants:
        _remote_api_authorizations.pop(station_key, None)


def register_remote_api_authorization(
    station_id: str, id_tag: str | None, *, allow_any_rfid: bool = False
) -> None:
    """Remember an API-triggered authorization for a station.

    Args:
        station_id: Station the authorization applies to.
        id_tag: Tag to authorize; stored stripped and upper-cased.
        allow_any_rfid: When true, additionally store the wildcard entry
            that accepts any tag.
    """
    station_key = _normalize_chargepoint_id(station_id)
    if not station_key:
        return

    stamp = datetime.now(timezone.utc)
    _prune_remote_api_authorizations(station_key, stamp)
    grants = _remote_api_authorizations.setdefault(station_key, {})

    if allow_any_rfid:
        grants[REMOTE_API_AUTHORIZATION_WILDCARD] = stamp

    tag_key = str(id_tag).strip().upper() if id_tag else ""
    if tag_key:
        grants[tag_key] = stamp


def _is_remote_api_authorization_valid(station_id: str, id_tag: str | None) -> bool:
    """Tell whether a non-expired API authorization covers *id_tag*.

    Expired entries are pruned first.  A wildcard entry for the station
    authorizes any tag, including a missing one.  Otherwise the stripped,
    upper-cased tag must be registered.
    """
    station_key = _normalize_chargepoint_id(station_id)
    if not station_key:
        return False

    _prune_remote_api_authorizations(station_key, datetime.now(timezone.utc))
    grants = _remote_api_authorizations.get(station_key)
    if not grants:
        return False
    if REMOTE_API_AUTHORIZATION_WILDCARD in grants:
        return True

    tag_key = str(id_tag).strip().upper() if id_tag else ""
    return bool(tag_key) and tag_key in grants


def _calculate_duration_ms(start_time: float | None) -> float | None:
    if start_time is None:
        return None
    return max((time.perf_counter() - start_time) * 1000, 0)


def _record_action_metric(
    action: str, success: bool, duration_ms: float | None = None, error_code: str | None = None
) -> None:
    """Update the metrics of *action* with the outcome of one call.

    Args:
        action: Action name; an empty name is ignored.
        success: Whether the call succeeded.
        duration_ms: Latency sample to store, if known.
        error_code: Error code counted for failed calls, if given.
    """
    if not action:
        return
    with ACTION_METRICS_LOCK:
        entry = ACTION_METRICS[action]
        if not success:
            entry.call_errors += 1
            if error_code:
                entry.error_codes[str(error_code)] += 1
        else:
            entry.success += 1
        # Latency is recorded for successes and failures alike.
        if duration_ms is not None:
            entry.latencies_ms.append(float(duration_ms))


def _calculate_latency_stats(latencies: Sequence[float]) -> tuple[float | None, float | None]:
    if not latencies:
        return None, None
    sorted_latencies = sorted(latencies)
    count = len(sorted_latencies)
    avg_ms = sum(sorted_latencies) / count
    index_95 = min(math.ceil(count * 0.95) - 1, count - 1)
    p95_ms = sorted_latencies[index_95]
    return avg_ms, p95_ms


def _build_action_metrics_snapshot() -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Summarize the collected action metrics.

    Returns:
        A pair ``(snapshot, warnings)``.  ``snapshot`` maps each action to
        its success count, CallError count, average latency and 95th
        percentile latency.  ``warnings`` lists every action/error-code
        combination whose count reached ``CALL_ERROR_WARNING_THRESHOLD``
        (restricted to ``CALL_ERROR_WARNING_CODES`` when that set is
        non-empty), sorted by descending count and then by action name.
    """
    # Hold the lock only long enough to copy the item list.
    with ACTION_METRICS_LOCK:
        collected = list(ACTION_METRICS.items())

    limit = max(int(CALL_ERROR_WARNING_THRESHOLD), 0)
    per_action: dict[str, Any] = {}
    alerts: list[dict[str, Any]] = []

    for action_name, stats in collected:
        mean_ms, p95_ms = _calculate_latency_stats(list(stats.latencies_ms))
        per_action[action_name] = {
            "success": stats.success,
            "call_errors": stats.call_errors,
            "avg_ms": mean_ms,
            "p95_ms": p95_ms,
        }
        # A threshold of zero disables warnings entirely.
        if limit <= 0:
            continue
        for code, occurrences in stats.error_codes.items():
            if occurrences < limit:
                continue
            if CALL_ERROR_WARNING_CODES and code not in CALL_ERROR_WARNING_CODES:
                continue
            alerts.append(
                {
                    "action": action_name,
                    "code": code,
                    "count": occurrences,
                    "threshold": limit,
                }
            )

    alerts.sort(key=lambda alert: (-alert["count"], alert["action"]))
    return per_action, alerts


class _ActionMetricsTimer:
    __slots__ = ("action", "start_time", "recorded")

    def __init__(self, action: str):
        self.action = action
        self.start_time = time.perf_counter()
        self.recorded = False

    def _record(self, success: bool, error_code: str | None = None) -> None:
        if self.recorded:
            return
        duration_ms = _calculate_duration_ms(self.start_time)
        _record_action_metric(self.action, success, duration_ms, error_code)
        self.recorded = True

    def success(self) -> None:
        self._record(True, None)

    def call_error(self, error_code: str | None = None) -> None:
        self._record(False, error_code)

    def finalize(self) -> None:
        if not self.recorded:
            self._record(True, None)


def _normalize_hubject_token(token: str | None) -> str | None:
    if token is None:
        return None
    normalized = str(token).strip().upper()
    return normalized or None


def _prune_hubject_authorizations(now: datetime | None = None) -> None:
    """Remove pending Hubject authorizations older than the configured TTL.

    A TTL of zero (or less) empties the cache.  *now* defaults to the
    current UTC time.
    """
    lifetime = max(HUBJECT_REMOTE_START_TTL, 0)
    if lifetime == 0:
        HUBJECT_PENDING_AUTHORIZATIONS.clear()
        return

    reference = now or datetime.now(timezone.utc)
    # Iterate over a snapshot because entries are removed along the way.
    for cache_key, (recorded_at, _) in list(HUBJECT_PENDING_AUTHORIZATIONS.items()):
        if (reference - recorded_at).total_seconds() > lifetime:
            HUBJECT_PENDING_AUTHORIZATIONS.pop(cache_key, None)


def _record_pending_hubject_authorization(
    chargepoint_id: str | None,
    token: str | None,
    request_payload: Mapping[str, Any],
    response_payload: Mapping[str, Any],
    *,
    now: datetime | None = None,
) -> None:
    """Cache the metadata of a Hubject authorization for later pickup.

    The entry is keyed by the normalized charge point id and token; nothing
    is stored when either is missing.  Both payloads are deep-copied.  Each
    session field is taken from the response and, when absent or falsy
    there, from the request.
    """
    station_key = _normalize_chargepoint_id(chargepoint_id) if chargepoint_id else None
    token_key = _normalize_hubject_token(token)
    if not (station_key and token_key):
        return

    timestamp = now or datetime.now(timezone.utc)
    _prune_hubject_authorizations(timestamp)
    request_copy = copy.deepcopy(dict(request_payload))
    response_copy = copy.deepcopy(dict(response_payload))

    # (metadata key, payload field) pairs resolved response-first.
    field_map = (
        ("session_id", "SessionID"),
        ("cpo_partner_session_id", "CPOPartnerSessionID"),
        ("emp_partner_session_id", "EMPPartnerSessionID"),
        ("operator_id", "OperatorID"),
        ("provider_id", "ProviderID"),
        ("identification", "Identification"),
        ("evse_id", "EVSEId"),
    )
    metadata: dict[str, Any] = {
        "raw_request": request_copy,
        "raw_payload": response_copy,
    }
    for meta_key, payload_field in field_map:
        metadata[meta_key] = response_copy.get(payload_field) or request_copy.get(
            payload_field
        )
    metadata["token"] = token_key
    metadata["id_tag"] = token_key
    metadata["received_at"] = _format_timestamp_value(timestamp)

    HUBJECT_PENDING_AUTHORIZATIONS[(station_key, token_key)] = (timestamp, metadata)


def _consume_pending_hubject_authorization(
    chargepoint_id: str | None,
    token: str | None,
    *,
    now: datetime | None = None,
) -> dict[str, Any] | None:
    """Remove and return the cached authorization metadata for a token.

    Expired entries are pruned first.  Returns a deep copy of the stored
    metadata, or ``None`` when the key is incomplete or nothing is cached.
    """
    station_key = _normalize_chargepoint_id(chargepoint_id) if chargepoint_id else None
    token_key = _normalize_hubject_token(token)
    if not (station_key and token_key):
        return None

    _prune_hubject_authorizations(now)
    cached = HUBJECT_PENDING_AUTHORIZATIONS.pop((station_key, token_key), None)
    return copy.deepcopy(cached[1]) if cached else None


def _prune_hubject_cdr_cache(now: datetime | None = None) -> None:
    """Evict stale entries from the pending and active Hubject CDR caches.

    Entries are ``(timestamp, payload)`` tuples. An entry is stale when it is
    more than ``HUBJECT_REMOTE_START_TTL`` seconds older than ``now``
    (defaults to the current UTC time). A TTL of zero or less empties both
    caches outright.
    """
    max_age = max(HUBJECT_REMOTE_START_TTL, 0)
    if max_age == 0:
        HUBJECT_PENDING_CDRS.clear()
        HUBJECT_ACTIVE_CDRS.clear()
        return
    reference = now or datetime.now(timezone.utc)
    # Pending entries first, then active ones - same order as the caches are
    # listed here.
    for cache in (HUBJECT_PENDING_CDRS, HUBJECT_ACTIVE_CDRS):
        # Collect keys first: the dict must not change size while iterating.
        stale_keys = [
            cache_key
            for cache_key, (stamp, _) in cache.items()
            if (reference - stamp).total_seconds() > max_age
        ]
        for cache_key in stale_keys:
            cache.pop(cache_key, None)


def _set_pending_hubject_cdr(
    chargepoint_id: str,
    connector_id: int,
    metadata: dict[str, Any],
    *,
    now: datetime | None = None,
) -> None:
    if connector_id <= 0:
        return
    key = (str(chargepoint_id), int(connector_id))
    timestamp = now or datetime.now(timezone.utc)
    _prune_hubject_cdr_cache(timestamp)
    HUBJECT_PENDING_CDRS[key] = (timestamp, metadata)


def _pop_pending_hubject_cdr(
    chargepoint_id: str, connector_id: int, *, now: datetime | None = None
) -> dict[str, Any] | None:
    if connector_id <= 0:
        return None
    _prune_hubject_cdr_cache(now)
    entry = HUBJECT_PENDING_CDRS.pop((str(chargepoint_id), int(connector_id)), None)
    if not entry:
        return None
    _, metadata = entry
    return metadata


def _record_active_hubject_cdr(
    tx_id: int,
    context: dict[str, Any],
    *,
    now: datetime | None = None,
) -> None:
    """Store the Hubject context of a running transaction under its id.

    The CDR caches are pruned first; the context is then stored together with
    ``now`` (current UTC time when omitted), replacing any previous entry for
    the same ``tx_id``.
    """
    recorded_at = now or datetime.now(timezone.utc)
    _prune_hubject_cdr_cache(recorded_at)
    HUBJECT_ACTIVE_CDRS[tx_id] = (recorded_at, context)


def _pop_active_hubject_cdr(
    tx_id: int, *, now: datetime | None = None
) -> dict[str, Any] | None:
    """Remove and return the Hubject context recorded for a transaction.

    The CDR caches are pruned before the lookup. Returns ``None`` when no
    context is stored for ``tx_id``; otherwise the stored context object
    itself (not a copy).
    """
    _prune_hubject_cdr_cache(now)
    active = HUBJECT_ACTIVE_CDRS.pop(tx_id, None)
    if not active:
        return None
    _recorded_at, stored_context = active
    return stored_context


def _prune_pending_start_methods(now: datetime | None = None) -> None:
    """Evict pending start-method entries older than the remote-start TTL.

    Entries are ``(timestamp, method)`` tuples. A TTL
    (``HUBJECT_REMOTE_START_TTL``) of zero or less clears the whole store.
    ``now`` defaults to the current UTC time.
    """
    max_age = max(HUBJECT_REMOTE_START_TTL, 0)
    if max_age == 0:
        PENDING_START_METHODS.clear()
        return
    reference = now or datetime.now(timezone.utc)
    # Collect keys first: the dict must not change size while iterating.
    stale_keys = []
    for cache_key, (stamp, _) in PENDING_START_METHODS.items():
        if (reference - stamp).total_seconds() > max_age:
            stale_keys.append(cache_key)
    for cache_key in stale_keys:
        PENDING_START_METHODS.pop(cache_key, None)


def _set_pending_start_method(
    chargepoint_id: str, connector_id: int | None, method: str, *, now: datetime | None = None
) -> None:
    if connector_id is None:
        return
    try:
        connector_int = int(connector_id)
    except (TypeError, ValueError):
        return
    if connector_int <= 0:
        return
    station_key = _normalize_chargepoint_id(chargepoint_id)
    if not station_key:
        return
    timestamp = now or datetime.now(timezone.utc)
    _prune_pending_start_methods(timestamp)
    PENDING_START_METHODS[(station_key, connector_int)] = (
        timestamp,
        str(method).strip().lower(),
    )


def _pop_pending_start_method(
    chargepoint_id: str, connector_id: int | None, *, now: datetime | None = None
) -> str | None:
    if connector_id is None:
        return None
    try:
        connector_int = int(connector_id)
    except (TypeError, ValueError):
        return None
    if connector_int <= 0:
        return None
    station_key = _normalize_chargepoint_id(chargepoint_id)
    if not station_key:
        return None
    _prune_pending_start_methods(now)
    entry = PENDING_START_METHODS.pop((station_key, connector_int), None)
    if not entry:
        return None
    _, method = entry
    return method


def _normalize_start_method(value: str | None) -> str | None:
    if not isinstance(value, str):
        return None
    text = value.strip().lower()
    if text in {"remote", "local"}:
        return text
    return None


def _finalize_transaction_start_method(
    transaction_entry: dict[str, Any], pending_method: str | None = None
) -> str:
    method = None
    if transaction_entry.get("hubject"):
        method = "remote"
    else:
        normalized_pending = _normalize_start_method(pending_method)
        if normalized_pending:
            method = normalized_pending
        else:
            existing = _normalize_start_method(transaction_entry.get("start_method"))
            if existing:
                method = existing
    if method is None:
        method = "local"
    transaction_entry["start_method"] = method
    return method


def _attach_hubject_metadata_to_transaction(
    chargepoint_id: str,
    connector_id_int: int | None,
    tx_id: int,
    transaction_entry: dict[str, Any],
    *,
    now: datetime | None = None,
) -> None:
    if connector_id_int is None or connector_id_int <= 0:
        return
    metadata = _pop_pending_hubject_cdr(
        chargepoint_id,
        connector_id_int,
        now=now,
    )
    if not metadata:
        return
    context: dict[str, Any] = {
        "metadata": metadata,
        "start_info": copy.deepcopy(transaction_entry.get("start_info") or {}),
        "chargepoint_id": chargepoint_id,
        "connector_id": connector_id_int,
        "transaction_id": tx_id,
    }
    if "received_at" in metadata:
        context["pending_received_at"] = metadata.get("received_at")
    transaction_entry["hubject"] = context
    _record_active_hubject_cdr(tx_id, context, now=now)


def _ensure_evse_identifier(
    chargepoint_id: str,
    connector_id_int: int | None,
    existing: str | None,
) -> str | None:
    if existing:
        return existing
    if connector_id_int is None:
        return None
    try:
        return _resolve_evse_identifier(chargepoint_id, connector_id_int)
    except Exception:
        return None


def _build_hubject_cdr_payload(
    context: dict[str, Any],
    start_info: dict[str, Any],
    stop_info: dict[str, Any],
) -> dict[str, Any] | None:
    metadata = context.get("metadata") or {}
    cdr: dict[str, Any] = {
        "SessionID": metadata.get("session_id"),
        "CPOPartnerSessionID": metadata.get("cpo_partner_session_id"),
        "EMPPartnerSessionID": metadata.get("emp_partner_session_id"),
        "OperatorID": metadata.get("operator_id"),
        "ProviderID": metadata.get("provider_id"),
        "ChargePointID": context.get("chargepoint_id"),
        "ConnectorID": context.get("connector_id"),
        "TransactionID": context.get("transaction_id"),
        "EvseID": stop_info.get("evseId") or start_info.get("evseId"),
        "Identification": metadata.get("identification"),
        "Token": metadata.get("token"),
        "RemoteStartPayload": metadata.get("raw_payload"),
        "RemoteRequest": metadata.get("raw_request"),
        "PendingReceivedAt": context.get("pending_received_at"),
        "StartInfo": start_info or {},
        "StopInfo": stop_info or {},
    }
    start_timestamp = start_info.get("timestamp")
    stop_timestamp = stop_info.get("timestamp")
    if start_timestamp:
        cdr["StartTimestamp"] = start_timestamp
    if stop_timestamp:
        cdr["StopTimestamp"] = stop_timestamp
    meter_start = start_info.get("meterStart")
    if meter_start is not None:
        cdr["MeterStart"] = meter_start
    meter_stop = stop_info.get("meterStop")
    if meter_stop is not None:
        cdr["MeterStop"] = meter_stop
    filtered = {
        key: value
        for key, value in cdr.items()
        if value not in (None, "") or key in {"StartInfo", "StopInfo"}
    }
    if not (
        filtered.get("SessionID")
        or filtered.get("CPOPartnerSessionID")
        or filtered.get("EMPPartnerSessionID")
    ):
        # Without at least one session identifier there is little value in the CDR payload.
        # Avoid sending empty payloads.
        return None
    return {"Cdr": filtered}


def _get_hubject_cdr_endpoint() -> str | None:
    """Return the URL that Hubject CDRs are posted to, or ``None``.

    Reads the ``hubject_api`` section of ``CONFIG``. An explicit
    ``cdr_endpoint`` wins. Otherwise the URL is built from ``host``, the
    optional ``port`` and ``cdr_path`` (default ``/Cdrmgmt/Send``): the scheme
    is ``https`` when the port is unset or 443 and ``http`` for any other
    port; ports 80 and 443 are not written into the URL.
    """
    settings = CONFIG.get("hubject_api")
    if not isinstance(settings, Mapping):
        return None
    explicit = settings.get("cdr_endpoint")
    if explicit:
        return str(explicit)
    host = settings.get("host")
    if not host:
        return None
    raw_port = settings.get("port")
    try:
        port_number = None if raw_port in (None, "") else int(raw_port)
    except (TypeError, ValueError):
        # An unparsable port is treated like "no port configured".
        port_number = None
    scheme = "http" if port_number not in (None, 443) else "https"
    root = f"{scheme}://{host}"
    if port_number and port_number not in (80, 443):
        root = f"{root}:{port_number}"
    route = str(settings.get("cdr_path") or "/Cdrmgmt/Send")
    if not route.startswith("/"):
        route = f"/{route}"
    return f"{root}{route}"


def _get_hubject_authorize_endpoint() -> str | None:
    """Return the authorize-start URL of the Hubject relay API, or ``None``.

    Reads the ``hubject_api`` section of ``CONFIG``. An explicit
    ``authorize_start_endpoint`` wins. Otherwise the base URL is derived from
    ``host``/``port`` (``https`` when the port is unset or 443, ``http``
    otherwise; ports 80 and 443 are omitted) or, without a host, taken from
    ``base_url`` / ``ocpp_base_url``. ``authorize_start_path`` (default
    ``/Authorization/AuthorizeStart``) is appended to the base URL.
    """
    settings = CONFIG.get("hubject_api")
    if not isinstance(settings, Mapping):
        return None
    explicit = settings.get("authorize_start_endpoint")
    if explicit:
        return str(explicit)
    host = settings.get("host")
    if host:
        raw_port = settings.get("port")
        try:
            port_number = None if raw_port in (None, "") else int(raw_port)
        except (TypeError, ValueError):
            # An unparsable port is treated like "no port configured".
            port_number = None
        scheme = "http" if port_number not in (None, 443) else "https"
        root = f"{scheme}://{host}"
        if port_number and port_number not in (80, 443):
            root = f"{root}:{port_number}"
    else:
        root = settings.get("base_url") or settings.get("ocpp_base_url")
        if not root:
            return None
    route = str(
        settings.get("authorize_start_path") or "/Authorization/AuthorizeStart"
    )
    if not route.startswith("/"):
        route = f"/{route}"
    # Trailing slashes on the base are dropped so the join has exactly one.
    return str(root).rstrip("/") + route


def _get_hubject_authorize_stop_endpoint() -> str | None:
    """Return the authorize-stop URL of the Hubject relay API, or ``None``.

    Reads the ``hubject_api`` section of ``CONFIG``. An explicit
    ``authorize_stop_endpoint`` wins. Otherwise the base URL is derived from
    ``host``/``port`` (``https`` when the port is unset or 443, ``http``
    otherwise; ports 80 and 443 are omitted) or, without a host, taken from
    ``base_url`` / ``ocpp_base_url``. ``authorize_stop_path`` (default
    ``/Authorization/AuthorizeStop``) is appended to the base URL.
    """
    settings = CONFIG.get("hubject_api")
    if not isinstance(settings, Mapping):
        return None
    explicit = settings.get("authorize_stop_endpoint")
    if explicit:
        return str(explicit)
    host = settings.get("host")
    if host:
        raw_port = settings.get("port")
        try:
            port_number = None if raw_port in (None, "") else int(raw_port)
        except (TypeError, ValueError):
            # An unparsable port is treated like "no port configured".
            port_number = None
        scheme = "http" if port_number not in (None, 443) else "https"
        root = f"{scheme}://{host}"
        if port_number and port_number not in (80, 443):
            root = f"{root}:{port_number}"
    else:
        root = settings.get("base_url") or settings.get("ocpp_base_url")
        if not root:
            return None
    route = str(
        settings.get("authorize_stop_path") or "/Authorization/AuthorizeStop"
    )
    if not route.startswith("/"):
        route = f"/{route}"
    # Trailing slashes on the base are dropped so the join has exactly one.
    return str(root).rstrip("/") + route


def _build_hubject_api_ssl_context() -> ssl.SSLContext | bool | None:
    """Build the TLS setting used for calls to the Hubject relay API.

    Returns:
        * ``None`` when ``hubject_api`` is not configured as a mapping or the
          context cannot be created;
        * ``False`` when verification is switched off (``cdr_verify_tls``,
          falling back to ``ocpp_verify_tls``, default on);
        * otherwise an ``ssl.SSLContext`` using the optional ``ca_bundle`` and,
          if ``certfile`` is set, the client certificate. A certificate that
          fails to load is logged at debug level and the context is returned
          without it.
    """
    settings = CONFIG.get("hubject_api")
    if not isinstance(settings, Mapping):
        return None
    verify = settings.get("cdr_verify_tls")
    if verify is None:
        verify = settings.get("ocpp_verify_tls", True)
    if not verify:
        return False
    bundle = settings.get("ca_bundle")
    try:
        if bundle:
            tls_context = ssl.create_default_context(cafile=str(bundle))
        else:
            tls_context = ssl.create_default_context()
    except Exception:
        logging.debug("Failed to construct Hubject API SSL context", exc_info=True)
        return None
    client_cert = settings.get("certfile")
    if not client_cert:
        return tls_context
    client_key = settings.get("keyfile")
    try:
        tls_context.load_cert_chain(
            str(client_cert),
            keyfile=str(client_key) if client_key else None,
        )
    except Exception:
        logging.debug("Failed to load Hubject API client certificate", exc_info=True)
    return tls_context


async def _send_hubject_cdr_via_api(payload: Mapping[str, Any]) -> None:
    """POST a CDR payload as JSON to the configured Hubject CDR endpoint.

    Skipped (debug log) when no endpoint is configured. The total timeout is
    taken from ``cdr_timeout`` or ``ocpp_timeout`` in the ``hubject_api``
    config and defaults to 10 seconds. HTTP status codes of 400 and above are
    logged as warnings; any exception raised while sending is logged and
    swallowed, so this coroutine does not raise for transport errors.
    """
    endpoint = _get_hubject_cdr_endpoint()
    if not endpoint:
        logging.debug("Hubject CDR endpoint not configured; skipping dispatch")
        return
    settings = CONFIG.get("hubject_api")
    configured_timeout: float | int | None = None
    if isinstance(settings, Mapping):
        configured_timeout = settings.get("cdr_timeout") or settings.get("ocpp_timeout")
    try:
        timeout = ClientTimeout(total=float(configured_timeout or 10.0))
    except (TypeError, ValueError):
        timeout = ClientTimeout(total=10.0)
    tls_setting = _build_hubject_api_ssl_context()
    request_headers = {"Content-Type": "application/json"}
    try:
        async with ClientSession(timeout=timeout) as session, session.post(
            endpoint,
            json=payload,
            ssl=tls_setting,
            headers=request_headers,
        ) as resp:
            body_text = await resp.text()
            if resp.status < 400:
                logging.info(
                    "Hubject CDR submission succeeded with HTTP %s for %s",
                    resp.status,
                    endpoint,
                )
            else:
                logging.warning(
                    "Hubject CDR submission failed with HTTP %s for %s: %s",
                    resp.status,
                    endpoint,
                    body_text,
                )
    except Exception:
        logging.exception("Failed to submit Hubject CDR to %s", endpoint)


async def _query_hubject_api_authorize_start(
    evse_id: str,
    rfid: str,
    *,
    chargepoint_id: str | None = None,
) -> tuple[str | None, Mapping[str, Any] | None]:
    """Ask the Hubject relay API whether an RFID may start charging.

    Posts ``{"EVSEId": evse_id, "RFID": rfid}`` to the authorize-start
    endpoint. The total timeout comes from ``authorize_start_timeout`` or
    ``ocpp_timeout`` in the ``hubject_api`` config (default 5 seconds).

    When the mapped status is "accepted" (case-insensitive) and the response
    is a mapping, the authorization is recorded as pending for
    ``chargepoint_id``; failures of that bookkeeping are only logged.

    Returns:
        ``(mapped_status, response)``. Both are ``None`` when the endpoint is
        not configured, the HTTP status is 400 or above, or the body is not
        valid JSON. ``response`` is ``None`` when the decoded body is not a
        mapping.

    Note:
        Exceptions raised by the HTTP client are not caught here and
        propagate to the caller.
    """
    endpoint = _get_hubject_authorize_endpoint()
    if not endpoint:
        logging.debug("Hubject authorize-start endpoint not configured; skipping relay call")
        return None, None
    settings = CONFIG.get("hubject_api")
    configured_timeout: float | int | None = None
    if isinstance(settings, Mapping):
        configured_timeout = settings.get("authorize_start_timeout") or settings.get(
            "ocpp_timeout"
        )
    try:
        timeout = ClientTimeout(total=float(configured_timeout or 5.0))
    except (TypeError, ValueError):
        timeout = ClientTimeout(total=5.0)
    tls_setting = _build_hubject_api_ssl_context()
    payload = {"EVSEId": evse_id, "RFID": rfid}
    request_headers = {"Content-Type": "application/json"}
    async with ClientSession(timeout=timeout) as session, session.post(
        endpoint,
        json=payload,
        ssl=tls_setting,
        headers=request_headers,
    ) as resp:
        # Read the text first so it is available for the log messages below.
        body_text = await resp.text()
        if resp.status >= 400:
            logging.warning(
                "Hubject authorize-start failed with HTTP %s for %s: %s",
                resp.status,
                endpoint,
                body_text,
            )
            return None, None
        try:
            data = await resp.json(content_type=None)
        except Exception:
            logging.warning(
                "Hubject authorize-start returned invalid JSON for %s: %s",
                endpoint,
                body_text,
            )
            return None, None

    mapped = _map_hubject_status_to_ocpp(data)
    try:
        serialized_response = json.dumps(data, ensure_ascii=False, sort_keys=True)
    except (TypeError, ValueError):
        serialized_response = repr(data)
    logging.debug(
        "Hubject authorize-start response from hubject_api for %s/%s: mapped=%s payload=%s",
        evse_id,
        rfid,
        mapped,
        serialized_response,
    )

    if not isinstance(data, Mapping):
        return mapped, None
    if mapped and mapped.lower() == "accepted":
        try:
            _record_pending_hubject_authorization(
                chargepoint_id,
                rfid,
                payload,
                data,
                now=datetime.now(timezone.utc),
            )
        except Exception:
            logging.debug(
                "Failed to store pending Hubject authorization for %s", evse_id,
                exc_info=True,
            )
    return mapped, data


async def _notify_hubject_authorize_stop(
    context: Mapping[str, Any],
    stop_payload: Mapping[str, Any],
    stop_time: datetime,
) -> None:
    """Notify the Hubject relay API that a charging session was stopped.

    The request is built from the Hubject ``context`` (its ``"metadata"``,
    charge point and connector) and the stop payload. It is skipped with a
    debug log when no endpoint is configured, or when neither an EVSE id nor
    an identification (explicit or derived from a token) can be determined.

    The total timeout comes from ``authorize_stop_timeout``,
    ``authorize_start_timeout`` or ``ocpp_timeout`` (default 5 seconds).
    HTTP errors are logged as warnings; exceptions while sending are logged
    and swallowed.
    """
    endpoint = _get_hubject_authorize_stop_endpoint()
    if not endpoint:
        logging.debug("Hubject authorize-stop endpoint not configured; skipping notification")
        return

    settings = CONFIG.get("hubject_api")
    configured_timeout: float | int | None = None
    if isinstance(settings, Mapping):
        configured_timeout = (
            settings.get("authorize_stop_timeout")
            or settings.get("authorize_start_timeout")
            or settings.get("ocpp_timeout")
        )
    try:
        timeout = ClientTimeout(total=float(configured_timeout or 5.0))
    except (TypeError, ValueError):
        timeout = ClientTimeout(total=5.0)

    # Tolerate a context that is not a mapping by treating it as empty.
    ctx: Mapping[str, Any] = context if isinstance(context, Mapping) else {}
    metadata = ctx.get("metadata")
    if not isinstance(metadata, Mapping):
        metadata = {}
    chargepoint_id = ctx.get("chargepoint_id")
    connector_id = ctx.get("connector_id")

    evse_id = metadata.get("evse_id") or stop_payload.get("evseId")
    if not evse_id:
        evse_id = _ensure_evse_identifier(
            chargepoint_id or "",
            connector_id if isinstance(connector_id, int) else None,
            stop_payload.get("evseId"),
        )

    identification = metadata.get("identification")
    token = metadata.get("token") or metadata.get("id_tag") or stop_payload.get("idTag")
    if identification is None and token:
        # Fall back to an RFID identification built from the token.
        identification = {"RFIDMifareFamilyIdentification": {"UID": str(token)}}
    if not evse_id or identification is None:
        logging.debug(
            "Skipping Hubject authorize-stop notification due to missing EVSE or identification"
        )
        return

    session_id = metadata.get("session_id")
    cpo_session_id = metadata.get("cpo_partner_session_id") or session_id
    optional_fields = (
        ("SessionID", session_id),
        ("CPOPartnerSessionID", cpo_session_id),
        ("EMPPartnerSessionID", metadata.get("emp_partner_session_id")),
        ("OperatorID", metadata.get("operator_id")),
        ("ProviderID", metadata.get("provider_id")),
        (
            "TransactionID",
            metadata.get("transaction_id") or stop_payload.get("transactionId"),
        ),
    )
    request_payload: dict[str, Any] = {
        "EVSEId": evse_id,
        "Identification": identification,
    }
    for field_name, field_value in optional_fields:
        if field_value:
            request_payload[field_name] = field_value

    meter_stop = stop_payload.get("meterStop")
    if meter_stop is not None:
        request_payload["MeterStop"] = meter_stop
    reason = stop_payload.get("reason")
    if reason:
        request_payload["Reason"] = reason
    request_payload["AuthorizationStop"] = {
        "SessionID": cpo_session_id or session_id,
        "Timestamp": _format_timestamp_value(stop_time) or _format_timestamp_value(datetime.now(timezone.utc)),
    }

    tls_setting = _build_hubject_api_ssl_context()
    request_headers = {"Content-Type": "application/json"}
    try:
        async with ClientSession(timeout=timeout) as session, session.post(
            endpoint,
            json=request_payload,
            ssl=tls_setting,
            headers=request_headers,
        ) as resp:
            body_text = await resp.text()
            if resp.status < 400:
                logging.info(
                    "Hubject authorize-stop notification succeeded with HTTP %s for %s",
                    resp.status,
                    endpoint,
                )
            else:
                logging.warning(
                    "Hubject authorize-stop notification failed with HTTP %s for %s: %s",
                    resp.status,
                    endpoint,
                    body_text,
                )
    except Exception:
        logging.exception("Failed to notify Hubject authorize-stop at %s", endpoint)


def _schedule_hubject_cdr_submission(
    chargepoint_id: str,
    connector_id_int: int | None,
    tx_id: int | None,
    transaction_info: dict[str, Any] | None,
    payload: Mapping[str, Any],
    stop_time: datetime,
    meter_stop: Any,
) -> None:
    if tx_id is not None:
        active_context = _pop_active_hubject_cdr(tx_id)
    else:
        active_context = None
    info_context = transaction_info.get("hubject") if transaction_info else None
    context = info_context or active_context
    if context is None:
        return
    start_info: dict[str, Any] = {}
    if transaction_info and transaction_info.get("start_info"):
        start_info = copy.deepcopy(transaction_info.get("start_info"))
    elif context.get("start_info"):
        start_info = copy.deepcopy(context.get("start_info"))
    connector_id_resolved = connector_id_int
    if connector_id_resolved is None:
        connector_id_resolved = transaction_info.get("connector_id_int") if transaction_info else None
    if connector_id_resolved is None:
        connector_id_resolved = context.get("connector_id")
    stop_info = dict(payload)
    stop_info["chargepoint_id"] = chargepoint_id
    if connector_id_resolved is not None:
        stop_info["connectorId"] = connector_id_resolved
    elif transaction_info and transaction_info.get("connector_id") is not None:
        stop_info["connectorId"] = transaction_info.get("connector_id")
    timestamp = payload.get("timestamp")
    stop_info["timestamp"] = timestamp if isinstance(timestamp, str) and timestamp else _format_timestamp_value(stop_time)
    if meter_stop is not None:
        stop_info["meterStop"] = meter_stop
    if start_info.get("meterStart") is not None and "meterStart" not in stop_info:
        stop_info["meterStart"] = start_info.get("meterStart")
    if start_info.get("idTag") and not stop_info.get("idTag"):
        stop_info["idTag"] = start_info.get("idTag")
    if start_info.get("timestamp") and not stop_info.get("startTimestamp"):
        stop_info["startTimestamp"] = start_info.get("timestamp")
    evse_id = _ensure_evse_identifier(
        chargepoint_id,
        connector_id_resolved,
        start_info.get("evseId") or stop_info.get("evseId"),
    )
    if evse_id:
        start_info.setdefault("evseId", evse_id)
        stop_info["evseId"] = evse_id
    cdr_payload = _build_hubject_cdr_payload(context, start_info, stop_info)
    if not cdr_payload:
        return
    try:
        task = asyncio.create_task(_send_hubject_cdr_via_api(cdr_payload))
    except Exception:
        logging.debug(
            "Failed to schedule Hubject CDR submission for %s", chargepoint_id,
            exc_info=True,
        )
    else:
        # Ensure scheduled task runs even if not awaited directly.
        task.add_done_callback(lambda fut: None)


def _normalize_availability_value(value: Any) -> str | None:
    if value is None:
        return None
    lowered = str(value).strip().lower()
    if lowered == "operative":
        return AVAILABILITY_OPERATIVE
    if lowered == "inoperative":
        return AVAILABILITY_INOPERATIVE
    return None


def _record_availability(station_id: str, availability: Any) -> None:
    """Update the remembered availability of a station.

    A recognized value is stored in ``availability_state``. ``None`` removes
    the station's entry. Any other unrecognized value leaves the state
    untouched.
    """
    recognized = _normalize_availability_value(availability)
    if recognized:
        availability_state[station_id] = recognized
        return
    if availability is None:
        availability_state.pop(station_id, None)


def _availability_from_status_notification(status: Any) -> str | None:
    if status is None:
        return None
    normalized = str(status).strip().lower()
    if normalized in {
        "available",
        "preparing",
        "charging",
        "suspendedevse",
        "suspendedev",
        "finishing",
        "reserved",
    }:
        return AVAILABILITY_OPERATIVE
    if normalized in {"unavailable", "faulted", "outofservice"}:
        return AVAILABILITY_INOPERATIVE
    return None


def _extract_evse_id_from_configuration_payload(data: Any) -> str | None:
    if isinstance(data, Mapping):
        for candidate in ("EVSEId", "EVSEID", "evseId", "evseid"):
            value = data.get(candidate)
            if value:
                return str(value)

        config_entries = data.get("configurationKey")
        if isinstance(config_entries, list):
            for entry in config_entries:
                if not isinstance(entry, Mapping):
                    continue
                key = entry.get("key") or entry.get("Key")
                if key and str(key).strip().lower() == "evseid":
                    value = entry.get("value") or entry.get("Value")
                    if value:
                        return str(value)

        for value in data.values():
            candidate = _extract_evse_id_from_configuration_payload(value)
            if candidate:
                return candidate

    if isinstance(data, list):
        for item in data:
            candidate = _extract_evse_id_from_configuration_payload(item)
            if candidate:
                return candidate

    return None


def _update_evse_id_cache(chargepoint_id: str, configuration: Any) -> None:
    """Cache the EVSE id found in a configuration payload, if there is one.

    The cache entry for ``chargepoint_id`` is left unchanged when the payload
    contains no EVSE id.
    """
    if found := _extract_evse_id_from_configuration_payload(configuration):
        _evse_id_cache[chargepoint_id] = found


def _load_evse_id_from_db(chargepoint_id: str) -> str | None:
    try:
        conn = get_db_conn()
        with conn.cursor() as cur:
            cur.execute(
                "SELECT configuration_json FROM op_server_cp_config WHERE chargepoint_id=%s ORDER BY id DESC LIMIT 1",
                (chargepoint_id,),
            )
            row = cur.fetchone()
    except Exception:
        logging.debug(
            "Failed loading EVSE ID for %s from database", chargepoint_id, exc_info=True
        )
        return None
    finally:
        try:
            conn.close()
        except Exception:
            pass

    if not row:
        return None

    if isinstance(row, Mapping):
        raw = row.get("configuration_json")
    else:
        raw = row[0]

    if raw is None:
        return None

    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode("utf-8", errors="ignore")

    if isinstance(raw, str):
        try:
            data = json.loads(raw)
        except Exception:
            logging.debug(
                "Failed parsing EVSE configuration JSON for %s", chargepoint_id, exc_info=True
            )
            return None
    else:
        data = raw

    evse_id = _extract_evse_id_from_configuration_payload(data)
    if evse_id:
        _evse_id_cache[chargepoint_id] = evse_id
    return evse_id


def _get_or_load_evse_id(chargepoint_id: str) -> str | None:
    """Return the EVSE id for a charge point, preferring the in-memory cache.

    The database is only consulted when the cache holds no truthy value for
    ``chargepoint_id``.
    """
    return _evse_id_cache.get(chargepoint_id) or _load_evse_id_from_db(chargepoint_id)


def _resolve_evse_identifier(chargepoint_id: str, connector_id: int) -> str:
    """Build the identifier used for a connector of a charge point.

    The base is the charge point's known EVSE id, or the charge point id
    itself when none is known. For connector ids above zero the result is
    ``"<base>#<connector_id>"``; otherwise it is just the base.
    """
    base = _get_or_load_evse_id(chargepoint_id) or chargepoint_id
    return f"{base}#{connector_id}" if connector_id > 0 else base


def _split_evse_identifier(evse_identifier: str) -> tuple[str, int | None]:
    normalized = str(evse_identifier or "").strip()
    connector_id: int | None = None
    if "#" in normalized:
        base, _, suffix = normalized.partition("#")
        normalized = base.strip()
        try:
            connector_id = int(suffix.strip())
        except (TypeError, ValueError):
            connector_id = None
    return normalized, connector_id


def _find_chargepoint_by_evse(evse_id: str) -> str | None:
    normalized = str(evse_id or "").strip()
    if not normalized:
        return None

    for chargepoint_id, cached_evse in _evse_id_cache.items():
        if cached_evse and cached_evse.strip().lower() == normalized.lower():
            return chargepoint_id

    try:
        conn = get_db_conn()
    except Exception:
        logging.debug(
            "Database unavailable - cannot resolve EVSE %s", normalized, exc_info=True
        )
        return None

    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT chargepoint_id
                FROM cp_server_cp_metadata
                WHERE evse_id=%s
                ORDER BY updated_at DESC, id DESC
                LIMIT 1
                """,
                (normalized,),
            )
            row = cur.fetchone()
    except Exception:
        logging.debug(
            "Failed resolving chargepoint for EVSE %s", normalized, exc_info=True
        )
        return None
    finally:
        try:
            conn.close()
        except Exception:
            pass

    if not row:
        return None

    if isinstance(row, Mapping):
        chargepoint_id = row.get("chargepoint_id")
    else:
        chargepoint_id = row[0]

    if not chargepoint_id:
        return None

    chargepoint_id_str = str(chargepoint_id)
    _evse_id_cache[chargepoint_id_str] = normalized
    return chargepoint_id_str


def _map_ocpp_status_to_hubject(status: str) -> str | None:
    normalized = str(status or "").strip().lower()
    if not normalized:
        return None
    if normalized in {"available"}:
        return "Available"
    if normalized in {
        "charging",
        "preparing",
        "occupied",
        "finishing",
        "suspendedev",
        "suspendedevse",
        "reserved",
    }:
        return "Occupied"
    if normalized in {"faulted", "unavailable", "outofservice"}:
        return "OutOfService"
    return None


async def process_status_notification_for_hubject(
    chargepoint_id: str,
    connector_id: int,
    status: str,
    status_timestamp: datetime | None,
) -> None:
    """Forward a connector status change to Hubject.

    Nothing happens when no Hubject client is configured, when the OCPP
    status has no Hubject equivalent, or when the mapped status equals the
    one last recorded for the connector.

    The new status is recorded before the request is sent, so duplicate
    notifications arriving while the request is in flight are suppressed. If
    the update is not delivered, the previous cache state is restored so that
    a later notification with the same status is sent again instead of being
    treated as already known.

    Args:
        chargepoint_id: Charge point reporting the status.
        connector_id: Connector the status belongs to.
        status: OCPP status string.
        status_timestamp: Time of the status; naive values are taken as UTC,
            aware values are converted to UTC, ``None`` means "now".

    Hubject client errors are logged as warnings and other errors raised by
    the client call are logged with a traceback; neither is re-raised.
    """
    if HUBJECT_CLIENT is None:
        return

    mapped_status = _map_ocpp_status_to_hubject(status)
    if mapped_status is None:
        return

    cache_key = (chargepoint_id, connector_id)
    previous_status = _evse_status_cache.get(cache_key)
    if previous_status == mapped_status:
        return

    _evse_status_cache[cache_key] = mapped_status

    delivered = False
    try:
        timestamp = status_timestamp or datetime.now(timezone.utc)
        if timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        else:
            timestamp = timestamp.astimezone(timezone.utc)

        evse_identifier = _resolve_evse_identifier(chargepoint_id, connector_id)

        try:
            if mapped_status == "Available":
                _, status_code, _ = await HUBJECT_CLIENT.set_evse_available(
                    evse_identifier,
                    timestamp=timestamp,
                )
            elif mapped_status == "Occupied":
                _, status_code, _ = await HUBJECT_CLIENT.set_evse_occupied(
                    evse_identifier,
                    timestamp=timestamp,
                )
            else:
                _, status_code, _ = await HUBJECT_CLIENT.set_evse_status(
                    evse_identifier,
                    mapped_status,
                    timestamp=timestamp,
                )
        except HubjectClientError as exc:
            logging.warning(
                "Hubject EVSE status update failed for %s (%s) with HTTP %s",
                evse_identifier,
                mapped_status,
                exc.status,
            )
        except Exception:
            logging.exception(
                "Unexpected error notifying Hubject about %s status for %s",
                mapped_status,
                evse_identifier,
            )
        else:
            delivered = True
            logging.info(
                "Hubject EVSE status update succeeded for %s (%s) with HTTP %s",
                evse_identifier,
                mapped_status,
                status_code,
            )
    finally:
        # Undo the optimistic cache write on failure (including cancellation),
        # but only if nobody else changed the entry in the meantime.
        if not delivered and _evse_status_cache.get(cache_key) == mapped_status:
            if previous_status is None:
                _evse_status_cache.pop(cache_key, None)
            else:
                _evse_status_cache[cache_key] = previous_status


async def _push_hubject_transaction_status(
    chargepoint_id: str,
    connector_id: int,
    status: str,
    status_timestamp: datetime | None,
) -> None:
    """Best-effort relay of a connector status to Hubject.

    Returns immediately when no Hubject client is configured or when
    ``connector_id`` is not positive. Any exception raised while forwarding
    the status is logged at debug level and suppressed.
    """
    if HUBJECT_CLIENT is None or connector_id <= 0:
        return

    try:
        await process_status_notification_for_hubject(
            chargepoint_id, connector_id, status, status_timestamp
        )
    except Exception:
        # Deliberately swallowed: this push must never break the caller.
        logging.debug(
            "Failed to push Hubject transaction status %s for %s connector %s",
            status,
            chargepoint_id,
            connector_id,
            exc_info=True,
        )


class ChargePointLoggerAdapter(logging.LoggerAdapter):
    """Logger adapter that suffixes each message with the charge point ID."""

    def process(self, msg, kwargs):  # type: ignore[override]
        """Return ``(msg, kwargs)`` with ``" (<chargepoint_id>)"`` appended.

        The ID is read from the adapter's ``extra`` mapping; the message is
        left unchanged when the ID is missing or empty.
        """
        base_msg, base_kwargs = super().process(msg, kwargs)
        chargepoint = self.extra.get("chargepoint_id")
        if not chargepoint:
            return base_msg, base_kwargs
        return f"{base_msg} ({chargepoint})", base_kwargs


async def log_ws_request(path, request_headers):
    """Log an incoming websocket handshake and let it continue.

    The signature follows the ``process_request`` callback form of
    :func:`websockets.serve`: the request path string and the request
    headers. Only the path is logged; the headers are accepted but unused.

    Returns:
        ``None`` in every case.
    """
    logging.info("WebSocket handshake for %s", path)


class LoggingWebSocketServerProtocol(WebSocketServerProtocol):
    """Websocket server protocol that logs handshakes and connection closes."""

    async def read_http_request(self):
        """Read the handshake request, remember its details and log it.

        Stores ``peer``, ``path``, ``request`` and ``chargepoint_id`` on the
        protocol instance and wraps ``self.logger`` so later log lines carry
        the charge point ID (the last segment of the request path).

        Returns:
            The ``(path, headers)`` pair read from the connection.

        Raises:
            InvalidMessage: if reading or processing the request fails.
        """
        try:
            path, headers = await read_request(self.reader)
            # Kept so the peer can still be reported when the connection
            # is closed later on.
            self.peer = self.transport.get_extra_info("peername")
            # Expose the request details to handlers.
            self.path = path
            self.request = type("Request", (), {"path": path, "headers": headers})()
            trimmed = urlparse(path).path.strip("/") if path else ""
            # Last path segment; an empty path yields an empty ID.
            self.chargepoint_id = trimmed.rpartition("/")[2]
            self.logger = ChargePointLoggerAdapter(
                self.logger, {"chargepoint_id": self.chargepoint_id}
            )
            self.logger.info(
                "Incoming request from %s: GET %s [Sec-WebSocket-Protocol: %s]",
                self.peer,
                path,
                headers.get("Sec-WebSocket-Protocol") or "none",
            )
        except Exception as exc:
            raise InvalidMessage("did not receive a valid HTTP request") from exc
        return path, headers

    def connection_lost(self, exc):
        """Log the closure and drop the charge point's connection state."""
        close_code = getattr(self, "close_code", None)
        close_reason = getattr(self, "close_reason", "") or ""
        remote = getattr(self, "peer", None) or self.transport.get_extra_info("peername")
        self.logger.info(
            "Connection closed: %s - %s (peer: %s)", close_code, close_reason, remote
        )
        chargepoint = getattr(self, "chargepoint_id", None)
        if chargepoint:
            for registry in (connected, connected_since, last_seen):
                registry.pop(chargepoint, None)
            for flag_set in (boot_notifications, boot_requested):
                flag_set.discard(chargepoint)
        super().connection_lost(exc)


def handle_asyncio_exception(loop, context):
    """Event-loop exception handler that mutes websocket handshake errors.

    ``EOFError`` and ``WebSocketException`` instances found under the
    ``"exception"`` key of ``context`` are logged at debug level only; every
    other context is passed to the loop's default exception handler.
    """
    error = context.get("exception")
    if not isinstance(error, (EOFError, WebSocketException)):
        loop.default_exception_handler(context)
        return
    logging.debug("Ignored websocket handshake error: %s", error)


def get_db_conn():
    """Open and return a new PyMySQL connection built from ``mysql_cfg``.

    Host, user, password and database name are read from ``mysql_cfg``; the
    charset defaults to ``"utf8"`` when not configured.
    """
    connect_kwargs = {
        name: mysql_cfg.get(name) for name in ("host", "user", "password", "db")
    }
    connect_kwargs["charset"] = mysql_cfg.get("charset", "utf8")
    return pymysql.connect(**connect_kwargs)


class _PymysqlConnectionPool:
    """Bounded, thread-safe pool of blocking PyMySQL connections.

    Connections are created lazily through ``get_db_conn`` until ``maxsize``
    of them exist. Idle connections wait in a queue; ``_created`` counts every
    connection that currently occupies a pool slot (idle or handed out).
    """

    # Upper bound for one wait on the idle queue. Waiters wake up regularly
    # to re-check capacity, because a slot can also become free through a
    # discarded (never re-queued) connection.
    _WAIT_POLL_SECONDS = 0.5

    def __init__(self, maxsize: int):
        self._maxsize = maxsize
        self._available: "queue.SimpleQueue[Any]" = queue.SimpleQueue()
        self._created = 0
        self._lock = threading.Lock()

    @property
    def created_count(self) -> int:
        """Number of pool slots currently occupied."""
        with self._lock:
            return self._created

    @property
    def available_count(self) -> int:
        """Approximate number of idle connections waiting in the queue."""
        try:
            return self._available.qsize()
        except Exception:
            return 0

    def _release_slot(self) -> None:
        """Give one pool slot back without touching any connection."""
        with self._lock:
            if self._created > 0:
                self._created -= 1

    def _discard_connection(self, conn: Any):
        """Close ``conn`` (ignoring close errors) and free its pool slot."""
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            pass
        self._release_slot()

    def acquire(self):
        """Return a live connection, creating or waiting for one as needed.

        Raises:
            Exception: whatever ``get_db_conn`` raises when a new connection
                cannot be opened; the reserved slot is released first.
        """
        while True:
            try:
                conn = self._available.get_nowait()
            except queue.Empty:
                conn = None

            if conn is None:
                # Only the counter is protected by the lock. Connecting and
                # waiting happen outside of it so other threads can still
                # release or discard connections in the meantime.
                with self._lock:
                    may_create = self._created < self._maxsize
                    if may_create:
                        self._created += 1
                if may_create:
                    try:
                        conn = get_db_conn()
                    except Exception:
                        # Do not leak the reserved slot on connect failure.
                        self._release_slot()
                        raise
                else:
                    try:
                        conn = self._available.get(timeout=self._WAIT_POLL_SECONDS)
                    except queue.Empty:
                        # Nothing was returned in time; re-check capacity.
                        continue

            try:
                conn.ping(reconnect=True)
                return conn
            except Exception:
                self._discard_connection(conn)

    def release(self, conn: Any):
        """Return ``conn`` to the pool, discarding it if it is not healthy."""
        if conn is None:
            return
        try:
            conn.ping(reconnect=True)
            self._available.put(conn)
            return
        except Exception:
            self._discard_connection(conn)

    def close_all(self):
        """Close every idle connection and free the slots they occupied."""
        while True:
            try:
                conn = self._available.get_nowait()
            except queue.Empty:
                break
            self._discard_connection(conn)


async def _maybe_await(value: Any) -> Any:
    """Return ``value``, awaiting it first when it is awaitable."""
    if not inspect.isawaitable(value):
        return value
    resolved = await value
    return resolved


# Exception classes that are inspected when deciding whether a failure means
# the database connection was lost.
_DB_CONNECTION_ERRORS = (
    pymysql.err.OperationalError,
    pymysql.err.InterfaceError,
)
# MySQL client error numbers treated as "connection lost".
_DB_CONNECTION_ERROR_CODES = {
    2006,  # CR_SERVER_GONE_ERROR
    2013,  # CR_SERVER_LOST
    2014,  # CR_COMMANDS_OUT_OF_SYNC
    2055,  # CR_SERVER_LOST_EXTENDED
}


def _is_connection_error(exc: BaseException) -> bool:
    """Tell whether ``exc`` signals a lost database connection.

    Only instances of ``_DB_CONNECTION_ERRORS`` qualify. They match when the
    first argument is one of ``_DB_CONNECTION_ERROR_CODES`` or when the
    joined arguments contain one of the known "connection lost" phrases.
    """
    if isinstance(exc, _DB_CONNECTION_ERRORS):
        details = getattr(exc, "args", ())
        if details:
            code = details[0]
            if isinstance(code, int) and code in _DB_CONNECTION_ERROR_CODES:
                return True
        text = " ".join(str(part) for part in details)
        return any(
            marker in text
            for marker in ("Lost connection", "gone away", "Not connected")
        )
    return False


async def _reset_shared_db_pool(*, close_async: bool = True, close_sync: bool = True):
    """Tear down shared database resources so they can be rebuilt later.

    Args:
        close_async: Close the async pool and clear ``_db_pool`` and
            ``_ocpp_log_pool``.
        close_sync: Close the threaded pool, the sync log connection and the
            DB executor, clearing the matching globals.

    Errors raised while closing are ignored; each global is reset to
    ``None`` regardless.
    """
    global _db_pool, _db_sync_pool, _db_executor, _sync_log_conn, _ocpp_log_pool

    def _quietly(action):
        # Run a close step and ignore whatever it raises.
        try:
            action()
        except Exception:
            pass

    if close_async and _db_pool is not None:
        _quietly(lambda: _db_pool.close())
        try:
            await _db_pool.wait_closed()
        except Exception:
            pass
        _db_pool = None
        _ocpp_log_pool = None

    if not close_sync:
        return

    if _db_sync_pool is not None:
        _quietly(lambda: _db_sync_pool.close_all())
        _db_sync_pool = None
    if _sync_log_conn is not None:
        _quietly(lambda: _sync_log_conn.close())
        _sync_log_conn = None
    if _db_executor is not None:
        _quietly(lambda: _db_executor.shutdown(wait=False, cancel_futures=True))
        _db_executor = None


async def _init_shared_db_pool():
    """Create the shared database pool if none exists yet.

    An aiomysql pool is tried first when aiomysql is importable. If that
    fails, or aiomysql is missing, a threaded PyMySQL pool plus its executor
    is set up instead. Does nothing when either pool already exists.
    """
    global _db_pool, _db_sync_pool, _db_executor

    already_initialised = _db_pool is not None or _db_sync_pool is not None
    if already_initialised:
        return

    if aiomysql is not None:
        try:
            pool_settings = {
                "host": mysql_cfg.get("host"),
                "user": mysql_cfg.get("user"),
                "password": mysql_cfg.get("password"),
                "db": mysql_cfg.get("db"),
                "charset": mysql_cfg.get("charset", "utf8"),
                "autocommit": False,
                "minsize": 1,
                "maxsize": MYSQL_POOL_SIZE,
            }
            _db_pool = await aiomysql.create_pool(**pool_settings)
        except Exception:
            logging.warning(
                "Failed to create aiomysql pool, falling back to threaded connections",
                exc_info=True,
            )
        else:
            return

    # Threaded fallback: blocking connections driven from a worker pool.
    _db_sync_pool = _PymysqlConnectionPool(MYSQL_POOL_SIZE)
    if _db_executor is None:
        worker_count = min(MYSQL_POOL_SIZE, 10)
        _db_executor = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)


async def _with_db_cursor(
    operation,
    *,
    cursor_class: Any | None = None,
    commit: bool = False,
):
    await _init_shared_db_pool()
    resolved_cursor = cursor_class
    if _db_pool is not None and cursor_class is DictCursor and aiomysql is not None:
        try:
            from aiomysql.cursors import DictCursor as AioDictCursor  # type: ignore

            resolved_cursor = AioDictCursor
        except Exception:
            resolved_cursor = cursor_class
    for attempt in range(2):
        if _db_pool is not None:
            try:
                async with _db_pool.acquire() as conn:
                    if resolved_cursor is None:
                        cur_ctx = conn.cursor()
                    else:
                        cur_ctx = conn.cursor(resolved_cursor)
                    async with cur_ctx as cur:
                        result = await operation(cur)
                    if commit:
                        await _maybe_await(conn.commit())
                    return result
            except Exception as exc:
                if _is_connection_error(exc):
                    logging.warning(
                        "Database connection lost (async path), resetting pool (attempt %s): %s",
                        attempt + 1,
                        exc,
                    )
                    await _reset_shared_db_pool(close_async=True, close_sync=False)
                    await _init_shared_db_pool()
                    continue
                raise
        loop = asyncio.get_running_loop()
        assert _db_sync_pool is not None
        if _db_executor is None:
            _db_executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=min(MYSQL_POOL_SIZE, 10)
            )

        def _runner():
            conn = _db_sync_pool.acquire()
            try:
                if resolved_cursor is None:
                    cur_ctx = conn.cursor()
                else:
                    cur_ctx = conn.cursor(resolved_cursor)
                with cur_ctx as cur:
                    result = asyncio.run(operation(cur))
                if commit:
                    conn.commit()
                return result
            finally:
                _db_sync_pool.release(conn)

        try:
            return await loop.run_in_executor(_db_executor, _runner)
        except Exception as exc:
            if _is_connection_error(exc):
                logging.warning(
                    "Database connection lost (sync path), resetting pool (attempt %s): %s",
                    attempt + 1,
                    exc,
                )
                await _reset_shared_db_pool(close_async=False, close_sync=True)
                await _init_shared_db_pool()
                continue
            raise
    raise RuntimeError("database operation failed after reconnect attempts")


async def _cursor_execute(cursor: Any, query: str, params: tuple | list | None = None):
    """Execute ``query`` on a sync or async cursor and return its result.

    Missing or empty ``params`` are passed to the cursor as an empty tuple.
    """
    bound_params = params or ()
    outcome = cursor.execute(query, bound_params)
    return await _maybe_await(outcome)


async def _cursor_fetchone(cursor: Any):
    """Fetch one row from a sync or async cursor."""
    row = cursor.fetchone()
    return await _maybe_await(row)


async def _cursor_fetchall(cursor: Any):
    """Fetch all remaining rows from a sync or async cursor."""
    rows = cursor.fetchall()
    return await _maybe_await(rows)


def _cache_entry_valid(entry_timestamp: float, ttl: int) -> bool:
    """Return ``True`` while an entry stored at ``entry_timestamp`` is younger than ``ttl`` seconds."""
    age_seconds = time.time() - entry_timestamp
    return age_seconds < ttl


def _get_cached_rfid_status(chargepoint_id: str, card_uuid: str | None) -> str | None:
    """Look up the cached RFID status for a charge point / card pair.

    Returns ``None`` when either identifier is empty, when nothing is cached,
    or when the entry has expired (the expired entry is then removed).
    """
    if not (chargepoint_id and card_uuid):
        return None
    cache_key = (chargepoint_id, card_uuid)
    cached = _RFID_STATUS_CACHE.get(cache_key)
    if not cached:
        return None
    stored_at, status = cached
    if not _cache_entry_valid(stored_at, _RFID_STATUS_CACHE_TTL):
        _RFID_STATUS_CACHE.pop(cache_key, None)
        return None
    return status


def _set_cached_rfid_status(chargepoint_id: str, card_uuid: str | None, status: str):
    """Cache ``status`` for the charge point / card pair with the current time.

    Nothing is stored when either identifier is empty.
    """
    if chargepoint_id and card_uuid:
        stored_at = time.time()
        _RFID_STATUS_CACHE[(chargepoint_id, card_uuid)] = (stored_at, status)


def _get_cached_configuration(chargepoint_id: str, payload: str) -> bool:
    """Report whether ``payload`` equals the fresh cached configuration.

    Returns ``True`` only when an unexpired entry with the same payload is
    cached for the charge point. A differing or expired entry is removed.
    """
    cached = _CONFIG_CACHE.get(chargepoint_id)
    if not cached:
        return False
    stored_at, stored_payload = cached
    is_hit = bool(
        stored_payload == payload and _cache_entry_valid(stored_at, _CONFIG_CACHE_TTL)
    )
    if not is_hit:
        _CONFIG_CACHE.pop(chargepoint_id, None)
    return is_hit


def _set_cached_configuration(chargepoint_id: str, payload: str):
    """Cache ``payload`` for the charge point together with the current time."""
    stored_at = time.time()
    _CONFIG_CACHE[chargepoint_id] = (stored_at, payload)


def _get_cached_oicp_flag(chargepoint_id: str) -> bool | None:
    """Return the cached OICP flag of a charge point, if still fresh.

    ``None`` means nothing usable is cached; an expired entry is removed.
    """
    cached = _OICP_FLAG_CACHE.get(chargepoint_id)
    if not cached:
        return None
    stored_at, flag = cached
    if not _cache_entry_valid(stored_at, _OICP_FLAG_CACHE_TTL):
        _OICP_FLAG_CACHE.pop(chargepoint_id, None)
        return None
    return flag


def _set_cached_oicp_flag(chargepoint_id: str, value: bool):
    """Cache the OICP flag of a charge point together with the current time."""
    stored_at = time.time()
    _OICP_FLAG_CACHE[chargepoint_id] = (stored_at, value)


def _get_cached_cp_metadata(chargepoint_id: str):
    """Return the cached ``(metadata, statuses)`` pair of a charge point.

    ``None`` is returned when nothing is cached or the entry has expired; an
    expired entry is removed from the cache.
    """
    cached = _CP_METADATA_CACHE.get(chargepoint_id)
    if not cached:
        return None
    stored_at, metadata, statuses = cached
    if not _cache_entry_valid(stored_at, _CP_METADATA_CACHE_TTL):
        _CP_METADATA_CACHE.pop(chargepoint_id, None)
        return None
    return metadata, statuses


def _set_cached_cp_metadata(chargepoint_id: str, metadata: Optional[dict[str, Any]], statuses: list[dict[str, Any]]):
    """Cache metadata and statuses of a charge point with the current time."""
    stored_at = time.time()
    _CP_METADATA_CACHE[chargepoint_id] = (stored_at, metadata, statuses)


def _invalidate_cp_metadata_cache(chargepoint_id: str):
    """Drop the cached metadata of a charge point; empty IDs are ignored."""
    if chargepoint_id:
        _CP_METADATA_CACHE.pop(chargepoint_id, None)


def _coerce_bool(value, default: bool) -> bool:
    """Interpret a loosely typed value as a boolean.

    Args:
        value: ``None``, a bool, a number, bytes or anything convertible to
            a string.
        default: Result for ``None``, empty text, unrecognised text and
            non-finite numbers.

    Returns:
        The boolean meaning of ``value``. Numbers are truncated to an integer
        first, so ``0.5`` counts as false. Text is matched case-insensitively
        against the accepted true/false words.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        try:
            return bool(int(value))
        except (ValueError, OverflowError):
            # NaN and +/-infinity cannot be truncated to an integer.
            return default
    if isinstance(value, (bytes, bytearray)):
        value = value.decode("utf-8", errors="ignore")
    text = str(value).strip().lower()
    if not text:
        return default
    if text in {"1", "true", "yes", "on", "enabled"}:
        return True
    if text in {"0", "false", "no", "off", "disabled"}:
        return False
    return default


def _coerce_int(value, default: int) -> int:
    """Parse ``value`` as an integer via its stripped string form.

    Returns ``default`` for ``None`` and for anything whose text is not a
    plain integer literal (for example ``"abc"`` or ``3.5``).
    """
    if value is not None:
        try:
            return int(str(value).strip())
        except (TypeError, ValueError):
            pass
    return default


def _coerce_optional_bool_config(value: Any) -> bool | None:
    if isinstance(value, bool):
        return value
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return bool(int(value))
    if isinstance(value, (bytes, bytearray)):
        value = value.decode("utf-8", errors="ignore")
    text = str(value).strip().lower()
    if not text:
        return None
    if text in {"1", "true", "yes", "on", "enabled"}:
        return True
    if text in {"0", "false", "no", "off", "disabled"}:
        return False
    return None


def _normalize_mail_recipients(value: Any) -> list[str]:
    """Turn a recipients setting into a list of non-empty, stripped strings.

    Lists, tuples and sets are taken element by element; any other value is
    converted to a string and split on commas. ``None`` gives an empty list.
    """
    if value is None:
        candidates = ()
    elif isinstance(value, (list, tuple, set)):
        candidates = value
    else:
        candidates = str(value).split(",")
    stripped = (str(item).strip() for item in candidates)
    return [address for address in stripped if address]


# Mail settings come from the "mail" section of the JSON configuration. Each
# option accepts an upper-case ``MAIL_*`` key and a short lower-case alias.
_mail_cfg = CONFIG.get("mail", {}) or {}


def _first_configured_mail_value(*keys: str) -> Any:
    """Return the first mail setting among ``keys`` that is actually set.

    Only ``None`` and the empty string count as "not set". Unlike an ``or``
    chain this keeps explicit falsy values such as ``False`` or ``0``, so a
    flag can be switched off through its primary key.
    """
    for key in keys:
        candidate = _mail_cfg.get(key)
        if candidate is None or candidate == "":
            continue
        return candidate
    return None


MAIL_SERVER = _mail_cfg.get("MAIL_SERVER") or _mail_cfg.get("server") or "smtp.ionos.de"
MAIL_PORT = _coerce_int(_mail_cfg.get("MAIL_PORT") or _mail_cfg.get("port"), 587)
# TLS defaults to on and SSL to off. An explicit ``false`` must be honoured,
# which an ``or`` chain would drop in favour of the alias or the default.
_mail_use_tls = _coerce_optional_bool_config(
    _first_configured_mail_value("MAIL_USE_TLS", "use_tls")
)
MAIL_USE_TLS = True if _mail_use_tls is None else _mail_use_tls
_mail_use_ssl = _coerce_optional_bool_config(
    _first_configured_mail_value("MAIL_USE_SSL", "use_ssl")
)
MAIL_USE_SSL = False if _mail_use_ssl is None else _mail_use_ssl
MAIL_USERNAME = _mail_cfg.get("MAIL_USERNAME") or _mail_cfg.get("username")
MAIL_PASSWORD = _mail_cfg.get("MAIL_PASSWORD") or _mail_cfg.get("password")
# The sender falls back to the login name when not configured separately.
MAIL_DEFAULT_SENDER = (
    _mail_cfg.get("MAIL_DEFAULT_SENDER") or _mail_cfg.get("default_sender") or MAIL_USERNAME
)
MAIL_RECIPIENTS = _normalize_mail_recipients(
    _mail_cfg.get("MAIL_RECIPIENTS") or _mail_cfg.get("recipients")
)
if not MAIL_RECIPIENTS:
    # NOTE(review): hard-coded fallback addresses; consider moving them into
    # the configuration file.
    MAIL_RECIPIENTS = ["seb@neleso.com", "henry.grafvonhelldorff@wall-e.works"]

# The extended session summary is enabled unless explicitly disabled.
_extended_session_summary_enabled = _coerce_optional_bool_config(
    _mail_cfg.get("extended_session_summary_enabled")
)
if _extended_session_summary_enabled is None:
    MAIL_EXTENDED_SESSION_SUMMARY_ENABLED = True
else:
    MAIL_EXTENDED_SESSION_SUMMARY_ENABLED = _extended_session_summary_enabled


# OCPP 2.x support is opt-in: the runtime configuration wins over the
# top-level "enable_ocpp_2x" entry, and the default is off.
ENABLE_OCPP_2X = _coerce_bool(
    _runtime_cfg.get("enable_ocpp_2x", CONFIG.get("enable_ocpp_2x")),
    False,
)
# Subprotocol names; the 2.x names are only present when 2.x is enabled.
OCPP_SUBPROTOCOLS: list[str] = (
    ["ocpp1.6", "ocpp2.0.1", "ocpp2.0"] if ENABLE_OCPP_2X else ["ocpp1.6"]
)


def _resolve_ocpp_version_from_subprotocol(subprotocol: str | None) -> str:
    if not subprotocol:
        return "ocpp1.6"
    normalized = str(subprotocol).strip().lower()
    if normalized.startswith("ocpp2"):
        return "ocpp2.x"
    if normalized.startswith("ocpp1.6"):
        return "ocpp1.6"
    return normalized or "ocpp1.6"


async def _create_async_log_pool() -> Any | None:
    """Initialise the shared database pool and return ``_db_pool``.

    The result is ``None`` when ``_db_pool`` is still unset after
    initialisation.
    """
    await _init_shared_db_pool()
    shared_pool = _db_pool
    return shared_pool


def _get_sync_log_conn():
    """Return the blocking connection used for OCPP log writes.

    The connection is created on first use. An existing connection is pinged
    with reconnect enabled and replaced by a new one when the ping fails.
    """
    global _sync_log_conn
    needs_new_connection = _sync_log_conn is None
    if not needs_new_connection:
        try:
            _sync_log_conn.ping(reconnect=True)
        except Exception:
            needs_new_connection = True
    if needs_new_connection:
        _sync_log_conn = get_db_conn()
    return _sync_log_conn


def _prepare_table_if_needed_sync(table_name: str):
    """Create the OCPP message log table ``table_name`` if it does not exist.

    Runs on the blocking log connection and commits afterwards. The table
    name is interpolated into the statement as-is.
    """
    ddl = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255),
                ocpp_endpoint VARCHAR(255),
                direction VARCHAR(32),
                message JSON,
                topic VARCHAR(255),
                `timestamp` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
    log_conn = _get_sync_log_conn()
    with log_conn.cursor() as cursor:
        cursor.execute(ddl)
    log_conn.commit()


def _flush_ocpp_log_batch_sync(batch: list[dict[str, Any]], prepared_tables: set[str]):
    """Write a batch of OCPP log entries using the blocking log connection.

    Entries are grouped by monthly table (``op_server_messages_<yymm>``,
    derived from each entry's timestamp). Tables not yet listed in
    ``prepared_tables`` are created first and then added to that set. All
    inserts are committed together at the end.
    """
    if not batch:
        return

    columns = ("chargepoint_id", "ocpp_endpoint", "direction", "message", "topic")
    rows_by_table: dict[str, list[tuple]] = {}
    for entry in batch:
        stamp = entry["timestamp"]
        table = f"op_server_messages_{stamp.strftime('%y%m')}"
        if table not in prepared_tables:
            _prepare_table_if_needed_sync(table)
            prepared_tables.add(table)
        row = (*(entry[name] for name in columns), stamp)
        rows_by_table.setdefault(table, []).append(row)

    log_conn = _get_sync_log_conn()
    with log_conn.cursor() as cursor:
        for table, rows in rows_by_table.items():
            statement = f"INSERT INTO {table} (chargepoint_id, ocpp_endpoint, direction, message, topic, `timestamp`) VALUES (%s, %s, %s, %s, %s, %s)"
            cursor.executemany(statement, rows)
    log_conn.commit()


async def _flush_ocpp_log_batch(batch: list[dict[str, Any]]):
    """Persist a batch of OCPP log entries, preferring the async pool.

    The async path is used when a log pool is available. If it fails with a
    connection error the shared async pool is reset and the batch is written
    through the blocking path in a worker thread instead. Other async errors
    are re-raised.
    """
    global _ocpp_log_pool, _ocpp_log_executor
    if not batch:
        return

    if _ocpp_log_pool is None and aiomysql is not None:
        _ocpp_log_pool = await _create_async_log_pool()

    if _ocpp_log_pool is not None:
        try:
            await _flush_ocpp_log_batch_async(batch)
        except Exception as exc:
            if not _is_connection_error(exc):
                raise
            logging.warning(
                "Database connection lost (async OCPP log path), resetting pool: %s",
                exc,
            )
            await _reset_shared_db_pool(close_async=True, close_sync=False)
            _ocpp_log_pool = None
        else:
            return

    # Blocking fallback, executed off the event loop.
    if _ocpp_log_executor is None:
        _ocpp_log_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    running_loop = asyncio.get_running_loop()
    await running_loop.run_in_executor(
        _ocpp_log_executor, _flush_ocpp_log_batch_sync, batch, _ocpp_log_prepared_tables
    )


async def _ensure_table_async(conn, table_name: str):
    """Create the OCPP message log table ``table_name`` if it does not exist.

    Executes the statement on an async cursor of ``conn``; no commit is
    issued here. The table name is interpolated into the statement as-is.
    """
    ddl = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255),
                ocpp_endpoint VARCHAR(255),
                direction VARCHAR(32),
                message JSON,
                topic VARCHAR(255),
                `timestamp` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
    async with conn.cursor() as cursor:
        await cursor.execute(ddl)


async def _flush_ocpp_log_batch_async(batch: list[dict[str, Any]]):
    """Write a batch of OCPP log entries through the async log pool.

    Entries are grouped by monthly table (``op_server_messages_<yymm>``).
    Tables not yet in ``_ocpp_log_prepared_tables`` are created and recorded
    there before their rows are inserted. One commit covers the whole batch.
    Does nothing for an empty batch or when no log pool is set.
    """
    if not batch or _ocpp_log_pool is None:
        return

    columns = ("chargepoint_id", "ocpp_endpoint", "direction", "message", "topic")
    rows_by_table: dict[str, list[tuple]] = {}
    for entry in batch:
        stamp = entry["timestamp"]
        table = f"op_server_messages_{stamp.strftime('%y%m')}"
        row = (*(entry[name] for name in columns), stamp)
        rows_by_table.setdefault(table, []).append(row)

    async with _ocpp_log_pool.acquire() as conn:
        for table, rows in rows_by_table.items():
            if table not in _ocpp_log_prepared_tables:
                await _ensure_table_async(conn, table)
                _ocpp_log_prepared_tables.add(table)
            statement = f"INSERT INTO {table} (chargepoint_id, ocpp_endpoint, direction, message, topic, `timestamp`) VALUES (%s, %s, %s, %s, %s, %s)"
            async with conn.cursor() as cursor:
                await cursor.executemany(statement, rows)
        await conn.commit()


async def _ocpp_log_worker():
    """Drain the OCPP log queue and persist its entries in batches.

    Returns immediately when no queue exists. Otherwise loops until the stop
    event is set and the queue is empty, then flushes whatever is still
    buffered. Flush failures are logged at debug level and the affected
    batch is dropped.
    """
    if _ocpp_log_queue is None:
        return
    batch: list[dict[str, Any]] = []
    while True:
        # With an empty buffer wait a full flush interval for the first item;
        # once something is buffered only wait briefly for further items.
        timeout = OCPP_LOG_FLUSH_INTERVAL if not batch else 0.01
        try:
            item = await asyncio.wait_for(_ocpp_log_queue.get(), timeout=timeout)  # type: ignore[arg-type]
            batch.append(item)
            # The item is marked done as soon as it is buffered, i.e. before
            # it has been written to the database.
            _ocpp_log_queue.task_done()
        except asyncio.TimeoutError:
            # No new item within the timeout; None marks the idle pass.
            item = None

        should_stop = _ocpp_log_stop_event.is_set() if _ocpp_log_stop_event else False
        # Flush when the batch is full or when the queue went quiet.
        if batch and (len(batch) >= OCPP_LOG_BATCH_SIZE or item is None):
            try:
                await _flush_ocpp_log_batch(batch)
            except Exception:
                logging.debug("Failed flushing OCPP log batch", exc_info=True)
            # Cleared even after a failed flush, so those entries are lost.
            batch.clear()

        if should_stop:
            # Keep draining until the queue is empty before leaving the loop.
            if _ocpp_log_queue and _ocpp_log_queue.empty():
                break
    # Entries buffered during the last pass are flushed one final time.
    if batch:
        try:
            await _flush_ocpp_log_batch(batch)
        except Exception:
            logging.debug("Failed flushing final OCPP log batch", exc_info=True)


def _prune_ocpp_log_drop_events(now: datetime | None = None) -> None:
    """Discard drop events older than the configured history window.

    The window is ``OCPP_LOG_DROP_HISTORY_MINUTES`` (at least one minute),
    measured back from ``now`` or the current UTC time. Events are removed
    from the left end of the deque until the oldest one is inside the window.
    """
    if not _ocpp_log_drop_events:
        return
    reference = now or datetime.now(timezone.utc)
    retention = timedelta(minutes=max(OCPP_LOG_DROP_HISTORY_MINUTES, 1))
    oldest_allowed = reference - retention
    while _ocpp_log_drop_events:
        if _ocpp_log_drop_events[0] >= oldest_allowed:
            break
        _ocpp_log_drop_events.popleft()


def _count_recent_ocpp_log_drops(
    window_minutes: float | None = None, *, now: datetime | None = None
) -> int:
    """Count recorded log drops within the last ``window_minutes``.

    Expired events are pruned first. A missing or non-positive window counts
    every event that is still retained.
    """
    _prune_ocpp_log_drop_events(now)
    if not _ocpp_log_drop_events:
        return 0

    if window_minutes is None or window_minutes <= 0:
        return len(_ocpp_log_drop_events)

    reference = now or datetime.now(timezone.utc)
    window_start = reference - timedelta(minutes=window_minutes)
    recent = [stamp for stamp in _ocpp_log_drop_events if stamp >= window_start]
    return len(recent)


def _build_ocpp_log_queue_metrics(window_minutes: float | None = None) -> dict[str, Any]:
    """Collect fill level and drop statistics of the OCPP log queue.

    Args:
        window_minutes: Window for ``recent_drop_count``; defaults to
            ``OCPP_LOG_DROP_DEFAULT_WINDOW_MINUTES`` when ``None``.

    Returns:
        A dict with size, capacity, fill ratio, drop counters and the
        formatted time of the last drop. ``remaining_capacity`` and
        ``fill_ratio`` are ``None`` when the maximum size is zero or unset.
    """
    now = datetime.now(timezone.utc)
    log_queue = _ocpp_log_queue
    if log_queue:
        size = log_queue.qsize()
        maxsize = log_queue.maxsize
    else:
        size = 0
        maxsize = OCPP_LOG_QUEUE_MAXSIZE
    if maxsize is None:
        maxsize = 0

    if maxsize:
        remaining_capacity = max(maxsize - size, 0)
        fill_ratio: float | None = size / maxsize
    else:
        remaining_capacity = None
        fill_ratio = None

    recent_window = (
        OCPP_LOG_DROP_DEFAULT_WINDOW_MINUTES if window_minutes is None else window_minutes
    )
    recent_drop_count = _count_recent_ocpp_log_drops(recent_window, now=now)
    last_drop = _format_timestamp_value(_ocpp_log_last_drop_at)

    metrics: dict[str, Any] = {
        "size": size,
        "maxsize": maxsize,
        "remaining_capacity": remaining_capacity,
        "fill_ratio": fill_ratio,
        "drop_count": _ocpp_log_drop_count,
        "last_drop_at": last_drop,
        "recent_drop_count": recent_drop_count,
        "recent_window_minutes": recent_window,
        "history_retention_minutes": OCPP_LOG_DROP_HISTORY_MINUTES,
        "empty": log_queue.empty() if log_queue else True,
    }
    return metrics


async def _start_ocpp_log_worker(loop: asyncio.AbstractEventLoop):
    """Create the log queue, stop event and pool if missing, then start the worker.

    A new worker task is scheduled on ``loop`` only when none exists or the
    previous one has finished.
    """
    global _ocpp_log_queue, _ocpp_log_stop_event, _ocpp_log_worker_task, _ocpp_log_pool

    if _ocpp_log_queue is None:
        _ocpp_log_queue = asyncio.Queue(maxsize=OCPP_LOG_QUEUE_MAXSIZE)
    if _ocpp_log_stop_event is None:
        _ocpp_log_stop_event = asyncio.Event()
    if _ocpp_log_pool is None:
        _ocpp_log_pool = await _create_async_log_pool()

    current_task = _ocpp_log_worker_task
    worker_running = current_task is not None and not current_task.done()
    if not worker_running:
        _ocpp_log_worker_task = loop.create_task(_ocpp_log_worker())


async def _shutdown_ocpp_log_worker():
    """Stop the OCPP log worker and release its database resources.

    Sets the stop event, waits for the worker task to finish, then closes the
    log pool, the log executor and the blocking log connection. Every step is
    best-effort: a failure in one step is logged or ignored so that the
    remaining resources are still released.
    """
    global _ocpp_log_worker_task, _ocpp_log_pool, _ocpp_log_executor, _sync_log_conn
    if _ocpp_log_stop_event:
        _ocpp_log_stop_event.set()
    if _ocpp_log_worker_task:
        try:
            await _ocpp_log_worker_task
        except asyncio.CancelledError:
            pass
        except Exception:
            # A crashed worker must not keep the cleanup below from running.
            logging.warning(
                "OCPP log worker terminated with an error during shutdown",
                exc_info=True,
            )
        _ocpp_log_worker_task = None
    if _ocpp_log_pool is not None:
        # NOTE(review): _create_async_log_pool returns the shared _db_pool, so
        # this may close the pool other database helpers use - confirm this is
        # only called at process shutdown.
        try:
            _ocpp_log_pool.close()
        except Exception:
            pass
        try:
            await _ocpp_log_pool.wait_closed()
        except Exception:
            pass
        _ocpp_log_pool = None
    if _ocpp_log_executor is not None:
        try:
            _ocpp_log_executor.shutdown(wait=False, cancel_futures=True)
        except Exception:
            pass
        _ocpp_log_executor = None
    if _sync_log_conn is not None:
        try:
            _sync_log_conn.close()
        except Exception:
            pass
        _sync_log_conn = None


def resolve_stale_timeout(heartbeat_interval: Any, base_timeout: int | None = None) -> int:
    """Pick the stale timeout from the base setting and the heartbeat.

    The result is the larger of the base timeout and twice the heartbeat
    interval. A missing, unparsable or non-positive base falls back to
    ``DEFAULT_STALE_TIMEOUT``; a negative or unparsable heartbeat counts as
    zero. A warning is logged when the heartbeat forces a larger value.
    """
    base = DEFAULT_STALE_TIMEOUT
    if base_timeout is not None:
        base = _coerce_int(base_timeout, DEFAULT_STALE_TIMEOUT)
    if base <= 0:
        base = DEFAULT_STALE_TIMEOUT

    heartbeat_value = max(_coerce_int(heartbeat_interval, 0), 0)
    heartbeat_based_timeout = heartbeat_value * 2
    resolved = max(base, heartbeat_based_timeout)

    if heartbeat_based_timeout and heartbeat_based_timeout > base:
        logging.warning(
            "Configured stale timeout (%s s) is less than twice the heartbeat interval (%s s); using %s s instead",
            base,
            heartbeat_value,
            resolved,
        )
    return resolved


def load_server_startup_config() -> dict[str, Any]:
    """Load the startup configuration from the newest ``op_server_config`` row.

    Starts from ``_default_server_startup_config()`` and overlays the values
    of the most recently updated row. The defaults are returned unchanged
    when the database cannot be reached, the query fails or no row exists.
    """
    config = _default_server_startup_config()
    try:
        conn = get_db_conn()
    except Exception:
        return config

    try:
        with conn.cursor(DictCursor) as cur:
            cur.execute(
                """
                SELECT
                    local_authorize_offline,
                    authorize_remote_tx_requests,
                    local_auth_list_enabled,
                    authorization_cache_enabled,
                    change_availability_operative,
                    enforce_websocket_ping_interval,
                    send_heartbeat_change_on_boot,
                    trigger_meter_values_on_start,
                    trigger_status_notification_on_start,
                    trigger_boot_notification_on_message_before_boot,
                    heartbeat_interval,
                    change_configuration_enabled_1,
                    change_configuration_key_1,
                    change_configuration_value_1,
                    change_configuration_enabled_2,
                    change_configuration_key_2,
                    change_configuration_value_2,
                    change_configuration_enabled_3,
                    change_configuration_key_3,
                    change_configuration_value_3,
                    change_configuration_enabled_4,
                    change_configuration_key_4,
                    change_configuration_value_4,
                    change_configuration_enabled_5,
                    change_configuration_key_5,
                    change_configuration_value_5
                FROM op_server_config
                ORDER BY updated_at DESC, id DESC
                LIMIT 1
                """
            )
            row = cur.fetchone()
    except Exception:
        return config
    finally:
        try:
            conn.close()
        except Exception:
            pass

    if not row:
        return config

    # Boolean switches: the column name equals the configuration key.
    boolean_columns = (
        "local_authorize_offline",
        "authorize_remote_tx_requests",
        "local_auth_list_enabled",
        "authorization_cache_enabled",
        "change_availability_operative",
        "enforce_websocket_ping_interval",
        "send_heartbeat_change_on_boot",
        "trigger_meter_values_on_start",
        "trigger_status_notification_on_start",
        "trigger_boot_notification_on_message_before_boot",
    )
    for column in boolean_columns:
        config[column] = _coerce_bool(row.get(column), config[column])

    # A non-positive heartbeat keeps the default value.
    heartbeat = _coerce_int(row.get("heartbeat_interval"), config["heartbeat_interval"])
    config["heartbeat_interval"] = (
        heartbeat if heartbeat > 0 else config["heartbeat_interval"]
    )

    # The numbered change_configuration_* columns map onto the default items
    # by position, starting at 1.
    for slot, entry in enumerate(config.get("change_configuration_items", []), start=1):
        enabled = _coerce_bool(
            row.get(f"change_configuration_enabled_{slot}"), entry.get("enabled", False)
        )
        raw_key = row.get(f"change_configuration_key_{slot}")
        raw_value = row.get(f"change_configuration_value_{slot}")
        entry["enabled"] = enabled
        entry["key"] = "" if raw_key is None else str(raw_key).strip()
        entry["value"] = "" if raw_value is None else str(raw_value).strip()
    return config


def _sanitize_change_configuration_items(
    items: Any,
) -> list[dict[str, Any]]:
    """Return a cleaned copy of a ``change_configuration_items`` list.

    Anything that is not a non-string sequence yields an empty list. Entries
    that are not mappings are dropped but still consume a position, so
    ``index`` is the 1-based position of the entry in the original input.
    """

    def _as_text(raw: Any) -> str:
        # ``None`` becomes an empty string; everything else is stringified.
        return ("" if raw is None else str(raw)).strip()

    if isinstance(items, (str, bytes, bytearray)) or not isinstance(items, Sequence):
        return []

    return [
        {
            "index": position,
            "enabled": bool(entry.get("enabled")),
            "key": _as_text(entry.get("key")),
            "value": _as_text(entry.get("value")),
        }
        for position, entry in enumerate(items, start=1)
        if isinstance(entry, Mapping)
    ]


def log_server_startup_config(startup_cfg: Mapping[str, Any]) -> None:
    """Emit the ``op_server_config`` startup settings as one INFO log line.

    The mapping is shallow-copied, its ``change_configuration_items`` entry is
    replaced by the sanitized list, and the result is serialized as JSON with
    sorted keys.
    """

    snapshot: dict[str, Any] = dict(startup_cfg)
    snapshot["change_configuration_items"] = _sanitize_change_configuration_items(
        startup_cfg.get("change_configuration_items")
    )
    rendered = json.dumps(snapshot, ensure_ascii=False, sort_keys=True)
    logging.info("[Startup] Loaded op_server_config: %s", rendered)


def build_server_default_config(startup_cfg: dict[str, Any]) -> dict[str, str]:
    """Build the configuration key/value defaults derived from the startup config.

    For each of the four authorization flags that is truthy (or absent) in
    ``startup_cfg``, the matching key is emitted with the lower-cased string
    value of ``_runtime_cfg["server_default_<Key>"]``, falling back to the
    built-in default listed below. Enabled ``change_configuration_items``
    entries with a non-blank key are then added; they may override the flag
    defaults because they are applied last.

    Args:
        startup_cfg: Startup configuration mapping. A missing or ``None``
            ``change_configuration_items`` value is treated as an empty list.

    Returns:
        Mapping of configuration key to string value.
    """
    defaults: dict[str, str] = {}

    # (startup flag, emitted key, fallback when ``_runtime_cfg`` has no override)
    flag_defaults = (
        ("local_authorize_offline", "LocalAuthorizeOffline", "true"),
        ("authorize_remote_tx_requests", "AuthorizeRemoteTxRequests", "false"),
        ("local_auth_list_enabled", "LocalAuthListEnabled", "true"),
        ("authorization_cache_enabled", "AuthorizationCacheEnabled", "true"),
    )
    for flag, config_key, fallback in flag_defaults:
        if startup_cfg.get(flag, True):
            defaults[config_key] = str(
                _runtime_cfg.get(f"server_default_{config_key}", fallback)
            ).lower()

    # ``or ()`` guards against an explicit ``None`` value, which previously
    # raised ``TypeError`` when iterated.
    for item in startup_cfg.get("change_configuration_items") or ():
        try:
            enabled = bool(item.get("enabled"))
        except AttributeError:
            # Entries without ``.get`` (e.g. stray strings) are ignored.
            continue
        if not enabled:
            continue
        raw_key = item.get("key")
        key = ("" if raw_key is None else str(raw_key)).strip()
        if not key:
            continue
        value_raw = item.get("value")
        defaults[key] = "" if value_raw is None else str(value_raw)
    return defaults


async def ensure_server_config_table():
    """Create ``op_server_config`` if needed and add any missing columns.

    The work runs inside ``_with_db_cursor(..., commit=True)``. After the
    ``CREATE TABLE IF NOT EXISTS`` statement, every column listed in
    ``ddl_columns`` is probed with ``SHOW COLUMNS`` and added with its
    ``ALTER TABLE`` statement when the probe returns no row.
    """

    async def _ensure(cursor: Any):
        # Full schema; a no-op when the table already exists.
        await _cursor_execute(
            cursor,
            """
            CREATE TABLE IF NOT EXISTS op_server_config (
                id INT AUTO_INCREMENT PRIMARY KEY,
                local_authorize_offline TINYINT(1) NOT NULL DEFAULT 1,
                authorize_remote_tx_requests TINYINT(1) NOT NULL DEFAULT 1,
                local_auth_list_enabled TINYINT(1) NOT NULL DEFAULT 1,
                authorization_cache_enabled TINYINT(1) NOT NULL DEFAULT 1,
                change_availability_operative TINYINT(1) NOT NULL DEFAULT 1,
                enforce_websocket_ping_interval TINYINT(1) NOT NULL DEFAULT 1,
                send_heartbeat_change_on_boot TINYINT(1) NOT NULL DEFAULT 1,
                trigger_meter_values_on_start TINYINT(1) NOT NULL DEFAULT 1,
                trigger_status_notification_on_start TINYINT(1) NOT NULL DEFAULT 1,
                trigger_boot_notification_on_message_before_boot TINYINT(1) NOT NULL DEFAULT 1,
                heartbeat_interval INT NOT NULL DEFAULT 300,
                change_configuration_enabled_1 TINYINT(1) NOT NULL DEFAULT 0,
                change_configuration_key_1 VARCHAR(255) DEFAULT NULL,
                change_configuration_value_1 VARCHAR(255) DEFAULT NULL,
                change_configuration_enabled_2 TINYINT(1) NOT NULL DEFAULT 0,
                change_configuration_key_2 VARCHAR(255) DEFAULT NULL,
                change_configuration_value_2 VARCHAR(255) DEFAULT NULL,
                change_configuration_enabled_3 TINYINT(1) NOT NULL DEFAULT 0,
                change_configuration_key_3 VARCHAR(255) DEFAULT NULL,
                change_configuration_value_3 VARCHAR(255) DEFAULT NULL,
                change_configuration_enabled_4 TINYINT(1) NOT NULL DEFAULT 0,
                change_configuration_key_4 VARCHAR(255) DEFAULT NULL,
                change_configuration_value_4 VARCHAR(255) DEFAULT NULL,
                change_configuration_enabled_5 TINYINT(1) NOT NULL DEFAULT 0,
                change_configuration_key_5 VARCHAR(255) DEFAULT NULL,
                change_configuration_value_5 VARCHAR(255) DEFAULT NULL,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            )
            """,
        )
        # (column name, ALTER statement) pairs applied only when the column is
        # absent -- presumably for tables created by an earlier schema version.
        # Each ALTER positions its column AFTER the one handled just before it,
        # so the list order matters.
        ddl_columns = [
            (
                "enforce_websocket_ping_interval",
                """
                ALTER TABLE op_server_config
                ADD COLUMN enforce_websocket_ping_interval TINYINT(1) NOT NULL DEFAULT 1
                AFTER change_availability_operative
                """,
            ),
            (
                "send_heartbeat_change_on_boot",
                """
                ALTER TABLE op_server_config
                ADD COLUMN send_heartbeat_change_on_boot TINYINT(1) NOT NULL DEFAULT 1
                AFTER enforce_websocket_ping_interval
                """,
            ),
            (
                "trigger_meter_values_on_start",
                """
                ALTER TABLE op_server_config
                ADD COLUMN trigger_meter_values_on_start TINYINT(1) NOT NULL DEFAULT 1
                AFTER send_heartbeat_change_on_boot
                """,
            ),
            (
                "trigger_status_notification_on_start",
                """
                ALTER TABLE op_server_config
                ADD COLUMN trigger_status_notification_on_start TINYINT(1) NOT NULL DEFAULT 1
                AFTER trigger_meter_values_on_start
                """,
            ),
            (
                "trigger_boot_notification_on_message_before_boot",
                """
                ALTER TABLE op_server_config
                ADD COLUMN trigger_boot_notification_on_message_before_boot TINYINT(1) NOT NULL DEFAULT 1
                AFTER trigger_status_notification_on_start
                """,
            ),
            (
                "change_configuration_enabled_1",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_enabled_1 TINYINT(1) NOT NULL DEFAULT 0 AFTER heartbeat_interval
                """,
            ),
            (
                "change_configuration_key_1",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_key_1 VARCHAR(255) AFTER change_configuration_enabled_1
                """,
            ),
            (
                "change_configuration_value_1",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_value_1 VARCHAR(255) AFTER change_configuration_key_1
                """,
            ),
            (
                "change_configuration_enabled_2",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_enabled_2 TINYINT(1) NOT NULL DEFAULT 0 AFTER change_configuration_value_1
                """,
            ),
            (
                "change_configuration_key_2",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_key_2 VARCHAR(255) AFTER change_configuration_enabled_2
                """,
            ),
            (
                "change_configuration_value_2",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_value_2 VARCHAR(255) AFTER change_configuration_key_2
                """,
            ),
            (
                "change_configuration_enabled_3",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_enabled_3 TINYINT(1) NOT NULL DEFAULT 0 AFTER change_configuration_value_2
                """,
            ),
            (
                "change_configuration_key_3",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_key_3 VARCHAR(255) AFTER change_configuration_enabled_3
                """,
            ),
            (
                "change_configuration_value_3",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_value_3 VARCHAR(255) AFTER change_configuration_key_3
                """,
            ),
            (
                "change_configuration_enabled_4",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_enabled_4 TINYINT(1) NOT NULL DEFAULT 0 AFTER change_configuration_value_3
                """,
            ),
            (
                "change_configuration_key_4",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_key_4 VARCHAR(255) AFTER change_configuration_enabled_4
                """,
            ),
            (
                "change_configuration_value_4",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_value_4 VARCHAR(255) AFTER change_configuration_key_4
                """,
            ),
            (
                "change_configuration_enabled_5",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_enabled_5 TINYINT(1) NOT NULL DEFAULT 0 AFTER change_configuration_value_4
                """,
            ),
            (
                "change_configuration_key_5",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_key_5 VARCHAR(255) AFTER change_configuration_enabled_5
                """,
            ),
            (
                "change_configuration_value_5",
                """
                ALTER TABLE op_server_config
                ADD COLUMN change_configuration_value_5 VARCHAR(255) AFTER change_configuration_key_5
                """,
            ),
        ]
        # Probe each column and run its ALTER only when the probe finds no row.
        for column_name, ddl in ddl_columns:
            await _cursor_execute(
                cursor,
                "SHOW COLUMNS FROM op_server_config LIKE %s",
                (column_name,),
            )
            if not await _cursor_fetchone(cursor):
                await _cursor_execute(cursor, ddl)

    await _with_db_cursor(_ensure, commit=True)


async def ensure_cp_config_table():
    """Create the ``op_server_cp_config`` table when it does not exist yet."""
    create_table_sql = """
            CREATE TABLE IF NOT EXISTS op_server_cp_config (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255),
                configuration_json JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """

    async def _ensure(db_cursor: Any):
        await _cursor_execute(db_cursor, create_table_sql)

    await _with_db_cursor(_ensure, commit=True)


async def ensure_oicp_enable_table():
    """Create the ``op_server_oicp_enable`` table when it does not exist yet."""
    create_table_sql = """
            CREATE TABLE IF NOT EXISTS op_server_oicp_enable (
                chargepoint_id VARCHAR(255) PRIMARY KEY,
                enabled TINYINT(1) NOT NULL DEFAULT 0
            )
            """

    async def _ensure(db_cursor: Any):
        await _cursor_execute(db_cursor, create_table_sql)

    await _with_db_cursor(_ensure, commit=True)


async def ensure_cp_metadata_table():
    """Create ``cp_server_cp_metadata`` and add/backfill any missing columns.

    After the ``CREATE TABLE IF NOT EXISTS`` statement, each column in the
    spec list is probed with ``SHOW COLUMNS``. A missing column is added and,
    when a default value is configured for it, rows whose new column is NULL
    or empty are filled with that default.
    """

    async def _ensure(cursor: Any):
        create_table_sql = """
            CREATE TABLE IF NOT EXISTS cp_server_cp_metadata (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255) NOT NULL,
                operator_id VARCHAR(32) DEFAULT NULL,
                operator_name VARCHAR(255) DEFAULT NULL,
                evse_id VARCHAR(255) DEFAULT NULL,
                address TEXT,
                coordinates JSON DEFAULT NULL,
                hotline_number VARCHAR(32) DEFAULT NULL,
                additional_data TEXT,
                payment_options TEXT,
                authentication_modes TEXT,
                charging_facilities TEXT,
                value_added_services TEXT,
                accessibility VARCHAR(255) DEFAULT NULL,
                accessibility_location VARCHAR(255) DEFAULT NULL,
                plugs VARCHAR(255) DEFAULT NULL,
                default_action_type VARCHAR(32) NOT NULL DEFAULT 'fullLoad',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_cp_metadata_chargepoint (chargepoint_id)
            ) CHARACTER SET utf8mb4
            """
        await _cursor_execute(cursor, create_table_sql)

        text_type = "TEXT"
        short_text_type = "VARCHAR(255) DEFAULT NULL"
        # (column name, column definition, value used to backfill empty rows)
        later_columns = (
            ("accessibility", short_text_type, DEFAULT_ACCESSIBILITY),
            ("accessibility_location", short_text_type, DEFAULT_ACCESSIBILITY_LOCATION),
            ("authentication_modes", text_type, DEFAULT_AUTHENTICATION_MODES_JSON),
            ("plugs", short_text_type, DEFAULT_PLUGS),
            ("value_added_services", text_type, DEFAULT_HUBJECT_VALUE_ADDED_SERVICES_JSON),
            ("charging_facilities", text_type, DEFAULT_HUBJECT_CHARGING_FACILITIES_JSON),
        )
        for name, column_type, backfill in later_columns:
            await _cursor_execute(
                cursor,
                "SHOW COLUMNS FROM cp_server_cp_metadata LIKE %s",
                (name,),
            )
            already_present = await _cursor_fetchone(cursor)
            if already_present:
                continue
            await _cursor_execute(
                cursor,
                f"ALTER TABLE cp_server_cp_metadata ADD COLUMN {name} {column_type}",
            )
            if backfill is not None:
                # One templated statement serves every column; for
                # ``authentication_modes`` it renders the same SQL text as a
                # hand-written statement would.
                await _cursor_execute(
                    cursor,
                    f"""
                    UPDATE cp_server_cp_metadata
                    SET {name}=%s
                    WHERE {name} IS NULL OR {name}=''
                    """,
                    (backfill,),
                )

    await _with_db_cursor(_ensure, commit=True)


async def ensure_rfid_tables():
    """Create the RFID/token tables and apply their column and index upgrades.

    Tables touched, in order: ``op_rfid_mapping``, ``op_emsp_tokens`` (plus
    its ``status``/``source``/``valid_until`` columns and ``idx_status``/
    ``idx_source`` keys when missing), ``op_server_rfid_lists``,
    ``op_server_rfid_global`` (plus its ``status`` column when missing),
    ``op_server_rfid_blocked`` and ``op_rfid_free``.
    """

    async def _ensure(cursor: Any):
        async def _apply_if_absent(probe_sql: str, migration_sql: str) -> None:
            # Run ``migration_sql`` only when ``probe_sql`` returns no row.
            await _cursor_execute(cursor, probe_sql, ())
            if not await _cursor_fetchone(cursor):
                await _cursor_execute(cursor, migration_sql)

        create_rfid_mapping = """
            CREATE TABLE IF NOT EXISTS op_rfid_mapping (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255),
                rfid_list VARCHAR(255),
                single_uuid VARCHAR(50),
                freecharging TINYINT(1) DEFAULT 0
            )
            """
        create_emsp_tokens = """
            CREATE TABLE IF NOT EXISTS op_emsp_tokens (
                uid VARCHAR(255) PRIMARY KEY,
                auth_id VARCHAR(255),
                issuer VARCHAR(255),
                valid TINYINT(1) DEFAULT 1,
                whitelist VARCHAR(32),
                local_rfid VARCHAR(255),
                status VARCHAR(32) NOT NULL DEFAULT 'valid',
                source VARCHAR(16) NOT NULL DEFAULT 'emsp',
                valid_until DATETIME NULL,
                updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                KEY idx_auth_id (auth_id),
                KEY idx_local_rfid (local_rfid),
                KEY idx_status (status),
                KEY idx_source (source)
            )
            """
        create_rfid_lists = """
            CREATE TABLE IF NOT EXISTS op_server_rfid_lists (
                rfid_list_id VARCHAR(255),
                uuid VARCHAR(50)
            )
            """
        create_rfid_global = """
            CREATE TABLE IF NOT EXISTS op_server_rfid_global (
                uuid VARCHAR(50) PRIMARY KEY,
                status VARCHAR(20) NOT NULL DEFAULT 'accepted'
            )
            """
        create_rfid_blocked = """
            CREATE TABLE IF NOT EXISTS op_server_rfid_blocked (
                uuid VARCHAR(50) PRIMARY KEY
            )
            """
        create_rfid_free = """
            CREATE TABLE IF NOT EXISTS op_rfid_free (
                chargepoint_id VARCHAR(255) PRIMARY KEY
            )
            """
        # (probe, upgrade) pairs for op_emsp_tokens, applied in this order.
        emsp_token_upgrades = (
            (
                "SHOW COLUMNS FROM op_emsp_tokens LIKE 'status'",
                "ALTER TABLE op_emsp_tokens ADD COLUMN status VARCHAR(32) NOT NULL DEFAULT 'valid'",
            ),
            (
                "SHOW COLUMNS FROM op_emsp_tokens LIKE 'source'",
                "ALTER TABLE op_emsp_tokens ADD COLUMN source VARCHAR(16) NOT NULL DEFAULT 'emsp'",
            ),
            (
                "SHOW COLUMNS FROM op_emsp_tokens LIKE 'valid_until'",
                "ALTER TABLE op_emsp_tokens ADD COLUMN valid_until DATETIME NULL",
            ),
            (
                "SHOW INDEX FROM op_emsp_tokens WHERE Key_name='idx_status'",
                "ALTER TABLE op_emsp_tokens ADD KEY idx_status (status)",
            ),
            (
                "SHOW INDEX FROM op_emsp_tokens WHERE Key_name='idx_source'",
                "ALTER TABLE op_emsp_tokens ADD KEY idx_source (source)",
            ),
        )

        await _cursor_execute(cursor, create_rfid_mapping)
        await _cursor_execute(cursor, create_emsp_tokens)
        for probe_sql, migration_sql in emsp_token_upgrades:
            await _apply_if_absent(probe_sql, migration_sql)

        for create_sql in (create_rfid_lists, create_rfid_global, create_rfid_blocked):
            await _cursor_execute(cursor, create_sql)

        # This probe is issued without a parameter tuple, unlike the ones above.
        await _cursor_execute(
            cursor, "SHOW COLUMNS FROM op_server_rfid_global LIKE 'status'"
        )
        if not await _cursor_fetchone(cursor):
            await _cursor_execute(
                cursor,
                """
                ALTER TABLE op_server_rfid_global
                ADD COLUMN status VARCHAR(20) NOT NULL DEFAULT 'accepted' AFTER uuid
                """,
            )

        await _cursor_execute(cursor, create_rfid_free)

    await _with_db_cursor(_ensure, commit=True)


async def ensure_voucher_tables():
    """Create the ``op_vouchers`` table when it does not exist yet."""
    create_table_sql = """
            CREATE TABLE IF NOT EXISTS op_vouchers (
                id INT AUTO_INCREMENT PRIMARY KEY,
                rfid_uuid VARCHAR(64) NOT NULL,
                energy_kwh DECIMAL(12,3) NOT NULL,
                energy_used_wh INT NOT NULL DEFAULT 0,
                valid_until DATETIME NULL,
                allowed_chargepoints TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_rfid (rfid_uuid)
            ) CHARACTER SET utf8mb4
            """

    async def _ensure(db_cursor: Any):
        await _cursor_execute(db_cursor, create_table_sql)

    await _with_db_cursor(_ensure, commit=True)


def _normalize_chargepoint_id(identifier: str | None) -> str:
    """Normalize charge point identifiers extracted from websocket paths."""

    if not identifier:
        return ""

    if isinstance(identifier, (bytes, bytearray)):
        identifier = identifier.decode("utf-8", errors="ignore")

    text = str(identifier).strip()
    if not text:
        return ""

    parsed = urlparse(text)
    path = parsed.path if parsed.scheme else text
    segments = [segment for segment in path.strip("/").split("/") if segment]
    if segments:
        return segments[-1]
    return text


def _normalize_card_uuid(value: str | None) -> str | None:
    if value is None:
        return None
    normalized = str(value).strip().upper()
    return normalized or None


def _extract_id_token_value(payload: Mapping[str, Any]) -> str | None:
    if not isinstance(payload, Mapping):
        return None
    direct = payload.get("idTag")
    if isinstance(direct, str) and direct.strip():
        return direct.strip()
    token = payload.get("idToken")
    if isinstance(token, Mapping):
        for key in ("idToken", "value", "id"):
            candidate = token.get(key)
            if isinstance(candidate, str) and candidate.strip():
                return candidate.strip()
    return None


def _normalize_ocpp_payload(payload: Any, *, ocpp2: bool) -> Mapping[str, Any]:
    """Return a shallow dict copy of ``payload`` (``{}`` for non-mappings).

    With ``ocpp2`` set, an ``idTag`` entry is added from the extracted id
    token value when one is found and the payload has no ``idTag`` key yet.
    """
    if not isinstance(payload, Mapping):
        return {}
    result: dict[str, Any] = dict(payload)
    if not ocpp2:
        return result
    id_token = _extract_id_token_value(result)
    if id_token and "idTag" not in result:
        result["idTag"] = id_token
    return result


def _parse_timestamp_utc(value: Any, *, default: datetime | None = None) -> datetime:
    if value is None:
        return default or datetime.now(timezone.utc)
    parsed = None
    try:
        parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
    except Exception:
        parsed = None
    if parsed is None:
        parsed = default or datetime.now(timezone.utc)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _extract_meter_value_data(
    payload: Mapping[str, Any], *, default_ts: datetime | None = None
) -> tuple[float | None, str, datetime]:
    """Pull the reading, unit and timestamp from a MeterValues-style payload.

    Only the first ``meterValue`` entry is inspected. Among its
    ``sampledValue`` items the first one whose ``measurand`` is missing or
    ``Energy.Active.Import.Register`` is used; if none qualifies, the first
    sampled value is used.

    Malformed structures (a non-list ``meterValue``/``sampledValue`` or a
    non-mapping first entry) are treated as empty instead of raising.

    Args:
        payload: Mapping holding the ``meterValue`` list.
        default_ts: Timestamp handed to ``_parse_timestamp_utc`` as the
            fallback when the entry carries no usable ``timestamp``.

    Returns:
        ``(meter_value, unit, timestamp)``; ``meter_value`` is ``None`` when
        no value could be converted to ``float`` and ``unit`` is ``""`` when
        none was found.
    """
    mv_list = payload.get("meterValue") or []
    try:
        first = mv_list[0]
    except (IndexError, KeyError, TypeError):
        # Empty list, or a value that is not positionally indexable.
        first = None
    # Previously only the timestamp lookup was guarded; a non-mapping entry
    # then crashed on ``entry.get("sampledValue")``.
    entry = first if isinstance(first, Mapping) else {}
    ts_dt = _parse_timestamp_utc(entry.get("timestamp"), default=default_ts)

    sampled = entry.get("sampledValue") or []
    if not isinstance(sampled, (list, tuple)):
        sampled = []

    target = None
    for sv in sampled:
        try:
            meas = sv.get("measurand")
        except AttributeError:
            meas = None
        if meas in (None, "Energy.Active.Import.Register"):
            target = sv
            break
    if target is None and sampled:
        target = sampled[0]

    meter_value = None
    unit = ""
    if target:
        try:
            value_str = target.get("value")
        except AttributeError:
            value_str = None
        try:
            # ``unit`` may sit directly on the sample or nested under
            # ``unitOfMeasure``.
            unit = (
                target.get("unit")
                or (target.get("unitOfMeasure") or {}).get("unit")
                or ""
            )
        except Exception:
            unit = ""
        try:
            meter_value = float(value_str)
        except (TypeError, ValueError):
            meter_value = None
    return meter_value, unit, ts_dt


# Action names in the OCPP 2.x supported set.
# NOTE(review): where this set is consulted is not visible in this section.
OCPP2_SUPPORTED_ACTIONS: set[str] = {
    "Authorize",
    "BootNotification",
    "DataTransfer",
    "Heartbeat",
    "MeterValues",
    "StatusNotification",
    "TransactionEvent",
}


def _parse_voucher_chargepoints(value: Any) -> set[str]:
    """Split a ``,``/``;`` separated list into upper-cased charge point ids.

    Each entry goes through ``_normalize_chargepoint_id``; entries that
    normalize to an empty string are dropped. ``None`` yields an empty set.
    """
    if value is None:
        return set()
    raw_entries = str(value).replace(";", ",").split(",")
    return {
        cp_id.upper()
        for cp_id in map(_normalize_chargepoint_id, raw_entries)
        if cp_id
    }


def _extract_column_value(row: Any, key: str) -> Any:
    """Read one column from a fetched row, whatever shape the row has.

    Mappings are looked up by ``key``. Lists and tuples return their first
    element (``None`` when empty), regardless of ``key``. Any other value,
    including ``None``, is returned unchanged.
    """
    if isinstance(row, Mapping):
        return row.get(key)
    if isinstance(row, (list, tuple)):
        return row[0] if row else None
    return row


async def _read_oicp_flag(cursor: Any, normalized_id: str) -> bool:
    """Read ``enabled`` for one charge point from ``op_server_oicp_enable``.

    Returns the truthiness of the stored value; a missing row yields ``False``.
    """
    query = """
        SELECT enabled
        FROM op_server_oicp_enable
        WHERE chargepoint_id=%s
        LIMIT 1
        """
    await _cursor_execute(cursor, query, (normalized_id,))
    fetched = await _cursor_fetchone(cursor)
    return bool(_extract_column_value(fetched, "enabled"))


async def is_oicp_enabled(
    station_id: str,
    *,
    cursor: Any | None = None,
) -> bool:
    """Tell whether OICP is enabled for a station, using the flag cache first.

    On a cache miss the flag is read through ``cursor`` when one is supplied,
    otherwise through ``_with_db_cursor``, and the result is cached. Any
    failure while reading or caching is logged and reported as ``False``.
    """
    normalized_id = _normalize_chargepoint_id(station_id)
    cached_flag = _get_cached_oicp_flag(normalized_id)
    if cached_flag is not None:
        return cached_flag

    async def _lookup(db_cursor: Any) -> bool:
        return await _read_oicp_flag(db_cursor, normalized_id)

    try:
        if cursor is None:
            flag = await _with_db_cursor(_lookup)
        else:
            flag = await _lookup(cursor)
        _set_cached_oicp_flag(normalized_id, flag)
    except Exception:
        logging.exception(
            "Failed to read OICP flag for %s (normalized=%s)", station_id, normalized_id
        )
        return False
    return flag


def _ensure_extended_session_log_column(cursor: Any) -> bool:
    """Report whether ``op_redirects`` has an ``extended_session_log`` column.

    The answer is determined once and memoized in
    ``_SESSION_LOG_COLUMN_PRESENT``; a failed inspection is logged and
    memoized as ``False``.
    """
    global _SESSION_LOG_COLUMN_PRESENT
    if _SESSION_LOG_COLUMN_PRESENT is None:
        try:
            cursor.execute("SHOW COLUMNS FROM op_redirects LIKE 'extended_session_log'")
            found = cursor.fetchone()
            _SESSION_LOG_COLUMN_PRESENT = bool(found)
        except Exception:
            logging.exception(
                "Failed to inspect op_redirects for extended_session_log column"
            )
            _SESSION_LOG_COLUMN_PRESENT = False
    return _SESSION_LOG_COLUMN_PRESENT


def _is_extended_session_log_enabled(
    station_id: str, *, cursor: Any | None = None, conn: Any | None = None
) -> bool:
    """Tell whether extended session logging is switched on for a station.

    The flag comes from ``op_redirects.extended_session_log`` on the newest
    row whose ``source_url`` equals ``station_id`` or ends with
    ``/<normalized id>``. Results are memoized per normalized id in
    ``_EXTENDED_SESSION_LOG_FLAG_CACHE``.

    When no ``cursor`` is given one is opened on ``conn``; when ``conn`` is
    also missing a connection is obtained from ``get_db_conn()`` and closed
    again before returning.
    """
    station_key = _normalize_chargepoint_id(station_id)
    if station_key in _EXTENDED_SESSION_LOG_FLAG_CACHE:
        return _EXTENDED_SESSION_LOG_FLAG_CACHE[station_key]

    owns_connection = False
    managed_cursor = None
    if cursor is None:
        if conn is None:
            conn = get_db_conn()
            owns_connection = True
        managed_cursor = conn.cursor()

    lookup_sql = """
            SELECT extended_session_log
            FROM op_redirects
            WHERE source_url=%s OR source_url LIKE %s
            ORDER BY id DESC
            LIMIT 1
            """
    flag = False
    try:
        if managed_cursor is not None:
            cursor = managed_cursor.__enter__()
        if _ensure_extended_session_log_column(cursor):
            cursor.execute(lookup_sql, (station_id, f"%/{station_key}"))
            flag = _coerce_bool(
                _extract_column_value(cursor.fetchone(), "extended_session_log"),
                False,
            )
        return flag
    except Exception:
        logging.exception(
            "Failed to read extended session log flag for %s (normalized=%s)",
            station_id,
            station_key,
        )
        return False
    finally:
        # The cache is written on every path, so a failed lookup or a missing
        # column is remembered as ``False`` too.
        _EXTENDED_SESSION_LOG_FLAG_CACHE[station_key] = flag
        if managed_cursor is not None:
            managed_cursor.__exit__(None, None, None)
        if owns_connection and conn is not None:
            try:
                conn.close()
            except Exception:
                pass


def _ensure_pnc_column(cursor: Any) -> bool:
    """Report whether ``op_redirects`` has a ``pnc_enabled`` column.

    The answer is determined once and memoized in ``_PNC_COLUMN_PRESENT``; a
    failed inspection is logged and memoized as ``False``.
    """
    global _PNC_COLUMN_PRESENT
    if _PNC_COLUMN_PRESENT is None:
        try:
            cursor.execute("SHOW COLUMNS FROM op_redirects LIKE 'pnc_enabled'")
            found = cursor.fetchone()
            _PNC_COLUMN_PRESENT = bool(found)
        except Exception:
            logging.exception("Failed to inspect op_redirects for pnc_enabled column")
            _PNC_COLUMN_PRESENT = False
    return _PNC_COLUMN_PRESENT


def _is_pnc_enabled(
    station_id: str, *, cursor: Any | None = None, conn: Any | None = None
) -> bool:
    """Tell whether Plug & Charge is switched on for a station.

    The flag comes from ``op_redirects.pnc_enabled`` on the newest row whose
    ``source_url`` equals ``station_id`` or ends with ``/<normalized id>``.
    Results are memoized per normalized id in ``_PNC_FLAG_CACHE``.

    When no ``cursor`` is given one is opened on ``conn``; when ``conn`` is
    also missing a connection is obtained from ``get_db_conn()`` and closed
    again before returning.
    """
    station_key = _normalize_chargepoint_id(station_id)
    if station_key in _PNC_FLAG_CACHE:
        return _PNC_FLAG_CACHE[station_key]

    owns_connection = False
    managed_cursor = None
    if cursor is None:
        if conn is None:
            conn = get_db_conn()
            owns_connection = True
        managed_cursor = conn.cursor()

    lookup_sql = """
            SELECT pnc_enabled
            FROM op_redirects
            WHERE source_url=%s OR source_url LIKE %s
            ORDER BY id DESC
            LIMIT 1
            """
    flag = False
    try:
        if managed_cursor is not None:
            cursor = managed_cursor.__enter__()
        if _ensure_pnc_column(cursor):
            cursor.execute(lookup_sql, (station_id, f"%/{station_key}"))
            flag = _coerce_bool(
                _extract_column_value(cursor.fetchone(), "pnc_enabled"),
                False,
            )
        return flag
    except Exception:
        logging.exception(
            "Failed to read PnC flag for %s (normalized=%s)", station_id, station_key
        )
        return False
    finally:
        # The cache is written on every path, so a failed lookup or a missing
        # column is remembered as ``False`` too.
        _PNC_FLAG_CACHE[station_key] = flag
        if managed_cursor is not None:
            managed_cursor.__exit__(None, None, None)
        if owns_connection and conn is not None:
            try:
                conn.close()
            except Exception:
                pass


def _hubject_config_fingerprint(config: Mapping[str, Any]) -> str | None:
    hubject_cfg = config.get("hubject")
    if not isinstance(hubject_cfg, Mapping):
        return None
    try:
        return json.dumps(hubject_cfg, sort_keys=True)
    except Exception:
        return None


def _refresh_config_from_disk() -> None:
    """Reload ``CONFIG`` from the config file when its mtime has advanced.

    Nothing happens if the file cannot be stat'ed or its modification time is
    not newer than the recorded one. A failed read or parse is logged and
    leaves both ``CONFIG`` and the recorded mtime untouched, so the next call
    tries again.
    """
    global CONFIG, _CONFIG_MTIME

    try:
        file_info = _CONFIG_PATH.stat()
    except OSError:
        return

    current_mtime = file_info.st_mtime
    unchanged = _CONFIG_MTIME is not None and current_mtime <= _CONFIG_MTIME
    if unchanged:
        return

    try:
        with _CONFIG_PATH.open("r", encoding="utf-8") as handle:
            CONFIG = json.load(handle)
            _CONFIG_MTIME = current_mtime
    except Exception:
        logging.exception("Failed to refresh configuration from disk")


def _load_pnc_adapter() -> Any | None:
    """Return the cached PnC adapter, (re)building it when required.

    The config is refreshed from disk first. A rebuild is triggered by the
    reload flag file (removed once seen; an ``OSError`` while checking or
    removing it also counts as a request) or by a change in the ``hubject``
    config fingerprint relative to the one recorded at the last successful
    load. Import or initialization failures are logged at INFO level and
    remembered in ``_PNC_ADAPTER_ERROR``, after which ``None`` keeps being
    returned until a rebuild is triggered.
    """
    global _PNC_ADAPTER, _PNC_ADAPTER_ERROR, _PNC_ADAPTER_CFG_FINGERPRINT

    _refresh_config_from_disk()
    fingerprint_now = _hubject_config_fingerprint(CONFIG)

    try:
        flag_seen = _PNC_RELOAD_FLAG_PATH.exists()
        if flag_seen:
            _PNC_RELOAD_FLAG_PATH.unlink()
    except OSError:
        flag_seen = True

    config_changed = (
        _PNC_ADAPTER_CFG_FINGERPRINT is not None
        and fingerprint_now is not None
        and fingerprint_now != _PNC_ADAPTER_CFG_FINGERPRINT
    )

    if flag_seen or config_changed:
        # Forget the adapter and any remembered failure so loading is retried.
        _PNC_ADAPTER = None
        _PNC_ADAPTER_ERROR = None
        _PNC_ADAPTER_CFG_FINGERPRINT = None

    if _PNC_ADAPTER is not None or _PNC_ADAPTER_ERROR:
        return _PNC_ADAPTER

    try:
        from services.pnc_adapter import load_adapter_from_config
    except Exception as exc:  # pragma: no cover - optional dependency
        logging.info("PnC adapter import failed: %s", exc)
        _PNC_ADAPTER_ERROR = str(exc)
        return None

    try:
        base_dir = Path(CONFIG_FILE).resolve().parent
        _PNC_ADAPTER = load_adapter_from_config(
            CONFIG,
            config_dir=base_dir,
            certs_dir=base_dir / "certs",
        )
        _PNC_ADAPTER_CFG_FINGERPRINT = fingerprint_now
    except Exception as exc:
        logging.info("PnC adapter initialization failed: %s", exc)
        _PNC_ADAPTER_ERROR = str(exc)
        _PNC_ADAPTER = None
    return _PNC_ADAPTER


def _extract_contract_certificate(payload: Mapping[str, Any] | Any) -> Any:
    if not isinstance(payload, Mapping):
        return None

    if "contractCertificate" in payload:
        return payload.get("contractCertificate")

    iso_data = payload.get("iso15118CertificateHashData")
    if iso_data:
        return {"iso15118CertificateHashData": iso_data}

    id_tag_data = payload.get("idTagData")
    if isinstance(id_tag_data, Mapping):
        iso_hash = id_tag_data.get("iso15118CertificateHashData")
        if iso_hash:
            return {"iso15118CertificateHashData": iso_hash}

    return None


async def _authorize_with_pnc_if_enabled(
    station_id: str, payload: Mapping[str, Any]
) -> Any | None:
    """Ask the PnC adapter to authorize the certificate found in ``payload``.

    Returns ``None`` when PnC is disabled for the station, when no adapter is
    available, or when the payload carries no certificate material. If the
    adapter call raises, the failure is logged and a rejected decision object
    with ``error="pnc_adapter_failure"`` is returned instead.
    """
    if not _is_pnc_enabled(station_id):
        return None

    if (adapter := _load_pnc_adapter()) is None:
        return None

    if (certificate := _extract_contract_certificate(payload)) is None:
        return None

    try:
        decision = await adapter.pnc_authorize(certificate)
    except Exception:
        logging.exception("PnC adapter failed for station %s", station_id)
        decision = SimpleNamespace(
            authorized=False, status="Rejected", error="pnc_adapter_failure"
        )
    return decision


def _extract_hubject_status(payload: Any) -> str | None:
    if isinstance(payload, str):
        candidate = payload.strip()
        return candidate or None
    if isinstance(payload, Mapping):
        for key in ("AuthorizationStatus", "authorizationStatus", "Status", "status"):
            value = payload.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
        for value in payload.values():
            if isinstance(value, Mapping):
                nested = _extract_hubject_status(value)
                if nested:
                    return nested
    return None


def _map_hubject_status_to_ocpp(payload: Any) -> str:
    """Translate a Hubject response into ``Accepted``, ``Blocked`` or ``Invalid``.

    The status is located via ``_extract_hubject_status`` and compared
    case-insensitively; a missing or unrecognised status maps to ``Invalid``.
    """
    accepted_tokens = frozenset(
        ("approved", "accepted", "authorized", "authorised", "ok", "success")
    )
    blocked_tokens = frozenset(
        ("blocked", "denied", "rejected", "invalid", "forbidden")
    )

    raw_status = _extract_hubject_status(payload)
    if not raw_status:
        return "Invalid"

    token = raw_status.strip().lower()
    if token in accepted_tokens:
        return "Accepted"
    return "Blocked" if token in blocked_tokens else "Invalid"


def _build_pnc_id_tag_info(pnc_decision: Any) -> dict[str, Any]:
    """Convert a PnC decision object into an ``idTagInfo``-style dict.

    ``status`` is ``"Accepted"`` when the decision's ``authorized`` attribute
    is truthy; otherwise the decision's own ``status`` is used, falling back
    to ``"Invalid"``. A truthy ``error`` attribute is added as ``errorCode``
    (stringified). Missing attributes are treated as absent.
    """
    if getattr(pnc_decision, "authorized", False):
        resolved_status = "Accepted"
    else:
        resolved_status = getattr(pnc_decision, "status", None) or "Invalid"

    info: dict[str, Any] = {"status": resolved_status}

    failure = getattr(pnc_decision, "error", None)
    if failure:
        info["errorCode"] = str(failure)
    return info


async def resolve_authorize_id_tag_info(
    station_id: str, payload: Mapping[str, Any]
) -> tuple[dict[str, Any], Any | None]:
    """Produce the ``idTagInfo`` for an Authorize request.

    Plug & Charge is tried first; when it yields a decision, that decision is
    returned alongside the derived info. Otherwise the payload's ``idTag`` is
    checked through the RFID authorization path and the decision slot is
    ``None``.
    """
    decision = await _authorize_with_pnc_if_enabled(station_id, payload)
    if decision is None:
        rfid_status = await check_rfid_authorization(station_id, payload.get("idTag"))
        return {"status": rfid_status}, None
    return _build_pnc_id_tag_info(decision), decision


async def ensure_connection_log_table():
    """Create the ``op_server_connection_log`` table if it does not exist."""
    # Statement text (including its indentation) is kept exactly as before.
    ddl = """
            CREATE TABLE IF NOT EXISTS op_server_connection_log (
                id INT AUTO_INCREMENT PRIMARY KEY,
                ocpp_endpoint VARCHAR(255),
                connected_devices INT DEFAULT 0,
                ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """

    async def _create(cursor: Any):
        await _cursor_execute(cursor, ddl)

    await _with_db_cursor(_create, commit=True)


async def ensure_server_sessions_table():
    """Create ``op_server_sessions`` if needed and add listed columns that are missing.

    After the ``CREATE TABLE IF NOT EXISTS`` statement, each column in the
    back-fill list is looked up in ``information_schema.COLUMNS`` and added
    with ``ALTER TABLE`` when absent. Everything runs on one cursor and is
    committed at the end.
    """
    # (column name, column definition) pairs, processed in this order.
    backfill_columns = (
        ("meter_start", "meter_start BIGINT DEFAULT NULL"),
        ("last_meter_value", "last_meter_value BIGINT DEFAULT NULL"),
        ("last_meter_ts", "last_meter_ts DATETIME DEFAULT NULL"),
        ("id_tag", "id_tag VARCHAR(100) DEFAULT NULL"),
    )

    async def _migrate(cursor: Any):
        async def _has_column(column_name: str) -> bool:
            lookup_sql = """
                SELECT COUNT(*)
                FROM information_schema.COLUMNS
                WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s AND COLUMN_NAME=%s
                """
            lookup_args = (mysql_cfg.get("db"), "op_server_sessions", column_name)
            await _cursor_execute(cursor, lookup_sql, lookup_args)
            found = await _cursor_fetchone(cursor)
            if not found:
                return False
            return bool(found[0])

        create_sql = """
            CREATE TABLE IF NOT EXISTS op_server_sessions (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255) NOT NULL,
                connector_id INT NOT NULL DEFAULT 0,
                transaction_id BIGINT NOT NULL,
                session_start DATETIME DEFAULT NULL,
                session_end DATETIME DEFAULT NULL,
                meter_start BIGINT DEFAULT NULL,
                last_meter_value BIGINT DEFAULT NULL,
                last_meter_ts DATETIME DEFAULT NULL,
                energy_wh BIGINT DEFAULT NULL,
                status VARCHAR(64) DEFAULT NULL,
                id_tag VARCHAR(100) DEFAULT NULL,
                ended_flag TINYINT(1) NOT NULL DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_op_server_sessions (chargepoint_id, transaction_id, connector_id)
            ) CHARACTER SET utf8mb4
            """
        await _cursor_execute(cursor, create_sql)

        for column_name, definition in backfill_columns:
            if await _has_column(column_name):
                continue
            await _cursor_execute(
                cursor,
                f"ALTER TABLE op_server_sessions ADD COLUMN {definition}",
            )

    await _with_db_cursor(_migrate, commit=True)


async def ensure_session_log_table():
    """Create the ``op_session_log`` table if it does not exist."""
    # Statement text (including its indentation) is kept exactly as before.
    ddl = """
            CREATE TABLE IF NOT EXISTS op_session_log (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255),
                connector_id INT,
                payload TEXT,
                event TEXT,
                ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_session_log (chargepoint_id, connector_id, ts)
            )
            """

    async def _create(cursor: Any):
        await _cursor_execute(cursor, ddl)

    await _with_db_cursor(_create, commit=True)


async def ensure_triggermeter_table():
    """Create the ``op_server_log_triggermeter`` table if it does not exist."""
    # Statement text (including its indentation) is kept exactly as before.
    ddl = """
            CREATE TABLE IF NOT EXISTS op_server_log_triggermeter (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255),
                meter_reading FLOAT,
                unit VARCHAR(20),
                ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """

    async def _create(cursor: Any):
        await _cursor_execute(cursor, ddl)

    await _with_db_cursor(_create, commit=True)


async def ensure_last_meterreading_table():
    """Create the ``op_broker_lastmeterreading`` table if it does not exist."""
    # Statement text (including its indentation) is kept exactly as before.
    ddl = """
            CREATE TABLE IF NOT EXISTS op_broker_lastmeterreading (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255),
                connector_id INT,
                meter_value FLOAT,
                ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE KEY uniq_meterreading (chargepoint_id, connector_id)
            )
            """

    async def _create(cursor: Any):
        await _cursor_execute(cursor, ddl)

    await _with_db_cursor(_create, commit=True)


async def ensure_status_notification_table():
    """Create the ``op_server_statusnotification`` table if it does not exist."""
    # Statement text (including its indentation) is kept exactly as before.
    ddl = """
            CREATE TABLE IF NOT EXISTS op_server_statusnotification (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255),
                connector_id INT,
                status VARCHAR(30),
                ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """

    async def _create(cursor: Any):
        await _cursor_execute(cursor, ddl)

    await _with_db_cursor(_create, commit=True)


async def ensure_api_call_log_table():
    """Create the ``op_server_apicall_log`` table if it does not exist."""
    # Statement text (including its indentation) is kept exactly as before.
    ddl = """
            CREATE TABLE IF NOT EXISTS op_server_apicall_log (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                url VARCHAR(512) NOT NULL,
                method VARCHAR(16) NOT NULL,
                payload LONGTEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """

    async def _create(cursor: Any):
        await _cursor_execute(cursor, ddl)

    await _with_db_cursor(_create, commit=True)


async def _load_cp_metadata_and_status(
    chargepoint_id: str,
) -> tuple[Optional[dict[str, Any]], list[dict[str, Any]]]:
    """Load a charge point's metadata row and its connector status rows.

    A truthy cache entry is returned directly. Otherwise both queries run on
    one ``DictCursor``, the result is stored in the cache and returned as
    ``(metadata, status_rows)``; ``status_rows`` is an empty list when the
    status query returns nothing.
    """
    cache_hit = _get_cached_cp_metadata(chargepoint_id)
    if cache_hit:
        return cache_hit

    lookup_args = (chargepoint_id,)

    async def _query(cursor: Any):
        await _cursor_execute(
            cursor,
            "SELECT * FROM cp_server_cp_metadata WHERE chargepoint_id=%s",
            lookup_args,
        )
        metadata_row = await _cursor_fetchone(cursor)

        status_sql = """
            SELECT connector_id, status
            FROM op_server_statusnotification
            WHERE chargepoint_id=%s
            """
        await _cursor_execute(cursor, status_sql, lookup_args)
        connector_rows = await _cursor_fetchall(cursor)
        return metadata_row, connector_rows or []

    metadata, status_rows = await _with_db_cursor(_query, cursor_class=DictCursor)
    _set_cached_cp_metadata(chargepoint_id, metadata, status_rows)
    return metadata, status_rows


def _map_ocpp_status_to_oicp(status: Any) -> str:
    """Map a connector status string onto an OICP EVSE status.

    Matching is case-insensitive and ignores surrounding whitespace. Falsy or
    unrecognised input yields ``"Unknown"``.
    """
    # Lower-cased source status -> OICP status.
    translation = {
        "faulted": "Faulted",
        "error": "Faulted",
        "failure": "Faulted",
        "unavailable": "OutOfService",
        "outofservice": "OutOfService",
        "inoperative": "OutOfService",
        "reserved": "Reserved",
        "charging": "Occupied",
        "preparing": "Occupied",
        "suspendedev": "Occupied",
        "suspendedevse": "Occupied",
        "finishing": "Occupied",
        "occupied": "Occupied",
        "available": "Available",
        "operative": "Available",
    }
    lookup_key = str(status or "").strip().lower()
    return translation.get(lookup_key, "Unknown")


def _aggregate_evse_status(status_rows: Sequence[Mapping[str, Any]]) -> str:
    """Collapse per-connector status rows into a single OICP EVSE status.

    Each row's ``status`` is mapped via ``_map_ocpp_status_to_oicp``; the
    result is the first of Faulted, OutOfService, Occupied, Reserved,
    Available that occurs among the mapped values. With no rows the result is
    ``"Unknown"``; otherwise the first mapped value is the fallback.
    """
    if not status_rows:
        return "Unknown"

    mapped = [_map_ocpp_status_to_oicp(row.get("status")) for row in status_rows]

    # Highest-priority status first.
    for candidate in ("Faulted", "OutOfService", "Occupied", "Reserved", "Available"):
        if candidate in mapped:
            return candidate
    return mapped[0] if mapped else "Unknown"


def _resolve_oicp_action_type(action_type: Optional[str], default_action: str) -> str:
    """Pick the OICP ``ActionType`` from an explicit request or the metadata default."""
    canonical = {
        "fullload": "fullLoad",
        "update": "update",
        "delta": "delta",
    }
    resolved = None
    if action_type:
        requested = str(action_type).strip()
        resolved = canonical.get(requested.lower(), requested)
    if not resolved:
        resolved = canonical.get(default_action.lower(), default_action)
    # Anything that is not a known action type degrades to a plain update.
    return resolved if resolved in {"fullLoad", "update", "delta"} else "update"


def _parse_oicp_additional_data(
    raw: Any, chargepoint_id: str
) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
    """Split the metadata ``additional_data`` JSON into payload/operator/EVSE overrides."""
    if not raw:
        return {}, {}, {}
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        logging.warning(
            "Ungültige JSON-Daten in cp_server_cp_metadata.additional_data für %s",
            chargepoint_id,
        )
        return {}, {}, {}
    if not isinstance(parsed, Mapping):
        return {}, {}, {}

    def _section(name: str) -> dict[str, Any]:
        # Only mapping-valued sections are honoured; everything else is ignored.
        value = parsed.get(name)
        return dict(value) if isinstance(value, Mapping) else {}

    return _section("payload"), _section("operator"), _section("evse_record")


async def push_oicp_evse_status(
    chargepoint_id: str, action_type: Optional[str] = None
) -> dict[str, Any]:
    """Send the aggregated EVSE status of one charge point to Hubject.

    Builds a ``PushEvseStatusData`` payload from the charge point's metadata
    row and its connector status rows, applies any overrides stored in the
    metadata's ``additional_data`` JSON (sections ``payload``, ``operator``
    and ``evse_record``) and calls the Hubject client.

    Args:
        chargepoint_id: Charge point identifier; surrounding whitespace is
            stripped.
        action_type: Optional action type (``fullLoad``/``update``/``delta``,
            case-insensitive). When empty, the metadata's
            ``default_action_type`` is used; unknown values become
            ``update``.

    Returns:
        A dict with the normalized id, EVSE id, aggregated status, resolved
        action type, the payload that was sent and the Hubject response.

    Raises:
        RuntimeError: If no Hubject client is configured.
        ValueError: If the id is empty, no metadata exists for the charge
            point, or the metadata has no operator id.
    """
    if HUBJECT_CLIENT is None:
        raise RuntimeError("Hubject client nicht konfiguriert")

    # Reject an empty id before touching the database at all.
    normalized_id = str(chargepoint_id or "").strip()
    if not normalized_id:
        raise ValueError("chargepoint_id darf nicht leer sein")

    await ensure_cp_metadata_table()
    await ensure_status_notification_table()

    metadata, status_rows = await _load_cp_metadata_and_status(normalized_id)
    if not metadata:
        raise ValueError("Für diesen Ladepunkt sind keine Metadaten hinterlegt")

    operator_id = str(metadata.get("operator_id") or "").strip()
    if not operator_id:
        raise ValueError("Operator ID ist nicht konfiguriert")

    operator_name = metadata.get("operator_name")
    # Fall back to the charge point id when no EVSE id is stored.
    evse_id = str(metadata.get("evse_id") or "").strip() or normalized_id

    default_action = str(metadata.get("default_action_type") or "update").strip()
    resolved_action = _resolve_oicp_action_type(action_type, default_action)

    evse_status = _aggregate_evse_status(status_rows)

    extra_payload, operator_overrides, evse_overrides = _parse_oicp_additional_data(
        metadata.get("additional_data"), normalized_id
    )

    # Overrides are applied last, so they may replace the computed fields.
    evse_record: dict[str, Any] = {
        "EvseID": evse_id,
        "EvseStatus": evse_status,
    }
    evse_record.update(evse_overrides)

    operator_entry: dict[str, Any] = {
        "OperatorID": operator_id,
        "EvseStatusRecords": [evse_record],
    }
    if operator_name:
        operator_entry["OperatorName"] = operator_name
    operator_entry.update(operator_overrides)

    payload: dict[str, Any] = {
        "ActionType": resolved_action,
        "OperatorEvseStatus": [operator_entry],
    }
    payload.update(extra_payload)

    logging.info(
        "Triggering Hubject PushEvseStatusData for %s (EVSE %s, Action %s, Status %s)",
        normalized_id,
        evse_id,
        resolved_action,
        evse_status,
    )
    response = await HUBJECT_CLIENT.call_operation("PushEvseStatusData", payload)
    return {
        "chargepoint_id": normalized_id,
        "evse_id": evse_id,
        "evse_status": evse_status,
        "action_type": resolved_action,
        "payload": payload,
        "hubject_response": response,
    }


def _format_session_log_timestamp(value: datetime | None) -> str:
    ts = value or datetime.now(timezone.utc)
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    else:
        ts = ts.astimezone(timezone.utc)
    return ts.isoformat().replace("+00:00", "Z")


def _build_session_log_payload(
    station_id: str,
    message_type: str,
    *,
    connector_id: int | None = None,
    transaction_id: Any | None = None,
    meter_value: Any | None = None,
    meter_unit: str | None = None,
    meter_start: Any | None = None,
    meter_stop: Any | None = None,
    reason: Any | None = None,
    id_tag: Any | None = None,
    transaction_data: Any | None = None,
    data_transfer_value: Any | None = None,
    timestamp: datetime | None = None,
) -> dict[str, Any]:
    """Assemble the extended session log record for one OCPP event.

    The top-level ``value`` is the first non-``None`` of ``meter_value``,
    ``meter_stop`` and ``meter_start`` (stringified). The nested ``session``
    dict carries the transaction details; its stop reading falls back to
    ``meter_value`` when ``meter_stop`` is ``None``. The unit defaults to
    ``SESSION_LOG_UNIT`` and each record receives a fresh UUID as ``id``.
    """
    unit = meter_unit or SESSION_LOG_UNIT
    device_id = _normalize_chargepoint_id(station_id) or station_id

    # First reading that is actually present, in priority order.
    reading = next(
        (
            candidate
            for candidate in (meter_value, meter_stop, meter_start)
            if candidate is not None
        ),
        None,
    )
    stop_reading = meter_value if meter_stop is None else meter_stop
    transfer_value = "{}" if data_transfer_value is None else data_transfer_value

    session_section: dict[str, Any] = {
        "Transaction.Id": transaction_id,
        "ID.Tag": id_tag,
        "Energy.Meter.Start": meter_start,
        "Energy.Meter.Stop": stop_reading,
        "Reason.Session": reason,
        "unit": unit,
        "Data.Transfer.Value": transfer_value,
        "Transaction.Data": transaction_data,
    }

    record: dict[str, Any] = {
        "id": str(uuid.uuid4()),
        "domain": "ocpp",
        "deviceId": device_id,
        "connector_id": connector_id,
        "register": SESSION_LOG_REGISTER,
        "value": None if reading is None else str(reading),
        "unit": unit,
        "location": None,
        "context": None,
        "timestamp": _format_session_log_timestamp(timestamp),
        "phases": None,
        "session": session_section,
        "messageType": message_type,
    }
    return record


def _persist_session_log_entry(
    device_id: str, message_type: str, payload: Mapping[str, Any], transaction_id: Any | None
) -> bool:
    """Insert one extended session log record into ``op_server_session_log``.

    Blocking helper (callers in this file run it via ``asyncio.to_thread``).
    The payload is stored as JSON text; a missing transaction id is stored as
    an empty string.

    Returns:
        ``True`` when the row was inserted and committed, ``False`` on any
        failure (the failure is logged at debug level, never raised).
    """
    # Bound up front so the cleanup below is well-defined even when acquiring
    # the connection itself fails.
    conn = None
    try:
        conn = get_db_conn()
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO op_server_session_log (device_id, messageType, json, transaction_id)
                VALUES (%s, %s, %s, %s)
                """,
                (
                    device_id,
                    message_type,
                    json.dumps(payload, ensure_ascii=False),
                    "" if transaction_id is None else str(transaction_id),
                ),
            )
        conn.commit()
        return True
    except Exception:
        logging.debug(
            "Failed to persist extended session log entry for %s", device_id, exc_info=True
        )
        return False
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # Closing is best effort; the outcome was already decided above.
                logging.debug(
                    "Failed to close session log connection for %s",
                    device_id,
                    exc_info=True,
                )


async def log_extended_session_event(
    station_id: str,
    message_type: str,
    *,
    connector_id: int | None = None,
    transaction_id: Any | None = None,
    meter_value: Any | None = None,
    meter_unit: str | None = None,
    meter_start: Any | None = None,
    meter_stop: Any | None = None,
    reason: Any | None = None,
    id_tag: Any | None = None,
    transaction_data: Any | None = None,
    data_transfer_value: Any | None = None,
    timestamp: datetime | None = None,
) -> None:
    """Build, persist and count one extended session log record.

    Does nothing when extended session logging is disabled for the station.
    Failures while building or persisting the record are logged at debug
    level and end the call. After a successful insert, the processed counter
    is incremented, and so is the signed-data counter when the serialized
    record contains the text ``signedData``.
    """
    global _SESSION_LOG_PROCESSED_COUNT, _SESSION_LOG_SIGNED_DATA_COUNT
    if not _is_extended_session_log_enabled(station_id):
        return

    event_fields = dict(
        connector_id=connector_id,
        transaction_id=transaction_id,
        meter_value=meter_value,
        meter_unit=meter_unit,
        meter_start=meter_start,
        meter_stop=meter_stop,
        reason=reason,
        id_tag=id_tag,
        transaction_data=transaction_data,
        data_transfer_value=data_transfer_value,
        timestamp=timestamp,
    )

    try:
        record = _build_session_log_payload(station_id, message_type, **event_fields)
    except Exception:
        logging.debug(
            "Failed to build extended session log payload for %s", station_id, exc_info=True
        )
        return

    try:
        # The blocking insert runs in a worker thread.
        stored = await asyncio.to_thread(
            _persist_session_log_entry,
            _normalize_chargepoint_id(station_id) or station_id,
            message_type,
            record,
            transaction_id,
        )
    except Exception:
        logging.debug(
            "Failed to persist extended session log entry for %s", station_id, exc_info=True
        )
        return

    if not stored:
        return

    has_signed_data = "signedData" in json.dumps(record, ensure_ascii=False)
    async with _SESSION_LOG_COUNTER_LOCK:
        _SESSION_LOG_PROCESSED_COUNT += 1
        if has_signed_data:
            _SESSION_LOG_SIGNED_DATA_COUNT += 1


def _seconds_until_next_summary():
    """Return the seconds from now until the next summary slot.

    Slots are at 03:00, 09:00, 15:00 and 21:00 local (naive) time; once the
    last slot of the day has passed, the target is 03:00 on the following day.
    """
    current = datetime.now()

    def _at_hour(day, hour):
        return day.replace(hour=hour, minute=0, second=0, microsecond=0)

    slots = [_at_hour(current, hour) for hour in (3, 9, 15, 21)]
    # Tomorrow's first slot is always in the future, so the search cannot fail.
    slots.append(_at_hour(current + timedelta(days=1), 3))

    upcoming = next(slot for slot in slots if slot > current)
    return (upcoming - current).total_seconds()


def _send_session_email(processed: int, signed: int) -> None:
    """Send the extended session log summary by SMTP.

    Returns without sending when the summary mail is disabled or when no
    recipients are configured (the latter is logged at debug level). STARTTLS
    and login are used only when the corresponding mail settings are set.
    """
    if not MAIL_EXTENDED_SESSION_SUMMARY_ENABLED:
        return
    if not MAIL_RECIPIENTS:
        logging.debug(
            "Extended session summary email not sent because no recipients are configured"
        )
        return

    body_lines = (
        f"Processed messages since last email: {processed}",
        f"Messages with 'signedData': {signed}",
    )

    summary = EmailMessage()
    summary["Subject"] = "Extended Session Log Summary"
    summary["From"] = MAIL_DEFAULT_SENDER
    summary["To"] = ", ".join(MAIL_RECIPIENTS)
    summary.set_content("\n".join(body_lines))

    with smtplib.SMTP(MAIL_SERVER, MAIL_PORT) as smtp:
        if MAIL_USE_TLS:
            smtp.starttls()
        if MAIL_USERNAME and MAIL_PASSWORD:
            smtp.login(MAIL_USERNAME, MAIL_PASSWORD)
        smtp.send_message(summary)


async def send_periodic_session_email():
    """Loop forever: wait for the next summary slot, then mail the counters.

    The counters are read and reset under the counter lock before sending, so
    a failed send (logged at debug level) does not restore them.
    """
    global _SESSION_LOG_PROCESSED_COUNT, _SESSION_LOG_SIGNED_DATA_COUNT
    while True:
        delay = _seconds_until_next_summary()
        await asyncio.sleep(delay)

        async with _SESSION_LOG_COUNTER_LOCK:
            processed, signed = (
                _SESSION_LOG_PROCESSED_COUNT,
                _SESSION_LOG_SIGNED_DATA_COUNT,
            )
            _SESSION_LOG_PROCESSED_COUNT = _SESSION_LOG_SIGNED_DATA_COUNT = 0

        try:
            # The SMTP call blocks, so it runs in a worker thread.
            await asyncio.to_thread(_send_session_email, processed, signed)
        except Exception:
            logging.debug("Failed to send extended session summary email", exc_info=True)


def log_ocpp_message(chargepoint_id: str, direction: str, message: Any, topic: str):
    """Queue an OCPP message for logging, masking sensitive identifiers first.

    The message is deep-copied before masking so the caller's object is left
    untouched. Non-string messages are serialized to JSON. If the queue is
    full, the entry is dropped and the drop counters are updated.

    Args:
        chargepoint_id: Charge point the message belongs to.
        direction: Direction label stored with the entry.
        message: The OCPP message (string or JSON-serializable object).
        topic: Topic label stored with the entry.
    """
    global _ocpp_log_drop_count, _ocpp_log_last_drop_at, _ocpp_log_drop_events
    # Nothing to do without a queue; skip the deep copy and masking entirely.
    if _ocpp_log_queue is None:
        return

    ocpp_endpoint = CONFIG.get("ocpp_endpoint", "")
    safe_message = _mask_sensitive_identifiers(copy.deepcopy(message))
    # Use timezone-aware UTC timestamp to avoid deprecation warnings
    ts = datetime.now(timezone.utc)
    if isinstance(safe_message, str):
        serialized = safe_message
    else:
        serialized = json.dumps(safe_message, ensure_ascii=False)

    entry = {
        "chargepoint_id": chargepoint_id,
        "ocpp_endpoint": ocpp_endpoint,
        "direction": direction,
        "message": serialized,
        "topic": topic,
        "timestamp": ts,
    }
    try:
        _ocpp_log_queue.put_nowait(entry)
    except (asyncio.QueueFull, queue.Full):
        now = datetime.now(timezone.utc)
        _ocpp_log_drop_count += 1
        _ocpp_log_last_drop_at = now
        try:
            _ocpp_log_drop_events.append(now)
            _prune_ocpp_log_drop_events(now)
        except Exception:
            # Drop bookkeeping is best effort and must never break the caller.
            logging.debug("Failed to record OCPP log drop event", exc_info=True)
        logging.debug("OCPP log queue is full; dropping message for %s", chargepoint_id)


async def log_connection_count():
    """Record the number of connected devices once per hour, forever.

    Each iteration inserts one row into ``op_server_connection_log`` with the
    configured OCPP endpoint and the current size of ``connected``. A failed
    insert is logged at debug level and the loop carries on.
    """
    ocpp_endpoint = CONFIG.get("ocpp_endpoint", "")

    async def _log(cursor: Any):
        # len(connected) is evaluated at insert time, not when the task starts.
        await _cursor_execute(
            cursor,
            "INSERT INTO op_server_connection_log (ocpp_endpoint, connected_devices, ts) VALUES (%s, %s, NOW())",
            (ocpp_endpoint, len(connected)),
        )

    while True:
        try:
            await _with_db_cursor(_log, commit=True)
        except Exception:
            # Best effort: keep the loop alive, but leave a trace of the failure.
            logging.debug("Failed to log connection count", exc_info=True)
        await asyncio.sleep(3600)


async def cleanup_stale_connections():
    """Remove stale websocket connections.

    Runs forever, once per ``PING_INTERVAL``. A connection is stale when its
    socket is closed or when it has been silent longer than its timeout
    (per-station value, else ``DEFAULT_STALE_TIMEOUT``). Stale sockets are
    closed with code 1001 and all per-station bookkeeping is dropped.
    """
    while True:
        reference = datetime.now(timezone.utc)

        expired: list[tuple[str, WebSocketServerProtocol]] = []
        for station_id, ws in list(connected.items()):
            # Fall back to the connect time, then to "now", when nothing was seen.
            last_activity = last_seen.get(
                station_id, connected_since.get(station_id, reference)
            )
            allowed_idle = stale_timeouts.get(station_id, DEFAULT_STALE_TIMEOUT)
            if ws.closed or (reference - last_activity).total_seconds() > allowed_idle:
                expired.append((station_id, ws))

        for station_id, ws in expired:
            stale_timeout_events[station_id] = stale_timeout_events.get(station_id, 0) + 1
            try:
                if not ws.closed:
                    await ws.close(code=1001, reason="stale connection")
            except Exception:
                logging.exception("Error closing stale connection for %s", station_id)

            for registry in (connected, connected_since, last_seen, stale_timeouts):
                registry.pop(station_id, None)
            for flag_set in (boot_notifications, boot_requested):
                flag_set.discard(station_id)
            logging.info("Removed stale connection for %s", station_id)

        await asyncio.sleep(PING_INTERVAL)


async def check_rfid_authorization(chargepoint_id: str, card_uuid: str) -> str:
    """Resolve the authorization status of an RFID card at a charge point.

    Checks run in this order, and the first one that decides wins:

    1. cached status for the normalized (charge point, card) pair,
    2. a valid remote API authorization,
    3. database lookups on one cursor: vouchers, eMSP tokens, free-charging
       charge points, blocked cards, the global card list and the
       per-charge-point mapping,
    4. Hubject AuthorizeStart, only when the database gave no decision, a card
       was supplied and OICP is enabled for the charge point.

    Only a status decided by the database step is written to the cache.

    Args:
        chargepoint_id: Charge point identifier as received.
        card_uuid: Card identifier as received; may be empty.

    Returns:
        A status string. The values produced directly here are ``"Accepted"``,
        ``"Blocked"``, ``"Invalid"`` and ``"Error"`` (database failure); voucher
        evaluation and the Hubject mapping supply their own status values.
    """
    normalized_id = _normalize_chargepoint_id(chargepoint_id)
    normalized_card = _normalize_card_uuid(card_uuid)
    if normalized_card:
        cached_status = _get_cached_rfid_status(normalized_id, normalized_card)
        if cached_status is not None:
            return cached_status
    status: str | None = None
    oicp_enabled = False

    def _resolve_emsp_status(row: Any) -> str:
        """Derive a status from an eMSP token row's ``valid`` and ``whitelist`` columns."""
        valid = bool(_extract_column_value(row, "valid"))
        whitelist = _extract_column_value(row, "whitelist")
        whitelist_normalized = str(whitelist or "").strip().upper()
        if not valid:
            return "Blocked"
        if whitelist_normalized in {"NEVER", "BLOCKED"}:
            return "Blocked"
        if whitelist_normalized in {"ALLOWED", "ALLOWED_OFFLINE", "ALLOWED_LOCAL", "WHITELISTED"}:
            return "Accepted"
        # Any other non-empty whitelist value is treated as invalid.
        if whitelist_normalized:
            return "Invalid"
        return "Accepted"

    # This path returns before the database is consulted and is not cached.
    if _is_remote_api_authorization_valid(
        normalized_id, normalized_card or None
    ):
        return "Accepted"

    async def _evaluate(cursor: Any) -> tuple[str | None, bool]:
        """Run the database checks; return ``(status or None, oicp_enabled)``."""
        local_status: str | None = None
        local_oicp = False
        mapped_card = normalized_card

        # 1) Voucher bound to this card: its evaluation is final either way.
        if normalized_card:
            await _cursor_execute(
                cursor,
                """
                SELECT id, rfid_uuid, energy_kwh, energy_used_wh, valid_until, allowed_chargepoints
                FROM op_vouchers
                WHERE UPPER(rfid_uuid)=%s
                LIMIT 1
                """,
                (normalized_card,),
            )
            voucher_row = await _cursor_fetchone(cursor)
            if voucher_row:
                voucher_status = _evaluate_voucher_status(
                    voucher_row, normalized_id or chargepoint_id
                )
                if voucher_status != "Accepted":
                    return voucher_status, False
                return "Accepted", False

        # 2) eMSP token matching the card by uid, auth_id or local_rfid.
        if normalized_card:
            await _cursor_execute(
                cursor,
                """
                SELECT uid, auth_id, issuer, valid, whitelist, local_rfid
                FROM op_emsp_tokens
                WHERE UPPER(uid)=%s OR UPPER(auth_id)=%s OR UPPER(local_rfid)=%s
                LIMIT 1
                """,
                (normalized_card, normalized_card, normalized_card),
            )
            emsp_row = await _cursor_fetchone(cursor)
            if emsp_row:
                local_status = _resolve_emsp_status(emsp_row)
                mapped_card = _normalize_card_uuid(
                    _extract_column_value(emsp_row, "local_rfid")
                ) or normalized_card
                # _resolve_emsp_status always returns a string, so a token match
                # ends the evaluation here.
                if local_status is not None:
                    return local_status, False

        # 3) Charge point listed in op_rfid_free accepts any card (or none).
        await _cursor_execute(
            cursor,
            "SELECT 1 FROM op_rfid_free WHERE chargepoint_id=%s LIMIT 1",
            (normalized_id,),
        )
        if await _cursor_fetchone(cursor):
            local_status = "Accepted"
        elif not normalized_card:
            local_status = "Invalid"

        # 4) Explicitly blocked card.
        if local_status is None and normalized_card:
            await _cursor_execute(
                cursor,
                "SELECT 1 FROM op_server_rfid_blocked WHERE uuid=%s LIMIT 1",
                (mapped_card,),
            )
            if await _cursor_fetchone(cursor):
                local_status = "Blocked"

        # 5) Global card list; an empty status column counts as accepted.
        if local_status is None and normalized_card:
            await _cursor_execute(
                cursor,
                "SELECT status FROM op_server_rfid_global WHERE uuid=%s LIMIT 1",
                (mapped_card,),
            )
            row = await _cursor_fetchone(cursor)
            if row:
                status_value = _extract_column_value(row, "status")
                normalized_status = str(status_value or "").strip().lower()
                if normalized_status in {"", "accepted"}:
                    local_status = "Accepted"
                elif normalized_status == "blocked":
                    local_status = "Blocked"
                else:
                    local_status = "Invalid"

        # 6) Per-charge-point mapping: free charging, a single card, or a card list.
        if local_status is None and normalized_card:
            await _cursor_execute(
                cursor,
                "SELECT freecharging, single_uuid, rfid_list FROM op_rfid_mapping WHERE chargepoint_id=%s",
                (normalized_id,),
            )
            rows = await _cursor_fetchall(cursor)
            # Rows are unpacked positionally, in the order of the SELECT list.
            for freecharging, single_uuid, rfid_list in rows:
                if freecharging == 1:
                    local_status = "Accepted"
                    break
                if single_uuid and single_uuid == mapped_card:
                    local_status = "Accepted"
                    break
                if rfid_list:
                    await _cursor_execute(
                        cursor,
                        "SELECT 1 FROM op_server_rfid_lists WHERE rfid_list_id=%s AND uuid=%s LIMIT 1",
                        (rfid_list, mapped_card),
                    )
                    if await _cursor_fetchone(cursor):
                        local_status = "Accepted"
                        break

        # Still undecided: report whether the Hubject fallback may be used.
        if local_status is None and normalized_card:
            local_oicp = await is_oicp_enabled(normalized_id, cursor=cursor)
        return local_status, local_oicp

    try:
        status, oicp_enabled = await _with_db_cursor(_evaluate)
        if normalized_card and status:
            _set_cached_rfid_status(normalized_id, normalized_card, status)
    except Exception:
        logging.exception("RFID authorization DB error")
        return "Error"

    # A local decision, or no card at all, ends the lookup without Hubject.
    if status is not None or not card_uuid:
        return status or "Invalid"
    if not oicp_enabled:
        return "Invalid"

    # Hubject fallback; timeouts and other failures resolve to "Invalid".
    try:
        logging.info(
            "Calling Hubject AuthorizeStart for %s with tag %s", normalized_id, card_uuid
        )
        mapped_status, _ = await _query_hubject_api_authorize_start(
            normalized_id,
            card_uuid,
            chargepoint_id=chargepoint_id,
        )
    except asyncio.TimeoutError:
        logging.warning(
            "Hubject AuthorizeStart timed out for %s with tag %s",
            chargepoint_id,
            card_uuid,
        )
        return "Invalid"
    except Exception:
        logging.exception(
            "Hubject AuthorizeStart failed for %s with tag %s", normalized_id, card_uuid
        )
        return "Invalid"
    if not mapped_status:
        return "Invalid"
    logging.info(
        "Hubject AuthorizeStart result for %s with tag %s: %s",
        normalized_id,
        card_uuid,
        mapped_status,
    )
    return mapped_status


async def store_get_configuration(chargepoint_id: str, data: dict):
    """Persist a GetConfiguration response for a charge point.

    The EVSE id cache is refreshed from ``data`` first.  The response is then
    serialised to JSON; when ``_get_cached_configuration`` returns a truthy
    value for that payload nothing is written.  Otherwise the newest
    ``op_server_cp_config`` row of the charge point is overwritten, or a new
    row is inserted when the charge point has none, and the configuration
    cache is updated.

    Failures in the database step or the cache update are logged at debug
    level and not propagated.
    """
    _update_evse_id_cache(chargepoint_id, data)
    serialized = json.dumps(data, ensure_ascii=False)
    if _get_cached_configuration(chargepoint_id, serialized):
        return

    async def _write_latest(cursor: Any):
        await _cursor_execute(
            cursor,
            """
            SELECT id
            FROM op_server_cp_config
            WHERE chargepoint_id=%s
            ORDER BY created_at DESC, id DESC
            LIMIT 1
            """,
            (chargepoint_id,),
        )
        newest = await _cursor_fetchone(cursor)
        # The row may come back as a dict or as a plain sequence.
        if isinstance(newest, dict):
            newest_id = newest.get("id")
        else:
            newest_id = newest[0] if newest else None

        if newest_id is None:
            await _cursor_execute(
                cursor,
                "INSERT INTO op_server_cp_config (chargepoint_id, configuration_json) VALUES (%s, %s)",
                (chargepoint_id, serialized),
            )
            return

        await _cursor_execute(
            cursor,
            "UPDATE op_server_cp_config SET configuration_json=%s WHERE id=%s",
            (serialized, newest_id),
        )

    try:
        await _with_db_cursor(_write_latest, cursor_class=DictCursor, commit=True)
        _set_cached_configuration(chargepoint_id, serialized)
    except Exception:
        logging.debug(
            "Failed to store configuration for %s", chargepoint_id, exc_info=True
        )


async def store_trigger_meter_value(chargepoint_id: str, value: float, unit: str, ts: datetime):
    """Insert one meter reading into ``op_server_log_triggermeter``.

    Args:
        chargepoint_id: Charge point the reading belongs to.
        value: Meter reading stored in the ``meter_reading`` column.
        unit: Unit string stored alongside the reading.
        ts: Timestamp stored in the ``ts`` column.

    The write is best-effort: a failure is logged at debug level (with the
    traceback) instead of being raised, so callers are never interrupted.
    """

    async def _store(cursor: Any):
        await _cursor_execute(
            cursor,
            "INSERT INTO op_server_log_triggermeter (chargepoint_id, meter_reading, unit, ts) VALUES (%s, %s, %s, %s)",
            (chargepoint_id, value, unit, ts),
        )

    try:
        await _with_db_cursor(_store, commit=True)
    except Exception:
        # Still best-effort, but no longer invisible when the insert fails.
        logging.debug(
            "Failed to store triggered meter value for %s",
            chargepoint_id,
            exc_info=True,
        )


async def store_last_meter_reading(
    chargepoint_id: str, connector_id: int, meter_value: float, ts: datetime
):
    """Upsert the latest meter reading of a connector.

    Inserts into ``op_broker_lastmeterreading``; on a duplicate key the
    existing row's ``meter_value`` and ``ts`` are replaced with the new ones.

    The write is best-effort: a failure is logged at debug level (with the
    traceback) instead of being raised.

    NOTE(review): the ``VALUES (...) AS new_values`` row-alias form is
    MySQL 8.0.19+ syntax, while other statements in this file use
    ``VALUES(col)`` - confirm the target server accepts it.
    """

    async def _store(cursor: Any):
        await _cursor_execute(
            cursor,
            """
            INSERT INTO op_broker_lastmeterreading (chargepoint_id, connector_id, meter_value, ts)
            VALUES (%s, %s, %s, %s) AS new_values
            ON DUPLICATE KEY UPDATE
                meter_value = new_values.meter_value,
                ts = new_values.ts
            """,
            (chargepoint_id, connector_id, meter_value, ts),
        )

    try:
        await _with_db_cursor(_store, commit=True)
    except Exception:
        # Still best-effort, but no longer invisible when the upsert fails.
        logging.debug(
            "Failed to store last meter reading for %s (connector %s)",
            chargepoint_id,
            connector_id,
            exc_info=True,
        )


def _coerce_int_or_none(value: Any) -> int | None:
    if value is None:
        return None
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        try:
            return int(value)
        except (TypeError, ValueError):
            return None
    text = str(value).strip()
    if not text:
        return None
    try:
        return int(text)
    except (TypeError, ValueError):
        try:
            return int(float(text))
        except (TypeError, ValueError):
            return None


def _derive_session_transaction_id(value: Any) -> int | None:
    """Turn an arbitrary transaction identifier into an integer.

    Values that ``_coerce_int_or_none`` can convert are returned as that
    integer.  Any other non-``None`` value is hashed: the first eight bytes
    of the SHA-256 digest of its UTF-8 string form are read big-endian and
    masked to 63 bits.  ``None`` is returned for ``None`` input or when the
    hashing step raises.
    """
    numeric = _coerce_int_or_none(value)
    if numeric is not None:
        return numeric
    if value is None:
        return None
    try:
        encoded = str(value).encode("utf-8")
        leading_bytes = hashlib.sha256(encoded).digest()[:8]
        return int.from_bytes(leading_bytes, "big") & 0x7FFFFFFFFFFFFFFF
    except Exception:
        return None


async def store_charging_session_record(
    chargepoint_id: str,
    connector_id: int | None,
    transaction_id: int | None,
    id_tag: str | None,
    session_start: datetime | None,
    session_end: datetime | None,
    meter_start: int | None,
    meter_stop: int | None,
    energy_wh: int | None,
    reason: str | None,
):
    if transaction_id is None:
        return

    connector_id_value = connector_id if connector_id is not None else 0
    id_tag_value = (id_tag or "").strip()[:100]
    reason_value = (reason or "").strip()[:50] or None

    conn = None
    try:
        conn = get_db_conn()
        previous_energy = None
        with conn.cursor() as cur:
            if transaction_id is not None:
                try:
                    cur.execute(
                        """
                        SELECT energyChargedWh
                        FROM op_server_charging_sessions
                        WHERE chargepoint_id=%s AND transaction_id=%s
                        LIMIT 1
                        """,
                        (chargepoint_id, transaction_id),
                    )
                    previous_energy = _coerce_int_or_none(
                        _extract_column_value(cur.fetchone(), "energyChargedWh")
                    )
                except Exception:
                    previous_energy = None

            cur.execute(
                """
                INSERT INTO op_server_charging_sessions (
                    chargepoint_id,
                    connector_id,
                    transaction_id,
                    id_tag,
                    session_start,
                    session_end,
                    meter_start,
                    meter_stop,
                    energyChargedWh,
                    reason
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    id_tag = VALUES(id_tag),
                    session_start = VALUES(session_start),
                    session_end = VALUES(session_end),
                    meter_start = VALUES(meter_start),
                    meter_stop = VALUES(meter_stop),
                    energyChargedWh = VALUES(energyChargedWh),
                    reason = VALUES(reason)
                """,
                (
                    chargepoint_id,
                    connector_id_value,
                    transaction_id,
                    id_tag_value,
                    session_start,
                    session_end,
                    meter_start,
                    meter_stop,
                    energy_wh,
                    reason_value,
                ),
            )

        if energy_wh is not None:
            baseline = previous_energy if previous_energy is not None else 0
            delta_wh = energy_wh - baseline
            if delta_wh > 0:
                _update_voucher_usage(chargepoint_id, id_tag_value, delta_wh, conn=conn)

        conn.commit()
    except Exception:
        logging.debug(
            "Failed to persist charging session for %s (tx %s)",
            chargepoint_id,
            transaction_id,
            exc_info=True,
        )
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass


async def store_server_session(
    chargepoint_id: str,
    transaction_id: int | None,
    *,
    connector_id: int | None = None,
    id_tag: str | None = None,
    session_start: datetime | None = None,
    session_end: datetime | None = None,
    meter_start: int | None = None,
    last_meter_value: int | None = None,
    last_meter_ts: datetime | None = None,
    energy_wh: int | None = None,
    status: str | None = None,
    ended: bool | None = None,
):
    if not chargepoint_id or transaction_id is None:
        return

    connector_value = connector_id if connector_id is not None else 0
    status_value = (status or "").strip()[:64] or None
    id_tag_value = (id_tag or "").strip()[:100] or None
    ended_flag = 1 if ended else 0

    async def _store(cursor: Any):
        await _cursor_execute(
            cursor,
            """
            INSERT INTO op_server_sessions (
                chargepoint_id,
                connector_id,
                transaction_id,
                id_tag,
                session_start,
                session_end,
                meter_start,
                last_meter_value,
                last_meter_ts,
                energy_wh,
                status,
                ended_flag
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                session_start = COALESCE(op_server_sessions.session_start, VALUES(session_start)),
                session_end = COALESCE(VALUES(session_end), op_server_sessions.session_end),
                meter_start = COALESCE(op_server_sessions.meter_start, VALUES(meter_start)),
                last_meter_value = COALESCE(VALUES(last_meter_value), op_server_sessions.last_meter_value),
                last_meter_ts = CASE
                    WHEN op_server_sessions.last_meter_ts IS NULL THEN VALUES(last_meter_ts)
                    WHEN VALUES(last_meter_ts) IS NULL THEN op_server_sessions.last_meter_ts
                    WHEN VALUES(last_meter_ts) > op_server_sessions.last_meter_ts THEN VALUES(last_meter_ts)
                    ELSE op_server_sessions.last_meter_ts
                END,
                energy_wh = CASE
                    WHEN VALUES(energy_wh) IS NULL THEN op_server_sessions.energy_wh
                    WHEN op_server_sessions.energy_wh IS NULL THEN VALUES(energy_wh)
                    ELSE GREATEST(op_server_sessions.energy_wh, VALUES(energy_wh))
                END,
                status = COALESCE(VALUES(status), op_server_sessions.status),
                id_tag = COALESCE(op_server_sessions.id_tag, VALUES(id_tag)),
                ended_flag = GREATEST(op_server_sessions.ended_flag, VALUES(ended_flag)),
                updated_at = CURRENT_TIMESTAMP
            """,
            (
                chargepoint_id,
                connector_value,
                transaction_id,
                id_tag_value,
                session_start,
                session_end,
                meter_start,
                last_meter_value,
                last_meter_ts,
                energy_wh,
                status_value,
                ended_flag,
            ),
        )

    try:
        await _with_db_cursor(_store, commit=True)
    except Exception:
        logging.debug(
            "Failed to persist server session for %s (tx %s)",
            chargepoint_id,
            transaction_id,
            exc_info=True,
        )


def _parse_ocpp_timestamp(value: Any) -> datetime | None:
    if not isinstance(value, str):
        return None
    candidate = value.strip()
    if not candidate:
        return None
    try:
        parsed = datetime.fromisoformat(candidate.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is not None:
        return parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def _evaluate_voucher_status(
    voucher_row: Mapping[str, Any],
    chargepoint_id: str,
    *,
    now: datetime | None = None,
) -> str:
    """Decide whether a voucher row may be used at a charge point.

    Args:
        voucher_row: Voucher record; the columns ``allowed_chargepoints``,
            ``valid_until``, ``energy_kwh`` and ``energy_used_wh`` are read.
        chargepoint_id: Station the voucher is presented at.
        now: Reference time for the expiry check; defaults to the current
            UTC time.  A naive value is interpreted as UTC, exactly like a
            naive ``valid_until``.

    Returns:
        ``"Blocked"`` when the voucher lists allowed charge points and this
        one is not among them, ``"Expired"`` when ``valid_until`` lies before
        ``now`` or the energy budget is used up, ``"Invalid"`` when the
        energy limit is not positive or an energy column cannot be
        interpreted as a number, otherwise ``"Accepted"``.
    """
    normalized_station = (_normalize_chargepoint_id(chargepoint_id) or "").upper()
    allowed = _parse_voucher_chargepoints(
        _extract_column_value(voucher_row, "allowed_chargepoints")
    )
    if allowed and normalized_station not in allowed:
        return "Blocked"

    expiry_value = _extract_column_value(voucher_row, "valid_until")
    expiry_dt: datetime | None = None
    if isinstance(expiry_value, datetime):
        expiry_dt = expiry_value
    elif isinstance(expiry_value, date):
        # A plain date expires at the start of that day.
        expiry_dt = datetime.combine(expiry_value, datetime.min.time())
    if expiry_dt is not None:
        try:
            reference = now if now is not None else datetime.now(timezone.utc)
            # Make both sides timezone-aware; comparing a naive with an aware
            # datetime raises TypeError, which used to skip the expiry check
            # silently whenever a naive ``now`` was supplied.
            if reference.tzinfo is None:
                reference = reference.replace(tzinfo=timezone.utc)
            if expiry_dt.tzinfo is None:
                expiry_dt = expiry_dt.replace(tzinfo=timezone.utc)
            if expiry_dt < reference:
                return "Expired"
        except Exception:
            # Best-effort: an uncomparable expiry does not block the voucher.
            pass

    try:
        energy_limit_wh = Decimal(
            str(_extract_column_value(voucher_row, "energy_kwh") or "0")
        ) * Decimal("1000")
        used_wh = Decimal(
            str(_extract_column_value(voucher_row, "energy_used_wh") or "0")
        )
        # Comparisons stay inside the guard: ordering a NaN Decimal raises
        # InvalidOperation as well.
        if energy_limit_wh <= 0:
            return "Invalid"
        if used_wh >= energy_limit_wh:
            return "Expired"
    except (TypeError, InvalidOperation):
        return "Invalid"
    return "Accepted"


def _update_voucher_usage(
    chargepoint_id: str, id_tag: str | None, energy_delta_wh: int | None, *, conn=None
) -> None:
    """Add consumed energy to the voucher that belongs to an RFID tag.

    The voucher is looked up in ``op_vouchers`` by the normalised card UUID.
    Nothing is changed when the tag is empty, the delta is missing or not
    positive, no voucher matches, or the voucher restricts charge points and
    this one is not listed.  ``energy_used_wh`` is increased by the delta but
    capped at ``energy_kwh * 1000``.

    When ``conn`` is supplied it is used without committing or closing it;
    otherwise a connection is opened, committed and closed here.  All errors
    are logged at debug level and swallowed.
    """
    try:
        station_key = (_normalize_chargepoint_id(chargepoint_id) or "").upper()
        card_key = _normalize_card_uuid(id_tag)
        if not card_key:
            return
        increment = _coerce_int_or_none(energy_delta_wh)
        if increment is None or increment <= 0:
            return

        borrowed = conn is not None
        db = conn if borrowed else get_db_conn()
        try:
            with db.cursor(DictCursor) as cur:
                cur.execute(
                    """
                    SELECT id, allowed_chargepoints
                    FROM op_vouchers
                    WHERE UPPER(rfid_uuid)=%s
                    LIMIT 1
                    """,
                    (card_key,),
                )
                voucher = cur.fetchone()
                if not voucher:
                    return
                permitted = _parse_voucher_chargepoints(
                    voucher.get("allowed_chargepoints")
                )
                if permitted and station_key not in permitted:
                    return
                cur.execute(
                    """
                    UPDATE op_vouchers
                    SET energy_used_wh = LEAST(CAST(energy_kwh * 1000 AS SIGNED), energy_used_wh + %s)
                    WHERE id=%s
                    """,
                    (increment, voucher.get("id")),
                )
            if not borrowed:
                db.commit()
        finally:
            # Only a connection opened here is ours to close.
            if not borrowed and db is not None:
                db.close()
    except Exception:
        logging.debug("Voucher update failed", exc_info=True)


async def store_status_notification_entry(
    chargepoint_id: str,
    connector_id: int,
    status: str,
    error_code: str | None,
    info: str | None,
    vendor_id: str | None,
    status_timestamp: datetime | None,
):
    """Upsert the latest StatusNotification values of a connector.

    Inserts into ``op_server_statusnotification``; on a duplicate key the
    status, error code, info, vendor id and status timestamp are replaced and
    ``updated_at`` is refreshed.  After the commit the charge point's
    metadata cache entry is invalidated.

    The write is best-effort: failures are logged at debug level (with the
    traceback) and never raised.  The connection is closed in every case.
    """
    # Bound before the try block so the cleanup below never touches an
    # unassigned name when opening the connection fails.
    conn = None
    try:
        conn = get_db_conn()
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO op_server_statusnotification (
                    chargepoint_id,
                    connector_id,
                    status,
                    error_code,
                    info,
                    vendor_id,
                    status_timestamp
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    status = VALUES(status),
                    error_code = VALUES(error_code),
                    info = VALUES(info),
                    vendor_id = VALUES(vendor_id),
                    status_timestamp = VALUES(status_timestamp),
                    updated_at = CURRENT_TIMESTAMP
                """,
                (
                    chargepoint_id,
                    connector_id,
                    status,
                    error_code,
                    info,
                    vendor_id,
                    status_timestamp,
                ),
            )
        conn.commit()
        _invalidate_cp_metadata_cache(chargepoint_id)
    except Exception:
        logging.debug(
            "Failed to store status notification for %s (connector %s)",
            chargepoint_id,
            connector_id,
            exc_info=True,
        )
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass


async def ensure_pnc_csr_table():
    """Create the ``op_server_pnc_csr`` table when it does not exist yet.

    The table is keyed by ``chargepoint_id``.  Errors raised by the database
    helper are propagated to the caller.
    """
    create_statement = """
            CREATE TABLE IF NOT EXISTS op_server_pnc_csr (
                chargepoint_id VARCHAR(255) NOT NULL PRIMARY KEY,
                csr TEXT NOT NULL,
                received_at DATETIME NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            )
            """

    async def _create(cursor: Any):
        await _cursor_execute(cursor, create_statement)

    await _with_db_cursor(_create, commit=True)


async def ensure_cp_sign_requests_table():
    """Create ``op_server_cp_sign_requests`` and add columns it may lack.

    After ``CREATE TABLE IF NOT EXISTS`` three ``ALTER TABLE ... ADD COLUMN``
    statements are attempted one by one for ``payload``,
    ``hubject_response`` and ``leaf_cert_signed``; an error from any of them
    is ignored (presumably because the column already exists).  Errors from
    the ``CREATE TABLE`` statement or the database helper are propagated.
    """
    create_statement = """
            CREATE TABLE IF NOT EXISTS op_server_cp_sign_requests (
                id INT AUTO_INCREMENT PRIMARY KEY,
                chargepoint_id VARCHAR(255) NOT NULL,
                ts DATETIME NOT NULL,
                payload TEXT,
                hubject_response LONGTEXT,
                leaf_cert_signed LONGTEXT,
                processed INT NOT NULL DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                KEY idx_chargepoint_id (chargepoint_id),
                KEY idx_ts (ts)
            )
            """
    # Applied in this order because each column is positioned AFTER the
    # previous one.
    add_column_statements = (
        """
                ALTER TABLE op_server_cp_sign_requests
                ADD COLUMN payload TEXT AFTER ts
                """,
        """
                ALTER TABLE op_server_cp_sign_requests
                ADD COLUMN hubject_response LONGTEXT AFTER payload
                """,
        """
                ALTER TABLE op_server_cp_sign_requests
                ADD COLUMN leaf_cert_signed LONGTEXT AFTER hubject_response
                """,
    )

    async def _prepare(cursor: Any):
        await _cursor_execute(cursor, create_statement)
        for statement in add_column_statements:
            try:
                await _cursor_execute(cursor, statement)
            except Exception:
                # Ignored on purpose; the remaining statements still run.
                continue

    await _with_db_cursor(_prepare, commit=True)


async def store_pnc_csr(
    chargepoint_id: str, csr: str, received_at: datetime | None = None
) -> None:
    if not chargepoint_id or not csr:
        return

    if received_at is None:
        received_at = datetime.now(timezone.utc)

    async def _store(cursor: Any):
        await _cursor_execute(
            cursor,
            """
            INSERT INTO op_server_pnc_csr (chargepoint_id, csr, received_at)
            VALUES (%s, %s, %s)
            ON DUPLICATE KEY UPDATE
                csr = VALUES(csr),
                received_at = VALUES(received_at)
            """,
            (chargepoint_id, csr, received_at),
        )

    try:
        await _with_db_cursor(_store, commit=True)
    except Exception:
        logging.debug("Failed to store PNC CSR for %s", chargepoint_id, exc_info=True)


async def store_cp_sign_request(
    chargepoint_id: str,
    ts: datetime | None = None,
    processed: int = 0,
    payload: str | None = None,
) -> None:
    if not chargepoint_id:
        return

    if ts is None:
        ts = datetime.now(timezone.utc)

    async def _store(cursor: Any):
        await _cursor_execute(
            cursor,
            """
            INSERT INTO op_server_cp_sign_requests (chargepoint_id, ts, processed, payload)
            VALUES (%s, %s, %s, %s)
            """,
            (chargepoint_id, ts, processed, payload),
        )

    try:
        await _with_db_cursor(_store, commit=True)
    except Exception:
        logging.debug(
            "Failed to record CP sign certificate request for %s", chargepoint_id,
            exc_info=True,
        )


def _serialize_cp_sign_payload(raw_payload: Any) -> str | None:
    if raw_payload is None:
        return None

    if isinstance(raw_payload, str):
        return raw_payload

    if isinstance(raw_payload, (bytes, bytearray)):
        try:
            return raw_payload.decode()
        except Exception:
            return raw_payload.hex()

    try:
        return json.dumps(raw_payload, ensure_ascii=False)
    except Exception:
        return str(raw_payload)


def _parse_datatransfer_data_payload(
    payload: Mapping[str, Any], *, station_id: str
) -> tuple[Mapping[str, Any], bool]:
    """Extract the ``data`` member of a DataTransfer payload as a mapping.

    ``data`` may already be a mapping or a string holding a JSON object.

    Returns:
        ``(mapping, True)`` on success, ``({}, False)`` when ``data`` is
        missing, has another type, is not valid JSON or does not decode to
        an object.  Each failure is logged at debug level with
        ``station_id``.
    """
    body = payload.get("data") if isinstance(payload, Mapping) else None

    if isinstance(body, Mapping):
        return body, True

    if not isinstance(body, str):
        logging.debug(
            "DataTransfer payload for %s has unexpected data type: %s",
            station_id,
            type(body).__name__,
        )
        return {}, False

    try:
        decoded = json.loads(body)
    except Exception:
        logging.debug(
            "DataTransfer payload for %s contains invalid JSON string", station_id,
            exc_info=True,
        )
        return {}, False

    if isinstance(decoded, Mapping):
        return decoded, True

    logging.debug(
        "DataTransfer payload for %s JSON body is not an object", station_id,
    )
    return {}, False


async def _resolve_datatransfer_status(
    payload: Mapping[str, Any], station_id: str
) -> str:
    """Compute the status for an incoming DataTransfer request.

    The ``vendorId`` is compared case-insensitively after stripping:

    * ``org.openchargealliance.iso15118pnc`` - the ``data`` member is parsed
      and the decision is delegated to ``_handle_iso15118_pnc_datatransfer``.
    * ``abb`` - ``"Accepted"``.
    * anything else, including a missing or non-string vendor id -
      ``"UnknownVendorId"``.
    """
    fields = payload if isinstance(payload, Mapping) else {}
    vendor_raw = fields.get("vendorId")
    message_raw = fields.get("messageId")

    vendor_key = str(vendor_raw).strip().lower() if isinstance(vendor_raw, str) else ""
    message_id = str(message_raw).strip() if isinstance(message_raw, str) else ""

    if vendor_key == "org.openchargealliance.iso15118pnc":
        parsed_data, data_valid = _parse_datatransfer_data_payload(
            payload, station_id=station_id
        )
        return await _handle_iso15118_pnc_datatransfer(
            station_id,
            message_id,
            parsed_data,
            data_valid,
            fields.get("data"),
        )

    return "Accepted" if vendor_key == "abb" else "UnknownVendorId"


async def _handle_iso15118_pnc_datatransfer(
    station_id: str,
    message_id: str,
    parsed_data: Mapping[str, Any],
    data_payload_valid: bool,
    raw_data_payload: Any,
) -> str:
    """Handle a DataTransfer message of the ISO 15118 PnC vendor.

    * ``SignCertificate`` is always answered with ``"Accepted"``.  The raw
      payload is recorded via ``store_cp_sign_request`` and, when the parsed
      data carries a non-empty ``csr`` string, the CSR is stored via
      ``store_pnc_csr``.  Storage failures are only logged.
    * ``TriggerMessage`` is ``"Accepted"`` only when the data is valid and
      ``requestedMessage`` equals ``SignV2GCertificate``; otherwise
      ``"Rejected"``.
    * Any other message id yields ``"UnknownMessageId"``.
    """
    if message_id == "SignCertificate":
        try:
            await store_cp_sign_request(
                station_id,
                datetime.now(timezone.utc),
                0,
                _serialize_cp_sign_payload(raw_data_payload),
            )
        except Exception:
            logging.debug(
                "Failed to log CP sign certificate request for %s", station_id,
                exc_info=True,
            )

        if not data_payload_valid:
            logging.debug(
                "PNC SignCertificate for %s missing or invalid data payload", station_id
            )
            return "Accepted"

        csr_field = parsed_data.get("csr") if isinstance(parsed_data, Mapping) else None
        if not isinstance(csr_field, str):
            logging.debug(
                "PNC SignCertificate payload missing csr for %s", station_id
            )
            return "Accepted"

        csr_text = csr_field.strip()
        if not csr_text:
            logging.debug(
                "PNC SignCertificate CSR payload empty for %s", station_id
            )
            return "Accepted"

        try:
            await store_pnc_csr(station_id, csr_text, datetime.now(timezone.utc))
        except Exception:
            logging.debug(
                "Failed to persist PNC CSR for %s", station_id, exc_info=True
            )
        return "Accepted"

    if message_id != "TriggerMessage":
        return "UnknownMessageId"

    if not data_payload_valid:
        logging.debug(
            "PNC TriggerMessage payload for %s missing or invalid data", station_id
        )
        return "Rejected"

    requested_raw = (
        parsed_data.get("requestedMessage") if isinstance(parsed_data, Mapping) else None
    )
    requested = str(requested_raw).strip() if isinstance(requested_raw, str) else ""

    if requested == "SignV2GCertificate":
        return "Accepted"

    if requested:
        logging.debug(
            "PNC TriggerMessage requestedMessage not supported for %s: %s",
            station_id,
            requested,
        )
    else:
        logging.debug(
            "PNC TriggerMessage payload missing requestedMessage for %s", station_id
        )
    return "Rejected"


async def log_remote_api_call(request: web.Request, payload: Mapping[str, Any]) -> None:
    """Write an incoming remote API call to ``op_server_apicall_log``.

    The request URL (cut to 512 characters), the HTTP method (cut to 16
    characters) and the payload are stored.  The payload is dumped as JSON,
    or via ``str()`` when it is not JSON-serialisable.  Any failure is
    logged at debug level and not raised.
    """
    try:
        target_url = str(request.url)
        http_method = request.method or ""
        try:
            body_text = json.dumps(payload, ensure_ascii=False)
        except (TypeError, ValueError):
            body_text = str(payload)

        row_values = (target_url[:512], http_method[:16], body_text)

        async def _write(cursor: Any):
            await _cursor_execute(
                cursor,
                """
                INSERT INTO op_server_apicall_log (url, method, payload)
                VALUES (%s, %s, %s)
                """,
                row_values,
            )

        await _with_db_cursor(_write, commit=True)
    except Exception:
        logging.debug(
            "Failed to persist remote API call for %s", str(request.url), exc_info=True
        )


async def call_action(
    ws,
    action: str,
    payload: dict,
    *,
    timeout: float | None = None,
):
    """Send an OCPP call to a charge point and wait for its response.

    The frame ``[2, <uuid>, action, payload]`` is sent over ``ws`` and a
    future is registered in ``pending`` under the generated id; whoever
    receives the matching response is expected to resolve that future.

    Args:
        ws: Connection to send on; its ``chargepoint_id`` attribute, when
            present, is used for logging.
        action: Name of the action to invoke.
        payload: JSON-serialisable request payload.
        timeout: Seconds to wait for the response.  ``None`` selects
            ``DEFAULT_ACTION_TIMEOUT``; a resulting value that is ``None``
            or not positive waits without limit.

    Returns:
        The result the future was resolved with.

    Raises:
        asyncio.TimeoutError: No response arrived in time.  The failure is
            recorded as an action metric before raising.
        Exception: Whatever serialising or sending the frame raises.

    The ``pending`` entry is removed in every case, including a failed send.
    """
    uid = str(uuid.uuid4())
    msg = [2, uid, action, payload]
    fut: asyncio.Future = asyncio.get_running_loop().create_future()
    start_time = time.perf_counter()
    # Registered before sending so that even an immediate response finds it.
    pending[uid] = (fut, action, start_time)
    try:
        # Serialising and sending happen inside the try block: previously a
        # failure here left a stale entry in ``pending`` forever.
        await ws.send(json.dumps(msg))
        chargepoint_id = getattr(ws, "chargepoint_id", "")
        log_ocpp_message(chargepoint_id, "server_to_client", msg, action)
        wait_timeout = DEFAULT_ACTION_TIMEOUT if timeout is None else timeout
        cp = chargepoint_id or getattr(ws, "chargepoint_id", "unknown")
        if action == "UpdateFirmware":
            logging.info(
                "Sent UpdateFirmware to %s with payload %s (timeout=%ss)",
                cp,
                json.dumps(payload, ensure_ascii=False),
                wait_timeout,
            )
        try:
            if wait_timeout is None or wait_timeout <= 0:
                return await fut
            return await asyncio.wait_for(fut, timeout=wait_timeout)
        except asyncio.TimeoutError as exc:
            logging.warning(
                "Timeout waiting for %s response from %s after %ss",
                action,
                cp,
                wait_timeout,
            )
            if action == "UpdateFirmware":
                other_pending = {
                    pending_uid: pending_action
                    for pending_uid, (_, pending_action, _) in pending.items()
                    if pending_uid != uid
                }
                logging.warning(
                    "UpdateFirmware request to %s with payload %s did not receive a "
                    "CallResult/CallError acknowledgement. Remaining pending actions: %s",
                    cp,
                    json.dumps(payload, ensure_ascii=False),
                    other_pending,
                )
            duration_ms = _calculate_duration_ms(start_time)
            _record_action_metric(action, False, duration_ms, "Timeout")
            raise asyncio.TimeoutError(
                f"Timed out waiting for {action} response from {cp} after {wait_timeout}s"
            ) from exc
    finally:
        pending.pop(uid, None)


async def get_configuration(station_id: str):
    """Request GetConfiguration from a connected station and persist it.

    Raises:
        ValueError: The station has no entry in ``connected``.

    Returns the response after handing it to ``store_get_configuration``.
    """
    if not (ws := connected.get(station_id)):
        raise ValueError("station not connected")
    configuration = await call_action(ws, "GetConfiguration", {})
    await store_get_configuration(station_id, configuration)
    return configuration


async def set_configuration(station_id: str, key: str, value: str):
    """Send ChangeConfiguration for a single key/value pair.

    Raises:
        ValueError: The station has no entry in ``connected``.
    """
    if not (ws := connected.get(station_id)):
        raise ValueError("station not connected")
    return await call_action(
        ws, "ChangeConfiguration", {"key": key, "value": value}
    )


async def send_custom_change_configuration(
    station_id: str, payload: dict[str, Any]
) -> dict[str, Any]:
    """Send ChangeConfiguration with a caller-built payload.

    Raises:
        ValueError: The station is not connected, or ``payload`` is not a
            ``dict`` (checked in that order).
    """
    ws = connected.get(station_id)
    if not ws:
        raise ValueError("station not connected")
    if isinstance(payload, dict):
        return await call_action(ws, "ChangeConfiguration", payload)
    raise ValueError("payload must be a JSON object")


def _stringify_config_value(value: Any) -> str:
    """Render a configuration value as text.

    Booleans become lowercase ``"true"``/``"false"``; every other value is
    passed through ``str()``.
    """
    if not isinstance(value, bool):
        return str(value)
    if value:
        return "true"
    return "false"


async def set_local_auth_configuration(
    station_id: str, settings: dict[str, Any]
) -> dict[str, Any]:
    """Apply the local-authorization settings found in ``settings``.

    Only keys listed in ``LOCAL_AUTH_CONFIGURATION_KEYS`` with a non-``None``
    value are considered.  Each one is stringified and sent through
    ``set_configuration``, one after the other.

    Returns:
        Mapping of each applied key to the response of its request.

    Raises:
        ValueError: ``settings`` contains none of the supported keys.
    """
    selected: dict[str, Any] = {
        key: _stringify_config_value(settings[key])
        for key in LOCAL_AUTH_CONFIGURATION_KEYS
        if key in settings and settings[key] is not None
    }
    if not selected:
        raise ValueError("no valid local authorization settings provided")

    responses: dict[str, Any] = {}
    for key, text in selected.items():
        responses[key] = await set_configuration(station_id, key, text)
    return responses


async def get_local_list_version(station_id: str) -> dict[str, Any]:
    """Send GetLocalListVersion to a connected station and return the reply.

    Raises:
        ValueError: The station has no entry in ``connected``.
    """
    if not (ws := connected.get(station_id)):
        raise ValueError("station not connected")
    return await call_action(ws, "GetLocalListVersion", {})


async def send_local_list(
    station_id: str,
    list_version: int,
    update_type: str,
    local_authorization_list: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Send SendLocalList to a connected station.

    ``update_type`` is matched case-insensitively (after stripping) against
    ``Full`` and ``Differential`` and sent in that canonical spelling.
    ``localAuthorizationList`` is only included when a list is supplied.

    Raises:
        ValueError: The station is not connected, the update type is not
            recognised, ``list_version`` cannot be converted by ``int()``,
            or ``local_authorization_list`` is neither ``None`` nor a list.
    """
    ws = connected.get(station_id)
    if not ws:
        raise ValueError("station not connected")

    canonical_type = {"full": "Full", "differential": "Differential"}.get(
        str(update_type).strip().lower()
    )
    if canonical_type is None:
        raise ValueError("updateType must be 'Full' or 'Differential'")

    request: dict[str, Any] = {
        "listVersion": int(list_version),
        "updateType": canonical_type,
    }

    if local_authorization_list is not None:
        if not isinstance(local_authorization_list, list):
            raise ValueError("localAuthorizationList must be an array of entries")
        request["localAuthorizationList"] = local_authorization_list

    return await call_action(ws, "SendLocalList", request)


async def change_availability(
    station_id: str, availability_type: str, connector_id: int = 0
):
    """Send ChangeAvailability and track the outcome.

    When the reply is a dict whose ``status`` is ``accepted`` or
    ``scheduled`` (case-insensitive), the new availability is recorded via
    ``_record_availability``.  A dict reply also receives an
    ``availability`` entry (unless it already has one) taken from
    ``availability_state``, defaulting to ``AVAILABILITY_UNKNOWN``.
    Non-dict replies are returned untouched.

    Raises:
        ValueError: The station has no entry in ``connected``.
    """
    if not (ws := connected.get(station_id)):
        raise ValueError("station not connected")

    response = await call_action(
        ws,
        "ChangeAvailability",
        {"connectorId": connector_id, "type": availability_type},
    )
    if not isinstance(response, dict):
        return response

    outcome = str(response.get("status", "")).strip().lower()
    if outcome in ("accepted", "scheduled"):
        _record_availability(station_id, availability_type)
    response.setdefault(
        "availability",
        availability_state.get(station_id, AVAILABILITY_UNKNOWN),
    )
    return response


async def trigger_message(
    station_id: str, requested_message: str, connector_id: int | None = None
):
    """Send TriggerMessage to a connected station.

    ``connectorId`` is only part of the request when ``connector_id`` is not
    ``None``.

    Raises:
        ValueError: The station has no entry in ``connected``.
    """
    if not (ws := connected.get(station_id)):
        raise ValueError("station not connected")
    request = (
        {"requestedMessage": requested_message}
        if connector_id is None
        else {"requestedMessage": requested_message, "connectorId": connector_id}
    )
    return await call_action(ws, "TriggerMessage", request)


async def trigger_meter_values(station_id: str, connector_id: int | None = None):
    """Ask a station to send MeterValues and mark the request as pending.

    The station id is added to ``pending_meter_triggers`` before the
    TriggerMessage request is issued.

    Raises:
        ValueError: Raised by ``trigger_message`` (e.g. the station is not
            connected).  The pending marker is removed again in that case so
            a request that never reached the station does not leave a stale
            flag behind.
    """
    pending_meter_triggers.add(station_id)
    try:
        return await trigger_message(station_id, "MeterValues", connector_id)
    except ValueError:
        # NOTE(review): assumes ``pending_meter_triggers`` is a set - confirm.
        pending_meter_triggers.discard(station_id)
        raise


async def reset_chargepoint(station_id: str, reset_type: str = "Hard"):
    """Send a ``Reset`` call to a connected station.

    A falsy ``reset_type`` is replaced by ``"Hard"``.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    if not (conn := connected.get(station_id)):
        raise ValueError("station not connected")
    effective_type = reset_type if reset_type else "Hard"
    return await call_action(conn, "Reset", {"type": effective_type})


async def request_diagnostics(
    station_id: str,
    location: str,
    retries: int = 1,
    retry_interval: int = 60,
    start_time: str | None = None,
    stop_time: str | None = None,
):
    """Send a ``GetDiagnostics`` call to a connected station.

    ``retries`` and ``retry_interval`` are coerced with ``int()``;
    ``startTime`` / ``stopTime`` are only included when truthy.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    conn = connected.get(station_id)
    if not conn:
        raise ValueError("station not connected")

    request: dict[str, Any] = dict(
        location=location,
        retries=int(retries),
        retryInterval=int(retry_interval),
    )
    for field, moment in (("startTime", start_time), ("stopTime", stop_time)):
        if moment:
            request[field] = moment
    return await call_action(conn, "GetDiagnostics", request)


async def install_certificate(
    station_id: str, certificate_type: str, certificate: str
):
    """Send an ``InstallCertificate`` call to a connected station.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    if not (conn := connected.get(station_id)):
        raise ValueError("station not connected")
    request = dict(certificateType=certificate_type, certificate=certificate)
    return await call_action(conn, "InstallCertificate", request)


async def send_certificate_signed(
    station_id: str, certificate_chain: str, certificate_type: str | None = None
):
    """Send a ``CertificateSigned`` call to a connected station.

    ``certificateType`` is only included when ``certificate_type`` is truthy.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    conn = connected.get(station_id)
    if not conn:
        raise ValueError("station not connected")

    request: dict[str, Any] = (
        {"certificateChain": certificate_chain, "certificateType": certificate_type}
        if certificate_type
        else {"certificateChain": certificate_chain}
    )
    return await call_action(conn, "CertificateSigned", request)


async def get_installed_certificate_ids(
    station_id: str, certificate_type: str | None = None
):
    """Send a ``GetInstalledCertificateIds`` call to a connected station.

    The payload is empty unless ``certificate_type`` is truthy, in which case
    it is sent as ``certificateType``.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    if not (conn := connected.get(station_id)):
        raise ValueError("station not connected")
    request: dict[str, Any] = (
        {"certificateType": certificate_type} if certificate_type else {}
    )
    return await call_action(conn, "GetInstalledCertificateIds", request)


async def delete_certificate(
    station_id: str,
    hash_algorithm: str,
    issuer_name_hash: str,
    issuer_key_hash: str,
    serial_number: str,
):
    """Send a ``DeleteCertificate`` call to a connected station.

    The four hash arguments are wrapped into a ``certificateHashData`` object.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    conn = connected.get(station_id)
    if not conn:
        raise ValueError("station not connected")

    hash_data = dict(
        hashAlgorithm=hash_algorithm,
        issuerNameHash=issuer_name_hash,
        issuerKeyHash=issuer_key_hash,
        serialNumber=serial_number,
    )
    return await call_action(
        conn, "DeleteCertificate", {"certificateHashData": hash_data}
    )


async def get_certificate_status(
    station_id: str,
    hash_algorithm: str,
    issuer_name_hash: str,
    issuer_key_hash: str,
    serial_number: str,
):
    """Send a ``GetCertificateStatus`` call to a connected station.

    The four hash arguments are wrapped into a ``certificateHashData`` object.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    if not (conn := connected.get(station_id)):
        raise ValueError("station not connected")

    hash_data = dict(
        hashAlgorithm=hash_algorithm,
        issuerNameHash=issuer_name_hash,
        issuerKeyHash=issuer_key_hash,
        serialNumber=serial_number,
    )
    return await call_action(
        conn, "GetCertificateStatus", {"certificateHashData": hash_data}
    )


async def get_15118_ev_certificate(
    station_id: str, certificate_request: dict[str, Any]
):
    """Send a ``Get15118EVCertificate`` call to a connected station.

    The payload is a shallow copy of ``certificate_request`` (or an empty
    dict when it is falsy).

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    conn = connected.get(station_id)
    if not conn:
        raise ValueError("station not connected")
    request: dict[str, Any] = dict(certificate_request) if certificate_request else {}
    return await call_action(conn, "Get15118EVCertificate", request)


async def authorize_with_ocsp(
    station_id: str, id_tag: str, ocsp_data: dict[str, Any] | None = None
):
    """Send an ``Authorize`` call carrying ``idTag`` plus optional extra fields.

    Entries of ``ocsp_data`` (when truthy) are merged into the payload and may
    override ``idTag``.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    if not (conn := connected.get(station_id)):
        raise ValueError("station not connected")
    request: dict[str, Any] = {"idTag": id_tag}
    request.update(ocsp_data or {})
    return await call_action(conn, "Authorize", request)


async def disconnect_chargepoint(station_id: str) -> dict[str, str]:
    """Close the websocket connection for the given station.

    Returns:
        ``{"status": "not_connected"}`` when no websocket is registered for
        ``station_id`` or the registered one already reports ``closed``;
        otherwise ``{"status": "disconnected"}`` after the socket was closed.
    """
    ws = connected.get(station_id)
    if not ws or getattr(ws, "closed", False):
        return {"status": "not_connected"}

    await ws.close()

    # ``close()`` yields to the event loop, so the station may already have
    # reconnected and registered a new socket under the same id. Only clear
    # the bookkeeping when it does not belong to a different, newer socket.
    current = connected.get(station_id)
    if current is None or current is ws:
        connected.pop(station_id, None)
        connected_since.pop(station_id, None)
        last_seen.pop(station_id, None)
    return {"status": "disconnected"}


async def request_boot_notification(station_id: str):
    """Prompt a station for a BootNotification at most once.

    The station is recorded in ``boot_requested`` on the first call; later
    calls return immediately. If the startup configuration flag
    ``trigger_boot_notification_on_message_before_boot`` is enabled (the
    default), a ``TriggerMessage`` for ``BootNotification`` is sent. A failure
    to send it is logged as a warning and not escalated.
    """
    if station_id in boot_requested:
        return
    boot_requested.add(station_id)

    settings = load_server_startup_config()
    if not settings.get("trigger_boot_notification_on_message_before_boot", True):
        return

    try:
        await trigger_message(station_id, "BootNotification")
    except Exception:
        logging.warning("Failed to trigger BootNotification for %s", station_id)


async def remote_start_transaction(
    station_id: str, connector_id: int, id_tag: str | None = None
):
    """Send a ``RemoteStartTransaction`` call to a connected station.

    A falsy ``id_tag`` is replaced by the literal ``"remote"``.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    if not (conn := connected.get(station_id)):
        raise ValueError("station not connected")
    effective_tag = id_tag if id_tag else "remote"
    request = {"connectorId": connector_id, "idTag": effective_tag}
    return await call_action(conn, "RemoteStartTransaction", request)


async def remote_stop_transaction(station_id: str, transaction_id: int):
    """Send a ``RemoteStopTransaction`` call for ``transaction_id``.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    if not (conn := connected.get(station_id)):
        raise ValueError("station not connected")
    return await call_action(
        conn, "RemoteStopTransaction", {"transactionId": transaction_id}
    )


async def reserve_now(
    station_id: str,
    connector_id: int,
    id_tag: str,
    *,
    reservation_id: int,
    expiry_date: datetime | str | None = None,
    parent_id_tag: str | None = None,
):
    """Send a ``ReserveNow`` call to a connected station.

    Args:
        station_id: Key used to look up the websocket in ``connected``.
        connector_id: Sent as ``connectorId``.
        id_tag: Sent as ``idTag``; a falsy value becomes ``"remote"``.
        reservation_id: Sent as ``reservationId``.
        expiry_date: A ``datetime`` is formatted with
            ``_format_timestamp_value``; any other truthy value is sent as
            ``str(expiry_date)``. When falsy, the expiry defaults to the
            current UTC time plus 15 minutes.
        parent_id_tag: Sent as ``parentIdTag`` when truthy.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    conn = connected.get(station_id)
    if not conn:
        raise ValueError("station not connected")

    if not expiry_date:
        expiry_text = _format_timestamp_value(
            datetime.now(timezone.utc) + timedelta(minutes=15)
        )
    elif isinstance(expiry_date, datetime):
        expiry_text = _format_timestamp_value(expiry_date)
    else:
        expiry_text = str(expiry_date)

    request: dict[str, Any] = {
        "connectorId": connector_id,
        "idTag": id_tag or "remote",
        "reservationId": reservation_id,
        "expiryDate": expiry_text,
    }
    if parent_id_tag:
        request["parentIdTag"] = parent_id_tag
    return await call_action(conn, "ReserveNow", request)


def _find_active_transaction_id(
    chargepoint_id: str, connector_id: int | None
) -> int | None:
    """Return the first active transaction id for a station (and connector).

    Iterates ``current_transactions`` in order. Entries whose
    ``chargepoint_id`` differs from the stripped ``chargepoint_id`` argument
    are ignored. Without a ``connector_id`` the first entry of the station
    wins; otherwise the entry must match either ``connector_id_int`` or the
    integer parsed from its raw ``connector_id``.

    Returns:
        The matching transaction key, or ``None`` when nothing matches or the
        station id is blank.
    """
    wanted_station = str(chargepoint_id or "").strip()
    if not wanted_station:
        return None

    for candidate_id, details in current_transactions.items():
        if details.get("chargepoint_id") != wanted_station:
            continue
        if connector_id is None:
            return candidate_id
        if details.get("connector_id_int") == connector_id:
            return candidate_id

        # Fall back to the raw connector value, which may be a numeric string.
        try:
            raw_connector = details.get("connector_id")
            matched = (
                raw_connector is not None
                and int(str(raw_connector).strip()) == connector_id
            )
        except (TypeError, ValueError):
            continue
        if matched:
            return candidate_id

    return None


def _matches_hubject_session(metadata: Mapping[str, Any], session_id: str) -> bool:
    normalized_session = str(session_id or "").strip()
    if not normalized_session:
        return False
    candidate_keys = (
        "session_id",
        "cpo_partner_session_id",
        "emp_partner_session_id",
        "SessionID",
        "CPOPartnerSessionID",
        "EMPPartnerSessionID",
    )
    for key in candidate_keys:
        value = metadata.get(key)
        if value not in (None, "") and str(value).strip() == normalized_session:
            return True
    for nested_key in ("raw_request", "raw_payload"):
        nested = metadata.get(nested_key)
        if not isinstance(nested, Mapping):
            continue
        for key in candidate_keys:
            value = nested.get(key)
            if value not in (None, "") and str(value).strip() == normalized_session:
                return True
    return False


def _find_active_transaction_id_by_session(
    chargepoint_id: str,
    session_id: str | None,
    connector_id: int | None = None,
) -> int | None:
    """Find an active transaction of a station by its Hubject session id.

    Entries of ``current_transactions`` are considered only when their
    ``chargepoint_id`` equals the stripped ``chargepoint_id`` and, if
    ``connector_id`` is given, their connector matches (via
    ``connector_id_int`` or the integer parsed from raw ``connector_id``).
    The entry's ``hubject`` -> ``metadata`` mapping must then match the
    session according to ``_matches_hubject_session``.

    Returns:
        The first matching transaction key, or ``None`` when there is no match
        or either the station id or session id is blank.
    """
    wanted_station = str(chargepoint_id or "").strip()
    wanted_session = str(session_id or "").strip()
    if not (wanted_station and wanted_session):
        return None

    for candidate_id, details in current_transactions.items():
        if details.get("chargepoint_id") != wanted_station:
            continue

        if connector_id is not None and details.get("connector_id_int") != connector_id:
            # Normalised connector differs; give the raw value a chance.
            try:
                raw_connector = details.get("connector_id")
                mismatch = (
                    raw_connector is None
                    or int(str(raw_connector).strip()) != connector_id
                )
            except (TypeError, ValueError):
                continue
            if mismatch:
                continue

        hubject_ctx = details.get("hubject")
        session_meta = (
            hubject_ctx.get("metadata") if isinstance(hubject_ctx, Mapping) else None
        )
        if isinstance(session_meta, Mapping) and _matches_hubject_session(
            session_meta, wanted_session
        ):
            return candidate_id

    return None


def _find_active_transaction_id_by_session_any(
    session_id: str | None,
    *,
    evse_id: str | None = None,
    connector_id: int | None = None,
) -> int | None:
    """Find an active transaction by Hubject session id across all stations.

    Every entry of ``current_transactions`` whose connector matches (when
    ``connector_id`` is given) and whose ``hubject`` -> ``metadata`` matches
    the session is a candidate. Without an EVSE filter the first candidate is
    returned. With one, a candidate whose EVSE id (from the metadata, or from
    the Hubject context's ``start_info`` when the metadata has none) equals
    ``evse_id`` case-insensitively is returned immediately; otherwise the
    first session-only candidate is used as a fallback.

    Returns:
        The selected transaction key, or ``None`` when nothing matches or the
        session id is blank.
    """
    wanted_session = str(session_id or "").strip()
    if not wanted_session:
        return None
    wanted_evse = str(evse_id or "").strip().lower() if evse_id else None
    first_session_hit: int | None = None

    for candidate_id, details in current_transactions.items():
        if connector_id is not None and details.get("connector_id_int") != connector_id:
            # Normalised connector differs; give the raw value a chance.
            try:
                raw_connector = details.get("connector_id")
                mismatch = (
                    raw_connector is None
                    or int(str(raw_connector).strip()) != connector_id
                )
            except (TypeError, ValueError):
                continue
            if mismatch:
                continue

        hubject_ctx = details.get("hubject")
        if not isinstance(hubject_ctx, Mapping):
            continue
        session_meta = hubject_ctx.get("metadata")
        if not isinstance(session_meta, Mapping):
            continue
        if not _matches_hubject_session(session_meta, wanted_session):
            continue

        if not wanted_evse:
            return candidate_id

        reported_evse = session_meta.get("evse_id") or session_meta.get("EVSEId")
        if reported_evse is None:
            origin = hubject_ctx.get("start_info")
            if isinstance(origin, Mapping):
                reported_evse = origin.get("evseId") or origin.get("EVSEId")
        if reported_evse and str(reported_evse).strip().lower() == wanted_evse:
            return candidate_id
        # Session matched but EVSE did not: remember only the first such hit.
        if first_session_hit is None:
            first_session_hit = candidate_id

    return first_session_hit


async def unlock_connector(station_id: str, connector_id: int):
    """Send an ``UnlockConnector`` call for ``connector_id``.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    if not (conn := connected.get(station_id)):
        raise ValueError("station not connected")
    return await call_action(conn, "UnlockConnector", {"connectorId": connector_id})


async def update_firmware_request(
    station_id: str,
    location: str,
    retries: int | None = None,
    retry_interval: int | None = None,
):
    """Send an ``UpdateFirmware`` call to a connected station.

    ``retrieveDate`` is the current UTC time truncated to whole seconds and
    rendered in ISO format with a ``Z`` suffix. ``retries`` and
    ``retryInterval`` are only included when not ``None``. The call uses
    ``UPDATE_FIRMWARE_TIMEOUT`` as its timeout.

    Raises:
        ValueError: If no websocket is registered for ``station_id``.
    """
    conn = connected.get(station_id)
    if not conn:
        raise ValueError("station not connected")

    now_utc = datetime.now(timezone.utc).replace(microsecond=0)
    request: dict[str, Any] = {
        "location": location,
        "retrieveDate": now_utc.isoformat().replace("+00:00", "Z"),
    }
    optional_fields = (("retries", retries), ("retryInterval", retry_interval))
    request.update(
        {field: amount for field, amount in optional_fields if amount is not None}
    )
    return await call_action(
        conn, "UpdateFirmware", request, timeout=UPDATE_FIRMWARE_TIMEOUT
    )


async def set_operational(station_id: str, ws):
    """Set a connected charge point to Operative via ChangeAvailability.

    Sends ``ChangeAvailability`` for connector ``0`` with type ``Operative``.
    The new availability is recorded, and success is logged, only when the
    station answers with status ``Accepted`` or ``Scheduled``; any other
    answer is logged as a warning so the log does not claim a change the
    station refused. Exceptions are logged and never propagated.

    Args:
        station_id: Identifier used for bookkeeping and log messages.
        ws: Websocket the request is sent on.
    """
    try:
        payload = {"connectorId": 0, "type": "Operative"}
        result = await call_action(ws, "ChangeAvailability", payload)
        status = ""
        if isinstance(result, dict):
            status = str(result.get("status", "")).strip().lower()
        if status in {"accepted", "scheduled"}:
            _record_availability(station_id, AVAILABILITY_OPERATIVE)
            logging.info("Set %s to Operative", station_id)
        else:
            logging.warning(
                "ChangeAvailability to Operative was not accepted by %s: %s",
                station_id,
                result,
            )
    except Exception as exc:
        logging.warning("Failed to set %s to Operative: %s", station_id, exc)


async def apply_default_configuration(
    station_id: str, startup_cfg: dict[str, Any] | None = None
):
    """Push the server's default configuration values to a charge point.

    The defaults come from ``build_server_default_config`` applied to
    ``startup_cfg`` (loaded via ``load_server_startup_config`` when ``None``).
    Each key is sent individually with ``set_configuration``; a failure for
    one key is logged as a warning and does not stop the remaining keys.
    """
    settings = load_server_startup_config() if startup_cfg is None else startup_cfg
    for name, desired in build_server_default_config(settings).items():
        try:
            await set_configuration(station_id, name, desired)
        except Exception as exc:
            logging.warning("Failed to set %s for %s: %s", name, station_id, exc)


async def ensure_websocket_ping_interval(station_id: str, ws):
    """Make sure a charge point's ``WebSocketPingInterval`` is ``"60"``.

    The current value is read with a ``GetConfiguration`` call on ``ws``. If
    the first ``WebSocketPingInterval`` entry is missing or its value is not
    the string ``"60"``, the key is updated through ``set_configuration``.
    Failures in either phase are logged as warnings and not raised.
    """
    try:
        reply = await call_action(
            ws, "GetConfiguration", {"key": ["WebSocketPingInterval"]}
        )
    except Exception as exc:
        logging.warning(
            "Failed to fetch WebSocketPingInterval for %s: %s", station_id, exc
        )
        return

    try:
        reported = None
        for entry in reply.get("configurationKey", []):
            if entry.get("key") == "WebSocketPingInterval":
                reported = entry.get("value")
                break
        if reported != "60":
            await set_configuration(station_id, "WebSocketPingInterval", "60")
    except Exception as exc:
        logging.warning(
            "Failed to set WebSocketPingInterval for %s: %s", station_id, exc
        )


async def auto_get_configuration(station_id: str, ws):
    """Run the delayed post-connect configuration sequence for a station.

    After a 10 second delay (skipped entirely if ``ws`` reports ``closed`` by
    then) this applies the server default configuration, optionally enforces
    the websocket ping interval, fetches the station configuration and
    optionally triggers ``MeterValues`` and ``StatusNotification``. Which
    optional steps run is controlled by flags in the startup configuration,
    all of which default to enabled.

    The fetch and trigger steps are best-effort: a failure is logged at debug
    level and the remaining steps still run.
    """
    await asyncio.sleep(10)
    if getattr(ws, "closed", False):
        return
    startup_cfg = load_server_startup_config()
    await apply_default_configuration(station_id, startup_cfg)
    if startup_cfg.get("enforce_websocket_ping_interval", True):
        await ensure_websocket_ping_interval(station_id, ws)
    try:
        await get_configuration(station_id)
    except Exception:
        logging.debug(
            "Failed to fetch configuration for %s", station_id, exc_info=True
        )
    if startup_cfg.get("trigger_meter_values_on_start", True):
        try:
            await trigger_meter_values(station_id)
        except Exception:
            logging.debug(
                "Failed to trigger MeterValues for %s", station_id, exc_info=True
            )
    if startup_cfg.get("trigger_status_notification_on_start", True):
        try:
            await trigger_message(station_id, "StatusNotification")
        except Exception:
            logging.debug(
                "Failed to trigger StatusNotification for %s",
                station_id,
                exc_info=True,
            )


async def _send_call_error_response(
    ws: Any,
    station_id: str,
    uid: Any,
    action: str,
    code: str = "NotSupported",
    description: str = "",
):
    """Send a type-4 (CallError) frame for ``uid`` and log it.

    The frame is ``[4, uid, code, description, {}]`` with a falsy
    ``description`` replaced by ``""``. It is logged under ``action``, or
    under ``"CallError"`` when ``action`` is falsy.
    """
    frame = [4, uid, code, description if description else "", {}]
    await ws.send(json.dumps(frame))
    log_label = action if action else "CallError"
    log_ocpp_message(station_id, "server_to_client", frame, log_label)


async def handle_connection(ws):
    path = getattr(ws, "path", "")
    if not path and getattr(ws, "request", None):
        path = getattr(ws.request, "path", "")
    # Extract the charge point ID from the last path segment while
    # ignoring any query string or other components.
    cleaned = urlparse(path).path.strip("/") if path else ""
    station_id = cleaned.split("/")[-1] if cleaned else str(uuid.uuid4())
    subprotocol = getattr(ws, "subprotocol", None)
    ocpp_version = _resolve_ocpp_version_from_subprotocol(subprotocol)
    is_ocpp2 = ocpp_version.startswith("ocpp2")
    if is_ocpp2 and not ENABLE_OCPP_2X:
        logging.warning(
            "Rejecting OCPP 2.x connection for %s (subprotocol=%s) because enable_ocpp_2x is disabled",
            station_id,
            subprotocol,
        )
        try:
            await ws.close(code=1003, reason="OCPP 2.x disabled")
        finally:
            return
    ws.ocpp_version = ocpp_version
    peer = getattr(ws, "peer", None)
    logging.info("WebSocket connection established for %s from %s", station_id, peer)
    connected[station_id] = ws
    now = datetime.now(timezone.utc)
    stale_timeout_events.setdefault(station_id, 0)
    connected_since[station_id] = now
    last_seen[station_id] = now
    initial_startup_cfg = load_server_startup_config()
    stale_timeouts[station_id] = resolve_stale_timeout(
        initial_startup_cfg.get("heartbeat_interval")
    )
    availability_state.setdefault(station_id, AVAILABILITY_UNKNOWN)
    ws.chargepoint_id = station_id
    boot_notifications.discard(station_id)
    boot_requested.discard(station_id)
    oicp_flag = await is_oicp_enabled(station_id)
    logging.info("OICP enabled: %s for %s", str(bool(oicp_flag)).lower(), station_id)
    async def on_control(_):
        last_seen[station_id] = datetime.now(timezone.utc)

    ws.ping_handler = on_control
    ws.pong_handler = on_control

    config_task = asyncio.create_task(auto_get_configuration(station_id, ws))
    tasks = [config_task]
    try:
        async for raw in ws:
            last_seen[station_id] = datetime.now(timezone.utc)
            logging.debug("Raw message from %s: %s", station_id, raw)
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                logging.warning("Invalid JSON from %s: %s", station_id, raw)
                continue
            if not isinstance(data, list) or len(data) < 3:
                logging.warning("Malformed OCPP frame from %s: %s", station_id, data)
                continue
            msg_type, uid = data[0], data[1]
            if not isinstance(msg_type, int):
                logging.warning("Invalid message type from %s: %s", station_id, msg_type)
                continue
            if msg_type == 3 and uid in pending:
                fut, req_action, start_time = pending.get(uid)
                log_ocpp_message(station_id, "client_to_server", data, req_action)
                if fut and not fut.done():
                    fut.set_result(data[2] if len(data) > 2 else {})
                duration_ms = _calculate_duration_ms(start_time)
                _record_action_metric(req_action, True, duration_ms)
                continue
            if msg_type == 4:
                fut_action = None
                pending_entry = pending.get(uid)
                if pending_entry:
                    fut, fut_action, start_time = pending_entry
                    log_ocpp_message(
                        station_id, "client_to_server", data, fut_action or "Error"
                    )
                    error_code = data[2] if len(data) > 2 else "UnknownError"
                    error_description = data[3] if len(data) > 3 else ""
                    error_details = data[4] if len(data) > 4 else None
                    duration_ms = _calculate_duration_ms(start_time)
                    _record_action_metric(
                        fut_action or "UnknownAction", False, duration_ms, error_code
                    )
                    logging.error(
                        "OCPP CallError for %s (%s): %s - %s",
                        station_id,
                        fut_action or uid,
                        error_code,
                        error_description,
                    )
                    if fut and not fut.done():
                        fut.set_exception(
                            OCPPCallError(
                                fut_action,
                                str(error_code),
                                str(error_description),
                                error_details,
                            )
                        )
                else:
                    log_ocpp_message(station_id, "client_to_server", data, "Error")
                continue
            if msg_type != 2:
                continue
            action = data[2]
            if not isinstance(action, str):
                logging.warning(
                    "Ignoring OCPP frame with non-string action from %s: %s", station_id, data
                )
                continue
            action_timer = _ActionMetricsTimer(action)
            ocpp_version = str(getattr(ws, "ocpp_version", "")).lower()
            ocpp2 = ocpp_version.startswith("ocpp2")
            payload = _normalize_ocpp_payload(data[3] if len(data) > 3 else {}, ocpp2=ocpp2)
            log_ocpp_message(station_id, "client_to_server", data, action)
            if ocpp2 and action not in OCPP2_SUPPORTED_ACTIONS:
                await _send_call_error_response(
                    ws,
                    station_id,
                    uid,
                    action,
                    "NotSupported",
                    f"OCPP 2.x action {action} is not supported",
                )
                action_timer.call_error("NotSupported")
                action_timer.finalize()
                continue
            if action != "BootNotification" and station_id not in boot_notifications:
                asyncio.create_task(request_boot_notification(station_id))
            if action == "BootNotification":
                startup_cfg = load_server_startup_config()
                heartbeat_interval = startup_cfg.get("heartbeat_interval", 300)
                try:
                    heartbeat_interval_int = int(heartbeat_interval)
                except (TypeError, ValueError):
                    heartbeat_interval_int = 300
                if heartbeat_interval_int <= 0:
                    heartbeat_interval_int = 300
                stale_timeouts[station_id] = resolve_stale_timeout(
                    heartbeat_interval_int
                )
                resp = [3, uid, {
                    "currentTime": datetime.now(timezone.utc).isoformat(),
                    "interval": heartbeat_interval_int,
                    "status": "Accepted"
                }]
                await ws.send(json.dumps(resp))
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
                if startup_cfg.get("change_availability_operative", True):
                    asyncio.create_task(set_operational(station_id, ws))
                if startup_cfg.get("send_heartbeat_change_on_boot", True):
                    hb_uid = str(uuid.uuid4())
                    hb_msg = [
                        2,
                        hb_uid,
                        "ChangeConfiguration",
                        {
                            "key": "HeartbeatInterval",
                            "value": str(heartbeat_interval_int),
                        },
                    ]
                    await ws.send(json.dumps(hb_msg))
                    log_ocpp_message(
                        station_id, "server_to_client", hb_msg, "ChangeConfiguration"
                    )
                boot_notifications.add(station_id)
            elif action == "Heartbeat":
                resp = [3, uid, {"currentTime": datetime.now(timezone.utc).isoformat()}]
                await ws.send(json.dumps(resp))
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
            elif action == "Authorize":
                card_uuid = _extract_id_token_value(payload)
                payload = dict(payload)
                if card_uuid and "idTag" not in payload:
                    payload["idTag"] = card_uuid
                logging.info(
                    "RFID card presented at %s: %s", station_id, card_uuid
                )
                id_tag_info, _pnc_decision = await resolve_authorize_id_tag_info(
                    station_id, payload
                )
                response_payload: dict[str, Any] = (
                    {"idTokenInfo": dict(id_tag_info)}
                    if ocpp2
                    else {"idTagInfo": id_tag_info}
                )
                resp = [3, uid, response_payload]
                await ws.send(json.dumps(resp))
                logging.info(
                    "RFID response for %s at %s: %s",
                    card_uuid,
                    station_id,
                    id_tag_info.get("status"),
                )
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
            elif ocpp2 and action == "TransactionEvent":
                event_type_raw = payload.get("eventType")
                event_type = str(event_type_raw).strip().lower() if event_type_raw is not None else ""
                tx_info = payload.get("transactionInfo") if isinstance(payload.get("transactionInfo"), Mapping) else {}
                tx_id = tx_info.get("transactionId") or payload.get("transactionId")
                if tx_id is None:
                    tx_id = str(uid)
                tx_id_int = _derive_session_transaction_id(tx_id)
                evse = payload.get("evse") if isinstance(payload.get("evse"), Mapping) else {}
                connector_id = evse.get("connectorId") if isinstance(evse, Mapping) else None
                connector_id_int = _coerce_int_or_none(connector_id)
                evse_id = evse.get("id") if isinstance(evse, Mapping) else None
                timestamp_value = payload.get("timestamp") or tx_info.get("timestamp")
                ts_dt = _parse_timestamp_utc(timestamp_value)
                meter_value, unit, meter_ts = _extract_meter_value_data(payload, default_ts=ts_dt)
                ts_dt = meter_ts or ts_dt
                meter_value_int = _coerce_int_or_none(meter_value)
                id_tag_value = _extract_id_token_value(payload)
                id_token_info = None
                if id_tag_value:
                    try:
                        id_token_info, _ = await resolve_authorize_id_tag_info(
                            station_id, payload
                        )
                    except Exception:
                        id_token_info = None
                response_payload: dict[str, Any] = {}
                if id_token_info:
                    response_payload["idTokenInfo"] = dict(id_token_info)
                if event_type == "started":
                    normalized_station_id = _normalize_chargepoint_id(station_id) or station_id
                    oicp_enabled = False
                    if (
                        HUBJECT_CLIENT is not None
                        and connector_id_int is not None
                        and connector_id_int > 0
                    ):
                        oicp_enabled = await is_oicp_enabled(station_id)
                    start_info = dict(payload)
                    start_info["chargepoint_id"] = station_id
                    start_info.setdefault("transactionId", tx_id)
                    if connector_id_int is not None:
                        start_info["connectorId"] = connector_id_int
                    if evse_id is not None:
                        start_info["evseId"] = evse_id
                    start_info.setdefault("timestamp", _format_timestamp_value(ts_dt))
                    if id_tag_value:
                        start_info.setdefault("idTag", id_tag_value)
                    if meter_value is not None:
                        start_info.setdefault("meterStart", _coerce_int_or_none(meter_value))
                    pending_start_method = _pop_pending_start_method(
                        station_id, connector_id_int
                    )
                    current_transactions[tx_id] = {
                        "chargepoint_id": station_id,
                        "start_time": ts_dt,
                        "connector_id": connector_id,
                        "connector_id_int": connector_id_int,
                        "oicp_enabled": oicp_enabled,
                        "start_info": start_info,
                        "start_method": pending_start_method,
                    }
                    meter_start_value = _coerce_int_or_none(meter_value)
                    if meter_start_value is not None and connector_id_int is not None:
                        try:
                            await store_last_meter_reading(
                                station_id, connector_id_int, float(meter_start_value), ts_dt
                            )
                        except Exception:
                            pass
                    if (
                        oicp_enabled
                        and connector_id_int is not None
                        and connector_id_int > 0
                    ):
                        try:
                            asyncio.create_task(
                                _push_hubject_transaction_status(
                                    station_id,
                                    connector_id_int,
                                    "Occupied",
                                    ts_dt,
                                )
                            )
                        except Exception:
                            logging.debug(
                                "Failed to schedule Hubject occupied status for %s",
                                station_id,
                                exc_info=True,
                            )
                    pending_authorization = _consume_pending_hubject_authorization(
                        station_id,
                        id_tag_value,
                    )
                    if (
                        pending_authorization
                        and connector_id_int is not None
                        and connector_id_int > 0
                    ):
                        metadata = dict(pending_authorization)
                        metadata.setdefault("chargepoint_id", station_id)
                        metadata.setdefault("connector_id", connector_id_int)
                        evse_identifier = metadata.get("evse_id") or start_info.get("evseId")
                        if not evse_identifier and evse_id:
                            evse_identifier = evse_id
                        if evse_identifier:
                            metadata["evse_id"] = evse_identifier
                        if metadata.get("identification") is None and id_tag_value:
                            metadata["identification"] = {
                                "RFIDMifareFamilyIdentification": {"UID": id_tag_value}
                            }
                        metadata.setdefault("token", id_tag_value)
                        metadata.setdefault("id_tag", id_tag_value)
                        if "received_at" not in metadata:
                            metadata["received_at"] = _format_timestamp_value(
                                datetime.now(timezone.utc)
                            )
                        _set_pending_hubject_cdr(
                            station_id,
                            connector_id_int,
                            metadata,
                        )
                    _attach_hubject_metadata_to_transaction(
                        station_id,
                        connector_id_int,
                        tx_id,
                        current_transactions[tx_id],
                    )
                    entry = current_transactions.get(tx_id)
                    if entry is not None:
                        _finalize_transaction_start_method(entry, pending_start_method)
                    try:
                        await store_server_session(
                            normalized_station_id,
                            tx_id_int,
                            connector_id=connector_id_int,
                            id_tag=id_tag_value,
                            session_start=ts_dt,
                            meter_start=_coerce_int_or_none(meter_value),
                            last_meter_value=_coerce_int_or_none(meter_value),
                            last_meter_ts=ts_dt,
                            status="Started",
                            ended=False,
                        )
                    except Exception:
                        logging.debug(
                            "Failed to upsert server session start for %s (tx %s)",
                            normalized_station_id,
                            tx_id,
                            exc_info=True,
                        )
                    try:
                        asyncio.create_task(
                            log_extended_session_event(
                                station_id,
                                "Start",
                                connector_id=connector_id_int,
                                transaction_id=tx_id,
                                meter_value=meter_start_value,
                                meter_unit=unit or SESSION_LOG_UNIT,
                                meter_start=meter_start_value,
                                meter_stop=None,
                                id_tag=id_tag_value,
                                timestamp=ts_dt,
                            )
                        )
                    except Exception:
                        logging.debug(
                            "Failed to schedule extended session log for 2.x start of %s",
                            station_id,
                            exc_info=True,
                        )
                    resp = [3, uid, response_payload]
                    await ws.send(json.dumps(resp))
                    log_ocpp_message(station_id, "server_to_client", resp, action)
                    action_timer.success()
                elif event_type == "updated":
                    normalized_station_id = _normalize_chargepoint_id(station_id) or station_id
                    progress_tx_id = tx_id
                    start_entry = current_transactions.get(progress_tx_id) if progress_tx_id is not None else None
                    start_metadata = start_entry.get("start_info") if isinstance(start_entry, dict) else {}
                    start_meter_value = (
                        _coerce_int_or_none(start_metadata.get("meterStart"))
                        if isinstance(start_metadata, Mapping)
                        else None
                    )
                    id_tag_progress = (
                        start_metadata.get("idTag") if isinstance(start_metadata, Mapping) else None
                    ) or id_tag_value
                    energy_wh = None
                    if start_meter_value is not None and meter_value_int is not None:
                        diff = meter_value_int - start_meter_value
                        if diff >= 0:
                            energy_wh = diff
                    if meter_value is not None and connector_id_int is not None:
                        try:
                            await store_last_meter_reading(
                                station_id, connector_id_int, float(meter_value), ts_dt
                            )
                        except Exception:
                            pass
                    if meter_value is not None:
                        try:
                            asyncio.create_task(
                                log_extended_session_event(
                                    station_id,
                                    "Progress",
                                    connector_id=connector_id_int,
                                    transaction_id=progress_tx_id,
                                    meter_value=meter_value,
                                    meter_unit=unit or SESSION_LOG_UNIT,
                                    meter_start=start_meter_value,
                                    meter_stop=_coerce_int_or_none(meter_value),
                                    id_tag=id_tag_progress,
                                    timestamp=ts_dt,
                                )
                            )
                        except Exception:
                            logging.debug(
                                "Failed to schedule extended session log for 2.x meter values of %s",
                                station_id,
                                exc_info=True,
                            )
                    try:
                        await store_server_session(
                            normalized_station_id,
                            tx_id_int,
                            connector_id=connector_id_int,
                            id_tag=id_tag_progress,
                            last_meter_value=meter_value_int,
                            last_meter_ts=ts_dt,
                            energy_wh=energy_wh,
                            status="Progress",
                            ended=False,
                        )
                    except Exception:
                        logging.debug(
                            "Failed to upsert server session progress for %s (tx %s)",
                            normalized_station_id,
                            tx_id,
                            exc_info=True,
                        )
                    resp = [3, uid, response_payload]
                    await ws.send(json.dumps(resp))
                    log_ocpp_message(station_id, "server_to_client", resp, action)
                    action_timer.success()
                elif event_type == "ended":
                    normalized_station_id = _normalize_chargepoint_id(station_id) or station_id
                    info = current_transactions.pop(tx_id, None)
                    start_info = info.get("start_info") if isinstance(info, dict) else {}
                    connector_id_int_resolved = connector_id_int
                    if connector_id_int_resolved is None and isinstance(info, dict):
                        connector_id_int_resolved = _coerce_int_or_none(info.get("connector_id_int"))
                    if connector_id_int_resolved is None and isinstance(start_info, Mapping):
                        connector_id_int_resolved = _coerce_int_or_none(start_info.get("connectorId"))
                    if connector_id_int_resolved is None and connector_id is not None:
                        connector_id_int_resolved = _coerce_int_or_none(connector_id)
                    meter_stop_value = _coerce_int_or_none(meter_value)
                    start_time = None
                    oicp_enabled = False
                    if isinstance(info, dict):
                        start_time = info.get("start_time")
                        oicp_enabled = bool(info.get("oicp_enabled"))
                    if (
                        not oicp_enabled
                        and HUBJECT_CLIENT is not None
                        and connector_id_int_resolved is not None
                        and connector_id_int_resolved > 0
                    ):
                        oicp_enabled = await is_oicp_enabled(station_id)
                    id_tag_value = (
                        start_info.get("idTag") if isinstance(start_info, Mapping) else None
                    ) or id_tag_value
                    meter_start = None
                    if isinstance(start_info, Mapping):
                        meter_start = _coerce_int_or_none(start_info.get("meterStart"))
                    if meter_stop_value is not None and connector_id_int_resolved is not None:
                        try:
                            await store_last_meter_reading(
                                station_id,
                                connector_id_int_resolved,
                                float(meter_stop_value),
                                ts_dt,
                            )
                        except Exception:
                            pass
                    energy_charged = None
                    if meter_start is not None and meter_stop_value is not None:
                        diff = meter_stop_value - meter_start
                        if diff >= 0:
                            energy_charged = diff
                    reason_value = tx_info.get("stoppedReason") if isinstance(tx_info, Mapping) else None
                    try:
                        await store_server_session(
                            normalized_station_id,
                            tx_id_int,
                            connector_id=connector_id_int_resolved,
                            id_tag=id_tag_value,
                            session_start=start_time,
                            session_end=ts_dt,
                            meter_start=meter_start,
                            last_meter_value=meter_stop_value,
                            last_meter_ts=ts_dt,
                            energy_wh=energy_charged,
                            status=reason_value or "Ended",
                            ended=True,
                        )
                    except Exception:
                        logging.debug(
                            "Failed to upsert server session end for %s (tx %s)",
                            normalized_station_id,
                            tx_id,
                            exc_info=True,
                        )
                    try:
                        await store_charging_session_record(
                            station_id,
                            connector_id_int_resolved,
                            tx_id_int,
                            id_tag_value,
                            start_time,
                            ts_dt,
                            meter_start,
                            meter_stop_value,
                            energy_charged,
                            reason_value,
                        )
                    except Exception:
                        logging.debug(
                            "Failed to persist 2.x charging session for %s (tx %s)",
                            station_id,
                            tx_id,
                            exc_info=True,
                        )
                    try:
                        asyncio.create_task(
                            log_extended_session_event(
                                station_id,
                                "Stop",
                                connector_id=connector_id_int_resolved,
                                transaction_id=tx_id,
                                meter_unit=SESSION_LOG_UNIT,
                                meter_start=meter_start,
                                meter_stop=meter_stop_value,
                                id_tag=id_tag_value,
                                reason=reason_value,
                                transaction_data=payload.get("transactionData"),
                                timestamp=ts_dt,
                            )
                        )
                    except Exception:
                        logging.debug(
                            "Failed to schedule extended session log for 2.x stop of %s",
                            station_id,
                            exc_info=True,
                        )
                    if (
                        oicp_enabled
                        and connector_id_int_resolved is not None
                        and connector_id_int_resolved > 0
                    ):
                        try:
                            asyncio.create_task(
                                _push_hubject_transaction_status(
                                    station_id,
                                    connector_id_int_resolved,
                                    "Available",
                                    ts_dt,
                                )
                            )
                        except Exception:
                            logging.debug(
                                "Failed to schedule Hubject available status for %s",
                                station_id,
                                exc_info=True,
                            )
                    _schedule_hubject_cdr_submission(
                        station_id,
                        connector_id_int_resolved,
                        tx_id_int,
                        info if isinstance(info, Mapping) else None,
                        payload,
                        ts_dt,
                        meter_stop_value,
                    )
                    resp = [3, uid, response_payload]
                    await ws.send(json.dumps(resp))
                    log_ocpp_message(station_id, "server_to_client", resp, action)
                    action_timer.success()
                else:
                    resp = [3, uid, response_payload]
                    await ws.send(json.dumps(resp))
                    log_ocpp_message(station_id, "server_to_client", resp, action)
                    action_timer.success()
            elif action == "StartTransaction":
                card_uuid = payload.get("idTag")
                status = await check_rfid_authorization(station_id, card_uuid)
                tx_id = uuid.uuid4().int & 0xFFFFFFFF
                tx_id_int = _coerce_int_or_none(tx_id)
                ts = payload.get("timestamp")
                connector_id = payload.get("connectorId")
                connector_id_int = _coerce_int_or_none(connector_id)
                try:
                    start_time = (
                        datetime.fromisoformat(ts.replace("Z", "+00:00"))
                        if ts
                        else datetime.now(timezone.utc)
                    )
                except Exception:
                    start_time = datetime.now(timezone.utc)
                oicp_enabled = False
                if (
                    HUBJECT_CLIENT is not None
                    and connector_id_int is not None
                    and connector_id_int > 0
                ):
                    oicp_enabled = await is_oicp_enabled(station_id)
                start_info = dict(payload)
                start_info.setdefault("chargepoint_id", station_id)
                if connector_id_int is not None:
                    start_info["connectorId"] = connector_id_int
                elif connector_id is not None:
                    start_info["connectorId"] = connector_id
                start_info.setdefault(
                    "timestamp",
                    ts if isinstance(ts, str) and ts else _format_timestamp_value(start_time),
                )
                start_info.setdefault("meterStart", payload.get("meterStart"))
                start_info.setdefault("idTag", card_uuid)
                resolved_evse = None
                try:
                    resolved_evse = _resolve_evse_identifier(
                        station_id,
                        connector_id_int if connector_id_int is not None else 0,
                    )
                except Exception:
                    resolved_evse = None
                if resolved_evse:
                    start_info["evseId"] = resolved_evse
                pending_start_method = _pop_pending_start_method(
                    station_id, connector_id_int
                )
                current_transactions[tx_id] = {
                    "chargepoint_id": station_id,
                    "start_time": start_time,
                    "connector_id": connector_id,
                    "connector_id_int": connector_id_int,
                    "oicp_enabled": oicp_enabled,
                    "start_info": start_info,
                    "start_method": pending_start_method,
                }
                meter_start = payload.get("meterStart")
                if meter_start is not None and connector_id is not None:
                    try:
                        conn_id_int = int(connector_id)
                        await store_last_meter_reading(
                            station_id,
                            conn_id_int,
                            float(meter_start),
                            start_time,
                        )
                    except (ValueError, TypeError):
                        pass
                if (
                    oicp_enabled
                    and connector_id_int is not None
                    and connector_id_int > 0
                ):
                    try:
                        asyncio.create_task(
                            _push_hubject_transaction_status(
                                station_id,
                                connector_id_int,
                                "Occupied",
                                start_time,
                            )
                        )
                    except Exception:
                        logging.debug(
                            "Failed to schedule Hubject occupied status for %s", station_id,
                            exc_info=True,
                        )
                pending_authorization = _consume_pending_hubject_authorization(
                    station_id,
                    card_uuid,
                )
                if (
                    pending_authorization
                    and connector_id_int is not None
                    and connector_id_int > 0
                ):
                    metadata = dict(pending_authorization)
                    metadata.setdefault("chargepoint_id", station_id)
                    metadata.setdefault("connector_id", connector_id_int)
                    evse_identifier = metadata.get("evse_id") or start_info.get("evseId")
                    if not evse_identifier and resolved_evse:
                        evse_identifier = resolved_evse
                    if evse_identifier:
                        metadata["evse_id"] = evse_identifier
                    if metadata.get("identification") is None and card_uuid:
                        metadata["identification"] = {
                            "RFIDMifareFamilyIdentification": {"UID": card_uuid}
                        }
                    metadata.setdefault("token", card_uuid)
                    metadata.setdefault("id_tag", card_uuid)
                    if "received_at" not in metadata:
                        metadata["received_at"] = _format_timestamp_value(
                            datetime.now(timezone.utc)
                        )
                    _set_pending_hubject_cdr(
                        station_id,
                        connector_id_int,
                        metadata,
                    )
                _attach_hubject_metadata_to_transaction(
                    station_id,
                    connector_id_int,
                    tx_id,
                    current_transactions[tx_id],
                )
                entry = current_transactions.get(tx_id)
                if entry is not None:
                    _finalize_transaction_start_method(
                        entry, pending_start_method
                    )
                meter_start_int = _coerce_int_or_none(meter_start)
                normalized_station_id = _normalize_chargepoint_id(station_id) or station_id
                try:
                    await store_server_session(
                        normalized_station_id,
                        tx_id_int,
                        connector_id=connector_id_int,
                        id_tag=card_uuid,
                        session_start=start_time,
                        meter_start=meter_start_int,
                        last_meter_value=meter_start_int,
                        last_meter_ts=start_time,
                        status="Started",
                        ended=False,
                    )
                except Exception:
                    logging.debug(
                        "Failed to upsert server session start for %s (tx %s)",
                        normalized_station_id,
                        tx_id,
                        exc_info=True,
                    )
                try:
                    asyncio.create_task(
                        log_extended_session_event(
                            station_id,
                            "Start",
                            connector_id=connector_id_int,
                            transaction_id=tx_id,
                            meter_value=_coerce_int_or_none(meter_start),
                            meter_unit=SESSION_LOG_UNIT,
                            meter_start=_coerce_int_or_none(meter_start),
                            id_tag=card_uuid,
                            transaction_data=payload.get("transactionData"),
                            timestamp=start_time,
                        )
                    )
                except Exception:
                    logging.debug(
                        "Failed to schedule extended session log for start of %s",
                        station_id,
                        exc_info=True,
                    )
                resp = [
                    3,
                    uid,
                    {
                        "transactionId": tx_id,
                        "idTagInfo": {"status": status},
                    },
                ]
                await ws.send(json.dumps(resp))
                logging.info(
                    "StartTransaction response for %s at %s: %s",
                    card_uuid,
                    station_id,
                    status,
                )
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
            elif action == "MeterValues":
                meter_value, unit, ts_dt = _extract_meter_value_data(
                    payload, default_ts=datetime.now(timezone.utc)
                )
                connector_id = payload.get("connectorId")
                connector_id_int = _coerce_int_or_none(connector_id)
                meter_value_int = _coerce_int_or_none(meter_value)
                if meter_value is not None and connector_id_int is not None:
                    try:
                        await store_last_meter_reading(
                            station_id, connector_id_int, meter_value, ts_dt
                        )
                    except Exception:
                        pass
                if station_id in pending_meter_triggers:
                    if meter_value is not None:
                        try:
                            await store_trigger_meter_value(
                                station_id, meter_value, unit or "", ts_dt
                            )
                        except Exception:
                            pass
                    pending_meter_triggers.discard(station_id)
                progress_tx_id = _coerce_int_or_none(payload.get("transactionId"))
                start_entry = current_transactions.get(progress_tx_id) if progress_tx_id is not None else None
                start_metadata = start_entry.get("start_info") if isinstance(start_entry, dict) else {}
                start_meter_value = (
                    _coerce_int_or_none(start_metadata.get("meterStart"))
                    if isinstance(start_metadata, Mapping)
                    else None
                )
                id_tag_value = (
                    start_metadata.get("idTag") if isinstance(start_metadata, Mapping) else None
                ) or payload.get("idTag")
                energy_wh = None
                if start_meter_value is not None and meter_value_int is not None:
                    diff = meter_value_int - start_meter_value
                    if diff >= 0:
                        energy_wh = diff
                if meter_value is not None:
                    try:
                        asyncio.create_task(
                            log_extended_session_event(
                                station_id,
                                "Progress",
                                connector_id=connector_id_int,
                                transaction_id=progress_tx_id,
                                meter_value=meter_value,
                                meter_unit=unit or SESSION_LOG_UNIT,
                                meter_start=start_meter_value,
                                meter_stop=_coerce_int_or_none(meter_value),
                                id_tag=id_tag_value,
                                timestamp=ts_dt,
                            )
                        )
                    except Exception:
                        logging.debug(
                            "Failed to schedule extended session log for meter values of %s",
                            station_id,
                            exc_info=True,
                        )
                normalized_station_id = _normalize_chargepoint_id(station_id) or station_id
                try:
                    await store_server_session(
                        normalized_station_id,
                        progress_tx_id,
                        connector_id=connector_id_int,
                        id_tag=id_tag_value,
                        last_meter_value=meter_value_int,
                        last_meter_ts=ts_dt,
                        energy_wh=energy_wh,
                        status="Progress",
                        ended=False,
                    )
                except Exception:
                    logging.debug(
                        "Failed to upsert server session progress for %s (tx %s)",
                        normalized_station_id,
                        progress_tx_id,
                        exc_info=True,
                    )
                resp = [3, uid, {}]
                await ws.send(json.dumps(resp))
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
            elif action == "StatusNotification":
                try:
                    connector_id_raw = payload.get("connectorId")
                except AttributeError:
                    connector_id_raw = None
                try:
                    connector_id_int = int(connector_id_raw)
                except (TypeError, ValueError):
                    connector_id_int = 0
                status_value = str(payload.get("status") or "").strip()
                if not status_value:
                    status_value = "Unknown"
                error_code = payload.get("errorCode")
                if isinstance(error_code, str):
                    error_code = error_code.strip() or None
                else:
                    error_code = None
                info_value = payload.get("info")
                if isinstance(info_value, str):
                    info_value = info_value.strip() or None
                else:
                    info_value = None
                vendor_id = payload.get("vendorId")
                if isinstance(vendor_id, str):
                    vendor_id = vendor_id.strip() or None
                else:
                    vendor_id = None
                status_ts = _parse_ocpp_timestamp(payload.get("timestamp"))
                if connector_id_int == 0:
                    availability = _availability_from_status_notification(status_value)
                    if availability:
                        availability_state[station_id] = availability
                try:
                    await store_status_notification_entry(
                        station_id,
                        connector_id_int,
                        status_value,
                        error_code,
                        info_value,
                        vendor_id,
                        status_ts,
                    )
                except Exception:
                    pass
                try:
                    asyncio.create_task(
                        process_status_notification_for_hubject(
                            station_id,
                            connector_id_int,
                            status_value,
                            status_ts,
                        )
                    )
                except Exception:
                    logging.debug(
                        "Failed to schedule Hubject status update for %s", station_id,
                        exc_info=True,
                    )
                resp = [3, uid, {}]
                await ws.send(json.dumps(resp))
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
            elif action == "DataTransfer":
                status_value = await _resolve_datatransfer_status(payload, station_id)

                resp = [3, uid, {"status": status_value}]
                await ws.send(json.dumps(resp))
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
            elif action == "StopTransaction":
                tx_id = payload.get("transactionId")
                tx_id_int = _coerce_int_or_none(tx_id)
                connector_id = None
                connector_id_int = None
                oicp_enabled = False
                meter_stop = payload.get("meterStop")
                ts = payload.get("timestamp")
                try:
                    stop_time = (
                        datetime.fromisoformat(ts.replace("Z", "+00:00"))
                        if ts
                        else datetime.now(timezone.utc)
                    )
                except Exception:
                    stop_time = datetime.now(timezone.utc)
                info = None
                start_time = None
                start_info: dict[str, Any] = {}
                if tx_id is not None:
                    info = current_transactions.pop(tx_id, None)
                    if info:
                        connector_id = info.get("connector_id")
                        connector_id_int = info.get("connector_id_int")
                        oicp_enabled = bool(info.get("oicp_enabled"))
                        start_time = info.get("start_time")
                        start_info = info.get("start_info") or {}
                if connector_id is None:
                    connector_id = payload.get("connectorId")
                if connector_id_int is None and isinstance(start_info, dict):
                    connector_id_int = _coerce_int_or_none(start_info.get("connectorId"))
                if meter_stop is not None and connector_id is not None:
                    try:
                        if connector_id_int is None:
                            connector_id_int = int(connector_id)
                        await store_last_meter_reading(
                            station_id,
                            connector_id_int,
                            float(meter_stop),
                            stop_time,
                        )
                    except (ValueError, TypeError):
                        connector_id_int = None
                if connector_id_int is None and connector_id is not None:
                    try:
                        connector_id_int = int(connector_id)
                    except (TypeError, ValueError):
                        connector_id_int = None
                meter_start = None
                if isinstance(start_info, dict):
                    meter_start = _coerce_int_or_none(start_info.get("meterStart"))
                if meter_start is None:
                    meter_start = _coerce_int_or_none(payload.get("meterStart"))
                meter_stop_int = _coerce_int_or_none(meter_stop)
                if meter_stop_int is not None:
                    meter_stop = meter_stop_int
                id_tag_value = None
                if isinstance(start_info, dict):
                    id_tag_value = start_info.get("idTag")
                if not id_tag_value:
                    id_tag_value = payload.get("idTag")
                tx_id_int = _coerce_int_or_none(tx_id)
                energy_charged = None
                if meter_start is not None and meter_stop_int is not None:
                    diff = meter_stop_int - meter_start
                    if diff >= 0:
                        energy_charged = diff
                normalized_station_id = _normalize_chargepoint_id(station_id) or station_id
                try:
                    await store_server_session(
                        normalized_station_id,
                        tx_id_int,
                        connector_id=connector_id_int,
                        id_tag=id_tag_value,
                        session_start=start_time,
                        session_end=stop_time,
                        meter_start=meter_start,
                        last_meter_value=meter_stop_int,
                        last_meter_ts=stop_time,
                        energy_wh=energy_charged,
                        status=payload.get("reason") or "Stopped",
                        ended=True,
                    )
                except Exception:
                    logging.debug(
                        "Failed to upsert server session stop for %s (tx %s)",
                        normalized_station_id,
                        tx_id,
                        exc_info=True,
                    )
                try:
                    await store_charging_session_record(
                        station_id,
                        connector_id_int,
                        tx_id_int,
                        id_tag_value,
                        start_time,
                        stop_time,
                        meter_start,
                        meter_stop_int,
                        energy_charged,
                        payload.get("reason"),
                    )
                except Exception:
                    logging.debug(
                        "Failed to persist charging session for %s (tx %s)",
                        station_id,
                        tx_id,
                        exc_info=True,
                    )
                try:
                    asyncio.create_task(
                        log_extended_session_event(
                            station_id,
                            "Stop",
                            connector_id=connector_id_int,
                            transaction_id=tx_id_int if tx_id_int is not None else tx_id,
                            meter_unit=SESSION_LOG_UNIT,
                            meter_start=meter_start,
                            meter_stop=meter_stop_int if meter_stop_int is not None else meter_stop,
                            id_tag=id_tag_value,
                            reason=payload.get("reason"),
                            transaction_data=payload.get("transactionData"),
                            timestamp=stop_time,
                        )
                    )
                except Exception:
                    logging.debug(
                        "Failed to schedule extended session log for stop of %s",
                        station_id,
                        exc_info=True,
                    )
                if (
                    not oicp_enabled
                    and HUBJECT_CLIENT is not None
                    and connector_id_int is not None
                    and connector_id_int > 0
                ):
                    oicp_enabled = await is_oicp_enabled(station_id)
                if (
                    oicp_enabled
                    and connector_id_int is not None
                    and connector_id_int > 0
                ):
                    try:
                        asyncio.create_task(
                            _push_hubject_transaction_status(
                                station_id,
                                connector_id_int,
                                "Available",
                                stop_time,
                            )
                        )
                    except Exception:
                        logging.debug(
                            "Failed to schedule Hubject available status for %s",
                            station_id,
                            exc_info=True,
                        )
                hubject_context = info.get("hubject") if info else None
                if hubject_context:
                    try:
                        asyncio.create_task(
                            _notify_hubject_authorize_stop(
                                hubject_context,
                                dict(payload),
                                stop_time,
                            )
                        )
                    except Exception:
                        logging.debug(
                            "Failed to schedule Hubject authorize-stop notification for %s",
                            station_id,
                            exc_info=True,
                        )
                if tx_id is not None or (info and info.get("hubject")):
                    _schedule_hubject_cdr_submission(
                        station_id,
                        connector_id_int,
                        tx_id,
                        info,
                        payload,
                        stop_time,
                        meter_stop,
                    )
                resp = [3, uid, {}]
                await ws.send(json.dumps(resp))
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
                try:
                    asyncio.create_task(
                        trigger_meter_values(station_id, connector_id_int)
                    )
                except Exception:
                    logging.warning(
                        "Failed to trigger meter values for %s", station_id
                    )
            else:
                resp = [3, uid, {}]
                await ws.send(json.dumps(resp))
                log_ocpp_message(station_id, "server_to_client", resp, action)
                action_timer.success()
            action_timer.finalize()
    except ConnectionClosed as e:
        logging.warning(
            "Connection closed for %s with code %s (%s)",
            station_id,
            e.code,
            e.reason,
        )
    finally:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        connected.pop(station_id, None)
        connected_since.pop(station_id, None)
        last_seen.pop(station_id, None)
        stale_timeouts.pop(station_id, None)
        boot_notifications.discard(station_id)
        boot_requested.discard(station_id)
        logging.info("Cleaned up connection for %s", station_id)


def _collect_websocket_metrics() -> dict[str, Any]:
    """Summarise tracked websocket connections and file-descriptor usage.

    Returns:
        A dict with two sections:

        * ``connections`` - number of non-closed sockets in ``connected``,
          sizes of the tracking tables, the total of ``stale_timeout_events``
          and the earliest connect / latest last-seen timestamps (formatted
          with ``_format_timestamp_value``).
        * ``fd_metrics`` - open descriptor count, the soft ``RLIMIT_NOFILE``
          limit and their ratio; each value is ``None`` when it cannot be
          determined.
    """
    live_count = 0
    oldest_since: datetime | None = None
    newest_seen: datetime | None = None

    for sid, sock in list(connected.items()):
        # Entries whose socket is flagged as closed stay tracked but are not
        # counted as active and do not contribute timestamps.
        if not getattr(sock, "closed", False):
            live_count += 1

            connected_at = connected_since.get(sid)
            if connected_at:
                if oldest_since is None or connected_at < oldest_since:
                    oldest_since = connected_at

            seen_at = last_seen.get(sid)
            if seen_at:
                if newest_seen is None or seen_at > newest_seen:
                    newest_seen = seen_at

    # Both probes are best effort: /proc may be missing and getrlimit may fail.
    try:
        fd_count = len(os.listdir("/proc/self/fd"))
    except Exception:
        fd_count = None

    try:
        nofile_soft = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
    except Exception:
        nofile_soft = None

    ratio_available = (
        isinstance(fd_count, int)
        and isinstance(nofile_soft, int)
        and nofile_soft > 0
    )
    fd_metrics: dict[str, Any] = {
        "open_fds": fd_count,
        "fd_soft_limit": nofile_soft,
        "fd_usage_ratio": fd_count / nofile_soft if ratio_available else None,
    }

    connection_metrics = {
        "active": live_count,
        "tracked_entries": len(connected),
        "stale_timeouts_tracked": len(stale_timeouts),
        "stale_timeout_events": sum(stale_timeout_events.values()),
        "earliest_connected_since": _format_timestamp_value(oldest_since),
        "latest_last_seen": _format_timestamp_value(newest_seen),
    }
    return {"connections": connection_metrics, "fd_metrics": fd_metrics}


def _build_station_liveness_snapshot(
    now: datetime | None = None, *, warning_threshold: int | None = None
) -> list[dict[str, Any]]:
    """Build one liveness entry per connected (non-closed) station.

    Args:
        now: Reference time for the snapshot. Defaults to the current UTC
            time. A naive datetime is interpreted as UTC, matching how naive
            ``last_seen`` values are handled below.
        warning_threshold: Optional number of seconds; when given (and
            convertible to ``int``) each entry gets an ``at_risk`` flag that is
            true if the remaining time until the stale timeout is at or below
            this value. Negative values are clamped to 0.

    Returns:
        A list of dicts with the station id, formatted connect / last-seen
        timestamps, elapsed and remaining seconds relative to the station's
        stale timeout, the stale event count and the negotiated subprotocol.
    """
    snapshot_time = now if isinstance(now, datetime) else datetime.now(timezone.utc)
    if snapshot_time.tzinfo is None:
        # ``last`` is always made timezone-aware below; subtracting it from a
        # naive reference time would raise TypeError.
        snapshot_time = snapshot_time.replace(tzinfo=timezone.utc)

    threshold: int | None = None
    if warning_threshold is not None:
        try:
            threshold = max(int(warning_threshold), 0)
        except (TypeError, ValueError):
            threshold = None

    entries: list[dict[str, Any]] = []
    for station_id, ws in list(connected.items()):
        if getattr(ws, "closed", False):
            continue
        since = connected_since.get(station_id)
        # Fall back from last-seen to connect time to "just now" so a station
        # without any recorded activity is not reported as overdue.
        last = last_seen.get(station_id) or since or snapshot_time
        if not isinstance(last, datetime):
            last = snapshot_time
        elif last.tzinfo is None:
            last = last.replace(tzinfo=timezone.utc)
        else:
            last = last.astimezone(timezone.utc)

        timeout_seconds_raw = stale_timeouts.get(station_id, DEFAULT_STALE_TIMEOUT)
        try:
            timeout_seconds = int(timeout_seconds_raw)
        except (TypeError, ValueError):
            timeout_seconds = DEFAULT_STALE_TIMEOUT

        # Clamp at zero so clock skew never yields negative durations.
        elapsed_seconds = max(int((snapshot_time - last).total_seconds()), 0)
        remaining_seconds = max(timeout_seconds - elapsed_seconds, 0)

        entry: dict[str, Any] = {
            "station_id": station_id,
            "connected_since": _format_timestamp_value(since),
            "last_seen": _format_timestamp_value(last),
            "seconds_since_last_seen": elapsed_seconds,
            "stale_timeout_seconds": timeout_seconds,
            "seconds_until_timeout": remaining_seconds,
            "stale_event_count": stale_timeout_events.get(station_id, 0),
            "ocpp_subprotocol": getattr(ws, "subprotocol", None),
        }
        if threshold is not None:
            entry["at_risk"] = remaining_seconds <= threshold

        entries.append(entry)

    return entries


# aiohttp application object and the route table on which the
# ``@routes.get`` / ``@routes.post`` handlers below register themselves.
app = web.Application()
routes = web.RouteTableDef()


@routes.get("/robots.txt")
async def robots_txt(request):
    """Serve a plain-text robots.txt that disallows every path for all agents."""
    robots_body = "User-agent: *\nDisallow: /\n"
    return web.Response(text=robots_body, content_type="text/plain")


@routes.get("/api/db_pool_status")
async def api_db_pool_status(request):
    """Report sizing information for whichever database pool is active.

    The aiomysql pool is preferred when present; otherwise the synchronous
    pymysql pool is described. Responds with HTTP 500 when neither exists.
    """
    await _init_shared_db_pool()

    async_pool = _db_pool
    if async_pool is not None:
        async_status = {
            "pool_type": "aiomysql",
            "mysql_pool_size": MYSQL_POOL_SIZE,
        }
        # Pool attributes are read defensively; missing ones become None.
        for attribute in ("minsize", "maxsize", "size", "freesize"):
            async_status[attribute] = getattr(async_pool, attribute, None)
        return web.json_response(async_status)

    sync_pool = _db_sync_pool
    if sync_pool is None:
        return web.json_response({"error": "No database pool configured"}, status=500)

    created = sync_pool.created_count
    sync_status = {
        "pool_type": "pymysql_sync",
        "mysql_pool_size": MYSQL_POOL_SIZE,
        "maxsize": MYSQL_POOL_SIZE,
        "created": created,
        "available": sync_pool.available_count,
        "free_slots": max(MYSQL_POOL_SIZE - created, 0),
    }
    return web.json_response(sync_status)


@routes.get("/api/ocpp_log_queue_metrics")
async def api_ocpp_log_queue_metrics(request):
    """Return OCPP log queue metrics for a time window.

    Query parameters:
        window_minutes: Optional window length in minutes. Negative values are
            clamped to 0. Missing, unparsable or NaN values fall back to
            ``OCPP_LOG_DROP_DEFAULT_WINDOW_MINUTES``.
    """
    window_minutes: float | None = OCPP_LOG_DROP_DEFAULT_WINDOW_MINUTES
    window_param = request.query.get("window_minutes")
    if window_param is not None:
        try:
            parsed_window = float(window_param)
        except (TypeError, ValueError):
            parsed_window = None
        # float() accepts "nan", and max(nan, 0.0) returns nan, so the clamp
        # alone would let an unusable window through.
        if parsed_window is not None and not math.isnan(parsed_window):
            window_minutes = max(parsed_window, 0.0)

    metrics = _build_ocpp_log_queue_metrics(window_minutes)
    return web.json_response(metrics)


@routes.get("/api/websocketCounts")
async def api_websocket_counts(request):
    """Expose the result of ``_collect_websocket_metrics`` as JSON."""
    return web.json_response(_collect_websocket_metrics())


@routes.get("/api/ocpp_action_metrics")
async def api_ocpp_action_metrics(request):
    """Return the OCPP action metrics snapshot, plus any warnings it produced.

    Warnings are attached under the ``_warnings`` key only when non-empty.
    """
    snapshot, snapshot_warnings = _build_action_metrics_snapshot()
    response_body: dict[str, Any] = dict(snapshot)
    if snapshot_warnings:
        response_body["_warnings"] = snapshot_warnings
    return web.json_response(response_body)


@routes.get("/api/station_liveness")
async def api_station_liveness(request):
    """Return a liveness snapshot of connected stations.

    Query parameters:
        threshold: Optional integer seconds (clamped to >= 0); enables the
            ``at_risk`` flag on each entry. Unparsable values are ignored.
        sort / order_by: ``at_risk``/``risk`` sorts at-risk stations first,
            ``remaining``/``time_left``/``seconds_until_timeout`` sorts by time
            left; anything else sorts by station id.
    """
    generated_at = datetime.now(timezone.utc)
    query = request.query

    threshold: int | None = None
    raw_threshold = query.get("threshold")
    if raw_threshold is not None:
        try:
            threshold = max(int(raw_threshold), 0)
        except (TypeError, ValueError):
            threshold = None

    sort_param = (query.get("sort") or query.get("order_by") or "").lower()
    liveness_entries = _build_station_liveness_snapshot(
        generated_at, warning_threshold=threshold
    )

    def _by_station(entry):
        return entry.get("station_id", "")

    def _by_remaining(entry):
        return (
            entry.get("seconds_until_timeout", DEFAULT_STALE_TIMEOUT),
            entry.get("station_id", ""),
        )

    def _by_risk(entry):
        # At-risk entries (rank 0) sort ahead of everything else (rank 1).
        risk_rank = 0 if entry.get("at_risk") else 1
        return (risk_rank, *_by_remaining(entry))

    if sort_param in {"at_risk", "risk"}:
        sort_key = _by_risk
    elif sort_param in {"remaining", "time_left", "seconds_until_timeout"}:
        sort_key = _by_remaining
    else:
        sort_key = _by_station
    liveness_entries.sort(key=sort_key)

    response_payload: dict[str, Any] = {
        "generated_at": _format_timestamp_value(generated_at),
        "stations": liveness_entries,
        "sort": sort_param or "station_id",
    }
    if threshold is not None:
        response_payload["threshold_seconds"] = threshold

    return web.json_response(response_payload)


@routes.get("/api/connected_stations")
async def api_connected_stations_compact(request):
    """List non-closed station connections with optional timestamps.

    Each entry always carries ``station_id``; ``connected_since`` and
    ``last_seen`` are included only when a value is recorded for the station.
    """
    timestamp_sources = (
        ("connected_since", connected_since),
        ("last_seen", last_seen),
    )
    stations = []
    for sid, sock in list(connected.items()):
        if getattr(sock, "closed", False):
            continue
        entry = {"station_id": sid}
        for field_name, registry in timestamp_sources:
            stamp = registry.get(sid)
            if stamp:
                entry[field_name] = _format_timestamp_value(stamp)
        stations.append(entry)

    return web.json_response({"stations": stations})


@routes.get("/api/getConfiguration")
async def api_get_configuration(request):
    """Fetch a station's configuration via ``get_configuration``.

    Responds with 400 when ``station_id`` is missing and 500 (with the
    exception text) when the lookup or serialisation fails.
    """
    target_station = request.query.get("station_id")
    if target_station:
        try:
            configuration = await get_configuration(target_station)
            return web.json_response(configuration)
        except Exception as exc:
            return web.json_response({"error": str(exc)}, status=500)
    return web.json_response({"error": "station_id required"}, status=400)


@routes.post("/api/setConfiguration")
async def api_set_configuration(request):
    """Set a single configuration key on a station via ``set_configuration``.

    Expects a JSON object with ``station_id``, ``key`` and ``value``. Responds
    with 400 when any of them is missing (including when the body is not a
    JSON object) and 500 with the exception text when the call fails.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    # Valid JSON need not be an object; a list or scalar has no ``.get`` and
    # would otherwise surface as an unhandled AttributeError (HTTP 500).
    if not isinstance(body, dict):
        body = {}

    station_id = body.get("station_id")
    key = body.get("key")
    value = body.get("value")
    if not (station_id and key is not None and value is not None):
        return web.json_response({"error": "station_id, key and value required"}, status=400)
    try:
        data = await set_configuration(station_id, key, value)
        return web.json_response(data)
    except Exception as e:
        return web.json_response({"error": str(e)}, status=500)


@routes.post("/api/changeConfiguration/custom")
async def api_custom_change_configuration(request):
    """Send a caller-supplied ChangeConfiguration payload to a station.

    Expects a JSON object with ``station_id`` and an object-valued
    ``payload``, which is forwarded to ``send_custom_change_configuration``.
    Responds with 400 on missing/invalid input (including a body that is not
    a JSON object) and 500 with the exception text when the call fails.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    # Valid JSON need not be an object; a list or scalar has no ``.get`` and
    # would otherwise surface as an unhandled AttributeError (HTTP 500).
    if not isinstance(body, dict):
        body = {}

    station_id = body.get("station_id")
    payload = body.get("payload")

    if not station_id:
        return web.json_response({"error": "station_id required"}, status=400)
    if not isinstance(payload, dict):
        return web.json_response({"error": "payload must be an object"}, status=400)

    try:
        data = await send_custom_change_configuration(station_id, payload)
        return web.json_response(data)
    except Exception as e:
        return web.json_response({"error": str(e)}, status=500)


@routes.get("/api/localAuth/configuration")
async def api_get_local_auth_configuration(request):
    """Return the LocalAuth-related configuration keys of a station.

    The full ``get_configuration`` result is returned under ``raw``; entries
    whose key is listed in ``LOCAL_AUTH_CONFIGURATION_KEYS`` are additionally
    collected under ``settings``.
    """
    target_station = request.query.get("station_id")
    if not target_station:
        return web.json_response({"error": "station_id required"}, status=400)
    try:
        raw_configuration = await get_configuration(target_station)
        local_auth_settings: dict[str, Any] = {
            name: item.get("value")
            for item in raw_configuration.get("configurationKey", [])
            if (name := item.get("key")) in LOCAL_AUTH_CONFIGURATION_KEYS
        }
        response_body = {
            "station_id": target_station,
            "settings": local_auth_settings,
            "raw": raw_configuration,
        }
        return web.json_response(response_body)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)


@routes.post("/api/localAuth/configuration")
async def api_set_local_auth_configuration(request):
    """Apply LocalAuth configuration values to a station.

    Only body keys listed in ``LOCAL_AUTH_CONFIGURATION_KEYS`` are forwarded
    to ``set_local_auth_configuration``. Responds with 400 when ``station_id``
    is missing or no recognised key is present.
    """
    body = await _parse_json(request)
    target_station = body.get("station_id")
    if not target_station:
        return web.json_response({"error": "station_id required"}, status=400)

    requested_settings = {}
    for name in LOCAL_AUTH_CONFIGURATION_KEYS:
        if name in body:
            requested_settings[name] = body.get(name)

    if not requested_settings:
        rejection = {
            "error": "no LocalAuth configuration values provided",
            "valid_keys": LOCAL_AUTH_CONFIGURATION_KEYS,
        }
        return web.json_response(rejection, status=400)

    try:
        outcome = await set_local_auth_configuration(target_station, requested_settings)
        return web.json_response({"station_id": target_station, "result": outcome})
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)


@routes.get("/api/localAuth/listVersion")
async def api_get_local_list_version(request):
    """Query a station's local authorization list version.

    Wraps ``get_local_list_version``; 400 without ``station_id``, 500 with the
    exception text on failure.
    """
    target_station = request.query.get("station_id")
    if target_station:
        try:
            version_info = await get_local_list_version(target_station)
            return web.json_response(
                {"station_id": target_station, "result": version_info}
            )
        except Exception as exc:
            return web.json_response({"error": str(exc)}, status=500)
    return web.json_response({"error": "station_id required"}, status=400)


@routes.post("/api/localAuth/sendLocalList")
async def api_send_local_list(request):
    """Send a local authorization list to a station via ``send_local_list``.

    Body fields:
        station_id: Required.
        listVersion (or versionNumber): Required, must convert to ``int``.
        updateType (or update_type): Defaults to ``"Full"``.
        localAuthorizationList (or local_authorization_list): Passed through.
    """
    body = await _parse_json(request)
    target_station = body.get("station_id")
    if not target_station:
        return web.json_response({"error": "station_id required"}, status=400)

    # ``listVersion`` wins; ``versionNumber`` is only consulted as a fallback.
    version_value = body.get("listVersion")
    if version_value is None:
        version_value = body.get("versionNumber")
        if version_value is None:
            return web.json_response({"error": "listVersion required"}, status=400)

    try:
        list_version = int(version_value)
    except (TypeError, ValueError):
        return web.json_response({"error": "listVersion must be an integer"}, status=400)

    update_type = body.get("updateType") or body.get("update_type") or "Full"

    # Key presence (not truthiness) decides which spelling is used, so an
    # explicitly supplied empty camelCase list is honoured.
    if "localAuthorizationList" in body:
        authorization_entries = body.get("localAuthorizationList")
    else:
        authorization_entries = body.get("local_authorization_list")

    try:
        outcome = await send_local_list(
            target_station, list_version, update_type, authorization_entries
        )
        return web.json_response({"station_id": target_station, "result": outcome})
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)


@routes.post("/api/changeAvailability")
async def api_change_availability(request):
    """Change the availability of a station or connector.

    Body fields:
        station_id (or chargepoint_id): Required.
        type (or availability): Required; ``Operative`` or ``Inoperative``
            (case-insensitive, surrounding whitespace ignored).
        connectorId (or connector_id): Integer, defaults to 0.

    The response always carries an ``availability`` field taken from
    ``availability_state`` unless the station's reply dict already has one.
    """
    body = await _parse_json(request)
    station_id = body.get("station_id") or body.get("chargepoint_id")
    availability_type = body.get("type") or body.get("availability")
    connector_id = body.get("connectorId") or body.get("connector_id") or 0

    if station_id is None or availability_type is None:
        return web.json_response(
            {"error": "station_id and type required"}, status=400
        )

    try:
        connector_id_int = int(connector_id)
    except (TypeError, ValueError):
        return web.json_response({"error": "connectorId must be an integer"}, status=400)

    canonical_types = {"operative": "Operative", "inoperative": "Inoperative"}
    normalized_type = canonical_types.get(str(availability_type).strip().lower())
    if normalized_type is None:
        return web.json_response(
            {"error": "type must be 'Operative' or 'Inoperative'"}, status=400
        )

    try:
        data = await change_availability(station_id, normalized_type, connector_id_int)
        if not isinstance(data, dict):
            # Wrap non-dict replies so the availability field can be attached.
            data = {
                "result": data,
                "availability": availability_state.get(
                    station_id, AVAILABILITY_UNKNOWN
                ),
            }
        else:
            data.setdefault(
                "availability",
                availability_state.get(station_id, AVAILABILITY_UNKNOWN),
            )
        return web.json_response(data)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)


def _connected_stations_tx_index() -> dict[str, dict[int, dict[str, Any]]]:
    """Index ``current_transactions`` by charge point id and connector id."""
    by_station: dict[str, dict[int, dict[str, Any]]] = defaultdict(dict)
    for tx_id, info in current_transactions.items():
        chargepoint_id = info.get("chargepoint_id")
        connector_raw = info.get("connector_id_int")
        if not chargepoint_id or connector_raw is None:
            continue
        try:
            connector_key = int(connector_raw)
        except (TypeError, ValueError):
            continue
        tx_entry: dict[str, Any] = {"transactionId": str(tx_id)}
        start_method = _normalize_start_method(info.get("start_method"))
        if start_method:
            tx_entry["startMethod"] = start_method
        by_station[chargepoint_id][connector_key] = tx_entry
    return by_station


def _connected_stations_index_rows(rows, target, build_entry) -> None:
    """Store ``build_entry(row, connector_id)`` in ``target[station][connector]``.

    Rows without a charge point id or with a non-integer connector id are
    skipped.
    """
    for row in rows:
        station_id = row.get("chargepoint_id")
        connector_id = row.get("connector_id")
        if station_id is None or connector_id is None:
            continue
        try:
            connector_key = int(connector_id)
        except (TypeError, ValueError):
            continue
        target[station_id][connector_key] = build_entry(row, connector_key)


def _connected_stations_load_details(
    station_ids: list[str],
) -> tuple[dict[str, dict[int, dict[str, Any]]], dict[str, dict[int, dict[str, Any]]]]:
    """Load the stored connector status and last meter reading per station.

    Returns ``(status_map, meter_map)``, each keyed by station id and then
    connector id. Database errors are logged at debug level and whatever was
    collected up to that point is returned.
    """
    status_map: dict[str, dict[int, dict[str, Any]]] = defaultdict(dict)
    meter_map: dict[str, dict[int, dict[str, Any]]] = defaultdict(dict)
    if not station_ids:
        return status_map, meter_map

    # Only the "%s" placeholders are interpolated into the SQL text; the
    # station ids themselves are passed as query parameters.
    placeholders = ", ".join(["%s"] * len(station_ids))
    conn = None
    try:
        conn = get_db_conn()
        with conn.cursor(DictCursor) as cur:
            cur.execute(
                f"""
                SELECT chargepoint_id, connector_id, status, error_code, status_timestamp
                FROM op_server_statusnotification
                WHERE chargepoint_id IN ({placeholders})
                """,
                station_ids,
            )
            _connected_stations_index_rows(
                cur.fetchall(),
                status_map,
                lambda row, connector_key: {
                    "connectorId": connector_key,
                    "status": str(row.get("status") or "Unknown"),
                    "errorCode": str(row.get("error_code") or "NoError"),
                    "timestamp": _format_timestamp_value(row.get("status_timestamp")),
                },
            )

            cur.execute(
                f"""
                SELECT chargepoint_id, connector_id, meter_value, ts
                FROM op_broker_lastmeterreading
                WHERE chargepoint_id IN ({placeholders})
                """,
                station_ids,
            )
            _connected_stations_index_rows(
                cur.fetchall(),
                meter_map,
                lambda row, _connector_key: {
                    "timestamp": _format_timestamp_value(row.get("ts")),
                    "Energy.Active.Import.Register": row.get("meter_value"),
                },
            )
    except Exception:
        logging.debug("Failed to load connector details for connected stations", exc_info=True)
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # Closing is best effort; record the failure instead of hiding it.
                logging.debug(
                    "Failed to close DB connection after loading connector details",
                    exc_info=True,
                )
    return status_map, meter_map


def _connected_stations_merge_connectors(
    status_entries: dict[int, dict[str, Any]],
    meter_entries: dict[int, dict[str, Any]],
    tx_entries: dict[int, dict[str, Any]],
) -> list[dict[str, Any]]:
    """Merge status, meter and transaction data into one entry per connector.

    Connectors known to any of the three sources are included, sorted by id;
    connectors without a stored status report ``Unknown`` / ``NoError``.
    """
    connector_ids = set(status_entries) | set(meter_entries) | set(tx_entries)
    connectors: list[dict[str, Any]] = []
    for connector_id in sorted(connector_ids):
        base_info = status_entries.get(connector_id) or {}
        connector_info: dict[str, Any] = {
            "connectorId": connector_id,
            "status": base_info.get("status", "Unknown"),
            "errorCode": base_info.get("errorCode", "NoError"),
            "timestamp": base_info.get("timestamp"),
        }
        meter_info = meter_entries.get(connector_id)
        if meter_info:
            connector_info["meterValues"] = meter_info
        tx_entry = tx_entries.get(connector_id)
        if tx_entry:
            transaction_id = tx_entry.get("transactionId")
            if transaction_id is not None:
                connector_info["transactionId"] = transaction_id
            start_method = tx_entry.get("startMethod")
            if start_method:
                connector_info["startMethod"] = start_method
        connectors.append(connector_info)
    return connectors


@routes.get("/api/connectedStations")
async def api_connected_stations(request):
    """Return every tracked station with per-connector details.

    For each station id in ``connected`` the response contains the connect
    time, connection duration in seconds, the cached availability state and a
    list of connectors combining stored status, last meter reading and any
    running transaction.
    """
    now = datetime.now(timezone.utc)
    station_ids = list(connected.keys())

    transactions_by_station = _connected_stations_tx_index()
    # NOTE(review): the DB lookup below runs synchronously inside this
    # coroutine; confirm whether it should be moved off the event loop.
    connector_status_map, meter_value_map = _connected_stations_load_details(station_ids)

    stations = []
    for station_id in station_ids:
        since = connected_since.get(station_id)
        duration = int((now - since).total_seconds()) if since else None
        connectors = _connected_stations_merge_connectors(
            connector_status_map.get(station_id, {}),
            meter_value_map.get(station_id, {}),
            transactions_by_station.get(station_id, {}),
        )
        stations.append(
            {
                "chargepoint_id": station_id,
                "connected": True,
                "connected_since": _format_timestamp_value(since),
                "connected_seconds": duration,
                "availability": availability_state.get(
                    station_id, AVAILABILITY_UNKNOWN
                ),
                "connectors": connectors,
            }
        )
    return web.json_response({"stations": stations})


async def _parse_json(request):
    try:
        return await request.json()
    except Exception:
        return {}


@routes.post("/api/triggerBootNotification")
async def api_trigger_boot(request):
    """Ask a station to send a BootNotification via ``trigger_message``.

    400 without ``station_id``; 500 with the exception text on failure.
    """
    body = await _parse_json(request)
    target_station = body.get("station_id")
    if target_station:
        try:
            outcome = await trigger_message(target_station, "BootNotification")
            return web.json_response(outcome)
        except Exception as exc:
            return web.json_response({"error": str(exc)}, status=500)
    return web.json_response({"error": "station_id required"}, status=400)


@routes.post("/api/triggerMeterValues")
async def api_trigger_meter_values(request):
    """Ask a station to send MeterValues via ``trigger_meter_values``.

    ``connectorId`` from the body is passed through unchanged (may be absent).
    400 without ``station_id``; 500 with the exception text on failure.
    """
    body = await _parse_json(request)
    target_station, target_connector = body.get("station_id"), body.get("connectorId")
    if target_station:
        try:
            outcome = await trigger_meter_values(target_station, target_connector)
            return web.json_response(outcome)
        except Exception as exc:
            return web.json_response({"error": str(exc)}, status=500)
    return web.json_response({"error": "station_id required"}, status=400)


@routes.post("/api/triggerStatusNotification")
async def api_trigger_status_notification(request):
    """Ask a station to send a StatusNotification via ``trigger_message``.

    ``connectorId`` from the body is passed through unchanged (may be absent).
    400 without ``station_id``; 500 with the exception text on failure.
    """
    body = await _parse_json(request)
    target_station, target_connector = body.get("station_id"), body.get("connectorId")
    if target_station:
        try:
            outcome = await trigger_message(
                target_station, "StatusNotification", target_connector
            )
            return web.json_response(outcome)
        except Exception as exc:
            return web.json_response({"error": str(exc)}, status=500)
    return web.json_response({"error": "station_id required"}, status=400)


@routes.post("/api/statusRoundCall")
async def api_status_round_call(request):
    """Trigger a StatusNotification on every tracked station, one at a time.

    Stations are processed in sorted id order. The response lists the ids that
    were triggered and, when any call failed, an ``errors`` map from station
    id to exception text.
    """
    succeeded: list[str] = []
    failures: dict[str, str] = {}
    for sid in sorted(connected):
        try:
            await trigger_message(sid, "StatusNotification")
        except Exception as exc:
            failures[sid] = str(exc)
        else:
            succeeded.append(sid)

    summary: dict[str, Any] = {"triggered": succeeded}
    if failures:
        summary["errors"] = failures
    return web.json_response(summary)


@routes.post("/api/oicp/sync")
async def api_oicp_sync(request):
    """Push the EVSE status of one charge point through ``push_oicp_evse_status``.

    Status codes:
        503 - no Hubject client configured (up front, or reported through a
              RuntimeError whose text mentions "Hubject client").
        400 - missing station id, or ValueError from the push.
        502 - ``HubjectClientError``; its ``status`` is echoed in the body.
        500 - any other failure (logged with traceback).
    """
    if HUBJECT_CLIENT is None:
        message = "Hubject client ist nicht konfiguriert"
        message = f"{message}: {HUBJECT_CLIENT_ERROR}" if HUBJECT_CLIENT_ERROR else message
        return web.json_response({"error": message}, status=503)

    body = await _parse_json(request)
    station_id = body.get("chargepoint_id") or body.get("station_id")
    if not station_id:
        return web.json_response(
            {"error": "chargepoint_id oder station_id erforderlich"}, status=400
        )
    action_type = body.get("action_type") or body.get("ActionType")

    try:
        push_result = await push_oicp_evse_status(station_id, action_type)
        return web.json_response(push_result)
    except ValueError as exc:
        return web.json_response({"error": str(exc)}, status=400)
    except RuntimeError as exc:
        message = str(exc)
        http_status = 500
        if "Hubject client" in message:
            http_status = 503
        return web.json_response({"error": message}, status=http_status)
    except Exception as exc:
        # HubjectClientError may be None when the optional client is absent.
        is_hubject_error = HubjectClientError is not None and isinstance(
            exc, HubjectClientError
        )
        if not is_hubject_error:
            logging.exception(
                "Fehler beim Auslösen von PushEvseStatusData für %s", station_id
            )
            return web.json_response({"error": str(exc)}, status=500)
        upstream_status = getattr(exc, "status", 502) or 502
        return web.json_response(
            {"error": str(exc), "status": upstream_status}, status=502
        )


@routes.post("/hubject/evse-data")
async def hubject_evse_data(request):
    """Forward an EVSE data payload to Hubject via ``HUBJECT_CLIENT``.

    The request body is copied and sent to ``HUBJECT_CLIENT.push_evse_data``
    with a normalized ``OperatorID`` and ``ActionType``.  The operator id is
    read from ``OperatorID`` / ``operator_id`` and falls back to the client's
    configured operator id.  The action type is matched case-insensitively
    against insert/update/delete/fullLoad and defaults to ``update``.

    The snake_case aliases ``operator_id`` and ``action_type`` are removed
    from the forwarded payload so that only the normalized keys are sent.

    Responses:
        200: ``OperatorID``, ``ActionType`` and the Hubject response.
        400: operator id missing or action type not allowed.
        502: the client raised ``HubjectClientError``.
        503: no Hubject client configured.
        500: any other failure.
    """
    if HUBJECT_CLIENT is None:
        message = "Hubject client ist nicht konfiguriert"
        if HUBJECT_CLIENT_ERROR:
            message = f"{message}: {HUBJECT_CLIENT_ERROR}"
        return web.json_response({"error": message}, status=503)

    body = await _parse_json(request)

    operator_id = (
        body.get("OperatorID")
        or body.get("operator_id")
        or HUBJECT_CLIENT.config.operator_id
        or None
    )
    if not operator_id:
        return web.json_response({"error": "OperatorID erforderlich"}, status=400)

    # Lower-cased spelling -> canonical spelling expected in the payload.
    canonical_actions = {
        name.lower(): name for name in ("insert", "update", "delete", "fullLoad")
    }
    action_type = body.get("ActionType") or body.get("action_type") or "update"
    action_text = str(action_type).strip() or "update"
    normalized_action = canonical_actions.get(action_text.lower())
    if normalized_action is None:
        return web.json_response(
            {
                "error": f"Ungültiger ActionType '{action_type}'",
                "allowed": sorted(canonical_actions.values()),
            },
            status=400,
        )

    payload: dict[str, Any] = dict(body)
    payload["OperatorID"] = operator_id
    payload["ActionType"] = normalized_action
    # Drop both snake_case aliases; otherwise a raw, un-normalized action
    # value would be forwarded next to the normalized "ActionType".
    payload.pop("operator_id", None)
    payload.pop("action_type", None)

    try:
        response = await HUBJECT_CLIENT.push_evse_data(operator_id, payload)
    except Exception as exc:
        # HubjectClientError is treated as possibly None elsewhere in this
        # file; "except None" would itself raise, so check explicitly.
        if HubjectClientError is not None and isinstance(exc, HubjectClientError):
            return web.json_response(
                {"error": str(exc), "status": getattr(exc, "status", None)},
                status=502,
            )
        logging.exception(
            "Fehler beim Übermitteln von Hubject EVSE-Daten für Operator %s",
            operator_id,
        )
        return web.json_response({"error": str(exc)}, status=500)

    return web.json_response(
        {
            "OperatorID": operator_id,
            "ActionType": normalized_action,
            "hubject_response": response,
        }
    )


@routes.post("/hubject/remote-start")
@routes.post("/api/hubject/remote-start")
async def hubject_remote_start(request):
    """Handle a Hubject remote-start request and relay it to a charge point.

    ``evse_id`` / ``EVSEId`` is mandatory.  The connector is taken from an
    explicit connector field, otherwise from the connector part returned by
    ``_split_evse_identifier``; a missing or non-positive value becomes ``1``.
    The charge point comes from ``chargepoint_id`` / ``station_id`` or is
    looked up with ``_find_chargepoint_by_evse``.

    When the charge point answers with status "accepted" (case-insensitive),
    session metadata is handed to ``_set_pending_start_method`` and
    ``_set_pending_hubject_cdr``; otherwise ``_pop_pending_hubject_cdr`` is
    called for the connector.

    Responses:
        200: status, charge point, connector, id tag, EVSE id and, when the
            OCPP answer is a mapping, the raw ``ocpp_response``.
        400: EVSE id missing or connector id not an integer.
        404: no charge point known for the EVSE.
        502: ``remote_start_transaction`` raised.
    """
    body = await _parse_json(request)
    await log_remote_api_call(request, body)
    _prune_hubject_cdr_cache()

    embedded_request = body.get("raw_request") if isinstance(body, Mapping) else None
    if not isinstance(embedded_request, Mapping):
        embedded_request = {}

    def _lookup(names: Sequence[str]) -> Any:
        # First non-empty value for any of *names*: body first, then the
        # embedded raw request.
        for candidate in (body, embedded_request):
            if not isinstance(candidate, Mapping):
                continue
            for name in names:
                found = candidate.get(name)
                if found not in (None, ""):
                    return found
        return None

    # Shared by every response log line of this handler.
    response_log = (
        "Hubject remote-start response (status=%s) for evse_id=%s, chargepoint_id=%s: %s"
    )
    requested_chargepoint = body.get("chargepoint_id") or body.get("station_id")
    chargepoint_label = requested_chargepoint or "unknown"

    evse_identifier = body.get("evse_id") or body.get("EVSEId")
    logging.info(
        "Hubject remote-start request received for evse_id=%s, chargepoint_id=%s: payload=%s",
        (str(evse_identifier).strip() if evse_identifier else "unknown"),
        chargepoint_label,
        body,
    )
    if not evse_identifier:
        response_payload = {"error": "evse_id required"}
        logging.info(response_log, 400, "unknown", chargepoint_label, response_payload)
        return web.json_response(response_payload, status=400)

    evse_text = str(evse_identifier).strip()
    evse_base, evse_connector = _split_evse_identifier(evse_text)

    connector_raw = (
        body.get("connector_id")
        or body.get("ConnectorID")
        or body.get("connectorId")
        or body.get("ConnectorId")
    )
    # The connector encoded in the EVSE id is only the default; an explicit
    # connector field wins.
    connector_id = evse_connector
    if connector_raw not in (None, ""):
        try:
            connector_id = int(str(connector_raw).strip())
        except (TypeError, ValueError):
            logging.info(
                "Hubject remote-start request received for EVSE %s (chargepoint_id=%s) with invalid connector_id=%s: payload=%s",
                evse_text,
                chargepoint_label,
                connector_raw,
                body,
            )
            response_payload = {"error": "connector_id must be an integer"}
            logging.info(
                response_log, 400, evse_text, chargepoint_label, response_payload
            )
            return web.json_response(response_payload, status=400)

    if connector_id is None or connector_id <= 0:
        connector_id = 1

    chargepoint_id = requested_chargepoint or _find_chargepoint_by_evse(evse_base)
    if not chargepoint_id:
        response_payload = {
            "error": f"unknown chargepoint for EVSE '{evse_text}'",
            "evse_id": evse_base,
        }
        logging.info(response_log, 404, evse_text, chargepoint_label, response_payload)
        return web.json_response(response_payload, status=404)

    chargepoint_id = str(chargepoint_id)

    token = body.get("token")
    if token:
        id_tag = str(token).strip()
    else:
        id_tag = "remote"
    register_remote_api_authorization(chargepoint_id, id_tag, allow_any_rfid=True)
    logging.info(
        "Received Hubject remote start for EVSE %s (chargepoint=%s, connector=%s, id_tag=%s) with payload=%s",
        evse_text,
        chargepoint_id,
        connector_id,
        id_tag,
        body,
    )

    connector_int = int(connector_id)

    try:
        ocpp_response = await remote_start_transaction(
            chargepoint_id, connector_int, id_tag
        )
    except Exception as exc:
        logging.exception(
            "Remote start failed for %s (connector %s)", chargepoint_id, connector_id
        )
        response_payload = {"error": str(exc)}
        logging.info(response_log, 502, evse_text, chargepoint_id, response_payload)
        return web.json_response(response_payload, status=502)

    answer_is_mapping = isinstance(ocpp_response, Mapping)
    response_payload: dict[str, Any] = {
        "status": ocpp_response.get("status") if answer_is_mapping else ocpp_response,
        "chargepoint_id": chargepoint_id,
        "connector_id": connector_int,
        "id_tag": id_tag,
        "evse_id": evse_base,
    }
    if answer_is_mapping:
        response_payload["ocpp_response"] = ocpp_response

    status_value = response_payload.get("status")
    accepted = isinstance(status_value, str) and status_value.lower() == "accepted"
    if not accepted:
        _pop_pending_hubject_cdr(chargepoint_id, connector_int)
    else:
        received_at = datetime.now(timezone.utc)
        identification = _lookup(("Identification", "identification"))
        metadata: dict[str, Any] = {
            "session_id": _lookup(("session_id", "SessionID", "SessionId")),
            "cpo_partner_session_id": _lookup(
                ("CPOPartnerSessionID", "cpo_partner_session_id")
            ),
            "emp_partner_session_id": _lookup(
                ("EMPPartnerSessionID", "emp_partner_session_id")
            ),
            "operator_id": _lookup(("operator_id", "OperatorID")),
            "provider_id": _lookup(("provider_id", "ProviderID")),
            "chargepoint_id": chargepoint_id,
            "connector_id": connector_int,
            "evse_id": evse_base,
            "token": token,
            "id_tag": id_tag,
            "identification": identification,
            "raw_payload": copy.deepcopy(body),
            "raw_request": copy.deepcopy(embedded_request),
            "received_at": _format_timestamp_value(received_at),
        }
        _set_pending_start_method(
            chargepoint_id, connector_int, "remote", now=received_at
        )
        _set_pending_hubject_cdr(
            chargepoint_id,
            connector_int,
            metadata,
            now=received_at,
        )

    logging.info(response_log, 200, evse_text, chargepoint_id, response_payload)
    return web.json_response(response_payload)


@routes.post("/hubject/reserve-now")
@routes.post("/api/hubject/reserve-now")
async def hubject_reserve_now(request):
    """Handle a Hubject reservation request and relay it via ``reserve_now``.

    Body fields (snake_case or camelCase variants as read below):
        evse_id: mandatory EVSE identifier.
        connector_id: optional; defaults to the connector part of the EVSE id
            or ``1``.
        reservation_id: optional integer; a random six-digit number is used
            when absent.
        expiry_date: optional ISO-8601 timestamp; a trailing ``Z`` is accepted.
        chargepoint_id / station_id: optional; otherwise looked up with
            ``_find_chargepoint_by_evse``.
        token / id_tag: optional, defaults to ``"remote"``.
        parent_id_tag: optional.

    Responses:
        200: status, charge point, connector, reservation id, EVSE id and,
            when the OCPP answer is a mapping, the raw ``ocpp_response``.
        400: a field is missing or malformed.
        404: no charge point known for the EVSE.
        502: ``reserve_now`` raised.
    """
    body = await _parse_json(request)
    await log_remote_api_call(request, body)

    def _bad_request(text: str) -> web.Response:
        # All validation failures share the same 400 envelope.
        return web.json_response({"error": text}, status=400)

    evse_identifier = body.get("evse_id") or body.get("EVSEId")
    if not evse_identifier:
        return _bad_request("evse_id required")
    evse_text = str(evse_identifier).strip()
    evse_base, evse_connector = _split_evse_identifier(evse_text)

    connector_id = evse_connector or 1
    connector_raw = (
        body.get("connector_id")
        or body.get("ConnectorID")
        or body.get("connectorId")
        or body.get("ConnectorId")
    )
    if connector_raw not in (None, ""):
        try:
            connector_id = int(str(connector_raw).strip())
        except (TypeError, ValueError):
            return _bad_request("connector_id must be an integer")
    if connector_id is None or connector_id <= 0:
        connector_id = 1

    reservation_raw = body.get("reservation_id") or body.get("reservationId")
    if reservation_raw in (None, ""):
        # No id supplied: pick a random value in 100000..999999.
        reservation_id = secrets.randbelow(900000) + 100000
    else:
        try:
            reservation_id = int(reservation_raw)
        except (TypeError, ValueError):
            return _bad_request("reservation_id must be an integer")

    expiry_date: datetime | None = None
    expiry_raw = body.get("expiry_date") or body.get("expiryDate")
    if expiry_raw:
        try:
            expiry_text = str(expiry_raw).strip()
            if expiry_text.endswith("Z"):
                expiry_text = expiry_text.replace("Z", "+00:00")
            expiry_date = datetime.fromisoformat(expiry_text)
        except Exception:
            return _bad_request("expiry_date must be ISO-8601")

    chargepoint_id = (
        body.get("chargepoint_id")
        or body.get("station_id")
        or _find_chargepoint_by_evse(evse_base)
    )
    if not chargepoint_id:
        return web.json_response(
            {"error": f"unknown chargepoint for EVSE '{evse_text}'", "evse_id": evse_base},
            status=404,
        )
    chargepoint_id = str(chargepoint_id)

    id_tag = str(body.get("token") or body.get("id_tag") or "remote").strip()
    parent_raw = body.get("parent_id_tag") or body.get("parentIdTag")
    parent_tag = str(parent_raw).strip() if parent_raw else None

    try:
        ocpp_response = await reserve_now(
            chargepoint_id,
            int(connector_id),
            # A whitespace-only tag strips down to "", so fall back again.
            id_tag or "remote",
            reservation_id=reservation_id,
            expiry_date=expiry_date,
            parent_id_tag=parent_tag,
        )
    except Exception as exc:
        logging.exception("ReserveNow failed for %s (connector %s)", chargepoint_id, connector_id)
        return web.json_response({"error": str(exc)}, status=502)

    answer_is_mapping = isinstance(ocpp_response, Mapping)
    response_payload: dict[str, Any] = {
        "status": ocpp_response.get("status") if answer_is_mapping else ocpp_response,
        "chargepoint_id": chargepoint_id,
        "connector_id": connector_id,
        "reservation_id": reservation_id,
        "evse_id": evse_base,
    }
    if answer_is_mapping:
        response_payload["ocpp_response"] = ocpp_response

    return web.json_response(response_payload)


@routes.post("/api/pnc/ocpp-command")
async def api_pnc_ocpp_command(request):
    """Dispatch an OCPP command to a connected station.

    The JSON body selects the command with ``action`` (or ``command``) and the
    target with ``station_id`` (or ``chargepoint_id``).  Supported actions:
    ``remote_start``, ``remote_stop``, ``diagnostics``, ``reset``,
    ``trigger_boot``, ``get_15118_ev_certificate``, ``authorize`` and
    ``custom_action``.

    Optional ``metadata`` (plus a few well-known top-level keys) is echoed
    back in the response; ``contract_cert_reference`` is additionally used to
    derive an id tag for ``remote_start`` when none is supplied.

    Responses:
        200: station id, action, metadata and the action-specific fields
            including ``ocpp_response``.
        400: missing/invalid input, unknown action, or a ``ValueError`` from
            the command helper.
        404: station not connected, or no transaction found for
            ``remote_stop``.
        500: any other failure (logged with traceback).
    """
    body = await _parse_json(request)
    await log_remote_api_call(request, body)

    action_raw = body.get("action") or body.get("command")
    # str() first: a numeric id in the JSON body has no .strip() and would
    # otherwise surface as an unhandled AttributeError.
    station_id = str(
        body.get("station_id") or body.get("chargepoint_id") or ""
    ).strip()
    if not action_raw:
        return web.json_response({"error": "action required"}, status=400)
    if not station_id:
        return web.json_response({"error": "station_id required"}, status=400)

    if station_id not in connected or getattr(connected.get(station_id), "closed", False):
        return web.json_response(
            {"error": "station not connected", "station_id": station_id}, status=404
        )

    ws = connected[station_id]

    action = str(action_raw).strip().lower()
    metadata_raw = body.get("metadata") if isinstance(body, Mapping) else None
    metadata = dict(metadata_raw) if isinstance(metadata_raw, Mapping) else {}
    # Top-level convenience keys only fill gaps; explicit metadata wins.
    for key in (
        "contract_cert_reference",
        "session_id",
        "partner_session_id",
        "initiated_by",
        "notes",
    ):
        value = body.get(key)
        if value not in (None, ""):
            metadata.setdefault(key, value)

    connector_raw = body.get("connectorId") or body.get("connector_id")
    connector_int: int | None = None
    if connector_raw not in (None, ""):
        try:
            connector_int = int(connector_raw)
        except (TypeError, ValueError):
            return web.json_response(
                {"error": "connectorId must be an integer"}, status=400
            )

    def _read_payload() -> Mapping[str, Any] | web.Response:
        # Return the "payload" object (empty when absent) or a ready 400
        # response when it is not a JSON object.
        candidate = body.get("payload") if isinstance(body, Mapping) else None
        if candidate is None:
            return {}
        if not isinstance(candidate, Mapping):
            return web.json_response(
                {"error": "payload must be an object"}, status=400
            )
        return candidate

    def _with_top_level_fields(
        base: Mapping[str, Any], keys: Sequence[str]
    ) -> dict[str, Any]:
        # Copy *base* and add top-level body values for *keys* that the
        # payload does not already define.
        merged = dict(base)
        for field in keys:
            field_value = body.get(field)
            if field_value is not None:
                merged.setdefault(field, field_value)
        return merged

    response_payload: dict[str, Any] = {
        "station_id": station_id,
        "action": action,
        "metadata": metadata or None,
    }

    try:
        if action == "remote_start":
            if connector_int is None:
                return web.json_response(
                    {"error": "connectorId is required for remote_start"}, status=400
                )
            id_tag = body.get("id_tag") or body.get("idTag")
            contract_reference = metadata.get("contract_cert_reference")
            if not id_tag and contract_reference:
                id_tag = f"pnc:{contract_reference}"
            ocpp_response = await remote_start_transaction(
                station_id, connector_int, id_tag
            )
            response_payload.update(
                {"connectorId": connector_int, "ocpp_response": ocpp_response}
            )
        elif action == "remote_stop":
            transaction_raw = body.get("transaction_id") or body.get("transactionId")
            transaction_int: int | None = None
            if transaction_raw not in (None, ""):
                try:
                    transaction_int = int(transaction_raw)
                except (TypeError, ValueError):
                    return web.json_response(
                        {"error": "transaction_id must be an integer"}, status=400
                    )
            if transaction_int is None:
                transaction_int = _find_active_transaction_id(station_id, connector_int)
            if transaction_int is None:
                return web.json_response(
                    {"error": "transaction_id required and no active transaction found"},
                    status=404,
                )
            ocpp_response = await remote_stop_transaction(station_id, transaction_int)
            response_payload.update(
                {"transactionId": transaction_int, "ocpp_response": ocpp_response}
            )
        elif action == "diagnostics":
            location = (
                body.get("location")
                or body.get("diagnostics_location")
                or DEFAULT_DIAGNOSTIC_LOCATION
            )
            retries_raw = body.get("retries") or body.get("retryCount")
            retry_interval_raw = body.get("retryInterval") or body.get("retry_interval")
            try:
                retries_value = int(retries_raw) if retries_raw not in (None, "") else 1
            except (TypeError, ValueError):
                return web.json_response(
                    {"error": "retries must be an integer"}, status=400
                )
            try:
                retry_interval_value = (
                    int(retry_interval_raw) if retry_interval_raw not in (None, "") else 60
                )
            except (TypeError, ValueError):
                return web.json_response(
                    {"error": "retryInterval must be an integer"}, status=400
                )
            ocpp_response = await request_diagnostics(
                station_id,
                str(location),
                retries=retries_value,
                retry_interval=retry_interval_value,
                start_time=body.get("startTime") or body.get("start_time"),
                stop_time=body.get("stopTime") or body.get("stop_time"),
            )
            response_payload.update(
                {
                    "connectorId": connector_int,
                    "location": location,
                    "retries": retries_value,
                    "retryInterval": retry_interval_value,
                    "ocpp_response": ocpp_response,
                }
            )
        elif action == "reset":
            reset_type = body.get("reset_type") or body.get("type") or "Hard"
            normalized_reset = str(reset_type).strip().capitalize()
            if normalized_reset not in {"Hard", "Soft"}:
                return web.json_response(
                    {"error": "reset_type must be 'Hard' or 'Soft'"}, status=400
                )
            ocpp_response = await reset_chargepoint(station_id, normalized_reset)
            response_payload.update(
                {"reset_type": normalized_reset, "ocpp_response": ocpp_response}
            )
        elif action == "trigger_boot":
            ocpp_response = await trigger_message(
                station_id, "BootNotification", connector_int
            )
            response_payload.update(
                {"connectorId": connector_int, "ocpp_response": ocpp_response}
            )
        elif action == "get_15118_ev_certificate":
            payload = _read_payload()
            if isinstance(payload, web.Response):
                return payload

            extracted_payload = _with_top_level_fields(
                payload,
                (
                    "exiRequest",
                    "iso15118SchemaVersion",
                    "contractSignatureCertChain",
                    "emaid",
                    "dhPublicKey",
                    "manufacturerId",
                    "serialNumber",
                ),
            )
            if not extracted_payload:
                return web.json_response(
                    {"error": "payload must include exiRequest, iso15118SchemaVersion, or related fields"},
                    status=400,
                )

            # Send a copy so the echoed payload is not affected by the call.
            ocpp_response = await call_action(
                ws, "Get15118EVCertificate", dict(extracted_payload)
            )
            response_payload.update({"payload": extracted_payload, "ocpp_response": ocpp_response})
        elif action == "authorize":
            payload = _read_payload()
            if isinstance(payload, web.Response):
                return payload

            extracted_payload = _with_top_level_fields(
                payload,
                (
                    "idTag",
                    "iso15118CertificateHashData",
                    "certificateStatus",
                    "iso15118CertificateHashDataChain",
                    "status",
                    "transactionId",
                    "customData",
                ),
            )
            if not extracted_payload:
                return web.json_response(
                    {"error": "payload must include idTag, iso15118CertificateHashData, or related status fields"},
                    status=400,
                )

            ocpp_response = await call_action(ws, "Authorize", dict(extracted_payload))
            response_payload.update({"payload": extracted_payload, "ocpp_response": ocpp_response})
        elif action == "custom_action":
            custom_action_name = (
                body.get("ocpp_action")
                or body.get("target_action")
                or body.get("custom_action_name")
            )
            if not custom_action_name or not str(custom_action_name).strip():
                return web.json_response(
                    {"error": "custom_action requires 'ocpp_action'"}, status=400
                )

            payload = _read_payload()
            if isinstance(payload, web.Response):
                return payload

            ocpp_response = await call_action(
                ws, str(custom_action_name), dict(payload)
            )
            response_payload.update(
                {
                    "ocpp_action": str(custom_action_name),
                    "payload": dict(payload),
                    "ocpp_response": ocpp_response,
                }
            )
        else:
            return web.json_response(
                {"error": f"unknown action '{action}'"}, status=400
            )
    except ValueError as exc:
        message = str(exc)
        status = 404 if "not connected" in message.lower() else 400
        return web.json_response({"error": message}, status=status)
    except Exception as exc:
        # Keep the traceback: the client only ever sees the message text.
        logging.exception(
            "PnC OCPP command '%s' failed for station %s", action, station_id
        )
        return web.json_response({"error": str(exc)}, status=500)

    return web.json_response(response_payload)


@routes.post("/api/pnc/ocpp-certificates")
async def api_pnc_ocpp_certificates(request):
    """Run a certificate-management command against a connected station.

    The JSON body selects the command with ``action`` (or ``command``) and the
    target with ``station_id`` (or ``chargepoint_id``).  Supported actions:
    ``install_certificate``, ``certificate_signed``,
    ``get_installed_certificate_ids``, ``delete_certificate`` and
    ``get_certificate_status``.  Certificate types and hash algorithms are
    matched case-insensitively and sent in their canonical spelling.

    Responses:
        200: station id, action and the action-specific fields including
            ``ocpp_response``.
        400: missing/invalid input, unknown action, or a ``ValueError`` from
            the command helper.
        404: station not connected.
        500: any other failure (logged with traceback).
    """
    body = await _parse_json(request)
    await log_remote_api_call(request, body)

    action_raw = body.get("action") or body.get("command")
    # str() first: a numeric id in the JSON body has no .strip() and would
    # otherwise surface as an unhandled AttributeError.
    station_id = str(
        body.get("station_id") or body.get("chargepoint_id") or ""
    ).strip()
    if not action_raw:
        return web.json_response({"error": "action required"}, status=400)
    if not station_id:
        return web.json_response({"error": "station_id required"}, status=400)

    if station_id not in connected or getattr(connected.get(station_id), "closed", False):
        return web.json_response(
            {"error": "station not connected", "station_id": station_id}, status=404
        )

    action = str(action_raw).strip().lower()
    # Lower-cased spelling -> canonical spelling sent to the station.
    allowed_certificate_types = {
        "v2grootcertificate": "V2GRootCertificate",
        "morootcertificate": "MORootCertificate",
        "csmsrootcertificate": "CSMSRootCertificate",
    }
    allowed_certificate_signed_types = {
        "chargingstationcertificate": "ChargingStationCertificate",
        "v2gcertificatechain": "V2GCertificateChain",
    }
    allowed_hash_algorithms = {"sha256": "SHA256", "sha384": "SHA384", "sha512": "SHA512"}
    root_type_error = (
        "certificateType must be V2GRootCertificate, MORootCertificate, or CSMSRootCertificate"
    )
    response_payload: dict[str, Any] = {"station_id": station_id, "action": action}

    def _extract_certificate_hash_data(
        source: Mapping[str, Any]
    ) -> tuple[str, str, str, str] | web.Response:
        # Read the four hash fields from "certificateHashData", falling back
        # to same-named top-level keys.  Returns the normalized tuple or a
        # ready 400 response.
        certificate_hash_data = source.get("certificateHashData") or {}
        if not isinstance(certificate_hash_data, Mapping):
            # A string or list here would otherwise fail on .get() below.
            return web.json_response(
                {"error": "certificateHashData must be an object"},
                status=400,
            )
        hash_algorithm_raw = certificate_hash_data.get("hashAlgorithm") or source.get(
            "hashAlgorithm"
        )
        issuer_name_hash = certificate_hash_data.get("issuerNameHash") or source.get(
            "issuerNameHash"
        )
        issuer_key_hash = certificate_hash_data.get("issuerKeyHash") or source.get(
            "issuerKeyHash"
        )
        serial_number = certificate_hash_data.get("serialNumber") or source.get(
            "serialNumber"
        )

        hash_algo_key = str(hash_algorithm_raw or "").strip().lower()
        if hash_algo_key not in allowed_hash_algorithms:
            return web.json_response(
                {"error": "hashAlgorithm must be SHA256, SHA384, or SHA512"},
                status=400,
            )

        if not all([issuer_name_hash, issuer_key_hash, serial_number]):
            return web.json_response(
                {
                    "error": "issuerNameHash, issuerKeyHash, and serialNumber are required",
                },
                status=400,
            )

        return (
            allowed_hash_algorithms[hash_algo_key],
            str(issuer_name_hash).strip(),
            str(issuer_key_hash).strip(),
            str(serial_number).strip(),
        )

    try:
        if action == "install_certificate":
            certificate_type_raw = body.get("certificateType") or body.get("certificate_type")
            certificate_type_key = str(certificate_type_raw or "").strip().lower()
            if certificate_type_key not in allowed_certificate_types:
                return web.json_response({"error": root_type_error}, status=400)
            certificate_raw = body.get("certificate") or body.get("certificateChain")
            if not certificate_raw:
                return web.json_response({"error": "certificate content required"}, status=400)
            certificate_type = allowed_certificate_types[certificate_type_key]
            certificate_payload = _normalize_certificate_blob(certificate_raw)
            ocpp_response = await install_certificate(
                station_id, certificate_type, certificate_payload
            )
            response_payload.update(
                {
                    "certificateType": certificate_type,
                    "ocpp_response": ocpp_response,
                }
            )
        elif action == "certificate_signed":
            certificate_chain_raw = body.get("certificateChain") or body.get("certificate_chain")
            if not certificate_chain_raw:
                return web.json_response({"error": "certificateChain required"}, status=400)
            certificate_type_raw = body.get("certificateType") or body.get("certificate_type")
            normalized_type = None
            if certificate_type_raw not in (None, ""):
                type_key = str(certificate_type_raw).strip().lower()
                if type_key not in allowed_certificate_signed_types:
                    return web.json_response(
                        {
                            "error": "certificateType must be ChargingStationCertificate or V2GCertificateChain"
                        },
                        status=400,
                    )
                normalized_type = allowed_certificate_signed_types[type_key]
            normalized_chain = _normalize_certificate_blob(certificate_chain_raw)
            ocpp_response = await send_certificate_signed(
                station_id, normalized_chain, normalized_type
            )
            response_payload.update(
                {
                    "certificateChain": normalized_chain,
                    "ocpp_response": ocpp_response,
                    **({"certificateType": normalized_type} if normalized_type else {}),
                }
            )
        elif action == "get_installed_certificate_ids":
            certificate_type_raw = body.get("certificateType") or body.get("certificate_type")
            normalized_type = None
            if certificate_type_raw not in (None, ""):
                type_key = str(certificate_type_raw).strip().lower()
                if type_key not in allowed_certificate_types:
                    return web.json_response({"error": root_type_error}, status=400)
                normalized_type = allowed_certificate_types[type_key]
            ocpp_response = await get_installed_certificate_ids(station_id, normalized_type)
            if normalized_type:
                response_payload["certificateType"] = normalized_type
            response_payload["ocpp_response"] = ocpp_response
        elif action in ("delete_certificate", "get_certificate_status"):
            parsed_hash_data = _extract_certificate_hash_data(body)
            if isinstance(parsed_hash_data, web.Response):
                return parsed_hash_data
            hash_algorithm, issuer_name_hash, issuer_key_hash, serial_number = (
                parsed_hash_data
            )
            # Both actions take the same arguments and produce the same
            # response shape; only the helper differs.
            if action == "delete_certificate":
                ocpp_response = await delete_certificate(
                    station_id, hash_algorithm, issuer_name_hash, issuer_key_hash, serial_number
                )
            else:
                ocpp_response = await get_certificate_status(
                    station_id, hash_algorithm, issuer_name_hash, issuer_key_hash, serial_number
                )
            response_payload.update(
                {
                    "certificateHashData": {
                        "hashAlgorithm": hash_algorithm,
                        "issuerNameHash": issuer_name_hash,
                        "issuerKeyHash": issuer_key_hash,
                        "serialNumber": serial_number,
                    },
                    "ocpp_response": ocpp_response,
                }
            )
        else:
            return web.json_response(
                {"error": f"unknown action '{action}'"}, status=400
            )
    except ValueError as exc:
        message = str(exc)
        status = 404 if "not connected" in message.lower() else 400
        return web.json_response({"error": message}, status=status)
    except Exception as exc:
        # Keep the traceback: the client only ever sees the message text.
        logging.exception(
            "PnC certificate command '%s' failed for station %s", action, station_id
        )
        return web.json_response({"error": str(exc)}, status=500)

    return web.json_response(response_payload)


@routes.post("/hubject/remote-stop")
@routes.post("/api/hubject/remote-stop")
async def hubject_remote_stop(request):
    """Stop the OCPP transaction referenced by a Hubject remote-stop request.

    Fields are read from the JSON body first and from its nested
    ``raw_request`` mapping second; several key spellings are accepted for
    each field. Resolution order:

    * EVSE id (required): 400 when absent.
    * Charge point: explicit id in the body, then ``raw_request``, then
      ``_find_chargepoint_by_evse`` on the EVSE base id; 404 when none is
      found.
    * Connector: explicit id if supplied, otherwise the connector returned by
      ``_split_evse_identifier``; 400 when an explicit value is not an
      integer.
    * Transaction: explicit id, then ``_find_active_transaction_id``, then
      the session-based lookups; 404 when none is found.

    ``_pop_pending_hubject_cdr`` / ``_pop_active_hubject_cdr`` are called
    before the stop is issued. An exception from ``remote_stop_transaction``
    is reported as 502. Every response is logged before it is returned.
    """
    body = await _parse_json(request)
    await log_remote_api_call(request, body)
    _prune_hubject_cdr_cache()

    # A JSON document that is not an object (list, string, ...) carries no
    # fields. Treat it as empty so the lookups below end in a clean 400
    # instead of an AttributeError on ``.get``.
    fields: Mapping[str, Any] = body if isinstance(body, Mapping) else {}

    raw_request = fields.get("raw_request")
    if not isinstance(raw_request, Mapping):
        raw_request = {}

    def _first_non_empty(source: Mapping[str, Any], keys: Sequence[str]) -> Any:
        """Return the first value under ``keys`` that is neither None nor ""."""
        for candidate_key in keys:
            value = source.get(candidate_key)
            if value not in (None, ""):
                return value
        return None

    def _respond(
        status: int, payload: dict[str, Any], evse_label: Any, chargepoint_label: Any
    ) -> web.Response:
        """Log the outgoing response and build it."""
        logging.info(
            "Hubject remote-stop response (status=%s) for evse_id=%s, chargepoint_id=%s: %s",
            status,
            evse_label,
            chargepoint_label,
            payload,
        )
        return web.json_response(payload, status=status)

    evse_keys = ("evse_id", "EVSEId", "EvseID", "EVSEID")
    session_keys = (
        "session_id",
        "SessionID",
        "SessionId",
        "CPOPartnerSessionID",
        "EMPPartnerSessionID",
    )
    connector_keys = ("connector_id", "ConnectorID", "connectorId", "ConnectorId")
    transaction_keys = ("transaction_id", "TransactionID", "TransactionId")

    evse_identifier = _first_non_empty(fields, evse_keys) or _first_non_empty(
        raw_request, evse_keys
    )
    session_identifier = _first_non_empty(fields, session_keys) or _first_non_empty(
        raw_request, session_keys
    )

    # Charge point id exactly as the caller supplied it (may be empty).
    requested_chargepoint = fields.get("chargepoint_id") or fields.get("station_id")

    logging.info(
        "Hubject remote-stop request received for evse_id=%s, session_id=%s, chargepoint_id=%s: payload=%s",
        (str(evse_identifier).strip() if evse_identifier else "unknown"),
        session_identifier or "-",
        requested_chargepoint or "unknown",
        body,
    )

    if not evse_identifier:
        return _respond(
            400,
            {"error": "evse_id required"},
            "unknown",
            requested_chargepoint or "unknown",
        )

    evse_text = str(evse_identifier).strip()
    evse_base, evse_connector = _split_evse_identifier(evse_text)

    chargepoint_id = requested_chargepoint
    if not chargepoint_id:
        chargepoint_id = _first_non_empty(
            raw_request,
            ("chargepoint_id", "station_id", "ChargePointID", "StationID"),
        )
    if not chargepoint_id:
        chargepoint_id = _find_chargepoint_by_evse(evse_base)
    if not chargepoint_id:
        return _respond(
            404,
            {
                "error": f"unknown chargepoint for EVSE '{evse_text}'",
                "evse_id": evse_base,
            },
            evse_text,
            requested_chargepoint or "unknown",
        )

    # An explicit connector id overrides the one embedded in the EVSE id.
    connector_raw = _first_non_empty(fields, connector_keys)
    if connector_raw is None:
        connector_raw = _first_non_empty(raw_request, connector_keys)

    connector_id: int | None = evse_connector
    if connector_raw is not None:
        try:
            connector_id = int(str(connector_raw).strip())
        except (TypeError, ValueError):
            return _respond(
                400,
                {"error": "connector_id must be an integer"},
                evse_text,
                chargepoint_id,
            )

    transaction_raw = _first_non_empty(fields, transaction_keys)
    if transaction_raw is None:
        transaction_raw = _first_non_empty(raw_request, transaction_keys)

    transaction_id: int | None = None
    if transaction_raw is not None:
        try:
            transaction_id = int(str(transaction_raw).strip())
        except (TypeError, ValueError):
            return _respond(
                400,
                {"error": "transaction_id must be an integer"},
                evse_text,
                chargepoint_id,
            )

    # Fall back to progressively broader lookups until a transaction is found.
    if transaction_id is None:
        transaction_id = _find_active_transaction_id(chargepoint_id, connector_id)
    if transaction_id is None and session_identifier:
        transaction_id = _find_active_transaction_id_by_session(
            chargepoint_id, str(session_identifier), connector_id
        )
    if transaction_id is None and session_identifier:
        transaction_id = _find_active_transaction_id_by_session_any(
            str(session_identifier),
            evse_id=evse_base,
            connector_id=connector_id,
        )

    if connector_id is not None:
        _pop_pending_hubject_cdr(chargepoint_id, connector_id)
    if transaction_id is not None:
        _pop_active_hubject_cdr(transaction_id)

    if transaction_id is None:
        return _respond(
            404,
            {
                "error": "active transaction not found",
                "chargepoint_id": chargepoint_id,
                "connector_id": connector_id,
                "evse_id": evse_base,
            },
            evse_text,
            chargepoint_id,
        )

    try:
        ocpp_response = await remote_stop_transaction(chargepoint_id, transaction_id)
    except Exception as exc:
        logging.exception(
            "Remote stop failed for %s (transaction %s)", chargepoint_id, transaction_id
        )
        return _respond(502, {"error": str(exc)}, evse_text, chargepoint_id)

    response_payload: dict[str, Any] = {
        "status": ocpp_response.get("status")
        if isinstance(ocpp_response, Mapping)
        else ocpp_response,
        "chargepoint_id": chargepoint_id,
        "transaction_id": transaction_id,
        "evse_id": evse_base,
    }
    if connector_id is not None:
        response_payload["connector_id"] = connector_id
    if isinstance(ocpp_response, Mapping):
        response_payload["ocpp_response"] = ocpp_response

    return _respond(200, response_payload, evse_text, chargepoint_id)


@routes.post("/api/reset")
async def api_reset(request):
    """Send a reset of type ``Hard`` or ``Soft`` to a charge point.

    The body must name the target via ``chargepoint_id`` or ``station_id``.
    The reset type comes from ``type`` or ``reset_type`` and defaults to
    ``Hard``; it is normalised with ``strip().capitalize()``. An invalid type
    or a missing target yields 400 (the type is checked first), and any
    exception from ``reset_chargepoint`` yields 500.
    """
    payload = await _parse_json(request)
    target = payload.get("chargepoint_id") or payload.get("station_id")
    requested_kind = payload.get("type") or payload.get("reset_type") or "Hard"
    kind = str(requested_kind).strip().capitalize()

    if kind != "Hard" and kind != "Soft":
        return web.json_response(
            {"error": "type must be 'Hard' or 'Soft'"}, status=400
        )
    if not target:
        return web.json_response(
            {"error": "chargepoint_id or station_id required"}, status=400
        )

    try:
        return web.json_response(await reset_chargepoint(target, kind))
    except Exception as err:
        return web.json_response({"error": str(err)}, status=500)


@routes.post("/api/disconnect")
async def api_disconnect(request):
    """Disconnect the charge point named by ``station_id``.

    Responds 400 when ``station_id`` is missing, 404 when
    ``disconnect_chargepoint`` reports status ``not_connected``, 200
    otherwise, and 500 when anything raises.
    """
    payload = await _parse_json(request)
    target = payload.get("station_id")
    if not target:
        return web.json_response({"error": "station_id required"}, status=400)

    try:
        result = await disconnect_chargepoint(target)
        if result.get("status") == "not_connected":
            http_status = 404
        else:
            http_status = 200
        return web.json_response(result, status=http_status)
    except Exception as err:
        return web.json_response({"error": str(err)}, status=500)


@routes.post("/api/startTransaction")
async def api_start_transaction(request):
    """Request a remote start on one connector of a charge point.

    Reads the target (``chargepoint_id`` / ``station_id``), the connector
    (``connector_id`` / ``connectorId``) and an optional id tag (``id_tag`` /
    ``idTag``, default ``"remote"``) from the JSON body. The call is recorded
    through ``log_remote_api_call`` and the id tag is registered through
    ``register_remote_api_authorization`` before the request is sent.

    Responds 400 for missing or non-integer input and 500 when
    ``remote_start_transaction`` raises.
    """
    payload = await _parse_json(request)
    await log_remote_api_call(request, payload)

    target = payload.get("chargepoint_id") or payload.get("station_id")
    connector_raw = payload.get("connector_id") or payload.get("connectorId")
    tag = (payload.get("id_tag") or payload.get("idTag")) or "remote"

    if connector_raw is None or not target:
        return web.json_response(
            {"error": "chargepoint_id and connector_id required"}, status=400
        )

    try:
        connector_number = int(connector_raw)
    except (TypeError, ValueError):
        return web.json_response({"error": "connector_id must be an integer"}, status=400)

    register_remote_api_authorization(target, tag, allow_any_rfid=True)
    logging.info(
        "Received remote start API request for %s (connector=%s, id_tag=%s)",
        target,
        connector_number,
        tag,
    )

    try:
        result = await remote_start_transaction(target, connector_number, tag)
        return web.json_response(result)
    except Exception as err:
        return web.json_response({"error": str(err)}, status=500)


@routes.post("/api/stopTransaction")
async def api_stop_transaction(request):
    """Request a remote stop of a transaction on a charge point.

    Requires a target (``chargepoint_id`` / ``station_id``) and an integer
    ``transaction_id`` in the JSON body. The call is recorded through
    ``log_remote_api_call``. Responds 400 for missing or non-integer input
    and 500 when ``remote_stop_transaction`` raises.
    """
    payload = await _parse_json(request)
    await log_remote_api_call(request, payload)

    target = payload.get("chargepoint_id") or payload.get("station_id")
    transaction_raw = payload.get("transaction_id")
    if transaction_raw is None or not target:
        return web.json_response(
            {"error": "chargepoint_id and transaction_id required"}, status=400
        )

    try:
        transaction_number = int(transaction_raw)
    except (TypeError, ValueError):
        return web.json_response(
            {"error": "transaction_id must be an integer"}, status=400
        )

    logging.info(
        "Received remote stop API request for %s (transaction_id=%s)",
        target,
        transaction_number,
    )

    try:
        result = await remote_stop_transaction(target, transaction_number)
        return web.json_response(result)
    except Exception as err:
        return web.json_response({"error": str(err)}, status=500)


@routes.post("/api/unlockConnector")
async def api_unlock_connector(request):
    """Ask a charge point to unlock one of its connectors.

    Reads the target (``station_id`` / ``chargepoint_id``) and the connector
    (``connectorId`` / ``connector_id``) from the JSON body. Responds 400
    when either is missing or the connector is not an integer, and 500 when
    ``unlock_connector`` raises.
    """
    body = await _parse_json(request)
    station_id = body.get("station_id") or body.get("chargepoint_id")
    connector_id = body.get("connectorId") or body.get("connector_id")
    # Use a truthiness test for the station id, as the sibling handlers do:
    # an empty-string id would otherwise pass an ``is None`` check and be
    # forwarded to ``unlock_connector``.
    if not station_id or connector_id is None:
        return web.json_response(
            {"error": "station_id and connectorId required"}, status=400
        )
    try:
        connector_int = int(connector_id)
    except (TypeError, ValueError):
        return web.json_response({"error": "connectorId must be an integer"}, status=400)
    try:
        data = await unlock_connector(station_id, connector_int)
        return web.json_response(data)
    except Exception as e:
        return web.json_response({"error": str(e)}, status=500)


@routes.post("/api/triggerMessage")
async def api_trigger_message(request):
    """Ask a charge point to send a specific message.

    Requires a target (``station_id`` / ``chargepoint_id``) and the message
    name (``requestedMessage`` / ``message``). A connector (``connectorId`` /
    ``connector_id``) is optional; when given it must be an integer.
    Responds 400 for missing or invalid input and 500 when
    ``trigger_message`` raises.
    """
    payload = await _parse_json(request)
    target = payload.get("station_id") or payload.get("chargepoint_id")
    requested = payload.get("requestedMessage") or payload.get("message")
    connector_raw = payload.get("connectorId") or payload.get("connector_id")

    if not (target and requested):
        return web.json_response(
            {"error": "station_id and requestedMessage required"}, status=400
        )

    # No connector supplied means the request is sent without one.
    connector_value: int | None = None
    if connector_raw is not None and connector_raw != "":
        try:
            connector_value = int(connector_raw)
        except (TypeError, ValueError):
            return web.json_response(
                {"error": "connectorId must be an integer"}, status=400
            )

    try:
        result = await trigger_message(target, str(requested), connector_value)
        return web.json_response(result)
    except Exception as err:
        return web.json_response({"error": str(err)}, status=500)


@routes.post("/api/datatransfer")
async def api_datatransfer(request):
    """Send a DataTransfer call to a connected charge point.

    The target is read from ``station_id``, ``stationId``, ``chargepoint_id``
    or ``topic``. ``vendorId``, ``messageId`` and ``data`` are required;
    ``connectorId`` is optional and must be an integer when present. The
    call is recorded through ``log_remote_api_call``.

    Responds 400 for missing or invalid fields, 404 when the station is not
    in ``connected`` or its connection reports ``closed``, and 500 when
    ``call_action`` raises. On success the response echoes the station id,
    the payload that was sent and the charge point's answer.
    """
    body = await _parse_json(request)
    await log_remote_api_call(request, body)

    station_id = (
        body.get("station_id")
        or body.get("stationId")
        or body.get("chargepoint_id")
        or body.get("topic")
    )
    vendor_id = body.get("vendorId") or body.get("vendor_id")
    message_id = body.get("messageId") or body.get("message_id")
    data_field = body.get("data")
    connector_id_raw = body.get("connectorId") or body.get("connector_id")

    # Required-field checks, reported in this fixed order.
    required_checks = (
        (not station_id, "station_id required"),
        (not vendor_id, "vendorId required"),
        (message_id is None, "messageId required"),
        (data_field is None, "data required"),
    )
    for is_missing, error_text in required_checks:
        if is_missing:
            return web.json_response({"error": error_text}, status=400)

    if station_id not in connected:
        offline = True
    else:
        offline = getattr(connected.get(station_id), "closed", False)
    if offline:
        return web.json_response(
            {"error": "station not connected", "station_id": station_id}, status=404
        )

    dt_payload: dict[str, Any] = {
        "vendorId": str(vendor_id),
        "messageId": str(message_id),
        "data": data_field,
    }
    if connector_id_raw not in (None, ""):
        try:
            dt_payload["connectorId"] = int(connector_id_raw)
        except (TypeError, ValueError):
            return web.json_response({"error": "connectorId must be an integer"}, status=400)

    try:
        ocpp_response = await call_action(connected[station_id], "DataTransfer", dt_payload)
        result = {
            "station_id": station_id,
            "payload": dt_payload,
            "ocpp_response": ocpp_response,
        }
        return web.json_response(result)
    except Exception as err:
        return web.json_response({"error": str(err)}, status=500)


@routes.post("/api/getDiagnostics")
async def api_get_diagnostics(request):
    """Ask a charge point to upload its diagnostics.

    Body fields (alternative spellings in parentheses):

    * ``station_id`` (``chargepoint_id``): required.
    * ``location`` (``ftp_location``): upload target; falls back to
      ``DEFAULT_DIAGNOSTIC_LOCATION`` when empty.
    * ``retries`` (``retryCount``): integer, default 1.
    * ``retryInterval`` (``retry_interval``): integer, default 60.
    * ``startTime`` (``start_time``) / ``stopTime`` (``stop_time``): passed
      through unchanged.

    Responds 400 for missing or invalid input. A ``ValueError`` from
    ``request_diagnostics`` maps to 404 when its message contains
    "not connected" and to 400 otherwise; any other exception maps to 500.
    """
    body = await _parse_json(request)

    def _first_supplied(*keys: str) -> Any:
        """Return the first value under ``keys`` that is neither None nor ""."""
        for key in keys:
            value = body.get(key)
            if value is not None and value != "":
                return value
        return None

    station_id = body.get("station_id") or body.get("chargepoint_id")
    location_raw = body.get("location") or body.get("ftp_location")
    # Do not pick the numeric fields with ``or``: an explicit 0 is falsy and
    # would silently be replaced by the default.
    retries_raw = _first_supplied("retries", "retryCount")
    retry_interval_raw = _first_supplied("retryInterval", "retry_interval")
    start_time = body.get("startTime") or body.get("start_time")
    stop_time = body.get("stopTime") or body.get("stop_time")
    if not station_id:
        return web.json_response({"error": "station_id required"}, status=400)

    location = (str(location_raw).strip() if location_raw else "") or DEFAULT_DIAGNOSTIC_LOCATION

    try:
        retries = int(retries_raw) if retries_raw is not None else 1
    except (TypeError, ValueError):
        return web.json_response({"error": "retries must be an integer"}, status=400)

    try:
        retry_interval = (
            int(retry_interval_raw) if retry_interval_raw is not None else 60
        )
    except (TypeError, ValueError):
        return web.json_response(
            {"error": "retryInterval must be an integer"}, status=400
        )

    try:
        response = await request_diagnostics(
            station_id,
            location,
            retries=retries,
            retry_interval=retry_interval,
            start_time=start_time,
            stop_time=stop_time,
        )
    except ValueError as exc:
        message = str(exc)
        status = 404 if "not connected" in message.lower() else 400
        return web.json_response({"error": message}, status=status)
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)

    # Echo the effective parameters so the caller can see applied defaults.
    payload: dict[str, Any] = {
        "status": "queued",
        "location": location,
        "retries": retries,
        "retryInterval": retry_interval,
    }
    if start_time:
        payload["startTime"] = start_time
    if stop_time:
        payload["stopTime"] = stop_time
    if response is not None:
        payload["ocpp_response"] = response
    return web.json_response(payload)


@routes.get("/updateFirmware/{station_id:.*}")
async def http_update_firmware(request):
    """Trigger an UpdateFirmware request through a plain GET URL.

    The station id is taken from the path. ``location`` (required, absolute
    http, https or ftp URL), ``retries`` and ``retryInterval`` /
    ``retry_interval`` (optional integers) come from the query string. All
    responses are UTF-8 plain text.

    Status codes: 400 for missing or invalid parameters; 404 or 400 for a
    ``ValueError`` from ``update_firmware_request`` (404 when the message
    contains "not connected"); 504 on ``asyncio.TimeoutError``; 500 for any
    other exception; 200 on success.
    """

    def _plain(status: int, text: str) -> web.Response:
        """Build a UTF-8 plain-text response."""
        return web.Response(
            status=status,
            text=text,
            content_type="text/plain",
            charset="utf-8",
        )

    station_id = request.match_info.get("station_id", "").strip("/")
    if not station_id:
        return _plain(400, "Parameter 'station_id' fehlt\n")

    query = request.rel_url.query
    location = query.get("location")
    if not location:
        return _plain(400, "Parameter 'location' fehlt\n")

    # urlparse raises ValueError for malformed input such as unmatched
    # brackets in the host part; report that as an invalid location rather
    # than letting it escape as an unhandled server error.
    try:
        parsed = urlparse(location)
    except ValueError:
        parsed = None
    if (
        parsed is None
        or parsed.scheme not in {"http", "https", "ftp"}
        or not parsed.netloc
    ):
        return _plain(
            400,
            "Parameter 'location' muss eine absolute http(s)- oder ftp-URL sein\n",
        )

    retries = query.get("retries")
    retry_interval = query.get("retryInterval") or query.get("retry_interval")

    try:
        retries_value = int(retries) if retries is not None else None
    except (TypeError, ValueError):
        return _plain(400, "Parameter 'retries' muss ganzzahlig sein\n")

    try:
        retry_interval_value = (
            int(retry_interval) if retry_interval is not None else None
        )
    except (TypeError, ValueError):
        return _plain(400, "Parameter 'retryInterval' muss ganzzahlig sein\n")

    try:
        logging.info(
            "Received HTTP UpdateFirmware request for %s with location=%s, retries=%s, retryInterval=%s",
            station_id,
            location,
            retries_value,
            retry_interval_value,
        )
        data = await update_firmware_request(
            station_id, location, retries_value, retry_interval_value
        )
    except ValueError as exc:
        message = str(exc)
        status = 404 if "not connected" in message.lower() else 400
        return _plain(status, f"Fehler: {message}\n")
    except asyncio.TimeoutError:
        logging.warning(
            "UpdateFirmware timeout for %s after %ss (location=%s, retries=%s, retryInterval=%s)",
            station_id,
            UPDATE_FIRMWARE_TIMEOUT,
            location,
            retries_value,
            retry_interval_value,
        )
        return _plain(504, "Fehler: Timeout beim Warten auf Antwort des Ladepunkts\n")
    except Exception as exc:  # pragma: no cover - unexpected runtime failure
        logging.exception("Firmware-Update-Aufruf fehlgeschlagen")
        return _plain(500, f"Fehler: {exc}\n")

    payload = json.dumps(data, ensure_ascii=False)
    return _plain(200, f"UpdateFirmware gesendet: {station_id} -> {payload}\n")


@routes.post("/api/updateFirmware")
async def api_update_firmware(request):
    """Trigger an UpdateFirmware request from a JSON body.

    Body fields: ``station_id`` and ``location`` (both required; the
    location must be an absolute http, https or ftp URL), ``retries`` and
    ``retryInterval`` / ``retry_interval`` (optional integers).

    Status codes: 400 for missing or invalid input; 404 or 400 for a
    ``ValueError`` from ``update_firmware_request`` (404 when the message
    contains "not connected", compared case-insensitively); 504 on
    ``asyncio.TimeoutError``; 500 for any other exception.
    """
    body = await _parse_json(request)
    station_id = body.get("station_id")
    location = body.get("location")
    retries = body.get("retries")
    retry_interval = body.get("retryInterval", body.get("retry_interval"))

    if not station_id or not location:
        return web.json_response(
            {"error": "station_id and location required"}, status=400
        )

    # JSON can deliver any type here and urlparse only accepts text; it also
    # raises ValueError for malformed input such as unmatched brackets in the
    # host part. Both cases are a bad location, not a server fault.
    parsed = None
    if isinstance(location, str):
        try:
            parsed = urlparse(location)
        except ValueError:
            parsed = None
    if (
        parsed is None
        or parsed.scheme not in {"http", "https", "ftp"}
        or not parsed.netloc
    ):
        return web.json_response(
            {"error": "location must be an absolute http(s) or ftp URL"}, status=400
        )

    try:
        retries_value = int(retries) if retries is not None else None
    except (TypeError, ValueError):
        return web.json_response({"error": "retries must be an integer"}, status=400)

    try:
        retry_interval_value = (
            int(retry_interval) if retry_interval is not None else None
        )
    except (TypeError, ValueError):
        return web.json_response(
            {"error": "retryInterval must be an integer"}, status=400
        )

    try:
        logging.info(
            "Received API UpdateFirmware request for %s with location=%s, retries=%s, retryInterval=%s",
            station_id,
            location,
            retries_value,
            retry_interval_value,
        )
        data = await update_firmware_request(
            station_id, location, retries_value, retry_interval_value
        )
        return web.json_response(data)
    except ValueError as exc:
        message = str(exc)
        # Case-insensitive match, as in the other handlers that map
        # "not connected" to 404.
        status = 404 if "not connected" in message.lower() else 400
        return web.json_response({"error": message}, status=status)
    except asyncio.TimeoutError:
        logging.warning(
            "UpdateFirmware timeout for %s after %ss (location=%s, retries=%s, retryInterval=%s)",
            station_id,
            UPDATE_FIRMWARE_TIMEOUT,
            location,
            retries_value,
            retry_interval_value,
        )
        return web.json_response(
            {
                "error": "timeout waiting for charge point to acknowledge UpdateFirmware",
                "status": "timeout",
            },
            status=504,
        )
    except Exception as exc:
        return web.json_response({"error": str(exc)}, status=500)


@routes.get("/api/getAllCurrentTransactions")
async def api_get_all_current_transactions(request):
    """List every entry of ``current_transactions`` as JSON.

    Each item carries the charge point id, the transaction id (the mapping
    key) and the start time rendered with ``isoformat()``.
    """
    listing = []
    for transaction_key, details in current_transactions.items():
        entry = {
            "chargepoint_id": details["chargepoint_id"],
            "transaction_id": transaction_key,
            "start_time": details["start_time"].isoformat(),
        }
        listing.append(entry)
    return web.json_response({"transactions": listing})


# Register every handler collected on ``routes`` with the HTTP application.
app.add_routes(routes)


async def main():
    """Bring up the OCPP websocket server and the HTTP API, then run until cancelled.

    Steps, in order: run the ``ensure_*`` table checks, apply ``LOG_LEVEL``
    to the ``websockets.server`` logger, load and log the startup
    configuration, install the asyncio exception handler, start the OCPP log
    worker, launch the background tasks and start both listeners on
    ``SERVER_HOST``. The coroutine then waits on a future that never
    completes. The log worker is shut down on the way out, including when
    startup fails after the worker has been started.
    """
    print("OCPP server starting up. Checking database tables.")
    await ensure_server_config_table()
    await ensure_cp_config_table()
    await ensure_oicp_enable_table()
    await ensure_cp_metadata_table()
    await ensure_rfid_tables()
    await ensure_voucher_tables()
    await ensure_connection_log_table()
    await ensure_server_sessions_table()
    await ensure_session_log_table()
    await ensure_triggermeter_table()
    await ensure_last_meterreading_table()
    await ensure_status_notification_table()
    await ensure_api_call_log_table()
    await ensure_pnc_csr_table()
    await ensure_cp_sign_requests_table()

    print("700: Setting lLOG_LEVEL to " + str(LOG_LEVEL))
    print("701: Setting LOG_LEVEL_NAME to " + str(LOG_LEVEL_NAME))

    # Respect configured log level for websockets' internal server logger
    logging.getLogger("websockets.server").setLevel(LOG_LEVEL)
    startup_cfg = load_server_startup_config()
    log_server_startup_config(startup_cfg)
    logging.info("[Startup] Log level configured to %s", LOG_LEVEL_NAME)

    # NOTE(review): the label says LOG_LEVEL_NAME but the value printed is
    # LOG_LEVEL; the output is left unchanged here.
    print("702: Setting LOG_LEVEL_NAME to " + str(LOG_LEVEL))

    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handle_asyncio_exception)
    await _start_ocpp_log_worker(loop)
    try:
        # The event loop keeps only weak references to tasks. Hold strong
        # references for the lifetime of main() so the background tasks
        # cannot be garbage-collected while still running.
        background_tasks = [
            asyncio.create_task(log_connection_count()),
            asyncio.create_task(cleanup_stale_connections()),
        ]
        if MAIL_EXTENDED_SESSION_SUMMARY_ENABLED:
            background_tasks.append(
                asyncio.create_task(send_periodic_session_email())
            )
        else:
            logging.info("Extended session summary emails disabled via config")
        logging.info(
            "[Startup] Using websocket listener %s:%s (host_source=%s, port_source=%s)",
            SERVER_HOST,
            SERVER_PORT,
            _network_settings.host_source,
            _network_settings.ocpp_port_source,
        )
        logging.info(
            "[Startup] Using API listener %s:%s (host_source=%s, port_source=%s)",
            SERVER_HOST,
            API_PORT,
            _network_settings.host_source,
            _network_settings.api_port_source,
        )
        logging.info(
            "[Startup] Supported websocket subprotocols: %s",
            ", ".join(OCPP_SUBPROTOCOLS),
        )

        ws_server = serve(
            handle_connection,
            SERVER_HOST,
            SERVER_PORT,
            subprotocols=OCPP_SUBPROTOCOLS,
            process_request=log_ws_request,
            create_protocol=LoggingWebSocketServerProtocol,
            ping_interval=PING_INTERVAL,
            ping_timeout=PING_TIMEOUT,
        )
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, SERVER_HOST, API_PORT)
        # Start the websocket listener and the HTTP listener together.
        await asyncio.gather(ws_server, site.start())
        print(f"OCPP server listening on {SERVER_HOST}:{SERVER_PORT}")
        print(f"API server listening on {SERVER_HOST}:{API_PORT}")
        try:
            # Block forever; cancellation is the normal way out.
            await asyncio.Future()
        except asyncio.CancelledError:
            pass
    finally:
        await _shutdown_ocpp_log_worker()


# Run the server when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
