commandqueue

package module
v0.0.0-...-17eb08c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package commandqueue provides a transport-agnostic interface for command queuing. Implementations include in-memory (for testing), PostgreSQL, SQLite, and NATS.

The Queue interface decouples command senders from processors, allowing the same domain logic to work across different infrastructure choices:

  • Development: memqueue (zero infra)
  • Single server: sqlitequeue (just a file)
  • Multi server: pgqueue (just Postgres)
  • Scale: natscommand (NATS JetStream)

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrQueueClosed is returned when operating on a closed queue.
	ErrQueueClosed = errors.New("commandqueue: queue closed")
)

Functions

This section is empty.

Types

type Claim

type Claim struct {
	ID          string
	Command     EnqueuedCommand
	Attempt     int
	MaxAttempts int
	// contains filtered or unexported fields
}

Claim represents a dequeued command that must be acknowledged or rejected.

func NewClaim

func NewClaim(id string, cmd EnqueuedCommand, attempt, maxAttempts int, ackFn func(context.Context, *Result) error, nackFn func(context.Context, error) error) *Claim

NewClaim creates a Claim with the given ack and nack callbacks.

func (*Claim) Ack

func (c *Claim) Ack(ctx context.Context, result *Result) error

Ack acknowledges successful processing of the command.

func (*Claim) Nack

func (c *Claim) Nack(ctx context.Context, reason error) error

Nack rejects the command, making it available for redelivery.

type EnqueuedCommand

type EnqueuedCommand struct {
	ID          string // set by queue if empty
	CommandName string
	StreamID    string
	Data        []byte
	Metadata    map[string]string
}

EnqueuedCommand represents a command to be processed.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor dequeues commands from any Queue and routes them to a handler.

func NewProcessor

func NewProcessor(cfg ProcessorConfig) *Processor

NewProcessor creates a Processor with the given configuration.

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start begins processing commands. It blocks until ctx is cancelled or Stop is called.

func (*Processor) Stop

func (p *Processor) Stop() error

Stop cancels processing and waits for in-flight handlers to finish.

type ProcessorConfig

type ProcessorConfig struct {
	Queue       Queue
	Handler     func(ctx context.Context, cmd EnqueuedCommand) (*Result, error)
	Concurrency int // max concurrent handlers (default 1 for single-writer)
	Logger      *slog.Logger
}

ProcessorConfig configures a Processor.

type Queue

type Queue interface {
	// Enqueue adds a command to the queue and returns its assigned ID.
	Enqueue(ctx context.Context, cmd EnqueuedCommand) (string, error)
	// EnqueueAndWait enqueues a command and blocks until a result is available or timeout.
	EnqueueAndWait(ctx context.Context, cmd EnqueuedCommand, timeout time.Duration) (*Result, error)
	// Dequeue blocks until a command is available and returns a Claim.
	Dequeue(ctx context.Context) (*Claim, error)
	// Close shuts down the queue, unblocking any waiting Dequeue calls.
	Close() error
}

Queue is the transport-agnostic interface for command queuing.

type Result

type Result struct {
	Success       bool
	Error         string
	StreamVersion int
	EventCount    int
	Data          []byte // optional serialized result data
}

Result holds the outcome of command processing.

Directories

Path Synopsis
Package memqueue provides an in-memory Queue implementation for testing.
Package memqueue provides an in-memory Queue implementation for testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL