Documentation
¶
Index ¶
- Constants
- Variables
- func ClassifyColdAccess(structlogs []execution.StructLog, gasSelf []uint64, memExpGas []uint64) []uint64
- func ComputeCreateAddresses(structlogs []execution.StructLog) map[int]*string
- func ComputeGasSelf(structlogs []execution.StructLog, gasUsed []uint64) []uint64
- func ComputeGasUsed(structlogs []execution.StructLog) []uint64
- func ComputeMemoryWords(structlogs []execution.StructLog) (wordsBefore, wordsAfter []uint32)
- func GenerateTaskID(network string, blockNumber uint64, txHash string) string
- func IsPrecompile(addr string) bool
- func MemoryExpansionGas(wordsBefore, wordsAfter uint32) uint64
- func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
- func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
- type CallFrame
- type CallTracker
- type ClickHouseTime
- type Columns
- type Config
- type Dependencies
- type ProcessPayload
- type Processor
- func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
- func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution.Block) (int, error)
- func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block, index int, ...) ([]Structlog, error)
- func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker
- func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
- func (p *Processor) GetQueues() []tracker.QueueInfo
- func (p *Processor) Name() string
- func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error
- func (p *Processor) ProcessNextBlock(ctx context.Context) error
- func (p *Processor) ProcessSingleTransaction(ctx context.Context, block execution.Block, index int, ...) (int, error)
- func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Block, index int, ...) (int, error)
- func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error
- func (p *Processor) SetProcessingMode(mode string)
- func (p *Processor) Start(ctx context.Context) error
- func (p *Processor) Stop(ctx context.Context) error
- type Structlog
Constants ¶
const ( DefaultBufferMaxRows = 100000 DefaultBufferFlushInterval = time.Second )
Default buffer configuration values.
const ( OpcodeCALL = "CALL" OpcodeCALLCODE = "CALLCODE" OpcodeDELEGATECALL = "DELEGATECALL" OpcodeSTATICCALL = "STATICCALL" OpcodeCREATE = "CREATE" OpcodeCREATE2 = "CREATE2" )
Opcode constants for call and create operations.
const ( ProcessorName = "transaction_structlog" ProcessForwardsTaskType = "transaction_structlog_process_forwards" ProcessBackwardsTaskType = "transaction_structlog_process_backwards" )
Variables ¶
var ErrTaskIDConflict = asynq.ErrTaskIDConflict
ErrTaskIDConflict is returned when a task with the same ID already exists.
Functions ¶
func ClassifyColdAccess ¶ added in v0.1.6
func ClassifyColdAccess( structlogs []execution.StructLog, gasSelf []uint64, memExpGas []uint64, ) []uint64
ClassifyColdAccess returns per-opcode cold access counts (0, 1, or 2). It uses gas values and memory expansion costs to determine whether each access-list-affected opcode performed a cold or warm access.
The gasSelf parameter should contain the gas excluding child frame gas (as computed by ComputeGasSelf). The memExpGas parameter contains the memory expansion gas for each opcode (nil if memory data is unavailable, in which case memory expansion is assumed to be 0).
Returns a slice of cold access counts corresponding to each structlog index. Returns nil if structlogs is empty.
func ComputeCreateAddresses ¶ added in v0.1.5
ComputeCreateAddresses pre-computes the created contract addresses for all CREATE/CREATE2 opcodes. It scans the trace and extracts addresses from the stack when each CREATE's constructor returns. The returned map contains opcode index -> created address (only for CREATE/CREATE2 opcodes).
func ComputeGasSelf ¶ added in v0.1.5
ComputeGasSelf calculates the gas consumed by each opcode excluding child frame gas. For CALL/CREATE opcodes, this represents only the call overhead (warm/cold access, memory expansion, value transfer), not the gas consumed by child frames. For all other opcodes, this equals gasUsed.
This is useful for gas analysis where you want to sum gas without double counting: sum(gasSelf) = total transaction execution gas (no double counting).
func ComputeGasUsed ¶ added in v0.1.3
ComputeGasUsed calculates the actual gas consumed for each structlog using the difference between consecutive gas values at the same depth level.
Returns a slice of gasUsed values corresponding to each structlog index. For opcodes that are the last in their call context (before returning to parent), the pre-calculated GasCost is returned since we cannot compute actual cost across call boundaries.
func ComputeMemoryWords ¶ added in v0.1.6
ComputeMemoryWords computes the EVM memory size in 32-byte words before and after each opcode executes. This is derived from the MemorySize field which captures the memory length at opcode entry.
For consecutive same-depth opcodes:
- wordsBefore[i] = ceil(structlogs[i].MemorySize / 32)
- wordsAfter[i] = ceil(structlogs[i+1].MemorySize / 32)
At depth transitions and for the last opcode in a frame, wordsAfter = wordsBefore (we assume no expansion since we can't observe the post-execution state), except for RETURN/REVERT which compute wordsAfter from their stack operands (offset + size) since these opcodes can expand memory.
Returns (nil, nil) if MemorySize data is unavailable (RPC mode where MemorySize=0).
func GenerateTaskID ¶ added in v0.1.5
GenerateTaskID creates a deterministic task ID for deduplication. Format: {processor}:{network}:{blockNum}:{txHash}.
func IsPrecompile ¶ added in v0.1.6
IsPrecompile returns true if the address is a known EVM precompile. Precompile calls don't appear in trace_transaction results (unlike EOA calls which do).
func MemoryExpansionGas ¶ added in v0.1.6
MemoryExpansionGas computes the gas cost of expanding EVM memory from wordsBefore to wordsAfter words. Returns 0 if no expansion occurred.
The EVM memory cost formula is: cost = 3*words + words²/512. Memory expansion gas = cost(wordsAfter) - cost(wordsBefore).
func NewProcessBackwardsTask ¶
func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
NewProcessBackwardsTask creates a new backwards process task. Returns the task, taskID for deduplication, and any error.
func NewProcessForwardsTask ¶
func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
NewProcessForwardsTask creates a new forwards process task. Returns the task, taskID for deduplication, and any error.
Types ¶
type CallFrame ¶ added in v0.1.5
type CallFrame struct {
ID uint32 // Sequential frame ID within the transaction
Depth uint64 // EVM depth level
}
CallFrame represents a single call frame in the EVM execution.
type CallTracker ¶ added in v0.1.5
type CallTracker struct {
// contains filtered or unexported fields
}
CallTracker tracks call frames during EVM opcode traversal. It assigns sequential frame IDs as calls are entered and maintains the current path from root to the active frame.
func NewCallTracker ¶ added in v0.1.5
func NewCallTracker() *CallTracker
NewCallTracker creates a new CallTracker initialized with the root frame. The root frame has ID 0 and Depth 1, matching EVM structlog traces where execution starts at depth 1 (not 0).
func (*CallTracker) CurrentFrameID ¶ added in v0.1.5
func (ct *CallTracker) CurrentFrameID() uint32
CurrentFrameID returns the current frame ID without processing a depth change.
func (*CallTracker) CurrentPath ¶ added in v0.1.5
func (ct *CallTracker) CurrentPath() []uint32
CurrentPath returns a copy of the current path.
func (*CallTracker) IssueFrameID ¶ added in v0.1.5
func (ct *CallTracker) IssueFrameID() (frameID uint32, framePath []uint32)
IssueFrameID allocates the next frame ID without processing a depth change. Used for synthetic frames (e.g., EOA calls that don't increase depth). Returns the new frame ID and the path for the synthetic child frame.
func (*CallTracker) ProcessDepthChange ¶ added in v0.1.5
func (ct *CallTracker) ProcessDepthChange(newDepth uint64) (frameID uint32, framePath []uint32)
ProcessDepthChange processes a depth change and returns the current frame ID and path. Call this for each opcode with the opcode's depth value.
type ClickHouseTime ¶ added in v0.1.1
ClickHouseTime wraps time.Time for ClickHouse DateTime formatting.
func NewClickHouseTime ¶ added in v0.1.1
func NewClickHouseTime(t time.Time) ClickHouseTime
NewClickHouseTime creates a new ClickHouseTime from time.Time.
func (ClickHouseTime) Time ¶ added in v0.1.5
func (t ClickHouseTime) Time() time.Time
Time returns the underlying time.Time.
type Columns ¶ added in v0.1.5
type Columns struct {
UpdatedDateTime proto.ColDateTime
BlockNumber proto.ColUInt64
TransactionHash proto.ColStr
TransactionIndex proto.ColUInt32
TransactionGas proto.ColUInt64
TransactionFailed proto.ColBool
TransactionReturnValue *proto.ColNullable[string]
Index proto.ColUInt32
Operation proto.ColStr
Gas proto.ColUInt64
GasCost proto.ColUInt64
GasUsed proto.ColUInt64
GasSelf proto.ColUInt64
Depth proto.ColUInt64
ReturnData *proto.ColNullable[string]
Refund *proto.ColNullable[uint64]
Error *proto.ColNullable[string]
CallToAddress *proto.ColNullable[string]
CallFrameID proto.ColUInt32
CallFramePath *proto.ColArr[uint32]
MetaNetworkName proto.ColStr
}
Columns holds all columns for structlog batch insert using ch-go columnar protocol.
func NewColumns ¶ added in v0.1.5
func NewColumns() *Columns
NewColumns creates a new Columns instance with all columns initialized.
func (*Columns) Append ¶ added in v0.1.5
func (c *Columns) Append( updatedDateTime time.Time, blockNumber uint64, txHash string, txIndex uint32, txGas uint64, txFailed bool, txReturnValue *string, index uint32, op string, gas uint64, gasCost uint64, gasUsed uint64, gasSelf uint64, depth uint64, returnData *string, refund *uint64, errStr *string, callTo *string, callFrameID uint32, callFramePath []uint32, network string, )
Append adds a row to all columns.
type Config ¶
type Config struct {
clickhouse.Config `yaml:",inline"`
Enabled bool `yaml:"enabled"`
Table string `yaml:"table"`
// Row buffer settings for batched ClickHouse inserts
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
// Block completion tracking
MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
}
Config holds configuration for transaction structlog processor.
type Dependencies ¶
type Dependencies struct {
Log logrus.FieldLogger
Pool *ethereum.Pool
Network *ethereum.Network
State *state.Manager
AsynqClient *asynq.Client
AsynqInspector *asynq.Inspector
RedisClient *redis.Client
RedisPrefix string
}
Dependencies contains the dependencies needed for the processor.
type ProcessPayload ¶
type ProcessPayload struct {
BlockNumber big.Int `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
NetworkName string `json:"network_name"`
Network string `json:"network"` // Alias for NetworkName
ProcessingMode string `json:"processing_mode"` // "forwards" or "backwards"
}
ProcessPayload represents the payload for processing a transaction.
func (*ProcessPayload) MarshalBinary ¶
func (p *ProcessPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler.
func (*ProcessPayload) UnmarshalBinary ¶
func (p *ProcessPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler.
type Processor ¶
type Processor struct {
// Embedded limiter for shared blocking/completion logic
*tracker.Limiter
// contains filtered or unexported fields
}
Processor handles transaction structlog processing.
func New ¶
func New(deps *Dependencies, config *Config) (*Processor, error)
New creates a new transaction structlog processor.
func (*Processor) EnqueueTask ¶
EnqueueTask enqueues a task to the specified queue with infinite retries.
func (*Processor) EnqueueTransactionTasks ¶ added in v0.0.10
func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution.Block) (int, error)
EnqueueTransactionTasks enqueues transaction processing tasks for a given block. Uses TaskID for deduplication - tasks with conflicting IDs are skipped but still count.
func (*Processor) ExtractStructlogs ¶ added in v0.0.7
func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block, index int, tx execution.Transaction) ([]Structlog, error)
ExtractStructlogs extracts structlog data from a transaction without inserting to database.
func (*Processor) GetCompletionTracker ¶ added in v0.1.5
func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker
GetCompletionTracker returns the block completion tracker.
func (*Processor) GetHandlers ¶
func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
GetHandlers returns the task handlers for this processor.
func (*Processor) ProcessBlock ¶ added in v0.1.5
ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. This is used for both normal processing and gap filling of missing blocks.
func (*Processor) ProcessNextBlock ¶
ProcessNextBlock processes the next available block(s). In zero-interval mode, this attempts to fetch and process multiple blocks up to the available capacity for improved throughput.
func (*Processor) ProcessSingleTransaction ¶
func (p *Processor) ProcessSingleTransaction(ctx context.Context, block execution.Block, index int, tx execution.Transaction) (int, error)
ProcessSingleTransaction processes a single transaction and inserts its structlogs directly to ClickHouse.
func (*Processor) ProcessTransaction ¶ added in v0.1.1
func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Block, index int, tx execution.Transaction) (int, error)
ProcessTransaction processes a transaction and inserts structlogs to ClickHouse.
func (*Processor) ReprocessBlock ¶ added in v0.1.5
ReprocessBlock re-enqueues tasks for an orphaned block. Used when a block is in ClickHouse (complete=0) but has no Redis tracking. TaskID deduplication ensures no duplicate tasks are created.
func (*Processor) SetProcessingMode ¶
SetProcessingMode sets the processing mode for the processor.
type Structlog ¶
type Structlog struct {
UpdatedDateTime ClickHouseTime `json:"updated_date_time"`
BlockNumber uint64 `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
TransactionGas uint64 `json:"transaction_gas"`
TransactionFailed bool `json:"transaction_failed"`
TransactionReturnValue *string `json:"transaction_return_value"`
Index uint32 `json:"index"`
Operation string `json:"operation"`
// Gas is the remaining gas before this opcode executes.
Gas uint64 `json:"gas"`
// GasCost is from the execution node trace. For CALL/CREATE opcodes, this is the
// gas stipend passed to the child frame, not the call overhead.
GasCost uint64 `json:"gas_cost"`
// GasUsed is computed as gas[i] - gas[i+1] at the same depth level.
// For CALL/CREATE opcodes, this includes the call overhead plus all child frame gas.
// Summing across all opcodes will double count child frame gas.
GasUsed uint64 `json:"gas_used"`
// GasSelf excludes child frame gas. For CALL/CREATE opcodes, this is just the call
// overhead (warm/cold access, memory expansion). For other opcodes, equals GasUsed.
// Summing across all opcodes gives total execution gas without double counting.
GasSelf uint64 `json:"gas_self"`
Depth uint64 `json:"depth"`
ReturnData *string `json:"return_data"`
Refund *uint64 `json:"refund"`
Error *string `json:"error"`
CallToAddress *string `json:"call_to_address"`
CallFrameID uint32 `json:"call_frame_id"`
CallFramePath []uint32 `json:"call_frame_path"`
MetaNetworkName string `json:"meta_network_name"`
}
Structlog represents a single EVM opcode execution within a transaction trace. See gas_cost.go for detailed documentation on the gas fields.