pgqueue

v0.6.0
Published: Jan 25, 2026 License: MIT Imports: 19 Imported by: 0

README

pgqueue

pgqueue is a lightweight, asynchronous, durable, PostgreSQL-backed job queue for Go.

It is designed to be simple, safe, and easy to reason about, using only PostgreSQL and standard SQL.


Features

  • ✅ Distributed-safe workers
  • ⏱ Delayed execution
  • 🔁 Automatic retries with exponential backoff + jitter
  • 🚦 Job priorities
  • 🧠 Deduplication support
  • ⏰ Cron jobs (run once across many servers)
  • 📊 Queue metrics & stats
  • 💥 Crash-resilient, at-least-once delivery

Why pgqueue?

If you already use PostgreSQL, you don’t need Redis, SQS, or Kafka just to run background jobs.

PostgreSQL is already:

  • Durable
  • Transactional
  • Highly available
  • Operationally familiar

pgqueue builds a background job queue using:

  • SELECT … FOR UPDATE SKIP LOCKED
  • Advisory locking semantics
  • Transactions for correctness
  • LISTEN / NOTIFY for fast wake-ups

Architecture Overview

This diagram shows how producers, PostgreSQL, workers, and cron jobs interact inside pgqueue.

flowchart LR
    %% Nodes
    P["Producers<br/>queue.Enqueue()"]
    C["Cron Scheduler<br/>ScheduleCron()"]

    T["PostgreSQL<br/>tasks table"]
    A["tasks_archive"]
    N["LISTEN / NOTIFY"]

    W["Worker Pool<br/>NewServer(...).Start()"]
    M["ServeMux"]
    H["Task Handlers"]
    R["Retry & Rescue"]

    %% Flows
    P --> T
    C --> T
    T --> N
    N --> W
    W --> M
    M --> H
    H -->|success| T
    H -->|failure| R
    R --> T
    T --> A

    %% Styles
    classDef producer fill:#E3F2FD,stroke:#1565C0,stroke-width:2px;
    classDef postgres fill:#E8F5E9,stroke:#2E7D32,stroke-width:2px;
    classDef worker fill:#FFF8E1,stroke:#EF6C00,stroke-width:2px;
    classDef handler fill:#F3E5F5,stroke:#6A1B9A,stroke-width:2px;

    class P,C producer;
    class T,A,N postgres;
    class W,M,R worker;
    class H handler;

Installation

go get github.com/i-christian/pgqueue

Initialize the queue client with options

client, err := pgqueue.NewClient(
    db,
    pgqueue.WithRescueConfig(5*time.Minute, 30*time.Minute),
    pgqueue.WithCleanupConfig(1*time.Hour, 24*time.Hour, pgqueue.ArchiveStrategy),
    pgqueue.WithCronEnabled(),
)
if err != nil {
    log.Fatalf("Failed to init queue: %v", err)
}

Enqueue a Job

type EmailPayload struct {
    Subject string `json:"subject"`
}

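// Enqueue returns an error, so check it rather than discarding it.
if err := client.Enqueue(
    ctx,
    "task:send:email",
    EmailPayload{Subject: "Welcome!"},
); err != nil {
    log.Printf("Failed to enqueue task: %v", err)
}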

Enqueue with Options

client.Enqueue(
    ctx,
    "task:send:email",
    payload,
    pgqueue.WithPriority(pgqueue.HighPriority),
    pgqueue.WithDelay(5*time.Minute),
    pgqueue.WithMaxRetries(10),
    pgqueue.WithDedup("email:user:123"),
)

Supported options include:

  • Priority
  • Delayed execution
  • Retry limits
  • Deduplication keys

Start Workers (ServeMux)

pgqueue uses a ServeMux to route tasks by type, similar to http.ServeMux.

mux := pgqueue.NewServeMux()

// Middleware runs for every task
mux.Use(pgqueue.SlogMiddleware(client.Logger))

// Exact match
mux.HandleFunc("task:send:email", sendEmailHandler)

// Prefix match
mux.HandleFunc("task:cleanup:", cleanupHandler)
mux.HandleFunc("task:report:", reportHandler)

// Start worker pool
server := pgqueue.NewServer(db, connStr, 3, mux, pgqueue.WithBatchSize(20))
if err := server.Start(); err != nil {
    log.Fatal(err)
}
log.Println("Worker server started...")

⚠️ Bounded Task Types (Important)

Task types must be bounded.

✅ Good (bounded)

task:send:email
task:cleanup:expired-sessions
task:report:daily

❌ Bad (unbounded)

task:report:user:123
task:email:user:UUID

Rule of thumb: Use task categories, not per-entity identifiers.
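
One way to follow this rule is to keep the task type constant and carry the per-entity identifier in the payload. The sketch below uses only the standard library; the reportPayload type, its field, and the buildJob helper are hypothetical names chosen for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reportTask is a bounded task type: one constant, regardless of the user.
const reportTask = "task:report:user"

// reportPayload carries the per-entity identifier (hypothetical field).
type reportPayload struct {
	UserID int `json:"user_id"`
}

// buildJob returns the task type and JSON payload for a user report.
// The user ID goes into the payload, never into the task type.
func buildJob(userID int) (string, []byte, error) {
	body, err := json.Marshal(reportPayload{UserID: userID})
	return reportTask, body, err
}

func main() {
	taskType, body, err := buildJob(123)
	if err != nil {
		panic(err)
	}
	fmt.Println(taskType, string(body)) // task:report:user {"user_id":123}
}
```

With this shape, a single handler registered for the task type can serve every user.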


Cron Jobs

Run scheduled jobs once, even when multiple workers or servers are running.

cronID, err := client.ScheduleCron(
    "0 * * * *",
    "hourly-report",
    TaskReportBase+"hourly",
    ReportPayload{ReportName: "Hourly"},
)
if err != nil {
    log.Fatal(err)
}

jobs, _ := client.ListCronJobs()
for _, job := range jobs {
    fmt.Printf(
        "Cron %d → next: %s\n",
        job.ID,
        job.NextRun.Format(time.DateTime),
    )
}

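// Optional cleanup. The documented signature is RemoveCron(id CronID, jobID string);
// passing the job name given to ScheduleCron as jobID is an assumption here.
if err := client.RemoveCron(cronID, "hourly-report"); err != nil {
    log.Printf("Failed to remove cron job: %v", err)
}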

Retries & Backoff

  • At-least-once execution
  • Automatic retries on failure
  • Exponential backoff: 2^attempts
  • Jitter added to prevent thundering-herd effects
  • Max retries configurable per job

Queue Stats

stats, _ := client.Stats(ctx)

fmt.Printf(
    "Pending: %d | Processing: %d | Failed: %d | Done: %d\n",
    stats.Pending,
    stats.Processing,
    stats.Failed,
    stats.Done,
)

Examples

A complete, runnable example demonstrating:

  • Worker pools
  • ServeMux routing
  • slog logging
  • Priorities
  • Retries
  • Cron jobs

➡️ See the full example here: 👉 Examples


🖥️ CLI Dashboard

pgqueue-dash is a high-performance Terminal User Interface (TUI) for monitoring your queue in real-time. It allows you to view processed tasks, inspect payloads, and retry failed tasks manually.

Installation:

go install github.com/i-christian/pgqueue/cmd/pgqueue-dash@latest

Usage:

pgqueue-dash --dsn="postgres://user:pass@localhost:5432/dbname"

👉 View Full Dashboard Documentation


Guarantees

pgqueue provides the following guarantees:

  • At-least-once execution
  • No concurrent double-processing of the same task
  • Safe concurrency across multiple workers and processes
  • Crash resilience


When Not to Use pgqueue

pgqueue is not a replacement for high-throughput message brokers.

Avoid pgqueue if you need:

  • Ultra-low latency (<1ms)
  • Massive fan-out (millions of jobs per second)
  • Cross-region replication
  • Exactly-once semantics

Testing

pgqueue uses PostgreSQL 18 for integration tests.

Run tests locally (Docker required)

make test-full

This will:

  • start a temporary PostgreSQL container
  • run all tests with the race detector
  • clean up automatically

Run tests against an existing PostgreSQL instance

export TEST_DB_DSN="postgres://user:pass@localhost:5432/task_queue_test?sslmode=disable"
go test -v ./...

Benchmarks

make bench

Benchmarks can be run against a fresh PostgreSQL container and are intended for local performance exploration only.


Contributing

Contributions are welcome! Here’s how you can help:

  • 🐛 Report bugs by opening issues
  • 💡 Suggest features via GitHub discussions or issues
  • ✍️ Submit pull requests with clear descriptions
  • 📝 Update documentation and examples

Please follow standard Go conventions and run make test-full before submitting a PR to ensure nothing breaks.

Documentation

Overview

Package pgqueue provides a lightweight, PostgreSQL-backed job queue for Go.

It enables asynchronous background processing using PostgreSQL while offering safe concurrency, retries with backoff, delayed jobs, and cron scheduling.

Constants

const (
	TaskDone       = "done"
	TaskPending    = "pending"
	TaskProcessing = "processing"
	TaskFailed     = "failed"
)

Variables

var ErrHandlerNotFound = errors.New("handler not found for task")

Functions

This section is empty.

Types

type CleanupStrategy

type CleanupStrategy int

const (
	// DeleteStrategy hard deletes old tasks.
	DeleteStrategy CleanupStrategy = iota
	// ArchiveStrategy moves old tasks to the tasks_archive table.
	ArchiveStrategy
)

func (CleanupStrategy) String

func (c CleanupStrategy) String() string

type Client

type Client struct {
	Logger *slog.Logger
	// contains filtered or unexported fields
}

func NewClient

func NewClient(db *sql.DB, opts ...QueueOption) (client *Client, err error)

NewClient returns a new queue Client configured with the given options.

func (*Client) Close

func (c *Client) Close() error

Close shuts down the Client's background maintenance routines and Cron scheduler.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, task TaskType, payload any, opts ...EnqueueOption) error

Enqueue adds a task to the queue

func (*Client) ListCronJobs

func (c *Client) ListCronJobs() ([]CronJobInfo, error)

ListCronJobs returns a list of scheduled tasks

func (*Client) RemoveCron

func (c *Client) RemoveCron(id CronID, jobID string) error

RemoveCron removes a scheduled task from cron

func (*Client) ScheduleCron

func (c *Client) ScheduleCron(
	spec string,
	jobName string,
	task TaskType,
	payload any,
) (CronID, error)

ScheduleCron registers a recurring job.

func (*Client) Stats

func (c *Client) Stats(ctx context.Context) (QueueStats, error)

type CronID

type CronID int

type CronJobInfo

type CronJobInfo struct {
	ID      CronID
	NextRun time.Time
	PrevRun time.Time
}

type EnqueueOption

type EnqueueOption func(*enqueueConfig)

EnqueueOption allows configuring options like delays or deduplication

func WithDedup

func WithDedup(key string) EnqueueOption

WithDedup ensures a task with this key is only enqueued once

func WithDelay

func WithDelay(d time.Duration) EnqueueOption

WithDelay schedules the task to run in the future

func WithMaxRetries

func WithMaxRetries(n int) EnqueueOption

WithMaxRetries overrides the default retry count (default is 5)

func WithPriority

func WithPriority(p Priority) EnqueueOption

WithPriority sets the priority

type HandlerFunc

type HandlerFunc func(context.Context, *Task) error

The HandlerFunc type is an adapter that allows ordinary functions to be used as a WorkerHandler. If f is a function with the appropriate signature, HandlerFunc(f) is a WorkerHandler that calls f.

func (HandlerFunc) ProcessTask

func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error

ProcessTask calls fn(ctx, task)

type Middleware

type Middleware func(WorkerHandler) WorkerHandler

Middleware wraps a WorkerHandler with extra behavior.

func SlogMiddleware

func SlogMiddleware(logger *slog.Logger) Middleware

SlogMiddleware logs task lifecycle events.

type Priority

type Priority int
const (
	HighPriority    Priority = 6
	DefaultPriority Priority = 3
	LowPriority     Priority = 1
)

func (Priority) String

func (p Priority) String() string

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

type QueueOption

type QueueOption func(*queueConfig)

QueueOption is a function that modifies the queue configuration.

func WithCleanupConfig

func WithCleanupConfig(interval, retention time.Duration, strategy CleanupStrategy) QueueOption

WithCleanupConfig configures automatic removal of old data.

params:

  • interval: how often to run the cleanup job.
  • retention: how old a 'done'/'failed' task must be to be removed.
  • strategy: either pgqueue.DeleteStrategy or pgqueue.ArchiveStrategy.

func WithCronEnabled

func WithCronEnabled() QueueOption

WithCronEnabled enables cron jobs functionality.

Cron jobs are disabled by default.

func WithRescueConfig

func WithRescueConfig(interval, visibilityTimeout time.Duration) QueueOption

WithRescueConfig configures the automatic stuck task rescue.

params:

  • interval: how often to check for stuck tasks.
  • visibilityTimeout: how long a task can stay 'processing' before being reset.

type QueueStats

type QueueStats struct {
	Pending    int
	Processing int
	Failed     int
	Done       int
	Total      int
}

type ServeMux

type ServeMux struct {
	// contains filtered or unexported fields
}

ServeMux is a multiplexer for tasks. It matches the type of each task against a list of registered patterns and calls the WorkerHandler for the pattern that most closely matches the task's type.

func NewServeMux

func NewServeMux() *ServeMux

NewServeMux allocates and returns a new ServeMux.

func (*ServeMux) HandleFunc

func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error)

HandleFunc registers the handler function for the given pattern.

func (*ServeMux) ProcessTask

func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error

ProcessTask dispatches the task to the handler whose pattern most closely matches the task type.
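
The documentation does not spell out what "most closely matches" means. One plausible reading, by analogy with http.ServeMux, is that an exact match wins and otherwise the longest registered prefix is chosen. The sketch below implements that reading; the match function and the rule that only patterns ending in ":" act as prefixes are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// match returns the registered pattern that best fits taskType.
// An exact match wins. Otherwise the longest pattern ending in ":" that is
// a prefix of taskType is chosen (assumed rule for this sketch).
func match(patterns []string, taskType string) (string, bool) {
	best, found := "", false
	for _, p := range patterns {
		if p == taskType {
			return p, true
		}
		if strings.HasSuffix(p, ":") && strings.HasPrefix(taskType, p) && len(p) > len(best) {
			best, found = p, true
		}
	}
	return best, found
}

func main() {
	patterns := []string{"task:send:email", "task:cleanup:", "task:"}

	fmt.Println(match(patterns, "task:send:email"))              // task:send:email true
	fmt.Println(match(patterns, "task:cleanup:expired-sessions")) // task:cleanup: true
	fmt.Println(match(patterns, "job:unknown"))                   //  false
}
```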

func (*ServeMux) Use

func (mux *ServeMux) Use(mw ...Middleware)

Use appends middleware to the mux. Middleware runs in the order it is added.

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(db *sql.DB, connString string, concurrency int, handler WorkerHandler, opts ...ServerOption) *Server

NewServer initializes a pgqueue worker server.

A Server manages a pool of background workers that:

  • Listen for task notifications via LISTEN / NOTIFY
  • Fetch tasks safely using SELECT ... FOR UPDATE SKIP LOCKED
  • Process tasks concurrently with bounded parallelism

It requires a shared *sql.DB connection, a connection string for the LISTEN/NOTIFY listener, the desired number of concurrent worker goroutines, and a handler to process the tasks.

The server is safe to run across multiple processes or machines connected to the same PostgreSQL database.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully stops the worker server.

It cancels the internal context and waits for all workers to finish processing their current tasks or until ctx expires.

func (*Server) Start

func (s *Server) Start() error

Start launches the worker pool and PostgreSQL LISTEN loop.

Start is strictly non-blocking: it initializes background goroutines and returns immediately if startup is successful.

Calling Start on an already-running server returns an error. Shutdown(ctx) must be called to gracefully stop workers.

type ServerOption added in v0.5.0

type ServerOption func(*Server)

ServerOption configures a worker Server.

Server options control how workers fetch and process tasks, such as batch size or concurrency-related behavior.

func WithBatchSize added in v0.5.0

func WithBatchSize(n uint16) ServerOption

WithBatchSize configures how many tasks a worker fetches per database round-trip.

A larger batch size increases throughput by reducing the number of database transactions, but it may reduce fairness between workers (some goroutines can be starved) and increases the number of tasks locked by a single worker.

Sensible values typically range from 5 to 20. The default batch size is 10.

type Task

type Task struct {
	ID         uuid.UUID
	Type       TaskType
	Payload    json.RawMessage
	Attempts   int
	MaxRetries int
	Priority   Priority
	CreatedAt  time.Time
}

type TaskType

type TaskType string

type WorkerHandler

type WorkerHandler interface {
	ProcessTask(context.Context, *Task) error
}

WorkerHandler processes tasks.

ProcessTask should return nil if the processing of a task is successful.

func NotFoundHandler

func NotFoundHandler() WorkerHandler

NotFoundHandler returns a simple task handler that returns a "not found" error.

Directories

  • cmd/pgqueue-dash (module)
