Files
alpinebits_python/tests/test_worker_coordination.py
2025-10-15 10:07:42 +02:00

63 lines
1.9 KiB
Python

#!/usr/bin/env python3
"""Test script to verify worker coordination with file locking.
This simulates multiple workers trying to acquire the primary worker lock.
"""
import multiprocessing
import time
from pathlib import Path
from src.alpine_bits_python.worker_coordination import WorkerLock
def worker_process(worker_id: int, lock_file: str):
"""Simulate a worker process trying to acquire the lock."""
print(f"Worker {worker_id} (PID {multiprocessing.current_process().pid}): Starting")
lock = WorkerLock(lock_file)
is_primary = lock.acquire()
if is_primary:
print(f"Worker {worker_id} (PID {multiprocessing.current_process().pid}): ✓ I am PRIMARY")
# Simulate running singleton services
time.sleep(3)
print(f"Worker {worker_id} (PID {multiprocessing.current_process().pid}): Releasing lock")
lock.release()
else:
print(f"Worker {worker_id} (PID {multiprocessing.current_process().pid}): ✗ I am SECONDARY")
# Simulate regular worker work
time.sleep(3)
print(f"Worker {worker_id} (PID {multiprocessing.current_process().pid}): Exiting")
if __name__ == "__main__":
# Use a test lock file
lock_file = "/tmp/test_alpinebits_worker.lock"
# Clean up any existing lock file
Path(lock_file).unlink(missing_ok=True)
print("Starting 4 worker processes (simulating uvicorn --workers 4)")
print("=" * 70)
# Start multiple workers
processes = []
for i in range(4):
p = multiprocessing.Process(target=worker_process, args=(i, lock_file))
p.start()
processes.append(p)
# Small delay to make output clearer
time.sleep(0.1)
# Wait for all workers to complete
for p in processes:
p.join()
print("=" * 70)
print("✓ Test complete: Only ONE worker should have been PRIMARY")
# Clean up
Path(lock_file).unlink(missing_ok=True)