Documentation
¶
Index ¶
- Variables
- func IsCloseError(err error) bool
- type Client
- func (c *Client[T]) Close() error
- func (c *Client[T]) Conn() *Conn[T]
- func (c *Client[T]) Connect(ctx context.Context) error
- func (c *Client[T]) ConnectWithReadLoop(ctx context.Context) error
- func (c *Client[T]) IsConnected() bool
- func (c *Client[T]) OnConnect(fn func(*Client[T]))
- func (c *Client[T]) OnDisconnect(fn func(*Client[T], error))
- func (c *Client[T]) OnMessage(fn func(T))
- func (c *Client[T]) OnStateChange(handler StateHandler)
- func (c *Client[T]) QueueStats() MessageQueueStats
- func (c *Client[T]) Read(ctx context.Context) (T, error)
- func (c *Client[T]) SessionID() string
- func (c *Client[T]) SetSessionID(id string)
- func (c *Client[T]) State() ConnectionState
- func (c *Client[T]) Write(ctx context.Context, msg T) error
- type ClientOptions
- type CloseCode
- type CloseError
- type CompressionManager
- type Conn
- type ConnectionState
- type DialOptions
- type Dialer
- type Frame
- type MessageQueue
- func (mq *MessageQueue[T]) Clear()
- func (mq *MessageQueue[T]) Close()
- func (mq *MessageQueue[T]) Enqueue(ctx context.Context, msg T) (chan error, error)
- func (mq *MessageQueue[T]) Flush(sendFn func(context.Context, T) error)
- func (mq *MessageQueue[T]) Size() int
- func (mq *MessageQueue[T]) Stats() MessageQueueStats
- type MessageQueueStats
- type Metrics
- func (m *Metrics) GetSnapshot() MetricsSnapshot
- func (m *Metrics) RecordCompression(originalSize, compressedSize int)
- func (m *Metrics) RecordConnection()
- func (m *Metrics) RecordDecompression()
- func (m *Metrics) RecordDisconnection()
- func (m *Metrics) RecordFrameError()
- func (m *Metrics) RecordHandshakeError()
- func (m *Metrics) RecordQueueDropped()
- func (m *Metrics) RecordQueueEnqueue()
- func (m *Metrics) RecordQueueSent()
- func (m *Metrics) RecordRead(bytes int, latency time.Duration)
- func (m *Metrics) RecordReadError()
- func (m *Metrics) RecordReconnectAttempt()
- func (m *Metrics) RecordReconnectFailure()
- func (m *Metrics) RecordReconnectSuccess()
- func (m *Metrics) RecordWrite(bytes int, latency time.Duration)
- func (m *Metrics) RecordWriteError()
- type MetricsSnapshot
- type ReconnectConfig
- type StateChange
- type StateHandler
- type UpgradeOptions
- type Upgrader
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrInvalidHandshake indicates the WebSocket handshake failed
	ErrInvalidHandshake = errors.New("axon: invalid websocket handshake")
	// ErrUpgradeRequired indicates the request is not a valid WebSocket upgrade request
	ErrUpgradeRequired = errors.New("axon: upgrade required")
	// ErrInvalidOrigin indicates the origin header is not allowed
	ErrInvalidOrigin = errors.New("axon: invalid origin")
	// ErrInvalidSubprotocol indicates the requested subprotocol is not supported
	ErrInvalidSubprotocol = errors.New("axon: invalid subprotocol")
	// ErrConnectionClosed indicates the connection has been closed
	ErrConnectionClosed = errors.New("axon: connection closed")
	// ErrFrameTooLarge indicates a frame exceeds the maximum allowed size
	ErrFrameTooLarge = errors.New("axon: frame too large")
	// ErrMessageTooLarge indicates a message exceeds the maximum allowed size
	ErrMessageTooLarge = errors.New("axon: message too large")
	// ErrInvalidFrame indicates a frame violates the WebSocket protocol
	ErrInvalidFrame = errors.New("axon: invalid frame")
	// ErrInvalidMask indicates frame masking is invalid
	ErrInvalidMask = errors.New("axon: invalid mask")
	// ErrUnsupportedFrameType indicates the frame type is not supported
	ErrUnsupportedFrameType = errors.New("axon: unsupported frame type")
	// ErrFragmentedControlFrame indicates a control frame is fragmented (not allowed)
	ErrFragmentedControlFrame = errors.New("axon: fragmented control frame")
	// ErrInvalidCloseCode indicates an invalid close code was used
	ErrInvalidCloseCode = errors.New("axon: invalid close code")
	// ErrReadDeadlineExceeded indicates a read operation exceeded its deadline
	ErrReadDeadlineExceeded = errors.New("axon: read deadline exceeded")
	// ErrWriteDeadlineExceeded indicates a write operation exceeded its deadline
	ErrWriteDeadlineExceeded = errors.New("axon: write deadline exceeded")
	// ErrContextCanceled indicates the context was canceled
	ErrContextCanceled = errors.New("axon: context canceled")
	// ErrSerializationFailed indicates message serialization failed
	ErrSerializationFailed = errors.New("axon: serialization failed")
	// ErrDeserializationFailed indicates message deserialization failed
	ErrDeserializationFailed = errors.New("axon: deserialization failed")
	// ErrReconnectFailed indicates reconnection attempts have been exhausted
	ErrReconnectFailed = errors.New("axon: reconnection failed")
	// ErrQueueFull indicates the message queue is full
	ErrQueueFull = errors.New("axon: message queue full")
	// ErrQueueClosed indicates the message queue has been closed
	ErrQueueClosed = errors.New("axon: message queue closed")
	// ErrQueueTimeout indicates a queued message has expired
	ErrQueueTimeout = errors.New("axon: queued message timeout")
	// ErrQueueCleared indicates the queue was cleared before message was sent
	ErrQueueCleared = errors.New("axon: message queue cleared")
	// ErrCompressionFailed indicates compression or decompression failed
	ErrCompressionFailed = errors.New("axon: compression failed")
	// ErrInvalidState indicates an invalid state transition was attempted
	ErrInvalidState = errors.New("axon: invalid state transition")
	// ErrClientClosed indicates the client has been closed
	ErrClientClosed = errors.New("axon: client closed")
)
Sentinel errors for WebSocket protocol violations and state errors
var DefaultMetrics = &Metrics{}
DefaultMetrics is the default metrics instance
Functions ¶
func IsCloseError ¶ added in v0.2.0
IsCloseError returns true if the error is a CloseError.
Types ¶
type Client ¶
type Client[T any] struct {
	// contains filtered or unexported fields
}
Client is a WebSocket client with automatic reconnection and message queuing
func NewClient ¶
func NewClient[T any](url string, opts *ClientOptions) *Client[T]
NewClient creates a new WebSocket client
func (*Client[T]) ConnectWithReadLoop ¶
ConnectWithReadLoop connects and starts a read loop. Messages are delivered via the OnMessage callback.
func (*Client[T]) IsConnected ¶
IsConnected returns true if the client is connected
func (*Client[T]) OnDisconnect ¶
OnDisconnect sets the callback for when the connection is lost
func (*Client[T]) OnMessage ¶
func (c *Client[T]) OnMessage(fn func(T))
OnMessage sets the callback for received messages
func (*Client[T]) OnStateChange ¶
func (c *Client[T]) OnStateChange(handler StateHandler)
OnStateChange registers a callback for state change events
func (*Client[T]) QueueStats ¶
func (c *Client[T]) QueueStats() MessageQueueStats
QueueStats returns message queue statistics
func (*Client[T]) SetSessionID ¶
SetSessionID sets the session identifier for reconnection
func (*Client[T]) State ¶
func (c *Client[T]) State() ConnectionState
State returns the current connection state
type ClientOptions ¶
type ClientOptions struct {
// DialOptions for the underlying connection
DialOptions
// Reconnection configuration
Reconnect *ReconnectConfig
// QueueSize is the maximum number of messages to queue during disconnection
// 0 disables queuing
QueueSize int
// QueueTimeout is how long queued messages are valid
// Default: 30 seconds
QueueTimeout time.Duration
// OnError is called when an error occurs
OnError func(error)
// OnStateChange is called when the connection state changes
OnStateChange StateHandler
}
ClientOptions configures the WebSocket client
func DefaultClientOptions ¶
func DefaultClientOptions() *ClientOptions
DefaultClientOptions returns default client options
type CloseCode ¶ added in v0.2.0
type CloseCode int
CloseCode represents a WebSocket close status code (RFC 6455 Section 7.4).
const (
	CloseNormalClosure CloseCode = 1000 // Normal closure; the connection successfully completed
	CloseGoingAway CloseCode = 1001 // Endpoint going away (server shutdown, browser navigated away)
	CloseProtocolError CloseCode = 1002 // Protocol error encountered
	CloseUnsupportedData CloseCode = 1003 // Received data type cannot be accepted
	CloseNoStatusReceived CloseCode = 1005 // No status code was present (reserved, must not be sent)
	CloseAbnormalClosure CloseCode = 1006 // Connection closed abnormally (reserved, must not be sent)
	CloseInvalidPayloadData CloseCode = 1007 // Received data was inconsistent with message type
	ClosePolicyViolation CloseCode = 1008 // Message violated policy (generic code when 1003/1009 don't apply)
	CloseMessageTooBig CloseCode = 1009 // Message too large to process
	CloseMandatoryExtension CloseCode = 1010 // Server didn't negotiate required extension
	CloseInternalError CloseCode = 1011 // Server encountered an unexpected condition
	CloseServiceRestart CloseCode = 1012 // Server is restarting
	CloseTryAgainLater CloseCode = 1013 // Server is overloaded, try again later
	CloseBadGateway CloseCode = 1014 // Server acting as gateway received invalid response
	CloseTLSHandshake CloseCode = 1015 // TLS handshake failed (reserved, must not be sent)
)
Standard WebSocket close codes (RFC 6455 Section 7.4.1)
func (CloseCode) IsRecoverable ¶ added in v0.2.0
IsRecoverable returns true if reconnection should typically be attempted. This is a hint - applications may have their own logic for specific codes.
func (CloseCode) IsReserved ¶ added in v0.2.0
IsReserved returns true if this is a reserved close code that must not be sent in close frames.
type CloseError ¶ added in v0.2.0
CloseError represents a WebSocket close event with code and reason. It implements the error interface for use in error handling.
func AsCloseError ¶ added in v0.2.0
func AsCloseError(err error) *CloseError
AsCloseError attempts to extract a CloseError from an error. Returns nil if the error is not a CloseError.
func NewCloseError ¶ added in v0.2.0
func NewCloseError(code int, reason string) *CloseError
NewCloseError creates a new CloseError with the given code and reason.
func (*CloseError) Error ¶ added in v0.2.0
func (e *CloseError) Error() string
Error implements the error interface.
func (*CloseError) IsRecoverable ¶ added in v0.2.0
func (e *CloseError) IsRecoverable() bool
IsRecoverable returns true if reconnection should typically be attempted.
type CompressionManager ¶
type CompressionManager struct {
// contains filtered or unexported fields
}
CompressionManager handles per-message compression (RFC 7692)
func (*CompressionManager) Close ¶
func (cm *CompressionManager) Close() error
Close releases compression resources
func (*CompressionManager) Compress ¶
func (cm *CompressionManager) Compress(data []byte) ([]byte, error)
Compress compresses the payload using DEFLATE
func (*CompressionManager) Decompress ¶
func (cm *CompressionManager) Decompress(data []byte) ([]byte, error)
Decompress decompresses the payload using DEFLATE
func (*CompressionManager) ShouldCompress ¶
func (cm *CompressionManager) ShouldCompress(payloadSize int) bool
ShouldCompress returns true if the payload should be compressed
type Conn ¶
type Conn[T any] struct {
	// contains filtered or unexported fields
}
Conn represents a WebSocket connection with type-safe message handling
func Dial ¶
Dial connects to a WebSocket server at the given URL. The URL must have a ws:// or wss:// scheme.
func DialWithDialer ¶
DialWithDialer connects to a WebSocket server using the provided dialer
func Upgrade ¶
func Upgrade[T any](w http.ResponseWriter, r *http.Request, opts *UpgradeOptions) (*Conn[T], error)
Upgrade upgrades an HTTP connection to a WebSocket connection
func (*Conn[T]) CloseReason ¶
CloseReason returns the close reason if the connection was closed
type ConnectionState ¶
type ConnectionState int32
ConnectionState represents the state of a WebSocket connection
const (
	// StateDisconnected indicates the connection is not established
	StateDisconnected ConnectionState = iota
	// StateConnecting indicates a connection attempt is in progress
	StateConnecting
	// StateConnected indicates the connection is established and ready
	StateConnected
	// StateReconnecting indicates a reconnection attempt is in progress
	StateReconnecting
	// StateClosing indicates the connection is being closed
	StateClosing
	// StateClosed indicates the connection has been permanently closed
	StateClosed
)
func (ConnectionState) CanReconnect ¶
func (s ConnectionState) CanReconnect() bool
CanReconnect returns true if reconnection is allowed from this state
func (ConnectionState) IsActive ¶
func (s ConnectionState) IsActive() bool
IsActive returns true if the state represents an active or transitioning state
func (ConnectionState) String ¶
func (s ConnectionState) String() string
String returns the string representation of the connection state
type DialOptions ¶
type DialOptions struct {
// HandshakeTimeout is the maximum time to wait for the handshake to complete.
// Default is 30 seconds.
HandshakeTimeout time.Duration
// ReadBufferSize sets the size of the read buffer in bytes.
// Default is 4096 bytes.
ReadBufferSize int
// WriteBufferSize sets the size of the write buffer in bytes.
// Default is 4096 bytes.
WriteBufferSize int
// MaxFrameSize sets the maximum frame size in bytes.
// Default is 4096 bytes.
MaxFrameSize int
// MaxMessageSize sets the maximum message size in bytes.
// Default is 1048576 bytes (1MB).
MaxMessageSize int
// ReadDeadline sets the read deadline for connections.
// Default is no deadline.
ReadDeadline time.Duration
// WriteDeadline sets the write deadline for connections.
// Default is no deadline.
WriteDeadline time.Duration
// PingInterval sets the interval for sending ping frames.
// If zero, pings are disabled.
PingInterval time.Duration
// PongTimeout sets the timeout for waiting for a pong response.
// If zero, pong timeout is disabled.
PongTimeout time.Duration
// Subprotocols sets the list of supported subprotocols.
// Default is nil (no subprotocols).
Subprotocols []string
// Compression enables per-message compression (RFC 7692).
// Default is false (disabled).
Compression bool
// CompressionThreshold sets the minimum message size to compress.
// Messages smaller than this will not be compressed.
// Default is 256 bytes.
CompressionThreshold int
// Headers sets additional HTTP headers for the handshake request.
Headers http.Header
// TLSConfig specifies the TLS configuration to use for wss:// connections.
// If nil, the default configuration is used.
TLSConfig *tls.Config
// NetDialer specifies the dialer to use for creating the network connection.
// If nil, a default dialer is used.
NetDialer *net.Dialer
}
DialOptions configures WebSocket client dial options
type Dialer ¶
type Dialer struct {
// contains filtered or unexported fields
}
Dialer is a WebSocket client dialer
func NewDialer ¶
func NewDialer(opts *DialOptions) *Dialer
NewDialer creates a new Dialer with the given options
type Frame ¶
type Frame struct {
Fin bool
Rsv1 bool
Rsv2 bool
Rsv3 bool
Opcode byte
Masked bool
MaskKey []byte
Payload []byte
}
Frame represents a WebSocket frame
type MessageQueue ¶
type MessageQueue[T any] struct {
	// contains filtered or unexported fields
}
MessageQueue manages queuing of messages during disconnection
func (*MessageQueue[T]) Clear ¶
func (mq *MessageQueue[T]) Clear()
Clear discards all queued messages
func (*MessageQueue[T]) Close ¶
func (mq *MessageQueue[T]) Close()
Close closes the queue and discards all pending messages
func (*MessageQueue[T]) Enqueue ¶
func (mq *MessageQueue[T]) Enqueue(ctx context.Context, msg T) (chan error, error)
Enqueue adds a message to the queue. Returns an error channel that will receive the send result.
func (*MessageQueue[T]) Flush ¶
func (mq *MessageQueue[T]) Flush(sendFn func(context.Context, T) error)
Flush sends all queued messages using the provided send function
func (*MessageQueue[T]) Size ¶
func (mq *MessageQueue[T]) Size() int
Size returns the current number of queued messages
func (*MessageQueue[T]) Stats ¶
func (mq *MessageQueue[T]) Stats() MessageQueueStats
Stats returns queue statistics
type MessageQueueStats ¶
type MessageQueueStats struct {
CurrentSize int
MaxSize int
Dropped int64
Enqueued int64
Sent int64
}
MessageQueueStats contains queue statistics
type Metrics ¶
type Metrics struct {
// Connection metrics
ActiveConnections atomic.Int64
TotalConnections atomic.Int64
ClosedConnections atomic.Int64
// Message metrics
MessagesRead atomic.Int64
MessagesWritten atomic.Int64
BytesRead atomic.Int64
BytesWritten atomic.Int64
// Error metrics
ReadErrors atomic.Int64
WriteErrors atomic.Int64
FrameErrors atomic.Int64
HandshakeErrors atomic.Int64
// Performance metrics
ReadLatency atomic.Int64 // nanoseconds
WriteLatency atomic.Int64 // nanoseconds
// Reconnection metrics
ReconnectAttempts atomic.Int64
ReconnectSuccesses atomic.Int64
ReconnectFailures atomic.Int64
// Queue metrics
QueueEnqueued atomic.Int64
QueueSent atomic.Int64
QueueDropped atomic.Int64
// Compression metrics
CompressedMessages atomic.Int64
DecompressedMessages atomic.Int64
CompressionSaved atomic.Int64 // bytes saved by compression
}
Metrics tracks WebSocket connection metrics
func (*Metrics) GetSnapshot ¶
func (m *Metrics) GetSnapshot() MetricsSnapshot
GetSnapshot returns a snapshot of current metrics
func (*Metrics) RecordCompression ¶
RecordCompression records a compression operation
func (*Metrics) RecordConnection ¶
func (m *Metrics) RecordConnection()
RecordConnection records a new connection
func (*Metrics) RecordDecompression ¶
func (m *Metrics) RecordDecompression()
RecordDecompression records a decompression operation
func (*Metrics) RecordDisconnection ¶
func (m *Metrics) RecordDisconnection()
RecordDisconnection records a connection closure
func (*Metrics) RecordFrameError ¶
func (m *Metrics) RecordFrameError()
RecordFrameError records a frame parsing error
func (*Metrics) RecordHandshakeError ¶
func (m *Metrics) RecordHandshakeError()
RecordHandshakeError records a handshake error
func (*Metrics) RecordQueueDropped ¶
func (m *Metrics) RecordQueueDropped()
RecordQueueDropped records a queued message being dropped
func (*Metrics) RecordQueueEnqueue ¶
func (m *Metrics) RecordQueueEnqueue()
RecordQueueEnqueue records a message being queued
func (*Metrics) RecordQueueSent ¶
func (m *Metrics) RecordQueueSent()
RecordQueueSent records a queued message being sent
func (*Metrics) RecordRead ¶
RecordRead records a read operation
func (*Metrics) RecordReadError ¶
func (m *Metrics) RecordReadError()
RecordReadError records a read error
func (*Metrics) RecordReconnectAttempt ¶
func (m *Metrics) RecordReconnectAttempt()
RecordReconnectAttempt records a reconnection attempt
func (*Metrics) RecordReconnectFailure ¶
func (m *Metrics) RecordReconnectFailure()
RecordReconnectFailure records a failed reconnection
func (*Metrics) RecordReconnectSuccess ¶
func (m *Metrics) RecordReconnectSuccess()
RecordReconnectSuccess records a successful reconnection
func (*Metrics) RecordWrite ¶
RecordWrite records a write operation
func (*Metrics) RecordWriteError ¶
func (m *Metrics) RecordWriteError()
RecordWriteError records a write error
type MetricsSnapshot ¶
type MetricsSnapshot struct {
ActiveConnections int64
TotalConnections int64
ClosedConnections int64
MessagesRead int64
MessagesWritten int64
BytesRead int64
BytesWritten int64
ReadErrors int64
WriteErrors int64
FrameErrors int64
HandshakeErrors int64
AvgReadLatency time.Duration
AvgWriteLatency time.Duration
// Reconnection metrics
ReconnectAttempts int64
ReconnectSuccesses int64
ReconnectFailures int64
// Queue metrics
QueueEnqueued int64
QueueSent int64
QueueDropped int64
// Compression metrics
CompressedMessages int64
DecompressedMessages int64
CompressionSaved int64
}
MetricsSnapshot represents a snapshot of metrics at a point in time
type ReconnectConfig ¶
type ReconnectConfig struct {
// Enabled determines if automatic reconnection is active
Enabled bool
// MaxAttempts is the maximum number of reconnection attempts (0 = unlimited)
MaxAttempts int
// InitialDelay is the initial delay before the first reconnection attempt
// Default: 1 second
InitialDelay time.Duration
// MaxDelay is the maximum delay between reconnection attempts
// Default: 30 seconds
MaxDelay time.Duration
// BackoffMultiplier is the multiplier for exponential backoff
// Default: 2.0
BackoffMultiplier float64
// Jitter adds randomness to delay to prevent thundering herd
// Default: true
Jitter bool
// ResetAfter resets the attempt counter after being connected for this duration
// Default: 60 seconds
ResetAfter time.Duration
// OnReconnecting is called when a reconnection attempt starts
OnReconnecting func(attempt int, delay time.Duration)
// OnReconnected is called when reconnection succeeds
OnReconnected func(attempt int)
// OnReconnectFailed is called when reconnection fails
OnReconnectFailed func(attempt int, err error)
// ShouldReconnect is called to determine if reconnection should be attempted
// If nil, always attempts to reconnect (within MaxAttempts)
ShouldReconnect func(err error, attempt int) bool
}
ReconnectConfig configures automatic reconnection behavior
func DefaultReconnectConfig ¶
func DefaultReconnectConfig() *ReconnectConfig
DefaultReconnectConfig returns a default reconnection configuration
type StateChange ¶
type StateChange struct {
From ConnectionState
To ConnectionState
Time time.Time
Err error // Error that caused the transition (if any)
Attempt int // Reconnection attempt number (if applicable)
SessionID string // Current session identifier
}
StateChange represents a state transition event
type StateHandler ¶
type StateHandler func(StateChange)
StateHandler is a callback for state change events
type UpgradeOptions ¶
type UpgradeOptions struct {
// ReadBufferSize sets the size of the read buffer in bytes.
// Default is 4096 bytes.
ReadBufferSize int
// WriteBufferSize sets the size of the write buffer in bytes.
// Default is 4096 bytes.
WriteBufferSize int
// MaxFrameSize sets the maximum frame size in bytes.
// Frames exceeding this size will result in ErrFrameTooLarge.
// Default is 4096 bytes.
MaxFrameSize int
// MaxMessageSize sets the maximum message size in bytes.
// Messages exceeding this size will result in ErrMessageTooLarge.
// Default is 1048576 bytes (1MB).
MaxMessageSize int
// ReadDeadline sets the read deadline for connections.
// Default is no deadline.
ReadDeadline time.Duration
// WriteDeadline sets the write deadline for connections.
// Default is no deadline.
WriteDeadline time.Duration
// PingInterval sets the interval for sending ping frames.
// If zero, pings are disabled.
// Default is 0 (disabled).
PingInterval time.Duration
// PongTimeout sets the timeout for waiting for a pong response.
// If zero, pong timeout is disabled.
// Default is 0 (disabled).
PongTimeout time.Duration
// CheckOrigin sets a function to validate the origin header.
// If nil, all origins are allowed.
// Default is nil (all origins allowed).
CheckOrigin func(r *http.Request) bool
// Subprotocols sets the list of supported subprotocols.
// The client's requested subprotocol must match one of these.
// Default is nil (no subprotocols).
Subprotocols []string
// Compression enables per-message compression (RFC 7692).
// Default is false (disabled).
Compression bool
}
UpgradeOptions configures WebSocket connection upgrade options
type Upgrader ¶
type Upgrader struct {
// contains filtered or unexported fields
}
Upgrader handles WebSocket connection upgrades
func NewUpgrader ¶
func NewUpgrader(opts *UpgradeOptions) *Upgrader
NewUpgrader creates a new Upgrader with the given options