Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/pre-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ jobs:
- name: Install dependencies
run: uv sync --python ${{ matrix.python-version }} --python-preference only-managed

- name: Run prek
run: uv run prek run --all-files
- name: Lint & format (prek)
run: uv run prek run --all-files --verbose --show-diff-on-failure --skip pytest

- name: Test (pytest)
run: uv run pytest tests/ -v
10 changes: 5 additions & 5 deletions src/cocoindex_code/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,26 +467,26 @@ def daemon_restart() -> None:
@daemon_app.command("stop")
def daemon_stop() -> None:
"""Stop the daemon."""
from .client import stop_daemon
from .client import is_daemon_running, stop_daemon
from .daemon import daemon_pid_path

pid_path = daemon_pid_path()
if not pid_path.exists():
if not pid_path.exists() and not is_daemon_running():
_typer.echo("Daemon is not running.")
return

stop_daemon()

# Wait for process to exit
# Wait for process to exit (check both pid file and socket)
import time

deadline = time.monotonic() + 5.0
while time.monotonic() < deadline:
if not pid_path.exists():
if not pid_path.exists() and not is_daemon_running():
break
time.sleep(0.1)

if pid_path.exists():
if pid_path.exists() or is_daemon_running():
_typer.echo("Warning: daemon may not have stopped cleanly.", err=True)
else:
_typer.echo("Daemon stopped.")
Expand Down
151 changes: 107 additions & 44 deletions src/cocoindex_code/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,26 @@ def start_daemon() -> None:
cmd = [sys.executable, "-m", "cocoindex_code.cli", "run-daemon"]

log_fd = open(log_path, "a")
subprocess.Popen(
cmd,
start_new_session=True,
stdout=log_fd,
stderr=log_fd,
stdin=subprocess.DEVNULL,
)
if sys.platform == "win32":
# DETACHED_PROCESS fully detaches the daemon from the parent console,
# preventing its exit code from leaking back to the calling shell.
_create_new_process_group = 0x00000200
_detached_process = 0x00000008
subprocess.Popen(
cmd,
stdout=log_fd,
stderr=log_fd,
stdin=subprocess.DEVNULL,
creationflags=_create_new_process_group | _detached_process,
)
else:
subprocess.Popen(
cmd,
start_new_session=True,
stdout=log_fd,
stderr=log_fd,
stdin=subprocess.DEVNULL,
)
log_fd.close()


Expand All @@ -205,12 +218,49 @@ def _find_ccc_executable() -> str | None:
return None


def _pid_alive(pid: int) -> bool:
"""Return True if *pid* is still running."""
if sys.platform == "win32":
# Avoid os.kill(pid, 0) on Windows — it has a CPython bug that corrupts
# the C-level exception state, causing subsequent C function calls
# (time.monotonic, time.sleep) to raise SystemError even after the
# OSError is caught. Use OpenProcess via ctypes instead.
import ctypes

kernel32 = getattr(ctypes, "windll").kernel32
handle = kernel32.OpenProcess(0x1000, False, pid) # PROCESS_QUERY_LIMITED_INFORMATION
if handle:
kernel32.CloseHandle(handle)
return True
return False
try:
os.kill(pid, 0) # signal 0: check existence without killing
return True
except ProcessLookupError:
return False
except PermissionError:
return True # process exists but we can't signal it


def stop_daemon() -> None:
"""Stop the daemon gracefully.

Sends a StopRequest, waits for the process to exit, falls back to SIGTERM.
Sends a StopRequest, waits for the process to exit, falls back to
SIGTERM → SIGKILL. Only removes the PID file after confirming that
the specific PID is no longer alive.
"""
# Step 1: try sending StopRequest
pid_path = daemon_pid_path()

# Read the PID early so we can track the actual process.
pid: int | None = None
try:
pid = int(pid_path.read_text().strip())
if pid == os.getpid():
pid = None # safety: never kill ourselves
except (FileNotFoundError, ValueError):
pass

# Step 1: try sending StopRequest via socket
try:
client = DaemonClient.connect()
client.handshake()
Expand All @@ -220,65 +270,78 @@ def stop_daemon() -> None:
pass

# Step 2: wait for process to exit (up to 5s)
pid_path = daemon_pid_path()
deadline = time.monotonic() + 5.0
while time.monotonic() < deadline and pid_path.exists():
time.sleep(0.1)

if not pid_path.exists():
return # Clean exit
if pid is not None:
deadline = time.monotonic() + 5.0
while time.monotonic() < deadline and _pid_alive(pid):
time.sleep(0.1)
if not _pid_alive(pid):
_cleanup_stale_files(pid_path, pid)
return

# Step 3: if still running, try SIGTERM
pid: int | None = None
if pid_path.exists():
if pid is not None and _pid_alive(pid):
try:
pid = int(pid_path.read_text().strip())
if pid != os.getpid():
os.kill(pid, signal.SIGTERM)
else:
pid = None
except (ValueError, ProcessLookupError, PermissionError):
os.kill(pid, signal.SIGTERM)
except (ProcessLookupError, PermissionError):
pass

# Wait a bit more
deadline = time.monotonic() + 2.0
while time.monotonic() < deadline and pid_path.exists():
while time.monotonic() < deadline and _pid_alive(pid):
time.sleep(0.1)

# Step 4: if still running, escalate to SIGKILL (Unix only;
if not _pid_alive(pid):
_cleanup_stale_files(pid_path, pid)
return

# Step 4: escalate to SIGKILL (Unix only;
# on Windows SIGTERM already calls TerminateProcess)
if sys.platform != "win32" and pid_path.exists():
if sys.platform != "win32" and pid is not None and _pid_alive(pid):
try:
pid = int(pid_path.read_text().strip())
if pid != os.getpid():
os.kill(pid, signal.SIGKILL)
else:
pid = None
except (ValueError, ProcessLookupError, PermissionError):
os.kill(pid, signal.SIGKILL)
except (ProcessLookupError, PermissionError):
pass

# SIGKILL is async; give the kernel a moment to reap
deadline = time.monotonic() + 1.0
while time.monotonic() < deadline and _pid_alive(pid):
time.sleep(0.1)

# Step 4b: on Windows, wait for the process to fully exit after TerminateProcess
# so that named pipe handles are released before starting a new daemon.
if sys.platform == "win32" and pid is not None:
deadline = time.monotonic() + 3.0
while time.monotonic() < deadline:
try:
os.kill(pid, 0) # Check if process still exists
time.sleep(0.1)
except (ProcessLookupError, PermissionError, OSError):
break # Process has exited
while time.monotonic() < deadline and _pid_alive(pid):
time.sleep(0.1)

# Step 5: clean up stale files
_cleanup_stale_files(pid_path, pid)


def _cleanup_stale_files(pid_path: Path, pid: int | None) -> None:
"""Remove socket and PID file after the daemon has exited.

Only removes the PID file when *pid* matches what is on disk, to
avoid accidentally deleting a newer daemon's PID file.
"""
if sys.platform != "win32":
sock = daemon_socket_path()
try:
Path(sock).unlink(missing_ok=True)
except Exception:
pass
try:
pid_path.unlink(missing_ok=True)
except Exception:
pass
if pid is not None:
try:
stored = pid_path.read_text().strip()
if stored == str(pid):
pid_path.unlink(missing_ok=True)
except (FileNotFoundError, ValueError):
pass
else:
# No PID known — cautiously remove if file exists
try:
pid_path.unlink(missing_ok=True)
except Exception:
pass


def _wait_for_daemon(timeout: float = 30.0) -> None:
Expand Down
17 changes: 12 additions & 5 deletions src/cocoindex_code/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,17 +447,24 @@ def run_daemon() -> None:
try:
asyncio.run(_async_daemon_main(embedder))
finally:
# Clean up PID file and socket (named pipes on Windows clean up automatically)
try:
pid_path.unlink(missing_ok=True)
except Exception:
pass
# Clean up socket first, then PID file last.
# The PID file is the authoritative "daemon is alive" indicator, so it
# must be the very last thing removed to avoid races where a client
# sees the PID gone but the socket (or process) is still lingering.
if sys.platform != "win32":
sock = daemon_socket_path()
try:
Path(sock).unlink(missing_ok=True)
except Exception:
pass
# Only remove the PID file if it still contains *our* PID.
# A new daemon may have already overwritten it during a restart race.
try:
stored = pid_path.read_text().strip()
if stored == str(os.getpid()):
pid_path.unlink(missing_ok=True)
except Exception:
pass
logger.info("Daemon stopped")


Expand Down
13 changes: 13 additions & 0 deletions tests/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
RemoveProjectRequest,
Response,
SearchRequest,
StopRequest,
decode_response,
encode_request,
)
Expand Down Expand Up @@ -92,6 +93,18 @@ def daemon_sock() -> Iterator[str]:

yield sock_path

# Gracefully shut down the daemon thread so named pipes are released on Windows
try:
conn = Client(sock_path, family=_connection_family())
conn.send_bytes(encode_request(HandshakeRequest(version=__version__)))
conn.recv_bytes()
conn.send_bytes(encode_request(StopRequest()))
conn.recv_bytes()
conn.close()
except Exception:
pass
thread.join(timeout=5)

# Restore patches and env var
dm.create_embedder = _orig_create_embedder # type: ignore[attr-defined]
if old_env is None:
Expand Down
Loading