Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateTaskID(network string, blockNumber uint64) string
- func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
- func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
- type ClickHouseDateTime
- type Columns
- type Config
- type Dependencies
- type ProcessPayload
- type Processor
- func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
- func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker
- func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
- func (p *Processor) GetQueues() []tracker.QueueInfo
- func (p *Processor) Name() string
- func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error
- func (p *Processor) ProcessNextBlock(ctx context.Context) error
- func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error
- func (p *Processor) SetProcessingMode(mode string)
- func (p *Processor) Start(ctx context.Context) error
- func (p *Processor) Stop(ctx context.Context) error
- type Transaction
Constants ¶
const (
	DefaultBufferMaxRows       = 100000
	DefaultBufferFlushInterval = time.Second
)
Default buffer configuration values.
const (
	// ProcessForwardsTaskType is the task type for forwards processing.
	ProcessForwardsTaskType = "transaction_simple_process_forwards"
	// ProcessBackwardsTaskType is the task type for backwards processing.
	ProcessBackwardsTaskType = "transaction_simple_process_backwards"
)
const ProcessorName = "transaction_simple"
ProcessorName is the name of the simple transaction processor.
Variables ¶
var ErrTaskIDConflict = asynq.ErrTaskIDConflict
ErrTaskIDConflict is returned when a task with the same ID already exists.
Functions ¶
func GenerateTaskID ¶ added in v0.1.5
func GenerateTaskID(network string, blockNumber uint64) string
GenerateTaskID creates a deterministic task ID for deduplication. Format: {processor}:{network}:{blockNum}:block. For block-based processors, we use "block" as the identifier since there's one task per block.
func NewProcessBackwardsTask ¶
func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
NewProcessBackwardsTask creates a new backwards process task. Returns the task, taskID for deduplication, and any error.
func NewProcessForwardsTask ¶
func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)
NewProcessForwardsTask creates a new forwards process task. Returns the task, taskID for deduplication, and any error.
Types ¶
type ClickHouseDateTime ¶ added in v0.1.5
ClickHouseDateTime is a time.Time wrapper that formats correctly for ClickHouse JSON.
func (ClickHouseDateTime) MarshalJSON ¶ added in v0.1.5
func (t ClickHouseDateTime) MarshalJSON() ([]byte, error)
MarshalJSON formats time for ClickHouse DateTime column.
type Columns ¶ added in v0.1.5
type Columns struct {
UpdatedDateTime proto.ColDateTime
BlockNumber proto.ColUInt64
BlockHash proto.ColStr
ParentHash proto.ColStr
Position proto.ColUInt32
Hash proto.ColStr
From proto.ColStr
To *proto.ColNullable[string]
Nonce proto.ColUInt64
GasPrice proto.ColUInt128
Gas proto.ColUInt64
GasTipCap *proto.ColNullable[proto.UInt128]
GasFeeCap *proto.ColNullable[proto.UInt128]
Value proto.ColUInt128
Type proto.ColUInt8
Size proto.ColUInt32
CallDataSize proto.ColUInt32
BlobGas *proto.ColNullable[uint64]
BlobGasFeeCap *proto.ColNullable[proto.UInt128]
BlobHashes *proto.ColArr[string]
Success proto.ColBool
NInputBytes proto.ColUInt32
NInputZeroBytes proto.ColUInt32
NInputNonzeroBytes proto.ColUInt32
MetaNetworkName proto.ColStr
}
Columns holds all columns for transaction batch insert using ch-go columnar protocol.
func NewColumns ¶ added in v0.1.5
func NewColumns() *Columns
NewColumns creates a new Columns instance with all nullable and array columns initialized.
func (*Columns) Append ¶ added in v0.1.5
func (c *Columns) Append(tx Transaction)
Append adds a Transaction row to all columns.
type Config ¶
type Config struct {
clickhouse.Config `yaml:",inline"`
Enabled bool `yaml:"enabled"`
Table string `yaml:"table"`
// Row buffer settings for batched ClickHouse inserts
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
// Block completion tracking
MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
}
Config holds configuration for the simple transaction processor.
type Dependencies ¶
type Dependencies struct {
Log logrus.FieldLogger
Pool *ethereum.Pool
Network *ethereum.Network
State *state.Manager
AsynqClient *asynq.Client
AsynqInspector *asynq.Inspector
RedisClient *redis.Client
RedisPrefix string
}
Dependencies contains the dependencies needed for the processor.
type ProcessPayload ¶
type ProcessPayload struct {
BlockNumber big.Int `json:"block_number"`
NetworkName string `json:"network_name"`
ProcessingMode string `json:"processing_mode"`
}
ProcessPayload represents the payload for processing a block.
func (*ProcessPayload) MarshalBinary ¶
func (p *ProcessPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler.
func (*ProcessPayload) UnmarshalBinary ¶
func (p *ProcessPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler.
type Processor ¶
type Processor struct {
// Embedded limiter for shared blocking/completion logic
*tracker.Limiter
// contains filtered or unexported fields
}
Processor handles simple transaction processing.
func New ¶
func New(deps *Dependencies, config *Config) (*Processor, error)
New creates a new simple transaction processor.
func (*Processor) EnqueueTask ¶
func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
EnqueueTask enqueues a task to the specified queue with infinite retries.
func (*Processor) GetCompletionTracker ¶ added in v0.1.5
func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker
GetCompletionTracker returns the block completion tracker.
func (*Processor) GetHandlers ¶
func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
GetHandlers returns the task handlers for this processor.
func (*Processor) ProcessBlock ¶ added in v0.1.5
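func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error
ProcessBlock processes a single block: it fetches the block, marks it as enqueued, and enqueues its tasks. This is used for both normal processing and gap filling of missing blocks.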
func (*Processor) ProcessNextBlock ¶
func (p *Processor) ProcessNextBlock(ctx context.Context) error
ProcessNextBlock processes the next available block(s). In zero-interval mode, this attempts to fetch and process multiple blocks up to the available capacity for improved throughput.
func (*Processor) ReprocessBlock ¶ added in v0.1.5
func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error
ReprocessBlock re-enqueues tasks for an orphaned block. Used when a block is in ClickHouse (complete=0) but has no Redis tracking. TaskID deduplication ensures no duplicate tasks are created.
func (*Processor) SetProcessingMode ¶
func (p *Processor) SetProcessingMode(mode string)
SetProcessingMode sets the processing mode for the processor.
type Transaction ¶
type Transaction struct {
UpdatedDateTime ClickHouseDateTime `json:"updated_date_time"`
BlockNumber uint64 `json:"block_number"`
BlockHash string `json:"block_hash"`
ParentHash string `json:"parent_hash"`
Position uint32 `json:"position"`
Hash string `json:"hash"`
From string `json:"from"`
To *string `json:"to"`
Nonce uint64 `json:"nonce"`
GasPrice string `json:"gas_price"` // Effective gas price as UInt128 string
Gas uint64 `json:"gas"` // Gas limit
GasTipCap *string `json:"gas_tip_cap"` // Nullable UInt128 string
GasFeeCap *string `json:"gas_fee_cap"` // Nullable UInt128 string
Value string `json:"value"` // UInt128 string
Type uint8 `json:"type"` // Transaction type
Size uint32 `json:"size"` // Transaction size in bytes
CallDataSize uint32 `json:"call_data_size"` // Size of call data
BlobGas *uint64 `json:"blob_gas"` // Nullable - for type 3 txs
BlobGasFeeCap *string `json:"blob_gas_fee_cap"` // Nullable UInt128 string
BlobHashes []string `json:"blob_hashes"` // Array of versioned hashes
Success bool `json:"success"`
NInputBytes uint32 `json:"n_input_bytes"`
NInputZeroBytes uint32 `json:"n_input_zero_bytes"`
NInputNonzeroBytes uint32 `json:"n_input_nonzero_bytes"`
MetaNetworkName string `json:"meta_network_name"`
}
Transaction represents a row in the execution_transaction table.