Documentation
¶
Overview ¶
Package processor coordinates block processing across multiple processor types.
Architecture Overview ¶
The Manager is the central coordinator that:
- Discovers new blocks from execution nodes
- Dispatches processing tasks to registered processors
- Manages distributed task queues via Redis/Asynq
- Implements leader election for multi-instance deployments
- Provides backpressure control via queue monitoring
Processing Flow ¶
- Manager.Start() initializes processors and begins the processing loop
- processBlocks() is called on each interval (default 10s)
- Each processor's ProcessNextBlock() discovers and enqueues tasks
- Asynq workers (potentially distributed) process the tasks
- Completion tracking marks blocks done when all tasks finish
Leader Election ¶
When multiple instances run, only the leader performs block discovery. All instances can process tasks as Asynq workers.
Index ¶
Constants ¶
const ( // DefaultNoWorkBackoff is the backoff duration when no work is available. // Used in zero-interval mode to prevent CPU spin when idle. DefaultNoWorkBackoff = 10 * time.Millisecond // DefaultConcurrency is the default number of concurrent workers for task processing. DefaultConcurrency = 20 // DefaultMaxProcessQueue is the default max queue size for asynq. DefaultMaxProcessQueue = 1000 // DefaultBackpressureHysteresis is the default hysteresis factor for backpressure. // When backpressure is triggered, it won't be released until queue size drops // to this fraction of the threshold. DefaultBackpressureHysteresis = 0.8 // DefaultLeaderTTL is the default TTL for leader election locks. DefaultLeaderTTL = 10 * time.Second // DefaultLeaderRenewalInterval is the default renewal interval for leader election. DefaultLeaderRenewalInterval = 3 * time.Second // DefaultStaleBlockCheckInterval is the default interval for checking stale blocks. DefaultStaleBlockCheckInterval = 1 * time.Minute // DefaultBackpressureBackoffMin is the minimum backoff duration when backpressure is detected. DefaultBackpressureBackoffMin = 10 * time.Millisecond // DefaultBackpressureBackoffMax is the maximum backoff duration when backpressure persists. DefaultBackpressureBackoffMax = 1 * time.Second // DefaultBackpressureJitterFraction is the fraction of backoff to add as random jitter (0.25 = 25%). DefaultBackpressureJitterFraction = 0.25 // DefaultGapScanInterval is the default interval for scanning gaps. DefaultGapScanInterval = 5 * time.Minute // DefaultGapBatchSize is the default max gaps to process per scan. DefaultGapBatchSize = 10 // DefaultGapLookbackRange is the default max blocks to look back for gaps. DefaultGapLookbackRange = uint64(10000) )
Default configuration values for the processor manager. Processor-specific defaults are in the tracker package.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Processing interval
Interval time.Duration `yaml:"interval"`
// Processing mode: forwards, backwards
Mode string `yaml:"mode"`
// Maximum concurrent transactions to process
Concurrency int `yaml:"concurrency"`
// Leader election configuration
LeaderElection LeaderElectionConfig `yaml:"leaderElection"`
// Queue control configuration
MaxProcessQueueSize int `yaml:"maxProcessQueueSize"`
BackpressureHysteresis float64 `yaml:"backpressureHysteresis"`
// Stale block detection configuration
StaleBlockDetection StaleBlockDetectionConfig `yaml:"staleBlockDetection"`
// Gap detection configuration
GapDetection GapDetectionConfig `yaml:"gapDetection"`
// Processor configurations
TransactionStructlog structlog.Config `yaml:"transactionStructlog"`
TransactionSimple simple.Config `yaml:"transactionSimple"`
TransactionStructlogAgg structlog_agg.Config `yaml:"transactionStructlogAgg"`
}
Config holds the unified processor configuration.
type GapDetectionConfig ¶ added in v0.1.5
type GapDetectionConfig struct {
// Enabled enables gap detection (default: false)
Enabled bool `yaml:"enabled"`
// ScanInterval is how often to scan for gaps (default: 5m)
ScanInterval time.Duration `yaml:"scanInterval"`
// BatchSize is max gaps to process per scan (default: 10)
BatchSize int `yaml:"batchSize"`
// LookbackRange is max blocks to look back, 0 = unlimited (default: 10000)
LookbackRange uint64 `yaml:"lookbackRange"`
}
GapDetectionConfig holds configuration for gap detection.
type LeaderElectionConfig ¶
type LeaderElectionConfig struct {
// Enable leader election (default: true)
Enabled bool `yaml:"enabled"`
// TTL for leader lock (default: 10s)
TTL time.Duration `yaml:"ttl"`
// Renewal interval (default: 3s)
RenewalInterval time.Duration `yaml:"renewalInterval"`
// Optional node ID (auto-generated if empty)
NodeID string `yaml:"nodeId"`
}
LeaderElectionConfig holds configuration for leader election.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates multiple processors with distributed task processing.
It manages the complete lifecycle:
- Processor initialization and startup
- Block discovery loop (when leader)
- Asynq server for task processing (always)
- Queue monitoring and backpressure
- Graceful shutdown with goroutine cleanup
func NewManager ¶
func (*Manager) GetQueueName ¶ added in v0.0.10
GetQueueName returns the current queue name based on processing mode.
func (*Manager) QueueBlockManually ¶ added in v0.0.10
func (m *Manager) QueueBlockManually(ctx context.Context, processorName string, blockNumber uint64) (*QueueResult, error)
QueueBlockManually allows manual queuing of a specific block for processing.
type QueueResult ¶ added in v0.0.10
QueueResult contains the result of queuing a block.
type StaleBlockDetectionConfig ¶ added in v0.1.5
type StaleBlockDetectionConfig struct {
// Enabled enables stale block detection (default: true)
Enabled bool `yaml:"enabled"`
// StaleThreshold is the time after which a block is considered stale (default: 5m)
StaleThreshold time.Duration `yaml:"staleThreshold"`
// CheckInterval is how often to check for stale blocks (default: 1m)
CheckInterval time.Duration `yaml:"checkInterval"`
}
StaleBlockDetectionConfig holds configuration for stale block detection.
type WorkerConfig ¶
WorkerConfig holds worker configuration.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package tracker provides block processing coordination and tracking.
|
Package tracker provides block processing coordination and tracking. |
|
transaction
|
|