Source code for deluxe.process

# Copyright (c) 2024 - Gilles Coissac
# This file is part of standard-deluxe library.
#
# standard-deluxe is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# standard-deluxe is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with standard-deluxe. If not, see <https://www.gnu.org/licenses/>
"""Process utilities for working with system commands."""

from __future__ import annotations

import asyncio
import atexit
import contextlib
import locale
import multiprocessing as mp
import os
import re
import shutil
import signal
import subprocess
import sys
import threading
import time
import weakref
from abc import ABC, ABCMeta, abstractmethod
from asyncio import Future, Task
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Literal,
    TypeVar,
    cast,
    final,
    overload,
)
from warnings import warn

from deluxe.availability import availability, supported
from deluxe.types import Unset


_USER_SUPPORT: bool = supported(only=("posix",), but=("wasi", "ios"))

if _USER_SUPPORT:
    import pwd

if TYPE_CHECKING:
    from asyncio.subprocess import Process
    from collections.abc import Mapping

    from deluxe.types import AnyFilePath


__all__ = ("Command", "Daemon", "get_real_users")


# Registry for daemon constructor arguments, keyed by controller instance.
# Uses weak references so entries are automatically cleaned up when the
# controller is garbage collected. This avoids setting attributes on the
# controller instance (which would break __slots__ subclasses) and avoids
# writing sensitive constructor arguments to disk.
_daemon_args_registry: weakref.WeakKeyDictionary[
    object, tuple[tuple[Any, ...], dict[str, Any]]
] = weakref.WeakKeyDictionary()

# Lock to prevent same-process fork races when multiple controllers
# call start() concurrently (e.g. from different threads).
_start_lock = threading.Lock()


[docs] @availability(only="posix", but=("wasi", "ios")) def get_real_users() -> set[str]: """Return a set of all real user accounts on the system. Retrieves a set of all user accounts that have a valid login shell. On Linux, ``/etc/login.defs`` is parsed for the ``UID_MIN`` value. On other POSIX systems (macOS, BSD), a default minimum UID of ``500`` is used, which correctly includes real users starting at ``501``. System accounts with a UID below the minimum are excluded, as are accounts with nologin-style shells. .. note:: Availability: Unix, not WASI, not iOS Returns: :obj:`set` [:obj:`str` ]: A set of usernames for all real user accounts on the system. """ # Determine min UID: parse /etc/login.defs on Linux if available, # otherwise fall back to 500 (covers macOS/BSD real users starting at 501) min_uid = 500 login_defs = Path("/etc/login.defs") if login_defs.exists(): with login_defs.open( # noqa: FURB101 "r", encoding=locale.getpreferredencoding(False), # noqa: FBT003 ) as f: if sch := re.search(r"^UID_MIN\s+(\d+)", f.read()): # pragma: no cover min_uid = int(sch[1]) # Shells that indicate "no login" nologin_shells = {"/usr/sbin/nologin", "/bin/false", "/usr/bin/nologin", "/sbin/nologin"} return { p.pw_name for p in pwd.getpwall() # pyright: ignore[reportPossiblyUnboundVariable] if p.pw_uid >= min_uid and p.pw_shell not in nologin_shells }
[docs] class Command: """System command class. Represents a system command and provides methods for running the command and handling the output. It also supports specifying the user to run the command as on POSIX systems. The actual implementation use the sudo command to execute the command if user is specified. Commands are never executed through a shell. On POSIX systems, if a ``user`` is specified, the command is executed via ``sudo -u <user>``. This class supports both synchronous execution via :meth:`__call__` and asynchronous execution via :meth:`async_call` using :mod:`asyncio`. Args: name (:obj:`str`): The name of the command. path (:obj:`os.PathLike` [:obj:`str` ] | ``None``): The path to the command executable. Default: ``None``. user (:obj:`str` | ``None`): The user to run the command as. Requires ``sudo`` to be available on the system. Default: ``None``. Raises: Command.Error: If the command is not found on the system. NotImplementedError: if user is specified on non POSIX systems. Examples:: >>> cmd = Command("ls") >>> cmd("-la", "/tmp") User Switching ______________ The ``user`` parameter is only available on POSIX systems (Linux, macOS, BSD). It is not supported on Windows. This section explains why and provides guidance for cross-platform development. **Why Windows is Different** On POSIX systems, ``sudo -u <user>`` allows running a command as a different user without knowing their password, provided the invoking user has sudo privileges. This model assumes: - A centralized authentication system (PAM, ``/etc/passwd``) - Passwordless sudo configuration (``/etc/sudoers``) - A flat UID-based user model (integer UIDs) Windows uses a fundamentally different security model: - **UAC (User Account Control)**: Elevation is per-process, not per-command. The ``runas`` verb triggers a UAC prompt, but it elevates the *current user* to Administrator — it does not switch to a different user account. - **No passwordless elevation**: Unlike ``sudo``, Windows ``runas`` requires the user's password interactively. There is no equivalent of ``/etc/sudoers`` for passwordless cross-user execution. - **Separate security contexts**: An elevated process runs in a different security token. Standard ``subprocess`` cannot capture stdout/stderr from an elevated process because the pipes cannot cross the security boundary. - **SID-based identity**: Windows identifies users by SID (Security Identifier), not integer UIDs. User enumeration requires Windows API calls (``NetUserEnum``) rather than parsing ``/etc/passwd``. **Cross-Platform Guidance** If you need to run commands with elevated privileges across platforms, use platform-specific branches: .. code-block:: python import sys from deluxe.process import Command if sys.platform == "win32": # Windows: use PowerShell for elevation pws = Command("powershell") pws("-Command", "Start-Process regedit -Verb RunAs -Wait") else: # POSIX: use sudo apt = Command("apt") apt.user = "root" apt("update") **Summary of Platform Differences** +---------------------+---------------------+---------------------------+ | Feature | POSIX | Windows | +=====================+=====================+===========================+ | Run as different | ``sudo -u <user>`` | Not possible without | | user | | password (use ``runas``) | +---------------------+---------------------+---------------------------+ | Elevate same user | ``sudo <cmd>`` | ``runas`` verb (UAC | | | | prompt) | +---------------------+---------------------+---------------------------+ | Capture stdout | ✅ Direct | ❌ Separate security | | | | context | +---------------------+---------------------+---------------------------+ | Pass stdin | ✅ Direct | ❌ Not possible | +---------------------+---------------------+---------------------------+ | Wait for exit code | ✅ Direct | ⚠️ Requires pywin32 | +---------------------+---------------------+---------------------------+ """ _SYS_USERS: set[str] = get_real_users() if _USER_SUPPORT else Unset class Error(Exception): """Exception raised when a system command fails. Attributes: msg (:obj:`str`): The error message. returncode (:obj:`int`): The non-zero exit status code, if available. """ def __init__( self, msg: str | bytes | None, retcode: int = 0, cmd: tuple[str, ...] | None = None ) -> None: if retcode and cmd: cmd_ = " ".join(cmd) msg = f"command <{cmd_}> returned non-zero exit status {retcode}.\n{msg or ''}" self.returncode: int = retcode self.msg: str = msg.decode() if isinstance(msg, bytes) else msg or "" super().__init__(msg) __slots__: tuple[str, ...] = ("_user", "command", "name") def __init__( self, name: str, *, path: os.PathLike[str] | None = None, user: str | None = None, ) -> None: if path and Path(path).is_file(): command = str(path) elif not (command := shutil.which(name)): msg = f"Command {path or name} not found on your system." raise Command.Error(msg) self.command: str = command self.name: str = name if not _USER_SUPPORT and user: msg = "specifying user is only supported on POSIX plateforms." raise NotImplementedError(msg) self._user: str | None self._user = user if _USER_SUPPORT else None @property def user(self) -> str | None: """The user associated with this command. Returns: :obj:`str` | ``None``: The username, or ``None`` if not set. """ return self._user @user.setter @availability(only=("posix",), but=("wasi", "ios")) def user(self, user: str | None) -> None: """Set the user to run the command as. Args: user (:obj:`str` | ``None``): The username, or ``None`` to clear. Raises: Command.Error: If the user is not found on the system. """ if user and user not in Command._SYS_USERS and user != "root": msg = f"User {user} not found on your system." raise Command.Error(msg) self._user = user def _create_exception(self) -> type[Command.Error]: return type(f"{self.name.capitalize()}Error", (Command.Error,), {}) def _compose(self, *args: str) -> tuple[str, ...]: if self._user: return ("sudo", "-u", self._user, "--preserve-env", self.command, *args) return (self.command, *args) @overload def __call__( self, *args: str, input: bytes | None = None, capture: bool = True, text: Literal[False], encoding: str | None = "UTF-8", cwd: AnyFilePath | None = None, env: Mapping[str, str] | None = None, ) -> bytes: ... @overload def __call__( self, *args: str, input: str | None = None, capture: bool = True, text: Literal[True] = True, encoding: str | None = "UTF-8", cwd: AnyFilePath | None = None, env: Mapping[str, str] | None = None, ) -> str: ... def __call__( self, *args: str, input: str | bytes | None = None, # noqa: A002 capture: bool = True, text: bool = True, encoding: str | None = "UTF-8", cwd: AnyFilePath | None = None, env: Mapping[str, str] | None = None, ) -> str | bytes: """Run this command. Executes the command synchronously and returns its output. Args: *args (:obj:`str`): The arguments to pass to the command. input (:obj:`str` | :obj:`bytes` | ``None``): The input to pass to the command's stdin. Default: ``None``. capture (:obj:`bool`): Whether to capture the command output. Default: ``True``. text (:obj:`bool`): Whether to return the output as text or bytes. Default: ``True``. encoding (:obj:`str` | ``None``): The encoding to use for text output. Default: ``"UTF-8"``. cwd (:class:`~deluxe.types.AnyFilePath` | ``None``): The current working directory for the command. Default: ``None``. env (:obj:`~collections.abc.Mapping` [:obj:`str`, :obj:`str` ] | ``None``): The environment variables for the command. Default: ``None``. Returns: :obj:`str` | :obj:`bytes`: The output of the command when it completes successfully. Raises: Command.Error: If the command returns with a non-zero exit status. """ # noqa: DOC502 args_ = self._compose(*args) cp = subprocess.run( # noqa: S603 args_, capture_output=capture, shell=False, check=False, input=input, text=text, encoding=encoding if text else None, cwd=cwd, env=env, ) if cp.returncode: raise self._create_exception()(cp.stderr, cp.returncode, args_) return cp.stdout or ("" if text else b"")
[docs] async def async_call( self, *args: str, input: bytes | None = None, # noqa: A002 capture: bool = True, cwd: AnyFilePath | None = None, env: Mapping[str, str] | None = None, ) -> Task[Future[bytes]]: """Run this command asynchronously. Executes the command as an :class:`asyncio.subprocess.Process` and returns a :class:`asyncio.Task` wrapping a :class:`asyncio.Future` that resolves to the command's stdout bytes. Args: *args (:obj:`str`): The arguments to pass to the command. input (:obj:`bytes` | ``None``): The input to pass to the command's stdin. Default: ``None``. capture (:obj:`bool`): Whether to capture the command output. Default: ``True``. cwd (:class:`~deluxe.types.AnyFilePath` | ``None``): The current working directory for the command. Default: ``None``. env (:obj:`~collections.abc.Mapping` [:obj:`str`, :obj:`str` ] | ``None``): The environment variables for the command. Default: ``None``. Returns: :class:`asyncio.Task` [:class:`asyncio.Future` [:obj:`bytes` ] ]: A task that resolves to the command's stdout bytes. Raises: Command.Error: A dynamically created subclass if the process exits due to a signal (negative return code). """ # noqa: DOC502 args_ = self._compose(*args) proc = await asyncio.subprocess.create_subprocess_exec( *args_, stdin=subprocess.PIPE if input else None, stdout=subprocess.PIPE if capture else None, stderr=subprocess.PIPE if capture else None, cwd=cwd, env=env, ) loop = asyncio.get_running_loop() future = loop.create_future() async def _wait_co( proc: Process, input_: bytes | None, future: Future[bytes] ) -> Future[bytes]: stdout, stderr = await proc.communicate(input_) if proc.returncode and proc.returncode < 0: future.set_exception( self._create_exception()( stderr, proc.returncode, args_, ), ) else: future.set_result(stdout or b"") return future return loop.create_task( _wait_co(proc, input, future), name=self.name, )
## # Disabling those Ruff rules: # <'open()' should be replaced by 'path.open()'>, # is more convenient for standard files # # <use context handler for opening file>, # standard file should not be closed # # ruff: noqa: PTH123, SIM115 class _RealDaemon: # pragma: no cover """Internal mixin that performs the Unix double-fork daemonization. This class is used internally by :class:`_DaemonMeta` to create the actual daemon process. It should not be instantiated directly. """ __workpath__: ClassVar[Path] __pidfile__: ClassVar[Path] def __init__(self, *args: Any, **kwds: Any) -> None: self.running: bool = False self.daemonize() super().__init__(*args, **kwds) self.daemonized: Daemon = cast("Daemon", super()) self.daemonized.run() def daemonize(self) -> None: """Daemonize the process using an Unix double fork mechanism.""" pid_tmp: int try: # decouple from parent environment os.chdir("/") except OSError as err: sys.stderr.write(f"chdir to <{self.__workpath__}> failed:\n{err}\n") sys.exit(1) else: os.setsid() os.umask(0) try: # do the second fork pid_tmp = os.fork() if pid_tmp > 0: sys.exit(0) # exit from second parent except OSError as err: sys.stderr.write(f"fork #2 failed: {err}\n") sys.exit(1) ## # Daemon code only here pid_tmp = os.getpid() # redirect standard file descriptors to devnull sys.stdout.flush() sys.stderr.flush() si = open(os.devnull, encoding=None) so = open(os.devnull, "a+", encoding=None) se = open(os.devnull, "a+", encoding=None) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) # write pidfile atexit.register(self.atexit) with self.__pidfile__.open("w+") as file: file.write(f"{pid_tmp}\n") # signal handler def _sigterm_handler(_signum: int, _frame: Any) -> None: sys.exit(0) signal.signal(signal.SIGTERM, _sigterm_handler) # user-defined signal handlers def _sigusr1_handler(_signum: int, _frame: Any) -> None: self.daemonized.on_user1() def _sigusr2_handler(_signum: int, _frame: Any) -> None: self.daemonized.on_user2() signal.signal(signal.SIGUSR1, _sigusr1_handler) signal.signal(signal.SIGUSR2, _sigusr2_handler) def atexit(self) -> None: self.daemonized.atexit() self.__pidfile__.unlink() _T = TypeVar("_T") _STOP_TIMEOUT: float = 5.0 class _DaemonMeta(ABCMeta): # pragma: posix cover """Metaclass for the :class:`Daemon` abstract base class. Controls the daemon lifecycle: manages pidfile creation, process forking via :mod:`multiprocessing`, and the singleton pattern that prevents multiple instances of the same daemon from running simultaneously. """ WORKPATH_VAR: str = "__workpath__" PIDFILE_VAR: str = "__pidfile__" def __new__( cls: type[type[_T]], name: str, bases: tuple[type, ...], namespace: dict[str, Any], **kwds: Any, ) -> type[_T]: workpath = kwds.pop("workpath", "/") if not (workpath := Path(workpath)).is_dir(): msg = f"<{workpath}> should be an existing directory." raise AttributeError(msg) if name == "Daemonized": pidfile = (Path.home() / f"._{bases[-1].__name__}").with_suffix(".pid") else: pidfile = (Path.home() / f"._{name}").with_suffix(".pid") namespace[_DaemonMeta.PIDFILE_VAR] = pidfile namespace[_DaemonMeta.WORKPATH_VAR] = workpath return super().__new__(cls, name, bases, namespace, **kwds) @staticmethod def fork(cls_: type, *args: Any, **kwds: Any) -> None: ctx = mp.get_context("fork") ps = ctx.Process(target=cls_, args=args, kwargs=kwds) ps.start() ps.join() @staticmethod def subclass(daemon: type[_T]) -> type: return type("Daemonized", (_RealDaemon, daemon), {}) def __call__(cls: type[_T], *args: Any, **kwds: Any) -> _T: if not supported(only=("posix",), but=("wasi")): # pragma: win32 cover return super().__call__(*args, **kwds) if cls.__name__ == "Daemonized": # pragma: no cover # return an instance of the Daemon if not already running pidfile: Path = getattr(cls, _DaemonMeta.PIDFILE_VAR) if pidfile.exists(): sys.exit() return super().__call__(*args, **kwds) # return a Daemon controller kind instance _DaemonMeta.fork(_DaemonMeta.subclass(daemon=cls), *args, **kwds) controller = super().__call__(*args, **kwds) _daemon_args_registry[controller] = (args, kwds) return controller
[docs] @availability(only="posix", but="wasi") class Daemon(ABC, metaclass=_DaemonMeta): # pragma: posix cover """A generic Unix daemon abstract base class. Make instance of this class execute in a daemonized process using an Unix double fork mechanism. .. note:: Availability: Unix, not WASI Subclasses must implement the :meth:`run` method, which contains the daemon's working logic. User-defined :class:`Daemon` instances should not call :meth:`run` directly. The daemon writes a pidfile at its start to prevent multiple instances from running simultaneously. Once daemonized, the :meth:`run` method is called with no parameters. The daemon executes in its own detached session with no tty attached, so it will not inherit the standard files from the python interpreter where it was instancied. Code instantiating the daemon will receive a functional instance of the defined class. This instance acts as a daemon controller. Keyword Args: workpath (:class:`~pathlib.Path` | :obj:`str`): The working directory for the daemon process. Must be an existing directory. Default: ``"/"``. Interprocess Communication -------------------------- Daemon subclasses will end up with two instances: the *controller* living in the calling process, and the *worker* living in its own detached session. The controller provides a built-in set of methods to manage the daemon's life cycle: :meth:`start`, :meth:`stop`, :meth:`restart`, :meth:`signal_user1`, and :meth:`signal_user2` (see next section for details). For richer communication channels (bidirectional pipes, shared memory, sockets, etc.), this class makes no provision for a specific protocol; it is up to the subclass implementation. Common Python options include: - :class:`multiprocessing.Queue` — thread-safe message passing between the controller and the daemon worker. - :class:`multiprocessing.connection.Connection` (via :func:`multiprocessing.Pipe`) — lightweight bidirectional byte stream. - :mod:`multiprocessing.shared_memory` — zero-copy shared data between processes (Python 3.8+). - :mod:`socket` — Unix domain sockets for structured protocols or TCP sockets for network-accessible daemons. - :mod:`asyncio` event loop with :class:`asyncio.Queue` — for daemons built on asynchronous I/O. Controller Semantics -------------------- Instantiating a :class:`Daemon` subclass returns a *controller* — a lightweight handle that manages the daemon's lifecycle. The controller is not the daemon itself; it is a proxy that communicates with the daemon through the pidfile and signals. **Multiple controllers are allowed.** Several controller instances can coexist for the same daemon class within a single process or across processes. All controllers for a given class point to the same daemon process. Calling :meth:`stop`, :meth:`start`, :meth:`restart`, :meth:`signal_user1`, or :meth:`signal_user2` on *any* controller affects the shared daemon. **The daemon is a system-level singleton.** Only one daemon process runs at a time for a given class, enforced by the pidfile. When a new controller is created while the daemon is already running, the singleton guard prevents a duplicate daemon from starting. The new controller simply becomes another handle to the existing daemon. **Constructor arguments are preserved per controller.** Each controller retains the ``*args`` and ``**kwds`` passed to its constructor. When :meth:`start` is called to (re)launch the daemon, it uses the calling controller's original arguments. This means different controllers may hold different argument sets; the last controller to call :meth:`start` determines the daemon's configuration. .. warning:: Constructor arguments are stored in memory for the lifetime of the controller and are never written to disk. If the controller is garbage collected, its arguments are lost. If a daemon dies and needs to be restarted, the controller calling :meth:`start` must have been created with the intended arguments. **Concurrency safety.** A process-level lock prevents two controllers in the same process from racing through :meth:`start` concurrently. Cross-process races are handled by the pidfile singleton guard. User Signals ^^^^^^^^^^^^ The controller's :meth:`signal_user1` and :meth:`signal_user2` methods send ``SIGUSR1`` and ``SIGUSR2`` to the daemon process. When the daemon receives one of these signals, it calls the overridable hook method :meth:`on_user1` or :meth:`on_user2`. The default implementations are no-ops; override them in your subclass to define custom behavior. .. code-block:: python import time from deluxe.process import Daemon class Worker(Daemon): def run(self): while True: time.sleep(1) def on_user1(self): # Called when controller sends SIGUSR1 self.reloading = True daemon = Worker() daemon.start() daemon.signal_user1() # sends SIGUSR1 -> on_user1() daemon.stop() .. note:: The ``on_user1`` / ``on_user2`` hooks run inside a signal handler. Keep them lightweight: set a flag or event, and defer heavy work to the main ``run()`` loop. About The Unix Double Fork Mechanism ------------------------------------ In Unix every process belongs to a group which in turn belongs to a session (session (SID) -> process Group (PGID) -> process (PID)). The first process in the session becomes the session leader. Every session can have one TTY associated with it and only a session leader can take control of a TTY. Normally, when launching a daemon, ``setsid`` is called (from the child process after calling ``fork``) to dissociate the daemon from its controlling terminal. However, calling ``setsid`` also means that the calling process will be the session leader of the new session, which leaves open the possibility that the daemon could reacquire a controlling terminal in the future. The double-fork technique ensures that the daemon process is no longer a session leader, making the init process responsible for its cleanup. Forking a second child and exiting immediately prevents zombies and causes the second child process to be orphaned, preventing it from ever acquiring inadvertently a controlling terminal. """ __workpath__: ClassVar[Path] __pidfile__: ClassVar[Path] @final @property def pid(self) -> int: """The PID of this daemon. Returns: :obj:`int`: The PID of the daemon if running, ``0`` otherwise. """ try: with self.__pidfile__.open("r") as file: return int(file.read().strip()) except (OSError, ValueError): return 0
[docs] @final def stop(self) -> None: """Stop the daemon. Sends ``SIGTERM`` to the daemon process and waits for it to terminate. If the daemon does not terminate within a timeout, ``SIGKILL`` is sent as a last resort. If the daemon is not running, a warning is issued and the pidfile is cleaned up if present. Raises: OSError: If the daemon process could not be killed for a reason other than it not existing. """ if not (pid := self.pid): msg: str = "Daemon is not running.\n" warn(msg, stacklevel=1) return # Send SIGTERM once try: os.kill(pid, signal.SIGTERM) except OSError as err: if "No such process" in str(err): # pragma: no cover if self.__pidfile__.exists(): self.__pidfile__.unlink() return raise # Wait for the process to terminate, with a timeout deadline = time.monotonic() + _STOP_TIMEOUT while time.monotonic() < deadline: time.sleep(0.1) try: os.kill(pid, 0) except OSError: # pragma: no cover break else: # Timeout: force-kill with SIGKILL with contextlib.suppress(OSError): os.kill(pid, signal.SIGKILL) if self.__pidfile__.exists(): # pragma: no cover self.__pidfile__.unlink()
[docs] @final def start(self) -> int: """Start the daemon. If the daemon is not already running, it is daemonized. If it is already running, a warning is issued. Returns: :obj:`int`: The PID of the already-running daemon, or ``0`` if a new daemon was started. """ if not (pid := self.pid): with _start_lock: # Double-check after acquiring the lock to prevent race # conditions when multiple controllers call start() # concurrently from the same process. if not (pid := self.pid): args, kwds = _daemon_args_registry.get(self, ((), {})) _DaemonMeta.fork(_DaemonMeta.subclass(daemon=self.__class__), *args, **kwds) else: msg: str = f"Daemon is already running with pid <{pid}>...\n" warn(msg, stacklevel=1) return pid
[docs] @final def restart(self) -> None: """Restart the daemon. Stops the daemon if running, then starts it again. """ self.stop() self.start()
[docs] @final def signal_user1(self) -> None: """Send ``SIGUSR1`` to the daemon process. The daemon's :meth:`on_user1` method is invoked when this signal is received. Override :meth:`on_user1` in your subclass to define the response. If the daemon is not running, a warning is issued. """ if not (pid := self.pid): msg: str = "Daemon is not running.\n" warn(msg, stacklevel=1) return os.kill(pid, signal.SIGUSR1)
[docs] @final def signal_user2(self) -> None: """Send ``SIGUSR2`` to the daemon process. The daemon's :meth:`on_user2` method is invoked when this signal is received. Override :meth:`on_user2` in your subclass to define the response. If the daemon is not running, a warning is issued. """ if not (pid := self.pid): msg: str = "Daemon is not running.\n" warn(msg, stacklevel=1) return os.kill(pid, signal.SIGUSR2)
[docs] def atexit(self) -> None: # noqa: PLR6301 """Called when the daemon terminates. Override this method to include cleanup code. This method is registered via :func:`atexit.register` and will be executed upon normal interpreter termination. See Also: :func:`atexit.register` """ return # pragma: no cover
[docs] @abstractmethod def run(self) -> None: """Daemon worker method. You must override this method when subclassing :class:`Daemon`. It will be called after the process has been daemonized by :meth:`start` or :meth:`restart`. """
[docs] def on_user1(self) -> None: # noqa: PLR6301 """Handler for ``SIGUSR1`` received by the daemon. Override this method in your subclass to define custom behavior when the controller sends ``SIGUSR1`` via :meth:`signal_user1`. The default implementation is a no-op. .. note:: This method is called from a signal handler. Keep the implementation lightweight and avoid blocking operations. Use a flag or event to defer heavy work to the main loop. """ return # pragma: no cover
[docs] def on_user2(self) -> None: # noqa: PLR6301 """Handler for ``SIGUSR2`` received by the daemon. Override this method in your subclass to define custom behavior when the controller sends ``SIGUSR2`` via :meth:`signal_user2`. The default implementation is a no-op. .. note:: This method is called from a signal handler. Keep the implementation lightweight and avoid blocking operations. Use a flag or event to defer heavy work to the main loop. """ return # pragma: no cover