Skip to content

feat(dlq): per-topic DLQ reconciliation + ErrorProcessor split#192

Merged
behinddwalls merged 1 commit into
mainfrom
failures
Jun 5, 2026
Merged

feat(dlq): per-topic DLQ reconciliation + ErrorProcessor split#192
behinddwalls merged 1 commit into
mainfrom
failures

Conversation

@sbalabanov
Copy link
Copy Markdown
Contributor

Summary

  • Adds a submitqueue/orchestrator/controller/dlq/ package that drains every primary pipeline topic's {topic}_dlq companion and transitions the affected request or batch to a terminal failed state. Without this, a request that exhausts its retry budget in the primary pipeline stays stuck in a non-terminal state and the gateway keeps reporting it as "in progress".
  • Splits error-classification policy from consumer transport via a new errs.ErrorProcessor interface with two implementations: NewClassifierProcessor for primary pipeline consumers (framework-wrap short-circuit + per-node classifier walk) and AlwaysRetryableProcessor for DLQ consumers (every failure redelivered). DLQ subscriptions disable their own DLQ and run with Retry.MaxAttempts=1000 to avoid a _dlq_dlq cascade while preserving convergence.
  • Documentation: new dlq/README.md (DLQ-is-final-destination design constraints), updated core/errs/README.md (processor choices), updated submitqueue/core/consumer/README.md (ErrorProcessor argument + error-handling rewrite), new DLQ reconciliation section in doc/rfc/submitqueue/workflow.md, and a sentence in CLAUDE.md on the two default classifiers.

Design notes

  • DLQ is the final destination. A genuinely unprocessable DLQ message (e.g. malformed payload) must be removed by an operator. The controllers do not attempt to recover; they only reconcile request/batch state to terminal failed states with the same optimistic-locking CAS the primary pipeline uses.
  • Two controller shapes. RequestController for request-scoped topics (start, validate, batch, cancel, log); BatchController for batch-scoped topics (score, speculate, build, merge, conclude) with fan-out to member requests; small dedicated controller for buildsignal payloads.
  • Error handling in dlq.go. storage.ErrNotFound → warn+skip; storage.ErrVersionMismatch → wrapped retryable; everything else → plain (then forced retryable by AlwaysRetryableProcessor).

Test plan

  • bazel test //core/errs/... //submitqueue/core/consumer:consumer_test //submitqueue/orchestrator/controller/dlq:dlq_test — all PASSED locally.
  • bazel build //example/submitqueue/... //example/stovepipe/... — clean locally.
  • Watch CI on the draft PR.

🤖 Generated with Claude Code

@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@sbalabanov sbalabanov force-pushed the failures branch 2 times, most recently from baaeb9a to 61ad479 Compare June 4, 2026 19:23
@sbalabanov sbalabanov marked this pull request as ready for review June 4, 2026 19:24
@sbalabanov sbalabanov requested review from a team and behinddwalls as code owners June 4, 2026 19:24
}

// Process records that a log message landed in the DLQ and acks it.
func (c *logController) Process(_ context.Context, delivery consumer.Delivery) (retErr error) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally the request log is consumed by gateway and orchestrator is just a publisher to it...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you recommend to not do DLQ at all for it?

Copy link
Copy Markdown
Collaborator

@behinddwalls behinddwalls Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one, i think we should be able republish back..if they failed due to mysql issues, if poison pills (ideally we shouldn't, given its free form) then we could mark it in the log itself with some generic event that we lost something...

But from request state perspective, this likely has no meaning. Because to derive the request state we need this on gateway end all the times.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understood. DLQ controllers process those that regular controllers could not, possibly because of programmer's bug.
The other option is to not have dlq log controller at all, monitor dlq, and defer to the operator to fix any outstanding messages in it.
What I want to avoid is fixing things in DLQ controllers, i.e. republishing back to primary ones. Error processing layers should only transition one way.

Comment on lines +119 to +122
logEntry := entity.NewRequestLog(requestID, entity.RequestStatusError, logVersion, "", nil)
if err := store.GetRequestLogStore().Insert(ctx, logEntry); err != nil {
return fmt.Errorf("failed to insert request log for %s: %w", requestID, err)
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this publish to log topic and let gateway manage all the writes to it as they should be written to gateway DB anyway..with this, they will get written to orcgstrator DB.

We can probably do a separate PR to fix it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Request Log controller is in orchestrator right now. Gateway does not have queue-based controllers. If the controller is at the gateway, it probably makes sense to publish, otherwise I'd like to avoid republishing from dlq layer to primary layer.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#205 made the change to move it to gateway

}

// Process reconciles a single DLQ delivery for the buildsignal topic.
func (c *buildSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably want to add build topic as well...so if we fail to trigger builds say buildkite is unavailable then they can be transitioned to failed as well

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every new topic will need to have a corresponding dlq controller added... not part of this diff though?

Adds a dlq controller package that drains every primary pipeline topic's
{topic}_dlq companion and transitions the affected request or batch to a
terminal failed state, so requests cannot be stuck in non-terminal states
after the primary controller exhausts its retry budget.

Splits error-classification policy from consumer transport via a new
errs.ErrorProcessor interface with two implementations: NewClassifierProcessor
for the primary pipeline (framework-wrap short-circuit + classifier walk) and
AlwaysRetryableProcessor for DLQ consumers (every failure redelivered, paired
with high MaxAttempts and DLQ.Enabled=false to avoid a _dlq_dlq cascade).

Documentation updates cover the new package design, the processor choices,
and the per-stage DLQ companions in the workflow RFC.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@behinddwalls behinddwalls merged commit 3799818 into main Jun 5, 2026
14 of 15 checks passed
@behinddwalls behinddwalls deleted the failures branch June 5, 2026 18:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants