Documentation
Overview
Package tracker provides block processing coordination and tracking.
Processing Pipeline
The execution-processor uses a multi-stage pipeline for block processing:
┌─────────────────┐
│  Block Source   │ Execution node provides blocks
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ ProcessNextBlock│ Manager calls processor to discover next block
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Task Creation  │ Processor creates tasks for each transaction
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   Redis/Asynq   │ Tasks enqueued to distributed queue
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Worker Handler  │ Asynq workers process tasks (may be distributed)
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   ClickHouse    │ Data inserted using columnar protocol
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│Block Completion │ Completion tracker marks the block complete when all its tasks finish
└─────────────────┘
Processing Modes
- FORWARDS_MODE: Process blocks from the current head forward (real-time)
- BACKWARDS_MODE: Process blocks from a start point backward (backfill)
Backpressure Control
The pipeline uses MaxPendingBlockRange to control backpressure:
- Limits how far processing may run ahead of the oldest incomplete block (forwards) or the newest incomplete block (backwards)
- Prevents memory exhaustion during slow ClickHouse inserts
- Tracks task completion per block in Redis (see BlockCompletionTracker)
Index
- Constants
- func GenerateTaskID(processor, network string, blockNum uint64, identifier string) string
- func IsBlockNotFoundError(err error) bool
- func PrefixedProcessBackwardsQueue(processorName, prefix string) string
- func PrefixedProcessForwardsQueue(processorName, prefix string) string
- func PrefixedProcessReprocessBackwardsQueue(processorName, prefix string) string
- func PrefixedProcessReprocessForwardsQueue(processorName, prefix string) string
- func ProcessBackwardsQueue(processorName string) string
- func ProcessForwardsQueue(processorName string) string
- func ProcessQueue(processorName string) string
- func ProcessReprocessBackwardsQueue(processorName string) string
- func ProcessReprocessForwardsQueue(processorName string) string
- type BlockCompletionTracker
- func (t *BlockCompletionTracker) ClearBlock(ctx context.Context, blockNum uint64, network, processor, mode string) error
- func (t *BlockCompletionTracker) ClearStaleBlocks(ctx context.Context, network, processor, mode string) (int, error)
- func (t *BlockCompletionTracker) GetBlockStatus(ctx context.Context, blockNum uint64, network, processor, mode string) (completed int64, expected int64, enqueuedAt time.Time, err error)
- func (t *BlockCompletionTracker) GetStaleBlocks(ctx context.Context, network, processor, mode string) ([]uint64, error)
- func (t *BlockCompletionTracker) HasBlockTracking(ctx context.Context, blockNum uint64, network, processor, mode string) (bool, error)
- func (t *BlockCompletionTracker) MarkBlockComplete(ctx context.Context, blockNum uint64, network, processor, mode string) error
- func (t *BlockCompletionTracker) RegisterBlock(ctx context.Context, blockNum uint64, expectedCount int, ...) error
- func (t *BlockCompletionTracker) TrackTaskCompletion(ctx context.Context, taskID string, blockNum uint64, ...) (bool, error)
- type BlockCompletionTrackerConfig
- type BlockProcessor
- type GapResult
- type GapStateProvider
- type Limiter
- func (l *Limiter) GetAvailableCapacity(ctx context.Context, nextBlock uint64, mode string) (int, error)
- func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) (*GapResult, error)
- func (l *Limiter) IsBlockedByIncompleteBlocks(ctx context.Context, nextBlock uint64, mode string) (bool, *uint64, error)
- func (l *Limiter) ValidateBatchWithinLeash(ctx context.Context, startBlock uint64, count int, mode string) error
- type LimiterConfig
- type LimiterDeps
- type Processor
- type QueueInfo
- type StateProvider
Constants
const (
	// DefaultBlockMetaTTL is the default TTL for block tracking keys.
	DefaultBlockMetaTTL = 30 * time.Minute
	// DefaultStaleThreshold is the default time after which a block is considered stale.
	DefaultStaleThreshold = 5 * time.Minute
)
Default configuration values for BlockCompletionTracker.
const (
	// DefaultMaxPendingBlockRange is the maximum distance between the oldest
	// incomplete block and the current block before blocking new block processing.
	DefaultMaxPendingBlockRange = 2
	// DefaultClickHouseTimeout is the default timeout for ClickHouse operations.
	DefaultClickHouseTimeout = 30 * time.Second
	// DefaultTraceTimeout is the default timeout for trace fetching operations.
	DefaultTraceTimeout = 30 * time.Second
)
Default configuration values for processors. These provide a single source of truth for default configuration.
const (
	BACKWARDS_MODE = "backwards"
	FORWARDS_MODE  = "forwards"
)
Variables
This section is empty.
Functions
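func GenerateTaskID
func GenerateTaskID(processor, network string, blockNum uint64, identifier string) string
GenerateTaskID creates a deterministic task ID for deduplication, in the format:
{processor}:{network}:{blockNum}:{identifier}
- For transaction-based processors, identifier is the transaction hash (txHash).
- For block-based processors, identifier is the literal string "block".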
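A minimal sketch of the documented ID format, written as a hypothetical stand-alone helper (generateTaskID) rather than the package's own source; the processor names in main are placeholders.

```go
package main

import "fmt"

// generateTaskID is a hypothetical re-creation of the documented format
// {processor}:{network}:{blockNum}:{identifier}; it is not the package source.
func generateTaskID(processor, network string, blockNum uint64, identifier string) string {
	return fmt.Sprintf("%s:%s:%d:%s", processor, network, blockNum, identifier)
}

func main() {
	// Transaction-based processor: identifier is the transaction hash.
	fmt.Println(generateTaskID("tx_processor", "mainnet", 1234, "0xabc")) // tx_processor:mainnet:1234:0xabc
	// Block-based processor: identifier is the literal "block".
	fmt.Println(generateTaskID("block_processor", "mainnet", 1234, "block")) // block_processor:mainnet:1234:block
}
```

Because the ID depends only on its inputs, re-creating tasks for the same block yields the same IDs. Per the BlockCompletionTracker notes, these IDs are passed to asynq.TaskID(), which lets Asynq reject an enqueue whose ID is already present in the queue.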
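func IsBlockNotFoundError
func IsBlockNotFoundError(err error) bool
IsBlockNotFoundError reports whether an error indicates that a block was not found. It uses errors.Is for sentinel errors, with a fallback to string matching for errors that do not wrap the sentinel (for example, errors formatted with %v rather than %w).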
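func PrefixedProcessBackwardsQueue
func PrefixedProcessBackwardsQueue(processorName, prefix string) string
PrefixedProcessBackwardsQueue returns the backwards process queue name with prefix.
func PrefixedProcessForwardsQueue
func PrefixedProcessForwardsQueue(processorName, prefix string) string
PrefixedProcessForwardsQueue returns the forwards process queue name with prefix.
func PrefixedProcessReprocessBackwardsQueue
func PrefixedProcessReprocessBackwardsQueue(processorName, prefix string) string
PrefixedProcessReprocessBackwardsQueue returns the reprocess backwards queue name with prefix.
func PrefixedProcessReprocessForwardsQueue
func PrefixedProcessReprocessForwardsQueue(processorName, prefix string) string
PrefixedProcessReprocessForwardsQueue returns the reprocess forwards queue name with prefix.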
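func ProcessBackwardsQueue
func ProcessBackwardsQueue(processorName string) string
ProcessBackwardsQueue returns the backwards process queue name for a processor.
func ProcessForwardsQueue
func ProcessForwardsQueue(processorName string) string
ProcessForwardsQueue returns the forwards process queue name for a processor.
func ProcessQueue
func ProcessQueue(processorName string) string
ProcessQueue returns the process queue name for a processor.
Deprecated: use the mode-specific queues (ProcessForwardsQueue, ProcessBackwardsQueue) instead.
func ProcessReprocessBackwardsQueue
func ProcessReprocessBackwardsQueue(processorName string) string
ProcessReprocessBackwardsQueue returns the reprocess backwards queue name for a processor.
func ProcessReprocessForwardsQueue
func ProcessReprocessForwardsQueue(processorName string) string
ProcessReprocessForwardsQueue returns the reprocess forwards queue name for a processor.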
Types
type BlockCompletionTracker
type BlockCompletionTracker struct {
	// contains filtered or unexported fields
}
BlockCompletionTracker tracks block completion using Redis SETs for task deduplication. It replaces the counter-based PendingTracker with a SET-based approach that:
- Uses asynq.TaskID() for deterministic task deduplication
- Tracks completed task IDs in a Redis SET (SADD is idempotent, so a redelivered task is not counted twice)
- Stores the expected task count and enqueued_at metadata
- Supports stale block detection for automatic retry
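To show why a SET is used instead of a counter, here is an in-memory model of the per-block state: an expected count plus a set of completed task IDs. The setTracker type and its methods are hypothetical; the real tracker keeps this state in Redis and updates it with a Lua script.

```go
package main

import "fmt"

// blockState models the per-block data: expected task count plus the set of
// completed task IDs (a stand-in for the Redis SET).
type blockState struct {
	expected  int
	completed map[string]struct{}
}

// setTracker is a hypothetical in-memory analogue of the SET-based tracker.
type setTracker struct {
	blocks map[uint64]*blockState
}

func newSetTracker() *setTracker {
	return &setTracker{blocks: make(map[uint64]*blockState)}
}

// register resets tracking for a block, so calling it again on retry is safe.
func (t *setTracker) register(blockNum uint64, expected int) {
	t.blocks[blockNum] = &blockState{expected: expected, completed: make(map[string]struct{})}
}

// track adds taskID to the completed set (idempotent, like SADD) and reports
// whether the set now covers the expected count.
func (t *setTracker) track(blockNum uint64, taskID string) bool {
	b, ok := t.blocks[blockNum]
	if !ok {
		return false // untracked block, e.g. its metadata expired
	}
	b.completed[taskID] = struct{}{}
	return len(b.completed) >= b.expected
}

func main() {
	t := newSetTracker()
	t.register(100, 2)

	fmt.Println(t.track(100, "p:mainnet:100:0xaa")) // false: 1 of 2
	fmt.Println(t.track(100, "p:mainnet:100:0xaa")) // false: redelivery, still 1 of 2
	fmt.Println(t.track(100, "p:mainnet:100:0xbb")) // true: 2 of 2
}
```

With a plain counter, the redelivered task in main would have pushed the count to 2 and marked block 100 complete one task early.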
func NewBlockCompletionTracker
func NewBlockCompletionTracker(
	redisClient *redis.Client,
	prefix string,
	log logrus.FieldLogger,
	stateProvider StateProvider,
	config BlockCompletionTrackerConfig,
) *BlockCompletionTracker
NewBlockCompletionTracker creates a new BlockCompletionTracker.
func (*BlockCompletionTracker) ClearBlock
func (t *BlockCompletionTracker) ClearBlock(
	ctx context.Context,
	blockNum uint64,
	network, processor, mode string,
) error
ClearBlock removes all tracking data for a block (used when retrying).
func (*BlockCompletionTracker) ClearStaleBlocks
func (t *BlockCompletionTracker) ClearStaleBlocks(
	ctx context.Context,
	network, processor, mode string,
) (int, error)
ClearStaleBlocks removes all stale blocks and their associated tracking data. It uses ZRANGEBYSCORE to identify stale blocks and a Redis pipeline to delete all related keys efficiently. It returns the number of blocks cleared.
func (*BlockCompletionTracker) GetBlockStatus
func (t *BlockCompletionTracker) GetBlockStatus(
	ctx context.Context,
	blockNum uint64,
	network, processor, mode string,
) (completed int64, expected int64, enqueuedAt time.Time, err error)
GetBlockStatus returns the completion status of a block: the number of completed tasks, the expected task count, and the time the block was enqueued.
func (*BlockCompletionTracker) GetStaleBlocks
func (t *BlockCompletionTracker) GetStaleBlocks(
	ctx context.Context,
	network, processor, mode string,
) ([]uint64, error)
GetStaleBlocks returns blocks that have been processing longer than the stale threshold. It uses ZRANGEBYSCORE on the pending-blocks sorted set, which costs O(log N + M) for N pending blocks and M returned blocks.
func (*BlockCompletionTracker) HasBlockTracking
func (t *BlockCompletionTracker) HasBlockTracking(
	ctx context.Context,
	blockNum uint64,
	network, processor, mode string,
) (bool, error)
HasBlockTracking reports whether a block has Redis tracking data, that is, whether its block_meta key exists. It is used to detect orphaned blocks: blocks present in ClickHouse with complete=0 but with no Redis tracking.
func (*BlockCompletionTracker) MarkBlockComplete
func (t *BlockCompletionTracker) MarkBlockComplete(
	ctx context.Context,
	blockNum uint64,
	network, processor, mode string,
) error
MarkBlockComplete records the block as complete in ClickHouse and removes its Redis tracking data.
func (*BlockCompletionTracker) RegisterBlock
func (t *BlockCompletionTracker) RegisterBlock(
	ctx context.Context,
	blockNum uint64,
	expectedCount int,
	network, processor, mode, queue string,
) error
RegisterBlock initializes tracking for a new block. It clears any existing completion data, so it is safe to call on retries. Call it after MarkBlockEnqueued so that ClickHouse already has the block's record.
func (*BlockCompletionTracker) TrackTaskCompletion
func (t *BlockCompletionTracker) TrackTaskCompletion(
	ctx context.Context,
	taskID string,
	blockNum uint64,
	network, processor, mode string,
) (bool, error)
TrackTaskCompletion records a task completion and checks whether the block is done. It returns true if all of the block's tasks are now complete. A Lua script performs the update atomically in a single round trip.
type BlockCompletionTrackerConfig
type BlockCompletionTrackerConfig struct {
// StaleThreshold is the time after which a block is considered stale.
StaleThreshold time.Duration
// AutoRetryStale enables automatic retry of stale blocks.
AutoRetryStale bool
}
BlockCompletionTrackerConfig holds configuration for the BlockCompletionTracker.
type BlockProcessor
type BlockProcessor interface {
Processor
// ProcessNextBlock discovers and enqueues tasks for the next block.
// Returns an error if no block is available or processing fails.
ProcessNextBlock(ctx context.Context) error
// GetQueues returns the Asynq queues used by this processor.
// Queues have priorities to control processing order.
GetQueues() []QueueInfo
// GetHandlers returns task handlers for Asynq worker registration.
// These handlers process the distributed tasks.
GetHandlers() map[string]asynq.HandlerFunc
// EnqueueTask adds a task to the distributed queue.
// Uses infinite retries to ensure eventual processing.
EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
// SetProcessingMode configures forwards or backwards processing.
SetProcessingMode(mode string)
// ReprocessBlock re-enqueues tasks for an orphaned block.
// Used when a block is in ClickHouse (complete=0) but has no Redis tracking.
// This can happen due to Redis TTL expiry, Redis restart, or crashes.
ReprocessBlock(ctx context.Context, blockNum uint64) error
// ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks.
// This is used for gap filling of missing blocks (blocks with no row in DB).
ProcessBlock(ctx context.Context, block execution.Block) error
// GetCompletionTracker returns the block completion tracker for checking tracking status.
GetCompletionTracker() *BlockCompletionTracker
}
BlockProcessor extends Processor with block-level processing capabilities. It coordinates the full pipeline from block discovery through task completion.
Pipeline stages managed by BlockProcessor:
- Block Discovery: ProcessNextBlock identifies the next block to process
- Task Creation: Creates distributed tasks for each unit of work
- Queue Management: Manages Asynq queues for forwards/backwards processing
- Task Handling: Worker handlers process individual tasks
- Completion Tracking: Marks blocks complete when all tasks finish
type GapResult
type GapResult struct {
Incomplete []uint64 // Blocks with row but complete=0
Missing []uint64 // Blocks with no row at all
ScanDuration time.Duration // Time taken to perform the scan
}
GapResult contains the results of a gap scan.
type GapStateProvider
type GapStateProvider interface {
StateProvider
GetIncompleteBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error)
GetMissingBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error)
GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (*big.Int, *big.Int, error)
}
GapStateProvider extends StateProvider with gap detection capabilities.
type Limiter
type Limiter struct {
// contains filtered or unexported fields
}
Limiter provides shared blocking and completion functionality for processors.
func NewLimiter
func NewLimiter(deps *LimiterDeps, config LimiterConfig) *Limiter
NewLimiter creates a new Limiter.
func (*Limiter) GetAvailableCapacity
func (l *Limiter) GetAvailableCapacity(ctx context.Context, nextBlock uint64, mode string) (int, error)
GetAvailableCapacity returns how many more blocks can be enqueued before hitting the maxPendingBlockRange limit. Returns 0 if at or over capacity.
func (*Limiter) GetGaps
func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) (*GapResult, error)
GetGaps returns both incomplete and missing blocks outside the maxPendingBlockRange window. If lookbackRange is 0, it scans from the oldest stored block. The scan covers the full range but excludes the recent window, which IsBlockedByIncompleteBlocks already handles. It returns a GapResult containing:
- Incomplete: blocks with a row in DB but complete=0
- Missing: blocks with no row in DB at all
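The following in-memory sketch shows how the two GapResult lists can be derived from a scan window. The window arithmetic (the newest maxPending blocks excluded, the lower bound falling back to the oldest stored block when lookback is 0) and the findGaps helper are assumptions; the real method queries ClickHouse through GapStateProvider and also applies limit, which this sketch omits.

```go
package main

import "fmt"

// gapResult mirrors the Incomplete and Missing fields of GapResult.
type gapResult struct {
	Incomplete []uint64 // row exists but complete=0
	Missing    []uint64 // no row at all
}

// findGaps classifies every block in the scan window. rows maps a stored block
// number to its complete flag; a block absent from rows has no row.
// Assumed window: [lo, current-maxPending], where lo is the oldest stored
// block, or current-lookback when lookback > 0 and that is newer.
func findGaps(rows map[uint64]bool, oldestStored, current, lookback, maxPending uint64) gapResult {
	var res gapResult
	if current < maxPending {
		return res // everything so far is inside the recent window
	}
	hi := current - maxPending
	lo := oldestStored
	if lookback > 0 && current >= lookback && current-lookback > lo {
		lo = current - lookback
	}
	for b := lo; b <= hi; b++ {
		complete, exists := rows[b]
		switch {
		case !exists:
			res.Missing = append(res.Missing, b)
		case !complete:
			res.Incomplete = append(res.Incomplete, b)
		}
	}
	return res
}

func main() {
	// Block 109 is incomplete but inside the recent window, so it is skipped.
	rows := map[uint64]bool{100: true, 101: false, 103: true, 104: true, 105: false, 108: true, 109: false}
	res := findGaps(rows, 100, 110, 0, 2)
	fmt.Println(res.Incomplete, res.Missing) // [101 105] [102 106 107]
}
```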
func (*Limiter) IsBlockedByIncompleteBlocks
func (l *Limiter) IsBlockedByIncompleteBlocks(
	ctx context.Context,
	nextBlock uint64,
	mode string,
) (bool, *uint64, error)
IsBlockedByIncompleteBlocks reports whether processing should be blocked, based on the distance from the oldest incomplete block (forwards mode) or the newest incomplete block (backwards mode). It returns the blocked status, the blocking block number (if blocked), and an error. The blocking block number can be used to check whether that block is orphaned (has no Redis tracking).
func (*Limiter) ValidateBatchWithinLeash
func (l *Limiter) ValidateBatchWithinLeash(ctx context.Context, startBlock uint64, count int, mode string) error
ValidateBatchWithinLeash ensures a batch of blocks won't exceed the maxPendingBlockRange. Returns an error if the batch would violate the constraint.
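The three leash checks above can be modeled with a little arithmetic. This sketch assumes that distance is measured from the trailing incomplete block (oldest in forwards mode, newest in backwards mode) and that processing blocks once the distance reaches MaxPendingBlockRange; the exact comparison used by Limiter is not documented here, and all helper names are hypothetical.

```go
package main

import "fmt"

const (
	forwardsMode  = "forwards"
	backwardsMode = "backwards"
)

// distance measures how far nextBlock has moved past the trailing incomplete
// block: the oldest one in forwards mode, the newest one in backwards mode.
// A nil incomplete pointer means no block is pending.
func distance(nextBlock uint64, incomplete *uint64, mode string) uint64 {
	if incomplete == nil {
		return 0
	}
	if mode == backwardsMode {
		if *incomplete <= nextBlock {
			return 0
		}
		return *incomplete - nextBlock
	}
	if nextBlock <= *incomplete {
		return 0
	}
	return nextBlock - *incomplete
}

// capacity is how many more blocks fit before the distance reaches maxRange
// (assumed rule: blocked once distance >= maxRange).
func capacity(nextBlock uint64, incomplete *uint64, mode string, maxRange int) int {
	if maxRange <= 0 {
		return 0
	}
	d := distance(nextBlock, incomplete, mode)
	if d >= uint64(maxRange) {
		return 0
	}
	return maxRange - int(d)
}

// isBlocked reports whether no further block may be enqueued.
func isBlocked(nextBlock uint64, incomplete *uint64, mode string, maxRange int) bool {
	return capacity(nextBlock, incomplete, mode, maxRange) == 0
}

// validateBatch rejects a batch of count blocks that would exceed the leash.
func validateBatch(startBlock uint64, count int, incomplete *uint64, mode string, maxRange int) error {
	if c := capacity(startBlock, incomplete, mode, maxRange); count > c {
		return fmt.Errorf("batch of %d blocks from %d exceeds leash (capacity %d)", count, startBlock, c)
	}
	return nil
}

func main() {
	const maxRange = 2 // DefaultMaxPendingBlockRange

	oldest := uint64(100)
	fmt.Println(capacity(101, &oldest, forwardsMode, maxRange))  // 1
	fmt.Println(isBlocked(102, &oldest, forwardsMode, maxRange)) // true
	fmt.Println(capacity(500, nil, forwardsMode, maxRange))      // 2

	newest := uint64(1000)
	err := validateBatch(999, 2, &newest, backwardsMode, maxRange)
	fmt.Println(err != nil) // true: only 1 slot left below block 1000
}
```

Under these assumptions, the default range of 2 allows at most two blocks in flight at once.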
type LimiterConfig
type LimiterConfig struct {
MaxPendingBlockRange int
}
LimiterConfig holds configuration for the Limiter.
type LimiterDeps
type LimiterDeps struct {
Log logrus.FieldLogger
StateProvider StateProvider
Network string
Processor string
}
LimiterDeps holds dependencies for the Limiter.
type Processor
type Processor interface {
// Start initializes the processor and its dependencies (e.g., ClickHouse).
Start(ctx context.Context) error
// Stop gracefully shuts down the processor.
Stop(ctx context.Context) error
// Name returns the unique identifier for this processor.
Name() string
}
Processor defines the base interface for all processors. Processors are responsible for transforming blockchain data into a format suitable for storage and analysis.
type StateProvider
type StateProvider interface {
GetOldestIncompleteBlock(ctx context.Context, network, processor string, minBlockNumber uint64) (*uint64, error)
GetNewestIncompleteBlock(ctx context.Context, network, processor string, maxBlockNumber uint64) (*uint64, error)
MarkBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) error
}
StateProvider defines the state manager methods needed by Limiter and BlockCompletionTracker.