diff --git a/submitqueue/orchestrator/controller/conclude/BUILD.bazel b/submitqueue/orchestrator/controller/conclude/BUILD.bazel index b9ad9ff3..7aab80ea 100644 --- a/submitqueue/orchestrator/controller/conclude/BUILD.bazel +++ b/submitqueue/orchestrator/controller/conclude/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//core/metrics", "//submitqueue/core/consumer", + "//submitqueue/core/request", "//submitqueue/entity", "//submitqueue/extension/storage", "@com_github_uber_go_tally_v4//:tally", diff --git a/submitqueue/orchestrator/controller/conclude/conclude.go b/submitqueue/orchestrator/controller/conclude/conclude.go index 21a23270..4d939ab7 100644 --- a/submitqueue/orchestrator/controller/conclude/conclude.go +++ b/submitqueue/orchestrator/controller/conclude/conclude.go @@ -21,6 +21,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/submitqueue/core/consumer" + corerequest "github.com/uber/submitqueue/submitqueue/core/request" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" @@ -101,8 +102,17 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r metrics.NamedCounter(c.metricsScope, "process", "unexpected_state_errors", 1) return fmt.Errorf("unexpected batch state %q for batch %s: %w", batch.State, batch.ID, err) } + requestStatus, err := requestStateToStatus(requestState) + if err != nil { + // Unreachable: batchStateToRequestState only returns terminal request states. + return fmt.Errorf("failed to map request state %s to status: %w", requestState, err) + } - // Update each request's state to reflect the batch outcome. + // Reconcile each request to the batch's terminal state and emit a terminal + // log entry. 
The flow is idempotent under at-least-once delivery: a prior + // attempt may have completed the CAS but failed before publishing the log, + // so the log publish must still run when the request is already in the + // target terminal state. for _, requestID := range batch.Contains { request, err := c.store.GetRequestStore().Get(ctx, requestID) if err != nil { @@ -110,18 +120,47 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to get request %s: %w", requestID, err) } - newVersion := request.Version + 1 - if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, newVersion, requestState); err != nil { - metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1) - return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err) + switch { + case request.State == requestState: + // Idempotent retry: a prior delivery already wrote the terminal + // state. Skip the CAS and fall through to the log publish. + metrics.NamedCounter(c.metricsScope, "process", "already_reconciled", 1) + case entity.IsRequestStateTerminal(request.State): + // Divergent terminal state — a concurrent path (e.g. a racing + // cancel-not-yet-batched transition) reached terminal first. Skip + // the reconcile and the log publish; the other writer owns the + // terminal log entry for the state it actually wrote. 
+ c.logger.Warnw("request already in different terminal state, skipping reconcile", + "batch_id", batch.ID, + "request_id", requestID, + "actual_state", string(request.State), + "expected_state", string(requestState), + ) + metrics.NamedCounter(c.metricsScope, "process", "terminal_state_divergence", 1) + continue + default: + newVersion := request.Version + 1 + if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, newVersion, requestState); err != nil { + metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1) + return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err) + } + request.Version = newVersion + request.State = requestState + + c.logger.Infow("updated request state", + "batch_id", batch.ID, + "request_id", requestID, + "new_state", string(requestState), + ) } - request.Version = newVersion - c.logger.Infow("updated request state", - "batch_id", batch.ID, - "request_id", requestID, - "new_state", string(requestState), - ) + logEntry := entity.NewRequestLog(requestID, requestStatus, request.Version, "", map[string]string{ + "batch_id": batch.ID, + }) + if err := corerequest.PublishLog(ctx, c.registry, logEntry, requestID); err != nil { + metrics.NamedCounter(c.metricsScope, "process", "log_publish_errors", 1) + return fmt.Errorf("failed to publish request log for %s: %w", requestID, err) + } } return nil // Success - message will be acked @@ -155,3 +194,17 @@ func batchStateToRequestState(state entity.BatchState) (entity.RequestState, err return entity.RequestStateUnknown, fmt.Errorf("non-terminal batch state: %s", state) } } + +// requestStateToStatus maps a terminal request state to the corresponding log status. 
+func requestStateToStatus(state entity.RequestState) (entity.RequestStatus, error) {
+	switch state { // one case per request state that has a matching log status
+	case entity.RequestStateLanded:
+		return entity.RequestStatusLanded, nil
+	case entity.RequestStateError:
+		return entity.RequestStatusError, nil
+	case entity.RequestStateCancelled:
+		return entity.RequestStatusCancelled, nil
+	default: // every other state is rejected: RequestStatusUnknown plus a non-nil error
+		return entity.RequestStatusUnknown, fmt.Errorf("non-terminal request state: %s", state)
+	}
+}
diff --git a/submitqueue/orchestrator/controller/conclude/conclude_test.go b/submitqueue/orchestrator/controller/conclude/conclude_test.go index 18d10c73..dd2b0be2 100644 --- a/submitqueue/orchestrator/controller/conclude/conclude_test.go +++ b/submitqueue/orchestrator/controller/conclude/conclude_test.go @@ -41,7 +41,10 @@ func batchIDPayload(t *testing.T, id string) []byte { } // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage) *Controller { +// expectLogPublish controls whether the log topic publisher is wired with an +// expectation; tests that don't reach the log publish step pass false so an +// unexpected publish would fail the test.
+func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage, expectLogPublish bool) (*Controller, *queuemock.MockPublisher) { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -53,15 +56,26 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *stora mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() } - registry, err := consumer.NewTopicRegistry(nil) + mockPub := queuemock.NewMockPublisher(ctrl) + if expectLogPublish { + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + } + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, + }, + ) require.NoError(t, err) - return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude") + return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude"), mockPub } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller, _ := newTestController(t, ctrl, nil, false) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyConclude, controller.TopicKey()) @@ -71,11 +85,12 @@ func TestNewController(t *testing.T) { func TestController_Process(t *testing.T) { tests := []struct { - name string - batch entity.Batch - setupStore func(*gomock.Controller) *storagemock.MockStorage - wantErr bool - retryable bool + name string + batch entity.Batch + setupStore func(*gomock.Controller) *storagemock.MockStorage + expectLogPublish bool + wantErr bool + retryable bool }{ { name: "succeeded batch lands requests", @@ -111,6 +126,7 @@ func TestController_Process(t *testing.T) { mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return 
mockStorage }, + expectLogPublish: true, }, { name: "failed batch errors requests", @@ -142,6 +158,7 @@ func TestController_Process(t *testing.T) { mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, + expectLogPublish: true, }, { name: "cancelled batch cancels requests", @@ -173,6 +190,74 @@ func TestController_Process(t *testing.T) { mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, + expectLogPublish: true, + }, + { + name: "idempotent retry: request already in target terminal state still publishes log", + batch: entity.Batch{ + ID: "test-queue/batch/8", + Queue: "test-queue", + Contains: []string{"test-queue/20"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/8").Return(entity.Batch{ + ID: "test-queue/batch/8", + Queue: "test-queue", + Contains: []string{"test-queue/20"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, nil) + + // Request is already Landed (prior delivery wrote it). UpdateState + // must NOT be called — gomock will fail the test if it is. 
+ mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/20").Return(entity.Request{ + ID: "test-queue/20", Version: 7, State: entity.RequestStateLanded, + }, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + return mockStorage + }, + expectLogPublish: true, + }, + { + name: "divergent terminal state skips reconcile and log publish", + batch: entity.Batch{ + ID: "test-queue/batch/9", + Queue: "test-queue", + Contains: []string{"test-queue/30"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/9").Return(entity.Batch{ + ID: "test-queue/batch/9", + Queue: "test-queue", + Contains: []string{"test-queue/30"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, nil) + + // Request is already in a *different* terminal state (Cancelled). + // Conclude must not write the log entry (the other writer owns it), + // and must not attempt UpdateState. 
+ mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/30").Return(entity.Request{ + ID: "test-queue/30", Version: 5, State: entity.RequestStateCancelled, + }, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + return mockStorage + }, + expectLogPublish: false, }, { name: "non-terminal batch state returns error", @@ -201,7 +286,7 @@ func TestController_Process(t *testing.T) { retryable: false, }, { - name: "request store get failure is retryable", + name: "request store get failure returns error", batch: entity.Batch{ ID: "test-queue/batch/5", Queue: "test-queue", @@ -231,7 +316,7 @@ func TestController_Process(t *testing.T) { retryable: false, }, { - name: "request store update failure is retryable", + name: "request store update failure returns error", batch: entity.Batch{ ID: "test-queue/batch/6", Queue: "test-queue", @@ -296,7 +381,7 @@ func TestController_Process(t *testing.T) { mockStorage = tt.setupStore(ctrl) } - controller := newTestController(t, ctrl, mockStorage) + controller, _ := newTestController(t, ctrl, mockStorage, tt.expectLogPublish) msg := entityqueue.NewMessage(tt.batch.ID, batchIDPayload(t, tt.batch.ID), tt.batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -324,7 +409,7 @@ func TestController_Process_StorageFailure(t *testing.T) { mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - controller := newTestController(t, ctrl, mockStorage) + controller, _ := newTestController(t, ctrl, mockStorage, false) msg := entityqueue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) @@ -338,7 +423,7 @@ func TestController_Process_StorageFailure(t *testing.T) { func 
TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller, _ := newTestController(t, ctrl, nil, false) var _ consumer.Controller = controller }