From 27cf040f4539530645f6a83fa899547593e4c624 Mon Sep 17 00:00:00 2001 From: Jonas Linter <{email_address}> Date: Fri, 17 Oct 2025 19:56:04 +0200 Subject: [PATCH] Worker coordination cleanup --- src/alpine_bits_python/worker_coordination.py | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/src/alpine_bits_python/worker_coordination.py b/src/alpine_bits_python/worker_coordination.py index a30b4b1..199aa7d 100644 --- a/src/alpine_bits_python/worker_coordination.py +++ b/src/alpine_bits_python/worker_coordination.py @@ -21,12 +21,32 @@ class WorkerLock: services like schedulers only run on one worker. """ - def __init__(self, lock_file: str = "/tmp/alpinebits_primary_worker.lock"): + def __init__(self, lock_file: str | None = None): """Initialize the worker lock. Args: - lock_file: Path to the lock file + lock_file: Path to the lock file. If None, will try /var/run first, + falling back to /tmp if /var/run is not writable. """ + if lock_file is None: + # Try /var/run first (more persistent), fall back to /tmp + for candidate in ["/var/run/alpinebits_primary_worker.lock", + "/tmp/alpinebits_primary_worker.lock"]: + try: + candidate_path = Path(candidate) + candidate_path.parent.mkdir(parents=True, exist_ok=True) + # Test if we can write to this location + test_file = candidate_path.parent / ".alpinebits_test" + test_file.touch() + test_file.unlink() + lock_file = candidate + break + except (PermissionError, OSError): + continue + else: + # If all fail, default to /tmp + lock_file = "/tmp/alpinebits_primary_worker.lock" + self.lock_file = Path(lock_file) self.lock_fd = None self.is_primary = False @@ -107,6 +127,7 @@ def is_primary_worker() -> tuple[bool, WorkerLock | None]: """Determine if this worker should run singleton services. Uses file-based locking to coordinate between workers. + Includes stale lock detection and cleanup. Returns: Tuple of (is_primary, lock_object) @@ -114,6 +135,31 @@ def is_primary_worker() -> tuple[bool, WorkerLock | None]: - lock_object: WorkerLock instance (must be kept alive) """ lock = WorkerLock() + + # Check for stale locks from dead processes + if lock.lock_file.exists(): + try: + with open(lock.lock_file, 'r') as f: + old_pid_str = f.read().strip() + if old_pid_str: + old_pid = int(old_pid_str) + # Check if the process with this PID still exists + try: + os.kill(old_pid, 0) # Signal 0 just checks existence + _LOGGER.debug("Lock held by active process pid=%d", old_pid) + except ProcessLookupError: + # Process is dead, remove stale lock + _LOGGER.warning( + "Removing stale lock file from dead process pid=%d", + old_pid + ) + try: + lock.lock_file.unlink() + except Exception as e: + _LOGGER.warning("Failed to remove stale lock: %s", e) + except (ValueError, FileNotFoundError, PermissionError) as e: + _LOGGER.warning("Error checking lock file: %s", e) + is_primary = lock.acquire() return is_primary, lock