Merging schema_extension #9
@@ -21,12 +21,32 @@ class WorkerLock:
|
||||
services like schedulers only run on one worker.
|
||||
"""
|
||||
|
||||
def __init__(self, lock_file: str = "/tmp/alpinebits_primary_worker.lock"):
|
||||
def __init__(self, lock_file: str | None = None):
|
||||
"""Initialize the worker lock.
|
||||
|
||||
Args:
|
||||
lock_file: Path to the lock file
|
||||
lock_file: Path to the lock file. If None, will try /var/run first,
|
||||
falling back to /tmp if /var/run is not writable.
|
||||
"""
|
||||
if lock_file is None:
|
||||
# Try /var/run first (more persistent), fall back to /tmp
|
||||
for candidate in ["/var/run/alpinebits_primary_worker.lock",
|
||||
"/tmp/alpinebits_primary_worker.lock"]:
|
||||
try:
|
||||
candidate_path = Path(candidate)
|
||||
candidate_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
# Test if we can write to this location
|
||||
test_file = candidate_path.parent / ".alpinebits_test"
|
||||
test_file.touch()
|
||||
test_file.unlink()
|
||||
lock_file = candidate
|
||||
break
|
||||
except (PermissionError, OSError):
|
||||
continue
|
||||
else:
|
||||
# If all fail, default to /tmp
|
||||
lock_file = "/tmp/alpinebits_primary_worker.lock"
|
||||
|
||||
self.lock_file = Path(lock_file)
|
||||
self.lock_fd = None
|
||||
self.is_primary = False
|
||||
@@ -107,6 +127,7 @@ def is_primary_worker() -> tuple[bool, WorkerLock | None]:
|
||||
"""Determine if this worker should run singleton services.
|
||||
|
||||
Uses file-based locking to coordinate between workers.
|
||||
Includes stale lock detection and cleanup.
|
||||
|
||||
Returns:
|
||||
Tuple of (is_primary, lock_object)
|
||||
@@ -114,6 +135,31 @@ def is_primary_worker() -> tuple[bool, WorkerLock | None]:
|
||||
- lock_object: WorkerLock instance (must be kept alive)
|
||||
"""
|
||||
lock = WorkerLock()
|
||||
|
||||
# Check for stale locks from dead processes
|
||||
if lock.lock_file.exists():
|
||||
try:
|
||||
with open(lock.lock_file, 'r') as f:
|
||||
old_pid_str = f.read().strip()
|
||||
if old_pid_str:
|
||||
old_pid = int(old_pid_str)
|
||||
# Check if the process with this PID still exists
|
||||
try:
|
||||
os.kill(old_pid, 0) # Signal 0 just checks existence
|
||||
_LOGGER.debug("Lock held by active process pid=%d", old_pid)
|
||||
except ProcessLookupError:
|
||||
# Process is dead, remove stale lock
|
||||
_LOGGER.warning(
|
||||
"Removing stale lock file from dead process pid=%d",
|
||||
old_pid
|
||||
)
|
||||
try:
|
||||
lock.lock_file.unlink()
|
||||
except Exception as e:
|
||||
_LOGGER.warning("Failed to remove stale lock: %s", e)
|
||||
except (ValueError, FileNotFoundError, PermissionError) as e:
|
||||
_LOGGER.warning("Error checking lock file: %s", e)
|
||||
|
||||
is_primary = lock.acquire()
|
||||
|
||||
return is_primary, lock
|
||||
|
||||
Reference in New Issue
Block a user