goplugins

v1.0.0
Published: Sep 13, 2025 | License: MPL-2.0 | Imports: 30 | Imported by: 1

README

go-plugins: Go Plugin System over HTTP, gRPC & Unix Sockets

an AGILira library

go-plugins provides a production-ready, type-safe plugin architecture for Go applications. It supports multiple transport protocols (HTTP, gRPC, Unix sockets) with built-in circuit breaking, health monitoring, authentication, and graceful degradation.

Features

  • Multiple Transport Protocols: HTTP/HTTPS, gRPC (with optional TLS), Unix domain sockets, and executable plugins
  • Type Safety: Generics-based architecture ensuring compile-time type safety for requests and responses
  • Circuit Breaker Pattern: Automatic failure detection and recovery with configurable thresholds
  • Health Monitoring: Continuous health checking with automatic plugin status management
  • Load Balancing: Multiple algorithms including round-robin, least connections, and weighted random
  • Authentication: Support for API keys, Bearer tokens, Basic auth, mTLS, and custom methods
  • Hot Reload: Intelligent configuration reloading with graceful connection draining
  • Rate Limiting: Token bucket rate limiting to prevent overwhelming plugins
  • Observability: Comprehensive metrics collection and structured logging
  • Connection Pooling: Efficient resource management with configurable connection limits

Compatibility and Support

go-plugins is designed for Go 1.23+ environments and follows Long-Term Support guidelines to ensure consistent performance across production deployments.

Quick Start

Installation
go get github.com/agilira/go-plugins
Basic Usage
package main

import (
    "context"
    "log"
    "log/slog"

    "github.com/agilira/go-plugins"
)

// Define request/response types
type AuthRequest struct {
    UserID string `json:"user_id"`
    Token  string `json:"token"`
}

type AuthResponse struct {
    Valid   bool   `json:"valid"`
    Message string `json:"message,omitempty"`
}

func main() {
    // Create manager
    logger := slog.Default()
    manager := goplugins.NewManager[AuthRequest, AuthResponse](logger)

    // Register plugin factory
    httpFactory := goplugins.NewHTTPPluginFactory[AuthRequest, AuthResponse]()
    manager.RegisterFactory("http", httpFactory)

    // Configure plugins
    config := goplugins.ManagerConfig{
        Plugins: []goplugins.PluginConfig{
            {
                Name:      "auth-service",
                Type:      "http",
                Transport: goplugins.TransportHTTPS,
                Endpoint:  "https://auth.example.com/api/v1/validate",
                Enabled:   true,
                Auth: goplugins.AuthConfig{
                    Method: goplugins.AuthBearer,
                    Token:  "your-jwt-token",
                },
            },
        },
    }

    // Load configuration
    if err := manager.LoadFromConfig(config); err != nil {
        log.Fatal(err)
    }

    // Execute plugin request
    ctx := context.Background()
    request := AuthRequest{
        UserID: "user123",
        Token:  "access-token",
    }

    response, err := manager.Execute(ctx, "auth-service", request)
    if err != nil {
        log.Printf("Request failed: %v", err)
        return
    }

    log.Printf("Authentication result: %+v", response)
}
Usage
HTTP/HTTPS Plugin Configuration
config := goplugins.PluginConfig{
    Name:      "api-service",
    Transport: goplugins.TransportHTTPS,
    Endpoint:  "https://api.example.com/v1/process",
    Auth: goplugins.AuthConfig{
        Method: goplugins.AuthAPIKey,
        APIKey: "your-api-key",
    },
    Connection: goplugins.ConnectionConfig{
        MaxConnections:    10,
        RequestTimeout:    30 * time.Second,
        ConnectionTimeout: 10 * time.Second,
    },
}
gRPC Plugin Configuration
config := goplugins.PluginConfig{
    Name:      "grpc-service",
    Transport: goplugins.TransportGRPCTLS,
    Endpoint:  "grpc.example.com:443",
    Auth: goplugins.AuthConfig{
        Method:   goplugins.AuthMTLS,
        CertFile: "/etc/ssl/client.crt",
        KeyFile:  "/etc/ssl/client.key",
        CAFile:   "/etc/ssl/ca.crt",
    },
}
Unix Domain Socket Plugin Configuration
config := goplugins.PluginConfig{
    Name:      "local-processor",
    Transport: goplugins.TransportUnix,
    Endpoint:  "/tmp/processor.sock",
    Connection: goplugins.ConnectionConfig{
        MaxConnections: 20,
    },
}
Observability
Metrics Collection
// Enable enhanced observability
observableManager := goplugins.NewObservableManager(manager, 
    goplugins.EnhancedObservabilityConfig(), logger)

// Get comprehensive metrics
metrics := observableManager.GetObservabilityMetrics()
log.Printf("Total requests: %d", metrics.TotalRequests)
log.Printf("Success rate: %.2f%%", metrics.OverallSuccessRate)

// Per-plugin metrics
for pluginName, pluginMetrics := range metrics.PluginMetrics {
    log.Printf("Plugin %s: %d requests, %.2f%% success rate",
        pluginName, pluginMetrics.TotalRequests, pluginMetrics.SuccessRate)
}
Circuit Breaker and Health Monitoring
// Get plugin health status
healthStatus := manager.Health()
for pluginName, status := range healthStatus {
    log.Printf("Plugin %s: %s (%v)", 
        pluginName, status.Status, status.ResponseTime)
}

// Centralized health monitoring
monitor := goplugins.NewHealthMonitor()
monitor.AddChecker("auth-service", authHealthChecker)
monitor.AddChecker("payment-service", paymentHealthChecker)

overallHealth := monitor.GetOverallHealth()
if overallHealth.Status != goplugins.StatusHealthy {
    log.Printf("System degraded: %s", overallHealth.Message)
}
Core Components
  • Manager: Central plugin management with lifecycle control and load balancing
  • Circuit Breaker: Automatic failure detection and recovery with configurable thresholds
  • Health Checker: Continuous plugin health monitoring with status tracking
  • Load Balancer: Multiple strategies for distributing requests across plugin instances
  • Hot Reloader: Intelligent configuration updates with graceful connection draining
  • Observability System: Comprehensive metrics collection and structured logging

Testing

# Run all tests
go test ./...

# Run with coverage
go test -v -race -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

# Run benchmarks
go test -bench=. -benchmem ./...

Examples

Load Balancing
// Create load balancer with round-robin strategy
balancer := goplugins.NewLoadBalancer[MyRequest, MyResponse](
    goplugins.LoadBalanceStrategyRoundRobin,
    logger,
)

// Add plugins with different weights
balancer.AddPlugin("service-1", plugin1, 100, 1) // weight=100, priority=1
balancer.AddPlugin("service-2", plugin2, 50, 1)  // weight=50, priority=1

// Execute with load balancing
request := goplugins.LoadBalanceRequest{
    RequestID: "req-123",
    Data:      myRequest,
}
response, err := balancer.Execute(ctx, request)
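
The round-robin strategy used above can be pictured with a small standalone sketch. It does not call the go-plugins API: `roundRobin`, `newRoundRobin`, and `next` are hypothetical names introduced here, and the sketch ignores the weights, priorities, and health status that the real balancer presumably takes into account.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical stand-in for a balancer that cycles through
// a fixed list of plugin names. It is not the go-plugins LoadBalancer.
type roundRobin struct {
	names   []string
	counter atomic.Uint64
}

func newRoundRobin(names ...string) *roundRobin {
	return &roundRobin{names: names}
}

// next returns the next plugin name in rotation, or "" when the list is empty.
// The atomic counter keeps the rotation safe across goroutines.
func (r *roundRobin) next() string {
	if len(r.names) == 0 {
		return ""
	}
	n := r.counter.Add(1) - 1
	return r.names[n%uint64(len(r.names))]
}

func main() {
	rr := newRoundRobin("service-1", "service-2")
	for i := 0; i < 4; i++ {
		fmt.Println(rr.next())
	}
}
```

Each call advances a shared counter, so consecutive requests alternate between the registered names regardless of which goroutine issues them.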
Hot Reload with Graceful Draining
// Create reloader with advanced options
reloader := goplugins.NewPluginReloader(manager, goplugins.ReloadOptions{
    Strategy:             goplugins.ReloadStrategyGraceful,
    DrainTimeout:         30 * time.Second,
    GracefulTimeout:      60 * time.Second,
    MaxConcurrentReloads: 3,
    HealthCheckTimeout:   10 * time.Second,
    RollbackOnFailure:    true,
}, logger)

// Perform intelligent reload
ctx := context.Background()
err := reloader.ReloadWithIntelligentDiff(ctx, newConfig)
if err != nil {
    log.Printf("Reload failed: %v", err)
}
Complete Production Configuration
config := goplugins.ManagerConfig{
    LogLevel:    "info",
    MetricsPort: 9090,
    
    // Default settings applied to all plugins
    DefaultRetry: goplugins.RetryConfig{
        MaxRetries:      3,
        InitialInterval: 100 * time.Millisecond,
        MaxInterval:     5 * time.Second,
        Multiplier:      2.0,
        RandomJitter:    true,
    },
    
    DefaultCircuitBreaker: goplugins.CircuitBreakerConfig{
        Enabled:             true,
        FailureThreshold:    5,
        RecoveryTimeout:     30 * time.Second,
        MinRequestThreshold: 3,
        SuccessThreshold:    2,
    },
    
    DefaultHealthCheck: goplugins.HealthCheckConfig{
        Enabled:      true,
        Interval:     30 * time.Second,
        Timeout:      5 * time.Second,
        FailureLimit: 3,
    },
    
    Plugins: []goplugins.PluginConfig{
        {
            Name:      "payment-service",
            Type:      "http",
            Transport: goplugins.TransportHTTPS,
            Endpoint:  "https://payments.example.com/api/v1",
            Priority:  1,
            Enabled:   true,
            
            Auth: goplugins.AuthConfig{
                Method: goplugins.AuthBearer,
                Token:  os.Getenv("PAYMENT_SERVICE_TOKEN"),
            },
            
            RateLimit: goplugins.RateLimitConfig{
                Enabled:           true,
                RequestsPerSecond: 50.0,
                BurstSize:         100,
            },
            
            Labels: map[string]string{
                "environment": "production",
                "team":        "payments",
            },
        },
    },
}

API Reference

Core Interfaces
  • Plugin[Req, Resp]: Main plugin interface with Execute, Health, Info, and Close methods
  • PluginManager[Req, Resp]: Manager interface for plugin lifecycle and execution
  • PluginFactory[Req, Resp]: Factory interface for creating plugin instances from configuration
Transport Implementations
  • HTTPPlugin[Req, Resp]: HTTP/HTTPS transport with authentication and connection pooling
  • GRPCPlugin[Req, Resp]: gRPC transport with TLS support and service discovery
  • UnixSocketPlugin[Req, Resp]: Unix domain socket transport for high-performance local communication
Configuration Types
  • PluginConfig: Complete plugin configuration with transport, auth, and operational settings
  • ManagerConfig: Manager-level configuration with defaults and plugin definitions
  • AuthConfig: Authentication configuration supporting multiple methods
  • RetryConfig: Retry and backoff configuration for failed requests
  • CircuitBreakerConfig: Circuit breaker settings for failure detection and recovery
  • HealthCheckConfig: Health monitoring configuration with intervals and thresholds

License

go-plugins is licensed under the Mozilla Public License 2.0.


Documentation

Overview

Package goplugins provides a production-ready, type-safe plugin architecture for Go applications. It supports multiple transport protocols (HTTP, gRPC, Unix sockets) with built-in circuit breaking, health monitoring, authentication, and graceful degradation.

Key Features:

  • Type-safe plugin interfaces using Go generics
  • Multiple transport protocols (HTTP, gRPC, Unix sockets)
  • Circuit breaker pattern for resilience
  • Health monitoring and automatic recovery
  • Authentication and authorization
  • Hot-reloading of plugin configurations
  • Comprehensive metrics and structured logging
  • Graceful shutdown with proper cleanup

Basic Usage:

// Define your plugin request/response types
type KeyRequest struct {
	KeyID string `json:"key_id"`
}

type KeyResponse struct {
	Key   []byte `json:"key"`
	Error string `json:"error,omitempty"`
}

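// Create a plugin manager; as in the README Quick Start, NewManager takes a *slog.Logger
manager := goplugins.NewManager[KeyRequest, KeyResponse](slog.Default())

// Load plugins from a goplugins.ManagerConfig value (the Quick Start shows a populated one)
err := manager.LoadFromConfig(config)
if err != nil {
	log.Fatal(err)
}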

// Execute plugin operations
resp, err := manager.Execute(ctx, "vault-provider", KeyRequest{KeyID: "master"})

Security: The library implements multiple security layers including mTLS for transport security, API key authentication, request validation, rate limiting, and comprehensive audit logging.

Performance: Built-in connection pooling, intelligent caching, circuit breakers, and optimized serialization ensure high performance even under heavy load.

Copyright (c) 2025 AGILira - A. Giordano
SPDX-License-Identifier: MPL-2.0

Constants

const (
	// Configuration errors (1000-1099)
	ErrCodeInvalidPluginName     = "PLUGIN_1001"
	ErrCodeInvalidTransport      = "PLUGIN_1002"
	ErrCodeMissingEndpoint       = "PLUGIN_1003"
	ErrCodeInvalidEndpointURL    = "PLUGIN_1004"
	ErrCodeInvalidEndpointFormat = "PLUGIN_1005"
	ErrCodeMissingSocketPath     = "PLUGIN_1006"
	ErrCodeMissingExecutable     = "PLUGIN_1007"
	ErrCodeUnsupportedTransport  = "PLUGIN_1008"
	ErrCodeNoPluginsConfigured   = "PLUGIN_1009"
	ErrCodeDuplicatePluginName   = "PLUGIN_1010"
	ErrCodeInvalidJSONConfig     = "PLUGIN_1011"

	// Authentication errors (1100-1199)
	ErrCodeMissingAPIKey           = "AUTH_1101"
	ErrCodeMissingBearerToken      = "AUTH_1102"
	ErrCodeMissingBasicCredentials = "AUTH_1103"
	ErrCodeMissingMTLSCerts        = "AUTH_1104"
	ErrCodeUnsupportedAuthMethod   = "AUTH_1105"

	// Plugin execution errors (1200-1299)
	ErrCodePluginNotFound         = "PLUGIN_1201"
	ErrCodePluginNotEnabled       = "PLUGIN_1202"
	ErrCodePluginExecutionFailed  = "PLUGIN_1203"
	ErrCodePluginTimeout          = "PLUGIN_1204"
	ErrCodePluginConnectionFailed = "PLUGIN_1205"

	// Transport errors (1300-1399)
	ErrCodeHTTPTransportError = "TRANSPORT_1301"
	ErrCodeGRPCTransportError = "TRANSPORT_1302"
	ErrCodeUnixTransportError = "TRANSPORT_1303"
	ErrCodeExecTransportError = "TRANSPORT_1304"

	// Circuit breaker errors (1400-1499)
	ErrCodeCircuitBreakerOpen    = "CIRCUIT_1401"
	ErrCodeCircuitBreakerTimeout = "CIRCUIT_1402"

	// Rate limiting errors (1500-1599)
	ErrCodeRateLimitExceeded = "RATELIMIT_1501"

	// Health check errors (1600-1699)
	ErrCodeHealthCheckFailed  = "HEALTH_1601"
	ErrCodeHealthCheckTimeout = "HEALTH_1602"

	// Load balancer errors (1700-1799)
	ErrCodeNoAvailablePlugins = "LOADBALANCER_1701"
	ErrCodeLoadBalancerFailed = "LOADBALANCER_1702"
)

Error codes for the go-plugins system
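
Each code pairs a category prefix with a number whose hundreds range identifies the error group listed in the comments above. A caller that wants to route on these codes could split them as sketched below. `splitErrorCode` and `errorGroup` are hypothetical helpers written for this illustration and are not part of the package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// splitErrorCode splits a code such as "PLUGIN_1001" into its category
// prefix and numeric part. The final result is false for malformed input.
func splitErrorCode(code string) (string, int, bool) {
	category, digits, found := strings.Cut(code, "_")
	if !found || category == "" {
		return "", 0, false
	}
	number, err := strconv.Atoi(digits)
	if err != nil {
		return "", 0, false
	}
	return category, number, true
}

// errorGroup maps a numeric code to the group named in the const block
// comments (1000-1099 configuration, 1100-1199 authentication, and so on).
func errorGroup(number int) string {
	switch number / 100 {
	case 10:
		return "configuration"
	case 11:
		return "authentication"
	case 12:
		return "plugin execution"
	case 13:
		return "transport"
	case 14:
		return "circuit breaker"
	case 15:
		return "rate limiting"
	case 16:
		return "health check"
	case 17:
		return "load balancer"
	default:
		return "unknown"
	}
}

func main() {
	for _, code := range []string{"PLUGIN_1001", "AUTH_1104", "CIRCUIT_1401"} {
		category, number, ok := splitErrorCode(code)
		fmt.Println(category, number, errorGroup(number), ok)
	}
}
```

Note that the `PLUGIN_` prefix is shared by the configuration (10xx) and execution (12xx) ranges, so the number is the more reliable discriminator.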

Variables

This section is empty.

Functions

func NewAuthConfigValidationError

func NewAuthConfigValidationError(cause error) *errors.Error

func NewCircuitBreakerOpenError

func NewCircuitBreakerOpenError(pluginName string) *errors.Error

func NewCircuitBreakerTimeoutError

func NewCircuitBreakerTimeoutError(pluginName string, timeout interface{}) *errors.Error

func NewDuplicatePluginNameError

func NewDuplicatePluginNameError(name string) *errors.Error

func NewExecTransportError

func NewExecTransportError(cause error) *errors.Error

func NewGRPCTransportError

func NewGRPCTransportError(cause error) *errors.Error

func NewHTTPTransportError

func NewHTTPTransportError(cause error) *errors.Error

func NewHealthCheckFailedError

func NewHealthCheckFailedError(pluginName string, cause error) *errors.Error

func NewHealthCheckTimeoutError

func NewHealthCheckTimeoutError(pluginName string, timeout interface{}) *errors.Error

func NewInvalidEndpointFormatError

func NewInvalidEndpointFormatError() *errors.Error

func NewInvalidEndpointURLError

func NewInvalidEndpointURLError(endpoint string, cause error) *errors.Error

func NewInvalidJSONConfigError

func NewInvalidJSONConfigError(cause error) *errors.Error

func NewInvalidPluginNameError

func NewInvalidPluginNameError(name string) *errors.Error

func NewInvalidTransportError

func NewInvalidTransportError() *errors.Error

func NewLoadBalancerFailedError

func NewLoadBalancerFailedError(cause error) *errors.Error

func NewMissingAPIKeyError

func NewMissingAPIKeyError() *errors.Error

func NewMissingBasicCredentialsError

func NewMissingBasicCredentialsError() *errors.Error

func NewMissingBearerTokenError

func NewMissingBearerTokenError() *errors.Error

func NewMissingEndpointError

func NewMissingEndpointError(transport TransportType) *errors.Error

func NewMissingExecutableError

func NewMissingExecutableError() *errors.Error

func NewMissingMTLSCertsError

func NewMissingMTLSCertsError() *errors.Error

func NewMissingSocketPathError

func NewMissingSocketPathError() *errors.Error

func NewNoAvailablePluginsError

func NewNoAvailablePluginsError(pluginType string) *errors.Error

func NewNoPluginsConfiguredError

func NewNoPluginsConfiguredError() *errors.Error

func NewPluginConnectionFailedError

func NewPluginConnectionFailedError(name string, cause error) *errors.Error

func NewPluginExecutionFailedError

func NewPluginExecutionFailedError(name string, cause error) *errors.Error

func NewPluginNotEnabledError

func NewPluginNotEnabledError(name string) *errors.Error

func NewPluginNotFoundError

func NewPluginNotFoundError(name string) *errors.Error

func NewPluginTimeoutError

func NewPluginTimeoutError(name string, timeout interface{}) *errors.Error

func NewPluginValidationError

func NewPluginValidationError(pluginIndex int, cause error) *errors.Error

func NewRateLimitExceededError

func NewRateLimitExceededError(pluginName string, limit interface{}) *errors.Error

func NewUnixTransportError

func NewUnixTransportError(cause error) *errors.Error

func NewUnsupportedAuthMethodError

func NewUnsupportedAuthMethodError(method AuthMethod) *errors.Error

func NewUnsupportedTransportError

func NewUnsupportedTransportError(transport TransportType) *errors.Error

Types

type AuthConfig

type AuthConfig struct {
	Method   AuthMethod        `json:"method" yaml:"method"`
	APIKey   string            `json:"api_key,omitempty" yaml:"api_key,omitempty"`
	Token    string            `json:"token,omitempty" yaml:"token,omitempty"`
	Username string            `json:"username,omitempty" yaml:"username,omitempty"`
	Password string            `json:"password,omitempty" yaml:"password,omitempty"`
	CertFile string            `json:"cert_file,omitempty" yaml:"cert_file,omitempty"`
	KeyFile  string            `json:"key_file,omitempty" yaml:"key_file,omitempty"`
	CAFile   string            `json:"ca_file,omitempty" yaml:"ca_file,omitempty"`
	Headers  map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
}

AuthConfig contains authentication configuration for plugin connections.

This structure supports multiple authentication methods and provides flexible configuration options for securing plugin communications. The specific fields used depend on the chosen authentication method.

Field usage by auth method:

  • AuthAPIKey: Uses APIKey field
  • AuthBearer: Uses Token field
  • AuthBasic: Uses Username and Password fields
  • AuthMTLS: Uses CertFile, KeyFile, and optionally CAFile
  • AuthCustom: Uses Headers field for custom authentication headers

Example configurations:

// API Key authentication
apiKeyAuth := AuthConfig{
    Method: AuthAPIKey,
    APIKey: "your-api-key-here",
}

// mTLS authentication
mtlsAuth := AuthConfig{
    Method:   AuthMTLS,
    CertFile: "/path/to/client.crt",
    KeyFile:  "/path/to/client.key",
    CAFile:   "/path/to/ca.crt",
}

func (*AuthConfig) Validate

func (ac *AuthConfig) Validate() error

Validate validates the authentication configuration
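
The documentation does not spell out what Validate checks. Based on the field-usage list above, a validation of this kind might look like the sketch below. The types `authConfig` and `authMethod` are local mirrors written for this example, `validateAuth` is a hypothetical function, and plain standard-library errors stand in for the package's own `*errors.Error` values.

```go
package main

import (
	"errors"
	"fmt"
)

// authMethod and authConfig mirror the documented types so the sketch
// compiles on its own; they are not the go-plugins definitions.
type authMethod string

const (
	authNone   authMethod = "none"
	authAPIKey authMethod = "api-key"
	authBearer authMethod = "bearer"
	authBasic  authMethod = "basic"
	authMTLS   authMethod = "mtls"
	authCustom authMethod = "custom"
)

type authConfig struct {
	Method             authMethod
	APIKey, Token      string
	Username, Password string
	CertFile, KeyFile  string
	CAFile             string
	Headers            map[string]string
}

// validateAuth checks that the fields required by the chosen method are set.
// CAFile stays optional for mTLS, as in the field-usage list.
func validateAuth(ac authConfig) error {
	switch ac.Method {
	case authNone:
		return nil
	case authAPIKey:
		if ac.APIKey == "" {
			return errors.New("api key is required")
		}
	case authBearer:
		if ac.Token == "" {
			return errors.New("bearer token is required")
		}
	case authBasic:
		if ac.Username == "" || ac.Password == "" {
			return errors.New("username and password are required")
		}
	case authMTLS:
		if ac.CertFile == "" || ac.KeyFile == "" {
			return errors.New("client certificate and key files are required")
		}
	case authCustom:
		if len(ac.Headers) == 0 {
			return errors.New("at least one custom header is required")
		}
	default:
		return fmt.Errorf("unsupported auth method %q", ac.Method)
	}
	return nil
}

func main() {
	fmt.Println(validateAuth(authConfig{Method: authAPIKey}))
	fmt.Println(validateAuth(authConfig{Method: authBearer, Token: "your-jwt-token"}))
}
```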

type AuthMethod

type AuthMethod string

AuthMethod represents different authentication methods supported by the plugin system.

Available authentication methods:

  • AuthNone: No authentication required
  • AuthAPIKey: API key-based authentication via X-API-Key header
  • AuthBearer: Bearer token authentication via Authorization header
  • AuthBasic: HTTP Basic authentication with username/password
  • AuthMTLS: Mutual TLS authentication using client certificates
  • AuthCustom: Custom authentication method with user-defined headers

Example usage:

auth := AuthConfig{
    Method: AuthBearer,
    Token:  "your-jwt-token-here",
}
const (
	AuthNone   AuthMethod = "none"
	AuthAPIKey AuthMethod = "api-key"
	AuthBearer AuthMethod = "bearer"
	AuthBasic  AuthMethod = "basic"
	AuthMTLS   AuthMethod = "mtls"
	AuthCustom AuthMethod = "custom"
)

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern for enhanced plugin resilience.

This implementation provides automatic failure detection and recovery mechanisms to prevent cascading failures in distributed systems. It uses atomic operations for thread safety and maintains detailed statistics for monitoring.

Key features:

  • Thread-safe operation using atomic counters
  • Configurable failure thresholds and recovery timeouts
  • Automatic state transitions based on success/failure patterns
  • Detailed statistics tracking for observability
  • Graceful recovery testing through half-open state

Usage example:

config := CircuitBreakerConfig{
    Enabled:             true,
    FailureThreshold:    5,
    RecoveryTimeout:     30 * time.Second,
    MinRequestThreshold: 3,
    SuccessThreshold:    2,
}
cb := NewCircuitBreaker(config)

// Before making a request
if !cb.AllowRequest() {
    return nil, errors.New("circuit breaker open")
}

// After request completion
if err != nil {
    cb.RecordFailure()
} else {
    cb.RecordSuccess()
}

func NewCircuitBreaker

func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker instance with the specified configuration.

The circuit breaker starts in the StateClosed state, allowing all requests to pass through. State transitions will occur automatically based on the configured thresholds and the success/failure patterns of monitored operations.

Parameters:

  • config: Configuration defining thresholds, timeouts, and behavior parameters

Returns a thread-safe CircuitBreaker ready for use across multiple goroutines.

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() bool

AllowRequest determines whether a request should be allowed through the circuit breaker.

This method implements the core logic of the circuit breaker pattern by checking the current state and applying the appropriate rules:

  • StateClosed: Always allows requests (normal operation)
  • StateOpen: Blocks all requests until recovery timeout expires
  • StateHalfOpen: Allows limited requests to test service recovery

The method is thread-safe and may trigger state transitions when called. It should be called before attempting any operation that you want to protect with the circuit breaker.

Returns:

  • true: Request should proceed
  • false: Request should be rejected (fail fast)

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitBreakerState

GetState returns the current state of the circuit breaker.

This method provides a thread-safe way to inspect the circuit breaker's current operational state without affecting its behavior. It's useful for monitoring, debugging, and making operational decisions.

Returns one of:

  • StateClosed: Normal operation
  • StateOpen: Circuit is tripped, blocking requests
  • StateHalfOpen: Testing recovery

func (*CircuitBreaker) GetStats

func (cb *CircuitBreaker) GetStats() CircuitBreakerStats

GetStats returns comprehensive statistics about the circuit breaker's operation.

This method provides detailed metrics for monitoring and observability purposes. The statistics include current state, operation counts, and timing information that can be used for alerting, dashboards, and performance analysis.

The returned statistics are consistent with the circuit breaker's internal state at the time of the call, providing a reliable snapshot for monitoring.

Returns CircuitBreakerStats containing:

  • Current state and failure/success counts
  • Request count and last failure timestamp
  • All data needed for operational visibility

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation and may trigger the circuit breaker to open.

This method should be called after every failed operation that was allowed through the circuit breaker. It updates failure counters and may cause the circuit breaker to transition to StateOpen if the failure threshold is exceeded.

The method is thread-safe and performs atomic updates to maintain consistency across concurrent operations. It also updates the last failure timestamp for recovery timeout calculations.

State transition logic:

  • May open circuit if FailureThreshold is exceeded
  • In StateHalfOpen: Any failure immediately reopens the circuit
  • Updates failure timestamp for recovery timeout tracking

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation and may trigger state transitions.

This method should be called after every successful operation that was allowed through the circuit breaker. It updates internal counters and may cause the circuit breaker to transition from StateHalfOpen to StateClosed if enough consecutive successes have been recorded.

The method is thread-safe and performs atomic updates to maintain consistency across concurrent operations.

State transition logic:

  • In StateHalfOpen: May close circuit if SuccessThreshold is met
  • In other states: Updates statistics for monitoring

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset forcibly resets the circuit breaker to the closed state and clears all counters.

This method provides an administrative way to manually reset the circuit breaker, typically used for operational recovery or testing purposes. It immediately transitions the circuit breaker to StateClosed regardless of current state and clears all failure/success counters.

Use cases:

  • Manual recovery after fixing underlying issues
  • Administrative reset during maintenance
  • Testing and development scenarios
  • Integration with external monitoring systems

Note: Use with caution in production as it bypasses the normal recovery logic.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	Enabled             bool          `json:"enabled" yaml:"enabled"`
	FailureThreshold    int           `json:"failure_threshold" yaml:"failure_threshold"`
	RecoveryTimeout     time.Duration `json:"recovery_timeout" yaml:"recovery_timeout"`
	MinRequestThreshold int           `json:"min_request_threshold" yaml:"min_request_threshold"`
	SuccessThreshold    int           `json:"success_threshold" yaml:"success_threshold"`
}

CircuitBreakerConfig contains circuit breaker settings for plugin resilience.

The circuit breaker implements the Circuit Breaker pattern to prevent cascading failures by temporarily stopping requests to failing services. It has three states:

  • Closed: Normal operation, requests flow through
  • Open: Circuit is tripped, requests fail fast
  • Half-Open: Testing if service has recovered

State transitions:

  • Closed → Open: When FailureThreshold consecutive failures occur
  • Open → Half-Open: After RecoveryTimeout has elapsed
  • Half-Open → Closed: When SuccessThreshold successes occur
  • Half-Open → Open: When any failure occurs

Example configuration:

cb := CircuitBreakerConfig{
    Enabled:             true,
    FailureThreshold:    5,    // Trip after 5 failures
    RecoveryTimeout:     30 * time.Second,
    MinRequestThreshold: 3,    // Need 3 requests before considering trip
    SuccessThreshold:    2,    // Need 2 successes to close circuit
}

type CircuitBreakerState

type CircuitBreakerState int32

CircuitBreakerState represents the current operational state of a circuit breaker.

The circuit breaker pattern prevents cascading failures by monitoring the failure rate of operations and temporarily blocking requests when failures exceed a threshold. This helps systems fail fast and recover gracefully.

State behaviors:

  • StateClosed: Normal operation, all requests are allowed through
  • StateOpen: Circuit is tripped, requests fail immediately without execution
  • StateHalfOpen: Testing phase, limited requests allowed to test recovery

State transitions occur automatically based on failure/success counts and timeouts.

const (
	StateClosed CircuitBreakerState = iota
	StateOpen
	StateHalfOpen
)

func (CircuitBreakerState) String

func (s CircuitBreakerState) String() string
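
String has no description in the reference. A Stringer for an `iota`-based state type commonly follows the pattern sketched here; the type `breakerState` is a local mirror, and the returned strings ("closed", "open", "half-open") are assumptions, since the documentation does not list the actual values.

```go
package main

import "fmt"

// breakerState mirrors the shape of CircuitBreakerState for illustration.
type breakerState int32

const (
	stateClosed breakerState = iota
	stateOpen
	stateHalfOpen
)

// String returns a human-readable name; the exact labels are assumed.
func (s breakerState) String() string {
	switch s {
	case stateClosed:
		return "closed"
	case stateOpen:
		return "open"
	case stateHalfOpen:
		return "half-open"
	default:
		return fmt.Sprintf("unknown(%d)", int32(s))
	}
}

func main() {
	// fmt picks up the String method automatically.
	fmt.Println(stateOpen)
}
```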

type CircuitBreakerStats

type CircuitBreakerStats struct {
	State        CircuitBreakerState `json:"state"`
	FailureCount int64               `json:"failure_count"`
	SuccessCount int64               `json:"success_count"`
	RequestCount int64               `json:"request_count"`
	LastFailure  time.Time           `json:"last_failure"`
}

CircuitBreakerStats contains comprehensive statistics about circuit breaker operation.

This structure provides all the metrics needed for monitoring, alerting, and debugging circuit breaker behavior. It includes both current state information and historical operation counts.

Fields:

  • State: Current operational state (Closed/Open/HalfOpen)
  • FailureCount: Total failures recorded since last reset
  • SuccessCount: Total successes recorded since last reset
  • RequestCount: Total requests processed since last reset
  • LastFailure: Timestamp of the most recent failure

These statistics can be exposed via metrics systems, logged for analysis, or used by monitoring systems to track system health and performance.

type CommonPluginMetrics

type CommonPluginMetrics struct {
	RequestCount        CounterMetric
	RequestDuration     HistogramMetric
	ActiveRequests      GaugeMetric
	ErrorCount          CounterMetric
	CircuitBreakerState GaugeMetric
}

CommonPluginMetrics provides a set of commonly used metrics for plugin systems

func CreateCommonPluginMetrics

func CreateCommonPluginMetrics(collector EnhancedMetricsCollector) *CommonPluginMetrics

CreateCommonPluginMetrics creates commonly used plugin metrics with the enhanced collector

func (*CommonPluginMetrics) DecrementActiveRequests

func (cpm *CommonPluginMetrics) DecrementActiveRequests(pluginName string)

DecrementActiveRequests decrements the active request count

func (*CommonPluginMetrics) IncrementActiveRequests

func (cpm *CommonPluginMetrics) IncrementActiveRequests(pluginName string)

IncrementActiveRequests increments the active request count

func (*CommonPluginMetrics) RecordRequest

func (cpm *CommonPluginMetrics) RecordRequest(pluginName string, duration time.Duration, err error)

RecordRequest records a plugin request with its outcome

func (*CommonPluginMetrics) SetCircuitBreakerState

func (cpm *CommonPluginMetrics) SetCircuitBreakerState(pluginName string, state int)

SetCircuitBreakerState sets the circuit breaker state for a plugin

type ConnectionConfig

type ConnectionConfig struct {
	MaxConnections     int           `json:"max_connections" yaml:"max_connections"`
	MaxIdleConnections int           `json:"max_idle_connections" yaml:"max_idle_connections"`
	IdleTimeout        time.Duration `json:"idle_timeout" yaml:"idle_timeout"`
	ConnectionTimeout  time.Duration `json:"connection_timeout" yaml:"connection_timeout"`
	RequestTimeout     time.Duration `json:"request_timeout" yaml:"request_timeout"`
	KeepAlive          bool          `json:"keep_alive" yaml:"keep_alive"`
	DisableCompression bool          `json:"disable_compression" yaml:"disable_compression"`
}

ConnectionConfig contains connection pooling and timeout settings for plugin transports.

This configuration optimizes network resource usage and performance by managing connection lifecycles and timeouts. Proper configuration prevents resource leaks and improves response times.

Configuration guidelines:

  • MaxConnections: Total connection pool size (consider server limits)
  • MaxIdleConnections: Connections to keep alive when idle (balance memory vs latency)
  • IdleTimeout: How long to keep idle connections (balance resource usage vs reconnect cost)
  • ConnectionTimeout: Maximum time to establish new connections
  • RequestTimeout: Maximum time for individual requests
  • KeepAlive: Enable TCP keep-alive for long-lived connections

Example configuration:

conn := ConnectionConfig{
    MaxConnections:     10,                   // Max 10 concurrent connections
    MaxIdleConnections: 5,                    // Keep 5 connections idle
    IdleTimeout:        30 * time.Second,     // Close idle connections after 30s
    ConnectionTimeout:  10 * time.Second,     // 10s to establish connection
    RequestTimeout:     30 * time.Second,     // 30s per request
    KeepAlive:          true,                 // Enable TCP keep-alive
    DisableCompression: false,                // Leave compression enabled
}
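
As a rough illustration of what these settings control, the sketch below maps them onto the standard library's net/http client. The connSettings type is a local copy of the fields above, and the field-to-field mapping is an assumption made for this example; the package's own transport setup may differ.

```go
package main

import (
	"net"
	"net/http"
	"time"
)

// connSettings mirrors the ConnectionConfig fields shown above so the sketch
// compiles on its own; it is not the library type.
type connSettings struct {
	MaxConnections     int
	MaxIdleConnections int
	IdleTimeout        time.Duration
	ConnectionTimeout  time.Duration
	RequestTimeout     time.Duration
	KeepAlive          bool
	DisableCompression bool
}

// exampleSettings returns the values used in the example configuration.
func exampleSettings() connSettings {
	return connSettings{
		MaxConnections:     10,
		MaxIdleConnections: 5,
		IdleTimeout:        30 * time.Second,
		ConnectionTimeout:  10 * time.Second,
		RequestTimeout:     30 * time.Second,
		KeepAlive:          true,
		DisableCompression: false,
	}
}

// newTransport translates the settings into a standard-library transport.
// Which transport field each setting lands on is an assumption.
func newTransport(c connSettings) *http.Transport {
	dialer := &net.Dialer{Timeout: c.ConnectionTimeout}
	if !c.KeepAlive {
		dialer.KeepAlive = -1 // a negative value disables TCP keep-alive probes
	}
	return &http.Transport{
		DialContext:         dialer.DialContext,
		MaxConnsPerHost:     c.MaxConnections,
		MaxIdleConns:        c.MaxIdleConnections,
		MaxIdleConnsPerHost: c.MaxIdleConnections,
		IdleConnTimeout:     c.IdleTimeout,
		DisableCompression:  c.DisableCompression,
	}
}

// newHTTPClient applies RequestTimeout as the overall per-request deadline.
func newHTTPClient(c connSettings) *http.Client {
	return &http.Client{Transport: newTransport(c), Timeout: c.RequestTimeout}
}
```

Calling newHTTPClient(exampleSettings()) yields a client limited to 10 connections per host with a 30 second request deadline.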

type CounterMetric

type CounterMetric interface {
	Inc(labelValues ...string)
	Add(value float64, labelValues ...string)
}

CounterMetric represents a counter with native label support
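
To show how label values select a time series, here is a minimal in-memory sketch that satisfies the same method set. The memCounter type, its Value accessor, and the key separator are hypothetical and not part of this package.

```go
package main

import (
	"strings"
	"sync"
)

// counterMetric repeats the method set of CounterMetric so the sketch is
// self-contained.
type counterMetric interface {
	Inc(labelValues ...string)
	Add(value float64, labelValues ...string)
}

// memCounter is a hypothetical in-memory counter keyed by its label values.
type memCounter struct {
	mu     sync.Mutex
	values map[string]float64
}

func newMemCounter() *memCounter {
	return &memCounter{values: make(map[string]float64)}
}

// key joins label values with a separator that is unlikely to appear in them.
func key(labelValues []string) string {
	return strings.Join(labelValues, "\x1f")
}

// Inc adds one to the series selected by labelValues.
func (c *memCounter) Inc(labelValues ...string) { c.Add(1, labelValues...) }

// Add increases the series selected by labelValues.
func (c *memCounter) Add(value float64, labelValues ...string) {
	if value < 0 {
		return // counters only go up; negative deltas are ignored here
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[key(labelValues)] += value
}

// Value reports the current total for one combination of label values.
func (c *memCounter) Value(labelValues ...string) float64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.values[key(labelValues)]
}

// Compile-time check that memCounter satisfies the interface.
var _ counterMetric = (*memCounter)(nil)
```

Each distinct combination of label values, such as ("auth", "ok") and ("auth", "error"), accumulates independently.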

type DefaultEnhancedMetricsCollector

type DefaultEnhancedMetricsCollector struct {
	*DefaultMetricsCollector // Embed for backward compatibility
	// contains filtered or unexported fields
}

DefaultEnhancedMetricsCollector provides an enhanced metrics collector with native label support

func NewDefaultEnhancedMetricsCollector

func NewDefaultEnhancedMetricsCollector() *DefaultEnhancedMetricsCollector

NewDefaultEnhancedMetricsCollector creates a new enhanced metrics collector

func (*DefaultEnhancedMetricsCollector) CounterWithLabels

func (demc *DefaultEnhancedMetricsCollector) CounterWithLabels(name, description string, labelNames ...string) CounterMetric

CounterWithLabels implements EnhancedMetricsCollector

func (*DefaultEnhancedMetricsCollector) GaugeWithLabels

func (demc *DefaultEnhancedMetricsCollector) GaugeWithLabels(name, description string, labelNames ...string) GaugeMetric

GaugeWithLabels implements EnhancedMetricsCollector

func (*DefaultEnhancedMetricsCollector) GetPrometheusMetrics

func (demc *DefaultEnhancedMetricsCollector) GetPrometheusMetrics() []PrometheusMetric

GetPrometheusMetrics implements EnhancedMetricsCollector

func (*DefaultEnhancedMetricsCollector) HistogramWithLabels

func (demc *DefaultEnhancedMetricsCollector) HistogramWithLabels(name, description string, buckets []float64, labelNames ...string) HistogramMetric

HistogramWithLabels implements EnhancedMetricsCollector

type DefaultMetricsCollector

type DefaultMetricsCollector struct {
	// contains filtered or unexported fields
}

DefaultMetricsCollector provides a basic in-memory metrics collector

func NewDefaultMetricsCollector

func NewDefaultMetricsCollector() *DefaultMetricsCollector

NewDefaultMetricsCollector creates a new default metrics collector

func (*DefaultMetricsCollector) GetMetrics

func (dmc *DefaultMetricsCollector) GetMetrics() map[string]interface{}

GetMetrics implements MetricsCollector

func (*DefaultMetricsCollector) IncrementCounter

func (dmc *DefaultMetricsCollector) IncrementCounter(name string, labels map[string]string, value int64)

IncrementCounter implements MetricsCollector

func (*DefaultMetricsCollector) RecordCustomMetric

func (dmc *DefaultMetricsCollector) RecordCustomMetric(name string, labels map[string]string, value interface{})

RecordCustomMetric implements MetricsCollector

func (*DefaultMetricsCollector) RecordHistogram

func (dmc *DefaultMetricsCollector) RecordHistogram(name string, labels map[string]string, value float64)

RecordHistogram implements MetricsCollector

func (*DefaultMetricsCollector) SetGauge

func (dmc *DefaultMetricsCollector) SetGauge(name string, labels map[string]string, value float64)

SetGauge implements MetricsCollector

type DiscoveryConfig

type DiscoveryConfig struct {
	Enabled     bool     `json:"enabled" yaml:"enabled"`
	Directories []string `json:"directories,omitempty" yaml:"directories,omitempty"`
	Patterns    []string `json:"patterns,omitempty" yaml:"patterns,omitempty"`
	WatchMode   bool     `json:"watch_mode" yaml:"watch_mode"`
}

DiscoveryConfig contains plugin auto-discovery settings for dynamic plugin loading.

The discovery system automatically finds and loads plugins from specified directories, enabling dynamic plugin registration without manual configuration. This is particularly useful for plugin ecosystems where plugins are deployed independently.

Discovery behavior:

  • Directories: List of paths to scan for plugins
  • Patterns: File name patterns to match (e.g., "*.so", "plugin-*")
  • WatchMode: Continuously monitor directories for changes

Security considerations:

  • Only scan trusted directories to prevent malicious plugin loading
  • Use specific patterns to avoid loading unintended files
  • Validate discovered plugins before loading

Example configuration:

discovery := DiscoveryConfig{
    Enabled:     true,
    Directories: []string{
        "/opt/plugins",
        "/usr/local/lib/plugins",
    },
    Patterns: []string{
        "*.so",      // Shared libraries
        "plugin-*",  // Plugin executables
    },
    WatchMode: true,  // Hot-reload new plugins
}
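
The pattern matching described above can be approximated with the standard library's glob matcher. This is a sketch under assumptions: the discover and matchesAny helpers are hypothetical, only one directory level is scanned, and symlinks are skipped. How the package itself walks directories is not documented here.

```go
package main

import (
	"os"
	"path/filepath"
)

// matchesAny reports whether a file name matches at least one glob pattern.
// Malformed patterns are treated as non-matching.
func matchesAny(name string, patterns []string) bool {
	for _, p := range patterns {
		if ok, err := filepath.Match(p, name); err == nil && ok {
			return true
		}
	}
	return false
}

// discover lists regular files in dirs whose names match any pattern.
// It scans one level deep only; that depth is an assumption.
func discover(dirs, patterns []string) ([]string, error) {
	var found []string
	for _, dir := range dirs {
		entries, err := os.ReadDir(dir)
		if err != nil {
			return nil, err
		}
		for _, e := range entries {
			if e.Type().IsRegular() && matchesAny(e.Name(), patterns) {
				found = append(found, filepath.Join(dir, e.Name()))
			}
		}
	}
	return found, nil
}
```

With the patterns from the example, "auth.so" and "plugin-auth" match while "README.md" does not, which is why narrow patterns help keep unintended files out.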

type EnhancedMetricsCollector

type EnhancedMetricsCollector interface {
	MetricsCollector // Embed the original interface for backward compatibility

	// Enhanced metrics with native label support
	CounterWithLabels(name, description string, labelNames ...string) CounterMetric
	GaugeWithLabels(name, description string, labelNames ...string) GaugeMetric
	HistogramWithLabels(name, description string, buckets []float64, labelNames ...string) HistogramMetric

	// Get metrics in Prometheus-compatible format
	GetPrometheusMetrics() []PrometheusMetric
}

EnhancedMetricsCollector extends MetricsCollector with native label support. This interface is designed to be compatible with Prometheus and other modern metrics systems.

func MigrateToEnhancedMetrics

func MigrateToEnhancedMetrics(legacy MetricsCollector) EnhancedMetricsCollector

MigrateToEnhancedMetrics helps migrate from legacy metrics to enhanced metrics

func NewEnhancedMetricsCollector

func NewEnhancedMetricsCollector() EnhancedMetricsCollector

NewEnhancedMetricsCollector creates a new enhanced metrics collector with native label support

type ExecutionContext

type ExecutionContext struct {
	RequestID  string            `json:"request_id"`
	Timeout    time.Duration     `json:"timeout"`
	MaxRetries int               `json:"max_retries"`
	Headers    map[string]string `json:"headers,omitempty"`
	Metadata   map[string]string `json:"metadata,omitempty"`
}

ExecutionContext provides execution context and configuration to plugins

type GRPCPlugin

type GRPCPlugin[Req, Resp any] struct {
	// contains filtered or unexported fields
}

GRPCPlugin implements Plugin interface using gRPC transport with comprehensive TLS and security support.

This implementation provides high-performance RPC communication using gRPC with full support for TLS, mutual authentication, metadata handling, and proper connection management. It's designed for low-latency, high-throughput scenarios where performance is critical.

Features:

  • gRPC and gRPC-TLS transports with configurable security
  • Mutual TLS (mTLS) authentication with client certificates
  • Connection reuse and proper lifecycle management
  • Metadata propagation for distributed tracing
  • Comprehensive error handling with gRPC status codes
  • Health checking with standard gRPC health protocol
  • Automatic connection recovery and reconnection

Example usage:

config := PluginConfig{
    Name:      "payment-processor",
    Transport: TransportGRPCTLS,
    Endpoint:  "payment.company.com:443",
    Auth: AuthConfig{
        Method:   AuthMTLS,
        CertFile: "/etc/ssl/client.crt",
        KeyFile:  "/etc/ssl/client.key",
        CAFile:   "/etc/ssl/ca.crt",
    },
}

plugin, err := NewGRPCPlugin[PaymentRequest, PaymentResponse](config, logger)
if err != nil {
    log.Fatal(err)
}
// Release the gRPC connection when the surrounding function returns
defer plugin.Close()

// Use the plugin
response, err := plugin.Execute(ctx, execCtx, request)
if err != nil {
    log.Printf("Payment processing failed: %v", err)
}

func NewGRPCPlugin

func NewGRPCPlugin[Req, Resp any](config PluginConfig, logger *slog.Logger) (*GRPCPlugin[Req, Resp], error)

NewGRPCPlugin creates a new gRPC plugin instance

func (*GRPCPlugin[Req, Resp]) Close

func (g *GRPCPlugin[Req, Resp]) Close() error

Close closes the gRPC connection

func (*GRPCPlugin[Req, Resp]) Execute

func (g *GRPCPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error)

Execute processes a request using gRPC

func (*GRPCPlugin[Req, Resp]) Health

func (g *GRPCPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus

Health performs a health check via gRPC

func (*GRPCPlugin[Req, Resp]) Info

func (g *GRPCPlugin[Req, Resp]) Info() PluginInfo

Info returns plugin information

type GRPCPluginFactory

type GRPCPluginFactory[Req, Resp any] struct {
	// contains filtered or unexported fields
}

GRPCPluginFactory creates gRPC plugin instances with full TLS and security configuration.

This factory manages the creation of GRPCPlugin instances, handling complex gRPC connection setup, TLS configuration, and security validation. It ensures proper connection establishment and validates all security configurations.

Security features:

  • TLS 1.2+ enforcement with secure cipher suites
  • Mutual TLS authentication with certificate validation
  • CA certificate validation for server authentication
  • Proper credential management and secure defaults
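
For orientation, the following sketch shows how a client-side mutual TLS configuration with a TLS 1.2 floor can be assembled using only the standard library. The baseTLSConfig and newMTLSConfig helpers are hypothetical; the parameter names simply follow the AuthConfig fields used in the examples, and the factory's real implementation may differ.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"os"
)

// baseTLSConfig returns a configuration that refuses anything older than
// TLS 1.2 and otherwise relies on Go's default cipher suite selection.
func baseTLSConfig() *tls.Config {
	return &tls.Config{MinVersion: tls.VersionTLS12}
}

// newMTLSConfig builds a client-side mutual TLS configuration from PEM files.
// This helper is hypothetical and not part of the package.
func newMTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	// The client certificate and key identify this process to the server.
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}
	// The CA bundle is used to verify the server's certificate.
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no CA certificates found in " + caFile)
	}
	cfg := baseTLSConfig()
	cfg.Certificates = []tls.Certificate{cert}
	cfg.RootCAs = pool
	return cfg, nil
}
```

A configuration built this way can be handed to grpc-go through credentials.NewTLS when dialing.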

Example usage:

factory := NewGRPCPluginFactory[OrderRequest, OrderResponse](logger)

config := PluginConfig{
    Name:      "order-service",
    Transport: TransportGRPCTLS,
    Endpoint:  "orders.company.com:9443",
    Auth: AuthConfig{
        Method:   AuthMTLS,
        CertFile: "/etc/ssl/certs/client.crt",
        KeyFile:  "/etc/ssl/private/client.key",
        CAFile:   "/etc/ssl/certs/ca.crt",
    },
}

plugin, err := factory.CreatePlugin(config)
if err != nil {
    log.Fatalf("Failed to create gRPC plugin: %v", err)
}

func NewGRPCPluginFactory

func NewGRPCPluginFactory[Req, Resp any](logger *slog.Logger) *GRPCPluginFactory[Req, Resp]

NewGRPCPluginFactory creates a new gRPC plugin factory

func (*GRPCPluginFactory[Req, Resp]) CreatePlugin

func (f *GRPCPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error)

CreatePlugin creates a new gRPC plugin instance

func (*GRPCPluginFactory[Req, Resp]) SupportedTransports

func (f *GRPCPluginFactory[Req, Resp]) SupportedTransports() []string

SupportedTransports returns supported transport types

func (*GRPCPluginFactory[Req, Resp]) ValidateConfig

func (f *GRPCPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error

ValidateConfig validates gRPC plugin configuration

type GRPCPluginService

type GRPCPluginService interface {
	// Execute processes a request and returns a response
	Execute(ctx context.Context, request []byte) ([]byte, error)

	// Health performs a health check
	Health(ctx context.Context) error

	// Info returns plugin information
	Info(ctx context.Context) ([]byte, error)
}

GRPCPluginService defines the standard gRPC service interface that plugins must implement.

This interface establishes a contract for gRPC-based plugins, providing standardized methods for execution, health checking, and metadata retrieval. All gRPC plugins should implement this interface to ensure compatibility with the plugin system.

Method specifications:

  • Execute: Process requests with serialized JSON data
  • Health: Perform health checks (should follow gRPC health checking protocol)
  • Info: Return plugin metadata and capabilities

Example protobuf service definition:

service PluginService {
  rpc Execute(ExecuteRequest) returns (ExecuteResponse);
  rpc Health(HealthRequest) returns (HealthResponse);
  rpc Info(InfoRequest) returns (InfoResponse);
}

The actual gRPC implementation should serialize/deserialize the byte arrays as JSON for consistency with other transport methods.
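
A minimal in-process sketch of a service with this method set, exchanging JSON-encoded byte slices as described above. The greeter service, its payload types, and the callGreeter helper are hypothetical; no gRPC wiring is shown.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
)

// pluginService repeats the GRPCPluginService method set so the sketch is
// self-contained.
type pluginService interface {
	Execute(ctx context.Context, request []byte) ([]byte, error)
	Health(ctx context.Context) error
	Info(ctx context.Context) ([]byte, error)
}

// greetRequest and greetResponse are hypothetical payload types.
type greetRequest struct {
	Name string `json:"name"`
}

type greetResponse struct {
	Greeting string `json:"greeting"`
}

// greeter is a toy service that exchanges JSON-encoded payloads.
type greeter struct{}

func (greeter) Execute(ctx context.Context, request []byte) ([]byte, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // honor cancellation before doing any work
	}
	var req greetRequest
	if err := json.Unmarshal(request, &req); err != nil {
		return nil, err
	}
	if req.Name == "" {
		return nil, errors.New("name is required")
	}
	return json.Marshal(greetResponse{Greeting: "hello, " + req.Name})
}

// Health reports an error only when the caller's context is already done.
func (greeter) Health(ctx context.Context) error { return ctx.Err() }

// Info returns illustrative metadata as a JSON object.
func (greeter) Info(ctx context.Context) ([]byte, error) {
	return json.Marshal(map[string]string{"name": "greeter", "version": "0.0.1"})
}

// callGreeter drives the service through the interface with a fresh context.
func callGreeter(payload string) (string, error) {
	var svc pluginService = greeter{}
	out, err := svc.Execute(context.Background(), []byte(payload))
	return string(out), err
}
```

Keeping the payload as JSON bytes means the same request and response types can be reused across the HTTP and gRPC transports.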

type GRPCPluginServiceClient

type GRPCPluginServiceClient struct {
	// contains filtered or unexported fields
}

GRPCPluginServiceClient wraps the gRPC client for plugin services

func NewGRPCPluginServiceClient

func NewGRPCPluginServiceClient(conn *grpc.ClientConn) *GRPCPluginServiceClient

NewGRPCPluginServiceClient creates a new gRPC plugin service client

func (*GRPCPluginServiceClient) Execute

func (c *GRPCPluginServiceClient) Execute(ctx context.Context, request []byte) ([]byte, error)

Execute calls the remote Execute method

func (*GRPCPluginServiceClient) Health

func (c *GRPCPluginServiceClient) Health(ctx context.Context) error

Health calls the remote Health method

func (*GRPCPluginServiceClient) Info

func (c *GRPCPluginServiceClient) Info(ctx context.Context) ([]byte, error)

Info calls the remote Info method

type GaugeMetric

type GaugeMetric interface {
	Set(value float64, labelValues ...string)
	Inc(labelValues ...string)
	Dec(labelValues ...string)
	Add(value float64, labelValues ...string)
}

GaugeMetric represents a gauge with native label support

type GlobalMetrics

type GlobalMetrics struct {
	TotalRequests  int64 `json:"total_requests"`
	TotalErrors    int64 `json:"total_errors"`
	ActiveRequests int64 `json:"active_requests"`
}

GlobalMetrics contains system-wide metrics

type HTTPPlugin

type HTTPPlugin[Req, Resp any] struct {
	// contains filtered or unexported fields
}

HTTPPlugin represents a plugin that communicates over HTTP/HTTPS protocols.

This implementation provides a full-featured HTTP client for plugin communication, supporting authentication, rate limiting, connection pooling, and comprehensive error handling. It's designed for production environments with proper security and operational concerns.

Features:

  • HTTP/HTTPS with configurable TLS settings including mTLS
  • Multiple authentication methods (API key, Bearer token, Basic auth)
  • Connection pooling and keep-alive for performance
  • Rate limiting with token bucket algorithm
  • Structured request/response handling with JSON serialization
  • Health checking with configurable endpoints
  • Proper timeout handling and graceful connection cleanup

Example usage:

config := PluginConfig{
    Name:      "api-service",
    Transport: TransportHTTPS,
    Endpoint:  "https://api.example.com/v1/plugin",
    Auth: AuthConfig{
        Method: AuthBearer,
        Token:  "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
    },
    Connection: ConnectionConfig{
        MaxConnections:    10,
        RequestTimeout:    30 * time.Second,
        ConnectionTimeout: 10 * time.Second,
    },
    RateLimit: RateLimitConfig{
        Enabled:           true,
        RequestsPerSecond: 100.0,
        BurstSize:         200,
    },
}

factory := NewHTTPPluginFactory[MyRequest, MyResponse]()
plugin, err := factory.CreatePlugin(config)
if err != nil {
    log.Fatal(err)
}

// Use the plugin
response, err := plugin.Execute(ctx, execCtx, request)

func (*HTTPPlugin[Req, Resp]) Close

func (p *HTTPPlugin[Req, Resp]) Close() error

Close implements Plugin.Close

func (*HTTPPlugin[Req, Resp]) Execute

func (p *HTTPPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error)

Execute implements Plugin.Execute

func (*HTTPPlugin[Req, Resp]) Health

func (p *HTTPPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus

Health implements Plugin.Health

func (*HTTPPlugin[Req, Resp]) Info

func (p *HTTPPlugin[Req, Resp]) Info() PluginInfo

Info implements Plugin.Info

type HTTPPluginFactory

type HTTPPluginFactory[Req, Resp any] struct{}

HTTPPluginFactory creates HTTP-based plugins with comprehensive configuration support.

This factory handles the creation of HTTPPlugin instances, managing HTTP client configuration, TLS setup, authentication, and connection pooling. It validates configurations to ensure security and proper operation.

Supported features:

  • HTTP and HTTPS transports with proper TLS configuration
  • Multiple authentication methods with security validation
  • Connection pooling and timeout configuration
  • Rate limiting setup and validation
  • Comprehensive configuration validation

Example usage:

factory := NewHTTPPluginFactory[AuthRequest, AuthResponse]()

config := PluginConfig{
    Transport: TransportHTTPS,
    Endpoint:  "https://auth.company.com/api/v1",
    Auth: AuthConfig{
        Method:   AuthMTLS,
        CertFile: "/etc/ssl/client.crt",
        KeyFile:  "/etc/ssl/client.key",
        CAFile:   "/etc/ssl/ca.crt",
    },
}

plugin, err := factory.CreatePlugin(config)
if err != nil {
    log.Fatalf("Failed to create plugin: %v", err)
}

func NewHTTPPluginFactory

func NewHTTPPluginFactory[Req, Resp any]() *HTTPPluginFactory[Req, Resp]

NewHTTPPluginFactory creates a new HTTP plugin factory

func (*HTTPPluginFactory[Req, Resp]) CreatePlugin

func (f *HTTPPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error)

CreatePlugin implements PluginFactory.CreatePlugin for HTTP transport

func (*HTTPPluginFactory[Req, Resp]) SupportedTransports

func (f *HTTPPluginFactory[Req, Resp]) SupportedTransports() []string

SupportedTransports implements PluginFactory.SupportedTransports

func (*HTTPPluginFactory[Req, Resp]) ValidateConfig

func (f *HTTPPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error

ValidateConfig implements PluginFactory.ValidateConfig

type HTTPPluginRequest

type HTTPPluginRequest[T any] struct {
	Data      T                 `json:"data"`
	RequestID string            `json:"request_id"`
	Timeout   string            `json:"timeout"`
	Headers   map[string]string `json:"headers,omitempty"`
	Metadata  map[string]string `json:"metadata,omitempty"`
}

HTTPPluginRequest represents the standardized request format for HTTP plugins.

This structure defines the wire format for requests sent to HTTP-based plugins. It includes the actual request data along with metadata needed for proper request handling, tracing, and timeout management.

The format is designed to be plugin-agnostic while providing all necessary context for request processing. Plugins can expect this structure when receiving requests via HTTP transport.

Example JSON payload:

{
  "data": {
    "user_id": "12345",
    "action": "authenticate"
  },
  "request_id": "req-abc-123",
  "timeout": "30s",
  "headers": {
    "X-Source-Service": "auth-gateway"
  },
  "metadata": {
    "trace_id": "trace-xyz-789"
  }
}
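
The payload above can be reproduced with a small encoder. This sketch uses a local requestEnvelope type that copies the field names and JSON tags of HTTPPluginRequest; the authData type and the choice to render the timeout with Duration.String are assumptions based on the sample.

```go
package main

import (
	"encoding/json"
	"time"
)

// requestEnvelope mirrors the HTTPPluginRequest fields and JSON tags.
type requestEnvelope[T any] struct {
	Data      T                 `json:"data"`
	RequestID string            `json:"request_id"`
	Timeout   string            `json:"timeout"`
	Headers   map[string]string `json:"headers,omitempty"`
	Metadata  map[string]string `json:"metadata,omitempty"`
}

// authData is a hypothetical payload matching the "data" object above.
type authData struct {
	UserID string `json:"user_id"`
	Action string `json:"action"`
}

// encodeRequest wraps data in the envelope. Rendering the timeout with
// Duration.String (for example "30s") is an assumption based on the sample.
func encodeRequest[T any](data T, requestID string, timeout time.Duration, headers, metadata map[string]string) ([]byte, error) {
	return json.Marshal(requestEnvelope[T]{
		Data:      data,
		RequestID: requestID,
		Timeout:   timeout.String(),
		Headers:   headers,
		Metadata:  metadata,
	})
}

// sampleRequestJSON reproduces the example payload in compact form.
func sampleRequestJSON() (string, error) {
	body, err := encodeRequest(
		authData{UserID: "12345", Action: "authenticate"},
		"req-abc-123",
		30*time.Second,
		map[string]string{"X-Source-Service": "auth-gateway"},
		map[string]string{"trace_id": "trace-xyz-789"},
	)
	return string(body), err
}
```

Because headers and metadata carry omitempty, passing nil maps drops those keys from the payload entirely.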

type HTTPPluginResponse

type HTTPPluginResponse[T any] struct {
	Data      T                 `json:"data,omitempty"`
	Error     string            `json:"error,omitempty"`
	RequestID string            `json:"request_id"`
	Metadata  map[string]string `json:"metadata,omitempty"`
}

HTTPPluginResponse represents the standardized response format for HTTP plugins.

This structure defines the wire format for responses received from HTTP-based plugins. It provides a consistent interface for handling both successful responses and errors, along with metadata for tracing and debugging.

The response format allows plugins to return either successful data or error information, making error handling consistent across all HTTP-based plugins. The request ID enables correlation with the original request for tracing.

Example successful response:

{
  "data": {
    "token": "jwt-token-here",
    "expires_in": 3600
  },
  "request_id": "req-abc-123",
  "metadata": {
    "processing_time_ms": "150"
  }
}

Example error response:

{
  "error": "Invalid credentials provided",
  "request_id": "req-abc-123",
  "metadata": {
    "error_code": "AUTH_FAILED"
  }
}
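
On the receiving side, a caller might decode the envelope and turn a non-empty error field into a Go error, roughly as sketched below. The responseEnvelope type copies the fields above; tokenData, decodeResponse, and the request ID check are assumptions for illustration rather than the package's documented behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// responseEnvelope mirrors the HTTPPluginResponse fields and JSON tags.
type responseEnvelope[T any] struct {
	Data      T                 `json:"data,omitempty"`
	Error     string            `json:"error,omitempty"`
	RequestID string            `json:"request_id"`
	Metadata  map[string]string `json:"metadata,omitempty"`
}

// tokenData is a hypothetical payload matching the successful example.
type tokenData struct {
	Token     string `json:"token"`
	ExpiresIn int    `json:"expires_in"`
}

// decodeResponse parses body, checks the request ID, and converts a non-empty
// "error" field into a Go error. The ID check is an assumption.
func decodeResponse[T any](body []byte, wantRequestID string) (T, error) {
	var zero T
	var env responseEnvelope[T]
	if err := json.Unmarshal(body, &env); err != nil {
		return zero, fmt.Errorf("decode response: %w", err)
	}
	if env.RequestID != wantRequestID {
		return zero, fmt.Errorf("request ID mismatch: got %q, want %q", env.RequestID, wantRequestID)
	}
	if env.Error != "" {
		return zero, errors.New(env.Error)
	}
	return env.Data, nil
}
```

For the error example above, decodeResponse returns an error whose message is "Invalid credentials provided" and a zero-valued payload.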

type HealthCheckConfig

type HealthCheckConfig struct {
	Enabled      bool          `json:"enabled" yaml:"enabled"`
	Interval     time.Duration `json:"interval" yaml:"interval"`
	Timeout      time.Duration `json:"timeout" yaml:"timeout"`
	FailureLimit int           `json:"failure_limit" yaml:"failure_limit"`
	Endpoint     string        `json:"endpoint,omitempty" yaml:"endpoint,omitempty"`
}

HealthCheckConfig contains health check settings for monitoring plugin availability.

Health checks are performed periodically to detect failed or degraded plugins and automatically route traffic away from unhealthy instances. The health checker maintains plugin status and provides early warning of issues.

Configuration behavior:

  • Interval: How often to perform health checks
  • Timeout: Maximum time to wait for health check response
  • FailureLimit: Number of consecutive failures before marking plugin unhealthy
  • Endpoint: Custom endpoint for health checks (optional, defaults to plugin endpoint)

Example configuration:

health := HealthCheckConfig{
    Enabled:      true,
    Interval:     30 * time.Second,  // Check every 30 seconds
    Timeout:      5 * time.Second,   // Fail if no response in 5 seconds
    FailureLimit: 3,                 // Mark unhealthy after 3 failures
    Endpoint:     "/health",         // Custom health endpoint
}

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker monitors the health of a plugin through periodic health checks.

This component provides continuous monitoring of plugin availability and performance by executing health checks at regular intervals. It tracks consecutive failures and provides detailed health status information for operational visibility and automated recovery decisions.

Key features:

  • Periodic health checking with configurable intervals
  • Consecutive failure tracking with automatic degradation detection
  • Thread-safe operation with atomic counters
  • Graceful start/stop lifecycle management
  • Detailed health status reporting with response times

Usage example:

config := HealthCheckConfig{
    Enabled:      true,
    Interval:     30 * time.Second,
    Timeout:      5 * time.Second,
    FailureLimit: 3,
}
checker := NewHealthChecker(plugin, config)

// Check current health status
status := checker.Check()
if status.Status != StatusHealthy {
    log.Printf("Plugin unhealthy: %s", status.Message)
}

// Stop monitoring when done
checker.Stop()

func NewHealthChecker

func NewHealthChecker(plugin interface {
	Health(ctx context.Context) HealthStatus
	Close() error
}, config HealthCheckConfig) *HealthChecker

NewHealthChecker creates a new health checker for monitoring a plugin's availability.

The health checker will automatically start monitoring if health checking is enabled in the configuration. It runs in a separate goroutine and performs periodic health checks according to the specified interval.

Parameters:

  • plugin: Must implement Health() and Close() methods
  • config: Health check configuration including interval, timeout, and failure limits

The health checker maintains its own lifecycle and can be started/stopped independently of the plugin it monitors.

func (*HealthChecker) Check

func (hc *HealthChecker) Check() HealthStatus

Check performs a single synchronous health check and returns the current status.

This method executes an immediate health check against the monitored plugin, respecting the configured timeout. It updates internal failure counters and determines the overall health status based on consecutive failure patterns.

Health status determination:

  • StatusHealthy: Plugin responds successfully within timeout
  • StatusOffline: Consecutive failures exceed the configured limit
  • Other statuses: Based on plugin's own health assessment

The method is thread-safe and can be called independently of the periodic health checking goroutine. Response time and failure counts are tracked for operational visibility.

Returns HealthStatus with detailed information including status, message, response time, and timestamp.
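
The consecutive-failure logic can be pictured with a small atomic counter. This is a sketch, not the package's code: the failureTracker type and the status strings are hypothetical, and treating "failures >= limit" as offline is an assumption, since the text only says the limit is exceeded.

```go
package main

import "sync/atomic"

// Status names used by this sketch; they stand in for the package's
// PluginStatus constants.
const (
	statusHealthy   = "healthy"
	statusUnhealthy = "unhealthy"
	statusOffline   = "offline"
)

// failureTracker counts consecutive failed checks without locking.
type failureTracker struct {
	limit    int64
	failures atomic.Int64
}

// record folds one check result into the counter and returns the status.
// The ">=" comparison against the limit is an assumption.
func (t *failureTracker) record(ok bool) string {
	if ok {
		t.failures.Store(0) // any success resets the streak
		return statusHealthy
	}
	if t.failures.Add(1) >= t.limit {
		return statusOffline
	}
	return statusUnhealthy
}

// consecutiveFailures returns the current streak length.
func (t *failureTracker) consecutiveFailures() int64 {
	return t.failures.Load()
}
```

With a limit of 3, two failed checks report unhealthy, the third reports offline, and a single success resets the streak to zero.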

func (*HealthChecker) Done

func (hc *HealthChecker) Done() <-chan struct{}

Done returns a channel that will be closed when the health checker stops

func (*HealthChecker) GetConsecutiveFailures

func (hc *HealthChecker) GetConsecutiveFailures() int64

GetConsecutiveFailures returns the number of consecutive failures

func (*HealthChecker) GetLastCheck

func (hc *HealthChecker) GetLastCheck() time.Time

GetLastCheck returns the timestamp of the last health check

func (*HealthChecker) IsRunning

func (hc *HealthChecker) IsRunning() bool

IsRunning returns true if the health checker is currently running

func (*HealthChecker) Start

func (hc *HealthChecker) Start()

Start initiates the health checker's periodic monitoring goroutine.

This method starts continuous health monitoring according to the configured interval. If health checking is disabled in the configuration, this method has no effect. The method is idempotent - calling it multiple times is safe.

The health checker runs in its own goroutine and performs checks at regular intervals until stopped. Each check updates the plugin's health status and maintains failure counters for degradation detection.

Use Stop() to halt the monitoring when no longer needed.

func (*HealthChecker) Stop

func (hc *HealthChecker) Stop()

Stop halts the health checker's periodic monitoring and waits for cleanup.

This method gracefully shuts down the health checking goroutine and waits for it to complete any in-flight health check before returning. The method is idempotent - calling it multiple times is safe.

After stopping, the health checker can be restarted with Start() if needed. Any ongoing health check will complete before the goroutine exits.
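
The Start/Stop contract described above (idempotent, restartable, waits for an in-flight check) can be sketched with a ticker and two channels. The periodic type and the runTicks helper are hypothetical and only illustrate the pattern; the interval must be positive because time.NewTicker panics otherwise.

```go
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// periodic runs fn at a fixed interval until stopped.
type periodic struct {
	interval time.Duration
	fn       func()

	mu   sync.Mutex
	stop chan struct{} // non-nil while the loop is running
	done chan struct{} // closed when the loop goroutine exits
}

// Start launches the loop; calling it while running is a no-op.
func (p *periodic) Start() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.stop != nil {
		return
	}
	stop, done := make(chan struct{}), make(chan struct{})
	p.stop, p.done = stop, done
	go func() {
		defer close(done)
		ticker := time.NewTicker(p.interval)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				p.fn()
			}
		}
	}()
}

// Stop signals the loop and waits for it to exit; repeated calls are safe.
func (p *periodic) Stop() {
	p.mu.Lock()
	stop, done := p.stop, p.done
	p.stop, p.done = nil, nil
	p.mu.Unlock()
	if stop == nil {
		return
	}
	close(stop)
	<-done // an in-flight fn call finishes before this returns
}

// runTicks starts a 1ms periodic task, waits for n runs, then stops it.
func runTicks(n int64) int64 {
	var count atomic.Int64
	var once sync.Once
	reached := make(chan struct{})
	p := &periodic{interval: time.Millisecond, fn: func() {
		if count.Add(1) >= n {
			once.Do(func() { close(reached) })
		}
	}}
	p.Start()
	p.Start() // second call is a no-op
	<-reached
	p.Stop()
	p.Stop() // safe to repeat
	return count.Load()
}
```

Because Stop clears the channels under the lock, a later Start creates a fresh loop, which matches the restart behavior described above.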

type HealthMonitor

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor provides centralized health monitoring for multiple components.

This component manages multiple HealthChecker instances and provides a unified view of system health across all monitored plugins. It's designed for scenarios where you need to monitor multiple plugins and make system-wide decisions based on overall health status.

Features:

  • Centralized management of multiple health checkers
  • Aggregated health status calculation
  • Thread-safe operations with proper synchronization
  • Individual and overall health status reporting
  • Graceful shutdown of all managed checkers

Usage example:

monitor := NewHealthMonitor()

// Add checkers for different plugins
monitor.AddChecker("auth-service", authHealthChecker)
monitor.AddChecker("payment-service", paymentHealthChecker)

// Get overall system health
overallHealth := monitor.GetOverallHealth()
if overallHealth.Status != StatusHealthy {
    log.Printf("System degraded: %s", overallHealth.Message)
}

// Cleanup when done
monitor.Shutdown()

func NewHealthMonitor

func NewHealthMonitor() *HealthMonitor

NewHealthMonitor creates a new centralized health monitor instance.

The health monitor starts empty and health checkers can be added dynamically using AddChecker(). It provides thread-safe operations for managing multiple health checkers and computing overall system health.

Returns a ready-to-use HealthMonitor that can manage multiple plugin health checkers and provide unified health status reporting.

func (*HealthMonitor) AddChecker

func (hm *HealthMonitor) AddChecker(name string, checker *HealthChecker)

AddChecker adds a health checker for a named component

func (*HealthMonitor) GetAllStatus

func (hm *HealthMonitor) GetAllStatus() map[string]HealthStatus

GetAllStatus returns the health status of all monitored components

func (*HealthMonitor) GetOverallHealth

func (hm *HealthMonitor) GetOverallHealth() HealthStatus

GetOverallHealth computes and returns the aggregated health status of all monitored components.

This method analyzes the health status of all registered components and determines the overall system health using the following logic:

  • StatusHealthy: All components are healthy
  • StatusDegraded: Some components are degraded but none are offline/unhealthy
  • StatusUnhealthy: One or more components are offline or unhealthy

The returned status includes a summary message describing any issues found across the monitored components. This is useful for system-level health checks and determining if the entire service should be considered available.

Returns HealthStatus representing the worst-case scenario across all monitored components, with an appropriate message describing the overall state.
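
The aggregation rules listed above amount to a worst-case reduction, sketched here with local stand-in types. The status enum, the message wording, and the choice to report an empty set as healthy are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// status is a local stand-in for the package's PluginStatus values.
type status int

const (
	healthy status = iota
	degraded
	unhealthy
	offline
)

// overall reduces per-component statuses to one status plus a summary message.
// An empty set is reported as healthy, which is an assumption.
func overall(components map[string]status) (status, string) {
	var bad, slow []string
	for name, s := range components {
		switch s {
		case unhealthy, offline:
			bad = append(bad, name)
		case degraded:
			slow = append(slow, name)
		}
	}
	// Map iteration order is random, so sort for stable messages.
	sort.Strings(bad)
	sort.Strings(slow)
	switch {
	case len(bad) > 0:
		return unhealthy, fmt.Sprintf("unhealthy components: %s", strings.Join(bad, ", "))
	case len(slow) > 0:
		return degraded, fmt.Sprintf("degraded components: %s", strings.Join(slow, ", "))
	default:
		return healthy, "all components healthy"
	}
}
```

A single offline or unhealthy component is enough to mark the whole system unhealthy, regardless of how many others are merely degraded.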

func (*HealthMonitor) GetStatus

func (hm *HealthMonitor) GetStatus(name string) (HealthStatus, bool)

GetStatus returns the health status of a specific component

func (*HealthMonitor) RemoveChecker

func (hm *HealthMonitor) RemoveChecker(name string)

RemoveChecker removes a health checker

func (*HealthMonitor) Shutdown

func (hm *HealthMonitor) Shutdown()

Shutdown stops all health checkers

func (*HealthMonitor) UpdateStatus

func (hm *HealthMonitor) UpdateStatus(name string, status HealthStatus)

UpdateStatus updates the health status for a component

type HealthStatus

type HealthStatus struct {
	Status       PluginStatus      `json:"status"`
	Message      string            `json:"message,omitempty"`
	LastCheck    time.Time         `json:"last_check"`
	ResponseTime time.Duration     `json:"response_time"`
	Metadata     map[string]string `json:"metadata,omitempty"`
}

HealthStatus contains comprehensive health information about a plugin instance.

This structure provides detailed health assessment data used by monitoring systems, load balancers, and circuit breakers to make routing and recovery decisions. It records the current status together with when the check ran and how long it took, so callers can track trends across successive checks.

Fields:

  • Status: Current operational status (healthy, degraded, unhealthy, offline)
  • Message: Human-readable description of the current status
  • LastCheck: Timestamp of when this status was determined
  • ResponseTime: How long the health check took to complete
  • Metadata: Additional context-specific information (error codes, version info, etc.)

Example usage:

health := plugin.Health(ctx)
if health.Status != StatusHealthy {
    log.Printf("Plugin %s unhealthy: %s (response time: %v)",
        pluginName, health.Message, health.ResponseTime)
}

type HistogramMetric

type HistogramMetric interface {
	Observe(value float64, labelValues ...string)
}

HistogramMetric represents a histogram with native label support

type LoadBalanceRequest

type LoadBalanceRequest struct {
	RequestID   string
	Key         string   // For consistent hashing
	Priority    int      // Request priority
	Preferences []string // Preferred plugin names
}

LoadBalanceRequest contains request information for load balancing

type LoadBalancer

type LoadBalancer[Req, Resp any] struct {
	// contains filtered or unexported fields
}

LoadBalancer manages multiple plugins of the same type and distributes load using configurable strategies.

This component provides intelligent request distribution across multiple plugin instances, implementing various load balancing algorithms and maintaining detailed metrics for operational visibility and decision-making.

Key features:

  • Multiple load balancing strategies (round-robin, least connections, etc.)
  • Per-plugin metrics tracking (latency, success rate, health score)
  • Dynamic plugin enable/disable without service interruption
  • Weighted and priority-based routing
  • Thread-safe operation with minimal contention
  • Comprehensive statistics for monitoring and debugging

Usage example:

lb := NewLoadBalancer[MyRequest, MyResponse](StrategyLeastLatency, logger)

// Add plugin instances with different weights and priorities
lb.AddPlugin("primary", plugin1, 10, 100)     // High weight, high priority
lb.AddPlugin("secondary", plugin2, 5, 50)     // Lower weight, lower priority

// Execute request with automatic plugin selection
request := LoadBalanceRequest{
    RequestID: "req-123",
    Key:       "user-456",  // For consistent hashing
}
response, err := lb.Execute(ctx, execCtx, request, myRequest)

// Monitor performance
stats := lb.GetStats()
for name, stat := range stats {
    fmt.Printf("Plugin %s: Success rate %.2f%%, Avg latency %v\n",
        name, stat.GetSuccessRate(), stat.AverageLatency)
}

func NewLoadBalancer

func NewLoadBalancer[Req, Resp any](strategy LoadBalancingStrategy, logger *slog.Logger) *LoadBalancer[Req, Resp]

NewLoadBalancer creates a new load balancer instance with the specified strategy.

The load balancer starts empty and plugins must be added using AddPlugin(). It immediately begins tracking metrics and applying the selected load balancing strategy to distribute requests among registered plugins.

Parameters:

  • strategy: Load balancing algorithm to use for plugin selection
  • logger: Logger instance for operational logging (uses default if nil)

Returns a fully initialized LoadBalancer ready to accept plugin registrations and handle request routing.

func (*LoadBalancer[Req, Resp]) AddPlugin

func (lb *LoadBalancer[Req, Resp]) AddPlugin(name string, plugin Plugin[Req, Resp], weight, priority int) error

AddPlugin adds a plugin to the load balancer

func (*LoadBalancer[Req, Resp]) DisablePlugin

func (lb *LoadBalancer[Req, Resp]) DisablePlugin(name string) error

DisablePlugin disables a plugin from load balancing

func (*LoadBalancer[Req, Resp]) EnablePlugin

func (lb *LoadBalancer[Req, Resp]) EnablePlugin(name string) error

EnablePlugin enables a plugin for load balancing

func (*LoadBalancer[Req, Resp]) Execute

func (lb *LoadBalancer[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, lbReq LoadBalanceRequest, request Req) (Resp, error)

Execute processes a request using intelligent load balancing and comprehensive metrics tracking.

This method implements the complete request lifecycle including plugin selection, request execution, metrics collection, and health score updates. It provides automatic failover and detailed performance tracking for operational visibility.

Request lifecycle:

  1. Select optimal plugin using configured load balancing strategy
  2. Track active connections and update metrics
  3. Execute request through selected plugin
  4. Record latency, success/failure, and update health scores
  5. Return response or error with full traceability

Parameters:

  • ctx: Context for cancellation and timeout control
  • execCtx: Execution context with request metadata and configuration
  • lbReq: Load balancing request with selection hints (key, preferences)
  • request: The actual request payload to be processed

Returns the response from the selected plugin or an error if all plugins are unavailable or the request fails.

Metrics collected:

  • Request count, success/failure rates
  • Response latency and connection counts
  • Plugin health scores and availability
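
The bookkeeping part of this lifecycle (steps 2 to 4) can be sketched with typed atomics. This is an illustration only: loadMetrics, track, and errUnavailable are hypothetical names, and plugin selection and health scoring are left out because the listing does not show how the library implements them.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errUnavailable is a hypothetical error used only by this demo.
var errUnavailable = errors.New("plugin unavailable")

// loadMetrics is a hypothetical stand-in for per-plugin counters.
type loadMetrics struct {
	total, success, failed atomic.Int64
	active                 atomic.Int32
	lastLatency            atomic.Int64 // nanoseconds
}

// track runs call while counting the active connection, recording latency,
// and updating the success and failure counters.
func track(m *loadMetrics, call func() error) error {
	m.active.Add(1)
	defer m.active.Add(-1) // released even if call panics

	start := time.Now()
	err := call()
	m.lastLatency.Store(int64(time.Since(start)))

	m.total.Add(1)
	if err != nil {
		m.failed.Add(1)
		return err
	}
	m.success.Add(1)
	return nil
}

func main() {
	var m loadMetrics
	_ = track(&m, func() error { return nil })
	_ = track(&m, func() error { return errUnavailable })
	fmt.Println(m.total.Load(), m.success.Load(), m.failed.Load(), m.active.Load())
}
```

The deferred decrement keeps the active connection count accurate on every return path, which matters for a least-connections strategy.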

func (*LoadBalancer[Req, Resp]) GetStats

func (lb *LoadBalancer[Req, Resp]) GetStats() map[string]LoadBalancerStats

GetStats returns current load balancing statistics

func (*LoadBalancer[Req, Resp]) RemovePlugin

func (lb *LoadBalancer[Req, Resp]) RemovePlugin(name string) error

RemovePlugin removes a plugin from the load balancer

func (*LoadBalancer[Req, Resp]) SelectPlugin

func (lb *LoadBalancer[Req, Resp]) SelectPlugin(lbReq LoadBalanceRequest) (string, Plugin[Req, Resp], error)

SelectPlugin selects the best plugin based on the load balancing strategy

type LoadBalancerStats

type LoadBalancerStats struct {
	PluginName         string        `json:"plugin_name"`
	Weight             int           `json:"weight"`
	Priority           int           `json:"priority"`
	Enabled            bool          `json:"enabled"`
	ActiveConnections  int32         `json:"active_connections"`
	TotalRequests      int64         `json:"total_requests"`
	SuccessfulRequests int64         `json:"successful_requests"`
	FailedRequests     int64         `json:"failed_requests"`
	AverageLatency     time.Duration `json:"average_latency"`
	HealthScore        int32         `json:"health_score"`
	LastUsed           time.Time     `json:"last_used"`
}

LoadBalancerStats contains comprehensive statistics for a plugin in the load balancer.

This structure provides detailed metrics about plugin performance and usage patterns within the load balancer. The statistics are essential for monitoring, alerting, capacity planning, and debugging load balancing decisions.

Statistical categories:

  • Configuration: Weight, priority, and enabled status
  • Performance: Request counts, success rates, and latency metrics
  • Health: Health score and availability indicators
  • Operational: Active connections and last usage timestamp

Example usage:

stats := loadBalancer.GetStats()
for pluginName, stat := range stats {
    successRate := stat.GetSuccessRate()
    if successRate < 95.0 {
        log.Printf("Plugin %s has low success rate: %.2f%%",
            pluginName, successRate)
    }

    if stat.AverageLatency > 5*time.Second {
        log.Printf("Plugin %s has high latency: %v",
            pluginName, stat.AverageLatency)
    }
}

func (LoadBalancerStats) GetSuccessRate

func (stats LoadBalancerStats) GetSuccessRate() float64

GetSuccessRate calculates the success rate as a percentage (0-100).

This method computes the success rate based on successful requests divided by total requests. It returns 0.0 if no requests have been processed. The success rate is a key indicator of plugin health and reliability.

Returns:

  • 0.0: No requests processed yet
  • 0.0-100.0: Success percentage based on successful vs total requests

Usage for monitoring:

if stats.GetSuccessRate() < 99.0 {
    alert("Plugin success rate below threshold")
}
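
The calculation described above fits in a few lines. The sketch below assumes the rate is successful requests divided by total requests, scaled to a percentage, with a guard for the zero-request case. successRate is a hypothetical free function, not the library's method.

```go
package main

import "fmt"

// successRate returns the percentage of successful requests, or 0.0 when
// nothing has been processed yet. Hypothetical helper for illustration.
func successRate(successful, total int64) float64 {
	if total == 0 {
		return 0.0 // avoids a division by zero
	}
	return float64(successful) / float64(total) * 100.0
}

func main() {
	fmt.Printf("%.2f%%\n", successRate(0, 0))
	fmt.Printf("%.2f%%\n", successRate(995, 1000))
}
```

Because 0.0 is returned both for "no traffic yet" and for "every request failed", an alerting rule may want to check TotalRequests as well before firing.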

type LoadBalancingStrategy

type LoadBalancingStrategy string

LoadBalancingStrategy defines different load balancing algorithms for distributing requests.

Each strategy implements a different approach to selecting which plugin instance should handle a request, optimizing for different goals like fairness, performance, locality, or consistency.

Strategy descriptions:

  • StrategyRoundRobin: Distributes requests evenly in circular order
  • StrategyRandom: Selects plugins randomly for uniform distribution
  • StrategyLeastConnections: Routes to plugin with fewest active connections
  • StrategyLeastLatency: Routes to plugin with lowest average response time
  • StrategyWeightedRandom: Random selection based on configured weights
  • StrategyConsistentHash: Routes based on request key for session affinity
  • StrategyPriority: Routes to highest priority healthy plugin

Example usage:

lb := NewLoadBalancer[MyRequest, MyResponse](StrategyLeastLatency, logger)
lb.AddPlugin("fast-service", plugin1, 1, 10)      // weight=1, priority=10
lb.AddPlugin("backup-service", plugin2, 1, 5)     // weight=1, priority=5

request := LoadBalanceRequest{Key: "user-123"}
name, plugin, err := lb.SelectPlugin(request)

const (
	// StrategyRoundRobin distributes requests in round-robin fashion
	StrategyRoundRobin LoadBalancingStrategy = "round-robin"

	// StrategyRandom selects plugins randomly
	StrategyRandom LoadBalancingStrategy = "random"

	// StrategyLeastConnections selects plugin with least active connections
	StrategyLeastConnections LoadBalancingStrategy = "least-connections"

	// StrategyLeastLatency selects plugin with lowest average latency
	StrategyLeastLatency LoadBalancingStrategy = "least-latency"

	// StrategyWeightedRandom selects plugins based on weights with random distribution
	StrategyWeightedRandom LoadBalancingStrategy = "weighted-random"

	// StrategyConsistentHash uses consistent hashing for request routing
	StrategyConsistentHash LoadBalancingStrategy = "consistent-hash"

	// StrategyPriority selects highest priority healthy plugin
	StrategyPriority LoadBalancingStrategy = "priority"
)
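
To make three of these strategies concrete, here is a minimal sketch of round-robin, least-connections, and weighted-random selection over plugin indices. All names are hypothetical and the library's internal selection logic is not shown in this listing, so treat this as an illustration of the algorithms rather than of the package.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

// roundRobin hands out indices 0..n-1 in circular order.
type roundRobin struct {
	next atomic.Uint64
}

// pick returns the next index. n must be greater than zero.
func (r *roundRobin) pick(n int) int {
	// Add returns the incremented value, so subtract 1 to start at index 0.
	return int((r.next.Add(1) - 1) % uint64(n))
}

// leastConnections returns the index with the fewest active connections.
// Ties go to the earliest entry. active must not be empty.
func leastConnections(active []int32) int {
	best := 0
	for i, a := range active {
		if a < active[best] {
			best = i
		}
	}
	return best
}

// weightedPick maps a draw in [0, sum(weights)) to an index, so an entry
// with weight 10 is chosen twice as often as one with weight 5.
func weightedPick(weights []int, draw int) int {
	for i, w := range weights {
		if draw < w {
			return i
		}
		draw -= w
	}
	return len(weights) - 1 // only reached if draw was out of range
}

func main() {
	names := []string{"primary", "secondary"}

	var rr roundRobin
	for i := 0; i < 3; i++ {
		fmt.Println("round-robin:", names[rr.pick(len(names))])
	}

	fmt.Println("least-connections:", names[leastConnections([]int32{4, 1})])

	weights := []int{10, 5}
	fmt.Println("weighted-random:", names[weightedPick(weights, rand.Intn(15))])
}
```

Passing the random draw in as an argument keeps weightedPick deterministic, which makes the weighting easy to test.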

type Manager

type Manager[Req, Resp any] struct {
	// contains filtered or unexported fields
}

Manager implements PluginManager with comprehensive production-ready features.

This is the central component that orchestrates the entire plugin system, providing plugin lifecycle management, load balancing, circuit breaking, health monitoring, and observability. It's designed to handle enterprise-scale workloads with high availability and reliability requirements.

Core capabilities:

  • Plugin registration and lifecycle management
  • Automatic failover with circuit breaker patterns
  • Health monitoring with automatic recovery
  • Load balancing across multiple plugin instances
  • Hot-reload configuration updates without service interruption
  • Comprehensive metrics and structured logging
  • Graceful shutdown with proper resource cleanup

Example usage:

logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
manager := NewManager[AuthRequest, AuthResponse](logger)

// Register plugin factories
httpFactory := NewHTTPPluginFactory[AuthRequest, AuthResponse]()
manager.RegisterFactory("http", httpFactory)

// Load configuration
config := ManagerConfig{
    Plugins: []PluginConfig{
        {
            Name:      "auth-service",
            Type:      "http",
            Transport: TransportHTTPS,
            Endpoint:  "https://auth.company.com/api/v1",
        },
    },
}

if err := manager.LoadFromConfig(config); err != nil {
    log.Fatal(err)
}

// Execute requests
response, err := manager.Execute(ctx, "auth-service", request)
if err != nil {
    log.Printf("Request failed: %v", err)
}

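// Graceful shutdown with a bounded timeout
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := manager.Shutdown(shutdownCtx); err != nil {
    log.Printf("Shutdown failed: %v", err)
}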

func NewManager

func NewManager[Req, Resp any](logger *slog.Logger) *Manager[Req, Resp]

NewManager creates a new production-ready plugin manager with comprehensive monitoring and resilience features.

The manager is initialized with sensible defaults and ready to accept plugin registrations. It includes built-in circuit breakers, health checkers, and metrics collection for operational visibility and automated recovery.

Parameters:

  • logger: Structured logger for operational logging (uses default if nil)

The manager starts with empty plugin registries and must be configured with plugin factories and loaded with plugin configurations before use.

Returns a fully initialized Manager ready for plugin registration and configuration.

func (*Manager[Req, Resp]) Execute

func (m *Manager[Req, Resp]) Execute(ctx context.Context, pluginName string, request Req) (Resp, error)

Execute implements PluginManager.Execute

func (*Manager[Req, Resp]) ExecuteWithOptions

func (m *Manager[Req, Resp]) ExecuteWithOptions(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error)

ExecuteWithOptions implements PluginManager.ExecuteWithOptions

func (*Manager[Req, Resp]) GetMetrics

func (m *Manager[Req, Resp]) GetMetrics() ManagerMetrics

GetMetrics returns current operational metrics

func (*Manager[Req, Resp]) GetPlugin

func (m *Manager[Req, Resp]) GetPlugin(name string) (Plugin[Req, Resp], error)

GetPlugin implements PluginManager.GetPlugin

func (*Manager[Req, Resp]) Health

func (m *Manager[Req, Resp]) Health() map[string]HealthStatus

Health implements PluginManager.Health

func (*Manager[Req, Resp]) ListPlugins

func (m *Manager[Req, Resp]) ListPlugins() map[string]HealthStatus

ListPlugins implements PluginManager.ListPlugins

func (*Manager[Req, Resp]) LoadFromConfig

func (m *Manager[Req, Resp]) LoadFromConfig(config ManagerConfig) error

LoadFromConfig implements PluginManager.LoadFromConfig

func (*Manager[Req, Resp]) Register

func (m *Manager[Req, Resp]) Register(plugin Plugin[Req, Resp]) error

Register implements PluginManager.Register

func (*Manager[Req, Resp]) RegisterFactory

func (m *Manager[Req, Resp]) RegisterFactory(pluginType string, factory PluginFactory[Req, Resp]) error

RegisterFactory registers a plugin factory for a specific plugin type

func (*Manager[Req, Resp]) ReloadConfig

func (m *Manager[Req, Resp]) ReloadConfig(config ManagerConfig) error

ReloadConfig implements PluginManager.ReloadConfig

func (*Manager[Req, Resp]) ReloadConfigWithStrategy

func (m *Manager[Req, Resp]) ReloadConfigWithStrategy(config ManagerConfig, strategy ReloadStrategy) error

ReloadConfigWithStrategy reloads configuration with specified strategy

func (*Manager[Req, Resp]) Shutdown

func (m *Manager[Req, Resp]) Shutdown(ctx context.Context) error

Shutdown implements PluginManager.Shutdown

func (*Manager[Req, Resp]) Unregister

func (m *Manager[Req, Resp]) Unregister(name string) error

Unregister implements PluginManager.Unregister

type ManagerConfig

type ManagerConfig struct {
	// Global settings
	LogLevel    string `json:"log_level" yaml:"log_level"`
	MetricsPort int    `json:"metrics_port" yaml:"metrics_port"`

	// Default configurations that apply to all plugins unless overridden
	DefaultRetry          RetryConfig          `json:"default_retry" yaml:"default_retry"`
	DefaultCircuitBreaker CircuitBreakerConfig `json:"default_circuit_breaker" yaml:"default_circuit_breaker"`
	DefaultHealthCheck    HealthCheckConfig    `json:"default_health_check" yaml:"default_health_check"`
	DefaultConnection     ConnectionConfig     `json:"default_connection" yaml:"default_connection"`
	DefaultRateLimit      RateLimitConfig      `json:"default_rate_limit" yaml:"default_rate_limit"`

	// Plugin configurations
	Plugins []PluginConfig `json:"plugins" yaml:"plugins"`

	// Plugin discovery settings
	Discovery DiscoveryConfig `json:"discovery,omitempty" yaml:"discovery,omitempty"`
}

ManagerConfig represents the comprehensive configuration for the plugin manager.

This is the top-level configuration that controls the entire plugin system, including global settings, default policies, and the collection of plugin configurations. It provides a centralized way to manage plugin behavior and operational parameters.

Configuration structure:

  • Global: System-wide settings like logging and metrics
  • Defaults: Default configurations applied to all plugins
  • Plugins: Individual plugin configurations
  • Discovery: Automatic plugin discovery settings

The manager applies default configurations to plugins that don't specify their own values, allowing for consistent behavior across the system while still permitting per-plugin customization.

Example configuration:

config := ManagerConfig{
    LogLevel:    "info",
    MetricsPort: 9090,
    DefaultRetry: RetryConfig{
        MaxRetries:      3,
        InitialInterval: 100 * time.Millisecond,
        MaxInterval:     5 * time.Second,
        Multiplier:      2.0,
        RandomJitter:    true,
    },
    DefaultHealthCheck: HealthCheckConfig{
        Enabled:      true,
        Interval:     30 * time.Second,
        Timeout:      5 * time.Second,
        FailureLimit: 3,
    },
    Plugins: []PluginConfig{
        {
            Name:      "auth-service",
            Transport: TransportHTTPS,
            Endpoint:  "https://auth.example.com/api/v1",
            // Inherits default retry and health check settings
        },
    },
}

func GetDefaultManagerConfig

func GetDefaultManagerConfig() ManagerConfig

GetDefaultManagerConfig returns a ManagerConfig with sensible production-ready defaults.

This function provides a baseline configuration suitable for most production environments. The defaults are chosen to balance reliability, performance, and resource usage. Users can customize specific settings as needed.

Default configuration includes:

  • Info-level logging with metrics on port 9090
  • Conservative retry policy with exponential backoff
  • Circuit breaker enabled with reasonable thresholds
  • Health checking enabled with 30-second intervals
  • Connection pooling with moderate limits
  • Rate limiting disabled by default

Usage:

config := GetDefaultManagerConfig()
config.LogLevel = "debug"  // Customize as needed
config.Plugins = []PluginConfig{
    // Add your plugins here
}

func (*ManagerConfig) ApplyDefaults

func (mc *ManagerConfig) ApplyDefaults()

ApplyDefaults applies default configurations to plugins that don't specify them
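
One plausible reading of "don't specify them" is that a plugin section left at its zero value inherits the manager-level default. The sketch below illustrates that rule for the retry section only. The lowercase types are simplified local stand-ins, and the zero-value test is an assumption, not the library's confirmed behavior.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified local stand-ins for the configuration types in this listing.
type retryConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
}

type pluginConfig struct {
	Name  string
	Retry retryConfig
}

type managerConfig struct {
	DefaultRetry retryConfig
	Plugins      []pluginConfig
}

// applyDefaults copies the default retry policy into every plugin whose
// retry section is still the zero value (assumed meaning of "unspecified").
func (mc *managerConfig) applyDefaults() {
	for i := range mc.Plugins {
		if mc.Plugins[i].Retry == (retryConfig{}) {
			mc.Plugins[i].Retry = mc.DefaultRetry
		}
	}
}

// exampleConfig builds one plugin without and one with its own retry policy.
func exampleConfig() managerConfig {
	return managerConfig{
		DefaultRetry: retryConfig{MaxRetries: 3, InitialInterval: 100 * time.Millisecond},
		Plugins: []pluginConfig{
			{Name: "auth-service"},
			{Name: "payment-service", Retry: retryConfig{MaxRetries: 5}},
		},
	}
}

func main() {
	cfg := exampleConfig()
	cfg.applyDefaults()
	for _, p := range cfg.Plugins {
		fmt.Printf("%s: max retries %d\n", p.Name, p.Retry.MaxRetries)
	}
}
```

Indexing into the slice instead of ranging over copies is what makes the assignment stick.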

func (*ManagerConfig) FromJSON

func (mc *ManagerConfig) FromJSON(data []byte) error

FromJSON loads configuration from JSON

func (*ManagerConfig) ToJSON

func (mc *ManagerConfig) ToJSON() ([]byte, error)
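
The struct tags on ManagerConfig determine the JSON field names. As a rough illustration of the round trip, the sketch below mirrors a few of those fields in local types and uses encoding/json directly. The types, method names, and sample document are assumptions for the demo, and the real methods may perform extra steps such as validation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pluginEntry and config mirror a small subset of the tagged fields above.
type pluginEntry struct {
	Name     string `json:"name"`
	Type     string `json:"type"`
	Endpoint string `json:"endpoint"`
}

type config struct {
	LogLevel    string        `json:"log_level"`
	MetricsPort int           `json:"metrics_port"`
	Plugins     []pluginEntry `json:"plugins"`
}

// fromJSON fills the receiver from a JSON document.
func (c *config) fromJSON(data []byte) error { return json.Unmarshal(data, c) }

// toJSON serializes the receiver using the tagged field names.
func (c *config) toJSON() ([]byte, error) { return json.Marshal(c) }

// sample is a hypothetical configuration document.
const sample = `{"log_level":"debug","metrics_port":9090,"plugins":[{"name":"auth-service","type":"http","endpoint":"https://auth.example.com/api/v1"}]}`

func main() {
	var cfg config
	if err := cfg.fromJSON([]byte(sample)); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	out, err := cfg.toJSON()
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(string(out))
}
```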

func (*ManagerConfig) Validate

func (mc *ManagerConfig) Validate() error

Validate validates the manager configuration

type ManagerMetrics

type ManagerMetrics struct {
	RequestsTotal       atomic.Int64
	RequestsSuccess     atomic.Int64
	RequestsFailure     atomic.Int64
	RequestDuration     atomic.Int64 // nanoseconds
	CircuitBreakerTrips atomic.Int64
	HealthCheckFailures atomic.Int64
}

ManagerMetrics tracks operational metrics

type MetricsCollector

type MetricsCollector interface {
	// Counter metrics
	IncrementCounter(name string, labels map[string]string, value int64)

	// Gauge metrics
	SetGauge(name string, labels map[string]string, value float64)

	// Histogram metrics
	RecordHistogram(name string, labels map[string]string, value float64)

	// Custom metrics
	RecordCustomMetric(name string, labels map[string]string, value interface{})

	// Get current metrics snapshot
	GetMetrics() map[string]interface{}
}

MetricsCollector defines interface for collecting plugin metrics
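
A custom collector only has to satisfy the five methods above. The following is a minimal in-memory sketch, assuming each series is keyed by its name plus sorted labels. memoryCollector, newMemoryCollector, and seriesKey are hypothetical names, and the interface is repeated locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// MetricsCollector is copied from the interface listing above.
type MetricsCollector interface {
	IncrementCounter(name string, labels map[string]string, value int64)
	SetGauge(name string, labels map[string]string, value float64)
	RecordHistogram(name string, labels map[string]string, value float64)
	RecordCustomMetric(name string, labels map[string]string, value interface{})
	GetMetrics() map[string]interface{}
}

// memoryCollector is a hypothetical mutex-guarded in-memory implementation.
type memoryCollector struct {
	mu     sync.Mutex
	values map[string]interface{}
}

// Compile-time check that memoryCollector satisfies the interface.
var _ MetricsCollector = (*memoryCollector)(nil)

func newMemoryCollector() *memoryCollector {
	return &memoryCollector{values: make(map[string]interface{})}
}

// seriesKey builds a stable key such as "requests{env=prod,plugin=auth}".
func seriesKey(name string, labels map[string]string) string {
	if len(labels) == 0 {
		return name
	}
	pairs := make([]string, 0, len(labels))
	for k, v := range labels {
		pairs = append(pairs, k+"="+v)
	}
	sort.Strings(pairs) // map iteration order is random, so sort for stability
	return name + "{" + strings.Join(pairs, ",") + "}"
}

func (c *memoryCollector) IncrementCounter(name string, labels map[string]string, value int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	key := seriesKey(name, labels)
	current, _ := c.values[key].(int64) // zero when the series is new
	c.values[key] = current + value
}

func (c *memoryCollector) SetGauge(name string, labels map[string]string, value float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[seriesKey(name, labels)] = value
}

// RecordHistogram keeps raw observations; bucketing is left out of the sketch.
func (c *memoryCollector) RecordHistogram(name string, labels map[string]string, value float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	key := seriesKey(name, labels)
	samples, _ := c.values[key].([]float64)
	c.values[key] = append(samples, value)
}

func (c *memoryCollector) RecordCustomMetric(name string, labels map[string]string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[seriesKey(name, labels)] = value
}

// GetMetrics returns a shallow copy so callers cannot mutate the internal map.
func (c *memoryCollector) GetMetrics() map[string]interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	snapshot := make(map[string]interface{}, len(c.values))
	for k, v := range c.values {
		snapshot[k] = v
	}
	return snapshot
}

func main() {
	c := newMemoryCollector()
	labels := map[string]string{"plugin": "auth-service"}
	c.IncrementCounter("requests_total", labels, 1)
	c.IncrementCounter("requests_total", labels, 1)
	c.SetGauge("active_connections", labels, 3)
	c.RecordHistogram("latency_seconds", labels, 0.042)
	for key, value := range c.GetMetrics() {
		fmt.Println(key, value)
	}
}
```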

type ObservabilityConfig

type ObservabilityConfig struct {
	// Metrics
	MetricsEnabled           bool                     `json:"metrics_enabled"`
	MetricsCollector         MetricsCollector         `json:"-"`
	EnhancedMetricsCollector EnhancedMetricsCollector `json:"-"` // Optional: for native label support
	MetricsPrefix            string                   `json:"metrics_prefix"`

	// Tracing
	TracingEnabled    bool            `json:"tracing_enabled"`
	TracingProvider   TracingProvider `json:"-"`
	TracingSampleRate float64         `json:"tracing_sample_rate"`

	// Logging
	LoggingEnabled    bool   `json:"logging_enabled"`
	LogLevel          string `json:"log_level"`
	StructuredLogging bool   `json:"structured_logging"`

	// Health monitoring
	HealthMetrics      bool `json:"health_metrics"`
	PerformanceMetrics bool `json:"performance_metrics"`
	ErrorMetrics       bool `json:"error_metrics"`
}

ObservabilityConfig configures the observability system

func DefaultObservabilityConfig

func DefaultObservabilityConfig() ObservabilityConfig

DefaultObservabilityConfig returns default observability configuration

func EnhancedObservabilityConfig

func EnhancedObservabilityConfig() ObservabilityConfig

EnhancedObservabilityConfig returns observability configuration with enhanced metrics collector

type ObservabilityReport

type ObservabilityReport struct {
	GeneratedAt time.Time                      `json:"generated_at"`
	UpTime      time.Duration                  `json:"uptime"`
	Global      GlobalMetrics                  `json:"global"`
	Plugins     map[string]PluginMetricsReport `json:"plugins"`
}

ObservabilityReport contains comprehensive metrics report

type ObservableManager

type ObservableManager[Req, Resp any] struct {
	*Manager[Req, Resp]
	// contains filtered or unexported fields
}

ObservableManager extends Manager with comprehensive observability

func NewObservableManager

func NewObservableManager[Req, Resp any](baseManager *Manager[Req, Resp], config ObservabilityConfig) *ObservableManager[Req, Resp]

NewObservableManager creates a new observable manager

func (*ObservableManager[Req, Resp]) ExecuteWithObservability

func (om *ObservableManager[Req, Resp]) ExecuteWithObservability(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error)

ExecuteWithObservability executes a request with full observability

func (*ObservableManager[Req, Resp]) GetObservabilityMetrics

func (om *ObservableManager[Req, Resp]) GetObservabilityMetrics() ObservabilityReport

GetObservabilityMetrics returns comprehensive metrics

type Plugin

type Plugin[Req, Resp any] interface {
	// Info returns metadata about the plugin
	Info() PluginInfo

	// Execute processes a request and returns a response
	// Context should be honored for timeouts and cancellation
	Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error)

	// Health performs a health check and returns detailed status
	Health(ctx context.Context) HealthStatus

	// Close gracefully shuts down the plugin and cleans up resources
	// Should be idempotent (safe to call multiple times)
	Close() error
}

Plugin represents a generic plugin that processes requests of type Req and returns responses of type Resp. The interface is designed to be transport-agnostic and production-ready.
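
Two requirements in the interface comments deserve a concrete illustration: Execute should honor the context, and Close should be idempotent. The sketch below shows one common way to meet both, using sync.Once for Close. It is deliberately simplified: echoPlugin is hypothetical, it omits Info, Health, and the execCtx parameter (their types are defined elsewhere in the package), and it therefore does not satisfy the real Plugin interface.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errClosed is a hypothetical error returned once the plugin is closed.
var errClosed = errors.New("plugin closed")

// echoPlugin is a simplified stand-in that echoes string requests.
type echoPlugin struct {
	closeOnce sync.Once
	closed    atomic.Bool
	releases  atomic.Int32 // counts how often resources were actually released
}

// Execute returns early if the context is already canceled or expired.
func (p *echoPlugin) Execute(ctx context.Context, request string) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	if p.closed.Load() {
		return "", errClosed
	}
	return "echo: " + request, nil
}

// Close releases resources exactly once, however often it is called.
func (p *echoPlugin) Close() error {
	p.closeOnce.Do(func() {
		p.closed.Store(true)
		p.releases.Add(1) // real cleanup (connections, processes) would go here
	})
	return nil
}

// run exercises the contract and reports each outcome as a line of text.
func run() []string {
	p := &echoPlugin{}
	var lines []string

	out, err := p.Execute(context.Background(), "hello")
	lines = append(lines, fmt.Sprintf("live: %q, %v", out, err))

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel before the call to show the early return
	_, err = p.Execute(ctx, "hello")
	lines = append(lines, fmt.Sprintf("canceled: %v", err))

	_ = p.Close()
	_ = p.Close() // second call is a no-op
	lines = append(lines, fmt.Sprintf("releases after two Close calls: %d", p.releases.Load()))

	_, err = p.Execute(context.Background(), "hello")
	lines = append(lines, fmt.Sprintf("after close: %v", err))
	return lines
}

func main() {
	for _, line := range run() {
		fmt.Println(line)
	}
}
```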

type PluginConfig

type PluginConfig struct {
	// Basic plugin information
	Name      string        `json:"name" yaml:"name"`
	Type      string        `json:"type" yaml:"type"`
	Transport TransportType `json:"transport" yaml:"transport"`
	Endpoint  string        `json:"endpoint" yaml:"endpoint"`
	Priority  int           `json:"priority" yaml:"priority"`
	Enabled   bool          `json:"enabled" yaml:"enabled"`

	// Executable-specific configuration
	Executable string   `json:"executable,omitempty" yaml:"executable,omitempty"`
	Args       []string `json:"args,omitempty" yaml:"args,omitempty"`
	Env        []string `json:"env,omitempty" yaml:"env,omitempty"`
	WorkDir    string   `json:"work_dir,omitempty" yaml:"work_dir,omitempty"`

	// Security and authentication
	Auth AuthConfig `json:"auth" yaml:"auth"`

	// Resilience and reliability
	Retry          RetryConfig          `json:"retry" yaml:"retry"`
	CircuitBreaker CircuitBreakerConfig `json:"circuit_breaker" yaml:"circuit_breaker"`
	HealthCheck    HealthCheckConfig    `json:"health_check" yaml:"health_check"`
	Connection     ConnectionConfig     `json:"connection" yaml:"connection"`
	RateLimit      RateLimitConfig      `json:"rate_limit" yaml:"rate_limit"`

	// Plugin-specific configuration
	Options map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`

	// Metadata
	Labels      map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
	Annotations map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"`
}

PluginConfig represents the comprehensive configuration for a single plugin instance.

This is the main configuration structure that defines how a plugin connects, authenticates, and behaves within the plugin system. It combines transport configuration, security settings, resilience patterns, and operational parameters.

Configuration sections:

  • Basic: Name, type, transport, endpoint identification
  • Executable: Process execution settings (for exec transport)
  • Security: Authentication and authorization configuration
  • Resilience: Retry, circuit breaker, health check settings
  • Performance: Connection pooling, rate limiting configuration
  • Metadata: Labels and annotations for organization and discovery

Example configurations:

// HTTP plugin with API key authentication
httpPlugin := PluginConfig{
    Name:      "payment-service",
    Type:      "payment",
    Transport: TransportHTTPS,
    Endpoint:  "https://payments.example.com/api/v1",
    Enabled:   true,
    Priority:  1,
    Auth: AuthConfig{
        Method: AuthAPIKey,
        APIKey: "your-api-key",
    },
    Retry: RetryConfig{
        MaxRetries:      3,
        InitialInterval: 100 * time.Millisecond,
        MaxInterval:     5 * time.Second,
        Multiplier:      2.0,
        RandomJitter:    true,
    },
    Labels: map[string]string{
        "environment": "production",
        "version":     "v1.2.3",
    },
}

// Executable plugin with custom environment
execPlugin := PluginConfig{
    Name:       "data-processor",
    Type:       "processor",
    Transport:  TransportExecutable,
    Executable: "/opt/processors/data-processor",
    Args:       []string{"--config", "/etc/processor.conf"},
    Env:        []string{"LOG_LEVEL=info", "MAX_MEMORY=1GB"},
    WorkDir:    "/tmp/processor",
}

func (*PluginConfig) Validate

func (pc *PluginConfig) Validate() error

Validate validates the plugin configuration

type PluginDiff

type PluginDiff struct {
	Added     []PluginConfig `json:"added"`
	Updated   []PluginUpdate `json:"updated"`
	Removed   []string       `json:"removed"`
	Unchanged []string       `json:"unchanged"`
}

PluginDiff represents the differences between old and new plugin configurations
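
A diff of this shape can be computed by matching plugins on their names. The sketch below is a simplified illustration: the local types carry only two fields, every category holds names only (the real Added and Updated fields hold richer values), and diffPlugins is a hypothetical function rather than the library's implementation.

```go
package main

import "fmt"

// pluginConfig is a reduced local stand-in with comparable fields only.
type pluginConfig struct {
	Name     string
	Endpoint string
}

// pluginDiff lists plugin names per category, in input order.
type pluginDiff struct {
	Added, Updated, Removed, Unchanged []string
}

// diffPlugins classifies every plugin by comparing old and new configs by name.
func diffPlugins(oldCfg, newCfg []pluginConfig) pluginDiff {
	oldByName := make(map[string]pluginConfig, len(oldCfg))
	for _, p := range oldCfg {
		oldByName[p.Name] = p
	}

	var d pluginDiff
	seen := make(map[string]bool, len(newCfg))
	for _, p := range newCfg {
		seen[p.Name] = true
		prev, existed := oldByName[p.Name]
		switch {
		case !existed:
			d.Added = append(d.Added, p.Name)
		case prev != p:
			d.Updated = append(d.Updated, p.Name)
		default:
			d.Unchanged = append(d.Unchanged, p.Name)
		}
	}
	// Anything in the old config that the new config never mentioned is gone.
	for _, p := range oldCfg {
		if !seen[p.Name] {
			d.Removed = append(d.Removed, p.Name)
		}
	}
	return d
}

func main() {
	oldCfg := []pluginConfig{
		{Name: "auth", Endpoint: "https://auth.example.com/v1"},
		{Name: "billing", Endpoint: "https://billing.example.com/v1"},
		{Name: "legacy", Endpoint: "https://legacy.example.com/v1"},
	}
	newCfg := []pluginConfig{
		{Name: "auth", Endpoint: "https://auth.example.com/v1"},
		{Name: "billing", Endpoint: "https://billing.example.com/v2"},
		{Name: "search", Endpoint: "https://search.example.com/v1"},
	}
	fmt.Printf("%+v\n", diffPlugins(oldCfg, newCfg))
}
```

Separating unchanged plugins from updated ones is what lets a reload leave healthy, unmodified plugins running.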

type PluginFactory

type PluginFactory[Req, Resp any] interface {
	// CreatePlugin creates a new plugin instance from the given configuration
	CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error)

	// SupportedTransports returns the list of supported transport protocols
	SupportedTransports() []string

	// ValidateConfig validates a plugin configuration without creating the plugin
	ValidateConfig(config PluginConfig) error
}

PluginFactory creates new plugin instances from configuration

type PluginInfo

type PluginInfo struct {
	Name         string            `json:"name"`
	Version      string            `json:"version"`
	Description  string            `json:"description,omitempty"`
	Author       string            `json:"author,omitempty"`
	Capabilities []string          `json:"capabilities,omitempty"`
	Metadata     map[string]string `json:"metadata,omitempty"`
}

PluginInfo contains comprehensive metadata about a plugin instance.

This structure provides essential information about a plugin's identity, capabilities, and characteristics. It's used for plugin discovery, version management, capability matching, and operational visibility.

Fields:

  • Name: Unique identifier for the plugin instance
  • Version: Plugin version for compatibility and update management
  • Description: Human-readable description of plugin functionality
  • Author: Plugin developer/maintainer information
  • Capabilities: List of features or operations the plugin supports
  • Metadata: Additional key-value pairs for custom plugin information

Example:

info := plugin.Info()
fmt.Printf("Plugin: %s v%s by %s\n", info.Name, info.Version, info.Author)
fmt.Printf("Capabilities: %v\n", info.Capabilities)

type PluginLoadMetrics

type PluginLoadMetrics struct {
	TotalRequests      atomic.Int64
	SuccessfulRequests atomic.Int64
	FailedRequests     atomic.Int64
	ActiveConnections  atomic.Int32
	AverageLatency     atomic.Int64 // Nanoseconds
	LastLatency        atomic.Int64 // Nanoseconds
	LastUpdate         atomic.Int64 // Unix timestamp
	HealthScore        atomic.Int32 // 0-100 scale
}

PluginLoadMetrics tracks metrics for load balancing decisions

type PluginManager

type PluginManager[Req, Resp any] interface {
	// Register adds a plugin to the manager
	Register(plugin Plugin[Req, Resp]) error

	// Unregister removes a plugin from the manager
	Unregister(name string) error

	// Execute routes a request to the appropriate plugin
	// Includes automatic retries, circuit breaking, and fallback handling
	Execute(ctx context.Context, pluginName string, request Req) (Resp, error)

	// ExecuteWithOptions executes with custom execution context
	ExecuteWithOptions(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error)

	// GetPlugin returns a specific plugin by name
	GetPlugin(name string) (Plugin[Req, Resp], error)

	// ListPlugins returns all registered plugin names and their health status
	ListPlugins() map[string]HealthStatus

	// LoadFromConfig loads plugins from a configuration file or object
	LoadFromConfig(config ManagerConfig) error

	// ReloadConfig hot-reloads configuration without stopping the manager
	ReloadConfig(config ManagerConfig) error

	// Health returns the overall health of all plugins
	Health() map[string]HealthStatus

	// Shutdown gracefully shuts down all plugins and cleans up resources
	Shutdown(ctx context.Context) error
}

PluginManager manages a collection of plugins and provides load balancing, circuit breaking, and health monitoring

type PluginMetricsReport

type PluginMetricsReport struct {
	TotalRequests       int64         `json:"total_requests"`
	SuccessfulRequests  int64         `json:"successful_requests"`
	FailedRequests      int64         `json:"failed_requests"`
	ActiveRequests      int64         `json:"active_requests"`
	SuccessRate         float64       `json:"success_rate_percent"`
	MinLatency          time.Duration `json:"min_latency"`
	MaxLatency          time.Duration `json:"max_latency"`
	AvgLatency          time.Duration `json:"avg_latency"`
	TimeoutErrors       int64         `json:"timeout_errors"`
	ConnectionErrors    int64         `json:"connection_errors"`
	AuthErrors          int64         `json:"auth_errors"`
	OtherErrors         int64         `json:"other_errors"`
	CircuitBreakerTrips int64         `json:"circuit_breaker_trips"`
	HealthCheckTotal    int64         `json:"health_check_total"`
	HealthCheckFailed   int64         `json:"health_check_failed"`
}

PluginMetricsReport contains metrics for a single plugin

type PluginObservabilityMetrics

type PluginObservabilityMetrics struct {
	// Request metrics
	TotalRequests      *atomic.Int64
	SuccessfulRequests *atomic.Int64
	FailedRequests     *atomic.Int64
	ActiveRequests     *atomic.Int64

	// Timing metrics
	TotalLatency *atomic.Int64 // Total latency in nanoseconds
	MinLatency   *atomic.Int64
	MaxLatency   *atomic.Int64
	AvgLatency   *atomic.Int64

	// Error metrics
	TimeoutErrors    *atomic.Int64
	ConnectionErrors *atomic.Int64
	AuthErrors       *atomic.Int64
	OtherErrors      *atomic.Int64

	// Circuit breaker metrics
	CircuitBreakerTrips *atomic.Int64
	CircuitBreakerState string

	// Health metrics
	HealthCheckTotal  *atomic.Int64
	HealthCheckFailed *atomic.Int64
	LastHealthCheck   *atomic.Int64 // Unix timestamp
	HealthStatus      string
}

PluginObservabilityMetrics contains detailed metrics for a single plugin

type PluginReloader

type PluginReloader[Req, Resp any] struct {
	// contains filtered or unexported fields
}

PluginReloader manages the hot-reload process

func NewPluginReloader

func NewPluginReloader[Req, Resp any](manager *Manager[Req, Resp], options ReloadOptions, logger *slog.Logger) *PluginReloader[Req, Resp]

NewPluginReloader creates a new plugin reloader

func (*PluginReloader[Req, Resp]) ReloadWithIntelligentDiff

func (r *PluginReloader[Req, Resp]) ReloadWithIntelligentDiff(ctx context.Context, newConfig ManagerConfig) error

ReloadWithIntelligentDiff performs intelligent hot-reload based on configuration diff

type PluginStatus

type PluginStatus int

PluginStatus represents the current operational status of a plugin instance.

Status levels indicate the plugin's ability to handle requests and its overall health:

  • StatusUnknown: Initial state or status cannot be determined
  • StatusHealthy: Plugin is fully operational and handling requests normally
  • StatusDegraded: Plugin is operational but performance may be impacted
  • StatusUnhealthy: Plugin has issues but may still handle some requests
  • StatusOffline: Plugin is not responding and should not receive requests

These statuses are used by load balancers, circuit breakers, and health monitoring systems to make routing and recovery decisions.

const (
	StatusUnknown PluginStatus = iota
	StatusHealthy
	StatusDegraded
	StatusUnhealthy
	StatusOffline
)

func (PluginStatus) String

func (s PluginStatus) String() string
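
A String method on an iota-based type makes the status readable in logs and in fmt output. The listing does not show which strings the library returns, so the names in this sketch are assumptions, and the type is a local copy rather than the package's PluginStatus.

```go
package main

import "fmt"

// status is a local copy of the iota-based enum shown above.
type status int

const (
	statusUnknown status = iota
	statusHealthy
	statusDegraded
	statusUnhealthy
	statusOffline
)

// statusNames is indexed by status value. The strings are assumed.
var statusNames = [...]string{"unknown", "healthy", "degraded", "unhealthy", "offline"}

// String implements fmt.Stringer and falls back to a numeric form for
// values outside the declared range.
func (s status) String() string {
	if s < 0 || int(s) >= len(statusNames) {
		return fmt.Sprintf("status(%d)", int(s)) // int() avoids recursing into String
	}
	return statusNames[s]
}

func main() {
	fmt.Println(statusHealthy, statusOffline, status(42))
}
```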

type PluginUpdate

type PluginUpdate struct {
	Name      string       `json:"name"`
	OldConfig PluginConfig `json:"old_config"`
	NewConfig PluginConfig `json:"new_config"`
	Changes   []string     `json:"changes"`
}

PluginUpdate represents an update to an existing plugin

type PluginWrapper

type PluginWrapper[Req, Resp any] struct {
	Plugin   Plugin[Req, Resp]
	Weight   int
	Priority int
	Active   *atomic.Int32 // Active connection count
	Enabled  *atomic.Bool
	LastUsed *atomic.Int64 // Unix timestamp
}

PluginWrapper wraps a plugin with load balancing metadata

type PrometheusBucket

type PrometheusBucket struct {
	UpperBound float64 `json:"upper_bound"`
	Count      uint64  `json:"count"`
}

PrometheusBucket represents a histogram bucket

type PrometheusMetric

type PrometheusMetric struct {
	Name        string             `json:"name"`
	Type        string             `json:"type"` // counter, gauge, histogram
	Description string             `json:"description"`
	Value       float64            `json:"value,omitempty"`
	Labels      map[string]string  `json:"labels,omitempty"`
	Buckets     []PrometheusBucket `json:"buckets,omitempty"` // For histograms
}

PrometheusMetric represents a metric in Prometheus format
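
For orientation, the following sketch renders a counter or gauge shaped like this struct into the Prometheus text exposition format. The local `metric` type and the `renderMetric` helper are assumptions for illustration; histograms are left out because a complete histogram also needs sum and count series, and escaping of the HELP text is omitted for brevity.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// metric is a local, simplified copy of the documented fields.
type metric struct {
	Name        string
	Type        string // counter or gauge in this sketch
	Description string
	Value       float64
	Labels      map[string]string
}

// labelEscaper escapes backslash, newline and double quote in label values.
var labelEscaper = strings.NewReplacer(`\`, `\\`, "\n", `\n`, `"`, `\"`)

// renderMetric writes HELP and TYPE lines followed by a single sample.
// Labels are sorted by key so the output is deterministic.
func renderMetric(m metric) string {
	var b strings.Builder
	fmt.Fprintf(&b, "# HELP %s %s\n", m.Name, m.Description)
	fmt.Fprintf(&b, "# TYPE %s %s\n", m.Name, m.Type)
	b.WriteString(m.Name)
	if len(m.Labels) > 0 {
		keys := make([]string, 0, len(m.Labels))
		for k := range m.Labels {
			keys = append(keys, k)
		}
		sort.Strings(keys)
		pairs := make([]string, 0, len(keys))
		for _, k := range keys {
			pairs = append(pairs, fmt.Sprintf(`%s="%s"`, k, labelEscaper.Replace(m.Labels[k])))
		}
		b.WriteString("{" + strings.Join(pairs, ",") + "}")
	}
	b.WriteString(" " + strconv.FormatFloat(m.Value, 'g', -1, 64) + "\n")
	return b.String()
}

func main() {
	fmt.Print(renderMetric(metric{
		Name:        "plugin_requests_total",
		Type:        "counter",
		Description: "Total requests",
		Value:       42,
		Labels:      map[string]string{"status": "ok", "plugin": "auth"},
	}))
}
```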

type RateLimitConfig

type RateLimitConfig struct {
	Enabled           bool          `json:"enabled" yaml:"enabled"`
	RequestsPerSecond float64       `json:"requests_per_second" yaml:"requests_per_second"`
	BurstSize         int           `json:"burst_size" yaml:"burst_size"`
	TimeWindow        time.Duration `json:"time_window" yaml:"time_window"`
}

RateLimitConfig contains rate limiting settings for controlling plugin request rates.

Rate limiting protects plugins from being overwhelmed and ensures fair resource distribution. It implements a token bucket algorithm that allows burst traffic while maintaining average rate limits.

Token bucket algorithm:

  • BurstSize: Maximum tokens available (allows burst of requests)
  • RequestsPerSecond: Rate at which tokens are replenished
  • TimeWindow: Period over which the rate is measured
  • Tokens are consumed for each request and replenished at the configured rate

Example configuration:

rateLimit := RateLimitConfig{
    Enabled:           true,
    RequestsPerSecond: 10.0,           // Allow 10 requests per second
    BurstSize:         20,             // Allow bursts up to 20 requests
    TimeWindow:        time.Second,    // Rate window of 1 second
}
// This allows up to 20 requests immediately, then 10 per second thereafter
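
The token bucket behavior described above can be sketched as follows. This is an illustrative limiter written for this page, not the package's RateLimiter: the `tokenBucket` type, its `allowAt` method, and the injected clock are assumptions chosen so the example is deterministic. TimeWindow is not modeled here.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tokenBucket is an illustrative limiter; names and layout are assumptions.
type tokenBucket struct {
	mu       sync.Mutex
	rate     float64 // tokens replenished per second
	capacity float64 // maximum stored tokens (burst size)
	tokens   float64
	last     time.Time
}

func newTokenBucket(rate float64, burst int, now time.Time) *tokenBucket {
	return &tokenBucket{rate: rate, capacity: float64(burst), tokens: float64(burst), last: now}
}

// allowAt reports whether a request arriving at time now may proceed.
// It first refills the bucket for the elapsed time, then spends one token.
func (b *tokenBucket) allowAt(now time.Time) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.capacity {
			b.tokens = b.capacity
		}
		b.last = now
	}
	if b.tokens >= 1 {
		b.tokens--
		return true
	}
	return false
}

// burstThenRefill sends 25 requests at the same instant, then one more
// request 200ms later, using 10 requests/second with a burst of 20.
func burstThenRefill() (burst int, afterRefill bool) {
	start := time.Unix(0, 0)
	b := newTokenBucket(10, 20, start)
	for i := 0; i < 25; i++ {
		if b.allowAt(start) {
			burst++
		}
	}
	return burst, b.allowAt(start.Add(200 * time.Millisecond))
}

func main() {
	burst, later := burstThenRefill()
	fmt.Println("allowed in initial burst:", burst)
	fmt.Println("allowed after 200ms:", later)
}
```

With these settings the first 20 of the 25 simultaneous requests pass, and the request 200ms later passes because two tokens have been replenished in the meantime.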

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements token bucket rate limiting

func NewRateLimiter

func NewRateLimiter(config RateLimitConfig) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Allow

func (rl *RateLimiter) Allow() bool

Allow checks if a request should be allowed

type ReloadOptions

type ReloadOptions struct {
	Strategy             ReloadStrategy `json:"strategy"`
	DrainTimeout         time.Duration  `json:"drain_timeout"`
	GracefulTimeout      time.Duration  `json:"graceful_timeout"`
	MaxConcurrentReloads int            `json:"max_concurrent_reloads"`
	HealthCheckTimeout   time.Duration  `json:"health_check_timeout"`
	RollbackOnFailure    bool           `json:"rollback_on_failure"`
}

ReloadOptions configures how hot-reload should behave

func DefaultReloadOptions

func DefaultReloadOptions() ReloadOptions

DefaultReloadOptions returns sensible defaults for reload options

type ReloadStrategy

type ReloadStrategy string

ReloadStrategy defines how plugins should be reloaded

const (
	// ReloadStrategyRecreate completely removes and recreates plugins (current behavior)
	ReloadStrategyRecreate ReloadStrategy = "recreate"

	// ReloadStrategyGraceful drains connections gracefully before updating
	ReloadStrategyGraceful ReloadStrategy = "graceful"

	// ReloadStrategyRolling performs rolling updates with zero downtime
	ReloadStrategyRolling ReloadStrategy = "rolling"
)

type RetryConfig

type RetryConfig struct {
	MaxRetries      int           `json:"max_retries" yaml:"max_retries"`
	InitialInterval time.Duration `json:"initial_interval" yaml:"initial_interval"`
	MaxInterval     time.Duration `json:"max_interval" yaml:"max_interval"`
	Multiplier      float64       `json:"multiplier" yaml:"multiplier"`
	RandomJitter    bool          `json:"random_jitter" yaml:"random_jitter"`
}

RetryConfig contains retry and backoff configuration for failed plugin requests.

This configuration implements an exponential backoff strategy with optional random jitter to prevent thundering herd problems when multiple clients retry simultaneously.

The retry logic works as follows:

  1. First retry after InitialInterval
  2. Each subsequent retry multiplies the interval by Multiplier
  3. Interval never exceeds MaxInterval
  4. RandomJitter adds up to ±10% randomness to intervals
  5. Stop retrying after MaxRetries attempts

Example configuration:

retry := RetryConfig{
    MaxRetries:      3,
    InitialInterval: 100 * time.Millisecond,
    MaxInterval:     5 * time.Second,
    Multiplier:      2.0,
    RandomJitter:    true,
}
// This results in delays of ~100ms, ~200ms, ~400ms (with jitter)
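
The retry steps listed above can be sketched as a delay calculation. The `retryConfig` type below is a local copy of the documented fields, and `backoffDelay` and `schedule` are hypothetical helpers. Applying jitter before the final clamp to MaxInterval is an assumption; the package may order these steps differently.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// retryConfig is a local copy of the documented RetryConfig fields.
type retryConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
	RandomJitter    bool
}

// backoffDelay returns the wait before retry number attempt (0-based):
// InitialInterval * Multiplier^attempt, capped at MaxInterval.
func backoffDelay(cfg retryConfig, attempt int) time.Duration {
	limit := float64(cfg.MaxInterval)
	d := float64(cfg.InitialInterval) * math.Pow(cfg.Multiplier, float64(attempt))
	if d > limit {
		d = limit
	}
	if cfg.RandomJitter {
		// Scale by a random factor in [0.9, 1.1), i.e. up to ±10%.
		d *= 0.9 + 0.2*rand.Float64()
		if d > limit {
			d = limit
		}
	}
	return time.Duration(d)
}

// schedule lists the delays for all MaxRetries attempts.
func schedule(cfg retryConfig) []time.Duration {
	delays := make([]time.Duration, 0, cfg.MaxRetries)
	for i := 0; i < cfg.MaxRetries; i++ {
		delays = append(delays, backoffDelay(cfg, i))
	}
	return delays
}

func main() {
	cfg := retryConfig{
		MaxRetries:      3,
		InitialInterval: 100 * time.Millisecond,
		MaxInterval:     5 * time.Second,
		Multiplier:      2.0,
	}
	fmt.Println(schedule(cfg)) // jitter disabled, so delays are exact
}
```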

type Span

type Span interface {
	// Set span attributes
	SetAttribute(key string, value interface{})

	// Set span status
	SetStatus(code SpanStatusCode, message string)

	// Finish the span
	Finish()

	// Get span context for propagation
	Context() interface{}
}

Span represents a tracing span

type SpanStatusCode

type SpanStatusCode int

SpanStatusCode represents span status

const (
	SpanStatusOK SpanStatusCode = iota
	SpanStatusError
	SpanStatusTimeout
)

type TracingProvider

type TracingProvider interface {
	// Start a new span
	StartSpan(ctx context.Context, operationName string) (context.Context, Span)

	// Extract tracing context from headers/metadata
	ExtractContext(headers map[string]string) context.Context

	// Inject tracing context into headers/metadata
	InjectContext(ctx context.Context) map[string]string
}

TracingProvider defines interface for distributed tracing
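
To make the Span and TracingProvider contracts concrete, here is a minimal in-memory sketch. The interfaces are redeclared locally so the example compiles on its own. `memProvider`, `memSpan`, the context key, and the use of an `X-Trace-ID` header for propagation are all assumptions for illustration, and the ID counter is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
)

type SpanStatusCode int

const (
	SpanStatusOK SpanStatusCode = iota
	SpanStatusError
	SpanStatusTimeout
)

// Span and TracingProvider mirror the documented interfaces.
type Span interface {
	SetAttribute(key string, value interface{})
	SetStatus(code SpanStatusCode, message string)
	Finish()
	Context() interface{}
}

type TracingProvider interface {
	StartSpan(ctx context.Context, operationName string) (context.Context, Span)
	ExtractContext(headers map[string]string) context.Context
	InjectContext(ctx context.Context) map[string]string
}

type traceKey struct{} // hypothetical context key holding the trace ID

type memSpan struct {
	name, traceID, message string
	attrs                  map[string]interface{}
	code                   SpanStatusCode
	finished               bool
}

func (s *memSpan) SetAttribute(key string, value interface{}) { s.attrs[key] = value }
func (s *memSpan) SetStatus(code SpanStatusCode, message string) {
	s.code, s.message = code, message
}
func (s *memSpan) Finish()              { s.finished = true }
func (s *memSpan) Context() interface{} { return s.traceID }

type memProvider struct{ nextID int }

var _ TracingProvider = (*memProvider)(nil) // compile-time interface check

// StartSpan reuses the trace ID already in ctx, or allocates a new one.
func (p *memProvider) StartSpan(ctx context.Context, operationName string) (context.Context, Span) {
	traceID, ok := ctx.Value(traceKey{}).(string)
	if !ok {
		p.nextID++
		traceID = fmt.Sprintf("trace-%d", p.nextID)
		ctx = context.WithValue(ctx, traceKey{}, traceID)
	}
	return ctx, &memSpan{name: operationName, traceID: traceID, attrs: map[string]interface{}{}}
}

func (p *memProvider) ExtractContext(headers map[string]string) context.Context {
	ctx := context.Background()
	if id := headers["X-Trace-ID"]; id != "" {
		ctx = context.WithValue(ctx, traceKey{}, id)
	}
	return ctx
}

func (p *memProvider) InjectContext(ctx context.Context) map[string]string {
	headers := map[string]string{}
	if id, ok := ctx.Value(traceKey{}).(string); ok {
		headers["X-Trace-ID"] = id
	}
	return headers
}

func main() {
	p := &memProvider{}
	ctx := p.ExtractContext(map[string]string{"X-Trace-ID": "trace-abc"})
	ctx, span := p.StartSpan(ctx, "plugin.execute")
	span.SetAttribute("plugin", "auth")
	span.SetStatus(SpanStatusOK, "")
	span.Finish()
	fmt.Println(p.InjectContext(ctx))
}
```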

type TransportType

type TransportType string

TransportType represents the different transport protocols supported by the plugin system.

Each transport type defines how plugins communicate with their underlying services:

  • HTTP/HTTPS: REST API communication over standard HTTP(S) protocols
  • gRPC: High-performance RPC communication with optional TLS
  • Unix sockets: Local inter-process communication via Unix domain sockets
  • Executable: Direct execution of external processes

Example usage:

config := PluginConfig{
    Transport: TransportHTTPS,
    Endpoint:  "https://api.example.com/v1/plugin",
}

const (
	TransportHTTP       TransportType = "http"
	TransportHTTPS      TransportType = "https"
	TransportGRPC       TransportType = "grpc"
	TransportGRPCTLS    TransportType = "grpc-tls"
	TransportUnix       TransportType = "unix"
	TransportExecutable TransportType = "exec"
)

type UnixConnectionPool

type UnixConnectionPool struct {
	// contains filtered or unexported fields
}

UnixConnectionPool manages a pool of Unix socket connections

func NewUnixConnectionPool

func NewUnixConnectionPool(socketPath string, maxConnections int) *UnixConnectionPool

NewUnixConnectionPool creates a new Unix socket connection pool

func (*UnixConnectionPool) Close

func (p *UnixConnectionPool) Close() error

Close closes all connections in the pool

func (*UnixConnectionPool) GetConnection

func (p *UnixConnectionPool) GetConnection() (net.Conn, error)

GetConnection gets a connection from the pool or creates a new one

func (*UnixConnectionPool) ReturnConnection

func (p *UnixConnectionPool) ReturnConnection(conn net.Conn)

ReturnConnection returns a connection to the pool
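
The get/return/close lifecycle above can be sketched with a buffered channel of idle connections. This is an illustrative design, not the internals of UnixConnectionPool: `connPool` and its methods are hypothetical, and `net.Pipe` stands in for `net.Dial("unix", socketPath)` so the example runs without a real socket.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// connPool is a hypothetical pool that keeps idle connections in a channel.
type connPool struct {
	idle chan net.Conn
	dial func() (net.Conn, error)
}

func newConnPool(maxIdle int, dial func() (net.Conn, error)) *connPool {
	return &connPool{idle: make(chan net.Conn, maxIdle), dial: dial}
}

// get reuses an idle connection when one is available, otherwise dials.
func (p *connPool) get() (net.Conn, error) {
	select {
	case c := <-p.idle:
		return c, nil
	default:
		return p.dial()
	}
}

// put returns a connection to the pool, closing it if the pool is full.
func (p *connPool) put(c net.Conn) {
	select {
	case p.idle <- c:
	default:
		c.Close()
	}
}

// close closes every idle connection and reports any close errors.
func (p *connPool) close() error {
	var errs []error
	for {
		select {
		case c := <-p.idle:
			errs = append(errs, c.Close())
		default:
			return errors.Join(errs...)
		}
	}
}

// demoReuse performs three get/put cycles and reports how many dials
// were needed.
func demoReuse() (dials int, err error) {
	pool := newConnPool(2, func() (net.Conn, error) {
		dials++
		client, _ := net.Pipe() // in-memory stand-in for a Unix socket dial
		return client, nil
	})
	for i := 0; i < 3; i++ {
		c, err := pool.get()
		if err != nil {
			return dials, err
		}
		pool.put(c)
	}
	return dials, pool.close()
}

func main() {
	dials, err := demoReuse()
	fmt.Println("dials:", dials, "err:", err)
}
```

Because each connection is returned before the next request, a single dial serves all three cycles. A production pool would also need to detect and discard broken connections, which this sketch does not attempt.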

type UnixSocketMessage

type UnixSocketMessage struct {
	Type      string            `json:"type"`
	RequestID string            `json:"request_id"`
	Data      json.RawMessage   `json:"data"`
	Headers   map[string]string `json:"headers,omitempty"`
	Timeout   int64             `json:"timeout_ms,omitempty"`
}

UnixSocketMessage represents a standardized message format for Unix socket communication.

This structure defines the wire protocol for Unix domain socket communication, providing a consistent message format that supports different operation types and includes all necessary metadata for request handling and correlation.

Message types:

  • "execute": Normal plugin execution request
  • "health": Health check request
  • "info": Plugin information request
  • "ping": Connection test/keepalive

Example message for execution:

{
  "type": "execute",
  "request_id": "req-12345",
  "data": {"user_id": "user123", "action": "validate"},
  "headers": {"X-Trace-ID": "trace-abc"},
  "timeout_ms": 5000
}

The JSON format ensures consistency with other transports while maintaining the performance benefits of Unix domain sockets.
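
As an illustration of the envelope described above, the sketch below builds an "execute" message with `encoding/json`. The struct is a local copy of the documented fields, and `newExecuteMessage` is a hypothetical helper. How messages are framed on the socket (for example newline-delimited or length-prefixed) is not specified here, so the sketch stops at producing the JSON bytes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// UnixSocketMessage is a local copy of the documented message envelope.
type UnixSocketMessage struct {
	Type      string            `json:"type"`
	RequestID string            `json:"request_id"`
	Data      json.RawMessage   `json:"data"`
	Headers   map[string]string `json:"headers,omitempty"`
	Timeout   int64             `json:"timeout_ms,omitempty"`
}

// newExecuteMessage encodes payload into the Data field and wraps it in
// an "execute" envelope with the timeout expressed in milliseconds.
func newExecuteMessage(requestID string, payload any, timeout time.Duration) ([]byte, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("encode payload: %w", err)
	}
	return json.Marshal(UnixSocketMessage{
		Type:      "execute",
		RequestID: requestID,
		Data:      data,
		Timeout:   timeout.Milliseconds(),
	})
}

func main() {
	payload := map[string]string{"user_id": "user123", "action": "validate"}
	raw, err := newExecuteMessage("req-12345", payload, 5*time.Second)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(raw))
}
```

Keeping Data as `json.RawMessage` lets the envelope be decoded first and the typed payload decoded later, once the message type is known.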

type UnixSocketPlugin

type UnixSocketPlugin[Req, Resp any] struct {
	// contains filtered or unexported fields
}

UnixSocketPlugin implements Plugin interface using Unix Domain Sockets for high-performance local communication.

This implementation targets low-latency communication for plugins running on the same machine, using Unix domain sockets to avoid TCP/IP network overhead. It's ideal for high-throughput scenarios where plugins and the main application are co-located.

Features:

  • Ultra-low latency local communication via Unix domain sockets
  • Connection pooling for high concurrency scenarios
  • JSON-based message protocol for consistency with other transports
  • Proper connection lifecycle and error handling
  • Request correlation and timeout management
  • Health checking with rapid local validation

Performance characteristics:

  • Lowest possible latency (no network stack)
  • Highest throughput for local communication
  • No network exposure (local socket); access is controlled by filesystem permissions on the socket path
  • No serialization overhead beyond JSON

Example usage:

config := PluginConfig{
    Name:      "local-processor",
    Transport: TransportUnix,
    Endpoint:  "/tmp/processor.sock",  // Unix socket path
    Connection: ConnectionConfig{
        MaxConnections: 20,  // Pool size for concurrent requests
    },
}

plugin, err := NewUnixSocketPlugin[ProcessRequest, ProcessResponse](config, logger)
if err != nil {
    log.Fatal(err)
}
// Release pooled connections when the surrounding function returns
defer plugin.Close()

// High-speed local communication
response, err := plugin.Execute(ctx, execCtx, request)
if err != nil {
    log.Printf("Local processing failed: %v", err)
}

func NewUnixSocketPlugin

func NewUnixSocketPlugin[Req, Resp any](config PluginConfig, logger *slog.Logger) (*UnixSocketPlugin[Req, Resp], error)

NewUnixSocketPlugin creates a new Unix socket plugin instance

func (*UnixSocketPlugin[Req, Resp]) Close

func (u *UnixSocketPlugin[Req, Resp]) Close() error

Close closes the Unix socket connection pool

func (*UnixSocketPlugin[Req, Resp]) Execute

func (u *UnixSocketPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error)

Execute processes a request using Unix socket

func (*UnixSocketPlugin[Req, Resp]) Health

func (u *UnixSocketPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus

Health performs a health check via Unix socket

func (*UnixSocketPlugin[Req, Resp]) Info

func (u *UnixSocketPlugin[Req, Resp]) Info() PluginInfo

Info returns plugin information

type UnixSocketPluginFactory

type UnixSocketPluginFactory[Req, Resp any] struct {
	// contains filtered or unexported fields
}

UnixSocketPluginFactory creates Unix socket plugin instances

func NewUnixSocketPluginFactory

func NewUnixSocketPluginFactory[Req, Resp any](logger *slog.Logger) *UnixSocketPluginFactory[Req, Resp]

NewUnixSocketPluginFactory creates a new Unix socket plugin factory

func (*UnixSocketPluginFactory[Req, Resp]) CreatePlugin

func (f *UnixSocketPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error)

CreatePlugin creates a new Unix socket plugin instance

func (*UnixSocketPluginFactory[Req, Resp]) SupportedTransports

func (f *UnixSocketPluginFactory[Req, Resp]) SupportedTransports() []string

SupportedTransports returns supported transport types

func (*UnixSocketPluginFactory[Req, Resp]) ValidateConfig

func (f *UnixSocketPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error

ValidateConfig validates Unix socket plugin configuration

type UnixSocketResponse

type UnixSocketResponse struct {
	Type      string            `json:"type"`
	RequestID string            `json:"request_id"`
	Success   bool              `json:"success"`
	Data      json.RawMessage   `json:"data,omitempty"`
	Error     string            `json:"error,omitempty"`
	Headers   map[string]string `json:"headers,omitempty"`
}

UnixSocketResponse represents a standardized response format for Unix socket communication.

This structure defines the response protocol for Unix domain socket communication, providing consistent success/error handling and metadata propagation. It enables proper request correlation and comprehensive error reporting.

Response handling:

  • Success=true: Data field contains the successful response
  • Success=false: Error field contains the error message
  • RequestID matches the original request for correlation
  • Headers can include metadata like processing time, trace info

Example successful response:

{
  "type": "execute",
  "request_id": "req-12345",
  "success": true,
  "data": {"token": "abc123", "expires": 3600},
  "headers": {"X-Processing-Time-Ms": "23"}
}

Example error response:

{
  "type": "execute",
  "request_id": "req-12345",
  "success": false,
  "error": "Invalid user credentials",
  "headers": {"X-Error-Code": "AUTH_FAILED"}
}
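
The response handling rules above can be sketched as a small decoder: check correlation, branch on Success, then decode Data into the caller's type. The struct is a local copy of the documented fields. `decodeResponse` and `tokenResponse` are hypothetical names introduced for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UnixSocketResponse is a local copy of the documented response envelope.
type UnixSocketResponse struct {
	Type      string            `json:"type"`
	RequestID string            `json:"request_id"`
	Success   bool              `json:"success"`
	Data      json.RawMessage   `json:"data,omitempty"`
	Error     string            `json:"error,omitempty"`
	Headers   map[string]string `json:"headers,omitempty"`
}

// decodeResponse validates the envelope and decodes Data into Resp.
func decodeResponse[Resp any](raw []byte, wantRequestID string) (Resp, error) {
	var zero Resp
	var resp UnixSocketResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		return zero, fmt.Errorf("decode envelope: %w", err)
	}
	if resp.RequestID != wantRequestID {
		return zero, fmt.Errorf("request ID mismatch: got %q, want %q", resp.RequestID, wantRequestID)
	}
	if !resp.Success {
		return zero, fmt.Errorf("plugin error: %s", resp.Error)
	}
	var out Resp
	if err := json.Unmarshal(resp.Data, &out); err != nil {
		return zero, fmt.Errorf("decode data: %w", err)
	}
	return out, nil
}

// tokenResponse matches the data shape of the successful example above.
type tokenResponse struct {
	Token   string `json:"token"`
	Expires int    `json:"expires"`
}

func main() {
	ok := []byte(`{"type":"execute","request_id":"req-12345","success":true,"data":{"token":"abc123","expires":3600}}`)
	out, err := decodeResponse[tokenResponse](ok, "req-12345")
	fmt.Println(out, err)

	failed := []byte(`{"type":"execute","request_id":"req-12345","success":false,"error":"Invalid user credentials"}`)
	_, err = decodeResponse[tokenResponse](failed, "req-12345")
	fmt.Println(err)
}
```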
