batcher

package v0.0.0-...-d951d24
Published: Dec 13, 2018 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

type BatchConfig

type BatchConfig struct {
	TargetInterval time.Duration
	MaxSize        int
	MinSize        int
	Active         bool
	NoWait         bool
}

type BatchMetrics

type BatchMetrics struct {
	EntriesPerHour float64
	LastSend       time.Time
}

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

func NewBatcher

func NewBatcher(config *BatcherConfig) (*Batcher, error)

func (*Batcher) AggregateBatch

func (b *Batcher) AggregateBatch(entries []redistream.Entry) (string, error)

func (*Batcher) BatchDest

func (b *Batcher) BatchDest() string

func (*Batcher) BatcherDelay

func (b *Batcher) BatcherDelay() time.Duration

func (*Batcher) ConfigsKey

func (b *Batcher) ConfigsKey() string

func (*Batcher) Consumer

func (b *Batcher) Consumer() redistream.Consumer

func (*Batcher) MetaPrefix

func (b *Batcher) MetaPrefix() string

MetaPrefix is used for "private" parts of the Batcher redis keyspace.

func (*Batcher) MetricsPrefix

func (b *Batcher) MetricsPrefix() string

func (*Batcher) Prefix

func (b *Batcher) Prefix() string

Prefix is used for the entirety of the Batcher redis keyspace.

func (*Batcher) ReapBatch

func (b *Batcher) ReapBatch(name string) error

ReapBatch aggregates and stores entries that failed to be batched; the resulting stream functions as a log of failed entries. Entries get reaped for being idle too long or for being retried too many times. In normal operation, entries shouldn't be reaped very often, if at all.

NOTE: this name is somewhat ambiguous, as it might be read as reaping failed batches. However, processing the batches should be left entirely to the client, or handled by a companion redis stream worker (perhaps even a differently configured batcher instance); in short, processing the destination batch is outside the scope of Batcher. For example, the client might check how many times the batch has been tried (or how long it sat idle) and, inside MULTI/EXEC, XADD it to a failed-batch log stream and XDEL the batch.
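A minimal sketch of that client-side handling, assuming the go-redis client (github.com/go-redis/redis, v6-era API); moveToFailedLog and the stream-name parameters are hypothetical, not part of this package:

package main

import "github.com/go-redis/redis" // assumed client; not shown on this page

// moveToFailedLog follows the note above: within a MULTI/EXEC transaction,
// XADD the batch's fields to a failed-batch log stream and XDEL the original
// entry so it is not retried again.
func moveToFailedLog(client *redis.Client, batchStream, failedLog, id string, values map[string]interface{}) error {
	_, err := client.TxPipelined(func(pipe redis.Pipeliner) error {
		pipe.XAdd(&redis.XAddArgs{
			Stream: failedLog,
			Values: values,
		})
		pipe.XDel(batchStream, id)
		return nil
	})
	return err
}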

func (*Batcher) ReapSome

func (b *Batcher) ReapSome(entries []redistream.Entry, streamName string) ([]redistream.Entry, error)

func (*Batcher) ReaperConsumer

func (b *Batcher) ReaperConsumer() redistream.Consumer

func (*Batcher) Reclaim

func (b *Batcher) Reclaim(name string) error

Reclaim `XCLAIM`s entries from old consumers on behalf of the current consumer. Don't delete the consumer, or we'll get repeats.

func (*Batcher) ScheduleBatch

func (b *Batcher) ScheduleBatch(name string, signals <-chan batchSignal)

ScheduleBatch

func (*Batcher) SendBatch

func (b *Batcher) SendBatch(name string) error

SendBatch aggregates up to BatchConfig.MaxSize entries into one entry, adding it to the Batcher.batchDest() stream.

func (*Batcher) StreamPrefix

func (b *Batcher) StreamPrefix() string

type BatcherConfig

type BatcherConfig struct {
	RedisOpts          *redis.Options
	BatcherShardKey    string
	DefaultBatchConfig *BatchConfig
	Concurrency        uint
	MinDelaySeconds    uint
	MaxDelaySeconds    uint
	Reaper             ReaperConfig
}
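A hedged construction sketch: the field values are illustrative, the batcher import path is a placeholder (the real module path is not shown on this page), and redis.Options is assumed to come from the go-redis client:

package main

import (
	"log"
	"time"

	"github.com/go-redis/redis" // assumed client providing redis.Options

	batcher "example.com/batcher" // placeholder; real module path not shown here
)

func main() {
	cfg := &batcher.BatcherConfig{
		RedisOpts:       &redis.Options{Addr: "localhost:6379"},
		BatcherShardKey: "shard-0", // illustrative shard key
		DefaultBatchConfig: &batcher.BatchConfig{
			TargetInterval: 30 * time.Second,
			MaxSize:        100,
			MinSize:        1,
			Active:         true,
		},
		Concurrency:     4,
		MinDelaySeconds: 1,
		MaxDelaySeconds: 60,
		// Reaper is left at its zero value here; see ReaperConfig below.
	}

	b, err := batcher.NewBatcher(cfg)
	if err != nil {
		log.Fatal(err)
	}
	_ = b // call b's methods (SendBatch, ReapBatch, ...) from here
}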

type LockPool

type LockPool struct {
	// contains filtered or unexported fields
}

func NewLockPool

func NewLockPool(num uint) *LockPool

func (*LockPool) Lock

func (l *LockPool) Lock()

func (*LockPool) Unlock

func (l *LockPool) Unlock()
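LockPool's behavior is not documented here; the sketch below treats it as a counting semaphore sized by NewLockPool, which is an inference from its API shape rather than stated behavior. processAll, process, and the pool size are hypothetical:

package main

import (
	"sync"

	batcher "example.com/batcher" // placeholder; real module path not shown here
)

// processAll fans work out across goroutines while the LockPool caps how many
// run at once (assuming semaphore-like semantics for Lock/Unlock).
func processAll(streams []string, process func(string)) {
	pool := batcher.NewLockPool(4)

	var wg sync.WaitGroup
	for _, stream := range streams {
		wg.Add(1)
		go func(stream string) {
			defer wg.Done()
			pool.Lock()         // block until a slot is free
			defer pool.Unlock() // release the slot when done
			process(stream)
		}(stream)
	}
	wg.Wait()
}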

type MaxAge

type MaxAge uint

type MaxRetries

type MaxRetries uint

type ReaperConfig

type ReaperConfig struct {
	MaxAgeSeconds MaxAge
	MaxRetries    MaxRetries
	ShouldReap    func(redis.XPendingExt, MaxAge, MaxRetries) bool
}
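One plausible ShouldReap predicate matching the signature above: reap a pending entry once it has been idle longer than MaxAgeSeconds or delivered more than MaxRetries times. It assumes go-redis v6's XPendingExt field names (Idle, RetryCount); other client versions may differ, and the import paths are placeholders:

package main

import (
	"time"

	"github.com/go-redis/redis" // assumed client providing XPendingExt

	batcher "example.com/batcher" // placeholder; real module path not shown here
)

// shouldReap reports whether a pending entry has been idle longer than maxAge
// seconds or has been delivered more than maxRetries times.
func shouldReap(p redis.XPendingExt, maxAge batcher.MaxAge, maxRetries batcher.MaxRetries) bool {
	tooOld := p.Idle > time.Duration(maxAge)*time.Second
	tooManyRetries := p.RetryCount > int64(maxRetries)
	return tooOld || tooManyRetries
}

It would then be wired in as ReaperConfig{MaxAgeSeconds: 300, MaxRetries: 5, ShouldReap: shouldReap} (values illustrative).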
