Documentation
¶
Overview ¶
Package rmq provides a production-ready RabbitMQ client library for Go with built-in reconnection, retry logic, dead letter queue support, and worker pools.
Features ¶
- Automatic reconnection with exponential backoff
- Message retry with exponential backoff and jitter (configurable range)
- Dead Letter Queue (DLQ) support with automated setup
- Worker pool for concurrent message processing
- Publisher confirms for reliable message delivery
- Channel pooling for efficient publishing
- Graceful shutdown handling
- Structured logging with log/slog
- Support for all RabbitMQ exchange types (direct, fanout, topic, headers)
Quick Start ¶
Create a client and connect to RabbitMQ:
client := rmq.NewClient(ctx, rmq.Config{
URL: "amqp://guest:guest@localhost:5672/",
ReconnectWait: time.Second,
MaxReconnectWait: 30 * time.Second,
}, logger)
defer client.Close()
if err := client.WaitUntilReady(); err != nil {
log.Fatal(err)
}
Publish messages:
producer := rmq.NewProducer(client, rmq.ProducerConfig{
PoolSize: 5,
UseConfirms: true,
}, logger)
defer producer.Close()
err := producer.Publish(ctx, []byte("Hello"), rmq.PublishOptions{
Exchange: "my-exchange",
RoutingKey: "my-key",
Persistent: true,
})
Consume messages:
consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
Queue: "my-queue",
Exchange: "my-exchange",
RoutingKey: "my-key",
MaxRetries: 3,
Workers: 5,
}, logger)
handler := func(ctx context.Context, msg *rmq.Message) error {
// Process message
return nil
}
if err := consumer.Start(handler, nil); err != nil {
log.Fatal(err)
}
defer consumer.Stop()
Retry Mechanism ¶
When a message fails to process, the library automatically retries it with exponential backoff (multiplier=2.0, ±25% jitter). Configure the delay range:
RetryInitialDelay: 500 * time.Millisecond, // first retry delay RetryMaxDelay: 30 * time.Second, // upper bound
Dead Letter Queue ¶
The library automatically sets up DLQ infrastructure. Process failed messages:
dlqProcessor := rmq.NewDLQProcessor(client, rmq.DLQProcessorConfig{
Queue: "my-queue.dlq",
RetryQueue: "my-queue",
}, logger)
dlqHandler := func(ctx context.Context, msg *rmq.Message) (rmq.DLQAction, error) {
// Inspect and decide: Retry, Discard, or Keep
return rmq.DLQActionRetry, nil
}
dlqProcessor.Start(dlqHandler)
defer dlqProcessor.Stop()
Connection Management ¶
The client automatically handles connection failures with exponential backoff and reconnects transparently. Consumers and producers resume operation after reconnection without manual intervention.
For more details and examples, see: https://github.com/xorfall/rmq
Index ¶
Constants ¶
const ( HeaderRetryCount = "x-retry-count" HeaderFirstError = "x-first-error" HeaderLastError = "x-last-error" HeaderErrorMsg = "x-error-message" )
Header keys for retry tracking
Variables ¶
var ( // ErrClientClosed is returned when operations are attempted on a closed client. ErrClientClosed = errors.New("rabbitmq client is closed") // ErrNotConnected is returned when operations require an active connection but none exists. ErrNotConnected = errors.New("not connected to rabbitmq") )
Common errors returned by the client.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages a persistent connection to RabbitMQ with automatic reconnection. It handles connection lifecycle, exponential backoff on failures, and provides thread-safe access to the underlying connection for producers and consumers.
func NewClient ¶
NewClient creates a new RabbitMQ client and starts the connection loop in the background. The client will automatically attempt to connect and reconnect with exponential backoff. Use WaitUntilReady() to block until the first successful connection is established.
The provided context controls the client's lifecycle. Canceling it will trigger graceful shutdown. If logger is nil, the default slog logger will be used.
func (*Client) Close ¶
Close gracefully shuts down the client, stopping the connection loop and closing the active connection. It blocks until all background goroutines have exited. Consumers and producers using this client should be stopped before calling Close.
func (*Client) Connection ¶
func (c *Client) Connection() *amqp091.Connection
Connection returns the current active connection, or nil if not connected. This method is thread-safe and primarily used internally by Producer and Consumer.
func (*Client) Context ¶
Context returns the client's context, which is canceled when Close() is called. Consumers and producers use this context to detect shutdown.
func (*Client) IsReady ¶
IsReady returns true if the client is currently connected to RabbitMQ. This is a non-blocking check of the connection state.
func (*Client) WaitUntilReady ¶
WaitUntilReady blocks until the client has successfully connected to RabbitMQ or until the client is closed. Returns ErrClientClosed if the client shuts down before establishing a connection.
This method is safe to call concurrently from multiple goroutines.
type Config ¶
type Config struct {
// URL is the AMQP connection string (e.g., "amqp://user:pass@host:port/vhost").
URL string
// ReconnectWait is the initial wait time before attempting reconnection.
// Default: 1 second.
ReconnectWait time.Duration
// MaxReconnectWait is the maximum wait time between reconnection attempts.
// The backoff will not exceed this value. Default: 60 seconds.
MaxReconnectWait time.Duration
// DialTimeout is the timeout for establishing a new connection.
// Default: 5 seconds.
DialTimeout time.Duration
}
Config holds RabbitMQ client configuration.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer handles message consumption with automatic retry, DLQ support, and worker pools. It manages the queue topology (exchanges, queues, bindings), implements retry logic with configurable backoff, and automatically sends failed messages to the dead letter queue.
func NewConsumer ¶
func NewConsumer(client *Client, cfg ConsumerConfig, logger *slog.Logger) *Consumer
NewConsumer creates a new Consumer instance. The consumer will automatically set up the required topology (exchanges, queues, and DLQ) when Start() is called. If logger is nil, the default slog logger will be used.
func (*Consumer) Start ¶
func (c *Consumer) Start(handler Handler, errorHandler ErrorHandler) error
Start begins consuming messages from the configured queue. It sets up the queue topology (exchange, queue, DLQ, bindings), starts worker goroutines, and begins processing messages.
The handler function is called for each message. Return nil for success, or an error to trigger retry logic. The errorHandler (optional) is called when a message fails permanently after exhausting all retries, just before being sent to the DLQ.
Start is non-blocking and returns immediately after launching background workers. Use Stop() to gracefully shut down the consumer.
type ConsumerConfig ¶
type ConsumerConfig struct {
// Queue settings
Queue string
Exchange string
RoutingKey string
// Exchange type: direct, fanout, topic, headers
// Default: direct
ExchangeType string
// Prefetch (QoS)
PrefetchCount int
// Retry settings
MaxRetries int
RetryInitialDelay time.Duration // First retry delay (default: 1s)
RetryMaxDelay time.Duration // Maximum retry delay (default: 30s)
// Dead letter settings
DeadLetterExchange string
DeadLetterRoutingKey string
// Timeouts
ProcessTimeout time.Duration
// Concurrency
Workers int
}
ConsumerConfig holds configuration options for a Consumer. Use setDefaults() to populate zero values with sensible defaults.
type DLQAction ¶
type DLQAction int
DLQAction represents the action to take for a dead-lettered message.
const ( // DLQActionDiscard permanently removes the message from the DLQ. // Use this for invalid messages that cannot be fixed. DLQActionDiscard DLQAction = iota // DLQActionRetry requeues the message to the original queue with retry count reset. // Use this after fixing the underlying issue that caused the message to fail. // The message will get a fresh start with retry count = 0. DLQActionRetry // DLQActionKeep leaves the message in the DLQ for later processing or manual inspection. // The message is nacked without requeue and remains in the DLQ. DLQActionKeep )
type DLQHandler ¶
DLQHandler processes dead-lettered messages and decides their fate. Inspect the message and its error metadata (available in msg.Headers) to determine whether to retry, discard, or keep the message in the DLQ.
Return one of DLQActionRetry, DLQActionDiscard, or DLQActionKeep. Returning an error will keep the message in the DLQ (equivalent to DLQActionKeep).
type DLQProcessor ¶
type DLQProcessor struct {
// contains filtered or unexported fields
}
DLQProcessor handles processing of messages in the dead letter queue. It consumes messages from the DLQ and invokes a handler to decide whether to retry, discard, or keep each message.
func NewDLQProcessor ¶
func NewDLQProcessor(client *Client, cfg DLQProcessorConfig, logger *slog.Logger) *DLQProcessor
NewDLQProcessor creates a new DLQ processor instance. If logger is nil, the default slog logger will be used.
func (*DLQProcessor) Start ¶
func (p *DLQProcessor) Start(handler DLQHandler) error
Start begins processing messages from the DLQ. If handler is nil, a default handler that keeps all messages will be used. This is non-blocking and returns immediately after starting the background processor.
func (*DLQProcessor) Stop ¶
func (p *DLQProcessor) Stop()
Stop gracefully stops the DLQ processor and waits for it to finish. This blocks until the processor has fully stopped.
type DLQProcessorConfig ¶
type DLQProcessorConfig struct {
// Queue is the name of the dead letter queue to consume from.
// Example: "my-queue.dlq"
Queue string
// RetryQueue is the name of the original queue to send messages to when retrying.
// Example: "my-queue"
RetryQueue string
// PrefetchCount is the number of messages to prefetch from the DLQ.
// Default: 5.
PrefetchCount int
// ProcessTimeout is the timeout for processing a single DLQ message.
// Default: 30 seconds.
ProcessTimeout time.Duration
}
DLQProcessorConfig holds configuration for the DLQ processor.
type ErrorHandler ¶
ErrorHandler is called when message processing fails permanently after exhausting all retries. This callback is invoked just before the message is sent to the dead letter queue. Use this for logging, alerting, or performing cleanup actions for permanently failed messages.
type Handler ¶
Handler is the function signature for processing consumed messages. Return nil for successful processing, or an error to trigger retry logic. The context may be canceled if the consumer is shutting down or the process timeout is reached.
type Message ¶
type Message struct {
// Raw body
Body []byte
// Queue info
Queue string
RoutingKey string
ContentType string
MessageID string
Timestamp time.Time
// Headers
Headers map[string]interface{}
// Retry info (extracted from headers)
RetryCount int
FirstError time.Time
}
Message represents a consumed RabbitMQ message with metadata and helper fields. It provides access to the message body, headers, routing information, and retry tracking.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer handles message publishing with channel pooling and optional publisher confirms. It maintains a pool of channels for efficient concurrent publishing and automatically handles channel lifecycle and recovery.
func NewProducer ¶
func NewProducer(client *Client, cfg ProducerConfig, logger *slog.Logger) *Producer
NewProducer creates a new Producer instance with channel pooling. The producer will use the provided client's connection and automatically handle channel lifecycle. If logger is nil, the default slog logger will be used.
func (*Producer) Close ¶
func (p *Producer) Close()
Close closes all pooled channels and releases resources. After calling Close, the Producer should not be used.
func (*Producer) Publish ¶
Publish sends a message to RabbitMQ using the specified options. If UseConfirms is enabled, this method blocks until the broker acknowledges the message. The context can be used to set a deadline or cancel the publish operation.
Returns an error if publishing fails or if publisher confirms are enabled and the message is not acknowledged by the broker.
func (*Producer) PublishBatch ¶
func (p *Producer) PublishBatch(ctx context.Context, messages []struct { Body []byte Options PublishOptions }) error
PublishBatch sends multiple messages efficiently using a single channel. This is more efficient than calling Publish multiple times when you need to send many messages at once. Publisher confirms are not currently supported for batch operations.
Returns an error on the first failed publish, leaving subsequent messages unpublished.
type ProducerConfig ¶
type ProducerConfig struct {
// PoolSize is the number of channels to maintain in the pool.
// Default: 5.
PoolSize int
// UseConfirms enables publisher confirms for reliable delivery.
// When enabled, Publish() will wait for broker acknowledgment.
// Default: false.
UseConfirms bool
}
ProducerConfig holds producer configuration options.