protocol

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
// Wire-format size limits.
//
// MaxMessageSize evaluates to 10 * 1024 * 1024 = 10485760 bytes.
// HeaderSize equals the sum of the field widths listed in its trailing
// comment: 4 + 8 + 1 + 1 + 2 + 4 = 20 bytes.
const (
	MaxMessageSize = 1024 * 1024 * 10 // 10MB
	HeaderSize     = 20               // Length(4) + RequestID(8) + Type(1) + Version(1) + Flags(2) + CRC32(4)
)

Message size limits

View Source
// ProtocolVersion is the protocol version number defined by this package.
// NOTE(review): presumably the value written to RequestHeader.Version — confirm
// against the codec implementation, which is not visible here.
const (
	ProtocolVersion = 1
)

Protocol version

Variables

This section is empty.

Functions

func GetBuffer

func GetBuffer(size int) []byte

GetBuffer gets a buffer from the default pool

func PutBuffer

func PutBuffer(buf []byte)

PutBuffer returns a buffer to the default pool

func WithBuffer

func WithBuffer(size int, fn func([]byte) error) error

WithBuffer executes a function with a pooled buffer and automatically returns it

func ZeroCopySlice

func ZeroCopySlice(buf []byte, offset, length int) []byte

ZeroCopySlice creates a slice view into a buffer without copying. The returned slice shares memory with the original buffer. IMPORTANT: The original buffer must not be modified while this slice is in use.

Types

type AcksLevel

type AcksLevel int16

AcksLevel represents the acknowledgment level for produce requests

// Acknowledgment levels accepted in ProduceRequest.Acks.
// AcksAll is -1, which is representable because AcksLevel is a signed int16.
const (
	AcksNone AcksLevel = 0  // No acknowledgment (fire and forget)
	AcksOne  AcksLevel = 1  // Acknowledgment from leader only
	AcksAll  AcksLevel = -1 // Acknowledgment from all ISR members
)

type BufferPool

// BufferPool is a pool of reusable byte buffers.
// Its fields are unexported; obtain one with NewBufferPool and use the
// Get, GetExact and Put methods.
type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool is a pool of reusable byte buffers to reduce allocations. This implements zero-copy optimization by reusing buffers across requests.

func NewBufferPool

func NewBufferPool() *BufferPool

NewBufferPool creates a new buffer pool

func (*BufferPool) Get

func (p *BufferPool) Get(size int) []byte

Get returns a buffer of at least the requested size. The returned buffer may be larger than requested.

func (*BufferPool) GetExact

func (p *BufferPool) GetExact(size int) []byte

GetExact returns a buffer of exactly the requested size. Use this when you need precise sizing.

func (*BufferPool) Put

func (p *BufferPool) Put(buf []byte)

Put returns a buffer to the pool for reuse. Only buffers obtained from Get() should be Put() back.

type Codec

// Codec encodes and decodes protocol messages.
// Its fields are unexported; obtain one with NewCodec. Its Encode* methods
// write to an io.Writer and its DecodeRequest/DecodeResponse methods read
// from an io.Reader.
type Codec struct {
	// contains filtered or unexported fields
}

Codec handles encoding and decoding of protocol messages

func NewCodec

func NewCodec() *Codec

NewCodec creates a new codec

func (*Codec) DecodeRequest

func (c *Codec) DecodeRequest(r io.Reader) (*Request, error)

DecodeRequest decodes a request from the reader

func (*Codec) DecodeResponse

func (c *Codec) DecodeResponse(r io.Reader) (*Response, error)

DecodeResponse decodes a response from the reader

func (*Codec) DecodeResponsePayload

func (c *Codec) DecodeResponsePayload(resp *Response, reqType RequestType) error

DecodeResponsePayload decodes a response payload based on request type

func (*Codec) EncodeRequest

func (c *Codec) EncodeRequest(w io.Writer, req *Request) error

EncodeRequest encodes a request to the writer

func (*Codec) EncodeResponse

func (c *Codec) EncodeResponse(w io.Writer, resp *Response) error

EncodeResponse encodes a response to the writer

type CreateTopicRequest

// CreateTopicRequest is the payload of a create-topic request.
// NOTE(review): presumably sent with RequestTypeCreateTopic — confirm in the codec.
type CreateTopicRequest struct {
	Topic             string
	NumPartitions     uint32
	ReplicationFactor uint16
}

CreateTopicRequest represents a create topic request

type CreateTopicResponse

// CreateTopicResponse is the payload of a create-topic response.
// It carries both a Created flag and an ErrorCode.
// NOTE(review): how the two relate (e.g. Created == false with ErrTopicExists)
// is not visible here — confirm against the server.
type CreateTopicResponse struct {
	Topic     string
	Created   bool
	ErrorCode ErrorCode
}

CreateTopicResponse represents a create topic response

type DeleteTopicRequest

// DeleteTopicRequest is the payload of a delete-topic request; it names the
// topic in its single field.
type DeleteTopicRequest struct {
	Topic string
}

DeleteTopicRequest represents a delete topic request

type DeleteTopicResponse

// DeleteTopicResponse is the payload of a delete-topic response.
// It carries both a Deleted flag and an ErrorCode, mirroring the field
// layout of CreateTopicResponse.
type DeleteTopicResponse struct {
	Topic     string
	Deleted   bool
	ErrorCode ErrorCode
}

DeleteTopicResponse represents a delete topic response

type ErrorCode

type ErrorCode uint16

ErrorCode represents specific error codes

// Error codes, grouped by numeric range:
//
//	 0-12  general codes (ErrNone = 0 is the zero value)
//	20-29  consumer group codes
//	30-39  transaction codes
//	40-43  security codes
//
// Values 13-19 are not assigned in this block.
const (
	ErrNone              ErrorCode = 0
	ErrUnknownRequest    ErrorCode = 1
	ErrInvalidRequest    ErrorCode = 2
	ErrOffsetOutOfRange  ErrorCode = 3
	ErrCorruptMessage    ErrorCode = 4
	ErrPartitionNotFound ErrorCode = 5
	ErrRequestTimeout    ErrorCode = 6
	ErrStorageError      ErrorCode = 7
	ErrTopicNotFound     ErrorCode = 8
	ErrTopicExists       ErrorCode = 9
	ErrChecksumMismatch  ErrorCode = 10
	ErrInvalidProtocol   ErrorCode = 11
	ErrMessageTooLarge   ErrorCode = 12
	// Consumer group error codes
	ErrUnknownMemberID           ErrorCode = 20
	ErrInvalidSessionTimeout     ErrorCode = 21
	ErrRebalanceInProgress       ErrorCode = 22
	ErrInvalidGenerationID       ErrorCode = 23
	ErrUnknownConsumerGroupID    ErrorCode = 24
	ErrNotCoordinator            ErrorCode = 25
	ErrInvalidCommitOffsetSize   ErrorCode = 26
	ErrGroupAuthorizationFailed  ErrorCode = 27
	ErrIllegalGeneration         ErrorCode = 28
	ErrInconsistentGroupProtocol ErrorCode = 29
	// Transaction error codes
	ErrInvalidProducerEpoch               ErrorCode = 30
	ErrInvalidTransactionState            ErrorCode = 31
	ErrInvalidProducerIDMapping           ErrorCode = 32
	ErrTransactionCoordinatorNotAvailable ErrorCode = 33
	ErrTransactionCoordinatorFenced       ErrorCode = 34
	ErrProducerFenced                     ErrorCode = 35
	ErrInvalidTransactionTimeout          ErrorCode = 36
	ErrConcurrentTransactions             ErrorCode = 37
	ErrTransactionAborted                 ErrorCode = 38
	ErrInvalidPartitionList               ErrorCode = 39
	// Security error codes
	ErrAuthenticationFailed ErrorCode = 40
	ErrAuthorizationFailed  ErrorCode = 41
	ErrInvalidCredentials   ErrorCode = 42
	ErrAccountDisabled      ErrorCode = 43
)

func (ErrorCode) Error

func (e ErrorCode) Error() string

Error returns the error message

func (ErrorCode) String

func (e ErrorCode) String() string

String returns the string representation of ErrorCode

type ErrorResponse

// ErrorResponse is an error payload pairing an ErrorCode with a
// free-form Message string.
type ErrorResponse struct {
	ErrorCode ErrorCode
	Message   string
}

ErrorResponse represents an error response

type FetchRequest

// FetchRequest is the payload of a fetch request for one topic partition.
// NOTE(review): Offset is presumably the first offset to read and MaxBytes an
// upper bound on the returned data — confirm against the server handler.
type FetchRequest struct {
	Topic       string
	PartitionID uint32
	Offset      int64
	MaxBytes    uint32
}

FetchRequest represents a fetch request

type FetchResponse

// FetchResponse is the payload of a fetch response: the topic partition it
// refers to, a high water mark, and a slice of Message values.
type FetchResponse struct {
	Topic         string
	PartitionID   uint32
	HighWaterMark int64
	Messages      []Message
}

FetchResponse represents a fetch response

type GetOffsetRequest

// GetOffsetRequest is the payload of a get-offset request; it identifies a
// single topic partition.
type GetOffsetRequest struct {
	Topic       string
	PartitionID uint32
}

GetOffsetRequest represents a get offset request

type GetOffsetResponse

// GetOffsetResponse is the payload of a get-offset response for one topic
// partition. It reports three int64 offsets: StartOffset, EndOffset and
// HighWaterMark.
// NOTE(review): whether EndOffset is inclusive or exclusive is not visible
// here — confirm against the storage layer.
type GetOffsetResponse struct {
	Topic         string
	PartitionID   uint32
	StartOffset   int64
	EndOffset     int64
	HighWaterMark int64
}

GetOffsetResponse represents a get offset response

type HealthCheckRequest

// HealthCheckRequest is the payload of a health check request.
// It has no fields.
type HealthCheckRequest struct {
}

HealthCheckRequest represents a health check request

type HealthCheckResponse

// HealthCheckResponse is the payload of a health check response.
// Status is a plain string rather than an enumerated type; the field comment
// documents the two expected values.
type HealthCheckResponse struct {
	Status string // "healthy" or "unhealthy"
	Uptime int64  // Uptime in seconds
}

HealthCheckResponse represents a health check response

type ListTopicsRequest

// ListTopicsRequest is the payload of a list-topics request.
// It has no fields.
type ListTopicsRequest struct {
}

ListTopicsRequest represents a list topics request

type ListTopicsResponse

// ListTopicsResponse is the payload of a list-topics response; it holds one
// TopicInfo per listed topic.
type ListTopicsResponse struct {
	Topics []TopicInfo
}

ListTopicsResponse represents a list topics response

type Message

// Message is a single message as carried in ProduceRequest.Messages and
// FetchResponse.Messages. Its Size method reports the serialized size.
type Message struct {
	Offset    int64             // Message offset (set by server)
	Key       []byte            // Message key (optional)
	Value     []byte            // Message value
	Headers   map[string][]byte // Message headers
	Timestamp int64             // Unix timestamp (nanoseconds)
}

Message represents a single message

func (*Message) Size

func (m *Message) Size() int

Size returns the serialized size of the message

type MessageBuffer

// MessageBuffer is a reusable buffer for encoding and decoding messages.
// Its fields are unexported; create one with NewMessageBuffer, which takes
// the BufferPool and an initial size, and call Release when finished to
// return the buffer to the pool.
type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer is a reusable buffer for encoding/decoding messages with zero-copy slice views

func NewMessageBuffer

func NewMessageBuffer(pool *BufferPool, initialSize int) *MessageBuffer

NewMessageBuffer creates a new message buffer

func (*MessageBuffer) Bytes

func (mb *MessageBuffer) Bytes() []byte

Bytes returns the current buffer contents

func (*MessageBuffer) Release

func (mb *MessageBuffer) Release()

Release returns the buffer to the pool

func (*MessageBuffer) Reset

func (mb *MessageBuffer) Reset()

Reset resets the buffer for reuse

func (*MessageBuffer) Slice

func (mb *MessageBuffer) Slice(offset, length int) []byte

Slice returns a zero-copy slice view

func (*MessageBuffer) Write

func (mb *MessageBuffer) Write(data []byte)

Write appends data to the buffer (may require copying)

func (*MessageBuffer) WriteZeroCopy

func (mb *MessageBuffer) WriteZeroCopy(data []byte) bool

WriteZeroCopy appends a data reference without copying (if possible). Returns true if zero-copy was used, false if the data was copied.

type ProduceRequest

// ProduceRequest is the payload of a produce request: a batch of messages
// for one topic partition, plus the acknowledgment level and timeout.
// Acks takes one of AcksNone (0), AcksOne (1) or AcksAll (-1).
type ProduceRequest struct {
	Topic       string
	PartitionID uint32
	Messages    []Message
	Acks        AcksLevel // Acknowledgment level (0, 1, or -1)
	TimeoutMs   int32     // Timeout for acks=all (default: 30000)
}

ProduceRequest represents a produce request

type ProduceResponse

// ProduceResponse is the payload of a produce response for one topic
// partition: the first assigned offset, the number of messages written, and
// the current high water mark.
type ProduceResponse struct {
	Topic         string
	PartitionID   uint32
	BaseOffset    int64  // First offset assigned
	NumMessages   uint32 // Number of messages written
	HighWaterMark int64  // Current high water mark
}

ProduceResponse represents a produce response

type Request

// Request is a protocol request: a RequestHeader plus an untyped payload.
// NOTE(review): Payload is presumably one of this package's *Request structs,
// selected by Header.Type — confirm against Codec.DecodeRequest.
type Request struct {
	Header  RequestHeader
	Payload interface{}
}

Request represents a protocol request

type RequestFlags

type RequestFlags uint16

RequestFlags represents request flags

// Request flag bits. Each non-zero flag occupies its own bit (bits 0-4) of
// the uint16 RequestFlags value; FlagNone is the zero value with no bits set.
const (
	FlagNone       RequestFlags = 0
	FlagRequireAck RequestFlags = 1 << 0 // Require acknowledgment
	FlagCompressed RequestFlags = 1 << 1 // Payload is compressed
	FlagBatch      RequestFlags = 1 << 2 // Batch request
	FlagAsync      RequestFlags = 1 << 3 // Async request (fire and forget)
	FlagIdempotent RequestFlags = 1 << 4 // Idempotent request
)

type RequestHeader

// RequestHeader is the header carried by every Request.
// Its fields total 4 + 8 + 1 + 1 + 2 = 16 bytes by type width. The HeaderSize
// constant (20) additionally counts a 4-byte CRC32, which has no field here.
// NOTE(review): presumably the codec computes and verifies the CRC32 — confirm.
type RequestHeader struct {
	Length    uint32       // Total message length (excluding length field)
	RequestID uint64       // Unique request identifier
	Type      RequestType  // Request type
	Version   byte         // Protocol version
	Flags     RequestFlags // Request flags
}

RequestHeader represents the request header

type RequestType

type RequestType byte

RequestType represents the type of request

// Request type codes. Values are contiguous from 0x01 to 0x12; 0x00 is not
// assigned. Going by the names: 0x01-0x07 cover produce/fetch, topic
// management and health check, 0x08-0x0D cover consumer groups and offsets,
// and 0x0E-0x12 cover producer IDs and transactions.
const (
	RequestTypeProduce            RequestType = 0x01
	RequestTypeFetch              RequestType = 0x02
	RequestTypeGetOffset          RequestType = 0x03
	RequestTypeCreateTopic        RequestType = 0x04
	RequestTypeDeleteTopic        RequestType = 0x05
	RequestTypeListTopics         RequestType = 0x06
	RequestTypeHealthCheck        RequestType = 0x07
	RequestTypeJoinGroup          RequestType = 0x08
	RequestTypeSyncGroup          RequestType = 0x09
	RequestTypeHeartbeat          RequestType = 0x0A
	RequestTypeLeaveGroup         RequestType = 0x0B
	RequestTypeOffsetCommit       RequestType = 0x0C
	RequestTypeOffsetFetch        RequestType = 0x0D
	RequestTypeInitProducerID     RequestType = 0x0E
	RequestTypeAddPartitionsToTxn RequestType = 0x0F
	RequestTypeAddOffsetsToTxn    RequestType = 0x10
	RequestTypeEndTxn             RequestType = 0x11
	RequestTypeTxnOffsetCommit    RequestType = 0x12
)

func (RequestType) String

func (t RequestType) String() string

String returns the string representation of RequestType

type Response

// Response is a protocol response: a ResponseHeader plus an untyped payload.
// The header carries no request type, which is why
// Codec.DecodeResponsePayload takes the RequestType as a separate argument.
type Response struct {
	Header  ResponseHeader
	Payload interface{}
}

Response represents a protocol response

type ResponseHeader

// ResponseHeader is the header carried by every Response.
// Its fields total 4 + 8 + 1 + 2 = 15 bytes by type width, and differ from
// RequestHeader's: Status and ErrorCode replace Type, Version and Flags.
// NOTE(review): the on-wire response header size is not visible here —
// confirm how it relates to HeaderSize.
type ResponseHeader struct {
	Length    uint32     // Total message length (excluding length field)
	RequestID uint64     // Matches request ID
	Status    StatusCode // Response status
	ErrorCode ErrorCode  // Error code if status != OK
}

ResponseHeader represents the response header

type SharedBufferWrapper

// SharedBufferWrapper wraps a byte buffer that should not be modified.
// Its fields are unexported; create one with NewSharedBuffer and read through
// Bytes, Len and Slice.
// NOTE(review): Bytes and Slice return plain []byte, so read-only access looks
// like a convention rather than something the type enforces — confirm.
type SharedBufferWrapper struct {
	// contains filtered or unexported fields
}

SharedBufferWrapper wraps a buffer that should not be modified. This is used for zero-copy reads where the buffer is shared.

func NewSharedBuffer

func NewSharedBuffer(buf []byte) *SharedBufferWrapper

NewSharedBuffer creates a new shared buffer wrapper

func (*SharedBufferWrapper) Bytes

func (s *SharedBufferWrapper) Bytes() []byte

Bytes returns the full buffer (read-only view)

func (*SharedBufferWrapper) Len

func (s *SharedBufferWrapper) Len() int

Len returns the buffer length

func (*SharedBufferWrapper) Slice

func (s *SharedBufferWrapper) Slice(offset, length int) []byte

Slice returns a zero-copy slice of the buffer

type StatusCode

type StatusCode byte

StatusCode represents the response status

// Response status codes stored in ResponseHeader.Status.
// StatusOK is the zero value. ResponseHeader documents its ErrorCode field as
// meaningful when the status is not OK.
const (
	StatusOK             StatusCode = 0
	StatusError          StatusCode = 1
	StatusPartialSuccess StatusCode = 2
)

func (StatusCode) String

func (s StatusCode) String() string

String returns the string representation of StatusCode

type TopicInfo

// TopicInfo describes one topic by name and partition count; it is the
// element type of ListTopicsResponse.Topics.
type TopicInfo struct {
	Name          string
	NumPartitions uint32
}

TopicInfo represents information about a topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL