middleware

package
v0.17.1 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 10, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package middleware provides composable middleware for ProcessFunc.

Middleware wraps a ProcessFunc with additional behavior such as context management, panic recovery, retry logic, and metrics collection.

Constants

This section is empty.

Variables

var (
	ErrFailure = errors.New("processing failed")
	ErrCancel  = errors.New("processing cancelled")
)

Sentinel errors for metrics categorization.

var (
	// ErrRetry is the base error for retry operations.
	ErrRetry = errors.New("gopipe retry")

	// ErrRetryMaxAttempts is returned when all retry attempts fail.
	ErrRetryMaxAttempts = fmt.Errorf("%w: max attempts reached", ErrRetry)

	// ErrRetryTimeout is returned when the overall retry operation times out.
	ErrRetryTimeout = fmt.Errorf("%w: timeout reached", ErrRetry)

	// ErrRetryNotRetryable is returned when an error is not retryable.
	ErrRetryNotRetryable = fmt.Errorf("%w: not retryable", ErrRetry)
)

Functions

func SetDefaultLogConfig

func SetDefaultLogConfig(config LogConfig)

SetDefaultLogConfig sets the default logger configuration for all pipes.

func SetDefaultLogger

func SetDefaultLogger(l Logger)

SetDefaultLogger sets the default logger for all pipes. slog.Default() is used by default.

Types

type BackoffFunc

type BackoffFunc func(attempt int) time.Duration

BackoffFunc returns the wait duration for a retry attempt. The attempt parameter is one-based (1 for first retry, 2 for second, etc.).

func ConstantBackoff

func ConstantBackoff(delay time.Duration, jitter float64) BackoffFunc

ConstantBackoff creates a backoff function that returns a constant duration with optional jitter. The delay parameter specifies the base wait time for all retry attempts. The jitter parameter controls randomization: 0.0 = no jitter, 0.2 = ±20% variation.

func ExponentialBackoff

func ExponentialBackoff(initialDelay time.Duration, factor float64, maxDelay time.Duration, jitter float64) BackoffFunc

ExponentialBackoff creates a backoff function with exponential backoff and jitter. Each retry attempt uses initialDelay * factor^(attempt-1) with random jitter applied. The maxDelay parameter caps the maximum backoff duration (0 = no limit).

type LogConfig

type LogConfig struct {
	// Args are additional arguments to include in all log messages.
	Args []any

	// LevelSuccess is the log level used for successful processing.
	LevelSuccess LogLevel
	// LevelCancel is the log level used when processing is canceled.
	LevelCancel LogLevel
	// LevelFailure is the log level used when processing fails.
	LevelFailure LogLevel
	// LevelRetry is the log level used when a retry is attempted.
	LevelRetry LogLevel

	// MessageSuccess is the message logged on successful processing.
	MessageSuccess string
	// MessageCancel is the message logged when processing is canceled.
	MessageCancel string
	// MessageFailure is the message logged when processing fails.
	MessageFailure string
	// MessageRetry is the message logged when a retry is attempted.
	MessageRetry string

	// Disabled disables all logging when set to true.
	Disabled bool
}

LogConfig holds configuration for the logger middleware.

type LogLevel

type LogLevel string

LogLevel represents the severity level for logging messages.

const (
	// LogLevelDebug is used for detailed information.
	LogLevelDebug LogLevel = "debug"
	// LogLevelInfo is used for general information messages.
	LogLevelInfo LogLevel = "info"
	// LogLevelWarn is used for warning conditions.
	LogLevelWarn LogLevel = "warn"
	// LogLevelError is used for error conditions.
	LogLevelError LogLevel = "error"
)

type Logger

type Logger interface {
	// Debug logs a message at debug level.
	Debug(msg string, args ...any)
	// Info logs a message at info level.
	Info(msg string, args ...any)
	// Warn logs a message at warning level.
	Warn(msg string, args ...any)
	// Error logs a message at error level.
	Error(msg string, args ...any)
}

Logger defines an interface for logging at different severity levels.

type Metadata

type Metadata map[string]any

Metadata is a key-value store for additional information about pipeline items.

func MetadataFromContext

func MetadataFromContext(ctx context.Context) Metadata

MetadataFromContext extracts metadata from a context. Returns nil if no metadata is present.

func (Metadata) Args

func (m Metadata) Args() []any

Args converts metadata to a flat key-value slice for logging.
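
A flat key-value slice is the shape that slog-style loggers accept as variadic arguments. The sketch below shows one way such a conversion could look; flatten is a hypothetical stand-in, and sorting the keys is an assumption made here for stable output (Go map iteration order is unspecified, and the package may not sort).

```go
package main

import (
	"fmt"
	"sort"
)

// Metadata mirrors the type documented above.
type Metadata map[string]any

// flatten is a hypothetical stand-in for Metadata.Args.
// It returns [key1, value1, key2, value2, ...] with keys in sorted order.
func flatten(m Metadata) []any {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	args := make([]any, 0, 2*len(m))
	for _, k := range keys {
		args = append(args, k, m[k])
	}
	return args
}

func main() {
	md := Metadata{"worker": "a", "attempt": 2}
	fmt.Println(flatten(md)) // [attempt 2 worker a]
}
```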

type Metrics

type Metrics struct {
	Start    time.Time
	Duration time.Duration
	Input    int
	Output   int
	InFlight int

	Metadata   Metadata
	RetryState *RetryState

	Error error
}

Metrics holds processing metrics for a single input.

func (*Metrics) Cancel

func (m *Metrics) Cancel() int

Cancel returns a numeric indicator of cancellation (1 for cancel, 0 otherwise).

func (*Metrics) Failure

func (m *Metrics) Failure() int

Failure returns a numeric indicator of failure (1 for failure, 0 otherwise).

func (*Metrics) Retry

func (m *Metrics) Retry() int

Retry returns a numeric indicator of retry (1 for retry, 0 otherwise).

func (*Metrics) Success

func (m *Metrics) Success() int

Success returns a numeric indicator of success (1 for success, 0 otherwise).

type MetricsCollector

type MetricsCollector func(metrics *Metrics)

MetricsCollector defines a function that collects the metrics for a single input.

func DistributeMetrics

func DistributeMetrics(collectors ...MetricsCollector) MetricsCollector

DistributeMetrics creates a collector that distributes metrics to multiple collectors.

func NewLogCollector

func NewLogCollector(config LogConfig) MetricsCollector

NewLogCollector creates a MetricsCollector that logs processing results.

type Middleware

type Middleware[In, Out any] func(ProcessFunc[In, Out]) ProcessFunc[In, Out]

Middleware wraps a ProcessFunc with additional behavior.

func Log

func Log[In, Out any](config LogConfig) Middleware[In, Out]

Log creates a logging middleware that logs processing metrics.

func MetadataProvider

func MetadataProvider[In, Out any](provider func(in In) Metadata) Middleware[In, Out]

MetadataProvider wraps a ProcessFunc with metadata enrichment. It attaches metadata from the provider to the context for downstream use.

func MetricsMiddleware

func MetricsMiddleware[In, Out any](collect MetricsCollector) Middleware[In, Out]

MetricsMiddleware wraps a ProcessFunc with metrics collection. It tracks processing duration, input/output counts, and error information.

func Recover

func Recover[In, Out any]() Middleware[In, Out]

Recover wraps a ProcessFunc with panic recovery. Any panic that occurs during processing is caught and converted into a RecoveryError with the stack trace captured.

func Retry

func Retry[In, Out any](cfg RetryConfig) Middleware[In, Out]

Retry wraps a ProcessFunc with retry logic. Failed operations are retried based on ShouldRetry logic, with Backoff between attempts, until MaxAttempts is reached or Timeout expires.

type ProcessFunc

type ProcessFunc[In, Out any] func(context.Context, In) ([]Out, error)

ProcessFunc is the core processing signature. It takes a context and input, and returns zero or more outputs or an error.

type RecoveryError

type RecoveryError struct {
	// PanicValue is the original value that was passed to panic().
	PanicValue any
	// StackTrace contains the full stack trace at the point of panic.
	StackTrace string
}

RecoveryError wraps a panic value with the stack trace. This allows panics to be converted to regular errors and handled gracefully.

func (*RecoveryError) Error

func (e *RecoveryError) Error() string

type RetryConfig

type RetryConfig struct {
	// ShouldRetry determines which errors trigger retry attempts.
	// If nil, defaults to retrying all errors.
	ShouldRetry ShouldRetryFunc

	// Backoff produces the wait duration between retry attempts.
	// If nil, defaults to 1 second constant backoff with jitter ±20%.
	Backoff BackoffFunc

	// MaxAttempts limits the total number of processing attempts, including the initial attempt.
	// Default is 3 attempts. Negative values allow unlimited retries.
	MaxAttempts int

	// Timeout sets the overall time limit for all processing attempts combined.
	// Zero or negative value means no timeout. Default is 1 minute.
	Timeout time.Duration
}

RetryConfig configures retry behavior for failed operations.

type RetryState

type RetryState struct {
	// Timeout is the configured overall timeout for all attempts.
	Timeout time.Duration
	// MaxAttempts is the configured maximum number of attempts.
	MaxAttempts int
	// Start is the time when the first attempt started.
	Start time.Time
	// Attempts is the total number of processing attempts made (1-based).
	Attempts int
	// Duration is the total elapsed time since Start.
	Duration time.Duration
	// Causes is a list of all errors encountered during attempts.
	Causes []error
	// Err is the error that caused the retry process to abort (final error).
	Err error
}

RetryState tracks the progress and history of retry attempts.

func RetryStateFromContext

func RetryStateFromContext(ctx context.Context) *RetryState

RetryStateFromContext extracts the RetryState from a context. Returns nil if no RetryState is present.

func RetryStateFromError

func RetryStateFromError(err error) *RetryState

RetryStateFromError extracts the RetryState from an error. Returns nil if no RetryState is present.

type ShouldRetryFunc

type ShouldRetryFunc func(error) bool

ShouldRetryFunc determines whether an error should trigger a retry attempt.

func ShouldNotRetry

func ShouldNotRetry(errs ...error) ShouldRetryFunc

ShouldNotRetry creates a function that skips retries on specific errors. If no errors are specified, no errors trigger retries.

func ShouldRetry

func ShouldRetry(errs ...error) ShouldRetryFunc

ShouldRetry creates a function that retries on specific errors. If no errors are specified, all errors trigger retries.
