Documentation
Overview ¶
Package subscription provides guaranteed event delivery for event-sourced systems.
Delivery Guarantee ¶
The core invariant: if an event is written to the store, every registered consumer WILL process it. This is achieved through:
- Durable checkpoints track each consumer's position
- Catch-up on startup replays from checkpoint to head
- Periodic polling ensures no event is missed (belt and suspenders)
- Optional notifiers provide low-latency delivery
The Sequence Gap Problem ¶
When events are stored with auto-increment global sequences, concurrent writers can leave temporary gaps. Transaction A takes sequence 10, transaction B takes sequence 11, and B commits first. A consumer that reads 11 and checkpoints at 11 before A commits will never go back for 10.
Four approaches, in order of recommendation:
## 1. Single Writer (Recommended)
Use the CommandBus for single-writer-per-stream. One goroutine per stream means global sequences are always monotonic with no holes. No gaps, no extra machinery, and no gap-detection latency.
This is the right answer for most applications. The other approaches exist as escape hatches for systems that can't use single writer.
## 2. Published Sequence Table
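After committing events, update a separate "published_sequence" table with the highest sequence N such that every sequence up to N has either committed or rolled back. Consumers read up to this value instead of trusting the events table directly. Publishing the highest committed sequence alone would reintroduce the gap: B's commit would publish 11 while A still holds 10.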
Trade-offs:
- Extra write per commit (small overhead)
- Reads are trivial — no gap detection needed
- Works with any relational database
- Slight delivery delay (published after commit)
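One way to compute the value to publish is a contiguous watermark, sketched below. The watermark type and its methods are hypothetical. The sketch assumes the writer learns about every allocated sequence when its transaction finishes, including rolled-back ones (otherwise the watermark stalls), and it is not safe for concurrent use: in a real system this update would run under a lock or inside a single process.

```go
package main

import "fmt"

// watermark tracks the highest sequence N such that every sequence <= N
// has finished (committed or rolled back). Not concurrency-safe.
type watermark struct {
	published uint64          // value that would go into published_sequence
	finished  map[uint64]bool // finished sequences above published
}

func newWatermark(start uint64) *watermark {
	return &watermark{published: start, finished: map[uint64]bool{}}
}

// finish records that the transaction holding seq has committed or rolled
// back, then advances published across any now-contiguous run.
func (w *watermark) finish(seq uint64) uint64 {
	if seq <= w.published {
		return w.published // already covered
	}
	w.finished[seq] = true
	for w.finished[w.published+1] {
		delete(w.finished, w.published+1)
		w.published++
	}
	return w.published
}

func main() {
	w := newWatermark(9)
	fmt.Println(w.finish(11)) // 9: sequence 10 is still in flight
	fmt.Println(w.finish(10)) // 11: 10 and 11 have both finished
}
```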
## 3. Postgres Transaction Visibility (pg_snapshot)
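Postgres exposes transaction visibility through the xmin system column and the snapshot functions:
SELECT * FROM events WHERE global_seq > $1 AND xmin::text::bigint < pg_snapshot_xmin(pg_current_snapshot())::text::bigint ORDER BY global_seq;
Here $1 is the consumer's checkpoint. The filter keeps only rows written by transactions older than the oldest transaction still in progress, so every row it returns comes from a finished transaction. There are no heuristics and no timeouts, but the query as written is not airtight (see the last two trade-offs).
Trade-offs:
- Postgres-specific (not portable)
- Unknown performance impact: the xmin/snapshot functions may be expensive under high write load. Benchmark for your workload.
- Requires understanding Postgres MVCC internals
- xmin is a 32-bit xid, while pg_snapshot_xmin returns a 64-bit xid8 that includes the epoch, so the comparison above breaks after the first transaction ID wraparound. One remedy is to store pg_current_xact_id() in an xid8 column at insert time and compare that column instead.
- Transaction IDs are not necessarily assigned in global_seq order, so a checkpoint on global_seq alone can still pass over a row whose transaction commits later. Checkpointing on the (transaction ID, global_seq) pair closes that hole.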
## 4. Gap Detection with Timeout (Generic Fallback)
The subscription detects sequence gaps and waits for them to fill:
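- Gap detected → wait up to GapTimeout (default 500ms)
- Gap fills within the timeout → process normally
- Gap persists past GapTimeout → with the default GapActionError, stop at the gap and retry on the next poll; with GapActionSkip, skip it immediately
- Gap persists past MaxGapWait (default 5s) → assume a rollback, optionally confirm via SequenceChecker that the event really is absent, then skip it and log a warning
- Known gaps tracked to avoid re-waiting
Trade-offs:
- Works with any store (generic)
- Adds latency on gaps (up to MaxGapWait with the default GapActionError)
- Heuristic: could theoretically skip a very slow transaction (mitigated by a generous MaxGapWait and, if configured, SequenceChecker)
- Simple to understand and debug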
This is the built-in fallback used by EventSubscription when gap detection is enabled (GapTimeout > 0).
Notifiers ¶
StoreNotifier is an optimization for low-latency delivery. Without it, subscriptions rely on periodic polling (default 500ms). With it, events are delivered within milliseconds.
Available notifiers:
- ChannelNotifier: in-process, for testing and memory stores
- (Future) PostgresNotifier: LISTEN/NOTIFY
- (Future) SQLiteNotifier: data_version polling
- (Future) NATSNotifier: subject pub/sub
Lag Monitoring ¶
Subscriptions support opt-in lag monitoring via LagMonitorConfig. When configured, a background goroutine periodically compares the consumer's checkpoint position against the store head and fires a callback when lag exceeds a configurable threshold.
Use WithLagMonitor to enable via options, or set Config.LagMonitor directly:
cfg.LagMonitor = &subscription.LagMonitorConfig{
	Callback: func(name string, lag, pos, latest uint64) {
		slog.Warn("subscription lagging", "consumer", name, "lag", lag)
	},
	Threshold:     1000, // alert when >1000 events behind
	CheckInterval: 10 * time.Second,
}
The monitor runs for the lifetime of the subscription and stops when the subscription's context is cancelled.
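Each check boils down to a subtraction and a comparison. The sketch below is hypothetical (checkLag is not part of the package). It follows the documented callback parameter order and the "more than Threshold events behind" rule, and it assumes the monitor calls something like this once per CheckInterval. The underflow guard is also an assumption: it covers a checkpoint that is momentarily ahead of a stale head reading.

```go
package main

import "fmt"

// LagCallback mirrors the documented parameter order: consumer name,
// current lag, checkpoint position, latest store sequence.
type LagCallback func(name string, lag, pos, latest uint64)

// checkLag computes how far pos trails latest and invokes cb only when the
// lag is strictly greater than threshold. It returns the computed lag.
func checkLag(name string, pos, latest, threshold uint64, cb LagCallback) uint64 {
	var lag uint64
	if latest > pos { // avoid uint64 underflow if pos is ahead
		lag = latest - pos
	}
	if lag > threshold && cb != nil {
		cb(name, lag, pos, latest)
	}
	return lag
}

func main() {
	cb := func(name string, lag, pos, latest uint64) {
		fmt.Printf("%s lagging by %d (pos=%d latest=%d)\n", name, lag, pos, latest)
	}
	checkLag("orders-view", 500, 1200, 1000, cb) // lag 700: below threshold, silent
	checkLag("orders-view", 500, 2000, 1000, cb) // lag 1500: callback fires
}
```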
Put differently, every subscription combines checkpoint-based catch-up with live notification support, and the delivery guarantee rests on four pieces:
- Checkpoint: each consumer tracks its last processed global sequence
- Catch-up: on start, replay from checkpoint to head
- Live: after caught up, use notifications + periodic polling
- Retry: failed events retry with backoff, then go to DLQ
Single-writer (via CommandBus) eliminates sequence gaps. For multi-writer scenarios, gap detection handles out-of-order commits; see Config.GapTimeout, Config.MaxGapWait, and GapAction.
Index ¶
- func Add[E any](m *SubscriptionManager, sub *EventSubscription[E]) error
- type BatchHandler
- type ChannelNotifier
- type Checkpoint
- type CheckpointInfo
- type Config
- type EventSubscription
- type GapAction
- type GlobalEvent
- type GlobalReader
- type Handler
- type LagCallback
- type LagMonitorConfig
- type MemoryCheckpoint
- type MemoryGlobalReader
- func (r *MemoryGlobalReader[E]) Append(streamID string, data ...E) []uint64
- func (r *MemoryGlobalReader[E]) AppendTyped(streamID string, events ...) []uint64
- func (r *MemoryGlobalReader[E]) AppendWithSequence(seq uint64, streamID string, data E)
- func (r *MemoryGlobalReader[E]) LatestSequence(_ context.Context) (uint64, error)
- func (r *MemoryGlobalReader[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
- type Option
- type SequenceChecker
- type StoreAdapter
- type StoreNotifier
- type StoreReader
- type SubscriptionManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Add ¶
func Add[E any](m *SubscriptionManager, sub *EventSubscription[E]) error
Add registers and starts a subscription. Returns an error if a subscription with the same consumer ID is already registered.
Types ¶
type BatchHandler ¶
type BatchHandler[E any] func(ctx context.Context, events []GlobalEvent[E]) error
BatchHandler processes a batch of events. Checkpoint advances only after the entire batch succeeds.
type ChannelNotifier ¶
type ChannelNotifier struct {
	// contains filtered or unexported fields
}
ChannelNotifier is an in-process StoreNotifier backed by a Go channel. Use for in-memory stores and testing. Call Signal() after appending events.
func NewChannelNotifier ¶
func NewChannelNotifier() *ChannelNotifier
NewChannelNotifier creates a new in-process notifier.
func (*ChannelNotifier) Close ¶
func (n *ChannelNotifier) Close() error
Close shuts down the notifier and closes all listener channels.
func (*ChannelNotifier) Notify ¶
func (n *ChannelNotifier) Notify(ctx context.Context) <-chan uint64
Notify returns a channel that receives signals when new events are appended. The channel is closed when the notifier is closed or ctx is cancelled.
func (*ChannelNotifier) Signal ¶
func (n *ChannelNotifier) Signal(sequence uint64)
Signal notifies all listeners that new events are available at the given sequence. Non-blocking: if a listener's buffer is full, the signal is dropped (polling catches it).
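The non-blocking behavior described for Signal is the standard Go select-with-default send. The helper below is a hypothetical illustration of that idiom, not ChannelNotifier's actual internals, and the buffer size of 1 is an assumption.

```go
package main

import "fmt"

// signalNonBlocking tries to deliver seq to a listener channel and reports
// whether it was delivered. If the buffer is full the signal is dropped,
// which is safe because periodic polling picks up the events anyway.
func signalNonBlocking(ch chan uint64, seq uint64) bool {
	select {
	case ch <- seq:
		return true
	default:
		return false
	}
}

func main() {
	listener := make(chan uint64, 1)             // buffer size is an assumption
	fmt.Println(signalNonBlocking(listener, 10)) // true
	fmt.Println(signalNonBlocking(listener, 11)) // false: buffer full, dropped
	fmt.Println(<-listener)                      // 10
}
```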
type Checkpoint ¶
type Checkpoint interface {
	// Load returns the last processed sequence for a consumer. Returns 0 if new.
	Load(ctx context.Context, consumerID string) (uint64, error)
	// Save persists the consumer's position. Must be atomic.
	Save(ctx context.Context, consumerID string, sequence uint64) error
}
Checkpoint tracks a consumer's last processed global sequence. Implementations must be durable for production use (SQLite, Postgres, etc.).
type CheckpointInfo ¶
type CheckpointInfo struct {
	// ConsumerID is the subscription's consumer identifier.
	ConsumerID string
	// Sequence is the global sequence number of the event being processed.
	Sequence uint64
}
CheckpointInfo carries checkpoint metadata through context, allowing transactional handlers (e.g., pgview, sqlview) to save the checkpoint within the same database transaction as the projection update.
This enables atomic Evolve + Checkpoint, preventing double processing of non-idempotent side effects on crash recovery.
func CheckpointFromContext ¶
func CheckpointFromContext(ctx context.Context) (CheckpointInfo, bool)
CheckpointFromContext extracts checkpoint info from the context. Returns the info and true if present, or zero value and false otherwise.
Used by transactional projection packages (pgview, sqlview) to save the checkpoint within the same transaction as Evolve:
if info, ok := subscription.CheckpointFromContext(ctx); ok {
	if _, err := tx.Exec(ctx, "INSERT INTO checkpoints ...", info.ConsumerID, info.Sequence); err != nil {
		return err
	}
}
type Config ¶
type Config[E any] struct {
	// ConsumerID uniquely identifies this consumer. Required.
	ConsumerID string
	// Reader provides access to the global event stream. Required.
	Reader GlobalReader[E]
	// Checkpoint tracks consumed position. Required.
	Checkpoint Checkpoint
	// Handler processes individual events. Either Handler or BatchHandler is required.
	Handler Handler[E]
	// BatchHandler processes events in batches. Mutually exclusive with Handler.
	BatchHandler BatchHandler[E]
	// Notifier provides low-latency event notifications. Optional.
	// Without it, the subscription relies on polling only.
	Notifier StoreNotifier
	// PollInterval is the fallback polling interval. Default: 500ms.
	// Belt and suspenders: ensures delivery even if the notifier drops signals.
	PollInterval time.Duration
	// BatchSize is the max number of events to read per poll. Default: 100.
	BatchSize int
	// MaxRetries is the number of retries before an event goes to the DLQ. Default: 5.
	MaxRetries int
	// RetryBaseDelay is the base delay for exponential backoff. Default: 100ms.
	RetryBaseDelay time.Duration
	// RetryMaxDelay caps the backoff. Default: 10s.
	RetryMaxDelay time.Duration
	// OnDLQ is called when an event exhausts retries. Optional.
	// If nil, the error is logged and the event is skipped.
	OnDLQ func(ctx context.Context, event GlobalEvent[E], err error)
	// Logger for operational logging. Optional.
	Logger *slog.Logger
	// GapTimeout is how long to wait for a sequence gap to fill before
	// assuming it was a rollback. Default: 500ms.
	// Set to 0 to disable gap detection (single-writer mode).
	GapTimeout time.Duration
	// MaxGapWait is the total time the subscription waits for a gap before
	// moving on. Default: 5s.
	MaxGapWait time.Duration
	// OnGapTimeout controls what happens when a gap times out (GapTimeout elapsed).
	// Default (GapActionError): stop processing at the gap, retry on next poll.
	// GapActionSkip: skip the gap immediately (old behavior, risks missing events).
	OnGapTimeout GapAction
	// SequenceChecker verifies whether a specific global sequence exists in the
	// store. Used as a self-healing check before skipping a gap after MaxGapWait.
	// If nil, gaps are skipped without verification after MaxGapWait.
	SequenceChecker SequenceChecker
	// EventTypes filters events by their EventType field. Only events whose
	// EventType is in this list are delivered to the handler.
	// If empty or nil, all events are delivered (no filtering).
	// The checkpoint still advances past filtered-out events to ensure progress.
	EventTypes []string
	// CheckpointEvery saves the checkpoint every N events instead of every event.
	// Default: 1 (save after every event).
	// Set to 100 to save every 100th event. On crash, at most N-1 events
	// may be replayed (idempotent handlers recommended for N > 1).
	CheckpointEvery int
	// CheckpointMaxAge is the maximum time between checkpoint saves.
	// When set, a checkpoint is saved when either CheckpointEvery OR
	// CheckpointMaxAge is reached, whichever comes first.
	// Default: 0 (disabled: count-based batching only).
	CheckpointMaxAge time.Duration
	// AtomicCheckpoint indicates that the Handler saves the checkpoint within
	// its own transaction (e.g., via CheckpointFromContext). When true, the
	// subscription injects CheckpointInfo into the context and skips the
	// external Checkpoint.Save call after each event.
	//
	// This prevents double processing on crash recovery for non-idempotent
	// side effects. See pgview and sqlview CheckpointInTx option.
	AtomicCheckpoint bool
	// LagMonitor configures optional lag monitoring. When set, a background
	// goroutine periodically checks how far behind the subscription is and
	// fires the callback when lag exceeds the threshold. Nil means no monitoring.
	LagMonitor *LagMonitorConfig
}
Config configures an EventSubscription.
func FromStateView ¶
func FromStateView[E any](view eskit.StateView[E], reader GlobalReader[E], checkpoint Checkpoint, opts ...Option[E]) Config[E]
FromStateView creates a subscription Config from a StateView. This is the standard way to wire a projection into the event stream.
The StateView's Evolve function receives eskit.Event[E], which is adapted from the subscription's GlobalEvent[E]. If the StateView has a Setup function, it is called before the subscription starts processing events.
type EventSubscription ¶
type EventSubscription[E any] struct {
	// contains filtered or unexported fields
}
EventSubscription is a durable event consumer with guaranteed delivery. It catches up from its checkpoint on start, then switches to live mode using notifier signals (if available) and periodic polling.
func New ¶
func New[E any](cfg Config[E]) (*EventSubscription[E], error)
New creates a new EventSubscription. Call Start to begin processing.
func (*EventSubscription[E]) Lag ¶
func (s *EventSubscription[E]) Lag(ctx context.Context) (uint64, error)
Lag returns approximately how many events the consumer is behind the store head.
func (*EventSubscription[E]) Start ¶
func (s *EventSubscription[E]) Start(ctx context.Context) error
Start begins processing events. Blocks until ctx is cancelled or Stop is called. Typically run in a goroutine.
func (*EventSubscription[E]) Stop ¶
func (s *EventSubscription[E]) Stop()
Stop signals the subscription to shut down gracefully.
func (*EventSubscription[E]) Wait ¶
func (s *EventSubscription[E]) Wait()
Wait blocks until the subscription has fully stopped.
type GapAction ¶
type GapAction int
GapAction determines what happens when a sequence gap times out.
const (
	// GapActionError (default) stops processing at the gap and retries on the
	// next poll. A gap left by a slow transaction resolves when that transaction
	// commits. A gap left by a rollback never fills (Postgres never reuses a
	// sequence value handed out by nextval), so it is skipped once MaxGapWait
	// elapses.
	GapActionError GapAction = iota
	// GapActionSkip skips timed-out gaps immediately (old behavior).
	// Use only if you explicitly accept the risk of missing events from
	// slow transactions.
	GapActionSkip
)
type GlobalEvent ¶
type GlobalEvent[E any] struct {
	// GlobalSequence is the store-wide monotonic position. Never reused.
	GlobalSequence uint64
	// StreamID identifies which stream this event belongs to.
	StreamID string
	// EventType is the string name of the event (e.g., "OrderCreated").
	// Available when the store has an EventRegistry configured.
	EventType string
	// Version is the per-stream version number.
	Version int
	// Data is the domain event payload.
	Data E
	// Timestamp is when the event was recorded.
	Timestamp time.Time
}
GlobalEvent wraps a domain event with its global sequence number. The global sequence is a monotonically increasing number assigned at append time, used by consumers to track position and detect gaps.
type GlobalReader ¶
type GlobalReader[E any] interface {
	// ReadFrom returns events starting from the given global sequence (inclusive),
	// up to limit events. Returns events in global sequence order.
	ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
	// LatestSequence returns the highest global sequence in the store, or 0 if empty.
	LatestSequence(ctx context.Context) (uint64, error)
}
GlobalReader reads events by global sequence. Event stores must implement this to support subscriptions.
type Handler ¶
type Handler[E any] func(ctx context.Context, event GlobalEvent[E]) error
Handler processes events. Return nil to advance the checkpoint. Return an error to trigger retry logic.
type LagCallback ¶
LagCallback is called when a subscription's lag exceeds the configured threshold. Parameters: consumer name, current lag, checkpoint position, latest store sequence.
type LagMonitorConfig ¶
type LagMonitorConfig struct {
	// Callback is invoked when lag exceeds Threshold.
	Callback LagCallback
	// Threshold is the minimum lag (in events) before the callback fires.
	// For example, 1000 means the callback fires only when the subscription
	// is more than 1000 events behind.
	Threshold uint64
	// CheckInterval is how often the monitor checks the lag. Default: 10s.
	CheckInterval time.Duration
}
LagMonitorConfig configures lag monitoring for a subscription.
type MemoryCheckpoint ¶
type MemoryCheckpoint struct {
	// contains filtered or unexported fields
}
MemoryCheckpoint is an in-memory checkpoint for testing.
func NewMemoryCheckpoint ¶
func NewMemoryCheckpoint() *MemoryCheckpoint
type MemoryGlobalReader ¶
type MemoryGlobalReader[E any] struct {
	// contains filtered or unexported fields
}
MemoryGlobalReader is an in-memory GlobalReader for testing. Events are appended manually and assigned monotonic global sequences.
func NewMemoryGlobalReader ¶
func NewMemoryGlobalReader[E any]() *MemoryGlobalReader[E]
NewMemoryGlobalReader creates a new in-memory global reader.
func (*MemoryGlobalReader[E]) Append ¶
func (r *MemoryGlobalReader[E]) Append(streamID string, data ...E) []uint64
Append adds events and assigns global sequences. Returns the assigned sequences.
func (*MemoryGlobalReader[E]) AppendTyped ¶
func (r *MemoryGlobalReader[E]) AppendTyped(streamID string, events ...struct {
	Data      E
	EventType string
}) []uint64
AppendTyped adds events with explicit event types and assigns global sequences.
func (*MemoryGlobalReader[E]) AppendWithSequence ¶
func (r *MemoryGlobalReader[E]) AppendWithSequence(seq uint64, streamID string, data E)
AppendWithSequence adds an event with a specific sequence number. Used for testing gap scenarios.
func (*MemoryGlobalReader[E]) LatestSequence ¶
func (r *MemoryGlobalReader[E]) LatestSequence(_ context.Context) (uint64, error)
func (*MemoryGlobalReader[E]) ReadFrom ¶
func (r *MemoryGlobalReader[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
type Option ¶
Option configures a subscription created from a StateView.
func WithLagMonitor ¶
func WithLagMonitor[E any](callback LagCallback, threshold uint64, checkInterval time.Duration) Option[E]
WithLagMonitor enables periodic lag monitoring for the subscription. The callback fires when the subscription falls more than threshold events behind the store head. checkInterval controls how often the check runs.
Example:
WithLagMonitor[MyEvent](func(name string, lag, pos, latest uint64) {
	slog.Warn("subscription lagging", "consumer", name, "lag", lag)
}, 1000, 10*time.Second)
func WithNotifier ¶
func WithNotifier[E any](n StoreNotifier) Option[E]
WithNotifier sets the store notifier for low-latency event delivery.
type SequenceChecker ¶
SequenceChecker verifies whether a specific global sequence exists in the store. Used for self-healing gap resolution: before skipping a gap after MaxGapWait, the subscription checks if the event actually committed (slow transaction).
type StoreAdapter ¶
type StoreAdapter[E any] struct {
	// contains filtered or unexported fields
}
StoreAdapter adapts any StoreReader (eskit.Event-based) into a subscription.GlobalReader (GlobalEvent-based). This bridges the type gap between the event store layer and the subscription system.
func NewStoreAdapter ¶
func NewStoreAdapter[E any](store StoreReader[E]) *StoreAdapter[E]
NewStoreAdapter wraps a store's global reader into a subscription.GlobalReader.
func (*StoreAdapter[E]) LatestSequence ¶
func (a *StoreAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)
LatestSequence delegates to the underlying store.
func (*StoreAdapter[E]) ReadFrom ¶
func (a *StoreAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
ReadFrom reads events from the store and converts them to GlobalEvents.
type StoreNotifier ¶
StoreNotifier signals when new events are available in the store. This is an optimization for low latency — subscriptions work without it via periodic polling. The value on the channel is the latest global sequence (or 0 if unknown).
type StoreReader ¶
type StoreReader[E any] interface {
	ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
	LatestSequence(ctx context.Context) (uint64, error)
}
StoreReader is the interface that event stores implement for global reads. Both MemoryStore and sqlitestore.Store satisfy this via their ReadFrom/LatestSequence methods.
type SubscriptionManager ¶
type SubscriptionManager struct {
	// contains filtered or unexported fields
}
SubscriptionManager manages multiple subscriptions with lifecycle control.
func NewManager ¶
func NewManager() *SubscriptionManager
NewManager creates a new subscription manager.
func (*SubscriptionManager) ConsumerIDs ¶
func (m *SubscriptionManager) ConsumerIDs() []string
ConsumerIDs returns the IDs of all active subscriptions.
func (*SubscriptionManager) Remove ¶
func (m *SubscriptionManager) Remove(consumerID string) error
Remove stops and removes a subscription.
func (*SubscriptionManager) StopAll ¶
func (m *SubscriptionManager) StopAll()
StopAll stops all subscriptions and waits for them to finish.