Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateTaskID(network string, blockNumber uint64, txHash string) string
- func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
- func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
- type CallFrameRow
- type ClickHouseTime
- type Columns
- type Config
- type Dependencies
- type FrameAccumulator
- type FrameAggregator
- func (fa *FrameAggregator) Finalize(trace *execution.TraceTransaction, receiptGas uint64) []CallFrameRow
- func (fa *FrameAggregator) ProcessStructlog(sl *execution.StructLog, index int, frameID uint32, framePath []uint32, ...)
- func (fa *FrameAggregator) Reset()
- func (fa *FrameAggregator) SetRootTargetAddress(addr *string)
- type OpcodeStats
- type ProcessPayload
- type Processor
- func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
- func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution.Block) (int, error)
- func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker
- func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
- func (p *Processor) GetQueues() []tracker.QueueInfo
- func (p *Processor) Name() string
- func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error
- func (p *Processor) ProcessNextBlock(ctx context.Context) error
- func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Block, index int, ...) (int, error)
- func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error
- func (p *Processor) SetProcessingMode(mode string)
- func (p *Processor) Start(ctx context.Context) error
- func (p *Processor) Stop(ctx context.Context) error
Constants ¶
const (
	DefaultBufferMaxRows       = 100000
	DefaultBufferFlushInterval = time.Second
)
Default buffer configuration values.
const (
	ProcessorName            = "transaction_structlog_agg"
	ProcessForwardsTaskType  = "transaction_structlog_agg_process_forwards"
	ProcessBackwardsTaskType = "transaction_structlog_agg_process_backwards"
)
Variables ¶
var ErrTaskIDConflict = asynq.ErrTaskIDConflict
ErrTaskIDConflict is returned when a task with the same ID already exists.
Functions ¶
func GenerateTaskID ¶
GenerateTaskID creates a deterministic task ID for deduplication. Format: {processor}:{network}:{blockNum}:{txHash}.
func NewProcessBackwardsTask ¶
func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
NewProcessBackwardsTask creates a new backwards process task. Returns the task, taskID for deduplication, and any error.
func NewProcessForwardsTask ¶
func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
NewProcessForwardsTask creates a new forwards process task. Returns the task, taskID for deduplication, and any error.
Types ¶
type CallFrameRow ¶
type CallFrameRow struct {
CallFrameID uint32
ParentCallFrameID *uint32 // nil for root frame
CallFramePath []uint32 // Path from root to this frame
Depth uint32
TargetAddress *string
CallType string // CALL/DELEGATECALL/STATICCALL/CALLCODE/CREATE/CREATE2 (empty for root)
Operation string // Empty for summary row, opcode name for per-opcode rows
OpcodeCount uint64
ErrorCount uint64
Gas uint64 // SUM(gas_self) - excludes child frame gas
GasCumulative uint64 // For summary: frame gas_cumulative; for per-opcode: SUM(gas_used)
MinDepth uint32 // Per-opcode: MIN(depth); summary: same as Depth
MaxDepth uint32 // Per-opcode: MAX(depth); summary: same as Depth
GasRefund uint64 // Max refund from trace (0 for failed txs, per-opcode rows, and non-root frames)
IntrinsicGas *uint64 // Root frame only (computed)
// Resource gas building blocks.
MemWordsSumBefore uint64
MemWordsSumAfter uint64
MemWordsSqSumBefore uint64
MemWordsSqSumAfter uint64
MemExpansionGas uint64 // SUM(memory_expansion_gas) — exact per-opcode memory expansion cost
ColdAccessCount uint64
}
CallFrameRow represents aggregated data for a single call frame, either as a frame-level summary or as a per-opcode aggregation within that frame. This is the output format that gets inserted into ClickHouse. There are two types of rows:
- Summary row: Operation="" contains frame-level metadata (call_type, target_address, etc.)
- Per-opcode row: Operation="SSTORE" etc. contains gas/count for that specific opcode
type ClickHouseTime ¶
ClickHouseTime wraps time.Time for ClickHouse DateTime formatting.
func NewClickHouseTime ¶
func NewClickHouseTime(t time.Time) ClickHouseTime
NewClickHouseTime creates a new ClickHouseTime from time.Time.
func (ClickHouseTime) Time ¶
func (t ClickHouseTime) Time() time.Time
Time returns the underlying time.Time.
type Columns ¶
type Columns struct {
UpdatedDateTime proto.ColDateTime
BlockNumber proto.ColUInt64
TransactionHash proto.ColStr
TransactionIndex proto.ColUInt32
CallFrameID proto.ColUInt32
ParentCallFrameID *proto.ColNullable[uint32]
CallFramePath *proto.ColArr[uint32] // Path from root to this frame
Depth proto.ColUInt32
TargetAddress *proto.ColNullable[string]
CallType proto.ColStr
Operation proto.ColStr // Empty string for summary row, opcode name for per-opcode rows
OpcodeCount proto.ColUInt64
ErrorCount proto.ColUInt64
Gas proto.ColUInt64 // SUM(gas_self) - excludes child frame gas
GasCumulative proto.ColUInt64 // For summary: frame gas_cumulative; for per-opcode: SUM(gas_used)
MinDepth proto.ColUInt32 // Per-opcode: MIN(depth); summary: same as Depth
MaxDepth proto.ColUInt32 // Per-opcode: MAX(depth); summary: same as Depth
GasRefund *proto.ColNullable[uint64]
IntrinsicGas *proto.ColNullable[uint64]
MemWordsSumBefore proto.ColUInt64
MemWordsSumAfter proto.ColUInt64
MemWordsSqSumBefore proto.ColUInt64
MemWordsSqSumAfter proto.ColUInt64
MemExpansionGas proto.ColUInt64
ColdAccessCount proto.ColUInt64
MetaNetworkName proto.ColStr
}
Columns holds all columns for structlog_agg batch insert using ch-go columnar protocol.
func NewColumns ¶
func NewColumns() *Columns
NewColumns creates a new Columns instance with all columns initialized.
func (*Columns) Append ¶
func (c *Columns) Append(
	updatedDateTime time.Time,
	blockNumber uint64,
	txHash string,
	txIndex uint32,
	callFrameID uint32,
	parentCallFrameID *uint32,
	callFramePath []uint32,
	depth uint32,
	targetAddress *string,
	callType string,
	operation string,
	opcodeCount uint64,
	errorCount uint64,
	gas uint64,
	gasCumulative uint64,
	minDepth uint32,
	maxDepth uint32,
	gasRefund uint64,
	intrinsicGas *uint64,
	memWordsSumBefore uint64,
	memWordsSumAfter uint64,
	memWordsSqSumBefore uint64,
	memWordsSqSumAfter uint64,
	memExpansionGas uint64,
	coldAccessCount uint64,
	network string,
)
Append adds a row to all columns.
type Config ¶
type Config struct {
clickhouse.Config `yaml:",inline"`
Enabled bool `yaml:"enabled"`
Table string `yaml:"table"`
// Row buffer settings for batched ClickHouse inserts
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
// Block completion tracking
MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
}
Config holds configuration for the transaction structlog_agg processor.
type Dependencies ¶
type Dependencies struct {
Log logrus.FieldLogger
Pool *ethereum.Pool
Network *ethereum.Network
State *state.Manager
AsynqClient *asynq.Client
AsynqInspector *asynq.Inspector
RedisClient *redis.Client
RedisPrefix string
}
Dependencies contains the dependencies needed for the processor.
type FrameAccumulator ¶
type FrameAccumulator struct {
CallFrameID uint32
CallFramePath []uint32 // Path from root to this frame
FirstOpcodeIndex uint32
FirstGas uint64 // Gas at first opcode
LastGas uint64 // Gas at last opcode
LastGasUsed uint64 // GasUsed of last opcode
OpcodeCount uint64
ErrorCount uint64
MaxRefund uint64
TargetAddress *string
CallType string
Depth uint32
// Per-opcode tracking
OpcodeStats map[string]*OpcodeStats // opcode -> stats
}
FrameAccumulator tracks data for a single frame during processing.
type FrameAggregator ¶
type FrameAggregator struct {
// contains filtered or unexported fields
}
FrameAggregator aggregates structlog data into call frame rows.
func NewFrameAggregator ¶
func NewFrameAggregator() *FrameAggregator
NewFrameAggregator creates a new FrameAggregator.
func (*FrameAggregator) Finalize ¶
func (fa *FrameAggregator) Finalize(trace *execution.TraceTransaction, receiptGas uint64) []CallFrameRow
Finalize computes final call frame rows from the accumulated data. Returns the call frame rows ready for insertion. Emits two types of rows per frame:
- Summary row: Operation="" with frame-level metadata and totals
- Per-opcode rows: Operation="SSTORE" etc. with gas/count for that opcode
func (*FrameAggregator) ProcessStructlog ¶
func (fa *FrameAggregator) ProcessStructlog(
	sl *execution.StructLog,
	index int,
	frameID uint32,
	framePath []uint32,
	gasUsed uint64,
	gasSelf uint64,
	callToAddr *string,
	prevStructlog *execution.StructLog,
	memWordsBefore uint32,
	memWordsAfter uint32,
	memExpansionGas uint64,
	coldAccessCount uint64,
)
ProcessStructlog processes a single structlog entry and updates frame accumulators. Parameters:
- sl: The structlog entry
- index: Index of this structlog in the trace
- frameID: The call frame ID for this structlog
- framePath: The path from root to current frame
- gasUsed: Pre-computed gas used for this opcode (includes child frame gas for CALL/CREATE)
- gasSelf: Pre-computed gas used excluding child frame gas
- callToAddr: Target address for CALL/CREATE opcodes (nil otherwise)
- prevStructlog: Previous structlog (for detecting frame entry via CALL/CREATE)
- memWordsBefore: Memory words before this opcode (0 if unavailable)
- memWordsAfter: Memory words after this opcode (0 if unavailable)
- memExpansionGas: Memory expansion gas for this opcode (exact per-opcode cost)
- coldAccessCount: Number of cold accesses for this opcode (0, 1, or 2)
func (*FrameAggregator) Reset ¶
func (fa *FrameAggregator) Reset()
Reset clears the aggregator for reuse.
func (*FrameAggregator) SetRootTargetAddress ¶
func (fa *FrameAggregator) SetRootTargetAddress(addr *string)
SetRootTargetAddress sets the target address for the root frame (frame ID 0). This should be called after processing all structlogs, as the root frame's target address comes from the transaction's to_address, not from an initiating CALL.
type OpcodeStats ¶
type OpcodeStats struct {
Count uint64
Gas uint64 // SUM(gas_self) - excludes child frame gas
GasCumulative uint64 // SUM(gas_used) - includes child frame gas for CALL/CREATE
ErrorCount uint64
MinDepth uint32
MaxDepth uint32
// Resource gas building blocks for decomposing gas into categories.
MemWordsSumBefore uint64 // SUM(memory_words_before)
MemWordsSumAfter uint64 // SUM(memory_words_after)
MemWordsSqSumBefore uint64 // SUM(memory_words_before²)
MemWordsSqSumAfter uint64 // SUM(memory_words_after²)
MemExpansionGas uint64 // SUM(memory_expansion_gas) per opcode
ColdCount uint64 // Number of cold accesses
}
OpcodeStats tracks gas and count for a specific opcode within a frame.
type ProcessPayload ¶
type ProcessPayload struct {
BlockNumber big.Int `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
NetworkName string `json:"network_name"`
Network string `json:"network"` // Alias for NetworkName
ProcessingMode string `json:"processing_mode"` // "forwards" or "backwards"
}
ProcessPayload represents the payload for processing a transaction.
func (*ProcessPayload) MarshalBinary ¶
func (p *ProcessPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler.
func (*ProcessPayload) UnmarshalBinary ¶
func (p *ProcessPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler.
type Processor ¶
type Processor struct {
// Embedded limiter for shared blocking/completion logic
*tracker.Limiter
// contains filtered or unexported fields
}
Processor handles transaction structlog_agg processing.
func New ¶
func New(deps *Dependencies, config *Config) (*Processor, error)
New creates a new transaction structlog_agg processor.
func (*Processor) EnqueueTask ¶
EnqueueTask enqueues a task to the specified queue with infinite retries.
func (*Processor) EnqueueTransactionTasks ¶
func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution.Block) (int, error)
EnqueueTransactionTasks enqueues transaction processing tasks for a given block. It uses the TaskID for deduplication: a task whose ID conflicts with an existing task is skipped, but is still counted.
func (*Processor) GetCompletionTracker ¶
func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker
GetCompletionTracker returns the block completion tracker.
func (*Processor) GetHandlers ¶
func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
GetHandlers returns the task handlers for this processor.
func (*Processor) ProcessBlock ¶
ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. This is used for both normal processing and gap filling of missing blocks.
func (*Processor) ProcessNextBlock ¶
ProcessNextBlock processes the next available block(s). In zero-interval mode, this attempts to fetch and process multiple blocks up to the available capacity for improved throughput.
func (*Processor) ProcessTransaction ¶
func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Block, index int, tx execution.Transaction) (int, error)
ProcessTransaction processes a transaction and inserts aggregated call frame data to ClickHouse.
func (*Processor) ReprocessBlock ¶
ReprocessBlock re-enqueues tasks for an orphaned block. Used when a block is in ClickHouse (complete=0) but has no Redis tracking. TaskID deduplication ensures no duplicate tasks are created.
func (*Processor) SetProcessingMode ¶
SetProcessingMode sets the processing mode for the processor.