feat(spider-scheduler): Add gRPC binary. #369
# Conflicts: # components/spider-scheduler/src/error.rs # components/spider-scheduler/src/lib.rs # components/spider-scheduler/src/service.rs
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID:
⛔ Files ignored due to path filters (1)
📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Walkthrough
This PR adds protobuf-to-core conversion for task assignment records, scheduler request unpacking, a tonic gRPC adapter, server configuration types, and a new scheduler gRPC server binary with Cargo wiring.
Changes
Scheduler gRPC server
Estimated code review effort: 3 (Moderate) | ~30 minutes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant GrpcSchedulerService
participant SchedulerServiceState
participant RequestUnpack
Client->>GrpcSchedulerService: next_task / heartbeat / shutdown
GrpcSchedulerService->>RequestUnpack: unpack protobuf request
RequestUnpack-->>GrpcSchedulerService: domain inputs
GrpcSchedulerService->>SchedulerServiceState: next_task / heartbeat / shutdown
SchedulerServiceState-->>GrpcSchedulerService: result or SchedulerServiceError
GrpcSchedulerService-->>Client: protobuf response or tonic Status
sequenceDiagram
participant Main
participant ServerConfig
participant GrpcSchedulerStorageClient
participant GrpcSchedulerService
participant TonicServer
Main->>ServerConfig: load YAML config
Main->>GrpcSchedulerStorageClient: connect(storage_endpoint)
Main->>GrpcSchedulerService: new(state, cancellation_token)
Main->>TonicServer: serve_with_shutdown(grpc_service)
TonicServer-->>Main: shutdown on Ctrl-C or cancellation
Main->>Main: runtime.stop().await
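The request path in the first diagram (unpack the protobuf request, call the scheduler state, map the outcome to a response or a status) can be sketched as below. This is a minimal, std-only illustration, not the PR's code: `HeartbeatRequest`, `State`, `ServiceError`, `Status`, and `handle_heartbeat` are hypothetical stand-ins, `Status` substitutes for `tonic::Status`, and the choice of status codes is an assumption. Only the `RequestUnpack` trait shape (`type Unpacked` and `fn unpack(self)`) is taken from the diff quoted in the review.

```rust
/// Hypothetical unpack error; the real `UnpackError` may carry different variants.
#[derive(Debug, PartialEq)]
enum UnpackError {
    MissingField(&'static str),
}

/// Hypothetical domain error standing in for `SchedulerServiceError`.
#[derive(Debug, PartialEq)]
enum ServiceError {
    UnknownExecutionManager(u64),
}

/// Stand-in for `tonic::Status`, so the sketch needs no external crates.
#[derive(Debug, PartialEq)]
enum Status {
    InvalidArgument(String),
    NotFound(String),
}

/// Trait shape as shown in the review's diff.
trait RequestUnpack {
    type Unpacked;
    fn unpack(self) -> Result<Self::Unpacked, UnpackError>;
}

/// Hypothetical protobuf request with an optional ID field.
struct HeartbeatRequest {
    execution_manager_id: Option<u64>,
}

impl RequestUnpack for HeartbeatRequest {
    type Unpacked = u64;

    fn unpack(self) -> Result<Self::Unpacked, UnpackError> {
        self.execution_manager_id
            .ok_or(UnpackError::MissingField("execution_manager_id"))
    }
}

/// Hypothetical scheduler state that only knows a fixed set of managers.
struct State {
    known: Vec<u64>,
}

impl State {
    fn heartbeat(&self, id: u64) -> Result<(), ServiceError> {
        if self.known.contains(&id) {
            Ok(())
        } else {
            Err(ServiceError::UnknownExecutionManager(id))
        }
    }
}

/// Adapter step: unpack, delegate to the state, translate errors to a status.
fn handle_heartbeat(state: &State, request: HeartbeatRequest) -> Result<(), Status> {
    let id = request
        .unpack()
        .map_err(|e| Status::InvalidArgument(format!("{:?}", e)))?;
    state
        .heartbeat(id)
        .map_err(|e| Status::NotFound(format!("{:?}", e)))
}
```

Keeping the unpack step separate from the state call means malformed requests are rejected before any scheduler state is touched, which matches the ordering the diagram shows.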
Possibly related PRs
Suggested reviewers:
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
@CodeRabbit review
✅ Action performed
Review finished.
🧹 Nitpick comments (4)
components/spider-proto-rust/src/unpack/scheduler.rs (1)
22-32: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick win
Unbounded client-supplied `wait_time_ms` can hold a request open indefinitely.
`Duration::from_millis(self.wait_time_ms)` accepts any client-supplied `u64` unmodified and is later passed straight to `dispatch_source.dequeue(wait_time)`. A malicious or buggy client can request an effectively unbounded wait (e.g., `u64::MAX` ms), tying up server resources for that RPC until an assignment appears or the connection drops. Consider clamping `wait_time_ms` to a sane maximum before converting to `Duration`.
🛡️ Proposed clamp
+const MAX_WAIT_TIME_MS: u64 = 60_000;
+
 impl RequestUnpack for NextTaskRequest {
     type Unpacked = (ExecutionManagerId, Option<TaskAssignmentRecord>, Duration);
     fn unpack(self) -> Result<Self::Unpacked, UnpackError> {
         Ok((
             ExecutionManagerId::from(self.execution_manager_id),
             self.prev_assignment.map(ProtoTaskAssignmentRecord::into),
-            Duration::from_millis(self.wait_time_ms),
+            Duration::from_millis(self.wait_time_ms.min(MAX_WAIT_TIME_MS)),
         ))
     }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/spider-proto-rust/src/unpack/scheduler.rs` around lines 22 - 32, The NextTaskRequest::unpack path currently converts client-controlled wait_time_ms directly into a Duration, allowing an effectively unbounded wait. Update NextTaskRequest::unpack to clamp self.wait_time_ms to a sensible maximum before calling Duration::from_millis, and keep the rest of the unpacked tuple unchanged. Make the limit explicit near the Duration::from_millis conversion so the behavior is easy to find and maintain.
components/spider-scheduler/src/grpc.rs (1)
161-212: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Minor inconsistency: `TAG` constant only used in `next_task`.
`next_task` defines a local `const TAG` while `heartbeat`/`shutdown` pass string literals directly to `service_error_handler`. Purely cosmetic, no functional issue.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/spider-scheduler/src/grpc.rs` around lines 161 - 212, The local TAG constant in next_task is inconsistent with heartbeat and shutdown, which pass string literals directly to service_error_handler. Clean this up by either reusing a shared tag symbol across next_task, heartbeat, and shutdown, or by removing TAG and passing the same literal style everywhere so the error handling calls are uniform in grpc.rs.
components/spider-scheduler/src/bin/grpc_server.rs (2)
44-59: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick win
No SIGTERM handling for graceful shutdown.
Shutdown is only wired to the cancellation token and `Ctrl-C` (SIGINT). In containerized/orchestrated deployments (e.g., Kubernetes, systemd), shutdown is typically signalled via SIGTERM, which this server would not currently handle gracefully, likely resulting in a hard kill instead of a clean drain/stop.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/spider-scheduler/src/bin/grpc_server.rs` around lines 44 - 59, The gRPC server shutdown path in serve_with_shutdown only reacts to the cancellation token and tokio::signal::ctrl_c, so SIGTERM is not handled. Update the shutdown future in grpc_server.rs to also listen for SIGTERM alongside Ctrl-C and cancellation_token, and trigger cancellation when SIGTERM is received. Use the existing serve_with_shutdown, cancellation_token, and select! shutdown block so the server can drain cleanly in orchestrated environments.
61-64: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick win
Stop error is silently swallowed when serve also fails.
`stop_result` is computed unconditionally, but if `serve_result` is `Err`, the `?` on line 62 returns before line 63 is evaluated, so any error from `runtime.stop()` is dropped without ever being surfaced (not even logged).
Suggested fix
 let stop_result = runtime.stop().await;
-serve_result?;
-stop_result?;
+if let Err(ref error) = stop_result {
+    tracing::error!(error = %error, "Failed to stop scheduler runtime cleanly.");
+}
+serve_result?;
+stop_result?;
 Ok(())
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/spider-scheduler/src/bin/grpc_server.rs` around lines 61 - 64, The grpc_server main flow is dropping a runtime.stop() failure whenever serve_result is Err because the serve_result? return short-circuits before stop_result? is reached. Update the grpc_server logic to evaluate and report both outcomes explicitly in this block around runtime.stop(), serve_result, and stop_result, so a stop error is not silently lost even when serving also fails.
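The "report both outcomes" behaviour described for the serve/stop block can be isolated into a small helper, sketched here with only the standard library. The helper name `finish` and the `on_stop_error` callback are hypothetical; the callback stands in for a logging call such as the `tracing::error!` line in the suggested fix, and the error type is left generic because the PR's actual error types are not shown here.

```rust
/// Surfaces a stop failure through `on_stop_error` before propagating errors,
/// so the stop error is observed even when serving also failed.
/// The serve error takes precedence as the returned error.
fn finish<E>(
    serve_result: Result<(), E>,
    stop_result: Result<(), E>,
    mut on_stop_error: impl FnMut(&E),
) -> Result<(), E> {
    // Report the stop failure first; the `?` below may return early.
    if let Err(error) = &stop_result {
        on_stop_error(error);
    }
    serve_result?;
    stop_result
}
```

With both results failing, the caller still receives the serve error, but the stop error has already been handed to the callback rather than dropped.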
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@components/spider-proto-rust/src/unpack/scheduler.rs`:
- Around line 22-32: The NextTaskRequest::unpack path currently converts
client-controlled wait_time_ms directly into a Duration, allowing an effectively
unbounded wait. Update NextTaskRequest::unpack to clamp self.wait_time_ms to a
sensible maximum before calling Duration::from_millis, and keep the rest of the
unpacked tuple unchanged. Make the limit explicit near the Duration::from_millis
conversion so the behavior is easy to find and maintain.
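As a standalone illustration of the clamp described above, the following std-only sketch bounds a client-supplied millisecond value before converting it. The helper name `clamped_wait_time` is hypothetical, and the 60-second limit is the value proposed in the review rather than a confirmed project setting.

```rust
use std::time::Duration;

/// Upper bound on a client-requested wait, in milliseconds.
/// 60_000 is the value proposed in the review; treat it as an assumption.
const MAX_WAIT_TIME_MS: u64 = 60_000;

/// Converts a client-supplied wait time into a `Duration`, capped at the maximum.
fn clamped_wait_time(wait_time_ms: u64) -> Duration {
    Duration::from_millis(wait_time_ms.min(MAX_WAIT_TIME_MS))
}
```

Values at or below the limit pass through unchanged, while `u64::MAX` is reduced to the 60-second cap.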
In `@components/spider-scheduler/src/bin/grpc_server.rs`:
- Around line 44-59: The gRPC server shutdown path in serve_with_shutdown only
reacts to the cancellation token and tokio::signal::ctrl_c, so SIGTERM is not
handled. Update the shutdown future in grpc_server.rs to also listen for SIGTERM
alongside Ctrl-C and cancellation_token, and trigger cancellation when SIGTERM
is received. Use the existing serve_with_shutdown, cancellation_token, and
select! shutdown block so the server can drain cleanly in orchestrated
environments.
- Around line 61-64: The grpc_server main flow is dropping a runtime.stop()
failure whenever serve_result is Err because the serve_result? return
short-circuits before stop_result? is reached. Update the grpc_server logic to
evaluate and report both outcomes explicitly in this block around
runtime.stop(), serve_result, and stop_result, so a stop error is not silently
lost even when serving also fails.
In `@components/spider-scheduler/src/grpc.rs`:
- Around line 161-212: The local TAG constant in next_task is inconsistent with
heartbeat and shutdown, which pass string literals directly to
service_error_handler. Clean this up by either reusing a shared tag symbol
across next_task, heartbeat, and shutdown, or by removing TAG and passing the
same literal style everywhere so the error handling calls are uniform in
grpc.rs.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 2e12ec90-561a-4584-889b-6dfd122af107
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (10)
components/spider-proto-rust/src/assignment.rs
components/spider-proto-rust/src/unpack/mod.rs
components/spider-proto-rust/src/unpack/scheduler.rs
components/spider-scheduler/Cargo.toml
components/spider-scheduler/src/bin/grpc_server.rs
components/spider-scheduler/src/config.rs
components/spider-scheduler/src/execution_manager_registry.rs
components/spider-scheduler/src/grpc.rs
components/spider-scheduler/src/lib.rs
components/spider-scheduler/src/runtime.rs
Description
This PR:
Note
This PR does not add new tests.
Checklist
breaking change.
Validation performed
Summary by CodeRabbit