Documentation
¶
Overview ¶
Package goplugins provides a production-ready, type-safe plugin architecture for Go applications. It supports multiple transport protocols (HTTP, gRPC, Unix sockets) with built-in circuit breaking, health monitoring, authentication, and graceful degradation.
Key Features:
- Type-safe plugin interfaces using Go generics
- Multiple transport protocols (HTTP, gRPC, Unix sockets)
- Circuit breaker pattern for resilience
- Health monitoring and automatic recovery
- Authentication and authorization
- Hot-reloading of plugin configurations
- Comprehensive metrics and structured logging
- Graceful shutdown with proper cleanup
Basic Usage:
// Define your plugin request/response types
type KeyRequest struct {
KeyID string `json:"key_id"`
}
type KeyResponse struct {
Key []byte `json:"key"`
Error string `json:"error,omitempty"`
}
// Create a plugin manager
manager := goplugins.NewManager[KeyRequest, KeyResponse]()
// Load plugins from configuration
err := manager.LoadFromConfig("plugins.yaml")
if err != nil {
log.Fatal(err)
}
// Execute plugin operations
resp, err := manager.Execute(ctx, "vault-provider", KeyRequest{KeyID: "master"})
Security: The library implements multiple security layers including mTLS for transport security, API key authentication, request validation, rate limiting, and comprehensive audit logging.
Performance: Built-in connection pooling, intelligent caching, circuit breakers, and optimized serialization ensure high performance even under heavy load.
Copyright (c) 2025 AGILira - A. Giordano SPDX-License-Identifier: MPL-2.0
Index ¶
- Constants
- func NewAuthConfigValidationError(cause error) *errors.Error
- func NewCircuitBreakerOpenError(pluginName string) *errors.Error
- func NewCircuitBreakerTimeoutError(pluginName string, timeout interface{}) *errors.Error
- func NewDuplicatePluginNameError(name string) *errors.Error
- func NewExecTransportError(cause error) *errors.Error
- func NewGRPCTransportError(cause error) *errors.Error
- func NewHTTPTransportError(cause error) *errors.Error
- func NewHealthCheckFailedError(pluginName string, cause error) *errors.Error
- func NewHealthCheckTimeoutError(pluginName string, timeout interface{}) *errors.Error
- func NewInvalidEndpointFormatError() *errors.Error
- func NewInvalidEndpointURLError(endpoint string, cause error) *errors.Error
- func NewInvalidJSONConfigError(cause error) *errors.Error
- func NewInvalidPluginNameError(name string) *errors.Error
- func NewInvalidTransportError() *errors.Error
- func NewLoadBalancerFailedError(cause error) *errors.Error
- func NewMissingAPIKeyError() *errors.Error
- func NewMissingBasicCredentialsError() *errors.Error
- func NewMissingBearerTokenError() *errors.Error
- func NewMissingEndpointError(transport TransportType) *errors.Error
- func NewMissingExecutableError() *errors.Error
- func NewMissingMTLSCertsError() *errors.Error
- func NewMissingSocketPathError() *errors.Error
- func NewNoAvailablePluginsError(pluginType string) *errors.Error
- func NewNoPluginsConfiguredError() *errors.Error
- func NewPluginConnectionFailedError(name string, cause error) *errors.Error
- func NewPluginExecutionFailedError(name string, cause error) *errors.Error
- func NewPluginNotEnabledError(name string) *errors.Error
- func NewPluginNotFoundError(name string) *errors.Error
- func NewPluginTimeoutError(name string, timeout interface{}) *errors.Error
- func NewPluginValidationError(pluginIndex int, cause error) *errors.Error
- func NewRateLimitExceededError(pluginName string, limit interface{}) *errors.Error
- func NewUnixTransportError(cause error) *errors.Error
- func NewUnsupportedAuthMethodError(method AuthMethod) *errors.Error
- func NewUnsupportedTransportError(transport TransportType) *errors.Error
- type AuthConfig
- type AuthMethod
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type CircuitBreakerStats
- type CommonPluginMetrics
- func (cpm *CommonPluginMetrics) DecrementActiveRequests(pluginName string)
- func (cpm *CommonPluginMetrics) IncrementActiveRequests(pluginName string)
- func (cpm *CommonPluginMetrics) RecordRequest(pluginName string, duration time.Duration, err error)
- func (cpm *CommonPluginMetrics) SetCircuitBreakerState(pluginName string, state int)
- type ConnectionConfig
- type CounterMetric
- type DefaultEnhancedMetricsCollector
- func (demc *DefaultEnhancedMetricsCollector) CounterWithLabels(name, description string, labelNames ...string) CounterMetric
- func (demc *DefaultEnhancedMetricsCollector) GaugeWithLabels(name, description string, labelNames ...string) GaugeMetric
- func (demc *DefaultEnhancedMetricsCollector) GetPrometheusMetrics() []PrometheusMetric
- func (demc *DefaultEnhancedMetricsCollector) HistogramWithLabels(name, description string, buckets []float64, labelNames ...string) HistogramMetric
- type DefaultMetricsCollector
- func (dmc *DefaultMetricsCollector) GetMetrics() map[string]interface{}
- func (dmc *DefaultMetricsCollector) IncrementCounter(name string, labels map[string]string, value int64)
- func (dmc *DefaultMetricsCollector) RecordCustomMetric(name string, labels map[string]string, value interface{})
- func (dmc *DefaultMetricsCollector) RecordHistogram(name string, labels map[string]string, value float64)
- func (dmc *DefaultMetricsCollector) SetGauge(name string, labels map[string]string, value float64)
- type DiscoveryConfig
- type EnhancedMetricsCollector
- type ExecutionContext
- type GRPCPlugin
- type GRPCPluginFactory
- type GRPCPluginService
- type GRPCPluginServiceClient
- type GaugeMetric
- type GlobalMetrics
- type HTTPPlugin
- type HTTPPluginFactory
- type HTTPPluginRequest
- type HTTPPluginResponse
- type HealthCheckConfig
- type HealthChecker
- func (hc *HealthChecker) Check() HealthStatus
- func (hc *HealthChecker) Done() <-chan struct{}
- func (hc *HealthChecker) GetConsecutiveFailures() int64
- func (hc *HealthChecker) GetLastCheck() time.Time
- func (hc *HealthChecker) IsRunning() bool
- func (hc *HealthChecker) Start()
- func (hc *HealthChecker) Stop()
- type HealthMonitor
- func (hm *HealthMonitor) AddChecker(name string, checker *HealthChecker)
- func (hm *HealthMonitor) GetAllStatus() map[string]HealthStatus
- func (hm *HealthMonitor) GetOverallHealth() HealthStatus
- func (hm *HealthMonitor) GetStatus(name string) (HealthStatus, bool)
- func (hm *HealthMonitor) RemoveChecker(name string)
- func (hm *HealthMonitor) Shutdown()
- func (hm *HealthMonitor) UpdateStatus(name string, status HealthStatus)
- type HealthStatus
- type HistogramMetric
- type LoadBalanceRequest
- type LoadBalancer
- func (lb *LoadBalancer[Req, Resp]) AddPlugin(name string, plugin Plugin[Req, Resp], weight, priority int) error
- func (lb *LoadBalancer[Req, Resp]) DisablePlugin(name string) error
- func (lb *LoadBalancer[Req, Resp]) EnablePlugin(name string) error
- func (lb *LoadBalancer[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, lbReq LoadBalanceRequest, ...) (Resp, error)
- func (lb *LoadBalancer[Req, Resp]) GetStats() map[string]LoadBalancerStats
- func (lb *LoadBalancer[Req, Resp]) RemovePlugin(name string) error
- func (lb *LoadBalancer[Req, Resp]) SelectPlugin(lbReq LoadBalanceRequest) (string, Plugin[Req, Resp], error)
- type LoadBalancerStats
- type LoadBalancingStrategy
- type Manager
- func (m *Manager[Req, Resp]) Execute(ctx context.Context, pluginName string, request Req) (Resp, error)
- func (m *Manager[Req, Resp]) ExecuteWithOptions(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error)
- func (m *Manager[Req, Resp]) GetMetrics() ManagerMetrics
- func (m *Manager[Req, Resp]) GetPlugin(name string) (Plugin[Req, Resp], error)
- func (m *Manager[Req, Resp]) Health() map[string]HealthStatus
- func (m *Manager[Req, Resp]) ListPlugins() map[string]HealthStatus
- func (m *Manager[Req, Resp]) LoadFromConfig(config ManagerConfig) error
- func (m *Manager[Req, Resp]) Register(plugin Plugin[Req, Resp]) error
- func (m *Manager[Req, Resp]) RegisterFactory(pluginType string, factory PluginFactory[Req, Resp]) error
- func (m *Manager[Req, Resp]) ReloadConfig(config ManagerConfig) error
- func (m *Manager[Req, Resp]) ReloadConfigWithStrategy(config ManagerConfig, strategy ReloadStrategy) error
- func (m *Manager[Req, Resp]) Shutdown(ctx context.Context) error
- func (m *Manager[Req, Resp]) Unregister(name string) error
- type ManagerConfig
- type ManagerMetrics
- type MetricsCollector
- type ObservabilityConfig
- type ObservabilityReport
- type ObservableManager
- type Plugin
- type PluginConfig
- type PluginDiff
- type PluginFactory
- type PluginInfo
- type PluginLoadMetrics
- type PluginManager
- type PluginMetricsReport
- type PluginObservabilityMetrics
- type PluginReloader
- type PluginStatus
- type PluginUpdate
- type PluginWrapper
- type PrometheusBucket
- type PrometheusMetric
- type RateLimitConfig
- type RateLimiter
- type ReloadOptions
- type ReloadStrategy
- type RetryConfig
- type Span
- type SpanStatusCode
- type TracingProvider
- type TransportType
- type UnixConnectionPool
- type UnixSocketMessage
- type UnixSocketPlugin
- func (u *UnixSocketPlugin[Req, Resp]) Close() error
- func (u *UnixSocketPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error)
- func (u *UnixSocketPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus
- func (u *UnixSocketPlugin[Req, Resp]) Info() PluginInfo
- type UnixSocketPluginFactory
- type UnixSocketResponse
Constants ¶
const ( // Configuration errors (1000-1099) ErrCodeInvalidPluginName = "PLUGIN_1001" ErrCodeInvalidTransport = "PLUGIN_1002" ErrCodeMissingEndpoint = "PLUGIN_1003" ErrCodeInvalidEndpointURL = "PLUGIN_1004" ErrCodeInvalidEndpointFormat = "PLUGIN_1005" ErrCodeMissingSocketPath = "PLUGIN_1006" ErrCodeMissingExecutable = "PLUGIN_1007" ErrCodeUnsupportedTransport = "PLUGIN_1008" ErrCodeNoPluginsConfigured = "PLUGIN_1009" ErrCodeDuplicatePluginName = "PLUGIN_1010" ErrCodeInvalidJSONConfig = "PLUGIN_1011" // Authentication errors (1100-1199) ErrCodeMissingAPIKey = "AUTH_1101" ErrCodeMissingBearerToken = "AUTH_1102" ErrCodeMissingBasicCredentials = "AUTH_1103" ErrCodeMissingMTLSCerts = "AUTH_1104" ErrCodeUnsupportedAuthMethod = "AUTH_1105" // Plugin execution errors (1200-1299) ErrCodePluginNotFound = "PLUGIN_1201" ErrCodePluginNotEnabled = "PLUGIN_1202" ErrCodePluginExecutionFailed = "PLUGIN_1203" ErrCodePluginTimeout = "PLUGIN_1204" ErrCodePluginConnectionFailed = "PLUGIN_1205" // Transport errors (1300-1399) ErrCodeHTTPTransportError = "TRANSPORT_1301" ErrCodeGRPCTransportError = "TRANSPORT_1302" ErrCodeUnixTransportError = "TRANSPORT_1303" ErrCodeExecTransportError = "TRANSPORT_1304" // Circuit breaker errors (1400-1499) ErrCodeCircuitBreakerOpen = "CIRCUIT_1401" ErrCodeCircuitBreakerTimeout = "CIRCUIT_1402" // Rate limiting errors (1500-1599) ErrCodeRateLimitExceeded = "RATELIMIT_1501" // Health check errors (1600-1699) ErrCodeHealthCheckFailed = "HEALTH_1601" ErrCodeHealthCheckTimeout = "HEALTH_1602" // Load balancer errors (1700-1799) ErrCodeNoAvailablePlugins = "LOADBALANCER_1701" ErrCodeLoadBalancerFailed = "LOADBALANCER_1702" )
Error codes for the go-plugins system
Variables ¶
This section is empty.
Functions ¶
func NewExecTransportError ¶
func NewGRPCTransportError ¶
func NewHTTPTransportError ¶
func NewMissingAPIKeyError ¶
func NewMissingEndpointError ¶
func NewMissingEndpointError(transport TransportType) *errors.Error
func NewPluginNotFoundError ¶
func NewPluginTimeoutError ¶
func NewUnixTransportError ¶
func NewUnsupportedAuthMethodError ¶
func NewUnsupportedAuthMethodError(method AuthMethod) *errors.Error
func NewUnsupportedTransportError ¶
func NewUnsupportedTransportError(transport TransportType) *errors.Error
Types ¶
type AuthConfig ¶
type AuthConfig struct {
Method AuthMethod `json:"method" yaml:"method"`
APIKey string `json:"api_key,omitempty" yaml:"api_key,omitempty"`
Token string `json:"token,omitempty" yaml:"token,omitempty"`
Username string `json:"username,omitempty" yaml:"username,omitempty"`
Password string `json:"password,omitempty" yaml:"password,omitempty"`
CertFile string `json:"cert_file,omitempty" yaml:"cert_file,omitempty"`
KeyFile string `json:"key_file,omitempty" yaml:"key_file,omitempty"`
CAFile string `json:"ca_file,omitempty" yaml:"ca_file,omitempty"`
Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
}
AuthConfig contains authentication configuration for plugin connections.
This structure supports multiple authentication methods and provides flexible configuration options for securing plugin communications. The specific fields used depend on the chosen authentication method.
Field usage by auth method:
- AuthAPIKey: Uses APIKey field
- AuthBearer: Uses Token field
- AuthBasic: Uses Username and Password fields
- AuthMTLS: Uses CertFile, KeyFile, and optionally CAFile
- AuthCustom: Uses Headers field for custom authentication headers
Example configurations:
// API Key authentication
auth := AuthConfig{
Method: AuthAPIKey,
APIKey: "your-api-key-here",
}
// mTLS authentication
auth := AuthConfig{
Method: AuthMTLS,
CertFile: "/path/to/client.crt",
KeyFile: "/path/to/client.key",
CAFile: "/path/to/ca.crt",
}
func (*AuthConfig) Validate ¶
func (ac *AuthConfig) Validate() error
Validate validates the authentication configuration
type AuthMethod ¶
type AuthMethod string
AuthMethod represents different authentication methods supported by the plugin system.
Available authentication methods:
- AuthNone: No authentication required
- AuthAPIKey: API key-based authentication via X-API-Key header
- AuthBearer: Bearer token authentication via Authorization header
- AuthBasic: HTTP Basic authentication with username/password
- AuthMTLS: Mutual TLS authentication using client certificates
- AuthCustom: Custom authentication method with user-defined headers
Example usage:
auth := AuthConfig{
Method: AuthBearer,
Token: "your-jwt-token-here",
}
const ( AuthNone AuthMethod = "none" AuthAPIKey AuthMethod = "api-key" AuthBearer AuthMethod = "bearer" AuthBasic AuthMethod = "basic" AuthMTLS AuthMethod = "mtls" AuthCustom AuthMethod = "custom" )
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern for enhanced plugin resilience.
This implementation provides automatic failure detection and recovery mechanisms to prevent cascading failures in distributed systems. It uses atomic operations for thread safety and maintains detailed statistics for monitoring.
Key features:
- Thread-safe operation using atomic counters
- Configurable failure thresholds and recovery timeouts
- Automatic state transitions based on success/failure patterns
- Detailed statistics tracking for observability
- Graceful recovery testing through half-open state
Usage example:
config := CircuitBreakerConfig{
Enabled: true,
FailureThreshold: 5,
RecoveryTimeout: 30 * time.Second,
MinRequestThreshold: 3,
SuccessThreshold: 2,
}
cb := NewCircuitBreaker(config)
// Before making a request
if !cb.AllowRequest() {
return nil, errors.New("circuit breaker open")
}
// After request completion
if err != nil {
cb.RecordFailure()
} else {
cb.RecordSuccess()
}
func NewCircuitBreaker ¶
func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker instance with the specified configuration.
The circuit breaker starts in the StateClosed state, allowing all requests to pass through. State transitions will occur automatically based on the configured thresholds and the success/failure patterns of monitored operations.
Parameters:
- config: Configuration defining thresholds, timeouts, and behavior parameters
Returns a thread-safe CircuitBreaker ready for use across multiple goroutines.
func (*CircuitBreaker) AllowRequest ¶
func (cb *CircuitBreaker) AllowRequest() bool
AllowRequest determines whether a request should be allowed through the circuit breaker.
This method implements the core logic of the circuit breaker pattern by checking the current state and applying the appropriate rules:
- StateClosed: Always allows requests (normal operation)
- StateOpen: Blocks all requests until recovery timeout expires
- StateHalfOpen: Allows limited requests to test service recovery
The method is thread-safe and may trigger state transitions when called. It should be called before attempting any operation that you want to protect with the circuit breaker.
Returns:
- true: Request should proceed
- false: Request should be rejected (fail fast)
func (*CircuitBreaker) GetState ¶
func (cb *CircuitBreaker) GetState() CircuitBreakerState
GetState returns the current state of the circuit breaker.
This method provides a thread-safe way to inspect the circuit breaker's current operational state without affecting its behavior. It's useful for monitoring, debugging, and making operational decisions.
Returns one of:
- StateClosed: Normal operation
- StateOpen: Circuit is tripped, blocking requests
- StateHalfOpen: Testing recovery
func (*CircuitBreaker) GetStats ¶
func (cb *CircuitBreaker) GetStats() CircuitBreakerStats
GetStats returns comprehensive statistics about the circuit breaker's operation.
This method provides detailed metrics for monitoring and observability purposes. The statistics include current state, operation counts, and timing information that can be used for alerting, dashboards, and performance analysis.
The returned statistics are consistent with the circuit breaker's internal state at the time of the call, providing a reliable snapshot for monitoring.
Returns CircuitBreakerStats containing:
- Current state and failure/success counts
- Request count and last failure timestamp
- All data needed for operational visibility
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation and may trigger the circuit breaker to open.
This method should be called after every failed operation that was allowed through the circuit breaker. It updates failure counters and may cause the circuit breaker to transition to StateOpen if the failure threshold is exceeded.
The method is thread-safe and performs atomic updates to maintain consistency across concurrent operations. It also updates the last failure timestamp for recovery timeout calculations.
State transition logic:
- May open circuit if FailureThreshold is exceeded
- In StateHalfOpen: Any failure immediately reopens the circuit
- Updates failure timestamp for recovery timeout tracking
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation and may trigger state transitions.
This method should be called after every successful operation that was allowed through the circuit breaker. It updates internal counters and may cause the circuit breaker to transition from StateHalfOpen to StateClosed if enough consecutive successes have been recorded.
The method is thread-safe and performs atomic updates to maintain consistency across concurrent operations.
State transition logic:
- In StateHalfOpen: May close circuit if SuccessThreshold is met
- In other states: Updates statistics for monitoring
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset()
Reset forcibly resets the circuit breaker to the closed state and clears all counters.
This method provides an administrative way to manually reset the circuit breaker, typically used for operational recovery or testing purposes. It immediately transitions the circuit breaker to StateClosed regardless of current state and clears all failure/success counters.
Use cases:
- Manual recovery after fixing underlying issues
- Administrative reset during maintenance
- Testing and development scenarios
- Integration with external monitoring systems
Note: Use with caution in production as it bypasses the normal recovery logic.
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
FailureThreshold int `json:"failure_threshold" yaml:"failure_threshold"`
RecoveryTimeout time.Duration `json:"recovery_timeout" yaml:"recovery_timeout"`
MinRequestThreshold int `json:"min_request_threshold" yaml:"min_request_threshold"`
SuccessThreshold int `json:"success_threshold" yaml:"success_threshold"`
}
CircuitBreakerConfig contains circuit breaker settings for plugin resilience.
The circuit breaker implements the Circuit Breaker pattern to prevent cascading failures by temporarily stopping requests to failing services. It has three states:
- Closed: Normal operation, requests flow through
- Open: Circuit is tripped, requests fail fast
- Half-Open: Testing if service has recovered
State transitions:
- Closed → Open: When FailureThreshold consecutive failures occur
- Open → Half-Open: After RecoveryTimeout has elapsed
- Half-Open → Closed: When SuccessThreshold successes occur
- Half-Open → Open: When any failure occurs
Example configuration:
cb := CircuitBreakerConfig{
Enabled: true,
FailureThreshold: 5, // Trip after 5 failures
RecoveryTimeout: 30 * time.Second,
MinRequestThreshold: 3, // Need 3 requests before considering trip
SuccessThreshold: 2, // Need 2 successes to close circuit
}
type CircuitBreakerState ¶
type CircuitBreakerState int32
CircuitBreakerState represents the current operational state of a circuit breaker.
The circuit breaker pattern prevents cascading failures by monitoring the failure rate of operations and temporarily blocking requests when failures exceed a threshold. This helps systems fail fast and recover gracefully.
State behaviors:
- StateClosed: Normal operation, all requests are allowed through
- StateOpen: Circuit is tripped, requests fail immediately without execution
- StateHalfOpen: Testing phase, limited requests allowed to test recovery
State transitions occur automatically based on failure/success counts and timeouts.
const ( StateClosed CircuitBreakerState = iota StateOpen StateHalfOpen )
func (CircuitBreakerState) String ¶
func (s CircuitBreakerState) String() string
type CircuitBreakerStats ¶
type CircuitBreakerStats struct {
State CircuitBreakerState `json:"state"`
FailureCount int64 `json:"failure_count"`
SuccessCount int64 `json:"success_count"`
RequestCount int64 `json:"request_count"`
LastFailure time.Time `json:"last_failure"`
}
CircuitBreakerStats contains comprehensive statistics about circuit breaker operation.
This structure provides all the metrics needed for monitoring, alerting, and debugging circuit breaker behavior. It includes both current state information and historical operation counts.
Fields:
- State: Current operational state (Closed/Open/HalfOpen)
- FailureCount: Total failures recorded since last reset
- SuccessCount: Total successes recorded since last reset
- RequestCount: Total requests processed since last reset
- LastFailure: Timestamp of the most recent failure
These statistics can be exposed via metrics systems, logged for analysis, or used by monitoring systems to track system health and performance.
type CommonPluginMetrics ¶
type CommonPluginMetrics struct {
RequestCount CounterMetric
RequestDuration HistogramMetric
ActiveRequests GaugeMetric
ErrorCount CounterMetric
CircuitBreakerState GaugeMetric
}
CommonPluginMetrics provides a set of commonly used metrics for plugin systems
func CreateCommonPluginMetrics ¶
func CreateCommonPluginMetrics(collector EnhancedMetricsCollector) *CommonPluginMetrics
CreateCommonPluginMetrics creates commonly used plugin metrics with the enhanced collector
func (*CommonPluginMetrics) DecrementActiveRequests ¶
func (cpm *CommonPluginMetrics) DecrementActiveRequests(pluginName string)
DecrementActiveRequests decrements the active request count
func (*CommonPluginMetrics) IncrementActiveRequests ¶
func (cpm *CommonPluginMetrics) IncrementActiveRequests(pluginName string)
IncrementActiveRequests increments the active request count
func (*CommonPluginMetrics) RecordRequest ¶
func (cpm *CommonPluginMetrics) RecordRequest(pluginName string, duration time.Duration, err error)
RecordRequest records a plugin request with its outcome
func (*CommonPluginMetrics) SetCircuitBreakerState ¶
func (cpm *CommonPluginMetrics) SetCircuitBreakerState(pluginName string, state int)
SetCircuitBreakerState sets the circuit breaker state for a plugin
type ConnectionConfig ¶
type ConnectionConfig struct {
MaxConnections int `json:"max_connections" yaml:"max_connections"`
MaxIdleConnections int `json:"max_idle_connections" yaml:"max_idle_connections"`
IdleTimeout time.Duration `json:"idle_timeout" yaml:"idle_timeout"`
ConnectionTimeout time.Duration `json:"connection_timeout" yaml:"connection_timeout"`
RequestTimeout time.Duration `json:"request_timeout" yaml:"request_timeout"`
KeepAlive bool `json:"keep_alive" yaml:"keep_alive"`
DisableCompression bool `json:"disable_compression" yaml:"disable_compression"`
}
ConnectionConfig contains connection pooling and timeout settings for plugin transports.
This configuration optimizes network resource usage and performance by managing connection lifecycles and timeouts. Proper configuration prevents resource leaks and improves response times.
Configuration guidelines:
- MaxConnections: Total connection pool size (consider server limits)
- MaxIdleConnections: Connections to keep alive when idle (balance memory vs latency)
- IdleTimeout: How long to keep idle connections (balance resource usage vs reconnect cost)
- ConnectionTimeout: Maximum time to establish new connections
- RequestTimeout: Maximum time for individual requests
- KeepAlive: Enable TCP keep-alive for long-lived connections
Example configuration:
conn := ConnectionConfig{
MaxConnections: 10, // Max 10 concurrent connections
MaxIdleConnections: 5, // Keep 5 connections idle
IdleTimeout: 30 * time.Second, // Close idle connections after 30s
ConnectionTimeout: 10 * time.Second, // 10s to establish connection
RequestTimeout: 30 * time.Second, // 30s per request
KeepAlive: true, // Enable TCP keep-alive
DisableCompression: false, // Enable compression
}
type CounterMetric ¶
type CounterMetric interface {
Inc(labelValues ...string)
Add(value float64, labelValues ...string)
}
CounterMetric represents a counter with native label support
type DefaultEnhancedMetricsCollector ¶
type DefaultEnhancedMetricsCollector struct {
*DefaultMetricsCollector // Embed for backward compatibility
// contains filtered or unexported fields
}
DefaultEnhancedMetricsCollector provides an enhanced metrics collector with native label support
func NewDefaultEnhancedMetricsCollector ¶
func NewDefaultEnhancedMetricsCollector() *DefaultEnhancedMetricsCollector
NewDefaultEnhancedMetricsCollector creates a new enhanced metrics collector
func (*DefaultEnhancedMetricsCollector) CounterWithLabels ¶
func (demc *DefaultEnhancedMetricsCollector) CounterWithLabels(name, description string, labelNames ...string) CounterMetric
CounterWithLabels implements EnhancedMetricsCollector
func (*DefaultEnhancedMetricsCollector) GaugeWithLabels ¶
func (demc *DefaultEnhancedMetricsCollector) GaugeWithLabels(name, description string, labelNames ...string) GaugeMetric
GaugeWithLabels implements EnhancedMetricsCollector
func (*DefaultEnhancedMetricsCollector) GetPrometheusMetrics ¶
func (demc *DefaultEnhancedMetricsCollector) GetPrometheusMetrics() []PrometheusMetric
GetPrometheusMetrics implements EnhancedMetricsCollector
func (*DefaultEnhancedMetricsCollector) HistogramWithLabels ¶
func (demc *DefaultEnhancedMetricsCollector) HistogramWithLabels(name, description string, buckets []float64, labelNames ...string) HistogramMetric
HistogramWithLabels implements EnhancedMetricsCollector
type DefaultMetricsCollector ¶
type DefaultMetricsCollector struct {
// contains filtered or unexported fields
}
DefaultMetricsCollector provides a basic in-memory metrics collector
func NewDefaultMetricsCollector ¶
func NewDefaultMetricsCollector() *DefaultMetricsCollector
NewDefaultMetricsCollector creates a new default metrics collector
func (*DefaultMetricsCollector) GetMetrics ¶
func (dmc *DefaultMetricsCollector) GetMetrics() map[string]interface{}
GetMetrics implements MetricsCollector
func (*DefaultMetricsCollector) IncrementCounter ¶
func (dmc *DefaultMetricsCollector) IncrementCounter(name string, labels map[string]string, value int64)
IncrementCounter implements MetricsCollector
func (*DefaultMetricsCollector) RecordCustomMetric ¶
func (dmc *DefaultMetricsCollector) RecordCustomMetric(name string, labels map[string]string, value interface{})
RecordCustomMetric implements MetricsCollector
func (*DefaultMetricsCollector) RecordHistogram ¶
func (dmc *DefaultMetricsCollector) RecordHistogram(name string, labels map[string]string, value float64)
RecordHistogram implements MetricsCollector
type DiscoveryConfig ¶
type DiscoveryConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Directories []string `json:"directories,omitempty" yaml:"directories,omitempty"`
Patterns []string `json:"patterns,omitempty" yaml:"patterns,omitempty"`
WatchMode bool `json:"watch_mode" yaml:"watch_mode"`
}
DiscoveryConfig contains plugin auto-discovery settings for dynamic plugin loading.
The discovery system automatically finds and loads plugins from specified directories, enabling dynamic plugin registration without manual configuration. This is particularly useful for plugin ecosystems where plugins are deployed independently.
Discovery behavior:
- Directories: List of paths to scan for plugins
- Patterns: File name patterns to match (e.g., "*.so", "plugin-*")
- WatchMode: Continuously monitor directories for changes
Security considerations:
- Only scan trusted directories to prevent malicious plugin loading
- Use specific patterns to avoid loading unintended files
- Validate discovered plugins before loading
Example configuration:
discovery := DiscoveryConfig{
Enabled: true,
Directories: []string{
"/opt/plugins",
"/usr/local/lib/plugins",
},
Patterns: []string{
"*.so", // Shared libraries
"plugin-*", // Plugin executables
},
WatchMode: true, // Hot-reload new plugins
}
type EnhancedMetricsCollector ¶
type EnhancedMetricsCollector interface {
MetricsCollector // Embed the original interface for backward compatibility
// Enhanced metrics with native label support
CounterWithLabels(name, description string, labelNames ...string) CounterMetric
GaugeWithLabels(name, description string, labelNames ...string) GaugeMetric
HistogramWithLabels(name, description string, buckets []float64, labelNames ...string) HistogramMetric
// Get metrics in Prometheus-compatible format
GetPrometheusMetrics() []PrometheusMetric
}
EnhancedMetricsCollector extends MetricsCollector with native label support This interface is designed to be compatible with Prometheus and other modern metrics systems
func MigrateToEnhancedMetrics ¶
func MigrateToEnhancedMetrics(legacy MetricsCollector) EnhancedMetricsCollector
MigrateToEnhancedMetrics helps migrate from legacy metrics to enhanced metrics
func NewEnhancedMetricsCollector ¶
func NewEnhancedMetricsCollector() EnhancedMetricsCollector
NewEnhancedMetricsCollector creates a new enhanced metrics collector with native label support
type ExecutionContext ¶
type ExecutionContext struct {
RequestID string `json:"request_id"`
Timeout time.Duration `json:"timeout"`
MaxRetries int `json:"max_retries"`
Headers map[string]string `json:"headers,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
ExecutionContext provides execution context and configuration to plugins
type GRPCPlugin ¶
type GRPCPlugin[Req, Resp any] struct { // contains filtered or unexported fields }
GRPCPlugin implements Plugin interface using gRPC transport with comprehensive TLS and security support.
This implementation provides high-performance RPC communication using gRPC with full support for TLS, mutual authentication, metadata handling, and proper connection management. It's designed for low-latency, high-throughput scenarios where performance is critical.
Features:
- gRPC and gRPC-TLS transports with configurable security
- Mutual TLS (mTLS) authentication with client certificates
- Connection reuse and proper lifecycle management
- Metadata propagation for distributed tracing
- Comprehensive error handling with gRPC status codes
- Health checking with standard gRPC health protocol
- Automatic connection recovery and reconnection
Example usage:
config := PluginConfig{
Name: "payment-processor",
Transport: TransportGRPCTLS,
Endpoint: "payment.company.com:443",
Auth: AuthConfig{
Method: AuthMTLS,
CertFile: "/etc/ssl/client.crt",
KeyFile: "/etc/ssl/client.key",
CAFile: "/etc/ssl/ca.crt",
},
}
plugin, err := NewGRPCPlugin[PaymentRequest, PaymentResponse](config, logger)
if err != nil {
log.Fatal(err)
}
// Use the plugin
response, err := plugin.Execute(ctx, execCtx, request)
if err != nil {
log.Printf("Payment processing failed: %v", err)
}
// Don't forget to close
defer plugin.Close()
func NewGRPCPlugin ¶
func NewGRPCPlugin[Req, Resp any](config PluginConfig, logger *slog.Logger) (*GRPCPlugin[Req, Resp], error)
NewGRPCPlugin creates a new gRPC plugin instance
func (*GRPCPlugin[Req, Resp]) Close ¶
func (g *GRPCPlugin[Req, Resp]) Close() error
Close closes the gRPC connection
func (*GRPCPlugin[Req, Resp]) Execute ¶
func (g *GRPCPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error)
Execute processes a request using gRPC
func (*GRPCPlugin[Req, Resp]) Health ¶
func (g *GRPCPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus
Health performs a health check via gRPC
func (*GRPCPlugin[Req, Resp]) Info ¶
func (g *GRPCPlugin[Req, Resp]) Info() PluginInfo
Info returns plugin information
type GRPCPluginFactory ¶
type GRPCPluginFactory[Req, Resp any] struct { // contains filtered or unexported fields }
GRPCPluginFactory creates gRPC plugin instances with full TLS and security configuration.
This factory manages the creation of GRPCPlugin instances, handling complex gRPC connection setup, TLS configuration, and security validation. It ensures proper connection establishment and validates all security configurations.
Security features:
- TLS 1.2+ enforcement with secure cipher suites
- Mutual TLS authentication with certificate validation
- CA certificate validation for server authentication
- Proper credential management and secure defaults
Example usage:
factory := NewGRPCPluginFactory[OrderRequest, OrderResponse](logger)
config := PluginConfig{
Name: "order-service",
Transport: TransportGRPCTLS,
Endpoint: "orders.company.com:9443",
Auth: AuthConfig{
Method: AuthMTLS,
CertFile: "/etc/ssl/certs/client.crt",
KeyFile: "/etc/ssl/private/client.key",
CAFile: "/etc/ssl/certs/ca.crt",
},
}
plugin, err := factory.CreatePlugin(config)
if err != nil {
log.Fatalf("Failed to create gRPC plugin: %v", err)
}
func NewGRPCPluginFactory ¶
func NewGRPCPluginFactory[Req, Resp any](logger *slog.Logger) *GRPCPluginFactory[Req, Resp]
NewGRPCPluginFactory creates a new gRPC plugin factory
func (*GRPCPluginFactory[Req, Resp]) CreatePlugin ¶
func (f *GRPCPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error)
CreatePlugin creates a new gRPC plugin instance
func (*GRPCPluginFactory[Req, Resp]) SupportedTransports ¶
func (f *GRPCPluginFactory[Req, Resp]) SupportedTransports() []string
SupportedTransports returns supported transport types
func (*GRPCPluginFactory[Req, Resp]) ValidateConfig ¶
func (f *GRPCPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error
ValidateConfig validates gRPC plugin configuration
type GRPCPluginService ¶
type GRPCPluginService interface {
// Execute processes a request and returns a response
Execute(ctx context.Context, request []byte) ([]byte, error)
// Health performs a health check
Health(ctx context.Context) error
// Info returns plugin information
Info(ctx context.Context) ([]byte, error)
}
GRPCPluginService defines the standard gRPC service interface that plugins must implement.
This interface establishes a contract for gRPC-based plugins, providing standardized methods for execution, health checking, and metadata retrieval. All gRPC plugins should implement this interface to ensure compatibility with the plugin system.
Method specifications:
- Execute: Process requests with serialized JSON data
- Health: Perform health checks (should follow gRPC health checking protocol)
- Info: Return plugin metadata and capabilities
Example protobuf service definition:
service PluginService {
rpc Execute(ExecuteRequest) returns (ExecuteResponse);
rpc Health(HealthRequest) returns (HealthResponse);
rpc Info(InfoRequest) returns (InfoResponse);
}
The actual gRPC implementation should serialize/deserialize the byte arrays as JSON for consistency with other transport methods.
type GRPCPluginServiceClient ¶
type GRPCPluginServiceClient struct {
// contains filtered or unexported fields
}
GRPCPluginServiceClient wraps the gRPC client for plugin services
func NewGRPCPluginServiceClient ¶
func NewGRPCPluginServiceClient(conn *grpc.ClientConn) *GRPCPluginServiceClient
NewGRPCPluginServiceClient creates a new gRPC plugin service client
type GaugeMetric ¶
type GaugeMetric interface {
Set(value float64, labelValues ...string)
Inc(labelValues ...string)
Dec(labelValues ...string)
Add(value float64, labelValues ...string)
}
GaugeMetric represents a gauge with native label support
type GlobalMetrics ¶
type GlobalMetrics struct {
TotalRequests int64 `json:"total_requests"`
TotalErrors int64 `json:"total_errors"`
ActiveRequests int64 `json:"active_requests"`
}
GlobalMetrics contains system-wide metrics
type HTTPPlugin ¶
type HTTPPlugin[Req, Resp any] struct { // contains filtered or unexported fields }
HTTPPlugin represents a plugin that communicates over HTTP/HTTPS protocols.
This implementation provides a full-featured HTTP client for plugin communication, supporting authentication, rate limiting, connection pooling, and comprehensive error handling. It's designed for production environments with proper security and operational concerns.
Features:
- HTTP/HTTPS with configurable TLS settings including mTLS
- Multiple authentication methods (API key, Bearer token, Basic auth)
- Connection pooling and keep-alive for performance
- Rate limiting with token bucket algorithm
- Structured request/response handling with JSON serialization
- Health checking with configurable endpoints
- Proper timeout handling and graceful connection cleanup
Example usage:
config := PluginConfig{
Name: "api-service",
Transport: TransportHTTPS,
Endpoint: "https://api.example.com/v1/plugin",
Auth: AuthConfig{
Method: AuthBearer,
Token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
},
Connection: ConnectionConfig{
MaxConnections: 10,
RequestTimeout: 30 * time.Second,
ConnectionTimeout: 10 * time.Second,
},
RateLimit: RateLimitConfig{
Enabled: true,
RequestsPerSecond: 100.0,
BurstSize: 200,
},
}
factory := NewHTTPPluginFactory[MyRequest, MyResponse]()
plugin, err := factory.CreatePlugin(config)
if err != nil {
log.Fatal(err)
}
// Use the plugin
response, err := plugin.Execute(ctx, execCtx, request)
func (*HTTPPlugin[Req, Resp]) Close ¶
func (p *HTTPPlugin[Req, Resp]) Close() error
Close implements Plugin.Close
func (*HTTPPlugin[Req, Resp]) Execute ¶
func (p *HTTPPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error)
Execute implements Plugin.Execute
func (*HTTPPlugin[Req, Resp]) Health ¶
func (p *HTTPPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus
Health implements Plugin.Health
func (*HTTPPlugin[Req, Resp]) Info ¶
func (p *HTTPPlugin[Req, Resp]) Info() PluginInfo
Info implements Plugin.Info
type HTTPPluginFactory ¶
type HTTPPluginFactory[Req, Resp any] struct{}
HTTPPluginFactory creates HTTP-based plugins with comprehensive configuration support.
This factory handles the creation of HTTPPlugin instances, managing HTTP client configuration, TLS setup, authentication, and connection pooling. It validates configurations to ensure security and proper operation.
Supported features:
- HTTP and HTTPS transports with proper TLS configuration
- Multiple authentication methods with security validation
- Connection pooling and timeout configuration
- Rate limiting setup and validation
- Comprehensive configuration validation
Example usage:
factory := NewHTTPPluginFactory[AuthRequest, AuthResponse]()
config := PluginConfig{
Transport: TransportHTTPS,
Endpoint: "https://auth.company.com/api/v1",
Auth: AuthConfig{
Method: AuthMTLS,
CertFile: "/etc/ssl/client.crt",
KeyFile: "/etc/ssl/client.key",
CAFile: "/etc/ssl/ca.crt",
},
}
plugin, err := factory.CreatePlugin(config)
if err != nil {
log.Fatalf("Failed to create plugin: %v", err)
}
func NewHTTPPluginFactory ¶
func NewHTTPPluginFactory[Req, Resp any]() *HTTPPluginFactory[Req, Resp]
NewHTTPPluginFactory creates a new HTTP plugin factory
func (*HTTPPluginFactory[Req, Resp]) CreatePlugin ¶
func (f *HTTPPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error)
CreatePlugin implements PluginFactory.CreatePlugin for HTTP transport
func (*HTTPPluginFactory[Req, Resp]) SupportedTransports ¶
func (f *HTTPPluginFactory[Req, Resp]) SupportedTransports() []string
SupportedTransports implements PluginFactory.SupportedTransports
func (*HTTPPluginFactory[Req, Resp]) ValidateConfig ¶
func (f *HTTPPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error
ValidateConfig implements PluginFactory.ValidateConfig
type HTTPPluginRequest ¶
type HTTPPluginRequest[T any] struct { Data T `json:"data"` RequestID string `json:"request_id"` Timeout string `json:"timeout"` Headers map[string]string `json:"headers,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` }
HTTPPluginRequest represents the standardized request format for HTTP plugins.
This structure defines the wire format for requests sent to HTTP-based plugins. It includes the actual request data along with metadata needed for proper request handling, tracing, and timeout management.
The format is designed to be plugin-agnostic while providing all necessary context for request processing. Plugins can expect this structure when receiving requests via HTTP transport.
Example JSON payload:
{
"data": {
"user_id": "12345",
"action": "authenticate"
},
"request_id": "req-abc-123",
"timeout": "30s",
"headers": {
"X-Source-Service": "auth-gateway"
},
"metadata": {
"trace_id": "trace-xyz-789"
}
}
type HTTPPluginResponse ¶
type HTTPPluginResponse[T any] struct { Data T `json:"data,omitempty"` Error string `json:"error,omitempty"` RequestID string `json:"request_id"` Metadata map[string]string `json:"metadata,omitempty"` }
HTTPPluginResponse represents the standardized response format for HTTP plugins.
This structure defines the wire format for responses received from HTTP-based plugins. It provides a consistent interface for handling both successful responses and errors, along with metadata for tracing and debugging.
The response format allows plugins to return either successful data or error information, making error handling consistent across all HTTP-based plugins. The request ID enables correlation with the original request for tracing.
Example successful response:
{
"data": {
"token": "jwt-token-here",
"expires_in": 3600
},
"request_id": "req-abc-123",
"metadata": {
"processing_time_ms": "150"
}
}
Example error response:
{
"error": "Invalid credentials provided",
"request_id": "req-abc-123",
"metadata": {
"error_code": "AUTH_FAILED"
}
}
type HealthCheckConfig ¶
type HealthCheckConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Interval time.Duration `json:"interval" yaml:"interval"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
FailureLimit int `json:"failure_limit" yaml:"failure_limit"`
Endpoint string `json:"endpoint,omitempty" yaml:"endpoint,omitempty"`
}
HealthCheckConfig contains health check settings for monitoring plugin availability.
Health checks are performed periodically to detect failed or degraded plugins and automatically route traffic away from unhealthy instances. The health checker maintains plugin status and provides early warning of issues.
Configuration behavior:
- Interval: How often to perform health checks
- Timeout: Maximum time to wait for health check response
- FailureLimit: Number of consecutive failures before marking plugin unhealthy
- Endpoint: Custom endpoint for health checks (optional, defaults to plugin endpoint)
Example configuration:
health := HealthCheckConfig{
Enabled: true,
Interval: 30 * time.Second, // Check every 30 seconds
Timeout: 5 * time.Second, // Fail if no response in 5 seconds
FailureLimit: 3, // Mark unhealthy after 3 failures
Endpoint: "/health", // Custom health endpoint
}
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker monitors the health of a plugin through periodic health checks.
This component provides continuous monitoring of plugin availability and performance by executing health checks at regular intervals. It tracks consecutive failures and provides detailed health status information for operational visibility and automated recovery decisions.
Key features:
- Periodic health checking with configurable intervals
- Consecutive failure tracking with automatic degradation detection
- Thread-safe operation with atomic counters
- Graceful start/stop lifecycle management
- Detailed health status reporting with response times
Usage example:
config := HealthCheckConfig{
Enabled: true,
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
FailureLimit: 3,
}
checker := NewHealthChecker(plugin, config)
// Check current health status
status := checker.Check()
if status.Status != StatusHealthy {
log.Printf("Plugin unhealthy: %s", status.Message)
}
// Stop monitoring when done
checker.Stop()
func NewHealthChecker ¶
func NewHealthChecker(plugin interface {
Health(ctx context.Context) HealthStatus
Close() error
}, config HealthCheckConfig) *HealthChecker
NewHealthChecker creates a new health checker for monitoring a plugin's availability.
The health checker will automatically start monitoring if health checking is enabled in the configuration. It runs in a separate goroutine and performs periodic health checks according to the specified interval.
Parameters:
- plugin: Must implement Health() and Close() methods
- config: Health check configuration including interval, timeout, and failure limits
The health checker maintains its own lifecycle and can be started/stopped independently of the plugin it monitors.
func (*HealthChecker) Check ¶
func (hc *HealthChecker) Check() HealthStatus
Check performs a single synchronous health check and returns the current status.
This method executes an immediate health check against the monitored plugin, respecting the configured timeout. It updates internal failure counters and determines the overall health status based on consecutive failure patterns.
Health status determination:
- StatusHealthy: Plugin responds successfully within timeout
- StatusOffline: Consecutive failures exceed the configured limit
- Other statuses: Based on plugin's own health assessment
The method is thread-safe and can be called independently of the periodic health checking goroutine. Response time and failure counts are tracked for operational visibility.
Returns HealthStatus with detailed information including status, message, response time, and timestamp.
func (*HealthChecker) Done ¶
func (hc *HealthChecker) Done() <-chan struct{}
Done returns a channel that will be closed when the health checker stops
func (*HealthChecker) GetConsecutiveFailures ¶
func (hc *HealthChecker) GetConsecutiveFailures() int64
GetConsecutiveFailures returns the number of consecutive failures
func (*HealthChecker) GetLastCheck ¶
func (hc *HealthChecker) GetLastCheck() time.Time
GetLastCheck returns the timestamp of the last health check
func (*HealthChecker) IsRunning ¶
func (hc *HealthChecker) IsRunning() bool
IsRunning returns true if the health checker is currently running
func (*HealthChecker) Start ¶
func (hc *HealthChecker) Start()
Start initiates the health checker's periodic monitoring goroutine.
This method starts continuous health monitoring according to the configured interval. If health checking is disabled in the configuration, this method has no effect. The method is idempotent - calling it multiple times is safe.
The health checker runs in its own goroutine and performs checks at regular intervals until stopped. Each check updates the plugin's health status and maintains failure counters for degradation detection.
Use Stop() to halt the monitoring when no longer needed.
func (*HealthChecker) Stop ¶
func (hc *HealthChecker) Stop()
Stop halts the health checker's periodic monitoring and waits for cleanup.
This method gracefully shuts down the health checking goroutine and waits for it to complete any in-flight health check before returning. The method is idempotent - calling it multiple times is safe.
After stopping, the health checker can be restarted with Start() if needed. Any ongoing health check will complete before the goroutine exits.
type HealthMonitor ¶
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor provides centralized health monitoring for multiple components.
This component manages multiple HealthChecker instances and provides a unified view of system health across all monitored plugins. It's designed for scenarios where you need to monitor multiple plugins and make system-wide decisions based on overall health status.
Features:
- Centralized management of multiple health checkers
- Aggregated health status calculation
- Thread-safe operations with proper synchronization
- Individual and overall health status reporting
- Graceful shutdown of all managed checkers
Usage example:
monitor := NewHealthMonitor()
// Add checkers for different plugins
monitor.AddChecker("auth-service", authHealthChecker)
monitor.AddChecker("payment-service", paymentHealthChecker)
// Get overall system health
overallHealth := monitor.GetOverallHealth()
if overallHealth.Status != StatusHealthy {
log.Printf("System degraded: %s", overallHealth.Message)
}
// Cleanup when done
monitor.Shutdown()
func NewHealthMonitor ¶
func NewHealthMonitor() *HealthMonitor
NewHealthMonitor creates a new centralized health monitor instance.
The health monitor starts empty and health checkers can be added dynamically using AddChecker(). It provides thread-safe operations for managing multiple health checkers and computing overall system health.
Returns a ready-to-use HealthMonitor that can manage multiple plugin health checkers and provide unified health status reporting.
func (*HealthMonitor) AddChecker ¶
func (hm *HealthMonitor) AddChecker(name string, checker *HealthChecker)
AddChecker adds a health checker for a named component
func (*HealthMonitor) GetAllStatus ¶
func (hm *HealthMonitor) GetAllStatus() map[string]HealthStatus
GetAllStatus returns the health status of all monitored components
func (*HealthMonitor) GetOverallHealth ¶
func (hm *HealthMonitor) GetOverallHealth() HealthStatus
GetOverallHealth computes and returns the aggregated health status of all monitored components.
This method analyzes the health status of all registered components and determines the overall system health using the following logic:
- StatusHealthy: All components are healthy
- StatusDegraded: Some components are degraded but none are offline/unhealthy
- StatusUnhealthy: One or more components are offline or unhealthy
The returned status includes a summary message describing any issues found across the monitored components. This is useful for system-level health checks and determining if the entire service should be considered available.
Returns HealthStatus representing the worst-case scenario across all monitored components, with an appropriate message describing the overall state.
func (*HealthMonitor) GetStatus ¶
func (hm *HealthMonitor) GetStatus(name string) (HealthStatus, bool)
GetStatus returns the health status of a specific component
func (*HealthMonitor) RemoveChecker ¶
func (hm *HealthMonitor) RemoveChecker(name string)
RemoveChecker removes a health checker
func (*HealthMonitor) Shutdown ¶
func (hm *HealthMonitor) Shutdown()
Shutdown stops all health checkers
func (*HealthMonitor) UpdateStatus ¶
func (hm *HealthMonitor) UpdateStatus(name string, status HealthStatus)
UpdateStatus updates the health status for a component
type HealthStatus ¶
type HealthStatus struct {
Status PluginStatus `json:"status"`
Message string `json:"message,omitempty"`
LastCheck time.Time `json:"last_check"`
ResponseTime time.Duration `json:"response_time"`
Metadata map[string]string `json:"metadata,omitempty"`
}
HealthStatus contains comprehensive health information about a plugin instance.
This structure provides detailed health assessment data used by monitoring systems, load balancers, and circuit breakers to make intelligent routing and recovery decisions. It includes both current status and historical timing information for trend analysis.
Fields:
- Status: Current operational status (healthy, degraded, unhealthy, offline)
- Message: Human-readable description of the current status
- LastCheck: Timestamp of when this status was determined
- ResponseTime: How long the health check took to complete
- Metadata: Additional context-specific information (error codes, version info, etc.)
Example usage:
health := plugin.Health(ctx)
if health.Status != StatusHealthy {
log.Printf("Plugin %s unhealthy: %s (response time: %v)",
pluginName, health.Message, health.ResponseTime)
}
type HistogramMetric ¶
HistogramMetric represents a histogram with native label support
type LoadBalanceRequest ¶
type LoadBalanceRequest struct {
RequestID string
Key string // For consistent hashing
Priority int // Request priority
Preferences []string // Preferred plugin names
}
LoadBalanceRequest contains request information for load balancing
type LoadBalancer ¶
type LoadBalancer[Req, Resp any] struct { // contains filtered or unexported fields }
LoadBalancer manages multiple plugins of the same type and distributes load using configurable strategies.
This component provides intelligent request distribution across multiple plugin instances, implementing various load balancing algorithms and maintaining detailed metrics for operational visibility and decision-making.
Key features:
- Multiple load balancing strategies (round-robin, least connections, etc.)
- Per-plugin metrics tracking (latency, success rate, health score)
- Dynamic plugin enable/disable without service interruption
- Weighted and priority-based routing
- Thread-safe operation with minimal contention
- Comprehensive statistics for monitoring and debugging
Usage example:
lb := NewLoadBalancer[MyRequest, MyResponse](StrategyLeastLatency, logger)
// Add plugin instances with different weights and priorities
lb.AddPlugin("primary", plugin1, 10, 100) // High weight, high priority
lb.AddPlugin("secondary", plugin2, 5, 50) // Lower weight, lower priority
// Execute request with automatic plugin selection
request := LoadBalanceRequest{
RequestID: "req-123",
Key: "user-456", // For consistent hashing
}
response, err := lb.Execute(ctx, execCtx, request, myRequest)
// Monitor performance
stats := lb.GetStats()
for name, stat := range stats {
fmt.Printf("Plugin %s: Success rate %.2f%%, Avg latency %v\n",
name, stat.GetSuccessRate(), stat.AverageLatency)
}
func NewLoadBalancer ¶
func NewLoadBalancer[Req, Resp any](strategy LoadBalancingStrategy, logger *slog.Logger) *LoadBalancer[Req, Resp]
NewLoadBalancer creates a new load balancer instance with the specified strategy.
The load balancer starts empty and plugins must be added using AddPlugin(). It immediately begins tracking metrics and applying the selected load balancing strategy to distribute requests among registered plugins.
Parameters:
- strategy: Load balancing algorithm to use for plugin selection
- logger: Logger instance for operational logging (uses default if nil)
Returns a fully initialized LoadBalancer ready to accept plugin registrations and handle request routing.
func (*LoadBalancer[Req, Resp]) AddPlugin ¶
func (lb *LoadBalancer[Req, Resp]) AddPlugin(name string, plugin Plugin[Req, Resp], weight, priority int) error
AddPlugin adds a plugin to the load balancer
func (*LoadBalancer[Req, Resp]) DisablePlugin ¶
func (lb *LoadBalancer[Req, Resp]) DisablePlugin(name string) error
DisablePlugin disables a plugin from load balancing
func (*LoadBalancer[Req, Resp]) EnablePlugin ¶
func (lb *LoadBalancer[Req, Resp]) EnablePlugin(name string) error
EnablePlugin enables a plugin for load balancing
func (*LoadBalancer[Req, Resp]) Execute ¶
func (lb *LoadBalancer[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, lbReq LoadBalanceRequest, request Req) (Resp, error)
Execute processes a request using intelligent load balancing and comprehensive metrics tracking.
This method implements the complete request lifecycle including plugin selection, request execution, metrics collection, and health score updates. It provides automatic failover and detailed performance tracking for operational visibility.
Request lifecycle:
- Select optimal plugin using configured load balancing strategy
- Track active connections and update metrics
- Execute request through selected plugin
- Record latency, success/failure, and update health scores
- Return response or error with full traceability
Parameters:
- ctx: Context for cancellation and timeout control
- execCtx: Execution context with request metadata and configuration
- lbReq: Load balancing request with selection hints (key, preferences)
- request: The actual request payload to be processed
Returns the response from the selected plugin or an error if all plugins are unavailable or the request fails.
Metrics collected:
- Request count, success/failure rates
- Response latency and connection counts
- Plugin health scores and availability
func (*LoadBalancer[Req, Resp]) GetStats ¶
func (lb *LoadBalancer[Req, Resp]) GetStats() map[string]LoadBalancerStats
GetStats returns current load balancing statistics
func (*LoadBalancer[Req, Resp]) RemovePlugin ¶
func (lb *LoadBalancer[Req, Resp]) RemovePlugin(name string) error
RemovePlugin removes a plugin from the load balancer
func (*LoadBalancer[Req, Resp]) SelectPlugin ¶
func (lb *LoadBalancer[Req, Resp]) SelectPlugin(lbReq LoadBalanceRequest) (string, Plugin[Req, Resp], error)
SelectPlugin selects the best plugin based on the load balancing strategy
type LoadBalancerStats ¶
type LoadBalancerStats struct {
PluginName string `json:"plugin_name"`
Weight int `json:"weight"`
Priority int `json:"priority"`
Enabled bool `json:"enabled"`
ActiveConnections int32 `json:"active_connections"`
TotalRequests int64 `json:"total_requests"`
SuccessfulRequests int64 `json:"successful_requests"`
FailedRequests int64 `json:"failed_requests"`
AverageLatency time.Duration `json:"average_latency"`
HealthScore int32 `json:"health_score"`
LastUsed time.Time `json:"last_used"`
}
LoadBalancerStats contains comprehensive statistics for a plugin in the load balancer.
This structure provides detailed metrics about plugin performance and usage patterns within the load balancer. The statistics are essential for monitoring, alerting, capacity planning, and debugging load balancing decisions.
Statistical categories:
- Configuration: Weight, priority, and enabled status
- Performance: Request counts, success rates, and latency metrics
- Health: Health score and availability indicators
- Operational: Active connections and last usage timestamp
Example usage:
stats := loadBalancer.GetStats()
for pluginName, stat := range stats {
successRate := stat.GetSuccessRate()
if successRate < 95.0 {
log.Printf("Plugin %s has low success rate: %.2f%%",
pluginName, successRate)
}
if stat.AverageLatency > 5*time.Second {
log.Printf("Plugin %s has high latency: %v",
pluginName, stat.AverageLatency)
}
}
func (LoadBalancerStats) GetSuccessRate ¶
func (stats LoadBalancerStats) GetSuccessRate() float64
GetSuccessRate calculates the success rate as a percentage (0-100).
This method computes the success rate based on successful requests divided by total requests. It returns 0.0 if no requests have been processed. The success rate is a key indicator of plugin health and reliability.
Returns:
- 0.0: No requests processed yet
- 0.0-100.0: Success percentage based on successful vs total requests
Usage for monitoring:
if stats.GetSuccessRate() < 99.0 {
alert("Plugin success rate below threshold")
}
type LoadBalancingStrategy ¶
type LoadBalancingStrategy string
LoadBalancingStrategy defines different load balancing algorithms for distributing requests.
Each strategy implements a different approach to selecting which plugin instance should handle a request, optimizing for different goals like fairness, performance, locality, or consistency.
Strategy descriptions:
- StrategyRoundRobin: Distributes requests evenly in circular order
- StrategyRandom: Selects plugins randomly for uniform distribution
- StrategyLeastConnections: Routes to plugin with fewest active connections
- StrategyLeastLatency: Routes to plugin with lowest average response time
- StrategyWeightedRandom: Random selection based on configured weights
- StrategyConsistentHash: Routes based on request key for session affinity
- StrategyPriority: Routes to highest priority healthy plugin
Example usage:
lb := NewLoadBalancer(StrategyLeastLatency, logger)
lb.AddPlugin("fast-service", plugin1, 1, 10) // weight=1, priority=10
lb.AddPlugin("backup-service", plugin2, 1, 5) // weight=1, priority=5
request := LoadBalanceRequest{Key: "user-123"}
name, plugin, err := lb.SelectPlugin(request)
const ( // StrategyRoundRobin distributes requests in round-robin fashion StrategyRoundRobin LoadBalancingStrategy = "round-robin" // StrategyRandom selects plugins randomly StrategyRandom LoadBalancingStrategy = "random" // StrategyLeastConnections selects plugin with least active connections StrategyLeastConnections LoadBalancingStrategy = "least-connections" // StrategyLeastLatency selects plugin with lowest average latency StrategyLeastLatency LoadBalancingStrategy = "least-latency" // StrategyWeightedRandom selects plugins based on weights with random distribution StrategyWeightedRandom LoadBalancingStrategy = "weighted-random" // StrategyConsistentHash uses consistent hashing for request routing StrategyConsistentHash LoadBalancingStrategy = "consistent-hash" // StrategyPriority selects highest priority healthy plugin StrategyPriority LoadBalancingStrategy = "priority" )
type Manager ¶
type Manager[Req, Resp any] struct { // contains filtered or unexported fields }
Manager implements PluginManager with comprehensive production-ready features.
This is the central component that orchestrates the entire plugin system, providing plugin lifecycle management, load balancing, circuit breaking, health monitoring, and observability. It's designed to handle enterprise-scale workloads with high availability and reliability requirements.
Core capabilities:
- Plugin registration and lifecycle management
- Automatic failover with circuit breaker patterns
- Health monitoring with automatic recovery
- Load balancing across multiple plugin instances
- Hot-reload configuration updates without service interruption
- Comprehensive metrics and structured logging
- Graceful shutdown with proper resource cleanup
Example usage:
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
manager := NewManager[AuthRequest, AuthResponse](logger)
// Register plugin factories
httpFactory := NewHTTPPluginFactory[AuthRequest, AuthResponse]()
manager.RegisterFactory("http", httpFactory)
// Load configuration
config := ManagerConfig{
Plugins: []PluginConfig{
{
Name: "auth-service",
Type: "http",
Transport: TransportHTTPS,
Endpoint: "https://auth.company.com/api/v1",
},
},
}
if err := manager.LoadFromConfig(config); err != nil {
log.Fatal(err)
}
// Execute requests
response, err := manager.Execute(ctx, "auth-service", request)
if err != nil {
log.Printf("Request failed: %v", err)
}
// Graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
manager.Shutdown(ctx)
func NewManager ¶
NewManager creates a new production-ready plugin manager with comprehensive monitoring and resilience features.
The manager is initialized with sensible defaults and ready to accept plugin registrations. It includes built-in circuit breakers, health checkers, and metrics collection for operational visibility and automated recovery.
Parameters:
- logger: Structured logger for operational logging (uses default if nil)
The manager starts with empty plugin registries and must be configured with plugin factories and loaded with plugin configurations before use.
Returns a fully initialized Manager ready for plugin registration and configuration.
func (*Manager[Req, Resp]) Execute ¶
func (m *Manager[Req, Resp]) Execute(ctx context.Context, pluginName string, request Req) (Resp, error)
Execute implements PluginManager.Execute
func (*Manager[Req, Resp]) ExecuteWithOptions ¶
func (m *Manager[Req, Resp]) ExecuteWithOptions(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error)
ExecuteWithOptions implements PluginManager.ExecuteWithOptions
func (*Manager[Req, Resp]) GetMetrics ¶
func (m *Manager[Req, Resp]) GetMetrics() ManagerMetrics
GetMetrics returns current operational metrics
func (*Manager[Req, Resp]) Health ¶
func (m *Manager[Req, Resp]) Health() map[string]HealthStatus
Health implements PluginManager.Health
func (*Manager[Req, Resp]) ListPlugins ¶
func (m *Manager[Req, Resp]) ListPlugins() map[string]HealthStatus
ListPlugins implements PluginManager.ListPlugins
func (*Manager[Req, Resp]) LoadFromConfig ¶
func (m *Manager[Req, Resp]) LoadFromConfig(config ManagerConfig) error
LoadFromConfig implements PluginManager.LoadFromConfig
func (*Manager[Req, Resp]) RegisterFactory ¶
func (m *Manager[Req, Resp]) RegisterFactory(pluginType string, factory PluginFactory[Req, Resp]) error
RegisterFactory registers a plugin factory for a specific plugin type
func (*Manager[Req, Resp]) ReloadConfig ¶
func (m *Manager[Req, Resp]) ReloadConfig(config ManagerConfig) error
ReloadConfig implements PluginManager.ReloadConfig
func (*Manager[Req, Resp]) ReloadConfigWithStrategy ¶
func (m *Manager[Req, Resp]) ReloadConfigWithStrategy(config ManagerConfig, strategy ReloadStrategy) error
ReloadConfigWithStrategy reloads configuration with specified strategy
func (*Manager[Req, Resp]) Unregister ¶
Unregister implements PluginManager.Unregister
type ManagerConfig ¶
type ManagerConfig struct {
// Global settings
LogLevel string `json:"log_level" yaml:"log_level"`
MetricsPort int `json:"metrics_port" yaml:"metrics_port"`
// Default configurations that apply to all plugins unless overridden
DefaultRetry RetryConfig `json:"default_retry" yaml:"default_retry"`
DefaultCircuitBreaker CircuitBreakerConfig `json:"default_circuit_breaker" yaml:"default_circuit_breaker"`
DefaultHealthCheck HealthCheckConfig `json:"default_health_check" yaml:"default_health_check"`
DefaultConnection ConnectionConfig `json:"default_connection" yaml:"default_connection"`
DefaultRateLimit RateLimitConfig `json:"default_rate_limit" yaml:"default_rate_limit"`
// Plugin configurations
Plugins []PluginConfig `json:"plugins" yaml:"plugins"`
// Plugin discovery settings
Discovery DiscoveryConfig `json:"discovery,omitempty" yaml:"discovery,omitempty"`
}
ManagerConfig represents the comprehensive configuration for the plugin manager.
This is the top-level configuration that controls the entire plugin system, including global settings, default policies, and the collection of plugin configurations. It provides a centralized way to manage plugin behavior and operational parameters.
Configuration structure:
- Global: System-wide settings like logging and metrics
- Defaults: Default configurations applied to all plugins
- Plugins: Individual plugin configurations
- Discovery: Automatic plugin discovery settings
The manager applies default configurations to plugins that don't specify their own values, allowing for consistent behavior across the system while still permitting per-plugin customization.
Example configuration:
config := ManagerConfig{
LogLevel: "info",
MetricsPort: 9090,
DefaultRetry: RetryConfig{
MaxRetries: 3,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 5 * time.Second,
Multiplier: 2.0,
RandomJitter: true,
},
DefaultHealthCheck: HealthCheckConfig{
Enabled: true,
Interval: 30 * time.Second,
Timeout: 5 * time.Second,
FailureLimit: 3,
},
Plugins: []PluginConfig{
{
Name: "auth-service",
Transport: TransportHTTPS,
Endpoint: "https://auth.example.com/api/v1",
// Inherits default retry and health check settings
},
},
}
func GetDefaultManagerConfig ¶
func GetDefaultManagerConfig() ManagerConfig
GetDefaultManagerConfig returns a ManagerConfig with sensible production-ready defaults.
This function provides a baseline configuration suitable for most production environments. The defaults are chosen to balance reliability, performance, and resource usage. Users can customize specific settings as needed.
Default configuration includes:
- Info-level logging with metrics on port 9090
- Conservative retry policy with exponential backoff
- Circuit breaker enabled with reasonable thresholds
- Health checking enabled with 30-second intervals
- Connection pooling with moderate limits
- Rate limiting disabled by default
Usage:
config := GetDefaultManagerConfig()
config.LogLevel = "debug" // Customize as needed
config.Plugins = []PluginConfig{
// Add your plugins here
}
func (*ManagerConfig) ApplyDefaults ¶
func (mc *ManagerConfig) ApplyDefaults()
ApplyDefaults applies default configurations to plugins that don't specify them
func (*ManagerConfig) FromJSON ¶
func (mc *ManagerConfig) FromJSON(data []byte) error
FromJSON loads configuration from JSON
func (*ManagerConfig) ToJSON ¶
func (mc *ManagerConfig) ToJSON() ([]byte, error)
func (*ManagerConfig) Validate ¶
func (mc *ManagerConfig) Validate() error
Validate validates the manager configuration
type ManagerMetrics ¶
type ManagerMetrics struct {
RequestsTotal atomic.Int64
RequestsSuccess atomic.Int64
RequestsFailure atomic.Int64
RequestDuration atomic.Int64 // nanoseconds
CircuitBreakerTrips atomic.Int64
HealthCheckFailures atomic.Int64
}
ManagerMetrics tracks operational metrics
type MetricsCollector ¶
type MetricsCollector interface {
// Counter metrics
IncrementCounter(name string, labels map[string]string, value int64)
// Gauge metrics
SetGauge(name string, labels map[string]string, value float64)
// Histogram metrics
RecordHistogram(name string, labels map[string]string, value float64)
// Custom metrics
RecordCustomMetric(name string, labels map[string]string, value interface{})
// Get current metrics snapshot
GetMetrics() map[string]interface{}
}
MetricsCollector defines interface for collecting plugin metrics
type ObservabilityConfig ¶
type ObservabilityConfig struct {
// Metrics
MetricsEnabled bool `json:"metrics_enabled"`
MetricsCollector MetricsCollector `json:"-"`
EnhancedMetricsCollector EnhancedMetricsCollector `json:"-"` // Optional: for native label support
MetricsPrefix string `json:"metrics_prefix"`
// Tracing
TracingEnabled bool `json:"tracing_enabled"`
TracingProvider TracingProvider `json:"-"`
TracingSampleRate float64 `json:"tracing_sample_rate"`
// Logging
LoggingEnabled bool `json:"logging_enabled"`
LogLevel string `json:"log_level"`
StructuredLogging bool `json:"structured_logging"`
// Health monitoring
HealthMetrics bool `json:"health_metrics"`
PerformanceMetrics bool `json:"performance_metrics"`
ErrorMetrics bool `json:"error_metrics"`
}
ObservabilityConfig configures the observability system
func DefaultObservabilityConfig ¶
func DefaultObservabilityConfig() ObservabilityConfig
DefaultObservabilityConfig returns default observability configuration
func EnhancedObservabilityConfig ¶
func EnhancedObservabilityConfig() ObservabilityConfig
EnhancedObservabilityConfig returns observability configuration with enhanced metrics collector
type ObservabilityReport ¶
type ObservabilityReport struct {
GeneratedAt time.Time `json:"generated_at"`
UpTime time.Duration `json:"uptime"`
Global GlobalMetrics `json:"global"`
Plugins map[string]PluginMetricsReport `json:"plugins"`
}
ObservabilityReport contains comprehensive metrics report
type ObservableManager ¶
type ObservableManager[Req, Resp any] struct { *Manager[Req, Resp] // contains filtered or unexported fields }
ObservableManager extends Manager with comprehensive observability
func NewObservableManager ¶
func NewObservableManager[Req, Resp any](baseManager *Manager[Req, Resp], config ObservabilityConfig) *ObservableManager[Req, Resp]
NewObservableManager creates a new observable manager
func (*ObservableManager[Req, Resp]) ExecuteWithObservability ¶
func (om *ObservableManager[Req, Resp]) ExecuteWithObservability(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error)
ExecuteWithObservability executes a request with full observability
func (*ObservableManager[Req, Resp]) GetObservabilityMetrics ¶
func (om *ObservableManager[Req, Resp]) GetObservabilityMetrics() ObservabilityReport
GetObservabilityMetrics returns comprehensive metrics
type Plugin ¶
type Plugin[Req, Resp any] interface { // Info returns metadata about the plugin Info() PluginInfo // Execute processes a request and returns a response // Context should be honored for timeouts and cancellation Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error) // Health performs a health check and returns detailed status Health(ctx context.Context) HealthStatus // Close gracefully shuts down the plugin and cleans up resources // Should be idempotent (safe to call multiple times) Close() error }
Plugin represents a generic plugin that can process requests of type Req and return responses of type Resp This interface is designed to be transport-agnostic and production-ready
type PluginConfig ¶
type PluginConfig struct {
// Basic plugin information
Name string `json:"name" yaml:"name"`
Type string `json:"type" yaml:"type"`
Transport TransportType `json:"transport" yaml:"transport"`
Endpoint string `json:"endpoint" yaml:"endpoint"`
Priority int `json:"priority" yaml:"priority"`
Enabled bool `json:"enabled" yaml:"enabled"`
// Executable-specific configuration
Executable string `json:"executable,omitempty" yaml:"executable,omitempty"`
Args []string `json:"args,omitempty" yaml:"args,omitempty"`
Env []string `json:"env,omitempty" yaml:"env,omitempty"`
WorkDir string `json:"work_dir,omitempty" yaml:"work_dir,omitempty"`
// Security and authentication
Auth AuthConfig `json:"auth" yaml:"auth"`
// Resilience and reliability
Retry RetryConfig `json:"retry" yaml:"retry"`
CircuitBreaker CircuitBreakerConfig `json:"circuit_breaker" yaml:"circuit_breaker"`
HealthCheck HealthCheckConfig `json:"health_check" yaml:"health_check"`
Connection ConnectionConfig `json:"connection" yaml:"connection"`
RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"`
// Plugin-specific configuration
Options map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`
// Metadata
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"`
}
PluginConfig represents the comprehensive configuration for a single plugin instance.
This is the main configuration structure that defines how a plugin connects, authenticates, and behaves within the plugin system. It combines transport configuration, security settings, resilience patterns, and operational parameters.
Configuration sections:
- Basic: Name, type, transport, endpoint identification
- Executable: Process execution settings (for exec transport)
- Security: Authentication and authorization configuration
- Resilience: Retry, circuit breaker, health check settings
- Performance: Connection pooling, rate limiting configuration
- Metadata: Labels and annotations for organization and discovery
Example configurations:
// HTTP plugin with API key authentication
httpPlugin := PluginConfig{
Name: "payment-service",
Type: "payment",
Transport: TransportHTTPS,
Endpoint: "https://payments.example.com/api/v1",
Enabled: true,
Priority: 1,
Auth: AuthConfig{
Method: AuthAPIKey,
APIKey: "your-api-key",
},
Retry: RetryConfig{
MaxRetries: 3,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 5 * time.Second,
Multiplier: 2.0,
RandomJitter: true,
},
Labels: map[string]string{
"environment": "production",
"version": "v1.2.3",
},
}
// Executable plugin with custom environment
execPlugin := PluginConfig{
Name: "data-processor",
Type: "processor",
Transport: TransportExecutable,
Executable: "/opt/processors/data-processor",
Args: []string{"--config", "/etc/processor.conf"},
Env: []string{"LOG_LEVEL=info", "MAX_MEMORY=1GB"},
WorkDir: "/tmp/processor",
}
func (*PluginConfig) Validate ¶
func (pc *PluginConfig) Validate() error
Validate validates the plugin configuration
type PluginDiff ¶
type PluginDiff struct {
Added []PluginConfig `json:"added"`
Updated []PluginUpdate `json:"updated"`
Removed []string `json:"removed"`
Unchanged []string `json:"unchanged"`
}
PluginDiff represents the differences between old and new plugin configurations
type PluginFactory ¶
type PluginFactory[Req, Resp any] interface { // CreatePlugin creates a new plugin instance from the given configuration CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error) // SupportedTransports returns the list of supported transport protocols SupportedTransports() []string // ValidateConfig validates a plugin configuration without creating the plugin ValidateConfig(config PluginConfig) error }
PluginFactory creates new plugin instances from configuration
type PluginInfo ¶
type PluginInfo struct {
Name string `json:"name"`
Version string `json:"version"`
Description string `json:"description,omitempty"`
Author string `json:"author,omitempty"`
Capabilities []string `json:"capabilities,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
PluginInfo contains comprehensive metadata about a plugin instance.
This structure provides essential information about a plugin's identity, capabilities, and characteristics. It's used for plugin discovery, version management, capability matching, and operational visibility.
Fields:
- Name: Unique identifier for the plugin instance
- Version: Plugin version for compatibility and update management
- Description: Human-readable description of plugin functionality
- Author: Plugin developer/maintainer information
- Capabilities: List of features or operations the plugin supports
- Metadata: Additional key-value pairs for custom plugin information
Example:
info := plugin.Info()
fmt.Printf("Plugin: %s v%s by %s\n", info.Name, info.Version, info.Author)
fmt.Printf("Capabilities: %v\n", info.Capabilities)
type PluginLoadMetrics ¶
type PluginLoadMetrics struct {
TotalRequests atomic.Int64
SuccessfulRequests atomic.Int64
FailedRequests atomic.Int64
ActiveConnections atomic.Int32
AverageLatency atomic.Int64 // Nanoseconds
LastLatency atomic.Int64 // Nanoseconds
LastUpdate atomic.Int64 // Unix timestamp
HealthScore atomic.Int32 // 0-100 scale
}
PluginLoadMetrics tracks metrics for load balancing decisions
type PluginManager ¶
type PluginManager[Req, Resp any] interface { // Register adds a plugin to the manager Register(plugin Plugin[Req, Resp]) error // Unregister removes a plugin from the manager Unregister(name string) error // Execute routes a request to the appropriate plugin // Includes automatic retries, circuit breaking, and fallback handling Execute(ctx context.Context, pluginName string, request Req) (Resp, error) // ExecuteWithOptions executes with custom execution context ExecuteWithOptions(ctx context.Context, pluginName string, execCtx ExecutionContext, request Req) (Resp, error) // GetPlugin returns a specific plugin by name GetPlugin(name string) (Plugin[Req, Resp], error) // ListPlugins returns all registered plugin names and their health status ListPlugins() map[string]HealthStatus // LoadFromConfig loads plugins from a configuration file or object LoadFromConfig(config ManagerConfig) error // ReloadConfig hot-reloads configuration without stopping the manager ReloadConfig(config ManagerConfig) error // Health returns the overall health of all plugins Health() map[string]HealthStatus // Shutdown gracefully shuts down all plugins and cleans up resources Shutdown(ctx context.Context) error }
PluginManager manages a collection of plugins and provides load balancing, circuit breaking, and health monitoring
type PluginMetricsReport ¶
type PluginMetricsReport struct {
TotalRequests int64 `json:"total_requests"`
SuccessfulRequests int64 `json:"successful_requests"`
FailedRequests int64 `json:"failed_requests"`
ActiveRequests int64 `json:"active_requests"`
SuccessRate float64 `json:"success_rate_percent"`
MinLatency time.Duration `json:"min_latency"`
MaxLatency time.Duration `json:"max_latency"`
AvgLatency time.Duration `json:"avg_latency"`
TimeoutErrors int64 `json:"timeout_errors"`
ConnectionErrors int64 `json:"connection_errors"`
AuthErrors int64 `json:"auth_errors"`
OtherErrors int64 `json:"other_errors"`
CircuitBreakerTrips int64 `json:"circuit_breaker_trips"`
HealthCheckTotal int64 `json:"health_check_total"`
HealthCheckFailed int64 `json:"health_check_failed"`
}
PluginMetricsReport contains metrics for a single plugin
type PluginObservabilityMetrics ¶
type PluginObservabilityMetrics struct {
// Request metrics
TotalRequests *atomic.Int64
SuccessfulRequests *atomic.Int64
FailedRequests *atomic.Int64
ActiveRequests *atomic.Int64
// Timing metrics
TotalLatency *atomic.Int64 // Total latency in nanoseconds
MinLatency *atomic.Int64
MaxLatency *atomic.Int64
AvgLatency *atomic.Int64
// Error metrics
TimeoutErrors *atomic.Int64
ConnectionErrors *atomic.Int64
AuthErrors *atomic.Int64
OtherErrors *atomic.Int64
// Circuit breaker metrics
CircuitBreakerTrips *atomic.Int64
CircuitBreakerState string
// Health metrics
HealthCheckTotal *atomic.Int64
HealthCheckFailed *atomic.Int64
LastHealthCheck *atomic.Int64 // Unix timestamp
HealthStatus string
}
PluginObservabilityMetrics contains detailed metrics for a single plugin
type PluginReloader ¶
type PluginReloader[Req, Resp any] struct { // contains filtered or unexported fields }
PluginReloader manages the hot-reload process
func NewPluginReloader ¶
func NewPluginReloader[Req, Resp any](manager *Manager[Req, Resp], options ReloadOptions, logger *slog.Logger) *PluginReloader[Req, Resp]
NewPluginReloader creates a new plugin reloader
func (*PluginReloader[Req, Resp]) ReloadWithIntelligentDiff ¶
func (r *PluginReloader[Req, Resp]) ReloadWithIntelligentDiff(ctx context.Context, newConfig ManagerConfig) error
ReloadWithIntelligentDiff performs intelligent hot-reload based on configuration diff
type PluginStatus ¶
type PluginStatus int
PluginStatus represents the current operational status of a plugin instance.
Status levels indicate the plugin's ability to handle requests and its overall health:
- StatusUnknown: Initial state or status cannot be determined
- StatusHealthy: Plugin is fully operational and handling requests normally
- StatusDegraded: Plugin is operational but performance may be impacted
- StatusUnhealthy: Plugin has issues but may still handle some requests
- StatusOffline: Plugin is not responding and should not receive requests
These statuses are used by load balancers, circuit breakers, and health monitoring systems to make routing and recovery decisions.
const ( StatusUnknown PluginStatus = iota StatusHealthy StatusDegraded StatusUnhealthy StatusOffline )
func (PluginStatus) String ¶
func (s PluginStatus) String() string
type PluginUpdate ¶
type PluginUpdate struct {
Name string `json:"name"`
OldConfig PluginConfig `json:"old_config"`
NewConfig PluginConfig `json:"new_config"`
Changes []string `json:"changes"`
}
PluginUpdate represents an update to an existing plugin
type PluginWrapper ¶
type PluginWrapper[Req, Resp any] struct { Plugin Plugin[Req, Resp] Weight int Priority int Active *atomic.Int32 // Active connection count Enabled *atomic.Bool LastUsed *atomic.Int64 // Unix timestamp }
PluginWrapper wraps a plugin with load balancing metadata
type PrometheusBucket ¶
type PrometheusBucket struct {
UpperBound float64 `json:"upper_bound"`
Count uint64 `json:"count"`
}
PrometheusBucket represents a histogram bucket
type PrometheusMetric ¶
type PrometheusMetric struct {
Name string `json:"name"`
Type string `json:"type"` // counter, gauge, histogram
Description string `json:"description"`
Value float64 `json:"value,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Buckets []PrometheusBucket `json:"buckets,omitempty"` // For histograms
}
PrometheusMetric represents a metric in Prometheus format
type RateLimitConfig ¶
type RateLimitConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
RequestsPerSecond float64 `json:"requests_per_second" yaml:"requests_per_second"`
BurstSize int `json:"burst_size" yaml:"burst_size"`
TimeWindow time.Duration `json:"time_window" yaml:"time_window"`
}
RateLimitConfig contains rate limiting settings for controlling plugin request rates.
Rate limiting protects plugins from being overwhelmed and ensures fair resource distribution. It implements a token bucket algorithm that allows burst traffic while maintaining average rate limits.
Token bucket algorithm:
- BurstSize: Maximum tokens available (allows burst of requests)
- RequestsPerSecond: Rate at which tokens are replenished
- TimeWindow: Period over which the rate is measured
- Tokens are consumed for each request and replenished at the configured rate
Example configuration:
rateLimit := RateLimitConfig{
Enabled: true,
RequestsPerSecond: 10.0, // Allow 10 requests per second
BurstSize: 20, // Allow bursts up to 20 requests
TimeWindow: time.Second, // Rate window of 1 second
}
// This allows up to 20 requests immediately, then 10 per second thereafter
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements token bucket rate limiting
func NewRateLimiter ¶
func NewRateLimiter(config RateLimitConfig) *RateLimiter
NewRateLimiter creates a new rate limiter
func (*RateLimiter) Allow ¶
func (rl *RateLimiter) Allow() bool
Allow checks if a request should be allowed
type ReloadOptions ¶
type ReloadOptions struct {
Strategy ReloadStrategy `json:"strategy"`
DrainTimeout time.Duration `json:"drain_timeout"`
GracefulTimeout time.Duration `json:"graceful_timeout"`
MaxConcurrentReloads int `json:"max_concurrent_reloads"`
HealthCheckTimeout time.Duration `json:"health_check_timeout"`
RollbackOnFailure bool `json:"rollback_on_failure"`
}
ReloadOptions configures how hot-reload should behave
func DefaultReloadOptions ¶
func DefaultReloadOptions() ReloadOptions
DefaultReloadOptions returns sensible defaults for reload options
type ReloadStrategy ¶
type ReloadStrategy string
ReloadStrategy defines how plugins should be reloaded
const ( // ReloadStrategyRecreate completely removes and recreates plugins (current behavior) ReloadStrategyRecreate ReloadStrategy = "recreate" // ReloadStrategyGraceful drains connections gracefully before updating ReloadStrategyGraceful ReloadStrategy = "graceful" // ReloadStrategyRolling performs rolling updates with zero downtime ReloadStrategyRolling ReloadStrategy = "rolling" )
type RetryConfig ¶
type RetryConfig struct {
MaxRetries int `json:"max_retries" yaml:"max_retries"`
InitialInterval time.Duration `json:"initial_interval" yaml:"initial_interval"`
MaxInterval time.Duration `json:"max_interval" yaml:"max_interval"`
Multiplier float64 `json:"multiplier" yaml:"multiplier"`
RandomJitter bool `json:"random_jitter" yaml:"random_jitter"`
}
RetryConfig contains retry and backoff configuration for failed plugin requests.
This configuration implements an exponential backoff strategy with optional random jitter to prevent thundering herd problems when multiple clients retry simultaneously.
The retry logic works as follows:
- First retry after InitialInterval
- Each subsequent retry multiplies the interval by Multiplier
- Interval never exceeds MaxInterval
- RandomJitter adds up to ±10% randomness to intervals
- Stop retrying after MaxRetries attempts
Example configuration:
retry := RetryConfig{
MaxRetries: 3,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 5 * time.Second,
Multiplier: 2.0,
RandomJitter: true,
}
// This results in delays of ~100ms, ~200ms, ~400ms (with jitter)
type Span ¶
type Span interface {
// Set span attributes
SetAttribute(key string, value interface{})
// Set span status
SetStatus(code SpanStatusCode, message string)
// Finish the span
Finish()
// Get span context for propagation
Context() interface{}
}
Span represents a tracing span
type SpanStatusCode ¶
type SpanStatusCode int
SpanStatusCode represents span status
const ( SpanStatusOK SpanStatusCode = iota SpanStatusError SpanStatusTimeout )
type TracingProvider ¶
type TracingProvider interface {
// Start a new span
StartSpan(ctx context.Context, operationName string) (context.Context, Span)
// Extract tracing context from headers/metadata
ExtractContext(headers map[string]string) context.Context
// Inject tracing context into headers/metadata
InjectContext(ctx context.Context) map[string]string
}
TracingProvider defines interface for distributed tracing
type TransportType ¶
type TransportType string
TransportType represents the different transport protocols supported by the plugin system.
Each transport type defines how plugins communicate with their underlying services:
- HTTP/HTTPS: REST API communication over standard HTTP(S) protocols
- gRPC: High-performance RPC communication with optional TLS
- Unix sockets: Local inter-process communication via Unix domain sockets
- Executable: Direct execution of external processes
Example usage:
config := PluginConfig{
Transport: TransportHTTPS,
Endpoint: "https://api.example.com/v1/plugin",
}
const ( TransportHTTP TransportType = "http" TransportHTTPS TransportType = "https" TransportGRPC TransportType = "grpc" TransportGRPCTLS TransportType = "grpc-tls" TransportUnix TransportType = "unix" TransportExecutable TransportType = "exec" )
type UnixConnectionPool ¶
type UnixConnectionPool struct {
// contains filtered or unexported fields
}
UnixConnectionPool manages a pool of Unix socket connections
func NewUnixConnectionPool ¶
func NewUnixConnectionPool(socketPath string, maxConnections int) *UnixConnectionPool
NewUnixConnectionPool creates a new Unix socket connection pool
func (*UnixConnectionPool) Close ¶
func (p *UnixConnectionPool) Close() error
Close closes all connections in the pool
func (*UnixConnectionPool) GetConnection ¶
func (p *UnixConnectionPool) GetConnection() (net.Conn, error)
GetConnection gets a connection from the pool or creates a new one
func (*UnixConnectionPool) ReturnConnection ¶
func (p *UnixConnectionPool) ReturnConnection(conn net.Conn)
ReturnConnection returns a connection to the pool
type UnixSocketMessage ¶
type UnixSocketMessage struct {
Type string `json:"type"`
RequestID string `json:"request_id"`
Data json.RawMessage `json:"data"`
Headers map[string]string `json:"headers,omitempty"`
Timeout int64 `json:"timeout_ms,omitempty"`
}
UnixSocketMessage represents a standardized message format for Unix socket communication.
This structure defines the wire protocol for Unix domain socket communication, providing a consistent message format that supports different operation types and includes all necessary metadata for request handling and correlation.
Message types:
- "execute": Normal plugin execution request
- "health": Health check request
- "info": Plugin information request
- "ping": Connection test/keepalive
Example message for execution:
{
"type": "execute",
"request_id": "req-12345",
"data": {"user_id": "user123", "action": "validate"},
"headers": {"X-Trace-ID": "trace-abc"},
"timeout_ms": 5000
}
The JSON format ensures consistency with other transports while maintaining the performance benefits of Unix domain sockets.
type UnixSocketPlugin ¶
type UnixSocketPlugin[Req, Resp any] struct { // contains filtered or unexported fields }
UnixSocketPlugin implements Plugin interface using Unix Domain Sockets for high-performance local communication.
This implementation provides the fastest possible communication mechanism for plugins running on the same machine, using Unix domain sockets to eliminate network overhead. It's ideal for high-throughput scenarios where plugins and the main application are co-located.
Features:
- Ultra-low latency local communication via Unix domain sockets
- Connection pooling for high concurrency scenarios
- JSON-based message protocol for consistency with other transports
- Proper connection lifecycle and error handling
- Request correlation and timeout management
- Health checking with rapid local validation
Performance characteristics:
- Lowest possible latency (no network stack)
- Highest throughput for local communication
- Zero network security concerns (local socket)
- No serialization overhead beyond JSON
Example usage:
config := PluginConfig{
Name: "local-processor",
Transport: TransportUnix,
Endpoint: "/tmp/processor.sock", // Unix socket path
Connection: ConnectionConfig{
MaxConnections: 20, // Pool size for concurrent requests
},
}
plugin, err := NewUnixSocketPlugin[ProcessRequest, ProcessResponse](config, logger)
if err != nil {
log.Fatal(err)
}
// High-speed local communication
response, err := plugin.Execute(ctx, execCtx, request)
if err != nil {
log.Printf("Local processing failed: %v", err)
}
// Cleanup
defer plugin.Close()
func NewUnixSocketPlugin ¶
func NewUnixSocketPlugin[Req, Resp any](config PluginConfig, logger *slog.Logger) (*UnixSocketPlugin[Req, Resp], error)
NewUnixSocketPlugin creates a new Unix socket plugin instance
func (*UnixSocketPlugin[Req, Resp]) Close ¶
func (u *UnixSocketPlugin[Req, Resp]) Close() error
Close closes the Unix socket connection pool
func (*UnixSocketPlugin[Req, Resp]) Execute ¶
func (u *UnixSocketPlugin[Req, Resp]) Execute(ctx context.Context, execCtx ExecutionContext, request Req) (Resp, error)
Execute processes a request using Unix socket
func (*UnixSocketPlugin[Req, Resp]) Health ¶
func (u *UnixSocketPlugin[Req, Resp]) Health(ctx context.Context) HealthStatus
Health performs a health check via Unix socket
func (*UnixSocketPlugin[Req, Resp]) Info ¶
func (u *UnixSocketPlugin[Req, Resp]) Info() PluginInfo
Info returns plugin information
type UnixSocketPluginFactory ¶
type UnixSocketPluginFactory[Req, Resp any] struct { // contains filtered or unexported fields }
UnixSocketPluginFactory creates Unix socket plugin instances
func NewUnixSocketPluginFactory ¶
func NewUnixSocketPluginFactory[Req, Resp any](logger *slog.Logger) *UnixSocketPluginFactory[Req, Resp]
NewUnixSocketPluginFactory creates a new Unix socket plugin factory
func (*UnixSocketPluginFactory[Req, Resp]) CreatePlugin ¶
func (f *UnixSocketPluginFactory[Req, Resp]) CreatePlugin(config PluginConfig) (Plugin[Req, Resp], error)
CreatePlugin creates a new Unix socket plugin instance
func (*UnixSocketPluginFactory[Req, Resp]) SupportedTransports ¶
func (f *UnixSocketPluginFactory[Req, Resp]) SupportedTransports() []string
SupportedTransports returns supported transport types
func (*UnixSocketPluginFactory[Req, Resp]) ValidateConfig ¶
func (f *UnixSocketPluginFactory[Req, Resp]) ValidateConfig(config PluginConfig) error
ValidateConfig validates Unix socket plugin configuration
type UnixSocketResponse ¶
type UnixSocketResponse struct {
Type string `json:"type"`
RequestID string `json:"request_id"`
Success bool `json:"success"`
Data json.RawMessage `json:"data,omitempty"`
Error string `json:"error,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
}
UnixSocketResponse represents a standardized response format for Unix socket communication.
This structure defines the response protocol for Unix domain socket communication, providing consistent success/error handling and metadata propagation. It enables proper request correlation and comprehensive error reporting.
Response handling:
- Success=true: Data field contains the successful response
- Success=false: Error field contains the error message
- RequestID matches the original request for correlation
- Headers can include metadata like processing time, trace info
Example successful response:
{
"type": "execute",
"request_id": "req-12345",
"success": true,
"data": {"token": "abc123", "expires": 3600},
"headers": {"X-Processing-Time-Ms": "23"}
}
Example error response:
{
"type": "execute",
"request_id": "req-12345",
"success": false,
"error": "Invalid user credentials",
"headers": {"X-Error-Code": "AUTH_FAILED"}
}