processor

package
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Overview

Package processor coordinates block processing across multiple processor types.

Architecture Overview

The Manager is the central coordinator that:

  • Discovers new blocks from execution nodes
  • Dispatches processing tasks to registered processors
  • Manages distributed task queues via Redis/Asynq
  • Implements leader election for multi-instance deployments
  • Provides backpressure control via queue monitoring

Processing Flow

  1. Manager.Start() initializes processors and begins the processing loop
  2. processBlocks() is called on each interval (default 10s)
  3. Each processor's ProcessNextBlock() discovers and enqueues tasks
  4. Asynq workers (potentially distributed) process the tasks
  5. Completion tracking marks blocks done when all tasks finish

Leader Election

When multiple instances run, only the leader performs block discovery. All instances can process tasks as Asynq workers.

Index

Constants

View Source
const (
	// DefaultNoWorkBackoff is the backoff duration when no work is available.
	// Used in zero-interval mode to prevent CPU spin when idle.
	DefaultNoWorkBackoff = 10 * time.Millisecond

	// DefaultConcurrency is the default number of concurrent workers for task processing.
	DefaultConcurrency = 20

	// DefaultMaxProcessQueue is the default max queue size for asynq.
	DefaultMaxProcessQueue = 1000

	// DefaultBackpressureHysteresis is the default hysteresis factor for backpressure.
	// When backpressure is triggered, it won't be released until queue size drops
	// to this fraction of the threshold.
	DefaultBackpressureHysteresis = 0.8

	// DefaultLeaderTTL is the default TTL for leader election locks.
	DefaultLeaderTTL = 10 * time.Second

	// DefaultLeaderRenewalInterval is the default renewal interval for leader election.
	DefaultLeaderRenewalInterval = 3 * time.Second

	// DefaultStaleBlockCheckInterval is the default interval for checking stale blocks.
	DefaultStaleBlockCheckInterval = 1 * time.Minute

	// DefaultBackpressureBackoffMin is the minimum backoff duration when backpressure is detected.
	DefaultBackpressureBackoffMin = 10 * time.Millisecond

	// DefaultBackpressureBackoffMax is the maximum backoff duration when backpressure persists.
	DefaultBackpressureBackoffMax = 1 * time.Second

	// DefaultBackpressureJitterFraction is the fraction of backoff to add as random jitter (0.25 = 25%).
	DefaultBackpressureJitterFraction = 0.25

	// DefaultGapScanInterval is the default interval for scanning gaps.
	DefaultGapScanInterval = 5 * time.Minute

	// DefaultGapBatchSize is the default max gaps to process per scan.
	DefaultGapBatchSize = 10

	// DefaultGapLookbackRange is the default max blocks to look back for gaps.
	DefaultGapLookbackRange = uint64(10000)
)

Default configuration values for the processor manager. Processor-specific defaults are in the tracker package.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Processing interval
	Interval time.Duration `yaml:"interval"`

	// Processing mode: forwards, backwards
	Mode string `yaml:"mode"`

	// Maximum concurrent transactions to process
	Concurrency int `yaml:"concurrency"`

	// Leader election configuration
	LeaderElection LeaderElectionConfig `yaml:"leaderElection"`

	// Queue control configuration
	MaxProcessQueueSize    int     `yaml:"maxProcessQueueSize"`
	BackpressureHysteresis float64 `yaml:"backpressureHysteresis"`

	// Stale block detection configuration
	StaleBlockDetection StaleBlockDetectionConfig `yaml:"staleBlockDetection"`

	// Gap detection configuration
	GapDetection GapDetectionConfig `yaml:"gapDetection"`

	// Processor configurations
	TransactionStructlog    structlog.Config     `yaml:"transactionStructlog"`
	TransactionSimple       simple.Config        `yaml:"transactionSimple"`
	TransactionStructlogAgg structlog_agg.Config `yaml:"transactionStructlogAgg"`
}

Config holds the unified processor configuration.

func (*Config) Validate

func (c *Config) Validate() error

type GapDetectionConfig added in v0.1.5

type GapDetectionConfig struct {
	// Enabled enables gap detection (default: false)
	Enabled bool `yaml:"enabled"`

	// ScanInterval is how often to scan for gaps (default: 5m)
	ScanInterval time.Duration `yaml:"scanInterval"`

	// BatchSize is max gaps to process per scan (default: 10)
	BatchSize int `yaml:"batchSize"`

	// LookbackRange is max blocks to look back, 0 = unlimited (default: 10000)
	LookbackRange uint64 `yaml:"lookbackRange"`
}

GapDetectionConfig holds configuration for gap detection.

type LeaderElectionConfig

type LeaderElectionConfig struct {
	// Enable leader election (default: true)
	Enabled bool `yaml:"enabled"`

	// TTL for leader lock (default: 10s)
	TTL time.Duration `yaml:"ttl"`

	// Renewal interval (default: 3s)
	RenewalInterval time.Duration `yaml:"renewalInterval"`

	// Optional node ID (auto-generated if empty)
	NodeID string `yaml:"nodeId"`
}

LeaderElectionConfig holds configuration for leader election.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates multiple processors with distributed task processing.

It manages the complete lifecycle:

  • Processor initialization and startup
  • Block discovery loop (when leader)
  • Asynq server for task processing (always)
  • Queue monitoring and backpressure
  • Graceful shutdown with goroutine cleanup

func NewManager

func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, state *s.Manager, redis *r.Client, redisPrefix string) (*Manager, error)

func (*Manager) GetQueueName added in v0.0.10

func (m *Manager) GetQueueName() string

GetQueueName returns the current queue name based on processing mode.

func (*Manager) QueueBlockManually added in v0.0.10

func (m *Manager) QueueBlockManually(ctx context.Context, processorName string, blockNumber uint64) (*QueueResult, error)

QueueBlockManually allows manual queuing of a specific block for processing.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

type QueueResult added in v0.0.10

type QueueResult struct {
	TransactionCount int
	TasksCreated     int
}

QueueResult contains the result of queuing a block.

type StaleBlockDetectionConfig added in v0.1.5

type StaleBlockDetectionConfig struct {
	// Enabled enables stale block detection (default: true)
	Enabled bool `yaml:"enabled"`

	// StaleThreshold is the time after which a block is considered stale (default: 5m)
	StaleThreshold time.Duration `yaml:"staleThreshold"`

	// CheckInterval is how often to check for stale blocks (default: 1m)
	CheckInterval time.Duration `yaml:"checkInterval"`
}

StaleBlockDetectionConfig holds configuration for stale block detection.

type WorkerConfig

type WorkerConfig struct {
	Enabled     bool `yaml:"enabled"`
	Concurrency int  `yaml:"concurrency"`
}

WorkerConfig holds worker configuration.

Directories

Path Synopsis
Package tracker provides block processing coordination and tracking.
Package tracker provides block processing coordination and tracking.
transaction

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL