Documentation
¶
Overview ¶
Package middleware provides composable middleware for ProcessFunc.
Middleware wraps a ProcessFunc with additional behavior such as context management, panic recovery, retry logic, and metrics collection.
Index ¶
- Variables
- func SetDefaultLogConfig(config LogConfig)
- func SetDefaultLogger(l Logger)
- type BackoffFunc
- type LogConfig
- type LogLevel
- type Logger
- type Metadata
- type Metrics
- type MetricsCollector
- type Middleware
- func Log[In, Out any](config LogConfig) Middleware[In, Out]
- func MetadataProvider[In, Out any](provider func(in In) Metadata) Middleware[In, Out]
- func MetricsMiddleware[In, Out any](collect MetricsCollector) Middleware[In, Out]
- func Recover[In, Out any]() Middleware[In, Out]
- func Retry[In, Out any](cfg RetryConfig) Middleware[In, Out]
- type ProcessFunc
- type RecoveryError
- type RetryConfig
- type RetryState
- type ShouldRetryFunc
Constants ¶
This section is empty.
Variables ¶
var ( ErrFailure = errors.New("processing failed") ErrCancel = errors.New("processing cancelled") )
Sentinel errors for metrics categorization.
var ( // ErrRetry is the base error for retry operations. ErrRetry = errors.New("gopipe retry") // ErrRetryMaxAttempts is returned when all retry attempts fail. ErrRetryMaxAttempts = fmt.Errorf("%w: max attempts reached", ErrRetry) // ErrRetryTimeout is returned when the overall retry operation times out. ErrRetryTimeout = fmt.Errorf("%w: timeout reached", ErrRetry) // ErrRetryNotRetryable is returned when an error is not retryable. ErrRetryNotRetryable = fmt.Errorf("%w: not retryable", ErrRetry) )
Functions ¶
func SetDefaultLogConfig ¶
func SetDefaultLogConfig(config LogConfig)
SetDefaultLogConfig sets the default logger configuration for all pipes.
func SetDefaultLogger ¶
func SetDefaultLogger(l Logger)
SetDefaultLogger sets the default logger for all pipes. slog.Default() is used by default.
Types ¶
type BackoffFunc ¶
BackoffFunc returns the wait duration for a retry attempt. The attempt parameter is one-based (1 for first retry, 2 for second, etc.).
func ConstantBackoff ¶
func ConstantBackoff(delay time.Duration, jitter float64) BackoffFunc
ConstantBackoff creates a backoff function that returns a constant duration with optional jitter. The delay parameter specifies the base wait time for all retry attempts. The jitter parameter controls randomization: 0.0 = no jitter, 0.2 = ±20% variation.
func ExponentialBackoff ¶
func ExponentialBackoff(initialDelay time.Duration, factor float64, maxDelay time.Duration, jitter float64) BackoffFunc
ExponentialBackoff creates a backoff function with exponential backoff and jitter. Each retry attempt uses baseDelay * factor^(attempt-1) with random jitter applied. The maxDelay parameter caps the maximum backoff duration (0 = no limit).
type LogConfig ¶
type LogConfig struct {
// Args are additional arguments to include in all log messages.
Args []any
// LevelSuccess is the log level used for successful processing.
LevelSuccess LogLevel
// LevelCancel is the log level used when processing is canceled.
LevelCancel LogLevel
// LevelFailure is the log level used when processing fails.
LevelFailure LogLevel
// LevelRetry is the log level used when a retry is attempted.
LevelRetry LogLevel
// MessageSuccess is the message logged on successful processing.
MessageSuccess string
// MessageCancel is the message logged when processing is canceled.
MessageCancel string
// MessageFailure is the message logged when processing fails.
MessageFailure string
// MessageRetry is the message logged when a retry is attempted.
MessageRetry string
// Disabled disables all logging when set to true.
Disabled bool
}
LogConfig holds configuration for the logger middleware.
type LogLevel ¶
type LogLevel string
LogLevel represents the severity level for logging messages.
const ( // LogLevelDebug is used for detailed information. LogLevelDebug LogLevel = "debug" // LogLevelInfo is used for general information messages. LogLevelInfo LogLevel = "info" // LogLevelWarn is used for warning conditions. LogLevelWarn LogLevel = "warn" // LogLevelError is used for error conditions. LogLevelError LogLevel = "error" )
type Logger ¶
type Logger interface {
// Debug logs a message at debug level.
Debug(msg string, args ...any)
// Info logs a message at info level.
Info(msg string, args ...any)
// Warn logs a message at warning level.
Warn(msg string, args ...any)
// Error logs a message at error level.
Error(msg string, args ...any)
}
Logger defines an interface for logging at different severity levels.
type Metadata ¶
Metadata is a key-value store for additional information about pipeline items.
func MetadataFromContext ¶
MetadataFromContext extracts metadata from a context. Returns nil if no metadata is present.
type Metrics ¶
type Metrics struct {
Start time.Time
Duration time.Duration
Input int
Output int
InFlight int
Metadata Metadata
RetryState *RetryState
Error error
}
Metrics holds processing metrics for a single input.
func (*Metrics) Cancel ¶
Cancel returns a numeric indicator of cancellation (1 for cancel, 0 otherwise).
func (*Metrics) Failure ¶
Failure returns a numeric indicator of failure (1 for failure, 0 otherwise).
type MetricsCollector ¶
type MetricsCollector func(metrics *Metrics)
MetricsCollector defines a function that collects single input metrics.
func DistributeMetrics ¶
func DistributeMetrics(collectors ...MetricsCollector) MetricsCollector
DistributeMetrics creates a collector that distributes metrics to multiple collectors.
func NewLogCollector ¶
func NewLogCollector(config LogConfig) MetricsCollector
NewLogCollector creates a MetricsCollector that logs processing results.
type Middleware ¶
type Middleware[In, Out any] func(ProcessFunc[In, Out]) ProcessFunc[In, Out]
Middleware wraps a ProcessFunc with additional behavior.
func Log ¶
func Log[In, Out any](config LogConfig) Middleware[In, Out]
Log creates a logging middleware that logs processing metrics.
func MetadataProvider ¶
func MetadataProvider[In, Out any](provider func(in In) Metadata) Middleware[In, Out]
MetadataProvider wraps a ProcessFunc with metadata enrichment. It attaches metadata from the provider to the context for downstream use.
func MetricsMiddleware ¶
func MetricsMiddleware[In, Out any](collect MetricsCollector) Middleware[In, Out]
Metrics wraps a ProcessFunc with metrics collection. It tracks processing duration, input/output counts, and error information.
func Recover ¶
func Recover[In, Out any]() Middleware[In, Out]
Recover wraps a ProcessFunc with panic recovery. Any panic that occurs during processing is caught and converted into a RecoveryError with the stack trace captured.
func Retry ¶
func Retry[In, Out any](cfg RetryConfig) Middleware[In, Out]
Retry wraps a ProcessFunc with retry logic. Failed operations are retried based on ShouldRetry logic, with Backoff between attempts, until MaxAttempts is reached or Timeout expires.
type ProcessFunc ¶
ProcessFunc is the core processing signature. It takes a context and input, and returns zero or more outputs or an error.
type RecoveryError ¶
type RecoveryError struct {
// PanicValue is the original value that was passed to panic().
PanicValue any
// StackTrace contains the full stack trace at the point of panic.
StackTrace string
}
RecoveryError wraps a panic value with the stack trace. This allows panics to be converted to regular errors and handled gracefully.
func (*RecoveryError) Error ¶
func (e *RecoveryError) Error() string
type RetryConfig ¶
type RetryConfig struct {
// ShouldRetry determines which errors trigger retry attempts.
// If nil, defaults to retrying all errors.
ShouldRetry ShouldRetryFunc
// Backoff produces the wait duration between retry attempts.
// If nil, defaults to 1 second constant backoff with jitter ±20%.
Backoff BackoffFunc
// MaxAttempts limits the total number of processing attempts, including the initial attempt.
// Default is 3 attempts. Negative values allow unlimited retries.
MaxAttempts int
// Timeout sets the overall time limit for all processing attempts combined.
// Zero or negative value means no timeout. Default is 1 minute.
Timeout time.Duration
}
RetryConfig configures retry behavior for failed operations.
type RetryState ¶
type RetryState struct {
// Timeout is the configured overall timeout for all attempts.
Timeout time.Duration
// MaxAttempts is the configured maximum number of attempts.
MaxAttempts int
// Start is the time when the first attempt started.
Start time.Time
// Attempts is the total number of processing attempts made (1-based).
Attempts int
// Duration is the total elapsed time since Start.
Duration time.Duration
// Causes is a list of all errors encountered during attempts.
Causes []error
// Err is the error that caused the retry process to abort (final error).
Err error
}
RetryState tracks the progress and history of retry attempts.
func RetryStateFromContext ¶
func RetryStateFromContext(ctx context.Context) *RetryState
RetryStateFromContext extracts the RetryState from a context. Returns nil if no RetryState is present.
func RetryStateFromError ¶
func RetryStateFromError(err error) *RetryState
RetryStateFromError extracts the RetryState from an error. Returns nil if no RetryState is present.
type ShouldRetryFunc ¶
ShouldRetryFunc determines whether an error should trigger a retry attempt.
func ShouldNotRetry ¶
func ShouldNotRetry(errs ...error) ShouldRetryFunc
ShouldNotRetry creates a function that skips retries on specific errors. If no errors are specified, no errors trigger retries.
func ShouldRetry ¶
func ShouldRetry(errs ...error) ShouldRetryFunc
ShouldRetry creates a function that retries on specific errors. If no errors are specified, all errors trigger retries.