Job queue service for the Gas ecosystem. Provides a gas.JobQueueProvider implementation
backed by AWS SQS, plus a test mock for use in unit tests.

    go get github.com/gasmod/gas-queue

| Backend | Package | Use case |
|---|---|---|
| SQS | github.com/gasmod/gas-queue/sqs | Production (AWS SQS / ElasticMQ) |
The SQS backend implements gas.Service, gas.JobQueueProvider, and
gas.ReadyReporter, which reports an error before Init and after Close so
callers can drain traffic during graceful shutdown.

    package main

    import (
        "github.com/gasmod/gas"
        queuesqs "github.com/gasmod/gas-queue/sqs"
    )

    func main() {
        app := gas.NewApp(
            gas.WithSingletonService[*queuesqs.Service](queuesqs.New()),
            // ...
        )
        app.Run()
    }

With custom configuration:

    cfg := queuesqs.DefaultConfig()
    cfg.Queue.Region = "eu-west-1"
    cfg.Queue.Endpoint = "http://localhost:9324" // ElasticMQ
    queuesqs.New(queuesqs.WithConfig(cfg))

With a pre-configured AWS client:

    queuesqs.New(queuesqs.WithClient(mySQSClient))

Services receive the queue through gas.JobQueueProvider via constructor injection:

    type Service struct {
        queue gas.JobQueueProvider
    }

    func New(queue gas.JobQueueProvider) *Service {
        return &Service{queue: queue}
    }

    func (s *Service) Init() error {
        ctx := context.Background()
        _ = s.queue.Enqueue(ctx, "https://sqs.us-east-1.amazonaws.com/123/my-queue", []byte(`{"task":"run"}`))
        return nil
    }

Enqueue accepts optional settings:

    s.queue.Enqueue(ctx, queueURL, payload,
        gas.WithDelay(10*time.Second), // initial visibility delay
        gas.WithGroupID("order-123"),  // FIFO ordering
        gas.WithDedupeID("unique-id"), // deduplication
        gas.WithJobAttributes(map[string]string{"env": "prod"}),
    )

Consume jobs with Dequeue, then Ack or Nack each one:

    for {
        jobs, err := s.queue.Dequeue(ctx, queueURL, 10, 20*time.Second)
        if err != nil {
            log.Error("dequeue failed").Err("error", err).Send()
            continue
        }
        for _, job := range jobs {
            if err := process(job); err != nil {
                _ = s.queue.Nack(ctx, queueURL, job) // make immediately available for retry
                continue
            }
            _ = s.queue.Ack(ctx, queueURL, job) // remove from queue
        }
    }

If WithConfig is not provided, the backend automatically binds configuration from the gas.ConfigProvider injected
via DI. This lets you drive queue settings from environment variables or a config file without any explicit wiring.
| Field | Default | Description |
|---|---|---|
| Queue.Region | us-east-1 | AWS region |
| Queue.Endpoint | (empty) | Custom endpoint URL (e.g. ElasticMQ); empty = default AWS |
| Queue.VisibilityTimeout | 30s | How long a dequeued message stays invisible to other consumers |
| Queue.WaitTimeSeconds | 20 | Long-poll duration in seconds for ReceiveMessage (0-20, SQS hard limit) |
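
The defaults and limits in the table can be captured in a small validation helper. The sketch below is illustrative only: `QueueConfig`, `defaultQueueConfig`, and `validate` are hypothetical names rather than the gas-queue API, and the 12-hour ceiling on the visibility timeout comes from the SQS service limit, not from this README.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// QueueConfig is a hypothetical stand-in mirroring the documented fields.
// The real struct in gas-queue/sqs may be shaped differently.
type QueueConfig struct {
	Region            string
	Endpoint          string // empty means the default AWS endpoint
	VisibilityTimeout time.Duration
	WaitTimeSeconds   int
}

// defaultQueueConfig returns the defaults listed in the table.
func defaultQueueConfig() QueueConfig {
	return QueueConfig{
		Region:            "us-east-1",
		Endpoint:          "",
		VisibilityTimeout: 30 * time.Second,
		WaitTimeSeconds:   20,
	}
}

// validate checks the values locally before they are sent to SQS.
func (c QueueConfig) validate() error {
	if c.Region == "" {
		return errors.New("queue region must not be empty")
	}
	// SQS long polling accepts 0 to 20 seconds.
	if c.WaitTimeSeconds < 0 || c.WaitTimeSeconds > 20 {
		return fmt.Errorf("wait time %d is outside the SQS range 0-20", c.WaitTimeSeconds)
	}
	// SQS caps the visibility timeout at 12 hours.
	if c.VisibilityTimeout < 0 || c.VisibilityTimeout > 12*time.Hour {
		return fmt.Errorf("visibility timeout %s is outside the SQS range 0s-12h", c.VisibilityTimeout)
	}
	return nil
}
```

Calling `defaultQueueConfig().validate()` returns nil, while setting `WaitTimeSeconds` to 21 produces an error. Checking these bounds at startup surfaces a bad value before the first ReceiveMessage call fails.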
The root queue package defines a sentinel error used by all backends:

    queue.ErrClosed // returned when an operation is attempted on a closed service

The queuetest package provides a mock implementation of gas.JobQueueProvider:

    import "github.com/gasmod/gas-queue/queuetest"

    mock := &queuetest.MockQueue{}
    mock.EnqueueFn = func(ctx context.Context, queue string, payload []byte, opts ...gas.EnqueueOption) error {
        return nil
    }

    // pass mock as gas.JobQueueProvider

    // assert calls:
    if mock.CallCount("Enqueue") != 1 {
        t.Error("expected one Enqueue call")
    }
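
To show how a function-field mock and a closed-service sentinel can work together, here is a self-contained sketch of a consume loop that stops polling once the queue reports it is closed. Everything in it is a local stand-in: `Job`, `JobQueue`, `fakeQueue`, `errClosed`, `consume`, and `runExample` are hypothetical names that only mirror the call shapes used in this README, and the real gas and queuetest types may differ.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// errClosed stands in for queue.ErrClosed (assumption: a plain sentinel error).
var errClosed = errors.New("queue: service closed")

// Job and JobQueue are hypothetical stand-ins for the gas types.
type Job struct{ Payload []byte }

type JobQueue interface {
	Dequeue(ctx context.Context, queue string, max int, wait time.Duration) ([]Job, error)
	Ack(ctx context.Context, queue string, job Job) error
	Nack(ctx context.Context, queue string, job Job) error
}

// fakeQueue records calls and delegates Dequeue to an optional hook.
// It is not safe for concurrent use; that is enough for a single-goroutine test.
type fakeQueue struct {
	calls     map[string]int
	DequeueFn func(ctx context.Context, queue string, max int, wait time.Duration) ([]Job, error)
}

func (f *fakeQueue) record(name string) {
	if f.calls == nil {
		f.calls = make(map[string]int)
	}
	f.calls[name]++
}

func (f *fakeQueue) CallCount(name string) int { return f.calls[name] }

func (f *fakeQueue) Dequeue(ctx context.Context, queue string, max int, wait time.Duration) ([]Job, error) {
	f.record("Dequeue")
	if f.DequeueFn != nil {
		return f.DequeueFn(ctx, queue, max, wait)
	}
	return nil, nil
}

func (f *fakeQueue) Ack(ctx context.Context, queue string, job Job) error {
	f.record("Ack")
	return nil
}

func (f *fakeQueue) Nack(ctx context.Context, queue string, job Job) error {
	f.record("Nack")
	return nil
}

// consume polls until the context is cancelled or the queue reports errClosed.
func consume(ctx context.Context, q JobQueue, queue string, handle func(Job) error) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		jobs, err := q.Dequeue(ctx, queue, 10, 20*time.Second)
		if errors.Is(err, errClosed) {
			return nil // service shut down: stop polling instead of retrying
		}
		if err != nil {
			continue // other failure; a real worker would log and back off here
		}
		for _, job := range jobs {
			if handle(job) != nil {
				_ = q.Nack(ctx, queue, job)
				continue
			}
			_ = q.Ack(ctx, queue, job)
		}
	}
}

// runExample serves one batch of two jobs, then reports the queue as closed.
func runExample() (acks, nacks int, err error) {
	q := &fakeQueue{}
	q.DequeueFn = func(ctx context.Context, queue string, max int, wait time.Duration) ([]Job, error) {
		if q.CallCount("Dequeue") > 1 {
			return nil, errClosed
		}
		return []Job{{Payload: []byte(`{"task":"run"}`)}, {}}, nil
	}
	handle := func(j Job) error {
		if len(j.Payload) == 0 {
			return errors.New("empty payload")
		}
		return nil
	}
	err = consume(context.Background(), q, "my-queue", handle)
	return q.CallCount("Ack"), q.CallCount("Nack"), err
}
```

In `runExample` the job with a payload is acked and the empty one is nacked, after which the second Dequeue returns the sentinel and the loop exits cleanly. Using `errors.Is` rather than `==` keeps the check working if a backend wraps the sentinel with extra context.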