feat(dlq): per-topic DLQ reconciliation + ErrorProcessor split#192
Conversation
|
|
baaeb9a to
61ad479
Compare
| } | ||
|
|
||
| // Process records that a log message landed in the DLQ and acks it. | ||
| func (c *logController) Process(_ context.Context, delivery consumer.Delivery) (retErr error) { |
There was a problem hiding this comment.
ideally the request log is consumed by gateway and orchestrator is just a publisher to it...
There was a problem hiding this comment.
Do you recommend to not do DLQ at all for it?
There was a problem hiding this comment.
For this one, i think we should be able republish back..if they failed due to mysql issues, if poison pills (ideally we shouldn't, given its free form) then we could mark it in the log itself with some generic event that we lost something...
But from request state perspective, this likely has no meaning. Because to derive the request state we need this on gateway end all the times.
There was a problem hiding this comment.
Not sure I understood. DLQ controllers process those that regular controllers could not, possibly because of programmer's bug.
The other option is to not have dlq log controller at all, monitor dlq, and defer to the operator to fix any outstanding messages in it.
What I want to avoid is fixing things in DLQ controllers, i.e. republishing back to primary ones. Error processing layers should only transition one way.
| logEntry := entity.NewRequestLog(requestID, entity.RequestStatusError, logVersion, "", nil) | ||
| if err := store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { | ||
| return fmt.Errorf("failed to insert request log for %s: %w", requestID, err) | ||
| } |
There was a problem hiding this comment.
should this publish to log topic and let gateway manage all the writes to it as they should be written to gateway DB anyway..with this, they will get written to orcgstrator DB.
We can probably do a separate PR to fix it
There was a problem hiding this comment.
Request Log controller is in orchestrator right now. Gateway does not have queue-based controllers. If the controller is at the gateway, it probably makes sense to publish, otherwise I'd like to avoid republishing from dlq layer to primary layer.
| } | ||
|
|
||
| // Process reconciles a single DLQ delivery for the buildsignal topic. | ||
| func (c *buildSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { |
There was a problem hiding this comment.
we probably want to add build topic as well...so if we fail to trigger builds say buildkite is unavailable then they can be transitioned to failed as well
There was a problem hiding this comment.
Every new topic will need to have a corresponding dlq controller added... not part of this diff though?
Adds a dlq controller package that drains every primary pipeline topic's
{topic}_dlq companion and transitions the affected request or batch to a
terminal failed state, so requests cannot be stuck in non-terminal states
after the primary controller exhausts its retry budget.
Splits error-classification policy from consumer transport via a new
errs.ErrorProcessor interface with two implementations: NewClassifierProcessor
for the primary pipeline (framework-wrap short-circuit + classifier walk) and
AlwaysRetryableProcessor for DLQ consumers (every failure redelivered, paired
with high MaxAttempts and DLQ.Enabled=false to avoid a _dlq_dlq cascade).
Documentation updates cover the new package design, the processor choices,
and the per-stage DLQ companions in the workflow RFC.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Summary
submitqueue/orchestrator/controller/dlq/package that drains every primary pipeline topic's{topic}_dlqcompanion and transitions the affected request or batch to a terminal failed state. Without this, a request that exhausts its retry budget in the primary pipeline stays stuck in a non-terminal state and the gateway keeps reporting it as "in progress".errs.ErrorProcessorinterface with two implementations:NewClassifierProcessorfor primary pipeline consumers (framework-wrap short-circuit + per-node classifier walk) andAlwaysRetryableProcessorfor DLQ consumers (every failure redelivered). DLQ subscriptions disable their own DLQ and run withRetry.MaxAttempts=1000to avoid a_dlq_dlqcascade while preserving convergence.dlq/README.md(DLQ-is-final-destination design constraints), updatedcore/errs/README.md(processor choices), updatedsubmitqueue/core/consumer/README.md(ErrorProcessorargument + error-handling rewrite), new DLQ reconciliation section indoc/rfc/submitqueue/workflow.md, and a sentence inCLAUDE.mdon the two default classifiers.Design notes
RequestControllerfor request-scoped topics (start, validate, batch, cancel, log);BatchControllerfor batch-scoped topics (score, speculate, build, merge, conclude) with fan-out to member requests; small dedicated controller forbuildsignalpayloads.dlq.go.storage.ErrNotFound→ warn+skip;storage.ErrVersionMismatch→ wrapped retryable; everything else → plain (then forced retryable byAlwaysRetryableProcessor).Test plan
bazel test //core/errs/... //submitqueue/core/consumer:consumer_test //submitqueue/orchestrator/controller/dlq:dlq_test— all PASSED locally.bazel build //example/submitqueue/... //example/stovepipe/...— clean locally.🤖 Generated with Claude Code