Documentation
¶
Index ¶
- Constants
- func GetBuffer(size int) []byte
- func PutBuffer(buf []byte)
- func WithBuffer(size int, fn func([]byte) error) error
- func ZeroCopySlice(buf []byte, offset, length int) []byte
- type AcksLevel
- type BufferPool
- type Codec
- func (c *Codec) DecodeRequest(r io.Reader) (*Request, error)
- func (c *Codec) DecodeResponse(r io.Reader) (*Response, error)
- func (c *Codec) DecodeResponsePayload(resp *Response, reqType RequestType) error
- func (c *Codec) EncodeRequest(w io.Writer, req *Request) error
- func (c *Codec) EncodeResponse(w io.Writer, resp *Response) error
- type CreateTopicRequest
- type CreateTopicResponse
- type DeleteTopicRequest
- type DeleteTopicResponse
- type ErrorCode
- type ErrorResponse
- type FetchRequest
- type FetchResponse
- type GetOffsetRequest
- type GetOffsetResponse
- type HealthCheckRequest
- type HealthCheckResponse
- type ListTopicsRequest
- type ListTopicsResponse
- type Message
- type MessageBuffer
- type ProduceRequest
- type ProduceResponse
- type Request
- type RequestFlags
- type RequestHeader
- type RequestType
- type Response
- type ResponseHeader
- type SharedBufferWrapper
- type StatusCode
- type TopicInfo
Constants ¶
const ( MaxMessageSize = 1024 * 1024 * 10 // 10MB HeaderSize = 20 // Length(4) + RequestID(8) + Type(1) + Version(1) + Flags(2) + CRC32(4) )
Message size limits
const (
ProtocolVersion = 1
)
Protocol version
Variables ¶
This section is empty.
Functions ¶
func WithBuffer ¶
WithBuffer executes a function with a pooled buffer and automatically returns it
func ZeroCopySlice ¶
ZeroCopySlice creates a slice view into a buffer without copying The returned slice shares memory with the original buffer IMPORTANT: The original buffer must not be modified while this slice is in use
Types ¶
type AcksLevel ¶
type AcksLevel int16
AcksLevel represents the acknowledgment level for produce requests
type BufferPool ¶
type BufferPool struct {
// contains filtered or unexported fields
}
BufferPool is a pool of reusable byte buffers to reduce allocations This implements zero-copy optimization by reusing buffers across requests
func (*BufferPool) Get ¶
func (p *BufferPool) Get(size int) []byte
Get returns a buffer of at least the requested size The returned buffer may be larger than requested
func (*BufferPool) GetExact ¶
func (p *BufferPool) GetExact(size int) []byte
GetExact returns a buffer of exactly the requested size Use this when you need precise sizing
func (*BufferPool) Put ¶
func (p *BufferPool) Put(buf []byte)
Put returns a buffer to the pool for reuse Only buffers obtained from Get() should be Put() back
type Codec ¶
type Codec struct {
// contains filtered or unexported fields
}
Codec handles encoding and decoding of protocol messages
func (*Codec) DecodeRequest ¶
DecodeRequest decodes a request from the reader
func (*Codec) DecodeResponse ¶
DecodeResponse decodes a response from the reader
func (*Codec) DecodeResponsePayload ¶
func (c *Codec) DecodeResponsePayload(resp *Response, reqType RequestType) error
DecodeResponsePayload decodes a response payload based on request type
func (*Codec) EncodeRequest ¶
EncodeRequest encodes a request to the writer
type CreateTopicRequest ¶
CreateTopicRequest represents a create topic request
type CreateTopicResponse ¶
CreateTopicResponse represents a create topic response
type DeleteTopicRequest ¶
type DeleteTopicRequest struct {
Topic string
}
DeleteTopicRequest represents a delete topic request
type DeleteTopicResponse ¶
DeleteTopicResponse represents a delete topic response
type ErrorCode ¶
type ErrorCode uint16
ErrorCode represents specific error codes
const ( ErrNone ErrorCode = 0 ErrUnknownRequest ErrorCode = 1 ErrInvalidRequest ErrorCode = 2 ErrOffsetOutOfRange ErrorCode = 3 ErrCorruptMessage ErrorCode = 4 ErrPartitionNotFound ErrorCode = 5 ErrRequestTimeout ErrorCode = 6 ErrStorageError ErrorCode = 7 ErrTopicNotFound ErrorCode = 8 ErrTopicExists ErrorCode = 9 ErrChecksumMismatch ErrorCode = 10 ErrInvalidProtocol ErrorCode = 11 ErrMessageTooLarge ErrorCode = 12 // Consumer group error codes ErrUnknownMemberID ErrorCode = 20 ErrInvalidSessionTimeout ErrorCode = 21 ErrRebalanceInProgress ErrorCode = 22 ErrInvalidGenerationID ErrorCode = 23 ErrUnknownConsumerGroupID ErrorCode = 24 ErrNotCoordinator ErrorCode = 25 ErrInvalidCommitOffsetSize ErrorCode = 26 ErrGroupAuthorizationFailed ErrorCode = 27 ErrIllegalGeneration ErrorCode = 28 ErrInconsistentGroupProtocol ErrorCode = 29 // Transaction error codes ErrInvalidProducerEpoch ErrorCode = 30 ErrInvalidTransactionState ErrorCode = 31 ErrInvalidProducerIDMapping ErrorCode = 32 ErrTransactionCoordinatorNotAvailable ErrorCode = 33 ErrTransactionCoordinatorFenced ErrorCode = 34 ErrProducerFenced ErrorCode = 35 ErrInvalidTransactionTimeout ErrorCode = 36 ErrConcurrentTransactions ErrorCode = 37 ErrTransactionAborted ErrorCode = 38 ErrInvalidPartitionList ErrorCode = 39 // Security error codes ErrAuthenticationFailed ErrorCode = 40 ErrAuthorizationFailed ErrorCode = 41 ErrInvalidCredentials ErrorCode = 42 ErrAccountDisabled ErrorCode = 43 )
type ErrorResponse ¶
ErrorResponse represents an error response
type FetchRequest ¶
FetchRequest represents a fetch request
type FetchResponse ¶
type FetchResponse struct {
Topic string
PartitionID uint32
HighWaterMark int64
Messages []Message
}
FetchResponse represents a fetch response
type GetOffsetRequest ¶
GetOffsetRequest represents a get offset request
type GetOffsetResponse ¶
type GetOffsetResponse struct {
Topic string
PartitionID uint32
StartOffset int64
EndOffset int64
HighWaterMark int64
}
GetOffsetResponse represents a get offset response
type HealthCheckRequest ¶
type HealthCheckRequest struct {
}
HealthCheckRequest represents a health check request
type HealthCheckResponse ¶
type HealthCheckResponse struct {
Status string // "healthy" or "unhealthy"
Uptime int64 // Uptime in seconds
}
HealthCheckResponse represents a health check response
type ListTopicsRequest ¶
type ListTopicsRequest struct {
}
ListTopicsRequest represents a list topics request
type ListTopicsResponse ¶
type ListTopicsResponse struct {
Topics []TopicInfo
}
ListTopicsResponse represents a list topics response
type Message ¶
type Message struct {
Offset int64 // Message offset (set by server)
Key []byte // Message key (optional)
Value []byte // Message value
Headers map[string][]byte // Message headers
Timestamp int64 // Unix timestamp (nanoseconds)
}
Message represents a single message
type MessageBuffer ¶
type MessageBuffer struct {
// contains filtered or unexported fields
}
MessageBuffer is a reusable buffer for encoding/decoding messages with zero-copy slice views
func NewMessageBuffer ¶
func NewMessageBuffer(pool *BufferPool, initialSize int) *MessageBuffer
NewMessageBuffer creates a new message buffer
func (*MessageBuffer) Bytes ¶
func (mb *MessageBuffer) Bytes() []byte
Bytes returns the current buffer contents
func (*MessageBuffer) Release ¶
func (mb *MessageBuffer) Release()
Release returns the buffer to the pool
func (*MessageBuffer) Slice ¶
func (mb *MessageBuffer) Slice(offset, length int) []byte
Slice returns a zero-copy slice view
func (*MessageBuffer) Write ¶
func (mb *MessageBuffer) Write(data []byte)
Write appends data to the buffer (may require copying)
func (*MessageBuffer) WriteZeroCopy ¶
func (mb *MessageBuffer) WriteZeroCopy(data []byte) bool
WriteZeroCopy appends data reference without copying (if possible) Returns true if zero-copy was used, false if data was copied
type ProduceRequest ¶
type ProduceRequest struct {
Topic string
PartitionID uint32
Messages []Message
Acks AcksLevel // Acknowledgment level (0, 1, or -1)
TimeoutMs int32 // Timeout for acks=all (default: 30000)
}
ProduceRequest represents a produce request
type ProduceResponse ¶
type ProduceResponse struct {
Topic string
PartitionID uint32
BaseOffset int64 // First offset assigned
NumMessages uint32 // Number of messages written
HighWaterMark int64 // Current high water mark
}
ProduceResponse represents a produce response
type Request ¶
type Request struct {
Header RequestHeader
Payload interface{}
}
Request represents a protocol request
type RequestFlags ¶
type RequestFlags uint16
RequestFlags represents request flags
const ( FlagNone RequestFlags = 0 FlagRequireAck RequestFlags = 1 << 0 // Require acknowledgment FlagCompressed RequestFlags = 1 << 1 // Payload is compressed FlagBatch RequestFlags = 1 << 2 // Batch request FlagAsync RequestFlags = 1 << 3 // Async request (fire and forget) FlagIdempotent RequestFlags = 1 << 4 // Idempotent request )
type RequestHeader ¶
type RequestHeader struct {
Length uint32 // Total message length (excluding length field)
RequestID uint64 // Unique request identifier
Type RequestType // Request type
Version byte // Protocol version
Flags RequestFlags // Request flags
}
RequestHeader represents the request header
type RequestType ¶
type RequestType byte
RequestType represents the type of request
const ( RequestTypeProduce RequestType = 0x01 RequestTypeFetch RequestType = 0x02 RequestTypeGetOffset RequestType = 0x03 RequestTypeCreateTopic RequestType = 0x04 RequestTypeDeleteTopic RequestType = 0x05 RequestTypeListTopics RequestType = 0x06 RequestTypeHealthCheck RequestType = 0x07 RequestTypeJoinGroup RequestType = 0x08 RequestTypeSyncGroup RequestType = 0x09 RequestTypeHeartbeat RequestType = 0x0A RequestTypeLeaveGroup RequestType = 0x0B RequestTypeOffsetCommit RequestType = 0x0C RequestTypeOffsetFetch RequestType = 0x0D RequestTypeInitProducerID RequestType = 0x0E RequestTypeAddPartitionsToTxn RequestType = 0x0F RequestTypeAddOffsetsToTxn RequestType = 0x10 RequestTypeEndTxn RequestType = 0x11 RequestTypeTxnOffsetCommit RequestType = 0x12 )
func (RequestType) String ¶
func (t RequestType) String() string
String returns the string representation of RequestType
type Response ¶
type Response struct {
Header ResponseHeader
Payload interface{}
}
Response represents a protocol response
type ResponseHeader ¶
type ResponseHeader struct {
Length uint32 // Total message length (excluding length field)
RequestID uint64 // Matches request ID
Status StatusCode // Response status
ErrorCode ErrorCode // Error code if status != OK
}
ResponseHeader represents the response header
type SharedBufferWrapper ¶
type SharedBufferWrapper struct {
// contains filtered or unexported fields
}
SharedBufferWrapper wraps a buffer that should not be modified This is used for zero-copy reads where the buffer is shared
func NewSharedBuffer ¶
func NewSharedBuffer(buf []byte) *SharedBufferWrapper
NewSharedBuffer creates a new shared buffer wrapper
func (*SharedBufferWrapper) Bytes ¶
func (s *SharedBufferWrapper) Bytes() []byte
Bytes returns the full buffer (read-only view)
func (*SharedBufferWrapper) Len ¶
func (s *SharedBufferWrapper) Len() int
Len returns the buffer length
func (*SharedBufferWrapper) Slice ¶
func (s *SharedBufferWrapper) Slice(offset, length int) []byte
Slice returns a zero-copy slice of the buffer
type StatusCode ¶
type StatusCode byte
StatusCode represents the response status
const ( StatusOK StatusCode = 0 StatusError StatusCode = 1 StatusPartialSuccess StatusCode = 2 )
func (StatusCode) String ¶
func (s StatusCode) String() string
String returns the string representation of StatusCode