structlog_agg

package
v0.1.7
Published: Feb 26, 2026 License: GPL-3.0 Imports: 22 Imported by: 0

Documentation

Constants

const (
	DefaultBufferMaxRows       = 100000
	DefaultBufferFlushInterval = time.Second
)

Default buffer configuration values.

const (
	ProcessorName            = "transaction_structlog_agg"
	ProcessForwardsTaskType  = "transaction_structlog_agg_process_forwards"
	ProcessBackwardsTaskType = "transaction_structlog_agg_process_backwards"
)

Variables

var ErrTaskIDConflict = asynq.ErrTaskIDConflict

ErrTaskIDConflict is returned when a task with the same ID already exists.

Functions

func GenerateTaskID

func GenerateTaskID(network string, blockNumber uint64, txHash string) string

GenerateTaskID creates a deterministic task ID for deduplication. Format: {processor}:{network}:{blockNum}:{txHash}.
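
The documented format can be sketched as follows. This is not the package's implementation: `taskID` is a hypothetical helper, and it assumes the `{processor}` segment is the `ProcessorName` constant, which the documentation does not state explicitly.

```go
package main

import "fmt"

// processorName is assumed to fill the {processor} segment of the ID.
const processorName = "transaction_structlog_agg"

// taskID builds an ID in the documented shape
// {processor}:{network}:{blockNum}:{txHash}. Hypothetical helper.
func taskID(network string, blockNumber uint64, txHash string) string {
	return fmt.Sprintf("%s:%s:%d:%s", processorName, network, blockNumber, txHash)
}

func main() {
	// The same inputs always yield the same ID, which is what makes
	// deduplication by task ID possible.
	fmt.Println(taskID("mainnet", 19000000, "0xabc"))
	// transaction_structlog_agg:mainnet:19000000:0xabc
}
```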

func NewProcessBackwardsTask

func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)

NewProcessBackwardsTask creates a new backwards process task. Returns the task, taskID for deduplication, and any error.

func NewProcessForwardsTask

func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)

NewProcessForwardsTask creates a new forwards process task. Returns the task, taskID for deduplication, and any error.

Types

type CallFrameRow

type CallFrameRow struct {
	CallFrameID       uint32
	ParentCallFrameID *uint32  // nil for root frame
	CallFramePath     []uint32 // Path from root to this frame
	Depth             uint32
	TargetAddress     *string
	CallType          string // CALL/DELEGATECALL/STATICCALL/CALLCODE/CREATE/CREATE2 (empty for root)
	Operation         string // Empty for summary row, opcode name for per-opcode rows
	OpcodeCount       uint64
	ErrorCount        uint64
	Gas               uint64  // SUM(gas_self) - excludes child frame gas
	GasCumulative     uint64  // For summary: frame gas_cumulative; for per-opcode: SUM(gas_used)
	MinDepth          uint32  // Per-opcode: MIN(depth); summary: same as Depth
	MaxDepth          uint32  // Per-opcode: MAX(depth); summary: same as Depth
	GasRefund         uint64  // Max refund from trace (0 for failed txs, per-opcode rows, and non-root frames)
	IntrinsicGas      *uint64 // Root frame only (computed)

	// Resource gas building blocks.
	MemWordsSumBefore   uint64
	MemWordsSumAfter    uint64
	MemWordsSqSumBefore uint64
	MemWordsSqSumAfter  uint64
	MemExpansionGas     uint64 // SUM(memory_expansion_gas) — exact per-opcode memory expansion cost
	ColdAccessCount     uint64
}

CallFrameRow represents aggregated data for a single call frame or per-opcode aggregation. This is the output format that gets inserted into ClickHouse. Two types of rows:

  • Summary row: Operation="" contains frame-level metadata (call_type, target_address, etc.)
  • Per-opcode row: Operation="SSTORE" etc. contains gas/count for that specific opcode
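
A consumer can tell the two row types apart by checking whether Operation is empty. The sketch below uses a local `row` type that mirrors only a few CallFrameRow fields; the helper names are illustrative and not part of the package.

```go
package main

import "fmt"

// row mirrors a few CallFrameRow fields for illustration only.
type row struct {
	CallFrameID uint32
	Operation   string // "" for the summary row, opcode name otherwise
	OpcodeCount uint64
	Gas         uint64
}

// isSummary reports whether r is a frame-level summary row.
func isSummary(r row) bool { return r.Operation == "" }

// splitRows separates summary rows from per-opcode rows.
func splitRows(rows []row) (summaries, perOpcode []row) {
	for _, r := range rows {
		if isSummary(r) {
			summaries = append(summaries, r)
		} else {
			perOpcode = append(perOpcode, r)
		}
	}
	return summaries, perOpcode
}

func main() {
	rows := []row{
		{CallFrameID: 0, Operation: "", OpcodeCount: 3, Gas: 20006},
		{CallFrameID: 0, Operation: "SSTORE", OpcodeCount: 1, Gas: 20000},
		{CallFrameID: 0, Operation: "PUSH1", OpcodeCount: 2, Gas: 6},
	}
	s, o := splitRows(rows)
	fmt.Println(len(s), len(o)) // 1 2
}
```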

type ClickHouseTime

type ClickHouseTime time.Time

ClickHouseTime wraps time.Time for ClickHouse DateTime formatting.

func NewClickHouseTime

func NewClickHouseTime(t time.Time) ClickHouseTime

NewClickHouseTime creates a new ClickHouseTime from time.Time.

func (ClickHouseTime) Time

func (t ClickHouseTime) Time() time.Time

Time returns the underlying time.Time.

type Columns

type Columns struct {
	UpdatedDateTime     proto.ColDateTime
	BlockNumber         proto.ColUInt64
	TransactionHash     proto.ColStr
	TransactionIndex    proto.ColUInt32
	CallFrameID         proto.ColUInt32
	ParentCallFrameID   *proto.ColNullable[uint32]
	CallFramePath       *proto.ColArr[uint32] // Path from root to this frame
	Depth               proto.ColUInt32
	TargetAddress       *proto.ColNullable[string]
	CallType            proto.ColStr
	Operation           proto.ColStr // Empty string for summary row, opcode name for per-opcode rows
	OpcodeCount         proto.ColUInt64
	ErrorCount          proto.ColUInt64
	Gas                 proto.ColUInt64 // SUM(gas_self) - excludes child frame gas
	GasCumulative       proto.ColUInt64 // For summary: frame gas_cumulative; for per-opcode: SUM(gas_used)
	MinDepth            proto.ColUInt32 // Per-opcode: MIN(depth); summary: same as Depth
	MaxDepth            proto.ColUInt32 // Per-opcode: MAX(depth); summary: same as Depth
	GasRefund           *proto.ColNullable[uint64]
	IntrinsicGas        *proto.ColNullable[uint64]
	MemWordsSumBefore   proto.ColUInt64
	MemWordsSumAfter    proto.ColUInt64
	MemWordsSqSumBefore proto.ColUInt64
	MemWordsSqSumAfter  proto.ColUInt64
	MemExpansionGas     proto.ColUInt64
	ColdAccessCount     proto.ColUInt64
	MetaNetworkName     proto.ColStr
}

Columns holds all columns for structlog_agg batch insert using ch-go columnar protocol.

func NewColumns

func NewColumns() *Columns

NewColumns creates a new Columns instance with all columns initialized.

func (*Columns) Append

func (c *Columns) Append(
	updatedDateTime time.Time,
	blockNumber uint64,
	txHash string,
	txIndex uint32,
	callFrameID uint32,
	parentCallFrameID *uint32,
	callFramePath []uint32,
	depth uint32,
	targetAddress *string,
	callType string,
	operation string,
	opcodeCount uint64,
	errorCount uint64,
	gas uint64,
	gasCumulative uint64,
	minDepth uint32,
	maxDepth uint32,
	gasRefund uint64,
	intrinsicGas *uint64,
	memWordsSumBefore uint64,
	memWordsSumAfter uint64,
	memWordsSqSumBefore uint64,
	memWordsSqSumAfter uint64,
	memExpansionGas uint64,
	coldAccessCount uint64,
	network string,
)

Append adds a row to all columns.

func (*Columns) Input

func (c *Columns) Input() proto.Input

Input returns the proto.Input for inserting data.

func (*Columns) Reset

func (c *Columns) Reset()

Reset clears all columns for reuse.

func (*Columns) Rows

func (c *Columns) Rows() int

Rows returns the number of rows in the columns.

type Config

type Config struct {
	clickhouse.Config `yaml:",inline"`
	Enabled           bool   `yaml:"enabled"`
	Table             string `yaml:"table"`

	// Row buffer settings for batched ClickHouse inserts
	BufferMaxRows       int           `yaml:"bufferMaxRows"`       // Max rows before flush. Default: 100000
	BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s

	// Block completion tracking
	MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
}

Config holds configuration for transaction structlog_agg processor.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type Dependencies

type Dependencies struct {
	Log            logrus.FieldLogger
	Pool           *ethereum.Pool
	Network        *ethereum.Network
	State          *state.Manager
	AsynqClient    *asynq.Client
	AsynqInspector *asynq.Inspector
	RedisClient    *redis.Client
	RedisPrefix    string
}

Dependencies contains the dependencies needed for the processor.

type FrameAccumulator

type FrameAccumulator struct {
	CallFrameID      uint32
	CallFramePath    []uint32 // Path from root to this frame
	FirstOpcodeIndex uint32
	FirstGas         uint64 // Gas at first opcode
	LastGas          uint64 // Gas at last opcode
	LastGasUsed      uint64 // GasUsed of last opcode
	OpcodeCount      uint64
	ErrorCount       uint64
	MaxRefund        uint64
	TargetAddress    *string
	CallType         string
	Depth            uint32

	// Per-opcode tracking
	OpcodeStats map[string]*OpcodeStats // opcode -> stats
}

FrameAccumulator tracks data for a single frame during processing.

type FrameAggregator

type FrameAggregator struct {
	// contains filtered or unexported fields
}

FrameAggregator aggregates structlog data into call frame rows.

func NewFrameAggregator

func NewFrameAggregator() *FrameAggregator

NewFrameAggregator creates a new FrameAggregator.

func (*FrameAggregator) Finalize

func (fa *FrameAggregator) Finalize(trace *execution.TraceTransaction, receiptGas uint64) []CallFrameRow

Finalize computes final call frame rows from the accumulated data. Returns the call frame rows ready for insertion. Emits two types of rows per frame:

  • Summary row: Operation="" with frame-level metadata and totals
  • Per-opcode rows: Operation="SSTORE" etc. with gas/count for that opcode

func (*FrameAggregator) ProcessStructlog

func (fa *FrameAggregator) ProcessStructlog(
	sl *execution.StructLog,
	index int,
	frameID uint32,
	framePath []uint32,
	gasUsed uint64,
	gasSelf uint64,
	callToAddr *string,
	prevStructlog *execution.StructLog,
	memWordsBefore uint32,
	memWordsAfter uint32,
	memExpansionGas uint64,
	coldAccessCount uint64,
)

ProcessStructlog processes a single structlog entry and updates frame accumulators. Parameters:

  • sl: The structlog entry
  • index: Index of this structlog in the trace
  • frameID: The call frame ID for this structlog
  • framePath: The path from root to current frame
  • gasUsed: Pre-computed gas used for this opcode (includes child frame gas for CALL/CREATE)
  • gasSelf: Pre-computed gas used excluding child frame gas
  • callToAddr: Target address for CALL/CREATE opcodes (nil otherwise)
  • prevStructlog: Previous structlog (for detecting frame entry via CALL/CREATE)
  • memWordsBefore: Memory words before this opcode (0 if unavailable)
  • memWordsAfter: Memory words after this opcode (0 if unavailable)
  • memExpansionGas: Memory expansion gas for this opcode (exact per-opcode cost)
  • coldAccessCount: Number of cold accesses for this opcode (0, 1, or 2)
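
The memory parameters are pre-computed by the caller. As a sketch of how such values are commonly derived, the code below applies the standard EVM memory cost formula, C(w) = 3w + floor(w^2 / 512), where w is the memory size in 32-byte words. The helper names are hypothetical, and whether this package computes the values exactly this way is an assumption.

```go
package main

import "fmt"

// memWords converts a memory size in bytes to 32-byte words, rounding up.
func memWords(sizeBytes uint64) uint32 {
	return uint32((sizeBytes + 31) / 32)
}

// memCost is the total EVM memory cost for a given word count:
// 3 gas per word plus a quadratic term of words^2 / 512.
func memCost(words uint64) uint64 {
	return 3*words + words*words/512
}

// expansionGas is the extra cost of growing memory from before to after words.
// Memory never shrinks during execution, so no growth means zero cost.
func expansionGas(before, after uint32) uint64 {
	if after <= before {
		return 0
	}
	return memCost(uint64(after)) - memCost(uint64(before))
}

func main() {
	fmt.Println(memWords(33))         // 2
	fmt.Println(expansionGas(0, 1))   // 3
	fmt.Println(expansionGas(31, 32)) // 4
	fmt.Println(expansionGas(5, 5))   // 0
}
```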

func (*FrameAggregator) Reset

func (fa *FrameAggregator) Reset()

Reset clears the aggregator for reuse.

func (*FrameAggregator) SetRootTargetAddress

func (fa *FrameAggregator) SetRootTargetAddress(addr *string)

SetRootTargetAddress sets the target address for the root frame (frame ID 0). This should be called after processing all structlogs, as the root frame's target address comes from the transaction's to_address, not from an initiating CALL.

type OpcodeStats

type OpcodeStats struct {
	Count         uint64
	Gas           uint64 // SUM(gas_self) - excludes child frame gas
	GasCumulative uint64 // SUM(gas_used) - includes child frame gas for CALL/CREATE
	ErrorCount    uint64
	MinDepth      uint32
	MaxDepth      uint32

	// Resource gas building blocks for decomposing gas into categories.
	MemWordsSumBefore   uint64 // SUM(memory_words_before)
	MemWordsSumAfter    uint64 // SUM(memory_words_after)
	MemWordsSqSumBefore uint64 // SUM(memory_words_before²)
	MemWordsSqSumAfter  uint64 // SUM(memory_words_after²)
	MemExpansionGas     uint64 // SUM(memory_expansion_gas) per opcode
	ColdCount           uint64 // Number of cold accesses
}

OpcodeStats tracks gas and count for a specific opcode within a frame.
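
The field comments describe simple running aggregates. A minimal sketch of that accumulation is shown below, using a local `opcodeStats` type and a hypothetical `accumulate` helper; the package's actual update logic is unexported and may differ. The sums of squares are presumably kept because the quadratic part of EVM memory cost depends on the squared word count.

```go
package main

import "fmt"

// opcodeStats mirrors a subset of the documented OpcodeStats fields.
type opcodeStats struct {
	Count, Gas, GasCumulative               uint64
	MinDepth, MaxDepth                      uint32
	MemWordsSumBefore, MemWordsSumAfter     uint64
	MemWordsSqSumBefore, MemWordsSqSumAfter uint64
}

// accumulate folds one opcode execution into the per-opcode stats map.
func accumulate(stats map[string]*opcodeStats, op string, depth uint32,
	gasSelf, gasUsed uint64, wordsBefore, wordsAfter uint32) {
	s, ok := stats[op]
	if !ok {
		s = &opcodeStats{MinDepth: depth, MaxDepth: depth}
		stats[op] = s
	}
	if depth < s.MinDepth {
		s.MinDepth = depth
	}
	if depth > s.MaxDepth {
		s.MaxDepth = depth
	}
	s.Count++
	s.Gas += gasSelf           // excludes child frame gas
	s.GasCumulative += gasUsed // includes child frame gas for CALL/CREATE
	wb, wa := uint64(wordsBefore), uint64(wordsAfter)
	s.MemWordsSumBefore += wb
	s.MemWordsSumAfter += wa
	s.MemWordsSqSumBefore += wb * wb
	s.MemWordsSqSumAfter += wa * wa
}

func main() {
	stats := map[string]*opcodeStats{}
	accumulate(stats, "MSTORE", 1, 6, 6, 0, 2)
	accumulate(stats, "MSTORE", 2, 3, 3, 2, 3)
	accumulate(stats, "CALL", 1, 100, 2600, 3, 3)

	m := stats["MSTORE"]
	fmt.Println(m.Count, m.Gas, m.MinDepth, m.MaxDepth)          // 2 9 1 2
	fmt.Println(m.MemWordsSumAfter, m.MemWordsSqSumAfter)        // 5 13
	fmt.Println(stats["CALL"].Gas, stats["CALL"].GasCumulative) // 100 2600
}
```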

type ProcessPayload

type ProcessPayload struct {
	BlockNumber      big.Int `json:"block_number"`
	TransactionHash  string  `json:"transaction_hash"`
	TransactionIndex uint32  `json:"transaction_index"`
	NetworkName      string  `json:"network_name"`
	Network          string  `json:"network"`         // Alias for NetworkName
	ProcessingMode   string  `json:"processing_mode"` // "forwards" or "backwards"
}

ProcessPayload represents the payload for processing a transaction.

func (*ProcessPayload) MarshalBinary

func (p *ProcessPayload) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler.

func (*ProcessPayload) UnmarshalBinary

func (p *ProcessPayload) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler.
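
A round trip through these two methods might look like the sketch below. It assumes the binary form is JSON, which the struct tags suggest but the documentation does not confirm; `processPayload` is a local copy of the documented struct, not the package type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math/big"
)

// processPayload is a local copy of the documented ProcessPayload struct.
type processPayload struct {
	BlockNumber      big.Int `json:"block_number"`
	TransactionHash  string  `json:"transaction_hash"`
	TransactionIndex uint32  `json:"transaction_index"`
	NetworkName      string  `json:"network_name"`
	Network          string  `json:"network"`
	ProcessingMode   string  `json:"processing_mode"`
}

// MarshalBinary encodes the payload. JSON is an assumption here.
func (p *processPayload) MarshalBinary() ([]byte, error) { return json.Marshal(p) }

// UnmarshalBinary decodes a payload produced by MarshalBinary.
func (p *processPayload) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, p) }

func main() {
	in := &processPayload{
		TransactionHash:  "0xabc",
		TransactionIndex: 7,
		NetworkName:      "mainnet",
		ProcessingMode:   "forwards",
	}
	in.BlockNumber.SetUint64(19000000)

	data, err := in.MarshalBinary()
	if err != nil {
		panic(err)
	}

	var out processPayload
	if err := out.UnmarshalBinary(data); err != nil {
		panic(err)
	}
	fmt.Println(out.BlockNumber.String(), out.TransactionHash, out.ProcessingMode)
	// 19000000 0xabc forwards
}
```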

type Processor

type Processor struct {

	// Embedded limiter for shared blocking/completion logic
	*tracker.Limiter
	// contains filtered or unexported fields
}

Processor handles transaction structlog_agg processing.

func New

func New(deps *Dependencies, config *Config) (*Processor, error)

New creates a new transaction structlog_agg processor.

func (*Processor) EnqueueTask

func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error

EnqueueTask enqueues a task to the specified queue with infinite retries.

func (*Processor) EnqueueTransactionTasks

func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution.Block) (int, error)

EnqueueTransactionTasks enqueues transaction processing tasks for a given block. It uses the task ID for deduplication: tasks with conflicting IDs are skipped but still counted.

func (*Processor) GetCompletionTracker

func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker

GetCompletionTracker returns the block completion tracker.

func (*Processor) GetHandlers

func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc

GetHandlers returns the task handlers for this processor.

func (*Processor) GetQueues

func (p *Processor) GetQueues() []tracker.QueueInfo

GetQueues returns the queues used by this processor.

func (*Processor) Name

func (p *Processor) Name() string

Name returns the processor name.

func (*Processor) ProcessBlock

func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error

ProcessBlock processes a single block: it fetches the block, marks it as enqueued, and enqueues its tasks. This is used for both normal processing and gap filling of missing blocks.

func (*Processor) ProcessNextBlock

func (p *Processor) ProcessNextBlock(ctx context.Context) error

ProcessNextBlock processes the next available block(s). In zero-interval mode, this attempts to fetch and process multiple blocks up to the available capacity for improved throughput.

func (*Processor) ProcessTransaction

func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Block, index int, tx execution.Transaction) (int, error)

ProcessTransaction processes a transaction and inserts aggregated call frame data to ClickHouse.

func (*Processor) ReprocessBlock

func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error

ReprocessBlock re-enqueues tasks for an orphaned block. Used when a block is in ClickHouse (complete=0) but has no Redis tracking. TaskID deduplication ensures no duplicate tasks are created.

func (*Processor) SetProcessingMode

func (p *Processor) SetProcessingMode(mode string)

SetProcessingMode sets the processing mode for the processor.

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start starts the processor.

func (*Processor) Stop

func (p *Processor) Stop(ctx context.Context) error

Stop stops the processor.
