#!/opt/cloudlinux/venv/bin/python3
# Copyright Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
lvestats-plugin-runner -- long-lived subprocess host for Python LveStatsPlugin classes.

The Rust daemon (lvestats-server) spawns ONE runner for the whole daemon lifetime
and drives it over stdin/stdout with line-delimited JSON. Every Python plugin
discovered in the plugins directory is hosted as an instance inside this single
process, so plugin state (counters, caches, DB engines) survives across ticks,
matching the semantics of the Python lve-stats daemon's in-process executor.

Protocol (one request per line, one response per line, stderr = logs):

    {"op":"list","plugins_dir":"<dir>"}
        -> {"status":"ok","plugins":[{"file":..,"class":..,"order":..,"period":..,"timeout":..}, ...]}

    {"op":"init","class":"<Name>","plugin_config":{...},"db_url":"..."|null,"is_user_plugin":true}
        -> {"status":"ok"} | {"status":"error","message":..,"traceback":..}

    {"op":"execute","class":"<Name>","now":<float>,"lve_data":{...}}
        -> {"status":"ok","lve_data":{...}}
         | {"status":"terminated"}
         | {"status":"db_schema_error","message":..,"traceback":..}
         | {"status":"error","message":..,"traceback":..}

    {"op":"shutdown"}
        -> {"status":"ok"}   (process exits after writing response)

SIGUSR1 dumps stack traces of all threads to stderr.
SIGUSR2 is ignored outside of execute() and raises LveStatsPluginTerminated inside,
so the Rust daemon can interrupt a sleeping plugin during graceful shutdown.
"""

import importlib.util
import inspect
import io
import json
import os
import signal
import sys
import traceback


# Isolate the JSON-RPC channel from anything a plugin (or a library it imports)
# might write to stdout.
#
# The Rust daemon reads this process's stdout line-by-line and parses each line
# as JSON. A single stray `print(...)`, `os.write(1, ...)`, or C-extension
# `fprintf(stdout, ...)` corrupts the stream and breaks every subsequent
# request/response. Clone fd 1 to a private fd used only for protocol writes,
# then point fd 1 at fd 2 (stderr) so all other writes — including those from
# C extensions and subprocesses that inherit fd 1 — land in stderr, which the
# daemon already drains into its logs.
# Order matters: duplicate the original stdout first, then overwrite fd 1.
_protocol_fd = os.dup(1)
os.dup2(2, 1)
# Wrap the private fd in a *buffered* binary writer. A raw (unbuffered)
# FileIO may perform a short write -- for instance when a signal handler
# interrupts a large write to the pipe -- and TextIOWrapper ignores the
# returned byte count, which would truncate a response line. The buffered
# layer retries until everything is written; callers flush after each
# response, so lines are still delivered immediately.
_protocol = io.TextIOWrapper(
    os.fdopen(_protocol_fd, "wb"),
    encoding="utf-8",
    write_through=True,
)
# Rebind Python-level stdout too so `print()` without an explicit `file=`
# goes to stderr instead of leaving the stream partly redirected.
sys.stdout = sys.stderr


# Import the real base class and termination exception from the installed
# lve-stats package so a plugin's `except LveStatsPluginTerminated:` catches
# the same class our SIGUSR2 handler raises. Fall back to local stubs so the
# runner still works on systems where the package is not installed
# (e.g. development / CI environments that rely only on this runner).
try:
    from lvestats.core.plugin import LveStatsPlugin, LveStatsPluginTerminated  # noqa: F401
except ImportError:
    class LveStatsPluginTerminated(Exception):
        """Local stand-in for the exception raised to interrupt a running plugin."""

    class LveStatsPlugin:
        """Local stand-in base class used when the lvestats package is missing.

        It carries the class attributes that discovery reads and no-op
        versions of the methods the runner calls on plugin instances.
        """

        order = 0
        period = None
        timeout = None

        def set_db_engine(self, engine):
            """Accept a database engine; the stub ignores it."""

        def set_config(self, config):
            """Accept a plugin configuration; the stub ignores it."""

        def execute(self, lve_data):
            """Process one tick of *lve_data*; the stub does nothing."""


# Schema-level SQLAlchemy exceptions we treat specially so the Rust daemon
# can recreate the schema and retry (mirrors Python daemon's recover_db()
# path in `lvestats/eventloop/plugin_executors.py:106-114`). If SQLAlchemy
# isn't installed, the tuple is empty and no classification happens.
try:
    from sqlalchemy.exc import (
        NoSuchColumnError, NoSuchTableError, NoReferenceError,
    )
except ImportError:
    # Without SQLAlchemy there is nothing to classify: `except ():` matches
    # no exception at all.
    _DB_SCHEMA_ERRORS = ()
else:
    _DB_SCHEMA_ERRORS = (NoSuchColumnError, NoSuchTableError, NoReferenceError)


# Registries shared by the op_* handlers. The request loop is single-threaded,
# so they are read and written without any locking.
#
# class_name -> class object, populated by op_list
_classes = dict()
# class_name -> plugin instance, populated by op_init
_instances = dict()


def _sigusr1_handler(signum, frame):
    """Write the current stack of every thread to stderr.

    Installed as the SIGUSR1 handler; both arguments are the standard
    signal-handler parameters and are not used.
    """
    report = ["--- lvestats-plugin-runner thread traces ---"]
    for thread_id, top_frame in sys._current_frames().items():
        report += ["# Thread %d" % thread_id, *traceback.format_stack(top_frame)]
    # Emit the whole report with one write so it stays together in the log.
    sys.stderr.write("\n".join(report) + "\n")
    sys.stderr.flush()


def _sigusr2_handler(signum, frame):
    """Interrupt the running code by raising LveStatsPluginTerminated.

    Used as the SIGUSR2 handler; both arguments are the standard
    signal-handler parameters and are not used.
    """
    termination = LveStatsPluginTerminated("SIGUSR2")
    raise termination


def _is_broken_symlink(path):
    """Return True when *path* is a symlink whose resolved target does not exist."""
    if not os.path.islink(path):
        return False
    resolved = os.path.realpath(path)
    return not os.path.exists(resolved)


def _discover_classes(plugins_dir):
    """Import every ``.py`` file in *plugins_dir* and collect LveStatsPlugin subclasses.

    Intended to mirror lvestats.core.plugin_loader.PluginLoader: classes are
    selected by an ``issubclass`` check (not by filename), each module is
    registered in ``sys.modules`` so sibling imports work, and broken
    symlinks are skipped with a warning. Files are processed in sorted name
    order. A file that fails to import is logged to stderr and skipped.

    Returns:
        A ``(descriptors, classes)`` tuple. ``descriptors`` is a list of
        dicts with the keys ``file``, ``class``, ``order``, ``period`` and
        ``timeout``; ``classes`` maps class name to class object. Both are
        empty when *plugins_dir* is not a directory.

    When two files define a plugin class with the same name, the class from
    the later file replaces the earlier one in ``classes`` (both descriptors
    are still returned); a warning is written to stderr so the collision is
    visible in the logs.
    """
    descriptors = []
    classes = {}
    defined_in = {}  # class name -> file that registered it, for collision warnings

    if not os.path.isdir(plugins_dir):
        sys.stderr.write("Warning: plugins directory not found: %s\n" % plugins_dir)
        return descriptors, classes

    for filename in sorted(os.listdir(plugins_dir)):
        if not filename.endswith(".py"):
            continue
        full_path = os.path.join(plugins_dir, filename)
        if _is_broken_symlink(full_path):
            sys.stderr.write("Warning: skipping broken symlink %s\n" % full_path)
            continue

        module_name = filename[:-3]
        try:
            spec = importlib.util.spec_from_file_location(module_name, full_path)
            if spec is None or spec.loader is None:
                continue
            module = importlib.util.module_from_spec(spec)
            # Register before exec so sibling modules can find each other.
            sys.modules[module_name] = module
            spec.loader.exec_module(module)
        except Exception as exc:
            sys.stderr.write("Warning: failed to import %s: %s\n" % (full_path, exc))
            # Do not leave a half-initialized module importable.
            sys.modules.pop(module_name, None)
            continue

        for name, obj in inspect.getmembers(module, inspect.isclass):
            if obj is LveStatsPlugin or not issubclass(obj, LveStatsPlugin):
                continue
            # Skip classes re-exported from other modules (match plugin_loader.py).
            if getattr(obj, "__module__", None) != module_name:
                continue
            if name in classes:
                # The registry is keyed by class name only, so the earlier
                # class becomes unreachable; make that visible.
                sys.stderr.write(
                    "Warning: plugin class %s from %s replaces the one from %s\n"
                    % (name, filename, defined_in[name])
                )
            classes[name] = obj
            defined_in[name] = filename
            descriptors.append({
                "file": filename,
                "class": name,
                "order": getattr(obj, "order", 0),
                "period": getattr(obj, "period", None),
                "timeout": getattr(obj, "timeout", None),
            })

    return descriptors, classes


def op_list(req):
    """Handle the ``list`` op: (re)discover plugin classes in ``plugins_dir``.

    On success the class registry is replaced with the newly discovered
    classes and every existing plugin instance is dropped. Returns the
    descriptors under ``plugins``, or an error response when the request has
    no ``plugins_dir``.
    """
    directory = req.get("plugins_dir")
    if directory:
        found_descriptors, found_classes = _discover_classes(directory)
        # Refill the shared registry in place rather than rebinding it.
        _classes.clear()
        _classes.update(found_classes)
        # Instances tied to previously-loaded classes are no longer valid.
        _instances.clear()
        return {"status": "ok", "plugins": found_descriptors}
    return {"status": "error", "message": "list: plugins_dir missing"}


def op_init(req):
    """Handle the ``init`` op: create and configure one plugin instance.

    The class named by ``class`` must already be known from a previous
    ``list``. The new instance receives ``plugin_config`` (``{}`` when absent)
    through ``set_config`` and, when ``db_url`` is given, a SQLAlchemy engine
    through ``set_db_engine``. A failure to set up the engine is only logged;
    any other exception is returned as an ``error`` response with traceback.
    On success the instance is stored under its class name.
    """
    name = req.get("class")
    if not name:
        return {"status": "error", "message": "init: class missing"}

    plugin_cls = _classes.get(name)
    if plugin_cls is None:
        return {
            "status": "error",
            "message": "class %r not loaded; send list first" % name,
        }

    try:
        # The flag is set on the class itself, before the instance is created.
        plugin_cls.__is_user_plugin__ = bool(req.get("is_user_plugin", True))
        plugin = plugin_cls()

        if hasattr(plugin, "set_config"):
            plugin.set_config(req.get("plugin_config") or {})

        url = req.get("db_url")
        if url and hasattr(plugin, "set_db_engine"):
            try:
                from sqlalchemy import create_engine  # noqa: WPS433
                plugin.set_db_engine(create_engine(url))
            except Exception as db_exc:
                # Plugins that actually use the engine will fail later; log and continue.
                sys.stderr.write(
                    "Warning: set_db_engine failed for %s: %s\n" % (name, db_exc)
                )

        _instances[name] = plugin
        return {"status": "ok"}
    except Exception as exc:
        failure = {"status": "error", "message": str(exc)}
        failure["traceback"] = traceback.format_exc()
        return failure


def op_execute(req):
    """Handle the ``execute`` op: run one ``execute()`` call of a plugin.

    Reads ``class``, ``now`` (default ``0.0``) and ``lve_data`` (an empty
    dict when missing or falsy) from *req*. The plugin must have been created
    by a previous ``init``.

    Returns a response dict whose ``status`` is:
      * ``ok`` -- with the same ``lve_data`` object that was handed to the
        plugin;
      * ``terminated`` -- LveStatsPluginTerminated was raised during the call;
      * ``db_schema_error`` -- one of ``_DB_SCHEMA_ERRORS`` was raised;
      * ``error`` -- class missing, plugin not initialized, or any other
        exception (with ``message`` and ``traceback``).
    """
    class_name = req.get("class")
    if not class_name:
        return {"status": "error", "message": "execute: class missing"}
    instance = _instances.get(class_name)
    if instance is None:
        return {
            "status": "error",
            "message": "plugin %r not initialized; send init first" % class_name,
        }

    now = req.get("now", 0.0)
    lve_data = req.get("lve_data") or {}

    # Best effort: expose the tick timestamp on the instance; a plugin that
    # rejects the attribute assignment is still executed.
    try:
        instance.now = now
    except Exception:
        pass

    # Arm SIGUSR2 only around execute() so an out-of-band signal between
    # requests doesn't kill the runner.
    signal.signal(signal.SIGUSR2, _sigusr2_handler)
    try:
        instance.execute(lve_data)
    except LveStatsPluginTerminated:
        return {"status": "terminated"}
    except _DB_SCHEMA_ERRORS as exc:
        # Tell the Rust side to run ensure_schema() and respawn us; matches
        # the `recover_db()` flow in Python lve-stats
        # (plugin_executors.py:106-114 + plugin_context.py:34-35).
        return {
            "status": "db_schema_error",
            "message": str(exc),
            "traceback": traceback.format_exc(),
        }
    except Exception as exc:
        return {
            "status": "error",
            "message": str(exc),
            "traceback": traceback.format_exc(),
        }
    finally:
        # Runs on every exit path above, so SIGUSR2 is ignored again before
        # the response is handed back to the request loop.
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)

    return {"status": "ok", "lve_data": lve_data}


def op_shutdown(_req):
    """Handle the ``shutdown`` op; the request itself is not inspected.

    The ``_exit`` key is a private marker for the request loop, which strips
    it from the response before writing and then exits the process.
    """
    response = dict(status="ok")
    response["_exit"] = True
    return response


# Request "op" value -> handler. Every handler takes the decoded request
# dict and returns the response dict.
_DISPATCH = dict(
    list=op_list,
    init=op_init,
    execute=op_execute,
    shutdown=op_shutdown,
)


def _write_response(resp):
    """Write *resp* as a single JSON line on the private protocol channel.

    If *resp* cannot be serialized (for example a plugin left a non-JSON
    value in ``lve_data``), an ``error`` response describing the failure is
    sent instead, so the peer still gets exactly one line per request and
    the runner -- with all plugin state -- stays alive.
    """
    try:
        payload = json.dumps(resp)
    except (TypeError, ValueError) as exc:
        payload = json.dumps({
            "status": "error",
            "message": "response is not JSON-serializable: %s" % exc,
            "traceback": traceback.format_exc(),
        })
    _protocol.write(payload + "\n")
    _protocol.flush()


def main():
    """Run the request loop: one JSON request per stdin line, one response each.

    Installs the SIGUSR1 stack-dump handler and ignores SIGUSR2 outside of
    plugin execution. Blank lines are skipped. Malformed JSON, a request that
    is not a JSON object, an unknown ``op`` and any exception escaping a
    handler are all answered with an ``error`` response instead of ending the
    process. The loop ends on stdin EOF, or exits with status 0 after
    answering a request whose handler set the ``_exit`` marker.
    """
    signal.signal(signal.SIGUSR1, _sigusr1_handler)
    signal.signal(signal.SIGUSR2, signal.SIG_IGN)

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            req = json.loads(line)
        except json.JSONDecodeError as exc:
            _write_response({"status": "error", "message": "invalid json: %s" % exc})
            continue

        # Valid JSON that is not an object (e.g. `[]` or `42`) has no `.get`.
        if not isinstance(req, dict):
            _write_response({
                "status": "error",
                "message": "invalid request: expected a JSON object, got %s"
                % type(req).__name__,
            })
            continue

        op = req.get("op")
        # Only look up string ops: an unhashable value such as a list would
        # make the dict lookup itself raise.
        handler = _DISPATCH.get(op) if isinstance(op, str) else None
        if handler is None:
            resp = {"status": "error", "message": "unknown op %r" % op}
        else:
            try:
                resp = handler(req)
            except Exception as exc:
                resp = {
                    "status": "error",
                    "message": str(exc),
                    "traceback": traceback.format_exc(),
                }

        # `_exit` is an internal marker and must not reach the wire.
        exit_after = resp.pop("_exit", False)
        _write_response(resp)
        if exit_after:
            sys.exit(0)


# Start the request loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
