qpool

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2025 License: MIT Imports: 13 Imported by: 4

README

QPool - Quantum Worker Pool & Message Queue

Go CI/CD Go Reference Go Report Card Bugs Code Smells Lines of Code Reliability Rating Security Rating Technical Debt Maintainability Rating Vulnerabilities

QPool is a high-performance, feature-rich worker pool implementation in Go that combines the capabilities of a traditional worker pool with a sophisticated message queue. It's designed to handle complex job dependencies, provide robust error handling, and scale automatically based on workload.

🌟 Key Features

  • Dynamic Worker Pool

    • Auto-scaling based on workload
    • Configurable min/max worker counts
    • Efficient worker management
    • Smart job distribution
  • Advanced Job Dependencies

    • Support for future dependencies
    • Dependency chain resolution
    • Circular dependency detection
    • Parent-child relationship tracking
  • Robust Error Handling

    • Circuit breaker pattern
    • Configurable retry policies
    • Exponential backoff
    • Timeout management
  • Performance Features

    • Non-blocking job scheduling
    • Efficient memory usage
    • Resource utilization tracking
    • Load-based auto-scaling
  • Monitoring & Metrics

    • Comprehensive metrics collection
    • Latency percentiles (p95, p99)
    • Success/failure rates
    • Resource utilization stats
    • Dependency resolution tracking

⚛️ Quantum Entanglement

QPool introduces the concept of Entanglement, inspired by quantum mechanics. Just as quantum particles can be entangled so that the state of one instantly affects the other, jobs in an Entanglement share state that persists across time and space. When one job updates the shared state, all other jobs in the entanglement—even those not yet processed—will see these changes.

This powerful feature enables:

  • Shared state between related jobs that persists even across job completion
  • Automatic state synchronization for jobs processed at different times
  • Immutable history of all state changes
  • Perfect for distributed data processing and coordinated task execution

Learn more about Entanglement with code samples →

📦️ Regulators

QPool implements a biological-inspired regulation system that helps maintain system stability and optimal performance under varying loads. Like a living organism that regulates its internal state through multiple coordinated mechanisms, QPool uses a set of regulators to control different aspects of the worker pool.

The regulation system includes:

🚦 Rate Limiter

A token bucket-based rate limiter that provides smooth, burst-capable traffic control. Like a water tank with controlled inflow and outflow, it ensures operations proceed at a sustainable pace while allowing brief bursts of activity when needed.

  • Configurable steady-state rate and burst capacity
  • Smooth operation without sharp cutoffs
  • Perfect for API rate limiting and resource protection
  • Automatic token replenishment

Learn more about Rate Limiting →

🔌 Circuit Breaker

Inspired by electrical circuit breakers, this pattern prevents system failure by automatically stopping operations when error rates exceed acceptable thresholds. Like its electrical counterpart, it "trips" to protect the system and automatically tests for recovery.

  • Prevents cascade failures in distributed systems
  • Self-healing with automatic recovery testing
  • Configurable error thresholds and recovery timing
  • Perfect for protecting external service calls

Learn more about Circuit Breakers →

⚖️ Load Balancer

Like a traffic controller directing vehicles to different lanes based on congestion, the Load Balancer ensures optimal work distribution across workers by considering their capacity, current load, and performance characteristics.

  • Intelligent work distribution based on metrics
  • Automatic worker selection and load tracking
  • Adapts to worker pool scaling
  • Perfect for optimizing resource utilization

Learn more about Load Balancing →

🔙 BackPressureRegulator

Manages system pressure by controlling job intake based on queue size and processing times, preventing system overload.

Learn more about Backpressure →

👮🏽 ResourceGovernorRegulator

Monitors and manages system resource usage (CPU, memory) to prevent resource exhaustion and maintain system stability.

Learn more about Resource Governor →

🔃 AdaptiveScalerRegulator

Dynamically adjusts the worker pool size based on load metrics and performance data to optimize resource utilization.

Learn more about Adaptive Scaling →

All regulators implement the Regulator interface, allowing them to be composed and combined to create sophisticated control systems.

Learn more about the Regulation System →

📦 Installation

go get github.com/theapemachine/qpool

🚀 Quick Start

Here's a simple example to get you started:

package main

import (
    "context"
    "time"
    "github.com/theapemachine/qpool"
)

func main() {
    // Create a new pool with min 2, max 5 workers
    ctx := context.Background()
    pool := qpool.NewQ(ctx, 2, 5, &qpool.Config{
        SchedulingTimeout: time.Second,
    })
    defer pool.Close()

    // Schedule a simple job
    result := pool.Schedule("job-1", func() (any, error) {
        return "Hello, World!", nil
    })

    // Wait for the result
    value := <-result
    if value.Error != nil {
        panic(value.Error)
    }
    println(value.Value.(string))
}

🔨 Advanced Usage

Job Dependencies

QPool supports a robust job dependency system that allows you to create complex workflows. Jobs can depend on one or more other jobs, and the system ensures proper execution order.

// Create jobs with dependencies
job1Result := pool.Schedule("data-fetch", func() (any, error) {
    return fetchData()
}, qpool.WithTTL(time.Minute))

// This job will only execute after data-fetch completes successfully
job2Result := pool.Schedule("data-process", func() (any, error) {
    return processData()
}, qpool.WithDependencies([]string{"data-fetch"}))

// You can also add multiple dependencies
job3Result := pool.Schedule("data-aggregate", func() (any, error) {
    return aggregateData()
}, qpool.WithDependencies([]string{"data-fetch", "data-process"}))

// Configure dependency retry behavior
job4Result := pool.Schedule("data-transform", func() (any, error) {
    return transformData()
}, 
    qpool.WithDependencies([]string{"data-aggregate"}),
    qpool.WithDependencyRetry(3, &qpool.ExponentialBackoff{Initial: time.Second}))

// Process results
for result := range job4Result {
    if result.Error != nil {
        log.Printf("Error: %v", result.Error)
        continue
    }
    // Process the result
    log.Printf("Success: %v", result.Value)
}

Key features of the dependency system:

  • Jobs wait for all dependencies to complete successfully before starting
  • If any dependency fails, dependent jobs fail automatically
  • Configurable retry policies for dependency resolution
  • Automatic cleanup of completed job results based on TTL
  • Non-blocking dependency resolution with timeout handling
Circuit Breaker
// Add circuit breaker to protect sensitive operations
pool.Schedule("api-call", func() (any, error) {
    return callExternalAPI()
}, qpool.WithCircuitBreaker("api", 5, time.Minute))
Retry Policy
// Configure retry behavior
pool.Schedule("flaky-operation", func() (any, error) {
    return flakyOperation()
}, qpool.WithRetry(3, &qpool.ExponentialBackoff{
    Initial: time.Second,
}))
Broadcast Groups
// Create a broadcast group for pub/sub functionality
group := pool.CreateBroadcastGroup("sensors", time.Minute)
subscriber := pool.Subscribe("sensors")

// Send updates to all subscribers
group.Send(qpool.QValue{
    Value: "sensor-update",
    CreatedAt: time.Now(),
})

📊 Metrics & Monitoring

QPool provides comprehensive metrics for monitoring:

// Get current metrics
metrics := pool.metrics.ExportMetrics()

fmt.Printf("Active Workers: %d\n", metrics["worker_count"])
fmt.Printf("Queue Size: %d\n", metrics["queue_size"])
fmt.Printf("Success Rate: %.2f%%\n", metrics["success_rate"]*100)
fmt.Printf("P95 Latency: %dms\n", metrics["p95_latency"])

🔧 Configuration

QPool can be configured through the Config struct:

config := &qpool.Config{
    SchedulingTimeout: time.Second * 5,
}

pool := qpool.NewQ(ctx, minWorkers, maxWorkers, config)

🏗️ Architecture

QPool consists of several key components:

  • Q (Pool): Main orchestrator managing workers and job scheduling
  • Worker: Handles job execution and resource management
  • QuantumSpace: Manages job results and dependencies
  • CircuitBreaker: Provides fault tolerance
  • Scaler: Handles dynamic worker pool sizing
  • Metrics: Collects and exposes performance data

📈 Performance

QPool is designed for high performance:

  • Non-blocking job scheduling
  • Efficient memory usage
  • Smart resource allocation
  • Automatic scaling based on load
  • Optimized dependency resolution

🧪 Testing

Run the test suite:

go test -v ./...

Run with race detection:

go test -race -v ./...

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/AmazingFeature)
  3. Commit your changes (git commit -m 'Add some AmazingFeature')
  4. Push to the branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Inspired by a conversation with Claude AI, with ChatGPT, O1, and Gemini action as reviewers, and providing critique on progress.
  • Built with modern concurrency patterns and best practices.
  • Designed for real-world production use cases.

📚 Documentation

For detailed documentation, please visit our Go Docs.

📞 Support

  • Create an issue for bug reports.
  • Start a discussion for feature requests.
  • Check existing issues before creating new ones.

Made with ❤️ by Daniel Owen van Dommelen

Documentation

Overview

broadcastgroup.go

qspace.go

qvalue.go

wavefunction.go

Index

Constants

This section is empty.

Variables

View Source
var ErrNoAvailableWorkers = errors.New("no workers available to process job")

ErrNoAvailableWorkers is returned when no workers are available to process a job

Functions

func Max

func Max(a, b int) int

Max returns the maximum of two integers

func MaxFloat added in v0.0.2

func MaxFloat(a, b float64) float64

MaxFloat returns the larger of two float64 values

func Min

func Min(a, b int) int

Min returns the minimum of two integers

func MinFloat added in v0.0.2

func MinFloat(a, b float64) float64

MinFloat returns the smaller of two float64 values

Types

type AdaptiveScalerRegulator added in v0.0.2

type AdaptiveScalerRegulator struct {
	// contains filtered or unexported fields
}

AdaptiveScalerRegulator implements the Regulator interface to dynamically adjust worker pool size. It combines the functionality of the existing Scaler with additional adaptive behaviors, similar to how an adaptive cruise control system adjusts speed based on traffic conditions.

Key features:

  • Dynamic worker pool sizing
  • Load-based scaling
  • Resource-aware adjustments
  • Performance optimization

func NewAdaptiveScalerRegulator added in v0.0.2

func NewAdaptiveScalerRegulator(pool *Q, minWorkers, maxWorkers int, config *ScalerConfig) *AdaptiveScalerRegulator

NewAdaptiveScalerRegulator creates a new adaptive scaler regulator.

Parameters:

  • pool: The worker pool to manage
  • minWorkers: Minimum number of workers
  • maxWorkers: Maximum number of workers
  • config: Scaling configuration parameters

Returns:

  • *AdaptiveScalerRegulator: A new adaptive scaler instance

Example:

scaler := NewAdaptiveScalerRegulator(pool, 2, 10, &ScalerConfig{...})

func (*AdaptiveScalerRegulator) Limit added in v0.0.2

func (as *AdaptiveScalerRegulator) Limit() bool

Limit implements the Regulator interface by determining if scaling operations should be limited. Returns true during cooldown periods or at worker limits.

Returns:

  • bool: true if scaling should be limited, false if it can proceed

func (*AdaptiveScalerRegulator) Observe added in v0.0.2

func (as *AdaptiveScalerRegulator) Observe(metrics *Metrics)

Observe implements the Regulator interface by monitoring system metrics. This method updates the scaler's view of system load and performance.

Parameters:

  • metrics: Current system metrics including worker and queue statistics

func (*AdaptiveScalerRegulator) Renormalize added in v0.0.2

func (as *AdaptiveScalerRegulator) Renormalize()

Renormalize implements the Regulator interface by attempting to restore normal operation. This method triggers a scaling evaluation if enough time has passed since the last scale.

type BackPressureRegulator added in v0.0.2

type BackPressureRegulator struct {
	// contains filtered or unexported fields
}

BackPressureRegulator implements the Regulator interface to prevent system overload. It monitors queue depth and processing times to regulate job intake, similar to how pressure regulators in plumbing systems prevent pipe damage by limiting flow when pressure builds up.

Key features:

  • Queue depth monitoring
  • Processing time tracking
  • Adaptive pressure thresholds
  • Gradual flow control

func NewBackPressureRegulator added in v0.0.2

func NewBackPressureRegulator(maxQueueSize int, targetProcessTime, pressureWindow time.Duration) *BackPressureRegulator

NewBackPressureRegulator creates a new back pressure regulator.

Parameters:

  • maxQueueSize: Maximum allowed queue size before applying back pressure
  • targetProcessTime: Target job processing time
  • pressureWindow: Time window for pressure calculations

Returns:

  • *BackPressureRegulator: A new back pressure regulator instance

Example:

regulator := NewBackPressureRegulator(1000, time.Second, time.Minute)

func (*BackPressureRegulator) GetPressure added in v0.0.2

func (bp *BackPressureRegulator) GetPressure() float64

GetPressure returns the current system pressure level

func (*BackPressureRegulator) Limit added in v0.0.2

func (bp *BackPressureRegulator) Limit() bool

Limit implements the Regulator interface by determining if job intake should be limited. Returns true when system pressure exceeds acceptable levels.

Returns:

  • bool: true if job intake should be limited, false if it can proceed

func (*BackPressureRegulator) Observe added in v0.0.2

func (bp *BackPressureRegulator) Observe(metrics *Metrics)

Observe implements the Regulator interface by monitoring system metrics. This method updates the regulator's view of system pressure based on queue size and processing times.

Parameters:

  • metrics: Current system metrics including queue and timing data

func (*BackPressureRegulator) Renormalize added in v0.0.2

func (bp *BackPressureRegulator) Renormalize()

Renormalize implements the Regulator interface by attempting to restore normal operation. This method gradually reduces system pressure if conditions allow.

type BroadcastGroup

type BroadcastGroup struct {
	ID string

	// Management
	TTL      time.Duration
	LastUsed time.Time
	// contains filtered or unexported fields
}
BroadcastGroup implements a quantum-aware pub/sub system.

It provides a quantum-inspired broadcast mechanism that maintains quantum properties such as entanglement and uncertainty while distributing messages to subscribers.

Key features:

  • Quantum state preservation
  • Filtered message routing
  • Subscriber management
  • Metrics collection
  • Entanglement support

func NewBroadcastGroup added in v0.0.2

func NewBroadcastGroup(id string, ttl time.Duration, maxQueue int) *BroadcastGroup
NewBroadcastGroup creates a new broadcast group with quantum properties.

Initializes a new broadcast group with specified parameters and default quantum properties such as minimum uncertainty.

Parameters:

  • id: Unique identifier for the broadcast group
  • ttl: Time-to-live duration for the group
  • maxQueue: Maximum queue size for message buffering

Returns:

  • *BroadcastGroup: A new broadcast group instance

func (*BroadcastGroup) AddFilter added in v0.0.2

func (bg *BroadcastGroup) AddFilter(filter FilterFunc)
AddFilter adds a global filter to the broadcast group.

Registers a new filter function that will be applied to all messages before broadcasting.

Parameters:

  • filter: The filter function to add

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*BroadcastGroup) AddRoutingRule added in v0.0.2

func (bg *BroadcastGroup) AddRoutingRule(subscriberID string, rule RoutingRule)
AddRoutingRule adds a routing rule for a specific subscriber.

Adds a new routing rule that determines how messages should be filtered for a specific subscriber.

Parameters:

  • subscriberID: ID of the subscriber to add the rule for
  • rule: The routing rule to add

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*BroadcastGroup) Close added in v0.0.2

func (bg *BroadcastGroup) Close()
Close shuts down the broadcast group and cleans up resources.

Performs graceful shutdown of the broadcast group, closing all subscriber channels and cleaning up internal resources.

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*BroadcastGroup) GetMetrics added in v0.0.2

func (bg *BroadcastGroup) GetMetrics() BroadcastMetrics
GetMetrics returns current broadcast metrics.

Provides access to the current operational metrics of the broadcast group.

Returns:

  • BroadcastMetrics: Copy of the current metrics

Thread-safe: This method uses read-lock to ensure safe concurrent access.

func (*BroadcastGroup) Send

func (bg *BroadcastGroup) Send(qv *QValue)
Send broadcasts a quantum value to all applicable subscribers.

Distributes a quantum value to subscribers according to routing rules while maintaining quantum properties and updating metrics.

Parameters:

  • qv: The quantum value to broadcast

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*BroadcastGroup) SetEntanglement added in v0.0.2

func (bg *BroadcastGroup) SetEntanglement(e *Entanglement)
SetEntanglement connects the broadcast group to an entanglement.

Associates the broadcast group with a quantum entanglement, allowing it to participate in quantum-like state synchronization.

Parameters:

  • e: The entanglement to connect to

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*BroadcastGroup) Subscribe added in v0.0.2

func (bg *BroadcastGroup) Subscribe(subscriberID string, bufferSize int, rules ...RoutingRule) chan *QValue
Subscribe adds a new subscriber with optional filtering and routing rules.

Creates and registers a new subscriber channel with specified buffer size and optional routing rules for message filtering.

Parameters:

  • subscriberID: Unique identifier for the subscriber
  • bufferSize: Size of the subscriber's message buffer
  • rules: Optional routing rules for message filtering

Returns:

  • chan *QValue: Channel for receiving broadcast messages

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*BroadcastGroup) Unsubscribe added in v0.0.2

func (bg *BroadcastGroup) Unsubscribe(subscriberID string)
Unsubscribe removes a subscriber and cleans up associated resources.

Safely removes a subscriber from the broadcast group, closing their channel and cleaning up any associated routing rules.

Parameters:

  • subscriberID: ID of the subscriber to remove

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

type BroadcastMetrics added in v0.0.2

type BroadcastMetrics struct {
	MessagesSent      int64
	MessagesDropped   int64
	AverageLatency    time.Duration
	ActiveSubscribers int
	UncertaintyLevel  UncertaintyLevel
	LastBroadcastTime time.Time
}
BroadcastMetrics tracks performance and behavior of the broadcast group.

Collects and maintains statistical information about the broadcast group's operation, including message counts, latency, and quantum uncertainty levels.

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements both the circuit breaker pattern and Regulator interface. It provides a way to automatically degrade service when the system is under stress by temporarily stopping operations when a failure threshold is reached.

The circuit breaker operates in three states:

  • Closed: Normal operation, all requests are allowed
  • Open: Failure threshold exceeded, all requests are rejected
  • Half-Open: Probationary state allowing limited requests to test system health

This pattern helps prevent cascading failures and allows the system to recover from failure states without overwhelming potentially unstable dependencies.

func NewCircuitBreaker added in v0.0.2

func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration, halfOpenMax int) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker instance with specified parameters.

Parameters:

  • maxFailures: Number of failures allowed before opening the circuit
  • resetTimeout: Duration to wait before attempting to close an open circuit
  • halfOpenMax: Maximum number of requests allowed in half-open state

Returns:

  • *CircuitBreaker: A new circuit breaker instance initialized in closed state

func (*CircuitBreaker) Allow

func (cb *CircuitBreaker) Allow() bool

Allow determines if a request is allowed based on the circuit state. This method implements the core circuit breaker logic, determining whether to allow requests based on the current state and timing conditions.

Returns:

  • bool: true if the request should be allowed, false if it should be rejected

func (*CircuitBreaker) Limit added in v0.0.2

func (cb *CircuitBreaker) Limit() bool

Limit implements the Regulator interface by determining if requests should be limited. This method provides a standardized way to check if the circuit breaker is currently preventing operations.

Returns:

  • bool: true if requests should be limited, false if they should proceed

func (*CircuitBreaker) Observe added in v0.0.2

func (cb *CircuitBreaker) Observe(metrics *Metrics)

Observe implements the Regulator interface by accepting system metrics. This method allows the circuit breaker to monitor system health and adjust its behavior based on current conditions.

Parameters:

  • metrics: Current system metrics including performance and health indicators

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failure and updates the circuit state. This method tracks the number of failures and opens the circuit if the failure threshold is exceeded. It handles state transitions differently based on the current circuit state.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful attempt and updates the circuit state. This method handles the transition from half-open to closed state after successful operations, and resets failure counts in closed state.

func (*CircuitBreaker) Renormalize added in v0.0.2

func (cb *CircuitBreaker) Renormalize()

Renormalize implements the Regulator interface by attempting to restore normal operation. This method checks if enough time has passed since the circuit was opened and transitions to half-open state if appropriate, allowing for system recovery.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	MaxFailures  int
	ResetTimeout time.Duration
	HalfOpenMax  int
}

CircuitBreakerConfig defines configuration for a circuit breaker

type CircuitState

type CircuitState int

CircuitState represents the state of the circuit breaker. This is used to track the current operational mode of the circuit breaker as it transitions between different states based on system health.

const (
	CircuitClosed   CircuitState = iota // Normal operation state
	CircuitOpen                         // Failure state, rejecting requests
	CircuitHalfOpen                     // Probationary state, allowing limited requests
)

type Config

type Config struct {
	SchedulingTimeout time.Duration
}

func NewConfig added in v0.0.2

func NewConfig() *Config

type Entanglement added in v0.0.2

type Entanglement struct {
	ID           string
	Jobs         []Job
	SharedState  map[string]any
	CreatedAt    time.Time
	LastModified time.Time

	Dependencies  []string
	TTL           time.Duration
	OnStateChange func(oldState, newState map[string]any)
	// contains filtered or unexported fields
}

Entanglement wraps a selection of jobs into a shared space. Meant for jobs that each describe part of a larger task. When one job in the entanglement changes state, it affects all others.

Inspired by quantum entanglement, this type provides a way to create groups of jobs that share state and react to changes in that state simultaneously. Just as quantum particles can be entangled such that the state of one instantly affects the other, jobs in an Entanglement share a common state that, when changed, affects all jobs in the group.

The shared state in an Entanglement is immutable and persistent. State changes are recorded in a ledger and replayed for any job that joins or starts processing later, ensuring that the quantum-like property of entanglement is maintained across time. This means that even if jobs process at different times, they all see the complete history of state changes, preserving the causal relationship between entangled jobs.

Use cases include:

  • Distributed data processing where multiple jobs need to share intermediate results
  • Coordinated tasks where jobs need to react to each other's progress
  • State synchronization across a set of related operations
  • Fan-out/fan-in patterns where multiple jobs contribute to a shared outcome

func NewEntanglement added in v0.0.2

func NewEntanglement(id string, jobs []Job, ttl time.Duration) *Entanglement

NewEntanglement creates a new entanglement of jobs with the specified ID and TTL.

The entanglement acts as a quantum-inspired container that maintains shared state across multiple jobs. Like quantum entangled particles that remain connected regardless of distance, jobs in the entanglement remain connected through their shared state.

The state history is preserved and replayed for any job that starts processing later, ensuring that the quantum-like property of entanglement is maintained across time.

Parameters:

  • id: A unique identifier for the entanglement
  • jobs: Initial set of jobs to be entangled
  • ttl: Time-to-live duration after which the entanglement expires

Example:

jobs := []Job{job1, job2, job3}
entanglement := NewEntanglement("data-processing", jobs, 1*time.Hour)

func (*Entanglement) AddJob added in v0.0.2

func (e *Entanglement) AddJob(job Job)

AddJob adds a job to the entanglement.

This method expands the entanglement to include a new job, similar to how quantum systems can be expanded to include more entangled particles. The newly added job becomes part of the shared state system and will be affected by state changes.

Parameters:

  • job: The job to add to the entanglement

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*Entanglement) GetState added in v0.0.2

func (e *Entanglement) GetState(key string) (any, bool)

GetState retrieves a value from the shared state.

This method provides a way to observe the current state of the entanglement. Like quantum measurement, it provides a snapshot of the current state at the time of observation.

Parameters:

  • key: The state key to retrieve

Returns:

  • value: The value associated with the key
  • exists: Boolean indicating whether the key exists in the state

Thread-safe: This method uses a read lock to ensure safe concurrent access.

func (*Entanglement) GetStateHistory added in v0.0.2

func (e *Entanglement) GetStateHistory(sinceSequence uint64) []StateChange

GetStateHistory returns all state changes that have occurred since a given sequence number. This allows jobs that start processing later to catch up on all state changes they missed.

Parameters:

  • sinceSequence: The sequence number to start from (0 for all history)

Returns:

  • []StateChange: Ordered list of state changes since the specified sequence

func (*Entanglement) IsExpired added in v0.0.2

func (e *Entanglement) IsExpired() bool

IsExpired checks if the entanglement has exceeded its TTL.

This method determines if the entanglement should be considered expired based on its Time-To-Live (TTL) duration and the time since its last modification. An expired entanglement might be cleaned up by the system, similar to how quantum entanglement can be lost due to decoherence.

Returns:

  • bool: True if the entanglement has expired, false otherwise

Note: A TTL of 0 or less means the entanglement never expires.

func (*Entanglement) RemoveJob added in v0.0.2

func (e *Entanglement) RemoveJob(jobID string) bool

RemoveJob removes a job from the entanglement.

This method removes a job from the entanglement, effectively "disentangling" it from the shared state system. The removed job will no longer be affected by or contribute to state changes in the entanglement.

Parameters:

  • jobID: The ID of the job to remove

Returns:

  • bool: True if the job was found and removed, false otherwise

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*Entanglement) ReplayStateChanges added in v0.0.2

func (e *Entanglement) ReplayStateChanges(job Job)

ReplayStateChanges applies all historical state changes to a newly starting job. This ensures that jobs starting later still see the complete history of state changes in the correct order.

Parameters:

  • job: The job to replay state changes for

func (*Entanglement) UpdateQuantumState added in v0.0.2

func (e *Entanglement) UpdateQuantumState(key string, state *State)

func (*Entanglement) UpdateState added in v0.0.2

func (e *Entanglement) UpdateState(key string, value any)

UpdateState updates the shared state and notifies all entangled jobs of the change.

Similar to how measuring one quantum particle instantly affects its entangled partner, updating state through this method instantly affects all jobs in the entanglement. The state change is recorded in an immutable ledger, ensuring that even jobs that haven't started processing yet will see this change when they begin.

The OnStateChange callback (if set) is triggered with both the old and new state, allowing jobs to react to the change. For jobs that start later, these changes are replayed in order during their initialization.

Parameters:

  • key: The state key to update
  • value: The new value for the state key

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

type Evidence added in v0.0.2

type Evidence struct {
	Method     string      // Verification method used
	Confidence float64     // Confidence in this evidence
	Data       interface{} // The actual evidence data
}

Evidence represents verification data supporting a particular state.

type ExponentialBackoff

type ExponentialBackoff struct {
	Initial time.Duration
}

ExponentialBackoff implements RetryStrategy

func (*ExponentialBackoff) NextDelay

func (eb *ExponentialBackoff) NextDelay(attempt int) time.Duration

type FilterFunc added in v0.0.2

type FilterFunc func(*QValue) bool
FilterFunc defines a function type for filtering quantum values.

This function type is used to determine whether a quantum value should be processed or ignored in the broadcast system.

Parameters:

  • *QValue: The quantum value to be filtered

Returns:

  • bool: True if the value should be processed, false if it should be filtered out

type Job

type Job struct {
	ID                    string
	Fn                    func() (any, error)
	RetryPolicy           *RetryPolicy
	CircuitID             string
	CircuitConfig         *CircuitBreakerConfig
	Dependencies          []string
	TTL                   time.Duration
	Attempt               int
	LastError             error
	DependencyRetryPolicy *RetryPolicy
	StartTime             time.Time
}

Job represents work to be done

type JobOption

type JobOption func(*Job)

JobOption is a function type for configuring jobs

func WithCircuitBreaker

func WithCircuitBreaker(id string, maxFailures int, resetTimeout time.Duration) JobOption

WithCircuitBreaker configures circuit breaker for a job

func WithDependencies added in v0.0.2

func WithDependencies(dependencies []string) JobOption

WithDependencies configures job dependencies

func WithDependencyRetry

func WithDependencyRetry(attempts int, strategy RetryStrategy) JobOption

WithDependencyRetry configures retry behavior for dependencies

func WithRetry

func WithRetry(attempts int, strategy RetryStrategy) JobOption

WithRetry configures retry behavior for a job

func WithTTL

func WithTTL(ttl time.Duration) JobOption
WithTTL configures TTL for a job.

Sets the time-to-live (TTL) for a job, which determines how long the job will remain in the system before being discarded.

type LoadBalancer added in v0.0.2

type LoadBalancer struct {
	// contains filtered or unexported fields
}

LoadBalancer implements the Regulator interface to provide intelligent work distribution. It ensures even distribution of work across workers while considering system metrics like worker load, processing speed, and resource utilization.

Like a traffic controller directing vehicles to different lanes based on congestion, the load balancer directs work to workers based on their current capacity and performance characteristics.

Key features:

  • Even work distribution
  • Metric-based routing decisions
  • Automatic worker selection
  • Adaptive load management

func NewLoadBalancer added in v0.0.2

func NewLoadBalancer(workerCount, workerCapacity int) *LoadBalancer

NewLoadBalancer creates a new load balancer regulator with specified parameters.

Parameters:

  • workerCount: Initial number of workers to balance between
  • workerCapacity: Maximum concurrent jobs per worker

Returns:

  • *LoadBalancer: A new load balancer instance

Example:

balancer := NewLoadBalancer(5, 10) // 5 workers, 10 jobs each

func (*LoadBalancer) Limit added in v0.0.2

func (lb *LoadBalancer) Limit() bool

Limit implements the Regulator interface by determining if work should be limited. Returns true if all workers are at capacity and no more work should be accepted.

Returns:

  • bool: true if work should be limited, false if it can proceed

Thread-safety: This method is thread-safe through mutex protection.

func (*LoadBalancer) Observe added in v0.0.2

func (lb *LoadBalancer) Observe(metrics *Metrics)

Observe implements the Regulator interface by monitoring system metrics. This method updates the load balancer's view of worker performance and system state, allowing it to make informed routing decisions.

Parameters:

  • metrics: Current system metrics including worker performance data

func (*LoadBalancer) RecordJobComplete added in v0.0.2

func (lb *LoadBalancer) RecordJobComplete(workerID int, duration time.Duration)

RecordJobComplete updates worker statistics when a job completes processing. This helps maintain accurate load and performance information.

Parameters:

  • workerID: The ID of the worker that completed the job
  • duration: How long the job took to process

func (*LoadBalancer) RecordJobStart added in v0.0.2

func (lb *LoadBalancer) RecordJobStart(workerID int)

RecordJobStart updates worker statistics when a job starts processing. This helps maintain accurate load information for future routing decisions.

Parameters:

  • workerID: The ID of the worker that started the job

func (*LoadBalancer) Renormalize added in v0.0.2

func (lb *LoadBalancer) Renormalize()

Renormalize implements the Regulator interface by attempting to restore normal operation. This method resets worker statistics and redistributes load if necessary.

func (*LoadBalancer) SelectWorker added in v0.0.2

func (lb *LoadBalancer) SelectWorker() (int, error)

SelectWorker chooses the most appropriate worker for the next job based on current load distribution and worker performance metrics.

Returns:

  • int: The selected worker ID
  • error: Error if no suitable worker is available

type Metrics

type Metrics struct {
	WorkerCount          int
	JobQueueSize         int
	ActiveWorkers        int
	LastScale            time.Time
	ErrorRates           map[string]float64
	TotalJobTime         time.Duration
	JobCount             int64
	CircuitBreakerStates map[string]CircuitState

	// Additional suggested metrics
	AverageJobLatency   time.Duration
	P95JobLatency       time.Duration
	P99JobLatency       time.Duration
	JobSuccessRate      float64
	QueueWaitTime       time.Duration
	ResourceUtilization float64

	// Rate limiting metrics
	RateLimitHits int64
	ThrottledJobs int64

	// SchedulingFailures field to track scheduling timeouts
	SchedulingFailures int64

	// Additional metrics
	FailureCount int64
	// contains filtered or unexported fields
}

Metrics tracks and stores various performance metrics for the worker pool.

func NewMetrics added in v0.0.2

func NewMetrics() *Metrics

NewMetrics creates and initializes a new Metrics instance.

func (*Metrics) ExportMetrics

func (m *Metrics) ExportMetrics() map[string]interface{}

Add metrics export functionality

func (*Metrics) RecordJobExecution added in v0.0.2

func (m *Metrics) RecordJobExecution(startTime time.Time, success bool)

RecordJobExecution records the execution time and success status of a job.

func (*Metrics) RecordJobFailure

func (m *Metrics) RecordJobFailure()

RecordJobFailure records the failure of a job and updates metrics

func (*Metrics) RecordJobSuccess

func (m *Metrics) RecordJobSuccess(latency time.Duration)

type ObservationEffect added in v0.0.2

type ObservationEffect struct {
	ObserverID    string
	ObservedAt    time.Time
	StateCollapse bool
	Uncertainty   UncertaintyLevel
}

ObservationEffect represents how observation affects the quantum state

type Probability added in v0.0.2

type Probability float64

Probability represents a quantum state probability

type Q

type Q struct {
	// contains filtered or unexported fields
}
Q is our hybrid worker pool/message queue implementation.

It combines traditional worker pool functionality with quantum-inspired state management. The pool maintains a balance between worker availability and job scheduling while providing quantum-like properties such as state superposition and entanglement through its integration with QSpace.

Key features:

  • Dynamic worker scaling
  • Circuit breaker pattern support
  • Quantum-inspired state management
  • Metrics collection and monitoring

func NewQ

func NewQ(ctx context.Context, minWorkers, maxWorkers int, config *Config) *Q
NewQ creates a new quantum pool with the specified worker constraints and configuration.

The pool initializes with the minimum number of workers and scales dynamically based on load.

Parameters:

  • ctx: Parent context for lifecycle management
  • minWorkers: Minimum number of workers to maintain
  • maxWorkers: Maximum number of workers allowed
  • config: Pool configuration parameters

Returns:

  • *Q: A new quantum pool instance

func (*Q) Close

func (q *Q) Close()
Close gracefully shuts down the quantum pool.

It ensures all workers complete their current jobs and cleans up resources. The shutdown process:

  1. Cancels the pool's context
  2. Waits for all goroutines to complete
  3. Closes all channels safely
  4. Cleans up worker resources

func (*Q) CreateBroadcastGroup

func (q *Q) CreateBroadcastGroup(id string, ttl time.Duration) *BroadcastGroup
CreateBroadcastGroup creates a new broadcast group.

Initializes a new broadcast group with specified parameters and default quantum properties such as minimum uncertainty.

func (*Q) Schedule

func (q *Q) Schedule(id string, fn func() (any, error), opts ...JobOption) chan *QValue
Schedule submits a job to the quantum pool for execution.

The job is processed according to quantum-inspired principles, maintaining state history and uncertainty levels through QSpace integration.

Parameters:

  • id: Unique identifier for the job
  • fn: The function to execute
  • opts: Optional job configuration parameters

Returns:

  • chan *QValue: Channel that will receive the job's result

func (*Q) Subscribe

func (q *Q) Subscribe(groupID string) chan QValue
Subscribe returns a channel for receiving values from a broadcast group.

Provides a channel for receiving quantum values from a specific broadcast group.

type QSpace added in v0.0.2

type QSpace struct {
	// contains filtered or unexported fields
}

QSpace represents a quantum-like state space.

It provides a managed environment for quantum-inspired values, maintaining their states, relationships, and uncertainties. The space implements concepts from quantum mechanics such as entanglement, state superposition, and the uncertainty principle.

Key features:

  • Quantum value storage and retrieval
  • State transition history
  • Entanglement management
  • Relationship tracking
  • Automatic resource cleanup

func NewQSpace added in v0.0.2

func NewQSpace() *QSpace

NewQSpace creates a new quantum space.

Initializes a new space with default uncertainty principles and starts maintenance goroutines for cleanup and uncertainty monitoring.

Returns:

  • *QSpace: A new quantum space instance ready for use

func (*QSpace) AddRelationship added in v0.0.2

func (qs *QSpace) AddRelationship(parentID, childID string) error

AddRelationship establishes a parent-child relationship between values.

Creates directed relationships between values while preventing circular dependencies that could cause deadlocks or infinite loops.

Parameters:

  • parentID: ID of the parent value
  • childID: ID of the child value

Returns:

  • error: Error if the relationship would create a circular dependency

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*QSpace) Await added in v0.0.2

func (qs *QSpace) Await(id string) chan *QValue

Await returns a channel that will receive the quantum value.

Implements quantum-inspired delayed observation, where values may be uncertain or not yet collapsed to a definite state.

Parameters:

  • id: The identifier of the value to await

Returns:

  • chan *QValue: Channel that will receive the value when available

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*QSpace) Close added in v0.0.2

func (qs *QSpace) Close()

Close shuts down the quantum space.

Performs graceful shutdown of the quantum space, cleaning up resources and closing all channels safely.

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*QSpace) CreateBroadcastGroup added in v0.0.2

func (qs *QSpace) CreateBroadcastGroup(id string, ttl time.Duration) *BroadcastGroup

CreateBroadcastGroup creates a new broadcast group.

Initializes a new broadcast group with specified parameters and default quantum properties such as minimum uncertainty.

func (*QSpace) CreateEntanglement added in v0.0.2

func (qs *QSpace) CreateEntanglement(ids []string) *Entanglement

CreateEntanglement establishes quantum entanglement between values.

Creates and manages relationships between quantum values that should maintain synchronized states. Like quantum entanglement in physics, changes to one value affect all entangled values.

Parameters:

  • ids: Slice of value IDs to entangle together

Returns:

  • *Entanglement: A new entanglement instance managing the relationship

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*QSpace) CreateQuantumEntanglement added in v0.0.2

func (qs *QSpace) CreateQuantumEntanglement(id string) *Entanglement

Enhance existing QSpace

func (*QSpace) Exists added in v0.0.2

func (qs *QSpace) Exists(id string) bool

Exists checks if a value exists in the space

func (*QSpace) GetStateHistory added in v0.0.2

func (qs *QSpace) GetStateHistory(valueID string) []StateTransition

GetStateHistory returns the state transition history for a value.

Provides access to the complete history of state transitions for analysis and debugging purposes.

Parameters:

  • valueID: ID of the value to get history for

Returns:

  • []StateTransition: Ordered list of state transitions for the value

Thread-safe: This method uses read-lock to ensure safe concurrent access.

func (*QSpace) Store added in v0.0.2

func (qs *QSpace) Store(id string, value interface{}, states []State, ttl time.Duration)

Store stores a quantum value with proper uncertainty handling.

Values are stored with their associated states and TTL, maintaining quantum-inspired properties such as superposition and uncertainty.

Parameters:

  • id: Unique identifier for the value
  • value: The actual value to store
  • states: Possible states with their probabilities
  • ttl: Time-to-live duration for the value

Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.

func (*QSpace) StoreError added in v0.0.2

func (qs *QSpace) StoreError(id string, err error, ttl time.Duration)

StoreError stores an error result in the quantum space

func (*QSpace) Subscribe added in v0.0.2

func (qs *QSpace) Subscribe(groupID string) chan *QValue

Subscribe returns a channel for receiving values from a broadcast group.

Provides a channel for receiving quantum values from a specific broadcast group.

type QValue added in v0.0.2

type QValue struct {

	// Core value and metadata
	Value     interface{}
	Error     error
	CreatedAt time.Time
	TTL       time.Duration

	// Quantum properties
	States       []State          // Possible superposition states
	Uncertainty  UncertaintyLevel // Heisenberg-inspired uncertainty
	Observations []ObservationEffect
	Entangled    []string // IDs of entangled values
	// contains filtered or unexported fields
}

QValue represents a value with quantum-like properties

func NewQValue added in v0.0.2

func NewQValue(initialValue interface{}, states []State) *QValue

NewQValue creates a new quantum value with initial states

func (*QValue) Entangle added in v0.0.2

func (qv *QValue) Entangle(other *QValue)

Entangle connects this value with another quantum value

func (*QValue) ID added in v0.0.2

func (qv *QValue) ID() string

ID generates a unique identifier for this quantum value

func (*QValue) Observe added in v0.0.2

func (qv *QValue) Observe(observerID string) interface{}

Observe triggers wave function collapse based on quantum rules

type QuantumState added in v0.0.2

type QuantumState struct {
	Vector      []complex128
	Uncertainty float64
}

func (*QuantumState) Collapse added in v0.0.2

func (qs *QuantumState) Collapse() any

func (*QuantumState) Measure added in v0.0.2

func (qs *QuantumState) Measure() any

type Qubit added in v0.0.2

type Qubit struct {
	// contains filtered or unexported fields
}

func NewQubit added in v0.0.2

func NewQubit(alpha, beta complex128) *Qubit

func (*Qubit) ApplyHadamard added in v0.0.2

func (q *Qubit) ApplyHadamard()

type RateLimiter added in v0.0.2

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements the Regulator interface using a token bucket algorithm. It controls the rate of operations by maintaining a bucket of tokens that are consumed by each operation and replenished at a fixed rate.

Like a water tank with a steady inflow and controlled outflow, this regulator ensures that operations occur at a sustainable rate, preventing system overload while allowing for brief bursts of activity when the token bucket is full.

Key features:

  • Smooth rate limiting with burst capacity
  • Configurable token replenishment rate
  • Thread-safe operation
  • Metric-aware for adaptive rate limiting

func NewRateLimiter added in v0.0.2

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter

NewRateLimiter creates a new rate limit regulator with specified parameters.

Parameters:

  • maxTokens: Maximum number of tokens (burst capacity)
  • refillRate: Duration between token replenishments

Returns:

  • *RateLimiter: A new rate limit regulator instance

Example:

limiter := NewRateLimiter(100, time.Second) // 100 ops/second with burst capacity

func (*RateLimiter) Limit added in v0.0.2

func (rl *RateLimiter) Limit() bool

Limit implements the Regulator interface by determining if an operation should be limited. It consumes a token if available, allowing the operation to proceed. If no tokens are available, the operation is limited.

Returns:

  • bool: true if the operation should be limited, false if it can proceed

Thread-safety: This method is thread-safe through mutex protection.

func (*RateLimiter) Observe added in v0.0.2

func (rl *RateLimiter) Observe(metrics *Metrics)

Observe implements the Regulator interface by monitoring system metrics. The rate limiter can use these metrics to dynamically adjust its rate limits based on system conditions.

For example, it might:

  • Reduce rates during high system load
  • Increase limits when resources are abundant
  • Adjust burst capacity based on queue length

Parameters:

  • metrics: Current system metrics including performance and health indicators

func (*RateLimiter) Renormalize added in v0.0.2

func (rl *RateLimiter) Renormalize()

Renormalize implements the Regulator interface by attempting to restore normal operation. This method triggers a token refill, potentially allowing more operations to proceed if enough time has passed since the last refill.

The rate limiter uses this method to maintain a steady flow of operations while adhering to the configured rate limits.

type Regulator added in v0.0.2

type Regulator interface {
	// Observe allows the regulator to monitor system metrics and state.
	// This is analogous to a sensor in a mechanical regulator, providing
	// the feedback necessary for making control decisions.
	//
	// Parameters:
	//   - metrics: Current system metrics including performance and health indicators
	Observe(metrics *Metrics)

	// Limit determines if the regulated action should be restricted.
	// Returns true if the action should be limited, false if it should proceed.
	// This is the main control point where the regulator decides whether to
	// allow or restrict operations based on observed conditions.
	//
	// Returns:
	//   - bool: true if the action should be limited, false if it should proceed
	Limit() bool

	// Renormalize attempts to return the system to a normal operating state.
	// This is similar to a feedback loop in control systems, where the regulator
	// takes active steps to restore normal operations after a period of restriction.
	// The exact meaning of "normal" depends on the specific regulator implementation.
	Renormalize()
}

Regulator defines an interface for types that regulate the flow and behavior of the pool. Inspired by biological and mechanical regulators that maintain system homeostasis, this interface provides a common pattern for implementing various regulation mechanisms.

Each regulator acts as a control system component, monitoring and adjusting the pool's behavior to maintain optimal performance and stability. Like a thermostat or pressure regulator in physical systems, these regulators help maintain the system within desired operational parameters.

Examples of regulators include:

  • CircuitBreaker: Prevents cascading failures by stopping operations when error rates are high
  • RateLimit: Controls the flow rate of jobs to prevent system overload
  • LoadBalancer: Distributes work evenly across available resources
  • BackPressure: Prevents system overload by controlling input rates
  • ResourceGovernor: Manages resource consumption within defined limits

func NewRegulator added in v0.0.2

func NewRegulator(regulatorType Regulator) Regulator

NewRegulator creates a new regulator of the specified type. This factory function allows for flexible creation of different regulator types while maintaining a consistent interface for the pool to interact with.

Parameters:

  • regulatorType: A concrete implementation of the Regulator interface

Returns:

  • Regulator: The initialized regulator instance

Example:

circuitBreaker := NewCircuitBreakerRegulator(5, time.Minute, 3)
regulator := NewRegulator(circuitBreaker)

type ResourceGovernorRegulator added in v0.0.2

type ResourceGovernorRegulator struct {
	// contains filtered or unexported fields
}

ResourceGovernorRegulator implements the Regulator interface to manage system resources. It monitors and controls resource usage (CPU, memory, etc.) to prevent system exhaustion, similar to how a power governor prevents engine damage by limiting power consumption under heavy load.

Key features:

  • CPU usage monitoring
  • Memory usage tracking
  • Resource thresholds
  • Adaptive limiting

func NewResourceGovernorRegulator added in v0.0.2

func NewResourceGovernorRegulator(maxCPUPercent, maxMemoryPercent float64, checkInterval time.Duration) *ResourceGovernorRegulator

NewResourceGovernorRegulator creates a new resource governor regulator.

Parameters:

  • maxCPUPercent: Maximum allowed CPU usage (0.0-1.0)
  • maxMemoryPercent: Maximum allowed memory usage (0.0-1.0)
  • checkInterval: How often to check resource usage

Returns:

  • *ResourceGovernorRegulator: A new resource governor instance

Example:

governor := NewResourceGovernorRegulator(0.8, 0.9, time.Second)

func (*ResourceGovernorRegulator) GetResourceUsage added in v0.0.2

func (rg *ResourceGovernorRegulator) GetResourceUsage() (cpu, memory float64)

GetResourceUsage returns current resource utilization levels

func (*ResourceGovernorRegulator) GetThresholds added in v0.0.2

func (rg *ResourceGovernorRegulator) GetThresholds() (cpu, memory float64)

GetThresholds returns the current resource usage thresholds

func (*ResourceGovernorRegulator) Limit added in v0.0.2

func (rg *ResourceGovernorRegulator) Limit() bool

Limit implements the Regulator interface by determining if resource usage should be limited. Returns true when resource usage exceeds thresholds.

Returns:

  • bool: true if resource usage should be limited, false if it can proceed

func (*ResourceGovernorRegulator) Observe added in v0.0.2

func (rg *ResourceGovernorRegulator) Observe(metrics *Metrics)

Observe implements the Regulator interface by monitoring system metrics. This method updates the governor's view of resource utilization based on current system metrics.

Parameters:

  • metrics: Current system metrics including resource utilization data

func (*ResourceGovernorRegulator) Renormalize added in v0.0.2

func (rg *ResourceGovernorRegulator) Renormalize()

Renormalize implements the Regulator interface by attempting to restore normal operation. This method updates resource usage measurements and adjusts thresholds if necessary.

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts int
	Strategy    RetryStrategy
	BackoffFunc func(attempt int) time.Duration
	Filter      func(error) bool
}

RetryPolicy defines retry behavior

type RetryStrategy

type RetryStrategy interface {
	NextDelay(attempt int) time.Duration
}

RetryStrategy defines the interface for retry behavior

type RoutingRule added in v0.0.2

type RoutingRule struct {
	SubscriberID string
	Filter       FilterFunc
	Priority     int
}
RoutingRule defines how messages should be routed to specific subscribers.

It combines subscriber identification with filtering logic and priority levels to enable sophisticated message routing in the broadcast system.

type Scaler

type Scaler struct {
	// contains filtered or unexported fields
}

Scaler manages pool size based on current load

func NewScaler

func NewScaler(q *Q, minWorkers, maxWorkers int, config *ScalerConfig) *Scaler

NewScaler initializes and starts a new Scaler

type ScalerConfig

type ScalerConfig struct {
	TargetLoad         float64
	ScaleUpThreshold   float64
	ScaleDownThreshold float64
	Cooldown           time.Duration
}

ScalerConfig defines configuration for the Scaler

type State added in v0.0.2

type State struct {
	Value       interface{}
	Probability float64
	Amplitude   complex128 // For future quantum computing features
	Evidence    []Evidence // Supporting evidence for this state
}

State represents a possible quantum state with its probability amplitude and associated verification information.

type StateChange added in v0.0.2

type StateChange struct {
	Timestamp time.Time
	Key       string
	Value     any
	Sequence  uint64 // Monotonically increasing sequence number
}

StateChange represents an immutable record of a change to the shared state. Each change is timestamped and contains both the key and value that was changed, allowing for precise replay of state evolution.

type StateTransition added in v0.0.2

type StateTransition struct {
	ValueID   string
	FromState State
	ToState   State
	Timestamp time.Time
	Cause     string
}

StateTransition represents a change in quantum state.

It records the complete history of state changes, including the previous and new states, timing information, and the cause of the transition. This maintains a quantum-like state history that can be used to understand the evolution of values over time.

type UncertaintyLevel added in v0.0.2

type UncertaintyLevel float64

UncertaintyLevel defines how uncertain we are about a value

const (
	MinUncertainty UncertaintyLevel = 0.0
	MaxUncertainty UncertaintyLevel = 1.0
)

type UncertaintyPrinciple added in v0.0.2

type UncertaintyPrinciple struct {
	MinDeltaTime    time.Duration // Minimum time between observations
	MaxDeltaTime    time.Duration // Maximum time before max uncertainty
	BaseUncertainty UncertaintyLevel
}

UncertaintyPrinciple enforces quantum-like uncertainty rules.

It implements Heisenberg-inspired uncertainty principles where observation and time affect the certainty of quantum values. The principle ensures that values become more uncertain over time and that frequent observations impact the system's state.

Key concepts:

  • Minimum time between observations to limit measurement impact
  • Maximum time before reaching maximum uncertainty
  • Base uncertainty level for all measurements

type WaveFunction added in v0.0.2

type WaveFunction struct {
	States      []State
	Uncertainty UncertaintyLevel
	// contains filtered or unexported fields
}

WaveFunction represents a quantum state that can exist in multiple possible states simultaneously until observation or verification forces a collapse into a definite state.

func NewWaveFunction added in v0.0.2

func NewWaveFunction(
	states []State,
	uncertainty UncertaintyLevel,
	methodDiversity float64,
) *WaveFunction

func (*WaveFunction) AddEvidence added in v0.0.2

func (wf *WaveFunction) AddEvidence(stateValue interface{}, evidence Evidence)

AddEvidence allows adding new evidence to a state after creation.

func (*WaveFunction) Collapse added in v0.0.2

func (wf *WaveFunction) Collapse() interface{}

Collapse forces the wave function to choose a definite state based on both probabilities and verification evidence. The collapse mechanism considers: 1. State probabilities 2. Method diversity 3. Evidence quality 4. Uncertainty level

func (*WaveFunction) UpdateMethodDiversity added in v0.0.2

func (wf *WaveFunction) UpdateMethodDiversity(diversity float64)

UpdateMethodDiversity allows updating the method diversity score as new verification methods are added.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes jobs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL