Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,15 @@ CACHE_TTL_HOURS=1
# Log level for both console and file (DEBUG|INFO|WARNING|ERROR).
LOG_LEVEL=INFO
LOG_RETENTION_DAYS=7

# --- Graceful shutdown (optional) ---
# Max time (seconds) to drain queued Slack messages on SIGTERM/SIGINT shutdown.
SHUTDOWN_MQ_DRAIN_TIMEOUT_SECONDS=30
Comment thread
coderabbitai[bot] marked this conversation as resolved.
# Max time (seconds) to wait for each server thread (health, Bolt) to exit during shutdown.
SHUTDOWN_THREAD_JOIN_TIMEOUT_SECONDS=5
# Set to the container stop grace period (seconds). When non-zero, a startup
# warning is emitted if the combined shutdown budget
# (SHUTDOWN_MQ_DRAIN_TIMEOUT_SECONDS + 2 × SHUTDOWN_THREAD_JOIN_TIMEOUT_SECONDS)
# meets or exceeds this value. In docker-compose.yml this also controls
# stop_grace_period (default 45 s, which exceeds the 40 s default budget).
STOP_GRACE_PERIOD_SECONDS=45
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ services:
env_file: .env
environment:
HEALTH_BIND_HOST: "0.0.0.0"
# Passed into the container so the startup budget check can compare it
# against the configured shutdown timeouts. Must match stop_grace_period.
STOP_GRACE_PERIOD_SECONDS: "${STOP_GRACE_PERIOD_SECONDS:-45}"
extra_hosts:
- "host.docker.internal:host-gateway"
logging:
driver: json-file
options:
max-size: "10m"
max-file: "5"
# Must exceed the combined shutdown budget:
# SHUTDOWN_MQ_DRAIN_TIMEOUT_SECONDS (30 s default)
# + SHUTDOWN_THREAD_JOIN_TIMEOUT_SECONDS × 2 (5 s × 2 = 10 s default)
# = 40 s. Keep at least 5 s of headroom above that sum.
# Override STOP_GRACE_PERIOD_SECONDS in the environment or .env file to
# change both this period and the in-process budget check simultaneously.
stop_grace_period: "${STOP_GRACE_PERIOD_SECONDS:-45}s"
restart: unless-stopped
111 changes: 89 additions & 22 deletions src/paperscout/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
import logging
import logging.handlers
import signal
import sys
import threading
from datetime import datetime, timezone
Expand All @@ -22,6 +23,7 @@
notify_users,
register_handlers,
)
from .shutdown import shutdown_services
from .sources import ISOProber, WG21Index
from .storage import ProbeState, UserWatchlist

Expand Down Expand Up @@ -128,8 +130,37 @@ def _setup_logging(data_dir: Path, console_level: str = "INFO", retention_days:
logging.getLogger(lib).setLevel(logging.WARNING)


def _register_shutdown_signals(
loop: asyncio.AbstractEventLoop,
shutdown_event: asyncio.Event,
shutdown_reason: list[str | None],
) -> None:
"""Register SIGTERM/SIGINT handlers that set *shutdown_event*."""

def _on_signal(signame: str) -> None:
"""Record the first shutdown signal and wake the scheduler."""
if shutdown_reason[0] is None:
shutdown_reason[0] = signame
shutdown_event.set()

for sig, name in ((signal.SIGTERM, "SIGTERM"), (signal.SIGINT, "SIGINT")):
try:
loop.add_signal_handler(sig, lambda n=name: _on_signal(n))
except NotImplementedError:
signal.signal(sig, lambda *_a, n=name: _on_signal(n))


async def _async_main() -> None:
"""Start DB, Slack app, health server, and the polling scheduler."""
shutdown_event = asyncio.Event()
shutdown_reason: list[str | None] = [None]
health_server = None
bolt_thread = None
mq = None
app = None

_register_shutdown_signals(asyncio.get_running_loop(), shutdown_event, shutdown_reason)

data_dir = settings.data_dir
data_dir.mkdir(parents=True, exist_ok=True)

Expand Down Expand Up @@ -157,6 +188,27 @@ async def _async_main() -> None:
settings.frontier_gap_threshold,
)

_shutdown_budget = (
settings.shutdown_mq_drain_timeout_seconds
+ 2 * settings.shutdown_thread_join_timeout_seconds
)
log.info(
"Shutdown budget: %.0fs (mq_drain=%.0f + 2×thread_join=%.0f)",
_shutdown_budget,
settings.shutdown_mq_drain_timeout_seconds,
settings.shutdown_thread_join_timeout_seconds,
)
if (
settings.stop_grace_period_seconds > 0
and _shutdown_budget >= settings.stop_grace_period_seconds
):
log.warning(
"Shutdown budget %.0fs ≥ stop_grace_period %.0fs — increase "
"STOP_GRACE_PERIOD_SECONDS or reduce SHUTDOWN_*_TIMEOUT_SECONDS",
_shutdown_budget,
settings.stop_grace_period_seconds,
)

if not settings.database_url:
log.error("DATABASE_URL is not set — cannot start")
sys.exit(1)
Expand Down Expand Up @@ -215,27 +267,42 @@ def _extra_health_fields() -> dict:
_pool_status(pool),
)

register_handlers(app, user_watchlist, state, paper_count_fn, launch_time)

start_health_server(
settings.health_port,
launch_time,
state,
paper_count_fn,
bind_host=settings.health_bind_host,
extra_fields_fn=_extra_health_fields,
)
log.info("Starting Slack Bolt app on port %d", settings.port)
bolt_thread = threading.Thread(
target=app.start,
kwargs={"port": settings.port},
daemon=True,
)
bolt_thread.start()

enqueue_startup_status(mq, state, paper_count_fn)

await scheduler.run_forever()
try:
register_handlers(app, user_watchlist, state, paper_count_fn, launch_time)

health_server = start_health_server(
settings.health_port,
launch_time,
state,
paper_count_fn,
bind_host=settings.health_bind_host,
extra_fields_fn=_extra_health_fields,
)
log.info("Starting Slack Bolt app on port %d", settings.port)
bolt_thread = threading.Thread(
target=app.start,
kwargs={"port": settings.port},
daemon=True,
name="bolt",
)
bolt_thread.start()

enqueue_startup_status(mq, state, paper_count_fn)

await scheduler.run_forever(shutdown_event)
finally:
shutdown_services(
reason=shutdown_reason[0] or "unknown",
mq=mq,
health_server=health_server,
health_thread=(
getattr(health_server, "_paperscout_thread", None) if health_server else None
),
app=app,
bolt_thread=bolt_thread,
mq_drain_timeout=settings.shutdown_mq_drain_timeout_seconds,
thread_join_timeout=settings.shutdown_thread_join_timeout_seconds,
)


def main() -> None:
Expand All @@ -244,7 +311,7 @@ def main() -> None:
asyncio.run(_async_main())
except KeyboardInterrupt:
log.info("=== Paperscout shutting down (KeyboardInterrupt) ===")
sys.exit(0)
sys.exit(0)


if __name__ == "__main__":
Expand Down
8 changes: 8 additions & 0 deletions src/paperscout/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ class Settings(BaseSettings):
mq_circuit_breaker_cooldown_seconds: int = Field(default=60, ge=1)
mq_max_size: int = Field(default=1000, ge=1)

# -- Graceful shutdown --
shutdown_mq_drain_timeout_seconds: float = Field(default=30.0, ge=0.1)
shutdown_thread_join_timeout_seconds: float = Field(default=5.0, ge=0.1)
# Set to the container orchestrator's stop/grace period (seconds).
# When non-zero, a startup warning is emitted if the combined shutdown budget
# (mq_drain + 2 × thread_join) meets or exceeds this value.
stop_grace_period_seconds: float = Field(default=0.0, ge=0.0)

@model_validator(mode="after")
def _require_slack_credentials_unless_testing(self) -> Settings:
"""Slack tokens must be set for real runs; pytest sets ``_PAPERSCOUT_TESTING=1``."""
Expand Down
1 change: 1 addition & 0 deletions src/paperscout/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def start_health_server(

server = HTTPServer((bind_host, port), handler)
thread = threading.Thread(target=server.serve_forever, daemon=True, name="health")
server._paperscout_thread = thread # noqa: SLF001 — joined during graceful shutdown
thread.start()
log.info("Health endpoint listening on %s:%d", bind_host, port)
return server
60 changes: 56 additions & 4 deletions src/paperscout/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import asyncio
import contextlib
import copy
import logging
import threading
Expand Down Expand Up @@ -456,7 +457,33 @@ async def poll_once(self) -> PollResult:
self._publish_health_snapshot()
return result

async def run_forever(self) -> None:
async def _poll_once_or_cancel(self, shutdown_event: asyncio.Event) -> None:
    """Race one ``poll_once`` call against *shutdown_event*.

    If the poll finishes first, the event waiter is cancelled and the poll's
    outcome (including any exception) is surfaced by awaiting it. If the
    event fires first, the poll is cancelled and awaited, so
    ``asyncio.CancelledError`` propagates to the caller. Both helper tasks
    are always reaped before returning.
    """
    poller = asyncio.create_task(self.poll_once())
    stopper = asyncio.create_task(shutdown_event.wait())
    try:
        finished, _unfinished = await asyncio.wait(
            {poller, stopper},
            return_when=asyncio.FIRST_COMPLETED,
        )
        if stopper not in finished:
            # Poll won the race: retire the now-useless event waiter.
            stopper.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await stopper
        else:
            # Shutdown requested: ask the in-flight poll to stop.
            poller.cancel()
        # Surfaces the poll's exception, or CancelledError after a cancel.
        await poller
    finally:
        # Covers external cancellation of this coroutine mid-wait.
        for leftover in (poller, stopper):
            if leftover.done():
                continue
            leftover.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await leftover

async def run_forever(self, shutdown_event: asyncio.Event | None = None) -> None:
"""Run ``poll_once`` on an interval, with overrun cooldown between cycles."""
interval = self.cfg.poll_interval_minutes * 60
cooldown = self.cfg.poll_overrun_cooldown_seconds
Expand All @@ -468,10 +495,20 @@ async def run_forever(self) -> None:
self.cfg.enable_bulk_wg21,
)
run_started_wall = time.time()
while True:
shutdown_requested = False
while shutdown_event is None or not shutdown_event.is_set():
t0 = time.monotonic()
try:
await self.poll_once()
if shutdown_event is not None:
await self._poll_once_or_cancel(shutdown_event)
else:
await self.poll_once()
except asyncio.CancelledError:
if shutdown_event is None or not shutdown_event.is_set():
raise
log.info("POLL-CANCELLED poll=%d reason=shutdown", self._poll_count)
shutdown_requested = True
break
Comment thread
coderabbitai[bot] marked this conversation as resolved.
except ConfigurationError as exc:
log.critical(
"POLL-FATAL failure_category=%s poll=%d %s",
Expand Down Expand Up @@ -514,6 +551,10 @@ async def run_forever(self) -> None:
)
elapsed = time.monotonic() - t0

if shutdown_event is not None and shutdown_event.is_set():
shutdown_requested = True
break

if self.ops_alert_fn:
alert_threshold = 2 * interval
now_wall = time.time()
Expand Down Expand Up @@ -542,4 +583,15 @@ async def run_forever(self) -> None:
elapsed,
interval,
)
await asyncio.sleep(sleep_for)
if shutdown_event is not None:
try:
await asyncio.wait_for(shutdown_event.wait(), timeout=sleep_for)
shutdown_requested = True
break
except asyncio.TimeoutError:
pass
else:
await asyncio.sleep(sleep_for)

if shutdown_requested or (shutdown_event is not None and shutdown_event.is_set()):
log.info("SCHEDULER-STOP reason=shutdown_event")
Loading