Documentation
¶
Overview ¶
Package commandqueue provides a transport-agnostic interface for command queuing. Implementations include in-memory (for testing), PostgreSQL, SQLite, and NATS.
The Queue interface decouples command senders from processors, allowing the same domain logic to work across different infrastructure choices:
- Development: memqueue (zero infra)
- Single server: sqlitequeue (just a file)
- Multi server: pgqueue (just Postgres)
- Scale: natscommand (NATS JetStream)
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrQueueClosed is returned when operating on a closed queue. ErrQueueClosed = errors.New("commandqueue: queue closed") )
Functions ¶
This section is empty.
Types ¶
type Claim ¶
type Claim struct {
ID string
Command EnqueuedCommand
Attempt int
MaxAttempts int
// contains filtered or unexported fields
}
Claim represents a dequeued command that must be acknowledged or rejected.
func NewClaim ¶
func NewClaim(id string, cmd EnqueuedCommand, attempt, maxAttempts int, ackFn func(context.Context, *Result) error, nackFn func(context.Context, error) error) *Claim
NewClaim creates a Claim with the given ack and nack callbacks.
type EnqueuedCommand ¶
type EnqueuedCommand struct {
ID string // set by queue if empty
CommandName string
StreamID string
Data []byte
Metadata map[string]string
}
EnqueuedCommand represents a command to be processed.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor dequeues commands from any Queue and routes them to a handler.
func NewProcessor ¶
func NewProcessor(cfg ProcessorConfig) *Processor
NewProcessor creates a Processor with the given configuration.
type ProcessorConfig ¶
type ProcessorConfig struct {
Queue Queue
Handler func(ctx context.Context, cmd EnqueuedCommand) (*Result, error)
Concurrency int // max concurrent handlers (default 1 for single-writer)
Logger *slog.Logger
}
ProcessorConfig configures a Processor.
type Queue ¶
type Queue interface {
// Enqueue adds a command to the queue and returns its assigned ID.
Enqueue(ctx context.Context, cmd EnqueuedCommand) (string, error)
// EnqueueAndWait enqueues a command and blocks until a result is available or timeout.
EnqueueAndWait(ctx context.Context, cmd EnqueuedCommand, timeout time.Duration) (*Result, error)
// Dequeue blocks until a command is available and returns a Claim.
Dequeue(ctx context.Context) (*Claim, error)
// Close shuts down the queue, unblocking any waiting Dequeue calls.
Close() error
}
Queue is the transport-agnostic interface for command queuing.