Documentation
¶
Index ¶
- Variables
- type AckOptions
- type BulkDequeuer
- type BulkEnqueuer
- type DequeueFunc
- type DequeueMiddleware
- type DequeueOptions
- type Dequeuer
- type EnqueueFunc
- type EnqueueMiddleware
- type EnqueueOptions
- type Enqueuer
- type HandleFunc
- type HandleMiddleware
- type Job
- func (j Job) Delay(d time.Duration) *Job
- func (j *Job) MarshalJSONPayload(v interface{}) error
- func (j *Job) MarshalPayload(v interface{}) error
- func (j *Job) UnmarshalJSONPayload(v interface{}) error
- func (j *Job) UnmarshalPayload(v interface{}) error
- func (j Job) WithPayload(v interface{}) (*Job, error)
- type JobOptions
- type Metrics
- type MetricsExporter
- type Queue
- type QueueMetrics
- type QueueMetricsOptions
- type Worker
- type WorkerOptions
Constants ¶
This section is empty.
Variables ¶
var (
	ErrEmptyNamespace = errors.New("work: empty namespace")
	ErrEmptyQueueID   = errors.New("work: empty queue id")
	ErrAt             = errors.New("work: at should not be zero")
	ErrInvisibleSec   = errors.New("work: invisible sec should be >= 0")
)
options validation errors
var (
	ErrMaxExecutionTime = errors.New("work: max execution time should be > 0")
	ErrNumGoroutines    = errors.New("work: number of goroutines should be > 0")
	ErrIdleWait         = errors.New("work: idle wait should be > 0")
)
options validation errors
var (
	// ErrQueueNotFound is returned if the queue is not yet
	// defined with Register().
	ErrQueueNotFound = errors.New("work: queue is not found")
	// ErrUnrecoverable is returned if the error is unrecoverable.
	// The job will be discarded.
	ErrUnrecoverable = errors.New("work: permanent error")
	// ErrUnsupported is returned if it is not implemented.
	ErrUnsupported = errors.New("work: unsupported")
)
var (
	// ErrEmptyQueue is returned if Dequeue() is called on an empty queue.
	ErrEmptyQueue = errors.New("work: no job is found")
)
Functions ¶
This section is empty.
Types ¶
type AckOptions ¶
AckOptions specifies how a job is deleted from a queue.
func (*AckOptions) Validate ¶
func (opt *AckOptions) Validate() error
Validate validates AckOptions.
type BulkDequeuer ¶
type BulkDequeuer interface {
BulkDequeue(int64, *DequeueOptions) ([]*Job, error)
BulkAck([]*Job, *AckOptions) error
}
BulkDequeuer dequeues jobs in a batch.
type BulkEnqueuer ¶
type BulkEnqueuer interface {
BulkEnqueue([]*Job, *EnqueueOptions) error
}
BulkEnqueuer enqueues jobs in a batch.
type DequeueFunc ¶
type DequeueFunc func(*DequeueOptions) (*Job, error)
DequeueFunc generates a job.
type DequeueMiddleware ¶
type DequeueMiddleware func(DequeueFunc) DequeueFunc
DequeueMiddleware modifies DequeueFunc behavior.
type DequeueOptions ¶
type DequeueOptions struct {
// Namespace is the namespace of a queue.
Namespace string
// QueueID is the id of a queue.
QueueID string
// At is the current time of the dequeuer.
// Any job that is scheduled before this can be executed.
At time.Time
// After the job is dequeued, no other dequeuer can see this job for a while.
// InvisibleSec controls how long this period is.
InvisibleSec int64
}
DequeueOptions specifies how a job is dequeued.
func (*DequeueOptions) Validate ¶
func (opt *DequeueOptions) Validate() error
Validate validates DequeueOptions.
type Dequeuer ¶
type Dequeuer interface {
Dequeue(*DequeueOptions) (*Job, error)
Ack(*Job, *AckOptions) error
}
Dequeuer dequeues a job. If a job is processed successfully, call Ack() to delete the job.
type EnqueueFunc ¶
type EnqueueFunc func(*Job, *EnqueueOptions) error
EnqueueFunc takes in a job for processing.
type EnqueueMiddleware ¶
type EnqueueMiddleware func(EnqueueFunc) EnqueueFunc
EnqueueMiddleware modifies EnqueueFunc behavior.
type EnqueueOptions ¶
type EnqueueOptions struct {
// Namespace is the namespace of a queue.
Namespace string
// QueueID is the id of a queue.
QueueID string
}
EnqueueOptions specifies how a job is enqueued.
func (*EnqueueOptions) Validate ¶
func (opt *EnqueueOptions) Validate() error
Validate validates EnqueueOptions.
type Enqueuer ¶
type Enqueuer interface {
Enqueue(*Job, *EnqueueOptions) error
}
Enqueuer enqueues a job.
type HandleMiddleware ¶
type HandleMiddleware func(HandleFunc) HandleFunc
HandleMiddleware modifies HandleFunc behavior.
type Job ¶
type Job struct {
// ID is the unique id of a job.
ID string `msgpack:"id"`
// CreatedAt is set to the time when NewJob() is called.
CreatedAt time.Time `msgpack:"created_at"`
// UpdatedAt is when the job is last executed.
// UpdatedAt is set to the time when NewJob() is called initially.
UpdatedAt time.Time `msgpack:"updated_at"`
// EnqueuedAt is when the job will be executed next.
// EnqueuedAt is set to the time when NewJob() is called initially.
EnqueuedAt time.Time `msgpack:"enqueued_at"`
// Payload is raw bytes.
Payload []byte `msgpack:"payload"`
// If the job previously failed, Retries will be incremented.
Retries int64 `msgpack:"retries"`
// If the job previously failed, LastError will be populated with the error string.
LastError string `msgpack:"last_error"`
}
Job is a single unit of work.
func (*Job) MarshalJSONPayload ¶
MarshalJSONPayload encodes a variable into the JSON payload.
func (*Job) MarshalPayload ¶
MarshalPayload encodes a variable into the msgpack payload.
func (*Job) UnmarshalJSONPayload ¶
UnmarshalJSONPayload decodes the JSON payload into a variable.
func (*Job) UnmarshalPayload ¶
UnmarshalPayload decodes the msgpack payload into a variable.
func (Job) WithPayload ¶
WithPayload adds payload to the job.
type JobOptions ¶
type JobOptions struct {
WorkerOptions
MaxExecutionTime time.Duration
IdleWait time.Duration
NumGoroutines int64
DequeueMiddleware []DequeueMiddleware
HandleMiddleware []HandleMiddleware
}
JobOptions specifies how a job is executed.
func (*JobOptions) AddDequeueMiddleware ¶
func (opt *JobOptions) AddDequeueMiddleware(mw DequeueMiddleware) *JobOptions
AddDequeueMiddleware adds DequeueMiddleware.
func (*JobOptions) AddHandleMiddleware ¶
func (opt *JobOptions) AddHandleMiddleware(mw HandleMiddleware) *JobOptions
AddHandleMiddleware adds HandleMiddleware.
func (*JobOptions) Validate ¶
func (opt *JobOptions) Validate() error
Validate validates JobOptions.
type Metrics ¶
type Metrics struct {
Queue []*QueueMetrics
}
Metrics wraps metrics reported by MetricsExporter.
type MetricsExporter ¶
type MetricsExporter interface {
GetQueueMetrics(*QueueMetricsOptions) (*QueueMetrics, error)
}
MetricsExporter can be implemented by Queue to report metrics.
type Queue ¶
Queue can enqueue and dequeue jobs.
func NewRedisQueue ¶
func NewRedisQueue(client redis.UniversalClient) Queue
NewRedisQueue creates a new queue stored in redis.
type QueueMetrics ¶
type QueueMetrics struct {
Namespace string
QueueID string
// Total number of jobs that can be executed right now.
ReadyTotal int64
// Total number of jobs that are scheduled to run in the future.
ScheduledTotal int64
}
QueueMetrics contains metrics from a queue.
type QueueMetricsOptions ¶
QueueMetricsOptions specifies how to fetch queue metrics.
func (*QueueMetricsOptions) Validate ¶
func (opt *QueueMetricsOptions) Validate() error
Validate validates QueueMetricsOptions.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker runs jobs.
func (*Worker) ExportMetrics ¶
ExportMetrics dumps queue stats if the queue implements MetricsExporter.
func (*Worker) Register ¶
func (w *Worker) Register(queueID string, h HandleFunc, opt *JobOptions) error
Register adds a handler for a queue.
type WorkerOptions ¶
WorkerOptions is used to create a worker.
