Documentation
Index
- type Batch
- type BatchConfig
- type BatchMetrics
- type Batcher
- func (b *Batcher) AggregateBatch(entries []redistream.Entry) (string, error)
- func (b *Batcher) BatchDest() string
- func (b *Batcher) BatcherDelay() time.Duration
- func (b *Batcher) ConfigsKey() string
- func (b *Batcher) Consumer() redistream.Consumer
- func (b *Batcher) MetaPrefix() string
- func (b *Batcher) MetricsPrefix() string
- func (b *Batcher) Prefix() string
- func (b *Batcher) ReapBatch(name string) error
- func (b *Batcher) ReapSome(entries []redistream.Entry, streamName string) ([]redistream.Entry, error)
- func (b *Batcher) ReaperConsumer() redistream.Consumer
- func (b *Batcher) Reclaim(name string) error
- func (b *Batcher) ScheduleBatch(name string, signals <-chan batchSignal)
- func (b *Batcher) SendBatch(name string) error
- func (b *Batcher) StreamPrefix() string
- type BatcherConfig
- type LockPool
- type MaxAge
- type MaxRetries
- type ReaperConfig
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type BatchConfig
type BatchMetrics
type Batcher
type Batcher struct {
// contains filtered or unexported fields
}
func NewBatcher
func NewBatcher(config *BatcherConfig) (*Batcher, error)
func (*Batcher) AggregateBatch
func (b *Batcher) AggregateBatch(entries []redistream.Entry) (string, error)
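AggregateBatch has no doc comment on this page. As a rough, stdlib-only illustration of what the (string, error) signature suggests -- collapsing many stream entries into one serialized payload -- here is a sketch using a hypothetical Entry stand-in for redistream.Entry (the JSON encoding is an assumption, not the package's actual aggregation format):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Entry is a hypothetical stand-in for redistream.Entry.
type Entry struct {
	ID     string            `json:"id"`
	Fields map[string]string `json:"fields"`
}

// aggregate marshals a slice of entries into a single payload string,
// mirroring AggregateBatch's (string, error) shape.
func aggregate(entries []Entry) (string, error) {
	b, err := json.Marshal(entries)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, _ := aggregate([]Entry{{ID: "1-0", Fields: map[string]string{"k": "v"}}})
	fmt.Println(out)
}
```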
func (*Batcher) BatcherDelay
func (*Batcher) ConfigsKey
func (*Batcher) Consumer
func (b *Batcher) Consumer() redistream.Consumer
func (*Batcher) MetaPrefix
MetaPrefix is used for the "private" parts of the Batcher Redis keyspace.
func (*Batcher) MetricsPrefix
func (*Batcher) ReapBatch
ReapBatch aggregates and stores entries that failed to be batched; the resulting stream functions as a log of failed entries. Entries are reaped when they have sat idle too long or been retried too many times. In normal operation, entries should rarely, if ever, be reaped.
NOTE: the name is somewhat ambiguous, as it might be read as reaping failed batches. However, processing the batches is left entirely to the client, or handled by a companion Redis stream worker -- maybe even a differently configured Batcher instance. In short, processing the destination batch is outside the scope of Batcher. For example, the client might check how many times a batch has been tried (or how long it sat idle) and, using MULTI/EXEC, XADD it to a failed-batch log stream and XDEL it.
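The XADD-plus-XDEL suggestion above can be sketched with go-redis's TxPipelined, which wraps the queued commands in MULTI/EXEC so both commands apply atomically or not at all. This is a non-runnable fragment, not the package's own code: `rdb` is an assumed *redis.Client (github.com/redis/go-redis/v9), the stream names are hypothetical, and `entry` stands in for a pending batch entry.

```go
// Atomically move a stuck batch to a failed-batch log (MULTI/EXEC).
// "batches" and "batches:failed" are hypothetical stream names.
_, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
	pipe.XAdd(ctx, &redis.XAddArgs{
		Stream: "batches:failed",
		Values: map[string]interface{}{"origID": entry.ID, "payload": entry.Payload},
	})
	pipe.XDel(ctx, "batches", entry.ID)
	return nil
})
```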
func (*Batcher) ReapSome
func (b *Batcher) ReapSome(entries []redistream.Entry, streamName string) ([]redistream.Entry, error)
func (*Batcher) ReaperConsumer
func (b *Batcher) ReaperConsumer() redistream.Consumer
func (*Batcher) Reclaim
Reclaim `XCLAIM`s entries from old consumers on behalf of the current consumer. Don't delete the old consumer, or we'll get repeats.
func (*Batcher) ScheduleBatch
ScheduleBatch schedules sends of the named batch, driven by signals on the provided channel.
func (*Batcher) SendBatch
SendBatch aggregates up to BatchConfig.MaxSize entries into one entry, adding it to the Batcher.batchDest() stream.
func (*Batcher) StreamPrefix
type BatcherConfig
type BatcherConfig struct {
RedisOpts *redis.Options
BatcherShardKey string
DefaultBatchConfig *BatchConfig
Concurrency uint
MinDelaySeconds uint
MaxDelaySeconds uint
Reaper ReaperConfig
}
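For orientation, a wiring sketch is below. All field values are assumptions, `redis.Options` is go-redis's client options type, and BatchConfig's fields are not documented on this page, so its literal is left empty; MaxAge is assumed to be convertible from an integer seconds value.

```go
cfg := &BatcherConfig{
	RedisOpts:          &redis.Options{Addr: "localhost:6379"}, // go-redis connection options
	BatcherShardKey:    "shard-0",                              // hypothetical shard name
	DefaultBatchConfig: &BatchConfig{},                         // per-batch defaults (fields not shown here)
	Concurrency:        4,
	MinDelaySeconds:    1,
	MaxDelaySeconds:    30,
	Reaper: ReaperConfig{
		MaxAgeSeconds: MaxAge(300),
		MaxRetries:    MaxRetries(5),
	},
}
b, err := NewBatcher(cfg)
if err != nil {
	log.Fatal(err)
}
```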
type LockPool
type LockPool struct {
// contains filtered or unexported fields
}
func NewLockPool
type MaxRetries
type MaxRetries uint
type ReaperConfig
type ReaperConfig struct {
MaxAgeSeconds MaxAge
MaxRetries MaxRetries
ShouldReap func(redis.XPendingExt, MaxAge, MaxRetries) bool
}