README
ΒΆ
StreamBus Go SDK
π High-performance Go SDK for StreamBus - A modern, lightweight client library for building distributed streaming applications with StreamBus.
StreamBus SDK provides a robust and efficient way to integrate Go applications with StreamBus, offering high-throughput message production, flexible consumption patterns, and enterprise-grade features like transactions and security.
π Table of Contents
- Features
- Requirements
- Installation
- Quick Start
- Configuration
- Advanced Features
- API Reference
- Examples
- Performance
- Contributing
- Support
- License
β¨ Features
- Simple Client API - Easy-to-use client for producing and consuming messages
- Producer Support - Synchronous and asynchronous message production
- Consumer Support - Simple partition consumers and consumer groups
- Transactional Support - Exactly-once semantics with transactional producers and consumers
- Connection Pooling - Efficient connection management with configurable pooling
- Security - TLS/mTLS and SASL authentication support
- Protocol Optimized - High-performance binary protocol with minimal overhead
- Comprehensive Testing - Extensive test coverage and benchmarks
- Zero Dependencies - Minimal external dependencies for maximum reliability
π¦ Requirements
- Go 1.19 or higher
- StreamBus broker v1.0+ running and accessible
- Network connectivity to StreamBus brokers
π Installation
Install the SDK using Go modules:
go get github.com/gstreamio/streambus-sdk
Import in your Go code:
import "github.com/gstreamio/streambus-sdk/client"
π― Quick Start
Basic Producer
package main
import (
"log"
"github.com/gstreamio/streambus-sdk/client"
)
func main() {
// Create client configuration
config := client.DefaultConfig()
config.Brokers = []string{"localhost:9092"}
// Create client
c, err := client.New(config)
if err != nil {
log.Fatal(err)
}
defer c.Close()
// Create topic
if err := c.CreateTopic("events", 3, 1); err != nil {
log.Printf("Topic creation: %v", err)
}
// Create producer
producer := client.NewProducer(c)
defer producer.Close()
// Send message
key := []byte("key1")
value := []byte("Hello, StreamBus!")
if err := producer.Send("events", key, value); err != nil {
log.Fatal(err)
}
log.Println("Message sent successfully!")
}
Basic Consumer
package main
import (
"log"
"github.com/gstreamio/streambus-sdk/client"
)
func main() {
// Create client
config := client.DefaultConfig()
config.Brokers = []string{"localhost:9092"}
c, err := client.New(config)
if err != nil {
log.Fatal(err)
}
defer c.Close()
// Create consumer for topic "events", partition 0
consumer := client.NewConsumer(c, "events", 0)
defer consumer.Close()
// Set starting offset
if err := consumer.Seek(0); err != nil {
log.Fatal(err)
}
// Fetch messages
for i := 0; i < 10; i++ {
record, err := consumer.Fetch()
if err != nil {
log.Printf("Fetch error: %v", err)
continue
}
log.Printf("Received: key=%s, value=%s, offset=%d",
string(record.Key), string(record.Value), record.Offset)
}
}
Consumer Group
package main
import (
"context"
"log"
"time"
"github.com/gstreamio/streambus-sdk/client"
)
func main() {
config := client.DefaultConfig()
config.Brokers = []string{"localhost:9092"}
c, err := client.New(config)
if err != nil {
log.Fatal(err)
}
defer c.Close()
// Create group consumer
groupConfig := &client.GroupConsumerConfig{
GroupID: "my-consumer-group",
Topics: []string{"events"},
}
consumer, err := client.NewGroupConsumer(c, groupConfig)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// Start consuming
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := consumer.Start(ctx, func(record *client.Record) error {
log.Printf("Group consumed: key=%s, value=%s",
string(record.Key), string(record.Value))
return nil
}); err != nil {
log.Fatal(err)
}
}
βοΈ Configuration
Client Configuration
config := &client.Config{
Brokers: []string{"localhost:9092"},
ConnectTimeout: 10 * time.Second,
RequestTimeout: 30 * time.Second,
// Connection pooling
MaxIdleConns: 10,
MaxConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
// Retry configuration
MaxRetries: 3,
RetryBackoff: 100 * time.Millisecond,
}
TLS Configuration
config := client.DefaultConfig()
config.Security = &client.SecurityConfig{
TLS: &client.TLSConfig{
Enabled: true,
CAFile: "/path/to/ca.crt",
CertFile: "/path/to/client.crt", // For mTLS
KeyFile: "/path/to/client.key", // For mTLS
ServerName: "streambus.example.com",
},
}
SASL Authentication
config := client.DefaultConfig()
config.Security = &client.SecurityConfig{
SASL: &client.SASLConfig{
Enabled: true,
Mechanism: "SCRAM-SHA-256",
Username: "producer1",
Password: "secure-password",
},
}
π§ Advanced Features
Transactional Producer
config := &client.TransactionalProducerConfig{
TransactionID: "my-transaction",
}
producer, err := client.NewTransactionalProducer(c, config)
if err != nil {
log.Fatal(err)
}
defer producer.Close()
// Begin transaction
if err := producer.BeginTransaction(); err != nil {
log.Fatal(err)
}
// Send messages
if err := producer.Send("events", []byte("key"), []byte("value")); err != nil {
producer.AbortTransaction()
log.Fatal(err)
}
// Commit transaction
if err := producer.CommitTransaction(); err != nil {
log.Fatal(err)
}
Transactional Consumer
config := &client.TransactionalConsumerConfig{
Topic: "events",
Partition: 0,
IsolationLevel: client.ReadCommitted,
}
consumer, err := client.NewTransactionalConsumer(c, config)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// Only reads committed messages
record, err := consumer.Fetch()
if err != nil {
log.Fatal(err)
}
π API Reference
Client
New(config *Config) (*Client, error)- Create a new clientCreateTopic(name string, partitions, replicas int) error- Create a topicDeleteTopic(name string) error- Delete a topicListTopics() ([]string, error)- List all topicsClose() error- Close the client and all connections
Producer
NewProducer(client *Client) *Producer- Create a new producerSend(topic string, key, value []byte) error- Send a messageClose() error- Close the producer
Consumer
NewConsumer(client *Client, topic string, partition int) *Consumer- Create partition consumerSeek(offset int64) error- Set the starting offsetFetch() (*Record, error)- Fetch the next messageClose() error- Close the consumer
Group Consumer
NewGroupConsumer(client *Client, config *GroupConsumerConfig) (*GroupConsumer, error)- Create group consumerStart(ctx context.Context, handler MessageHandler) error- Start consuming with handlerClose() error- Close the consumer and leave the group
π‘ Examples
See the examples directory for complete working examples:
- Basic Producer/Consumer - Simple message production and consumption
- Consumer Groups - Distributed consumer groups with auto-balancing
- Transactional Messaging - Exactly-once processing patterns
- Secure Connections - TLS/mTLS and SASL authentication
β‘ Performance
- Connection Pooling: Configure appropriate pool sizes for your workload
- Batching: Use transactional producers for batching multiple messages
- Partition Strategy: Distribute load across multiple partitions
- Consumer Groups: Scale consumers horizontally with consumer groups
- Keep-Alive: Enable TCP keep-alive for long-lived connections
Benchmarks
The SDK achieves excellent performance in benchmarks:
- Producer: 1M+ messages/sec on a single connection
- Consumer: 800K+ messages/sec with minimal latency
- Memory: < 50MB for typical workloads
- CPU: < 5% CPU usage under normal load
π οΈ Error Handling
The SDK uses standard Go error handling patterns with typed errors for common scenarios:
if err := producer.Send("topic", key, value); err != nil {
switch {
case errors.Is(err, client.ErrConnectionFailed):
// Handle connection errors
case errors.Is(err, client.ErrTimeout):
// Handle timeouts
case errors.Is(err, client.ErrInvalidTopic):
// Handle invalid topic
default:
// Handle other errors
}
}
π€ Contributing
We welcome contributions! Please read our Contributing Guidelines to get started.
Development Setup
# Clone the repository
git clone https://github.com/gstreamio/streambus-sdk.git
cd streambus-sdk
# Install dependencies
go mod download
# Run tests
go test ./...
# Run benchmarks
go test -bench=. ./...
π Documentation
- API Documentation - Complete API reference
- Getting Started Guide - Step-by-step tutorial
- Architecture Overview - SDK design and internals
- Best Practices - Production deployment guidelines
π Support
- π Documentation: StreamBus Docs
- π Issues: GitHub Issues
- π¬ Discussions: GitHub Discussions
- π StreamBus Broker: github.com/shawntherrien/streambus
π License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.