Documentation ¶
Overview ¶
Package pgqueue provides a lightweight, PostgreSQL-backed job queue for Go.
It enables asynchronous background processing using PostgreSQL while offering safe concurrency, retries with backoff, delayed jobs, and cron scheduling.
Index ¶
- Constants
- Variables
- type CleanupStrategy
- type Client
- func (c *Client) Close() error
- func (c *Client) Enqueue(ctx context.Context, task TaskType, payload any, opts ...EnqueueOption) error
- func (c *Client) ListCronJobs() ([]CronJobInfo, error)
- func (c *Client) RemoveCron(id CronID, jobID string) error
- func (c *Client) ScheduleCron(spec string, jobName string, task TaskType, payload any) (CronID, error)
- func (c *Client) Stats(ctx context.Context) (QueueStats, error)
- type CronID
- type CronJobInfo
- type EnqueueOption
- type HandlerFunc
- type Middleware
- type Priority
- type Queue
- type QueueOption
- type QueueStats
- type ServeMux
- type Server
- type ServerOption
- type Task
- type TaskType
- type WorkerHandler
Constants ¶
const (
	TaskDone       = "done"
	TaskPending    = "pending"
	TaskProcessing = "processing"
	TaskFailed     = "failed"
)
Variables ¶
var ErrHandlerNotFound = errors.New("handler not found for task")
Functions ¶
This section is empty.
Types ¶
type CleanupStrategy ¶
type CleanupStrategy int
const (
	// DeleteStrategy hard deletes old tasks.
	DeleteStrategy CleanupStrategy = iota
	// ArchiveStrategy moves old tasks to the tasks_archive table.
	ArchiveStrategy
)
func (CleanupStrategy) String ¶
func (c CleanupStrategy) String() string
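As a rough illustration of how an iota-based strategy type can implement fmt.Stringer, the sketch below redeclares the type locally. The returned strings ("delete", "archive") are assumptions made for this example; the page does not show what the package's String method returns.

```go
package main

import "fmt"

// CleanupStrategy is redeclared locally so the sketch compiles on its own.
type CleanupStrategy int

const (
	// DeleteStrategy hard deletes old tasks.
	DeleteStrategy CleanupStrategy = iota
	// ArchiveStrategy moves old tasks to the tasks_archive table.
	ArchiveStrategy
)

// String returns a readable name. The exact names are assumed, not documented.
func (c CleanupStrategy) String() string {
	switch c {
	case DeleteStrategy:
		return "delete"
	case ArchiveStrategy:
		return "archive"
	default:
		return fmt.Sprintf("CleanupStrategy(%d)", int(c))
	}
}

func main() {
	// fmt uses the String method automatically for %v and Println.
	fmt.Println(DeleteStrategy, ArchiveStrategy)
}
```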
type Client ¶
func NewClient ¶
func NewClient(db *sql.DB, opts ...QueueOption) (client *Client, err error)
NewClient returns a Client for the queue.
func (*Client) Close ¶
Close shuts down the Client's background maintenance routines and Cron scheduler.
func (*Client) Enqueue ¶
func (c *Client) Enqueue(ctx context.Context, task TaskType, payload any, opts ...EnqueueOption) error
Enqueue adds a task to the queue.
func (*Client) ListCronJobs ¶
func (c *Client) ListCronJobs() ([]CronJobInfo, error)
ListCronJobs returns a list of scheduled tasks.
func (*Client) RemoveCron ¶
RemoveCron removes a scheduled task from the cron scheduler.
type EnqueueOption ¶
type EnqueueOption func(*enqueueConfig)
EnqueueOption configures a single Enqueue call, for example with a delay or a deduplication key.
func WithDedup ¶
func WithDedup(key string) EnqueueOption
WithDedup ensures a task with this key is only enqueued once.
func WithDelay ¶
func WithDelay(d time.Duration) EnqueueOption
WithDelay schedules the task to run in the future.
func WithMaxRetries ¶
func WithMaxRetries(n int) EnqueueOption
WithMaxRetries overrides the default retry count, which is 5.
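The option functions above follow Go's functional options pattern. The sketch below shows one way such options could be applied over defaults. The fields of enqueueConfig and the buildConfig helper are hypothetical, since the real struct is unexported and not shown on this page; only the option signatures and the default of 5 retries come from the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// enqueueConfig is a hypothetical stand-in for the package's unexported struct.
type enqueueConfig struct {
	delay      time.Duration
	maxRetries int
	dedupKey   string
}

// EnqueueOption mutates the configuration of a single enqueue call.
type EnqueueOption func(*enqueueConfig)

func WithDelay(d time.Duration) EnqueueOption {
	return func(c *enqueueConfig) { c.delay = d }
}

func WithMaxRetries(n int) EnqueueOption {
	return func(c *enqueueConfig) { c.maxRetries = n }
}

func WithDedup(key string) EnqueueOption {
	return func(c *enqueueConfig) { c.dedupKey = key }
}

// buildConfig (hypothetical helper) starts from defaults and applies each
// option in order, so later options win over earlier ones.
func buildConfig(opts ...EnqueueOption) enqueueConfig {
	cfg := enqueueConfig{maxRetries: 5} // documented default retry count
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(WithDelay(30*time.Second), WithDedup("welcome-email:42"))
	fmt.Printf("%+v\n", cfg)
}
```

Because each option is an ordinary function value, callers pass only the settings they care about and every other field keeps its default.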
type HandlerFunc ¶
The HandlerFunc type is an adapter that allows ordinary functions to be used as a WorkerHandler. If f is a function with the appropriate signature, HandlerFunc(f) is a WorkerHandler that calls f.
func (HandlerFunc) ProcessTask ¶
func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error
ProcessTask calls fn(ctx, task).
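The adapter idea can be sketched with locally declared types. The Task struct here has a single hypothetical Type field, and the WorkerHandler interface is inferred from the ProcessTask signature shown above; recordTypes and runOnce are illustrative helpers, not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// Task is a minimal hypothetical task; the real struct's fields are not shown here.
type Task struct {
	Type string
}

// WorkerHandler is inferred from the documented ProcessTask signature.
type WorkerHandler interface {
	ProcessTask(ctx context.Context, task *Task) error
}

// HandlerFunc adapts an ordinary function to the WorkerHandler interface.
type HandlerFunc func(ctx context.Context, task *Task) error

// ProcessTask calls fn(ctx, task).
func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
	return fn(ctx, task)
}

// recordTypes builds a handler from a plain closure via HandlerFunc.
func recordTypes(seen *[]string) WorkerHandler {
	return HandlerFunc(func(ctx context.Context, t *Task) error {
		*seen = append(*seen, t.Type)
		return nil
	})
}

// runOnce passes a single task of the given type to h.
func runOnce(h WorkerHandler, taskType string) error {
	return h.ProcessTask(context.Background(), &Task{Type: taskType})
}

func main() {
	var seen []string
	if err := runOnce(recordTypes(&seen), "email:send"); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(seen)
}
```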
type Middleware ¶
type Middleware func(WorkerHandler) WorkerHandler
Middleware wraps a WorkerHandler with extra behavior.
func SlogMiddleware ¶
func SlogMiddleware(logger *slog.Logger) Middleware
SlogMiddleware logs task lifecycle events.
type QueueOption ¶
type QueueOption func(*queueConfig)
QueueOption is a function that modifies the queue configuration.
func WithCleanupConfig ¶
func WithCleanupConfig(interval, retention time.Duration, strategy CleanupStrategy) QueueOption
WithCleanupConfig configures automatic removal of old data.
params:
- interval: how often to run the cleanup job.
- retention: how old a 'done'/'failed' task must be to be removed.
- strategy: either pgqueue.DeleteStrategy or pgqueue.ArchiveStrategy.
func WithCronEnabled ¶
func WithCronEnabled() QueueOption
WithCronEnabled enables cron jobs functionality.
Cron jobs are disabled by default.
func WithRescueConfig ¶
func WithRescueConfig(interval, visibilityTimeout time.Duration) QueueOption
WithRescueConfig configures the automatic stuck task rescue.
params:
- interval: how often to check for stuck tasks.
- visibilityTimeout: how long a task can stay 'processing' before being reset.
type QueueStats ¶
type ServeMux ¶
type ServeMux struct {
// contains filtered or unexported fields
}
ServeMux is a multiplexer for tasks. It matches the type of each task against a list of registered patterns and calls the WorkerHandler for the pattern that most closely matches the task's type.
func (*ServeMux) HandleFunc ¶
HandleFunc registers the handler function for the given pattern.
func (*ServeMux) ProcessTask ¶
ProcessTask dispatches the task to the handler whose pattern most closely matches the task type.
func (*ServeMux) Use ¶
func (mux *ServeMux) Use(mw ...Middleware)
Use appends middleware to the mux. Middleware runs in the order it is added.
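One common reading of "runs in the order it is added" is that the first middleware added becomes the outermost wrapper. The sketch below illustrates that reading with locally declared types; chain, tag, and traceOrder are hypothetical helpers, and the mux's actual wrapping logic is not shown on this page.

```go
package main

import (
	"context"
	"fmt"
)

type Task struct{ Type string } // hypothetical minimal task

type WorkerHandler interface {
	ProcessTask(ctx context.Context, task *Task) error
}

type HandlerFunc func(ctx context.Context, task *Task) error

func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
	return fn(ctx, task)
}

type Middleware func(WorkerHandler) WorkerHandler

// chain wraps h so that the first middleware in mws is the outermost layer.
// Wrapping from the last element backwards achieves that ordering.
func chain(h WorkerHandler, mws ...Middleware) WorkerHandler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records when it is entered and left.
func tag(name string, trace *[]string) Middleware {
	return func(next WorkerHandler) WorkerHandler {
		return HandlerFunc(func(ctx context.Context, t *Task) error {
			*trace = append(*trace, name+":before")
			err := next.ProcessTask(ctx, t)
			*trace = append(*trace, name+":after")
			return err
		})
	}
}

// traceOrder runs one task through two middlewares and returns the call order.
func traceOrder() []string {
	var trace []string
	base := HandlerFunc(func(ctx context.Context, t *Task) error {
		trace = append(trace, "handler")
		return nil
	})
	h := chain(base, tag("a", &trace), tag("b", &trace))
	_ = h.ProcessTask(context.Background(), &Task{Type: "demo"})
	return trace
}

func main() {
	fmt.Println(traceOrder()) // prints [a:before b:before handler b:after a:after]
}
```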
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewServer ¶
func NewServer(db *sql.DB, connString string, concurrency int, handler WorkerHandler, opts ...ServerOption) *Server
NewServer initializes a pgqueue worker server.
A Server manages a pool of background workers that:
- Listen for task notifications via LISTEN / NOTIFY
- Fetch tasks safely using SELECT ... FOR UPDATE SKIP LOCKED
- Process tasks concurrently with bounded parallelism
It requires a shared *sql.DB connection, a connection string for the LISTEN/NOTIFY listener, the desired number of concurrent worker goroutines, and a handler to process the tasks.
The server is safe to run across multiple processes or machines connected to the same PostgreSQL database.
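The SELECT ... FOR UPDATE SKIP LOCKED step is what lets several workers poll the same table without claiming the same row twice. Below is a minimal sketch of that pattern using database/sql. The table name (tasks), its columns (id, status), and the claimBatch helper are assumptions for illustration; the package's real schema and query are not shown on this page.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// claimQuery marks up to $1 pending rows as processing and returns their ids.
// Rows already locked by another transaction are skipped instead of waited on.
// Table and column names are hypothetical.
const claimQuery = `
UPDATE tasks
SET status = 'processing'
WHERE id IN (
    SELECT id
    FROM tasks
    WHERE status = 'pending'
    ORDER BY id
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
RETURNING id`

// claimBatch runs claimQuery and collects the ids of the claimed tasks.
func claimBatch(ctx context.Context, db *sql.DB, n int) ([]int64, error) {
	rows, err := db.QueryContext(ctx, claimQuery, n)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ids []int64
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, rows.Err()
}

func main() {
	// No database is opened here; the query is printed for inspection only.
	fmt.Println(claimQuery)
}
```

Claiming rows in a single UPDATE keeps the lock window short, which is one reason this pattern scales across multiple processes.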
func (*Server) Shutdown ¶
Shutdown gracefully stops the worker server.
It cancels the internal context and waits for all workers to finish processing their current tasks or until ctx expires.
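Waiting for workers "or until ctx expires" is commonly built from a sync.WaitGroup and a select on ctx.Done(). The sketch below shows that technique in isolation; waitOrTimeout and demoShutdown are hypothetical helpers, not the server's actual implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrTimeout returns nil once all workers tracked by wg have finished,
// or ctx.Err() if the context expires first.
func waitOrTimeout(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoShutdown starts one worker that runs for workMillis and waits for it
// for at most limitMillis.
func demoShutdown(workMillis, limitMillis int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMillis) * time.Millisecond)
	}()

	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(limitMillis)*time.Millisecond)
	defer cancel()
	return waitOrTimeout(ctx, &wg)
}

func main() {
	fmt.Println(demoShutdown(10, 1000)) // worker finishes well within the limit
	fmt.Println(demoShutdown(500, 10))  // limit expires before the worker is done
}
```

With this shape, a caller can bound shutdown time by passing a context created with context.WithTimeout.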
func (*Server) Start ¶
Start launches the worker pool and PostgreSQL LISTEN loop.
Start is strictly non-blocking: it initializes background goroutines and returns immediately if startup is successful.
Calling Start on an already-running server returns an error. Shutdown(ctx) must be called to gracefully stop workers.
type ServerOption ¶ added in v0.5.0
type ServerOption func(*Server)
ServerOption configures a worker Server.
Server options control how workers fetch and process tasks, such as batch size or concurrency-related behavior.
func WithBatchSize ¶ added in v0.5.0
func WithBatchSize(n uint16) ServerOption
WithBatchSize configures how many tasks a worker fetches per database round-trip.
A larger batch size increases throughput by reducing the number of database transactions, but it may reduce fairness between workers (some goroutines can be starved of tasks) and increase the number of tasks locked by a single worker.
Sensible values typically range from 5 to 20. The default batch size is 10.
type WorkerHandler ¶
WorkerHandler processes tasks.
ProcessTask should return nil if the processing of a task is successful.
func NotFoundHandler ¶
func NotFoundHandler() WorkerHandler
NotFoundHandler returns a simple task handler that returns a "not found" error.