rmq

package module
v0.0.0-...-7b1f241 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 9 Imported by: 0

README

RabbitMQ Go Library

A Client library for Go with built-in reconnection, retry logic, dead letter queue support, and worker pools.

Features

  • Automatic reconnection with exponential backoff
  • Message retry with configurable backoff strategies
  • Dead Letter Queue (DLQ) support
  • Worker pool for concurrent message processing
  • Publisher confirms for reliable message delivery
  • Channel pooling for efficient publishing
  • Graceful shutdown handling
  • Structured logging with log/slog

Installation

go get github.com/xorfall/rmq

Usage

Basic Setup
package main

import (
    "context"
    "log/slog"
    "os"
    "time"

    "github.com/xorfall/rmq"
)

func main() {
    // Setup logger
    logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.LevelInfo,
    }))

    ctx := context.Background()

    // Create RabbitMQ client
    client := rmq.NewClient(ctx, rmq.Config{
        URL:              "amqp://guest:guest@localhost:5672/",
        ReconnectWait:    time.Second,
        MaxReconnectWait: 30 * time.Second,
        DialTimeout:      5 * time.Second,
    }, logger)
    defer client.Close()

    // Wait for connection
    if err := client.WaitUntilReady(); err != nil {
        logger.Error("failed to connect", "error", err)
        os.Exit(1)
    }
}
Publishing Messages
// Create producer
producer := rmq.NewProducer(client, rmq.ProducerConfig{
    PoolSize:    5,
    UseConfirms: true,
}, logger)
defer producer.Close()

// Publish a message
err := producer.Publish(ctx, []byte("Hello World"), rmq.PublishOptions{
    Exchange:   "my-exchange",
    RoutingKey: "my-routing-key",
    Persistent: true,
    MessageID:  "msg-123",
})
Consuming Messages
// Create consumer
consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
    Queue:         "my-queue",
    Exchange:      "my-exchange",
    RoutingKey:    "my-routing-key",
    PrefetchCount: 10,
    MaxRetries:    3,
    RetryBackoff: []time.Duration{
        1 * time.Second,
        5 * time.Second,
        15 * time.Second,
    },
    ProcessTimeout: 30 * time.Second,
    Workers:        3,
}, logger)

// Define message handler
handler := func(ctx context.Context, msg *rmq.Message) error {
    logger.Info("processing message", "id", msg.MessageID)
    // Process your message here
    return nil
}

// Define error handler (called when message fails after all retries)
errorHandler := func(ctx context.Context, msg *rmq.Message, err error) {
    logger.Error("message failed permanently", "id", msg.MessageID, "error", err)
}

// Start consumer
if err := consumer.Start(handler, errorHandler); err != nil {
    logger.Error("failed to start consumer", "error", err)
    os.Exit(1)
}
defer consumer.Stop()
Exchange Types

The library supports all RabbitMQ exchange types: direct, fanout, topic, and headers. You can configure the exchange type using the ExchangeType field in ConsumerConfig.

Direct Exchange (Default)

Routes messages to queues based on exact routing key match. Best for point-to-point communication.

consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
    Queue:        "orders",
    Exchange:     "orders",
    ExchangeType: "direct",  // default
    RoutingKey:   "order.create",
}, logger)

Use cases: Task distribution, command routing, specific event handling

Fanout Exchange

Broadcasts messages to all bound queues, ignoring routing keys. Best for pub/sub patterns.

consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
    Queue:        "notifications.email",
    Exchange:     "notifications",
    ExchangeType: "fanout",
    RoutingKey:   "",  // ignored for fanout
}, logger)

Use cases: Broadcasting notifications, cache invalidation, event replication

See examples/fanout for a complete example.

Topic Exchange

Routes messages based on wildcard pattern matching on routing keys. Best for flexible routing patterns.

Wildcard patterns:

  • * matches exactly one word
  • # matches zero or more words
// Consumer 1: All error logs from any service
consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
    Queue:        "logs.errors",
    Exchange:     "logs",
    ExchangeType: "topic",
    RoutingKey:   "*.error",  // matches: auth.error, payment.error, etc.
}, logger)

// Consumer 2: All logs from auth service
consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
    Queue:        "logs.auth",
    Exchange:     "logs",
    ExchangeType: "topic",
    RoutingKey:   "auth.*",  // matches: auth.info, auth.error, auth.warning
}, logger)

// Consumer 3: All logs
consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
    Queue:        "logs.all",
    Exchange:     "logs",
    ExchangeType: "topic",
    RoutingKey:   "#",  // matches everything
}, logger)

Use cases: Log routing, hierarchical data, multi-criteria routing

See examples/topic for a complete example.

Headers Exchange

Routes messages based on header attributes instead of routing keys. Best for complex routing logic.

consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
    Queue:        "premium.orders",
    Exchange:     "orders",
    ExchangeType: "headers",
}, logger)

// When publishing, use headers for routing
producer.Publish(ctx, data, rmq.PublishOptions{
    Exchange: "orders",
    Headers: map[string]interface{}{
        "customer-tier": "premium",
        "region":        "us-east",
    },
})

Use cases: Complex routing conditions, attribute-based routing, multi-dimensional filtering

Note: Headers exchange requires manual queue binding with arguments via RabbitMQ management or AMQP commands, as Go's QueueBind doesn't support binding arguments directly.

Processing Dead Letter Queue (DLQ)

The library provides a DLQ processor to handle messages that have permanently failed. This allows you to:

  • Inspect failed messages
  • Retry messages after manual fixes
  • Discard invalid messages
  • Keep suspicious messages for investigation
// Create DLQ processor
dlqProcessor := rmq.NewDLQProcessor(client, rmq.DLQProcessorConfig{
    Queue:          "my-queue.dlq",      // DLQ name
    RetryQueue:     "my-queue",          // Original queue to retry to
    PrefetchCount:  5,
    ProcessTimeout: 30 * time.Second,
}, logger)

// Define DLQ handler - decides what to do with each failed message
dlqHandler := func(ctx context.Context, msg *rmq.Message) (rmq.DLQAction, error) {
    logger.Info("processing DLQ message", "id", msg.MessageID, "retry_count", msg.RetryCount)

    // Access error information from headers
    var lastError string
    if msg.Headers != nil {
        if err, ok := msg.Headers[rmq.HeaderErrorMsg].(string); ok {
            lastError = err
        }
    }

    // Decide action based on your business logic
    if strings.Contains(lastError, "temporary") {
        // Retry if it was a temporary error
        return rmq.DLQActionRetry, nil
    } else if strings.Contains(lastError, "invalid format") {
        // Discard messages with permanent errors
        return rmq.DLQActionDiscard, nil
    }

    // Keep in DLQ for manual review
    return rmq.DLQActionKeep, nil
}

// Start DLQ processor
if err := dlqProcessor.Start(dlqHandler); err != nil {
    logger.Error("failed to start DLQ processor", "error", err)
    os.Exit(1)
}
defer dlqProcessor.Stop()
DLQ Actions

The DLQ handler can return three different actions:

  • DLQActionRetry: Requeues the message to the original queue with retry count reset

    • Use when: The issue has been fixed (e.g., external service is back online)
    • The message will be processed fresh, starting from retry count 0
    • Adds x-dlq-reprocessed: true header to track it was reprocessed from DLQ
  • DLQActionDiscard: Permanently removes the message from DLQ

    • Use when: The message is invalid and cannot be processed
    • Message is acknowledged and deleted
    • Make sure to log important information before discarding
  • DLQActionKeep: Leaves the message in the DLQ

    • Use when: Manual investigation is needed
    • Message is nacked without requeue
    • Message stays in DLQ for later processing

Configuration

Client Config
  • URL: RabbitMQ connection URL (e.g., "amqp://user:pass@host:port/vhost")
  • ReconnectWait: Initial wait time before reconnection (default: 1s)
  • MaxReconnectWait: Maximum wait time between reconnection attempts (default: 60s)
  • DialTimeout: Timeout for establishing connection (default: 5s)
Producer Config
  • PoolSize: Number of channels in the pool (default: 5)
  • UseConfirms: Enable publisher confirms for reliability (default: false)
Consumer Config
  • Queue: Queue name to consume from
  • Exchange: Exchange to bind the queue to
  • RoutingKey: Routing key for queue binding
  • ExchangeType: Exchange type - "direct", "fanout", "topic", or "headers" (default: "direct")
  • PrefetchCount: Number of messages to prefetch (default: 10)
  • MaxRetries: Maximum retry attempts before sending to DLQ (default: 3)
  • RetryBackoff: Backoff duration for each retry attempt
  • DeadLetterExchange: DLX name (default: "{Exchange}.dlx")
  • DeadLetterRoutingKey: DLQ routing key (default: "{Queue}.dead")
  • ProcessTimeout: Timeout for processing a single message (default: 30s)
  • Workers: Number of concurrent workers (default: 1)
Retry Backoff Patterns

The library supports flexible retry backoff patterns. You can either specify delays manually or use built-in helper functions:

Manual Array (Full Control):

RetryBackoff: []time.Duration{1*time.Second, 5*time.Second, 15*time.Second}

Exponential Backoff:

RetryBackoff: rmq.ExponentialBackoff(
    1*time.Second,   // initial delay
    2.0,             // multiplier
    30*time.Second,  // max delay
    5,               // retry count
)
// Results: [1s, 2s, 4s, 8s, 16s]

Exponential with Jitter (Recommended for Distributed Systems):

RetryBackoff: rmq.ExponentialBackoffWithJitter(
    1*time.Second,   // initial delay
    2.0,             // multiplier
    30*time.Second,  // max delay
    5,               // retry count
    0.3,             // 30% jitter
)
// Results: [~0.7-1.3s, ~1.4-2.6s, ~2.8-5.2s, ~5.6-10.4s, ~11.2-20.8s]

Why Jitter? Jitter adds randomness to retry delays, preventing the "thundering herd" problem where many failed messages retry at exactly the same time, potentially overwhelming the system again. Recommended for production systems with multiple consumers.

Linear Backoff:

RetryBackoff: rmq.LinearBackoff(1*time.Second, 2*time.Second, 5)
// Results: [1s, 3s, 5s, 7s, 9s]

Constant Backoff:

RetryBackoff: rmq.ConstantBackoff(5*time.Second, 3)
// Results: [5s, 5s, 5s]
DLQ Processor Config
  • Queue: DLQ name to process (e.g., "my-queue.dlq")
  • RetryQueue: Original queue name to retry messages to (e.g., "my-queue")
  • PrefetchCount: Number of messages to prefetch from DLQ (default: 5)
  • ProcessTimeout: Timeout for processing a single DLQ message (default: 30s)

Examples

The library includes four comprehensive examples:

1. Basic Example

See examples/basic for a complete example showing:

  • Basic producer and consumer setup
  • Message retry with backoff
  • Error handling
  • Graceful shutdown
# Start RabbitMQ (using Docker)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# Run the basic example
cd examples/basic
go run main.go
2. DLQ Example

See examples/dlq for a complete example demonstrating:

  • Messages failing and going to DLQ
  • DLQ processor handling dead-lettered messages
  • Different DLQ actions (Retry, Discard, Keep)
  • Error metadata tracking
  • Conditional message reprocessing
# Start RabbitMQ (using Docker)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# Run the DLQ example
cd examples/dlq
go run main.go

What the DLQ example demonstrates:

  • PAY-001: Processes successfully
  • PAY-002: Fails first time, succeeds on retry (temporary failure)
  • PAY-003: Goes to DLQ after retries, then gets retried from DLQ
  • PAY-004: Goes to DLQ and stays there (fraud case)
  • PAY-005: Processes successfully
3. Topic Exchange Example

See examples/topic for a complete example demonstrating:

  • Topic exchange with wildcard routing patterns
  • Multiple consumers with different routing key patterns
  • Pattern matching with * (one word) and # (zero or more words)
  • Log routing by service and level
# Run the topic example
cd examples/topic
go run main.go
4. Fanout Exchange Example

See examples/fanout for a complete example demonstrating:

  • Fanout exchange broadcasting to all queues
  • Multiple consumers receiving the same messages
  • Notification delivery via different channels (email, SMS, push)
# Run the fanout example
cd examples/fanout
go run main.go

How It Works

Retry Mechanism

When a message fails to process:

  1. The library checks if max retries has been reached
  2. If not, it republishes the message with an incremented retry count
  3. A backoff delay is applied before the message is reprocessed
  4. Retry metadata is stored in message headers
Dead Letter Queue (DLQ)

The Dead Letter Queue (DLQ) is a safety mechanism for handling messages that cannot be processed successfully after multiple retry attempts.

Message Flow

Here's how a message flows through the system when it encounters errors:

1. Message published to queue
   ↓
2. Consumer attempts to process
   ↓
3. Processing fails → Error returned
   ↓
4. [RETRY LOGIC]
   - Check if retry count < MaxRetries
   - If YES: Apply backoff delay → Republish to same queue (with incremented retry count)
   - If NO: Proceed to step 5
   ↓
5. Max retries exhausted
   ↓
6. ErrorHandler callback invoked (for logging, alerting, etc.)
   ↓
7. Message published to Dead Letter Exchange (DLX)
   ↓
8. Message routed to Dead Letter Queue (DLQ)
   ↓
9. DLQ Processor handles the message (optional)
   - Inspect message and error details
   - Decide action: Retry, Discard, or Keep
Automatic DLQ Setup

When you create a consumer, the library automatically:

  1. Creates Dead Letter Exchange (DLX)

    • Name: {Exchange}.dlx (e.g., "orders.dlx")
    • Type: Direct exchange
    • Durable: Yes
  2. Creates Dead Letter Queue

    • Name: {Queue}.dlq (e.g., "orders.process.dlq")
    • Durable: Yes
    • Bound to DLX with routing key: {Queue}.dead
  3. Configures Main Queue with DLX

    • Sets x-dead-letter-exchange argument
    • Sets x-dead-letter-routing-key argument
    • Messages that are rejected/nacked go to DLX
Message Headers in DLQ

Messages in the DLQ contain helpful metadata in their headers:

  • x-retry-count (int32): Number of times the message was retried
  • x-first-error (int64): Unix timestamp of first failure
  • x-last-error (int64): Unix timestamp of last failure
  • x-error-message (string): The error message from the last failure
  • x-dlq-reprocessed (bool): Set to true when retried from DLQ
  • x-dlq-reprocessed-at (int64): Unix timestamp when reprocessed from DLQ

Access these in your handler:

func dlqHandler(ctx context.Context, msg *rmq.Message) (rmq.DLQAction, error) {
    // Access retry count
    retryCount := msg.RetryCount

    // Access error information
    var errorMsg string
    if msg.Headers != nil {
        if err, ok := msg.Headers[rmq.HeaderErrorMsg].(string); ok {
            errorMsg = err
        }
    }

    // Access timestamps
    firstError := msg.FirstError // time.Time

    // Make decision based on this data
    // ...
}
DLQ Processing Strategies

1. Immediate Retry - Retry as soon as issue is resolved

// After deploying a bug fix, retry all messages
return rmq.DLQActionRetry, nil

2. Conditional Retry - Retry based on error type

if strings.Contains(errorMsg, "timeout") || strings.Contains(errorMsg, "unavailable") {
    // External service might be back up
    return rmq.DLQActionRetry, nil
}
return rmq.DLQActionKeep, nil

3. Time-based Retry - Retry after certain time period

if time.Since(msg.FirstError) > 24*time.Hour {
    // It's been a day, the issue might be resolved
    return rmq.DLQActionRetry, nil
}
return rmq.DLQActionKeep, nil

4. Alert and Keep - Notify team and keep for investigation

if msg.RetryCount > 5 {
    // Send alert to ops team
    sendAlert(msg)
    return rmq.DLQActionKeep, nil
}
return rmq.DLQActionDiscard, nil

5. Selective Discard - Remove messages that can't be fixed

// Parse message to check if it's fixable
var payment Payment
json.Unmarshal(msg.Body, &payment)

if payment.IsValid() {
    return rmq.DLQActionRetry, nil
}
// Invalid data, cannot be fixed
return rmq.DLQActionDiscard, nil
Best Practices
  1. Always Log Before Discarding

    if action == rmq.DLQActionDiscard {
        logger.Error("discarding message",
            "message_id", msg.MessageID,
            "body", string(msg.Body),
            "error", errorMsg)
    }
    return rmq.DLQActionDiscard, nil
    
  2. Set Alerts for DLQ Growth

    • Monitor DLQ depth
    • Alert when messages accumulate
    • Investigate root causes
  3. Implement DLQ Processor Carefully

    • Start with DLQActionKeep as default
    • Only auto-retry when you're confident
    • Avoid infinite retry loops
  4. Track Reprocessed Messages

    • Check for x-dlq-reprocessed header
    • Limit number of DLQ reprocessing attempts
    • Prevent messages from bouncing between queue and DLQ
  5. Use ErrorHandler for Immediate Actions

    errorHandler := func(ctx context.Context, msg *rmq.Message, err error) {
        // Send immediate alert
        alerting.Send("Message failed permanently", msg.MessageID)
    
        // Update database
        db.UpdateStatus(msg.MessageID, "failed")
    
        // Create support ticket
        ticketing.Create(msg, err)
    }
    
Automatic Reconnection

The client automatically handles connection failures:

  1. Detects connection loss
  2. Attempts reconnection with exponential backoff
  3. Consumers automatically resume consuming after reconnection
  4. In-flight messages are handled gracefully

Project Structure

.
├── client.go           # RabbitMQ client with reconnection
├── consumer.go         # Consumer with retry and DLQ
├── producer.go         # Producer with channel pooling
├── types.go            # Shared types and configs
├── dlq_processor.go    # DLQ processing utilities
├── examples/
│   ├── basic/
│   │   └── main.go     # Basic producer/consumer example
│   ├── dlq/
│   │   └── main.go     # DLQ processing example
│   ├── topic/
│   │   └── main.go     # Topic exchange example
│   └── fanout/
│       └── main.go     # Fanout exchange example
└── README.md

License

MIT

Documentation

Overview

Package rmq provides a production-ready RabbitMQ client library for Go with built-in reconnection, retry logic, dead letter queue support, and worker pools.

Features

  • Automatic reconnection with exponential backoff
  • Message retry with exponential backoff and jitter (configurable range)
  • Dead Letter Queue (DLQ) support with automated setup
  • Worker pool for concurrent message processing
  • Publisher confirms for reliable message delivery
  • Channel pooling for efficient publishing
  • Graceful shutdown handling
  • Structured logging with log/slog
  • Support for all RabbitMQ exchange types (direct, fanout, topic, headers)

Quick Start

Create a client and connect to RabbitMQ:

client := rmq.NewClient(ctx, rmq.Config{
    URL:              "amqp://guest:guest@localhost:5672/",
    ReconnectWait:    time.Second,
    MaxReconnectWait: 30 * time.Second,
}, logger)
defer client.Close()

if err := client.WaitUntilReady(); err != nil {
    log.Fatal(err)
}

Publish messages:

producer := rmq.NewProducer(client, rmq.ProducerConfig{
    PoolSize:    5,
    UseConfirms: true,
}, logger)
defer producer.Close()

err := producer.Publish(ctx, []byte("Hello"), rmq.PublishOptions{
    Exchange:   "my-exchange",
    RoutingKey: "my-key",
    Persistent: true,
})

Consume messages:

consumer := rmq.NewConsumer(client, rmq.ConsumerConfig{
    Queue:         "my-queue",
    Exchange:      "my-exchange",
    RoutingKey:    "my-key",
    MaxRetries:    3,
    Workers:       5,
}, logger)

handler := func(ctx context.Context, msg *rmq.Message) error {
    // Process message
    return nil
}

if err := consumer.Start(handler, nil); err != nil {
    log.Fatal(err)
}
defer consumer.Stop()

Retry Mechanism

When a message fails to process, the library automatically retries it with exponential backoff (multiplier=2.0, ±25% jitter). Configure the delay range:

RetryInitialDelay: 500 * time.Millisecond, // first retry delay
RetryMaxDelay:     30 * time.Second,        // upper bound

Dead Letter Queue

The library automatically sets up DLQ infrastructure. Process failed messages:

dlqProcessor := rmq.NewDLQProcessor(client, rmq.DLQProcessorConfig{
    Queue:      "my-queue.dlq",
    RetryQueue: "my-queue",
}, logger)

dlqHandler := func(ctx context.Context, msg *rmq.Message) (rmq.DLQAction, error) {
    // Inspect and decide: Retry, Discard, or Keep
    return rmq.DLQActionRetry, nil
}

dlqProcessor.Start(dlqHandler)
defer dlqProcessor.Stop()

Connection Management

The client automatically handles connection failures with exponential backoff and reconnects transparently. Consumers and producers resume operation after reconnection without manual intervention.

For more details and examples, see: https://github.com/xorfall/rmq

Index

Constants

View Source
const (
	HeaderRetryCount = "x-retry-count"
	HeaderFirstError = "x-first-error"
	HeaderLastError  = "x-last-error"
	HeaderErrorMsg   = "x-error-message"
)

Header keys for retry tracking

Variables

View Source
var (
	// ErrClientClosed is returned when operations are attempted on a closed client.
	ErrClientClosed = errors.New("rabbitmq client is closed")

	// ErrNotConnected is returned when operations require an active connection but none exists.
	ErrNotConnected = errors.New("not connected to rabbitmq")
)

Common errors returned by the client.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages a persistent connection to RabbitMQ with automatic reconnection. It handles connection lifecycle, exponential backoff on failures, and provides thread-safe access to the underlying connection for producers and consumers.

func NewClient

func NewClient(ctx context.Context, cfg Config, logger *slog.Logger) *Client

NewClient creates a new RabbitMQ client and starts the connection loop in the background. The client will automatically attempt to connect and reconnect with exponential backoff. Use WaitUntilReady() to block until the first successful connection is established.

The provided context controls the client's lifecycle. Canceling it will trigger graceful shutdown. If logger is nil, the default slog logger will be used.

func (*Client) Close

func (c *Client) Close() error

Close gracefully shuts down the client, stopping the connection loop and closing the active connection. It blocks until all background goroutines have exited. Consumers and producers using this client should be stopped before calling Close.

func (*Client) Connection

func (c *Client) Connection() *amqp091.Connection

Connection returns the current active connection, or nil if not connected. This method is thread-safe and primarily used internally by Producer and Consumer.

func (*Client) Context

func (c *Client) Context() context.Context

Context returns the client's context, which is canceled when Close() is called. Consumers and producers use this context to detect shutdown.

func (*Client) IsReady

func (c *Client) IsReady() bool

IsReady returns true if the client is currently connected to RabbitMQ. This is a non-blocking check of the connection state.

func (*Client) WaitUntilReady

func (c *Client) WaitUntilReady() error

WaitUntilReady blocks until the client has successfully connected to RabbitMQ or until the client is closed. Returns ErrClientClosed if the client shuts down before establishing a connection.

This method is safe to call concurrently from multiple goroutines.

type Config

type Config struct {
	// URL is the AMQP connection string (e.g., "amqp://user:pass@host:port/vhost").
	URL string

	// ReconnectWait is the initial wait time before attempting reconnection.
	// Default: 1 second.
	ReconnectWait time.Duration

	// MaxReconnectWait is the maximum wait time between reconnection attempts.
	// The backoff will not exceed this value. Default: 60 seconds.
	MaxReconnectWait time.Duration

	// DialTimeout is the timeout for establishing a new connection.
	// Default: 5 seconds.
	DialTimeout time.Duration
}

Config holds RabbitMQ client configuration.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer handles message consumption with automatic retry, DLQ support, and worker pools. It manages the queue topology (exchanges, queues, bindings), implements retry logic with configurable backoff, and automatically sends failed messages to the dead letter queue.

func NewConsumer

func NewConsumer(client *Client, cfg ConsumerConfig, logger *slog.Logger) *Consumer

NewConsumer creates a new Consumer instance. The consumer will automatically set up the required topology (exchanges, queues, and DLQ) when Start() is called. If logger is nil, the default slog logger will be used.

func (*Consumer) Start

func (c *Consumer) Start(handler Handler, errorHandler ErrorHandler) error

Start begins consuming messages from the configured queue. It sets up the queue topology (exchange, queue, DLQ, bindings), starts worker goroutines, and begins processing messages.

The handler function is called for each message. Return nil for success, or an error to trigger retry logic. The errorHandler (optional) is called when a message fails permanently after exhausting all retries, just before being sent to the DLQ.

Start is non-blocking and returns immediately after launching background workers. Use Stop() to gracefully shut down the consumer.

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop gracefully stops the consumer, waiting for all worker goroutines to finish processing their current messages. This blocks until all workers have exited. In-flight messages will be nacked and requeued if not processed before shutdown.

type ConsumerConfig

type ConsumerConfig struct {
	// Queue settings
	Queue      string
	Exchange   string
	RoutingKey string

	// Exchange type: direct, fanout, topic, headers
	// Default: direct
	ExchangeType string

	// Prefetch (QoS)
	PrefetchCount int

	// Retry settings
	MaxRetries        int
	RetryInitialDelay time.Duration // First retry delay (default: 1s)
	RetryMaxDelay     time.Duration // Maximum retry delay (default: 30s)

	// Dead letter settings
	DeadLetterExchange   string
	DeadLetterRoutingKey string

	// Timeouts
	ProcessTimeout time.Duration

	// Concurrency
	Workers int
}

ConsumerConfig holds configuration options for a Consumer. Use setDefaults() to populate zero values with sensible defaults.

type DLQAction

type DLQAction int

DLQAction represents the action to take for a dead-lettered message.

const (
	// DLQActionDiscard permanently removes the message from the DLQ.
	// Use this for invalid messages that cannot be fixed.
	DLQActionDiscard DLQAction = iota

	// DLQActionRetry requeues the message to the original queue with retry count reset.
	// Use this after fixing the underlying issue that caused the message to fail.
	// The message will get a fresh start with retry count = 0.
	DLQActionRetry

	// DLQActionKeep leaves the message in the DLQ for later processing or manual inspection.
	// The message is nacked without requeue and remains in the DLQ.
	DLQActionKeep
)

type DLQHandler

type DLQHandler func(ctx context.Context, msg *Message) (DLQAction, error)

DLQHandler processes dead-lettered messages and decides their fate. Inspect the message and its error metadata (available in msg.Headers) to determine whether to retry, discard, or keep the message in the DLQ.

Return one of DLQActionRetry, DLQActionDiscard, or DLQActionKeep. Returning an error will keep the message in the DLQ (equivalent to DLQActionKeep).

type DLQProcessor

type DLQProcessor struct {
	// contains filtered or unexported fields
}

DLQProcessor handles processing of messages in the dead letter queue. It consumes messages from the DLQ and invokes a handler to decide whether to retry, discard, or keep each message.

func NewDLQProcessor

func NewDLQProcessor(client *Client, cfg DLQProcessorConfig, logger *slog.Logger) *DLQProcessor

NewDLQProcessor creates a new DLQ processor instance. If logger is nil, the default slog logger will be used.

func (*DLQProcessor) Start

func (p *DLQProcessor) Start(handler DLQHandler) error

Start begins processing messages from the DLQ. If handler is nil, a default handler that keeps all messages will be used. This is non-blocking and returns immediately after starting the background processor.

func (*DLQProcessor) Stop

func (p *DLQProcessor) Stop()

Stop gracefully stops the DLQ processor and waits for it to finish. This blocks until the processor has fully stopped.

type DLQProcessorConfig

type DLQProcessorConfig struct {
	// Queue is the name of the dead letter queue to consume from.
	// Example: "my-queue.dlq"
	Queue string

	// RetryQueue is the name of the original queue to send messages to when retrying.
	// Example: "my-queue"
	RetryQueue string

	// PrefetchCount is the number of messages to prefetch from the DLQ.
	// Default: 5.
	PrefetchCount int

	// ProcessTimeout is the timeout for processing a single DLQ message.
	// Default: 30 seconds.
	ProcessTimeout time.Duration
}

DLQProcessorConfig holds configuration for the DLQ processor.

type ErrorHandler

type ErrorHandler func(ctx context.Context, msg *Message, err error)

ErrorHandler is called when message processing fails permanently after exhausting all retries. This callback is invoked just before the message is sent to the dead letter queue. Use this for logging, alerting, or performing cleanup actions for permanently failed messages.

type Handler

type Handler func(ctx context.Context, msg *Message) error

Handler is the function signature for processing consumed messages. Return nil for successful processing, or an error to trigger retry logic. The context may be canceled if the consumer is shutting down or the process timeout is reached.

type Message

type Message struct {
	// Raw body
	Body []byte

	// Queue info
	Queue       string
	RoutingKey  string
	ContentType string
	MessageID   string
	Timestamp   time.Time

	// Headers
	Headers map[string]interface{}

	// Retry info (extracted from headers)
	RetryCount int
	FirstError time.Time
}

Message represents a consumed RabbitMQ message with metadata and helper fields. It provides access to the message body, headers, routing information, and retry tracking.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer handles message publishing with channel pooling and optional publisher confirms. It maintains a pool of channels for efficient concurrent publishing and automatically handles channel lifecycle and recovery.

func NewProducer

func NewProducer(client *Client, cfg ProducerConfig, logger *slog.Logger) *Producer

NewProducer creates a new Producer instance with channel pooling. The producer will use the provided client's connection and automatically handle channel lifecycle. If logger is nil, the default slog logger will be used.

func (*Producer) Close

func (p *Producer) Close()

Close closes all pooled channels and releases resources. After calling Close, the Producer should not be used.

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, body []byte, opts PublishOptions) error

Publish sends a message to RabbitMQ using the specified options. If UseConfirms is enabled, this method blocks until the broker acknowledges the message. The context can be used to set a deadline or cancel the publish operation.

Returns an error if publishing fails or if publisher confirms are enabled and the message is not acknowledged by the broker.

func (*Producer) PublishBatch

func (p *Producer) PublishBatch(ctx context.Context, messages []struct {
	Body    []byte
	Options PublishOptions
}) error

PublishBatch sends multiple messages efficiently using a single channel. This is more efficient than calling Publish multiple times when you need to send many messages at once. Publisher confirms are not currently supported for batch operations.

Returns an error on the first failed publish, leaving subsequent messages unpublished.

type ProducerConfig

type ProducerConfig struct {
	// PoolSize is the number of channels to maintain in the pool.
	// Default: 5.
	PoolSize int

	// UseConfirms enables publisher confirms for reliable delivery.
	// When enabled, Publish() will wait for broker acknowledgment.
	// Default: false.
	UseConfirms bool
}

ProducerConfig holds producer configuration options.

type PublishOptions

type PublishOptions struct {
	Exchange    string
	RoutingKey  string
	ContentType string
	MessageID   string
	Headers     map[string]interface{}
	Persistent  bool
	Mandatory   bool
}

PublishOptions contains options for publishing a message to RabbitMQ.

Directories

Path Synopsis
examples
basic command
dlq command
fanout command
topic command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL