eventstream

Version: v0.1.9
Published: Nov 12, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package eventstream implements the AWS EventStream RPC protocol used for Greengrass IPC communication over Unix domain sockets.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EncodeMessage

func EncodeMessage(msg *Message) ([]byte, error)

EncodeMessage encodes a Message into wire format
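
The wire format is not spelled out on this page. As a rough sketch, assuming the package follows the standard AWS event stream framing (a 12-byte prelude holding the total length, the header length, and a CRC-32 of those 8 bytes, followed by headers, payload, and a trailing CRC-32 of everything before it), a frame could be assembled as below. `frame`, `checkFrame`, and `appendUint32` are hypothetical helpers, not part of this package, and the headers are treated as already-encoded bytes.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// appendUint32 appends v to b in big-endian byte order.
func appendUint32(b []byte, v uint32) []byte {
	var tmp [4]byte
	binary.BigEndian.PutUint32(tmp[:], v)
	return append(b, tmp[:]...)
}

// frame wraps pre-encoded headers and a payload in the assumed envelope.
// Hypothetical helper; the package's EncodeMessage may differ.
func frame(headers, payload []byte) []byte {
	total := 12 + len(headers) + len(payload) + 4 // prelude + body + message CRC
	out := make([]byte, 0, total)
	out = appendUint32(out, uint32(total))
	out = appendUint32(out, uint32(len(headers)))
	out = appendUint32(out, crc32.ChecksumIEEE(out)) // prelude CRC over the first 8 bytes
	out = append(out, headers...)
	out = append(out, payload...)
	return appendUint32(out, crc32.ChecksumIEEE(out)) // message CRC over all prior bytes
}

// checkFrame reports whether the length field and both CRCs of msg agree.
func checkFrame(msg []byte) bool {
	n := len(msg)
	if n < 16 || int(binary.BigEndian.Uint32(msg[0:4])) != n {
		return false
	}
	return binary.BigEndian.Uint32(msg[8:12]) == crc32.ChecksumIEEE(msg[:8]) &&
		binary.BigEndian.Uint32(msg[n-4:]) == crc32.ChecksumIEEE(msg[:n-4])
}

func main() {
	msg := frame(nil, []byte(`{}`))
	fmt.Println(len(msg), checkFrame(msg))
}
```

With no headers and a 2-byte payload the frame is 18 bytes long: 12 for the prelude, 2 for the payload, and 4 for the message CRC.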

func IsConnectionError added in v0.1.6

func IsConnectionError(err error) bool

IsConnectionError checks if an error is a connection failure that should trigger reconnection

func WrapIfConnectionError added in v0.1.6

func WrapIfConnectionError(err error) error

WrapIfConnectionError wraps an error as ConnectionError if it's a network error

Types

type ConnectRequest

type ConnectRequest struct {
	AuthToken string `json:"authToken"`
}

ConnectRequest is the payload for CONNECT messages

type ConnectResponse

type ConnectResponse struct {
}

ConnectResponse is the payload for CONNECTACK messages

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents an EventStream RPC connection

func Connect

func Connect(ctx context.Context, config ConnectionConfig) (*Connection, error)

Connect establishes a new EventStream RPC connection

func (*Connection) Close

func (c *Connection) Close() error

Close closes the connection

func (*Connection) NewStream

func (c *Connection) NewStream(operation string) *Stream

NewStream creates a new operation stream. The stream ID is not allocated until Activate is called, so that ID allocation and the first message send happen atomically and stream IDs cannot reach the wire out of order.
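
To illustrate why allocation is deferred, here is a minimal sketch of allocating a stream ID and sending the first message under a single lock. The `conn` type, its fields, and `activateConcurrently` are hypothetical stand-ins; the real Connection internals are unexported and may be organised differently.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for Connection. Only the stream-ID
// counter and the write path are modelled.
type conn struct {
	mu     sync.Mutex
	nextID uint32
	wire   []uint32 // stream IDs in the order their first message was "sent"
}

// activate allocates the next stream ID and records the first send while
// holding one lock, so IDs always reach the wire in increasing order.
func (c *conn) activate() uint32 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nextID++
	c.wire = append(c.wire, c.nextID) // stands in for writing the first message
	return c.nextID
}

// activateConcurrently activates n streams from n goroutines and returns
// the resulting wire order.
func activateConcurrently(n int) []uint32 {
	c := &conn{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.activate()
		}()
	}
	wg.Wait()
	return c.wire
}

func main() {
	fmt.Println(activateConcurrently(4))
}
```

If the ID were handed out in a NewStream-like constructor instead, two goroutines could allocate IDs 1 and 2 and then send them in the order 2, 1.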

func (*Connection) RequestResponse

func (c *Connection) RequestResponse(ctx context.Context, operation string, request interface{}) ([]byte, error)

RequestResponse performs a simple request-response operation with automatic retry on connection failure
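
The retry behaviour is not detailed here. One plausible shape, assuming that only ConnectionError values are retried and that MaxRetries bounds the number of extra attempts, is sketched below. `withRetry` and `flaky` are hypothetical helpers, and ConnectionError is re-declared locally (with an assumed Error format) so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// ConnectionError is a local stand-in for the package's type of the same name.
type ConnectionError struct{ Err error }

func (e *ConnectionError) Error() string { return "connection error: " + e.Err.Error() }
func (e *ConnectionError) Unwrap() error { return e.Err }

var errBrokenPipe = errors.New("broken pipe")

// withRetry calls op up to maxRetries+1 times (maxRetries >= 0 assumed),
// retrying only when the failure is a ConnectionError.
func withRetry(maxRetries int, op func() ([]byte, error)) ([]byte, error) {
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		resp, err := op()
		if err == nil {
			return resp, nil
		}
		var ce *ConnectionError
		if !errors.As(err, &ce) {
			return nil, err // not a connection failure: do not retry
		}
		lastErr = err
	}
	return nil, fmt.Errorf("giving up after %d retries: %w", maxRetries, lastErr)
}

// flaky returns an op that fails with a ConnectionError for its first n calls.
func flaky(n int, calls *int) func() ([]byte, error) {
	return func() ([]byte, error) {
		*calls++
		if *calls <= n {
			return nil, &ConnectionError{Err: errBrokenPipe}
		}
		return []byte("ok"), nil
	}
}

func main() {
	calls := 0
	resp, err := withRetry(2, flaky(2, &calls))
	fmt.Println(string(resp), err, calls)
}
```

A real implementation would presumably wait for the connection to be re-established between attempts; the sketch omits that step.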

func (*Connection) WaitUntilReady added in v0.1.7

func (c *Connection) WaitUntilReady(ctx context.Context) error

WaitUntilReady blocks until the connection is ready for use. It returns nil once the connection is ready, or the context's error if ctx is cancelled first.
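
The documented contract (nil when ready, the context's error when cancelled) maps naturally onto a `select` over two channels. The sketch below assumes readiness is signalled by closing a channel; `waitReady` and `demo` are hypothetical, and the package may track readiness differently.

```go
package main

import (
	"context"
	"fmt"
)

// waitReady blocks until ready is closed or ctx is done, whichever comes first.
func waitReady(ctx context.Context, ready <-chan struct{}) error {
	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo calls waitReady once with a ready connection and once with a
// cancelled context on a connection that never becomes ready.
func demo() (readyErr, cancelledErr error) {
	ready := make(chan struct{})
	close(ready)
	readyErr = waitReady(context.Background(), ready)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cancelledErr = waitReady(ctx, make(chan struct{}))
	return readyErr, cancelledErr
}

func main() {
	fmt.Println(demo())
}
```

Callers would typically pass a context with a timeout so that a connection that never recovers does not block them forever.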

type ConnectionConfig

type ConnectionConfig struct {
	// SocketPath is the path to the Unix domain socket
	SocketPath string

	// AuthToken is the authentication token for the connection
	AuthToken string

	// EnableReconnection enables automatic reconnection on connection failure
	EnableReconnection bool

	// DisablePingPong disables proactive keepalive pings
	DisablePingPong bool

	// PingInterval is how often to send keepalive pings
	PingInterval time.Duration

	// PingTimeout is how long to wait for ping response
	PingTimeout time.Duration

	// MaxRetries is max retry attempts for request-response operations
	MaxRetries int

	// OnDisconnected callback when connection is lost
	OnDisconnected func(error)

	// OnReconnected callback when connection is restored
	OnReconnected func()
}

ConnectionConfig holds configuration for establishing a connection
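
As an illustration, a config might be populated as follows. The struct is re-declared locally so the sketch compiles without the module, `configFromEnv` is a hypothetical helper, and the durations and retry count are arbitrary example values rather than the package's defaults. The environment variable names are the ones Greengrass v2 conventionally sets for components; treat them as an assumption about your deployment.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// ConnectionConfig mirrors the documented struct so the sketch is self-contained.
type ConnectionConfig struct {
	SocketPath         string
	AuthToken          string
	EnableReconnection bool
	DisablePingPong    bool
	PingInterval       time.Duration
	PingTimeout        time.Duration
	MaxRetries         int
	OnDisconnected     func(error)
	OnReconnected      func()
}

// configFromEnv builds a config from environment lookups. getenv is injected
// so the function can be exercised without touching the real environment.
func configFromEnv(getenv func(string) string) ConnectionConfig {
	return ConnectionConfig{
		SocketPath:         getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT"),
		AuthToken:          getenv("SVCUID"),
		EnableReconnection: true,
		PingInterval:       30 * time.Second, // example value
		PingTimeout:        5 * time.Second,  // example value
		MaxRetries:         3,                // example value
		OnDisconnected:     func(err error) { fmt.Println("disconnected:", err) },
		OnReconnected:      func() { fmt.Println("reconnected") },
	}
}

func main() {
	cfg := configFromEnv(os.Getenv)
	fmt.Printf("socket=%q reconnect=%v retries=%d\n",
		cfg.SocketPath, cfg.EnableReconnection, cfg.MaxRetries)
}
```

The resulting value would then be passed to Connect together with a context.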

type ConnectionError added in v0.1.6

type ConnectionError struct {
	Err error
}

ConnectionError represents a socket-level connection failure. These errors are retriable and should trigger reconnection.

func (*ConnectionError) Error added in v0.1.6

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap added in v0.1.6

func (e *ConnectionError) Unwrap() error
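
Because ConnectionError implements Unwrap, it composes with `errors.As` and `errors.Is`. The sketch below shows one way the IsConnectionError and WrapIfConnectionError pair could behave. Which errors count as network errors (here `io.EOF`, `io.ErrUnexpectedEOF`, `net.ErrClosed`, and anything implementing `net.Error`) is an assumption, as is the Error format; the lowercase functions are local stand-ins, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
)

// ConnectionError is a local stand-in for the package's type of the same name.
type ConnectionError struct{ Err error }

func (e *ConnectionError) Error() string { return "connection error: " + e.Err.Error() }
func (e *ConnectionError) Unwrap() error { return e.Err }

var errBadRequest = errors.New("bad request")

// isConnectionError reports whether err looks like a connection failure.
func isConnectionError(err error) bool {
	var ce *ConnectionError
	if errors.As(err, &ce) {
		return true
	}
	var ne net.Error
	return errors.As(err, &ne) ||
		errors.Is(err, io.EOF) ||
		errors.Is(err, io.ErrUnexpectedEOF) ||
		errors.Is(err, net.ErrClosed)
}

// wrapIfConnectionError wraps network-level errors and passes others through.
func wrapIfConnectionError(err error) error {
	if err == nil {
		return nil
	}
	var ce *ConnectionError
	if errors.As(err, &ce) {
		return err // already wrapped
	}
	if isConnectionError(err) {
		return &ConnectionError{Err: err}
	}
	return err
}

// sampleReadError simulates a read that hit end-of-stream.
func sampleReadError() error { return fmt.Errorf("read frame: %w", io.EOF) }

func main() {
	err := wrapIfConnectionError(sampleReadError())
	var ce *ConnectionError
	fmt.Println(errors.As(err, &ce), errors.Is(err, io.EOF))
	fmt.Println(wrapIfConnectionError(errBadRequest) == errBadRequest)
}
```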

type Header

type Header struct {
	Name  string
	Type  HeaderType
	Value interface{}
}

Header represents a single header in an EventStream message

type HeaderType

type HeaderType byte

HeaderType represents the type of a header value

const (
	// HeaderTypeBoolTrue represents a boolean true value
	HeaderTypeBoolTrue HeaderType = 0

	// HeaderTypeBoolFalse represents a boolean false value
	HeaderTypeBoolFalse HeaderType = 1

	// HeaderTypeByte represents a single byte value
	HeaderTypeByte HeaderType = 2

	// HeaderTypeInt16 represents a 16-bit integer
	HeaderTypeInt16 HeaderType = 3

	// HeaderTypeInt32 represents a 32-bit integer
	HeaderTypeInt32 HeaderType = 4

	// HeaderTypeInt64 represents a 64-bit integer
	HeaderTypeInt64 HeaderType = 5

	// HeaderTypeByteArray represents a byte array
	HeaderTypeByteArray HeaderType = 6

	// HeaderTypeString represents a UTF-8 string
	HeaderTypeString HeaderType = 7

	// HeaderTypeTimestamp represents a timestamp (milliseconds since epoch)
	HeaderTypeTimestamp HeaderType = 8

	// HeaderTypeUUID represents a UUID
	HeaderTypeUUID HeaderType = 9
)
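
The type codes above match the standard AWS event stream header encoding, in which each header is written as a 1-byte name length, the name, a 1-byte type code, and a type-specific value. A minimal sketch covering only the int32 and string cases follows. `encodeHeader` is a hypothetical helper, the header names in `main` are illustrative, and the layout is assumed rather than taken from this package's source.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// HeaderType and the two constants mirror the documented declarations.
type HeaderType byte

const (
	HeaderTypeInt32  HeaderType = 4
	HeaderTypeString HeaderType = 7
)

// encodeHeader serialises one header using the assumed layout.
func encodeHeader(name string, t HeaderType, value interface{}) ([]byte, error) {
	if len(name) == 0 || len(name) > 255 {
		return nil, fmt.Errorf("header name length %d out of range", len(name))
	}
	out := append([]byte{byte(len(name))}, name...)
	out = append(out, byte(t))
	switch t {
	case HeaderTypeInt32:
		v, ok := value.(int32)
		if !ok {
			return nil, fmt.Errorf("header %q: want int32, got %T", name, value)
		}
		var b [4]byte
		binary.BigEndian.PutUint32(b[:], uint32(v)) // 4-byte big-endian value
		return append(out, b[:]...), nil
	case HeaderTypeString:
		s, ok := value.(string)
		if !ok || len(s) > 65535 {
			return nil, fmt.Errorf("header %q: want string of at most 65535 bytes", name)
		}
		var b [2]byte
		binary.BigEndian.PutUint16(b[:], uint16(len(s))) // 2-byte length prefix
		return append(append(out, b[:]...), s...), nil
	default:
		return nil, fmt.Errorf("header %q: type %d not covered by this sketch", name, t)
	}
}

func main() {
	b, err := encodeHeader(":stream-id", HeaderTypeInt32, int32(1))
	fmt.Printf("% x %v\n", b, err)
}
```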

type Message

type Message struct {
	Type    MessageType
	Flags   MessageFlags
	Headers []Header
	Payload []byte
}

Message represents a complete EventStream message

func CreateMessage

func CreateMessage(msgType MessageType, flags MessageFlags, payload []byte) *Message

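CreateMessage is a helper that creates a message with the given type, flags, and payload. It takes no streamID parameter. A stream ID of 0 identifies connection-level messages; to set a specific stream ID, use CreateMessageWithStreamID.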

func CreateMessageWithStreamID

func CreateMessageWithStreamID(msgType MessageType, flags MessageFlags, streamID uint32, payload []byte) *Message

CreateMessageWithStreamID creates a message with a specific stream ID

func DecodeMessage

func DecodeMessage(reader io.Reader) (*Message, error)

DecodeMessage decodes a Message from wire format
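
Decoding from an `io.Reader` usually means reading the fixed-size prelude first, validating it, and only then reading the rest. The sketch below does this for the frame envelope alone, under the same assumption of standard AWS event stream framing with CRC-32 (IEEE) checksums. `readFrame`, `buildFrame`, `decodeBytes`, and the `maxFrame` cap are hypothetical, and header parsing is left out.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
)

const maxFrame = 16 << 20 // arbitrary safety cap for this sketch

// readFrame reads one frame and returns its raw header and payload bytes.
func readFrame(r io.Reader) ([]byte, []byte, error) {
	var prelude [12]byte
	if _, err := io.ReadFull(r, prelude[:]); err != nil {
		return nil, nil, err
	}
	total := binary.BigEndian.Uint32(prelude[0:4])
	hdrLen := binary.BigEndian.Uint32(prelude[4:8])
	if crc32.ChecksumIEEE(prelude[:8]) != binary.BigEndian.Uint32(prelude[8:12]) {
		return nil, nil, errors.New("prelude CRC mismatch")
	}
	if total < 16 || total > maxFrame || hdrLen > total-16 {
		return nil, nil, errors.New("implausible frame lengths")
	}
	rest := make([]byte, total-12)
	if _, err := io.ReadFull(r, rest); err != nil {
		return nil, nil, err
	}
	body := rest[:len(rest)-4]
	want := binary.BigEndian.Uint32(rest[len(rest)-4:])
	// Continue the checksum from the prelude across headers and payload.
	if crc32.Update(crc32.ChecksumIEEE(prelude[:]), crc32.IEEETable, body) != want {
		return nil, nil, errors.New("message CRC mismatch")
	}
	return body[:hdrLen], body[hdrLen:], nil
}

// buildFrame produces a frame in the same assumed layout, for the demo only.
func buildFrame(headers, payload []byte) []byte {
	total := 16 + len(headers) + len(payload)
	out := make([]byte, 12, total)
	binary.BigEndian.PutUint32(out[0:4], uint32(total))
	binary.BigEndian.PutUint32(out[4:8], uint32(len(headers)))
	binary.BigEndian.PutUint32(out[8:12], crc32.ChecksumIEEE(out[:8]))
	out = append(append(out, headers...), payload...)
	var sum [4]byte
	binary.BigEndian.PutUint32(sum[:], crc32.ChecksumIEEE(out))
	return append(out, sum[:]...)
}

// decodeBytes is a convenience wrapper around readFrame for in-memory data.
func decodeBytes(b []byte) ([]byte, []byte, error) {
	return readFrame(bytes.NewReader(b))
}

func main() {
	h, p, err := decodeBytes(buildFrame([]byte("hdr"), []byte(`{"ok":true}`)))
	fmt.Printf("%q %q %v\n", h, p, err)
}
```

Validating the prelude CRC before allocating the body buffer keeps a corrupted length field from triggering a huge allocation.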

func (*Message) GetHeader

func (m *Message) GetHeader(name string) (interface{}, bool)

GetHeader retrieves a header value by name

func (*Message) GetStringHeader

func (m *Message) GetStringHeader(name string) (string, bool)

GetStringHeader retrieves a string header value

func (*Message) SetHeader

func (m *Message) SetHeader(name string, headerType HeaderType, value interface{})

SetHeader sets or updates a header value
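
The "sets or updates" wording suggests that a header name appears at most once per message. A minimal sketch of that behaviour, with Header and Message re-declared locally and the method bodies assumed rather than taken from the package:

```go
package main

import "fmt"

// Local mirrors of the documented declarations, trimmed to what the sketch needs.
type HeaderType byte

const (
	HeaderTypeInt32  HeaderType = 4
	HeaderTypeString HeaderType = 7
)

type Header struct {
	Name  string
	Type  HeaderType
	Value interface{}
}

type Message struct{ Headers []Header }

// SetHeader replaces an existing header with the same name or appends a new one.
func (m *Message) SetHeader(name string, t HeaderType, v interface{}) {
	for i := range m.Headers {
		if m.Headers[i].Name == name {
			m.Headers[i].Type, m.Headers[i].Value = t, v
			return
		}
	}
	m.Headers = append(m.Headers, Header{Name: name, Type: t, Value: v})
}

// GetHeader returns the value of the named header, if present.
func (m *Message) GetHeader(name string) (interface{}, bool) {
	for _, h := range m.Headers {
		if h.Name == name {
			return h.Value, true
		}
	}
	return nil, false
}

// GetStringHeader returns the named header only if it holds a string.
func (m *Message) GetStringHeader(name string) (string, bool) {
	v, ok := m.GetHeader(name)
	if !ok {
		return "", false
	}
	s, ok := v.(string)
	return s, ok
}

func main() {
	m := &Message{}
	m.SetHeader("operation", HeaderTypeString, "first")
	m.SetHeader("operation", HeaderTypeString, "second") // updates in place
	m.SetHeader(":stream-id", HeaderTypeInt32, int32(1))
	op, ok := m.GetStringHeader("operation")
	_, isString := m.GetStringHeader(":stream-id")
	fmt.Println(op, ok, isString, len(m.Headers))
}
```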

type MessageFlags

type MessageFlags uint32

MessageFlags represents bitwise flags for EventStream messages

const (
	// MessageFlagNone indicates no flags set
	MessageFlagNone MessageFlags = 0

	// MessageFlagConnectionAccepted indicates connection was accepted (used in ConnectAck)
	MessageFlagConnectionAccepted MessageFlags = 1 << 0

	// MessageFlagTerminateStream indicates the stream should be closed
	MessageFlagTerminateStream MessageFlags = 1 << 1
)

func (MessageFlags) HasFlag

func (f MessageFlags) HasFlag(flag MessageFlags) bool

HasFlag checks if a specific flag is set
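
Since the flags are single bits, they combine with bitwise OR and are tested with bitwise AND. The body of HasFlag below is an assumption (in particular, how the package treats MessageFlagNone is not documented here); the type and constants mirror the declarations above.

```go
package main

import "fmt"

// MessageFlags and its constants mirror the documented declarations.
type MessageFlags uint32

const (
	MessageFlagNone               MessageFlags = 0
	MessageFlagConnectionAccepted MessageFlags = 1 << 0
	MessageFlagTerminateStream    MessageFlags = 1 << 1
)

// HasFlag reports whether every bit of flag is set in f. Asking for
// MessageFlagNone returns false in this sketch; the package may differ.
func (f MessageFlags) HasFlag(flag MessageFlags) bool {
	return flag != 0 && f&flag == flag
}

func main() {
	f := MessageFlagConnectionAccepted | MessageFlagTerminateStream
	fmt.Println(f.HasFlag(MessageFlagTerminateStream))
	fmt.Println(MessageFlagConnectionAccepted.HasFlag(MessageFlagTerminateStream))
}
```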

func (MessageFlags) String

func (f MessageFlags) String() string

String returns string representation of MessageFlags

type MessageType

type MessageType uint32

MessageType represents the type of an EventStream message. Values MUST match the AWS EventStream RPC protocol specification.

const (
	// MessageTypeApplicationMessage is used for operation requests/responses
	MessageTypeApplicationMessage MessageType = 0

	// MessageTypeApplicationError is used for operation errors
	MessageTypeApplicationError MessageType = 1

	// MessageTypePing is sent to keep connection alive
	MessageTypePing MessageType = 2

	// MessageTypePingResponse is sent in response to Ping
	MessageTypePingResponse MessageType = 3

	// MessageTypeConnect is sent by client to initiate connection
	MessageTypeConnect MessageType = 4

	// MessageTypeConnectAck is sent by server in response to Connect
	MessageTypeConnectAck MessageType = 5

	// MessageTypeProtocolError indicates a protocol-level error
	MessageTypeProtocolError MessageType = 6

	// MessageTypeInternalError indicates an internal server error
	MessageTypeInternalError MessageType = 7
)

func (MessageType) String

func (m MessageType) String() string

String returns string representation of MessageType

type OperationError

type OperationError struct {
	ErrorType       string          // Service model type of the error
	ErrorMessage    string          // Human-readable error message
	ModeledError    json.RawMessage // The full modeled error structure
	IsServiceError  bool            // True if this is a modeled service error
	IsInternalError bool            // True if this is an internal error
	UnderlyingError error           // Underlying error if any
}

OperationError represents an error returned from an operation

func (*OperationError) Error

func (e *OperationError) Error() string

Error implements the error interface

func (*OperationError) Unwrap

func (e *OperationError) Unwrap() error

Unwrap returns the underlying error
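
Callers can recover an OperationError from a wrapped error chain with `errors.As` and branch on its fields. In the sketch below the struct is re-declared locally, the Error format is assumed, `describe` and `sampleError` are hypothetical helpers, and the error type string is only an illustrative value.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// OperationError mirrors the documented struct so the sketch is self-contained.
type OperationError struct {
	ErrorType       string
	ErrorMessage    string
	ModeledError    json.RawMessage
	IsServiceError  bool
	IsInternalError bool
	UnderlyingError error
}

func (e *OperationError) Error() string { return e.ErrorType + ": " + e.ErrorMessage } // format assumed
func (e *OperationError) Unwrap() error { return e.UnderlyingError }

var errPlain = errors.New("plain failure")

// describe classifies err by looking for an OperationError in its chain.
func describe(err error) string {
	var opErr *OperationError
	if !errors.As(err, &opErr) {
		return "other: " + err.Error()
	}
	switch {
	case opErr.IsServiceError:
		return "service error: " + opErr.ErrorType
	case opErr.IsInternalError:
		return "internal error: " + opErr.ErrorMessage
	default:
		return "operation error: " + opErr.ErrorMessage
	}
}

// sampleError builds a wrapped, modeled service error for the demo.
func sampleError() error {
	return fmt.Errorf("lookup failed: %w", &OperationError{
		ErrorType:      "aws.greengrass#ResourceNotFoundError", // illustrative value
		ErrorMessage:   "no such resource",
		ModeledError:   json.RawMessage(`{"message":"no such resource"}`),
		IsServiceError: true,
	})
}

func main() {
	fmt.Println(describe(sampleError()))
	fmt.Println(describe(errPlain))
}
```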

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents an EventStream RPC operation stream

func (*Stream) Activate

func (s *Stream) Activate(ctx context.Context, request interface{}) error

Activate activates a stream by sending the initial request

func (*Stream) Close

func (s *Stream) Close() error

Close closes the stream

func (*Stream) Done

func (s *Stream) Done() <-chan struct{}

Done returns a channel that is closed when the stream is done

func (*Stream) Errors

func (s *Stream) Errors() <-chan error

Errors returns the channel for receiving errors

func (*Stream) Messages

func (s *Stream) Messages() <-chan *Message

Messages returns the channel for receiving messages
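
The three channels returned by Messages, Errors, and Done are naturally consumed together in one `select` loop. The sketch below uses locally created channels in place of a real Stream; `drain` and `demo` are hypothetical, and whether the package closes the Messages channel when a stream ends is not stated here, so the loop tolerates both cases.

```go
package main

import (
	"context"
	"fmt"
)

// Message is trimmed to the one field the sketch needs.
type Message struct{ Payload []byte }

// drain collects payloads until the stream reports an error, finishes,
// or ctx is cancelled.
func drain(ctx context.Context, msgs <-chan *Message, errs <-chan error, done <-chan struct{}) ([]string, error) {
	var got []string
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				msgs = nil // closed: stop selecting on it, wait for done or an error
				continue
			}
			got = append(got, string(m.Payload))
		case err := <-errs:
			return got, err
		case <-done:
			return got, nil
		case <-ctx.Done():
			return got, ctx.Err()
		}
	}
}

// demo feeds two messages through unbuffered channels and then signals done.
func demo() ([]string, error) {
	msgs := make(chan *Message) // unbuffered: a send completes only once received
	errs := make(chan error)
	done := make(chan struct{})
	go func() {
		for _, p := range []string{"a", "b"} {
			msgs <- &Message{Payload: []byte(p)}
		}
		close(done)
	}()
	return drain(context.Background(), msgs, errs, done)
}

func main() {
	fmt.Println(demo())
}
```

With a real Stream, the three channel arguments would come from s.Messages(), s.Errors(), and s.Done().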
