Documentation ¶
Overview ¶
broadcastgroup.go
qspace.go
qvalue.go
wavefunction.go
Index ¶
- Variables
- func Max(a, b int) int
- func MaxFloat(a, b float64) float64
- func Min(a, b int) int
- func MinFloat(a, b float64) float64
- type AdaptiveScalerRegulator
- type BackPressureRegulator
- type BroadcastGroup
- func (bg *BroadcastGroup) AddFilter(filter FilterFunc)
- func (bg *BroadcastGroup) AddRoutingRule(subscriberID string, rule RoutingRule)
- func (bg *BroadcastGroup) Close()
- func (bg *BroadcastGroup) GetMetrics() BroadcastMetrics
- func (bg *BroadcastGroup) Send(qv *QValue)
- func (bg *BroadcastGroup) SetEntanglement(e *Entanglement)
- func (bg *BroadcastGroup) Subscribe(subscriberID string, bufferSize int, rules ...RoutingRule) chan *QValue
- func (bg *BroadcastGroup) Unsubscribe(subscriberID string)
- type BroadcastMetrics
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitState
- type Config
- type Entanglement
- func (e *Entanglement) AddJob(job Job)
- func (e *Entanglement) GetState(key string) (any, bool)
- func (e *Entanglement) GetStateHistory(sinceSequence uint64) []StateChange
- func (e *Entanglement) IsExpired() bool
- func (e *Entanglement) RemoveJob(jobID string) bool
- func (e *Entanglement) ReplayStateChanges(job Job)
- func (e *Entanglement) UpdateQuantumState(key string, state *State)
- func (e *Entanglement) UpdateState(key string, value any)
- type Evidence
- type ExponentialBackoff
- type FilterFunc
- type Job
- type JobOption
- func WithCircuitBreaker(id string, maxFailures int, resetTimeout time.Duration) JobOption
- func WithDependencies(dependencies []string) JobOption
- func WithDependencyRetry(attempts int, strategy RetryStrategy) JobOption
- func WithRetry(attempts int, strategy RetryStrategy) JobOption
- func WithTTL(ttl time.Duration) JobOption
- type LoadBalancer
- func (lb *LoadBalancer) Limit() bool
- func (lb *LoadBalancer) Observe(metrics *Metrics)
- func (lb *LoadBalancer) RecordJobComplete(workerID int, duration time.Duration)
- func (lb *LoadBalancer) RecordJobStart(workerID int)
- func (lb *LoadBalancer) Renormalize()
- func (lb *LoadBalancer) SelectWorker() (int, error)
- type Metrics
- type ObservationEffect
- type Probability
- type Q
- type QSpace
- func (qs *QSpace) AddRelationship(parentID, childID string) error
- func (qs *QSpace) Await(id string) chan *QValue
- func (qs *QSpace) Close()
- func (qs *QSpace) CreateBroadcastGroup(id string, ttl time.Duration) *BroadcastGroup
- func (qs *QSpace) CreateEntanglement(ids []string) *Entanglement
- func (qs *QSpace) CreateQuantumEntanglement(id string) *Entanglement
- func (qs *QSpace) Exists(id string) bool
- func (qs *QSpace) GetStateHistory(valueID string) []StateTransition
- func (qs *QSpace) Store(id string, value interface{}, states []State, ttl time.Duration)
- func (qs *QSpace) StoreError(id string, err error, ttl time.Duration)
- func (qs *QSpace) Subscribe(groupID string) chan *QValue
- type QValue
- type QuantumState
- type Qubit
- type RateLimiter
- type Regulator
- type ResourceGovernorRegulator
- func (rg *ResourceGovernorRegulator) GetResourceUsage() (cpu, memory float64)
- func (rg *ResourceGovernorRegulator) GetThresholds() (cpu, memory float64)
- func (rg *ResourceGovernorRegulator) Limit() bool
- func (rg *ResourceGovernorRegulator) Observe(metrics *Metrics)
- func (rg *ResourceGovernorRegulator) Renormalize()
- type RetryPolicy
- type RetryStrategy
- type RoutingRule
- type Scaler
- type ScalerConfig
- type State
- type StateChange
- type StateTransition
- type UncertaintyLevel
- type UncertaintyPrinciple
- type WaveFunction
- type Worker
Constants ¶
This section is empty.
Variables ¶
var ErrNoAvailableWorkers = errors.New("no workers available to process job")
ErrNoAvailableWorkers is returned when no workers are available to process a job.
Functions ¶
Types ¶
type AdaptiveScalerRegulator ¶ added in v0.0.2
type AdaptiveScalerRegulator struct {
// contains filtered or unexported fields
}
AdaptiveScalerRegulator implements the Regulator interface to dynamically adjust worker pool size. It combines the functionality of the existing Scaler with additional adaptive behaviors, similar to how an adaptive cruise control system adjusts speed based on traffic conditions.
Key features:
- Dynamic worker pool sizing
- Load-based scaling
- Resource-aware adjustments
- Performance optimization
func NewAdaptiveScalerRegulator ¶ added in v0.0.2
func NewAdaptiveScalerRegulator(pool *Q, minWorkers, maxWorkers int, config *ScalerConfig) *AdaptiveScalerRegulator
NewAdaptiveScalerRegulator creates a new adaptive scaler regulator.
Parameters:
- pool: The worker pool to manage
- minWorkers: Minimum number of workers
- maxWorkers: Maximum number of workers
- config: Scaling configuration parameters
Returns:
- *AdaptiveScalerRegulator: A new adaptive scaler instance
Example:
scaler := NewAdaptiveScalerRegulator(pool, 2, 10, &ScalerConfig{...})
func (*AdaptiveScalerRegulator) Limit ¶ added in v0.0.2
func (as *AdaptiveScalerRegulator) Limit() bool
Limit implements the Regulator interface by determining if scaling operations should be limited. Returns true during cooldown periods or at worker limits.
Returns:
- bool: true if scaling should be limited, false if it can proceed
func (*AdaptiveScalerRegulator) Observe ¶ added in v0.0.2
func (as *AdaptiveScalerRegulator) Observe(metrics *Metrics)
Observe implements the Regulator interface by monitoring system metrics. This method updates the scaler's view of system load and performance.
Parameters:
- metrics: Current system metrics including worker and queue statistics
func (*AdaptiveScalerRegulator) Renormalize ¶ added in v0.0.2
func (as *AdaptiveScalerRegulator) Renormalize()
Renormalize implements the Regulator interface by attempting to restore normal operation. This method triggers a scaling evaluation if enough time has passed since the last scale.
type BackPressureRegulator ¶ added in v0.0.2
type BackPressureRegulator struct {
// contains filtered or unexported fields
}
BackPressureRegulator implements the Regulator interface to prevent system overload. It monitors queue depth and processing times to regulate job intake, similar to how pressure regulators in plumbing systems prevent pipe damage by limiting flow when pressure builds up.
Key features:
- Queue depth monitoring
- Processing time tracking
- Adaptive pressure thresholds
- Gradual flow control
func NewBackPressureRegulator ¶ added in v0.0.2
func NewBackPressureRegulator(maxQueueSize int, targetProcessTime, pressureWindow time.Duration) *BackPressureRegulator
NewBackPressureRegulator creates a new back pressure regulator.
Parameters:
- maxQueueSize: Maximum allowed queue size before applying back pressure
- targetProcessTime: Target job processing time
- pressureWindow: Time window for pressure calculations
Returns:
- *BackPressureRegulator: A new back pressure regulator instance
Example:
regulator := NewBackPressureRegulator(1000, time.Second, time.Minute)
func (*BackPressureRegulator) GetPressure ¶ added in v0.0.2
func (bp *BackPressureRegulator) GetPressure() float64
GetPressure returns the current system pressure level.
func (*BackPressureRegulator) Limit ¶ added in v0.0.2
func (bp *BackPressureRegulator) Limit() bool
Limit implements the Regulator interface by determining if job intake should be limited. Returns true when system pressure exceeds acceptable levels.
Returns:
- bool: true if job intake should be limited, false if it can proceed
func (*BackPressureRegulator) Observe ¶ added in v0.0.2
func (bp *BackPressureRegulator) Observe(metrics *Metrics)
Observe implements the Regulator interface by monitoring system metrics. This method updates the regulator's view of system pressure based on queue size and processing times.
Parameters:
- metrics: Current system metrics including queue and timing data
func (*BackPressureRegulator) Renormalize ¶ added in v0.0.2
func (bp *BackPressureRegulator) Renormalize()
Renormalize implements the Regulator interface by attempting to restore normal operation. This method gradually reduces system pressure if conditions allow.
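The regulator types in this package share three methods: Observe, Limit, and Renormalize. The following is a minimal sketch of that shape, not the package's implementation. The `metrics`, `regulator`, and `queueRegulator` names are local stand-ins, and the pressure formula, the 0.8 threshold, and the halving on Renormalize are assumptions chosen for illustration.

```go
package main

import "fmt"

// metrics is a trimmed, hypothetical stand-in for the package's Metrics type.
type metrics struct {
	JobQueueSize int
}

// regulator lists the three methods the regulator types in this package expose.
type regulator interface {
	Observe(m *metrics)
	Limit() bool
	Renormalize()
}

// queueRegulator derives a pressure value in [0, 1] from queue depth.
// maxQueue is assumed to be positive.
type queueRegulator struct {
	maxQueue int
	pressure float64
}

// Observe recomputes pressure as queue depth divided by capacity, capped at 1.
func (r *queueRegulator) Observe(m *metrics) {
	p := float64(m.JobQueueSize) / float64(r.maxQueue)
	if p > 1 {
		p = 1
	}
	r.pressure = p
}

// Limit reports true once pressure reaches the assumed 0.8 threshold.
func (r *queueRegulator) Limit() bool { return r.pressure >= 0.8 }

// Renormalize halves the recorded pressure, a simple form of gradual release.
func (r *queueRegulator) Renormalize() { r.pressure *= 0.5 }

// Compile-time check that queueRegulator satisfies the interface.
var _ regulator = (*queueRegulator)(nil)

func main() {
	r := &queueRegulator{maxQueue: 100}
	r.Observe(&metrics{JobQueueSize: 90})
	fmt.Println(r.Limit()) // true: pressure is 0.9
	r.Renormalize()
	fmt.Println(r.Limit()) // false: pressure is 0.45
}
```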
type BroadcastGroup ¶
type BroadcastGroup struct {
ID string
// Management
TTL time.Duration
LastUsed time.Time
// contains filtered or unexported fields
}
BroadcastGroup implements a quantum-aware pub/sub system.
It provides a quantum-inspired broadcast mechanism that maintains quantum properties such as entanglement and uncertainty while distributing messages to subscribers.
Key features:
- Quantum state preservation
- Filtered message routing
- Subscriber management
- Metrics collection
- Entanglement support
func NewBroadcastGroup ¶ added in v0.0.2
func NewBroadcastGroup(id string, ttl time.Duration, maxQueue int) *BroadcastGroup
NewBroadcastGroup creates a new broadcast group with quantum properties.
Initializes a new broadcast group with specified parameters and default quantum properties such as minimum uncertainty.
Parameters:
- id: Unique identifier for the broadcast group
- ttl: Time-to-live duration for the group
- maxQueue: Maximum queue size for message buffering
Returns:
- *BroadcastGroup: A new broadcast group instance
func (*BroadcastGroup) AddFilter ¶ added in v0.0.2
func (bg *BroadcastGroup) AddFilter(filter FilterFunc)
AddFilter adds a global filter to the broadcast group.
Registers a new filter function that will be applied to all messages before broadcasting.
Parameters:
- filter: The filter function to add
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*BroadcastGroup) AddRoutingRule ¶ added in v0.0.2
func (bg *BroadcastGroup) AddRoutingRule(subscriberID string, rule RoutingRule)
AddRoutingRule adds a routing rule for a specific subscriber.
Adds a new routing rule that determines how messages should be filtered for a specific subscriber.
Parameters:
- subscriberID: ID of the subscriber to add the rule for
- rule: The routing rule to add
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*BroadcastGroup) Close ¶ added in v0.0.2
func (bg *BroadcastGroup) Close()
Close shuts down the broadcast group and cleans up resources.
Performs graceful shutdown of the broadcast group, closing all subscriber channels and cleaning up internal resources.
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*BroadcastGroup) GetMetrics ¶ added in v0.0.2
func (bg *BroadcastGroup) GetMetrics() BroadcastMetrics
GetMetrics returns current broadcast metrics.
Provides access to the current operational metrics of the broadcast group.
Returns:
- BroadcastMetrics: Copy of the current metrics
Thread-safe: This method uses a read lock to ensure safe concurrent access.
func (*BroadcastGroup) Send ¶
func (bg *BroadcastGroup) Send(qv *QValue)
Send broadcasts a quantum value to all applicable subscribers.
Distributes a quantum value to subscribers according to routing rules while maintaining quantum properties and updating metrics.
Parameters:
- qv: The quantum value to broadcast
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
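As a rough illustration of filtered fan-out, the sketch below applies global filters and then attempts a non-blocking send to each subscriber. It is not the package's code: `group` and `filterFunc` are hypothetical, messages are plain strings rather than *QValue, locking is omitted, and dropping a message when a subscriber's buffer is full is an assumption suggested only by the MessagesDropped metric.

```go
package main

import "fmt"

// filterFunc decides whether a message should be broadcast at all.
type filterFunc func(msg string) bool

// group is a hypothetical, single-goroutine stand-in for a broadcast group.
type group struct {
	filters []filterFunc
	subs    map[string]chan string
	sent    int
	dropped int
}

// subscribe registers a buffered channel for the given subscriber ID.
func (g *group) subscribe(id string, buffer int) chan string {
	ch := make(chan string, buffer)
	g.subs[id] = ch
	return ch
}

// send delivers msg to every subscriber unless a filter rejects it.
// A subscriber whose buffer is full misses the message.
func (g *group) send(msg string) {
	for _, f := range g.filters {
		if !f(msg) {
			return
		}
	}
	for _, ch := range g.subs {
		select {
		case ch <- msg:
			g.sent++
		default:
			g.dropped++ // buffer full: count the drop instead of blocking
		}
	}
}

func main() {
	g := &group{subs: map[string]chan string{}}
	g.filters = append(g.filters, func(msg string) bool { return msg != "" })
	ch := g.subscribe("a", 1)

	g.send("")  // rejected by the filter
	g.send("x") // delivered
	g.send("y") // dropped: buffer of size 1 is already full

	fmt.Println(<-ch, g.sent, g.dropped) // x 1 1
}
```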
func (*BroadcastGroup) SetEntanglement ¶ added in v0.0.2
func (bg *BroadcastGroup) SetEntanglement(e *Entanglement)
SetEntanglement connects the broadcast group to an entanglement.
Associates the broadcast group with a quantum entanglement, allowing it to participate in quantum-like state synchronization.
Parameters:
- e: The entanglement to connect to
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*BroadcastGroup) Subscribe ¶ added in v0.0.2
func (bg *BroadcastGroup) Subscribe(subscriberID string, bufferSize int, rules ...RoutingRule) chan *QValue
Subscribe adds a new subscriber with optional filtering and routing rules.
Creates and registers a new subscriber channel with specified buffer size and optional routing rules for message filtering.
Parameters:
- subscriberID: Unique identifier for the subscriber
- bufferSize: Size of the subscriber's message buffer
- rules: Optional routing rules for message filtering
Returns:
- chan *QValue: Channel for receiving broadcast messages
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*BroadcastGroup) Unsubscribe ¶ added in v0.0.2
func (bg *BroadcastGroup) Unsubscribe(subscriberID string)
Unsubscribe removes a subscriber and cleans up associated resources.
Safely removes a subscriber from the broadcast group, closing their channel and cleaning up any associated routing rules.
Parameters:
- subscriberID: ID of the subscriber to remove
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
type BroadcastMetrics ¶ added in v0.0.2
type BroadcastMetrics struct {
MessagesSent int64
MessagesDropped int64
AverageLatency time.Duration
ActiveSubscribers int
UncertaintyLevel UncertaintyLevel
LastBroadcastTime time.Time
}
BroadcastMetrics tracks performance and behavior of the broadcast group.
Collects and maintains statistical information about the broadcast group's operation, including message counts, latency, and quantum uncertainty levels.
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements both the circuit breaker pattern and Regulator interface. It provides a way to automatically degrade service when the system is under stress by temporarily stopping operations when a failure threshold is reached.
The circuit breaker operates in three states:
- Closed: Normal operation, all requests are allowed
- Open: Failure threshold exceeded, all requests are rejected
- Half-Open: Probationary state allowing limited requests to test system health
This pattern helps prevent cascading failures and allows the system to recover from failure states without overwhelming potentially unstable dependencies.
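The three states can be sketched as a small state machine. This is an illustrative, single-goroutine example and not the package's CircuitBreaker: the `breaker` type and its methods are hypothetical, time is passed in explicitly, and the half-open request cap (halfOpenMax) is left out for brevity.

```go
package main

import (
	"fmt"
	"time"
)

// state mirrors the three circuit states described above.
type state int

const (
	closed state = iota
	open
	halfOpen
)

// breaker is a hypothetical stand-in for a circuit breaker.
type breaker struct {
	st           state
	failures     int
	maxFailures  int
	resetTimeout time.Duration
	openedAt     time.Time
}

// allow reports whether a request may proceed at time now.
// An open circuit moves to half-open once resetTimeout has elapsed.
func (b *breaker) allow(now time.Time) bool {
	if b.st == open && now.Sub(b.openedAt) >= b.resetTimeout {
		b.st = halfOpen
	}
	return b.st != open
}

// recordFailure opens the circuit when the threshold is reached,
// or immediately if a probationary (half-open) request fails.
func (b *breaker) recordFailure(now time.Time) {
	b.failures++
	if b.st == halfOpen || b.failures >= b.maxFailures {
		b.st = open
		b.openedAt = now
	}
}

// recordSuccess closes the circuit and clears the failure count.
func (b *breaker) recordSuccess() {
	b.st = closed
	b.failures = 0
}

func main() {
	t0 := time.Now()
	b := &breaker{maxFailures: 2, resetTimeout: time.Second}
	b.recordFailure(t0)
	b.recordFailure(t0)
	fmt.Println(b.allow(t0))                      // false: circuit is open
	fmt.Println(b.allow(t0.Add(2 * time.Second))) // true: half-open probe
	b.recordSuccess()
	fmt.Println(b.st == closed) // true
}
```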
func NewCircuitBreaker ¶ added in v0.0.2
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration, halfOpenMax int) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker instance with specified parameters.
Parameters:
- maxFailures: Number of failures allowed before opening the circuit
- resetTimeout: Duration to wait before attempting to close an open circuit
- halfOpenMax: Maximum number of requests allowed in half-open state
Returns:
- *CircuitBreaker: A new circuit breaker instance initialized in closed state
func (*CircuitBreaker) Allow ¶
func (cb *CircuitBreaker) Allow() bool
Allow determines if a request is allowed based on the circuit state. This method implements the core circuit breaker logic, determining whether to allow requests based on the current state and timing conditions.
Returns:
- bool: true if the request should be allowed, false if it should be rejected
func (*CircuitBreaker) Limit ¶ added in v0.0.2
func (cb *CircuitBreaker) Limit() bool
Limit implements the Regulator interface by determining if requests should be limited. This method provides a standardized way to check if the circuit breaker is currently preventing operations.
Returns:
- bool: true if requests should be limited, false if they should proceed
func (*CircuitBreaker) Observe ¶ added in v0.0.2
func (cb *CircuitBreaker) Observe(metrics *Metrics)
Observe implements the Regulator interface by accepting system metrics. This method allows the circuit breaker to monitor system health and adjust its behavior based on current conditions.
Parameters:
- metrics: Current system metrics including performance and health indicators
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failure and updates the circuit state. This method tracks the number of failures and opens the circuit if the failure threshold is exceeded. It handles state transitions differently based on the current circuit state.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful attempt and updates the circuit state. This method handles the transition from half-open to closed state after successful operations, and resets failure counts in closed state.
func (*CircuitBreaker) Renormalize ¶ added in v0.0.2
func (cb *CircuitBreaker) Renormalize()
Renormalize implements the Regulator interface by attempting to restore normal operation. This method checks if enough time has passed since the circuit was opened and transitions to half-open state if appropriate, allowing for system recovery.
type CircuitBreakerConfig ¶
CircuitBreakerConfig defines configuration for a circuit breaker.
type CircuitState ¶
type CircuitState int
CircuitState represents the state of the circuit breaker. This is used to track the current operational mode of the circuit breaker as it transitions between different states based on system health.
const (
	CircuitClosed CircuitState = iota // Normal operation state
	CircuitOpen // Failure state, rejecting requests
	CircuitHalfOpen // Probationary state, allowing limited requests
)
type Entanglement ¶ added in v0.0.2
type Entanglement struct {
ID string
Jobs []Job
CreatedAt time.Time
LastModified time.Time
Dependencies []string
TTL time.Duration
OnStateChange func(oldState, newState map[string]any)
// contains filtered or unexported fields
}
Entanglement wraps a selection of jobs into a shared space. Meant for jobs that each describe part of a larger task. When one job in the entanglement changes state, it affects all others.
Inspired by quantum entanglement, this type provides a way to create groups of jobs that share state and react to changes in that state simultaneously. Just as quantum particles can be entangled such that the state of one instantly affects the other, jobs in an Entanglement share a common state that, when changed, affects all jobs in the group.
Changes to the shared state of an Entanglement are immutable and persistent: each change is recorded in a ledger and replayed for any job that joins or starts processing later, ensuring that the quantum-like property of entanglement is maintained across time. This means that even if jobs process at different times, they all see the complete history of state changes, preserving the causal relationship between entangled jobs.
Use cases include:
- Distributed data processing where multiple jobs need to share intermediate results
- Coordinated tasks where jobs need to react to each other's progress
- State synchronization across a set of related operations
- Fan-out/fan-in patterns where multiple jobs contribute to a shared outcome
func NewEntanglement ¶ added in v0.0.2
func NewEntanglement(id string, jobs []Job, ttl time.Duration) *Entanglement
NewEntanglement creates a new entanglement of jobs with the specified ID and TTL.
The entanglement acts as a quantum-inspired container that maintains shared state across multiple jobs. Like quantum entangled particles that remain connected regardless of distance, jobs in the entanglement remain connected through their shared state.
The state history is preserved and replayed for any job that starts processing later, ensuring that the quantum-like property of entanglement is maintained across time.
Parameters:
- id: A unique identifier for the entanglement
- jobs: Initial set of jobs to be entangled
- ttl: Time-to-live duration after which the entanglement expires
Example:
jobs := []Job{job1, job2, job3}
entanglement := NewEntanglement("data-processing", jobs, 1*time.Hour)
func (*Entanglement) AddJob ¶ added in v0.0.2
func (e *Entanglement) AddJob(job Job)
AddJob adds a job to the entanglement.
This method expands the entanglement to include a new job, similar to how quantum systems can be expanded to include more entangled particles. The newly added job becomes part of the shared state system and will be affected by state changes.
Parameters:
- job: The job to add to the entanglement
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*Entanglement) GetState ¶ added in v0.0.2
func (e *Entanglement) GetState(key string) (any, bool)
GetState retrieves a value from the shared state.
This method provides a way to observe the current state of the entanglement. Like quantum measurement, it provides a snapshot of the current state at the time of observation.
Parameters:
- key: The state key to retrieve
Returns:
- value: The value associated with the key
- exists: Boolean indicating whether the key exists in the state
Thread-safe: This method uses a read lock to ensure safe concurrent access.
func (*Entanglement) GetStateHistory ¶ added in v0.0.2
func (e *Entanglement) GetStateHistory(sinceSequence uint64) []StateChange
GetStateHistory returns all state changes that have occurred since a given sequence number. This allows jobs that start processing later to catch up on all state changes they missed.
Parameters:
- sinceSequence: The sequence number to start from (0 for all history)
Returns:
- []StateChange: Ordered list of state changes since the specified sequence
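One way to picture sequence-based catch-up is an append-only ledger. The sketch below is hypothetical and not taken from the package: `change` and `ledger` are local stand-ins, sequence numbers are assumed to start at 1, and "since" is assumed to mean strictly greater than the given sequence, which makes 0 return the full history.

```go
package main

import "fmt"

// change is a hypothetical stand-in for a recorded state change.
type change struct {
	Sequence uint64
	Key      string
	Value    any
}

// ledger is an append-only record of changes, ordered by sequence number.
type ledger struct {
	changes []change
}

// record appends a change and returns its sequence number (starting at 1).
func (l *ledger) record(key string, value any) uint64 {
	seq := uint64(len(l.changes)) + 1
	l.changes = append(l.changes, change{Sequence: seq, Key: key, Value: value})
	return seq
}

// since returns a copy of every change whose Sequence is greater than seq.
func (l *ledger) since(seq uint64) []change {
	if seq >= uint64(len(l.changes)) {
		return nil
	}
	out := make([]change, len(l.changes)-int(seq))
	copy(out, l.changes[seq:])
	return out
}

func main() {
	var l ledger
	l.record("stage", "extract")
	l.record("stage", "transform")
	l.record("rows", 42)

	// A job that has already seen sequence 1 catches up on the rest.
	for _, c := range l.since(1) {
		fmt.Println(c.Sequence, c.Key, c.Value)
	}
}
```

Returning a copy keeps callers from mutating the recorded history through the returned slice.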
func (*Entanglement) IsExpired ¶ added in v0.0.2
func (e *Entanglement) IsExpired() bool
IsExpired checks if the entanglement has exceeded its TTL.
This method determines if the entanglement should be considered expired based on its Time-To-Live (TTL) duration and the time since its last modification. An expired entanglement might be cleaned up by the system, similar to how quantum entanglement can be lost due to decoherence.
Returns:
- bool: True if the entanglement has expired, false otherwise
Note: A TTL of 0 or less means the entanglement never expires.
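The expiry rule can be sketched as a pure function. This is an assumption-laden illustration rather than the package's code: `expired` is a hypothetical helper, and whether the real comparison is strict is not stated in the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// expired reports whether more than ttl has passed since the last modification.
// A ttl of zero or less never expires, matching the note above.
func expired(sinceLastModified, ttl time.Duration) bool {
	if ttl <= 0 {
		return false
	}
	return sinceLastModified > ttl
}

func main() {
	fmt.Println(expired(2*time.Hour, time.Hour)) // true
	fmt.Println(expired(time.Minute, time.Hour)) // false
	fmt.Println(expired(2*time.Hour, 0))         // false: no TTL set
}
```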
func (*Entanglement) RemoveJob ¶ added in v0.0.2
func (e *Entanglement) RemoveJob(jobID string) bool
RemoveJob removes a job from the entanglement.
This method removes a job from the entanglement, effectively "disentangling" it from the shared state system. The removed job will no longer be affected by or contribute to state changes in the entanglement.
Parameters:
- jobID: The ID of the job to remove
Returns:
- bool: True if the job was found and removed, false otherwise
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*Entanglement) ReplayStateChanges ¶ added in v0.0.2
func (e *Entanglement) ReplayStateChanges(job Job)
ReplayStateChanges applies all historical state changes to a newly starting job. This ensures that jobs starting later still see the complete history of state changes in the correct order.
Parameters:
- job: The job to replay state changes for
func (*Entanglement) UpdateQuantumState ¶ added in v0.0.2
func (e *Entanglement) UpdateQuantumState(key string, state *State)
func (*Entanglement) UpdateState ¶ added in v0.0.2
func (e *Entanglement) UpdateState(key string, value any)
UpdateState updates the shared state and notifies all entangled jobs of the change.
Similar to how measuring one quantum particle instantly affects its entangled partner, updating state through this method instantly affects all jobs in the entanglement. The state change is recorded in an immutable ledger, ensuring that even jobs that haven't started processing yet will see this change when they begin.
The OnStateChange callback (if set) is triggered with both the old and new state, allowing jobs to react to the change. For jobs that start later, these changes are replayed in order during their initialization.
Parameters:
- key: The state key to update
- value: The new value for the state key
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
type Evidence ¶ added in v0.0.2
type Evidence struct {
Method string // Verification method used
Confidence float64 // Confidence in this evidence
Data interface{} // The actual evidence data
}
Evidence represents verification data supporting a particular state.
type ExponentialBackoff ¶
ExponentialBackoff implements RetryStrategy.
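The method set of RetryStrategy is not shown in this documentation, so the following is only a generic sketch of how an exponential backoff delay is commonly computed. The `backoffDelay` function and its parameters are hypothetical and do not describe this package's ExponentialBackoff.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns initial doubled once per attempt, capped at max.
// attempt counts from 0, so attempt 0 yields the initial delay.
func backoffDelay(initial, max time.Duration, attempt int) time.Duration {
	d := initial
	for i := 0; i < attempt; i++ {
		// Stop before doubling would reach or pass the cap (also avoids overflow).
		if d >= max/2 {
			return max
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println(attempt, backoffDelay(100*time.Millisecond, time.Second, attempt))
	}
}
```

In practice a random jitter is often added to each delay so that many retrying jobs do not wake at the same moment.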
type FilterFunc ¶ added in v0.0.2
FilterFunc defines a function type for filtering quantum values.
This function type is used to determine whether a quantum value should be processed or ignored in the broadcast system.
Parameters:
- *QValue: The quantum value to be filtered
Returns:
- bool: True if the value should be processed, false if it should be filtered out
type Job ¶
type Job struct {
ID string
Fn func() (any, error)
RetryPolicy *RetryPolicy
CircuitID string
CircuitConfig *CircuitBreakerConfig
Dependencies []string
TTL time.Duration
Attempt int
LastError error
DependencyRetryPolicy *RetryPolicy
StartTime time.Time
}
Job represents work to be done.
type JobOption ¶
type JobOption func(*Job)
JobOption is a function type for configuring jobs.
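A function type that takes a pointer to the struct it configures is the functional options pattern. The sketch below shows the pattern in isolation: `job`, `option`, `withTTL`, `withDependencies`, and `newJob` are local stand-ins that echo the shape of Job and JobOption, and how the package itself applies options is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// job is a trimmed, hypothetical stand-in for Job.
type job struct {
	ID           string
	TTL          time.Duration
	Dependencies []string
}

// option has the same shape as JobOption: a function that mutates a job.
type option func(*job)

// withTTL returns an option that sets the job's time-to-live.
func withTTL(ttl time.Duration) option {
	return func(j *job) { j.TTL = ttl }
}

// withDependencies returns an option that sets the job's dependency IDs.
func withDependencies(deps []string) option {
	return func(j *job) { j.Dependencies = deps }
}

// newJob applies options in order, so a later option overrides an earlier one.
func newJob(id string, opts ...option) job {
	j := job{ID: id}
	for _, opt := range opts {
		opt(&j)
	}
	return j
}

func main() {
	j := newJob("report", withTTL(time.Minute), withDependencies([]string{"fetch"}))
	fmt.Println(j.ID, j.TTL, j.Dependencies) // report 1m0s [fetch]
}
```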
func WithCircuitBreaker ¶
WithCircuitBreaker configures a circuit breaker for a job.
func WithDependencies ¶ added in v0.0.2
WithDependencies configures job dependencies.
func WithDependencyRetry ¶
func WithDependencyRetry(attempts int, strategy RetryStrategy) JobOption
WithDependencyRetry configures retry behavior for dependencies.
func WithRetry ¶
func WithRetry(attempts int, strategy RetryStrategy) JobOption
WithRetry configures retry behavior for a job.
type LoadBalancer ¶ added in v0.0.2
type LoadBalancer struct {
// contains filtered or unexported fields
}
LoadBalancer implements the Regulator interface to provide intelligent work distribution. It ensures even distribution of work across workers while considering system metrics like worker load, processing speed, and resource utilization.
Like a traffic controller directing vehicles to different lanes based on congestion, the load balancer directs work to workers based on their current capacity and performance characteristics.
Key features:
- Even work distribution
- Metric-based routing decisions
- Automatic worker selection
- Adaptive load management
func NewLoadBalancer ¶ added in v0.0.2
func NewLoadBalancer(workerCount, workerCapacity int) *LoadBalancer
NewLoadBalancer creates a new load balancer regulator with specified parameters.
Parameters:
- workerCount: Initial number of workers to balance between
- workerCapacity: Maximum concurrent jobs per worker
Returns:
- *LoadBalancer: A new load balancer instance
Example:
balancer := NewLoadBalancer(5, 10) // 5 workers, 10 jobs each
func (*LoadBalancer) Limit ¶ added in v0.0.2
func (lb *LoadBalancer) Limit() bool
Limit implements the Regulator interface by determining if work should be limited. Returns true if all workers are at capacity and no more work should be accepted.
Returns:
- bool: true if work should be limited, false if it can proceed
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*LoadBalancer) Observe ¶ added in v0.0.2
func (lb *LoadBalancer) Observe(metrics *Metrics)
Observe implements the Regulator interface by monitoring system metrics. This method updates the load balancer's view of worker performance and system state, allowing it to make informed routing decisions.
Parameters:
- metrics: Current system metrics including worker performance data
func (*LoadBalancer) RecordJobComplete ¶ added in v0.0.2
func (lb *LoadBalancer) RecordJobComplete(workerID int, duration time.Duration)
RecordJobComplete updates worker statistics when a job completes processing. This helps maintain accurate load and performance information.
Parameters:
- workerID: The ID of the worker that completed the job
- duration: How long the job took to process
func (*LoadBalancer) RecordJobStart ¶ added in v0.0.2
func (lb *LoadBalancer) RecordJobStart(workerID int)
RecordJobStart updates worker statistics when a job starts processing. This helps maintain accurate load information for future routing decisions.
Parameters:
- workerID: The ID of the worker that started the job
func (*LoadBalancer) Renormalize ¶ added in v0.0.2
func (lb *LoadBalancer) Renormalize()
Renormalize implements the Regulator interface by attempting to restore normal operation. This method resets worker statistics and redistributes load if necessary.
func (*LoadBalancer) SelectWorker ¶ added in v0.0.2
func (lb *LoadBalancer) SelectWorker() (int, error)
SelectWorker chooses the most appropriate worker for the next job based on current load distribution and worker performance metrics.
Returns:
- int: The selected worker ID
- error: Error if no suitable worker is available
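A simple selection policy is "least loaded worker with spare capacity". The sketch below illustrates that policy only. `balancer` and its methods are hypothetical, locking is omitted, and the real SelectWorker may also weigh performance metrics that are not modeled here.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoWorkers is a local stand-in for an "all workers busy" error.
var errNoWorkers = errors.New("no workers available to process job")

// balancer tracks the number of active jobs per worker, indexed by worker ID.
type balancer struct {
	active   []int
	capacity int
}

// selectWorker returns the worker with the fewest active jobs that still has
// spare capacity. Ties go to the lowest worker ID.
func (b *balancer) selectWorker() (int, error) {
	best := -1
	for id, n := range b.active {
		if n >= b.capacity {
			continue // worker is full
		}
		if best == -1 || n < b.active[best] {
			best = id
		}
	}
	if best == -1 {
		return -1, errNoWorkers
	}
	return best, nil
}

// start records that a job began on the given worker.
func (b *balancer) start(id int) { b.active[id]++ }

// complete records that a job finished on the given worker.
func (b *balancer) complete(id int) {
	if b.active[id] > 0 {
		b.active[id]--
	}
}

func main() {
	b := &balancer{active: make([]int, 3), capacity: 2}
	// Six jobs fill three workers of capacity two; the seventh is rejected.
	for job := 0; job < 7; job++ {
		id, err := b.selectWorker()
		if err != nil {
			fmt.Println("job", job, "rejected:", err)
			continue
		}
		b.start(id)
		fmt.Println("job", job, "-> worker", id)
	}
}
```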
type Metrics ¶
type Metrics struct {
WorkerCount int
JobQueueSize int
ActiveWorkers int
LastScale time.Time
ErrorRates map[string]float64
TotalJobTime time.Duration
JobCount int64
CircuitBreakerStates map[string]CircuitState
// Additional suggested metrics
AverageJobLatency time.Duration
P95JobLatency time.Duration
P99JobLatency time.Duration
JobSuccessRate float64
QueueWaitTime time.Duration
ResourceUtilization float64
// Rate limiting metrics
RateLimitHits int64
ThrottledJobs int64
// SchedulingFailures field to track scheduling timeouts
SchedulingFailures int64
// Additional metrics
FailureCount int64
// contains filtered or unexported fields
}
Metrics tracks and stores various performance metrics for the worker pool.
func NewMetrics ¶ added in v0.0.2
func NewMetrics() *Metrics
NewMetrics creates and initializes a new Metrics instance.
func (*Metrics) ExportMetrics ¶
ExportMetrics provides metrics export functionality.
func (*Metrics) RecordJobExecution ¶ added in v0.0.2
RecordJobExecution records the execution time and success status of a job.
func (*Metrics) RecordJobFailure ¶
func (m *Metrics) RecordJobFailure()
RecordJobFailure records the failure of a job and updates metrics.
func (*Metrics) RecordJobSuccess ¶
type ObservationEffect ¶ added in v0.0.2
type ObservationEffect struct {
ObserverID string
ObservedAt time.Time
StateCollapse bool
Uncertainty UncertaintyLevel
}
ObservationEffect represents how observation affects the quantum state.
type Probability ¶ added in v0.0.2
type Probability float64
Probability represents a quantum state probability.
type Q ¶
type Q struct {
// contains filtered or unexported fields
}
Q is our hybrid worker pool/message queue implementation.
It combines traditional worker pool functionality with quantum-inspired state management. The pool maintains a balance between worker availability and job scheduling while providing quantum-like properties such as state superposition and entanglement through its integration with QSpace.
Key features:
- Dynamic worker scaling
- Circuit breaker pattern support
- Quantum-inspired state management
- Metrics collection and monitoring
func NewQ ¶
NewQ creates a new quantum pool with the specified worker constraints and configuration.
The pool initializes with the minimum number of workers and scales dynamically based on load.
Parameters:
- ctx: Parent context for lifecycle management
- minWorkers: Minimum number of workers to maintain
- maxWorkers: Maximum number of workers allowed
- config: Pool configuration parameters
Returns:
- *Q: A new quantum pool instance
func (*Q) Close ¶
func (q *Q) Close()
Close gracefully shuts down the quantum pool.
It ensures all workers complete their current jobs and cleans up resources. The shutdown process:
- Cancels the pool's context
- Waits for all goroutines to complete
- Closes all channels safely
- Cleans up worker resources
func (*Q) CreateBroadcastGroup ¶
func (q *Q) CreateBroadcastGroup(id string, ttl time.Duration) *BroadcastGroup
CreateBroadcastGroup creates a new broadcast group.
Initializes a new broadcast group with specified parameters and default quantum properties such as minimum uncertainty.
func (*Q) Schedule ¶
Schedule submits a job to the quantum pool for execution.
The job is processed according to quantum-inspired principles, maintaining state history and uncertainty levels through QSpace integration.
Parameters:
- id: Unique identifier for the job
- fn: The function to execute
- opts: Optional job configuration parameters
Returns:
- chan *QValue: Channel that will receive the job's result
type QSpace ¶ added in v0.0.2
type QSpace struct {
// contains filtered or unexported fields
}
QSpace represents a quantum-like state space.
It provides a managed environment for quantum-inspired values, maintaining their states, relationships, and uncertainties. The space implements concepts from quantum mechanics such as entanglement, state superposition, and the uncertainty principle.
Key features:
- Quantum value storage and retrieval
- State transition history
- Entanglement management
- Relationship tracking
- Automatic resource cleanup
func NewQSpace ¶ added in v0.0.2
func NewQSpace() *QSpace
NewQSpace creates a new quantum space.
Initializes a new space with default uncertainty principles and starts maintenance goroutines for cleanup and uncertainty monitoring.
Returns:
- *QSpace: A new quantum space instance ready for use
func (*QSpace) AddRelationship ¶ added in v0.0.2
AddRelationship establishes a parent-child relationship between values.
Creates directed relationships between values while preventing circular dependencies that could cause deadlocks or infinite loops.
Parameters:
- parentID: ID of the parent value
- childID: ID of the child value
Returns:
- error: Error if the relationship would create a circular dependency
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
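One common way to reject circular dependencies is to check, before adding a parent-to-child edge, whether the child can already reach the parent. The sketch below assumes that approach; `graph`, `reachable`, and `addRelationship` are hypothetical names, and the locking mentioned above is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// graph maps a parent ID to the IDs of its direct children.
type graph map[string][]string

var errCycle = errors.New("relationship would create a circular dependency")

// reachable reports whether target can be reached from start by following
// parent-to-child edges, using an iterative depth-first search.
func (g graph) reachable(start, target string) bool {
	seen := map[string]bool{}
	stack := []string{start}
	for len(stack) > 0 {
		n := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if n == target {
			return true
		}
		if seen[n] {
			continue
		}
		seen[n] = true
		stack = append(stack, g[n]...)
	}
	return false
}

// addRelationship adds parent -> child unless the child already reaches the parent.
func (g graph) addRelationship(parent, child string) error {
	if parent == child || g.reachable(child, parent) {
		return errCycle
	}
	g[parent] = append(g[parent], child)
	return nil
}

func main() {
	g := graph{}
	fmt.Println(g.addRelationship("a", "b"))
	fmt.Println(g.addRelationship("b", "c"))
	fmt.Println(g.addRelationship("c", "a")) // rejected: would close a -> b -> c -> a
}
```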
func (*QSpace) Await ¶ added in v0.0.2
Await returns a channel that will receive the quantum value.
Implements quantum-inspired delayed observation, where values may be uncertain or not yet collapsed to a definite state.
Parameters:
- id: The identifier of the value to await
Returns:
- chan *QValue: Channel that will receive the value when available
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*QSpace) Close ¶ added in v0.0.2
func (qs *QSpace) Close()
Close shuts down the quantum space.
Performs graceful shutdown of the quantum space, cleaning up resources and closing all channels safely.
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*QSpace) CreateBroadcastGroup ¶ added in v0.0.2
func (qs *QSpace) CreateBroadcastGroup(id string, ttl time.Duration) *BroadcastGroup
CreateBroadcastGroup creates a new broadcast group.
Initializes a new broadcast group with specified parameters and default quantum properties such as minimum uncertainty.
func (*QSpace) CreateEntanglement ¶ added in v0.0.2
func (qs *QSpace) CreateEntanglement(ids []string) *Entanglement
CreateEntanglement establishes quantum entanglement between values.
Creates and manages relationships between quantum values that should maintain synchronized states. Like quantum entanglement in physics, changes to one value affect all entangled values.
Parameters:
- ids: Slice of value IDs to entangle together
Returns:
- *Entanglement: A new entanglement instance managing the relationship
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*QSpace) CreateQuantumEntanglement ¶ added in v0.0.2
func (qs *QSpace) CreateQuantumEntanglement(id string) *Entanglement
Enhance existing QSpace
func (*QSpace) GetStateHistory ¶ added in v0.0.2
func (qs *QSpace) GetStateHistory(valueID string) []StateTransition
GetStateHistory returns the state transition history for a value.
Provides access to the complete history of state transitions for analysis and debugging purposes.
Parameters:
- valueID: ID of the value to get history for
Returns:
- []StateTransition: Ordered list of state transitions for the value
Thread-safe: This method uses a read lock to ensure safe concurrent access.
func (*QSpace) Store ¶ added in v0.0.2
Store stores a quantum value with proper uncertainty handling.
Values are stored with their associated states and TTL, maintaining quantum-inspired properties such as superposition and uncertainty.
Parameters:
- id: Unique identifier for the value
- value: The actual value to store
- states: Possible states with their probabilities
- ttl: Time-to-live duration for the value
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*QSpace) StoreError ¶ added in v0.0.2
StoreError stores an error result in the quantum space.
type QValue ¶ added in v0.0.2
type QValue struct {
// Core value and metadata
Value interface{}
Error error
CreatedAt time.Time
TTL time.Duration
// Quantum properties
States []State // Possible superposition states
Uncertainty UncertaintyLevel // Heisenberg-inspired uncertainty
Observations []ObservationEffect
Entangled []string // IDs of entangled values
// contains filtered or unexported fields
}
QValue represents a value with quantum-like properties.
type QuantumState ¶ added in v0.0.2
type QuantumState struct {
Vector []complex128
Uncertainty float64
}
func (*QuantumState) Collapse ¶ added in v0.0.2
func (qs *QuantumState) Collapse() any
func (*QuantumState) Measure ¶ added in v0.0.2
func (qs *QuantumState) Measure() any
type Qubit ¶ added in v0.0.2
type Qubit struct {
// contains filtered or unexported fields
}
func NewQubit ¶ added in v0.0.2
func NewQubit(alpha, beta complex128) *Qubit
func (*Qubit) ApplyHadamard ¶ added in v0.0.2
func (q *Qubit) ApplyHadamard()
type RateLimiter ¶ added in v0.0.2
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements the Regulator interface using a token bucket algorithm. It controls the rate of operations by maintaining a bucket of tokens that are consumed by each operation and replenished at a fixed rate.
Like a water tank with a steady inflow and controlled outflow, this regulator ensures that operations occur at a sustainable rate, preventing system overload while allowing for brief bursts of activity when the token bucket is full.
Key features:
- Smooth rate limiting with burst capacity
- Configurable token replenishment rate
- Thread-safe operation
- Metric-aware for adaptive rate limiting
func NewRateLimiter ¶ added in v0.0.2
func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter
NewRateLimiter creates a new rate limit regulator with specified parameters.
Parameters:
- maxTokens: Maximum number of tokens (burst capacity)
- refillRate: Duration between token replenishments
Returns:
- *RateLimiter: A new rate limit regulator instance
Example:
limiter := NewRateLimiter(100, time.Second) // 100 ops/second with burst capacity
func (*RateLimiter) Limit ¶ added in v0.0.2
func (rl *RateLimiter) Limit() bool
Limit implements the Regulator interface by determining if an operation should be limited. It consumes a token if available, allowing the operation to proceed. If no tokens are available, the operation is limited.
Returns:
- bool: true if the operation should be limited, false if it can proceed
Thread-safe: This method uses mutual exclusion to ensure safe concurrent access.
func (*RateLimiter) Observe ¶ added in v0.0.2
func (rl *RateLimiter) Observe(metrics *Metrics)
Observe implements the Regulator interface by monitoring system metrics. The rate limiter can use these metrics to dynamically adjust its rate limits based on system conditions.
For example, it might:
- Reduce rates during high system load
- Increase limits when resources are abundant
- Adjust burst capacity based on queue length
Parameters:
- metrics: Current system metrics including performance and health indicators
func (*RateLimiter) Renormalize ¶ added in v0.0.2
func (rl *RateLimiter) Renormalize()
Renormalize implements the Regulator interface by attempting to restore normal operation. This method triggers a token refill, potentially allowing more operations to proceed if enough time has passed since the last refill.
The rate limiter uses this method to maintain a steady flow of operations while adhering to the configured rate limits.
type Regulator ¶ added in v0.0.2
type Regulator interface {
// Observe allows the regulator to monitor system metrics and state.
// This is analogous to a sensor in a mechanical regulator, providing
// the feedback necessary for making control decisions.
//
// Parameters:
// - metrics: Current system metrics including performance and health indicators
Observe(metrics *Metrics)
// Limit determines if the regulated action should be restricted.
// Returns true if the action should be limited, false if it should proceed.
// This is the main control point where the regulator decides whether to
// allow or restrict operations based on observed conditions.
//
// Returns:
// - bool: true if the action should be limited, false if it should proceed
Limit() bool
// Renormalize attempts to return the system to a normal operating state.
// This is similar to a feedback loop in control systems, where the regulator
// takes active steps to restore normal operations after a period of restriction.
// The exact meaning of "normal" depends on the specific regulator implementation.
Renormalize()
}
Regulator defines an interface for types that regulate the flow and behavior of the pool. Inspired by biological and mechanical regulators that maintain system homeostasis, this interface provides a common pattern for implementing various regulation mechanisms.
Each regulator acts as a control system component, monitoring and adjusting the pool's behavior to maintain optimal performance and stability. Like a thermostat or pressure regulator in physical systems, these regulators help maintain the system within desired operational parameters.
Examples of regulators include:
- CircuitBreaker: Prevents cascading failures by stopping operations when error rates are high
- RateLimiter: Controls the flow rate of jobs to prevent system overload
- LoadBalancer: Distributes work evenly across available resources
- BackPressureRegulator: Prevents system overload by controlling input rates
- ResourceGovernorRegulator: Manages resource consumption within defined limits
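To show how the three methods fit together, here is a minimal sketch of a custom regulator and a loop that consults several regulators before admitting work. The `metrics` struct and its `QueueLength` field, `queueRegulator`, and `shouldLimit` are all hypothetical; the real Metrics fields are not shown in this section, and the sketch omits locking.

```go
package main

import "fmt"

// metrics is a hypothetical stand-in; the real Metrics fields are not shown here.
type metrics struct {
	QueueLength int
}

// regulator mirrors the three methods of the Regulator interface.
type regulator interface {
	Observe(m *metrics)
	Limit() bool
	Renormalize()
}

// queueRegulator limits work while the last observed queue length exceeds max.
type queueRegulator struct {
	max, observed int
}

func (r *queueRegulator) Observe(m *metrics) { r.observed = m.QueueLength }
func (r *queueRegulator) Limit() bool        { return r.observed > r.max }
func (r *queueRegulator) Renormalize()       { r.observed = 0 }

// shouldLimit feeds the metrics to every regulator and reports whether any objects.
func shouldLimit(regs []regulator, m *metrics) bool {
	limited := false
	for _, r := range regs {
		r.Observe(m)
		if r.Limit() {
			limited = true
		}
	}
	return limited
}

func main() {
	regs := []regulator{&queueRegulator{max: 10}}
	fmt.Println(shouldLimit(regs, &metrics{QueueLength: 3}))
	fmt.Println(shouldLimit(regs, &metrics{QueueLength: 50}))
}
```

Every regulator observes the metrics even after one has already objected, so each keeps an up-to-date view for its next decision.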
func NewRegulator ¶ added in v0.0.2
NewRegulator creates a new regulator of the specified type. This factory function allows for flexible creation of different regulator types while maintaining a consistent interface for the pool to interact with.
Parameters:
- regulatorType: A concrete implementation of the Regulator interface
Returns:
- Regulator: The initialized regulator instance
Example:
circuitBreaker := NewCircuitBreakerRegulator(5, time.Minute, 3)
regulator := NewRegulator(circuitBreaker)
type ResourceGovernorRegulator ¶ added in v0.0.2
type ResourceGovernorRegulator struct {
// contains filtered or unexported fields
}
ResourceGovernorRegulator implements the Regulator interface to manage system resources. It monitors and controls resource usage (CPU, memory, etc.) to prevent system exhaustion, similar to how a power governor prevents engine damage by limiting power consumption under heavy load.
Key features:
- CPU usage monitoring
- Memory usage tracking
- Resource thresholds
- Adaptive limiting
func NewResourceGovernorRegulator ¶ added in v0.0.2
func NewResourceGovernorRegulator(maxCPUPercent, maxMemoryPercent float64, checkInterval time.Duration) *ResourceGovernorRegulator
NewResourceGovernorRegulator creates a new resource governor regulator.
Parameters:
- maxCPUPercent: Maximum allowed CPU usage, expressed as a fraction (0.0-1.0)
- maxMemoryPercent: Maximum allowed memory usage, expressed as a fraction (0.0-1.0)
- checkInterval: How often to check resource usage
Returns:
- *ResourceGovernorRegulator: A new resource governor instance
Example:
governor := NewResourceGovernorRegulator(0.8, 0.9, time.Second)
func (*ResourceGovernorRegulator) GetResourceUsage ¶ added in v0.0.2
func (rg *ResourceGovernorRegulator) GetResourceUsage() (cpu, memory float64)
GetResourceUsage returns the current resource utilization levels.
func (*ResourceGovernorRegulator) GetThresholds ¶ added in v0.0.2
func (rg *ResourceGovernorRegulator) GetThresholds() (cpu, memory float64)
GetThresholds returns the current resource usage thresholds.
func (*ResourceGovernorRegulator) Limit ¶ added in v0.0.2
func (rg *ResourceGovernorRegulator) Limit() bool
Limit implements the Regulator interface by determining if resource usage should be limited. Returns true when resource usage exceeds thresholds.
Returns:
- bool: true if resource usage should be limited, false if it can proceed
func (*ResourceGovernorRegulator) Observe ¶ added in v0.0.2
func (rg *ResourceGovernorRegulator) Observe(metrics *Metrics)
Observe implements the Regulator interface by monitoring system metrics. This method updates the governor's view of resource utilization based on current system metrics.
Parameters:
- metrics: Current system metrics including resource utilization data
func (*ResourceGovernorRegulator) Renormalize ¶ added in v0.0.2
func (rg *ResourceGovernorRegulator) Renormalize()
Renormalize implements the Regulator interface by attempting to restore normal operation. This method updates resource usage measurements and adjusts thresholds if necessary.
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int
Strategy RetryStrategy
BackoffFunc func(attempt int) time.Duration
Filter func(error) bool
}
RetryPolicy defines retry behavior.
type RetryStrategy ¶
RetryStrategy defines the interface for retry behavior.
type RoutingRule ¶ added in v0.0.2
type RoutingRule struct {
SubscriberID string
Filter FilterFunc
Priority int
}
RoutingRule defines how messages should be routed to specific subscribers.
It combines subscriber identification with filtering logic and priority levels to enable sophisticated message routing in the broadcast system.
type Scaler ¶
type Scaler struct {
// contains filtered or unexported fields
}
Scaler manages pool size based on current load.
type ScalerConfig ¶
type ScalerConfig struct {
TargetLoad float64
ScaleUpThreshold float64
ScaleDownThreshold float64
Cooldown time.Duration
}
ScalerConfig defines configuration for the Scaler.
type State ¶ added in v0.0.2
type State struct {
Value interface{}
Probability float64
Amplitude complex128 // For future quantum computing features
Evidence []Evidence // Supporting evidence for this state
}
State represents a possible quantum state with its probability amplitude and associated verification information.
type StateChange ¶ added in v0.0.2
type StateChange struct {
Timestamp time.Time
Key string
Value any
Sequence uint64 // Monotonically increasing sequence number
}
StateChange represents an immutable record of a change to the shared state. Each change is timestamped and contains both the key and value that was changed, allowing for precise replay of state evolution.
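The replay idea can be sketched with an append-only log keyed by sequence number. `changeLog`, `record`, `since`, and `replay` are hypothetical names; the sketch assumes that a "since" query returns changes with a sequence number strictly greater than the one given, which this section does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

// stateChange mirrors the StateChange fields.
type stateChange struct {
	Timestamp time.Time
	Key       string
	Value     any
	Sequence  uint64
}

// changeLog is a hypothetical append-only log of state changes.
type changeLog struct {
	next    uint64
	changes []stateChange
}

// record appends a change with the next sequence number.
func (l *changeLog) record(key string, value any) {
	l.next++
	l.changes = append(l.changes, stateChange{Timestamp: time.Now(), Key: key, Value: value, Sequence: l.next})
}

// since returns the changes whose sequence number is greater than seq.
func (l *changeLog) since(seq uint64) []stateChange {
	var out []stateChange
	for _, c := range l.changes {
		if c.Sequence > seq {
			out = append(out, c)
		}
	}
	return out
}

// replay rebuilds a state map by applying changes in order; later writes win.
func replay(changes []stateChange) map[string]any {
	state := map[string]any{}
	for _, c := range changes {
		state[c.Key] = c.Value
	}
	return state
}

func main() {
	l := &changeLog{}
	l.record("status", "pending")
	l.record("owner", "worker-1")
	l.record("status", "done")
	fmt.Println(len(l.since(1)))
	fmt.Println(replay(l.since(0)))
}
```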
type StateTransition ¶ added in v0.0.2
type StateTransition struct {
ValueID string
FromState State
ToState State
Timestamp time.Time
Cause string
}
StateTransition represents a change in quantum state.
Each transition records a single state change: the previous and new states, a timestamp, and the cause of the transition. Collected in order, transitions form a quantum-like state history that can be used to understand the evolution of values over time.
type UncertaintyLevel ¶ added in v0.0.2
type UncertaintyLevel float64
UncertaintyLevel defines how uncertain we are about a value.
const (
MinUncertainty UncertaintyLevel = 0.0
MaxUncertainty UncertaintyLevel = 1.0
)
type UncertaintyPrinciple ¶ added in v0.0.2
type UncertaintyPrinciple struct {
MinDeltaTime time.Duration // Minimum time between observations
MaxDeltaTime time.Duration // Maximum time before max uncertainty
BaseUncertainty UncertaintyLevel
}
UncertaintyPrinciple enforces quantum-like uncertainty rules.
It implements Heisenberg-inspired uncertainty principles where observation and time affect the certainty of quantum values. The principle ensures that values become more uncertain over time and that frequent observations impact the system's state.
Key concepts:
- Minimum time between observations to limit measurement impact
- Maximum time before reaching maximum uncertainty
- Base uncertainty level for all measurements
type WaveFunction ¶ added in v0.0.2
type WaveFunction struct {
States []State
Uncertainty UncertaintyLevel
// contains filtered or unexported fields
}
WaveFunction represents a quantum state that can exist in multiple possible states simultaneously until observation or verification forces a collapse into a definite state.
func NewWaveFunction ¶ added in v0.0.2
func NewWaveFunction(states []State, uncertainty UncertaintyLevel, methodDiversity float64) *WaveFunction
func (*WaveFunction) AddEvidence ¶ added in v0.0.2
func (wf *WaveFunction) AddEvidence(stateValue interface{}, evidence Evidence)
AddEvidence allows adding new evidence to a state after creation.
func (*WaveFunction) Collapse ¶ added in v0.0.2
func (wf *WaveFunction) Collapse() interface{}
Collapse forces the wave function to choose a definite state based on both probabilities and verification evidence. The collapse mechanism considers:
1. State probabilities
2. Method diversity
3. Evidence quality
4. Uncertainty level
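The first of these factors, selection by state probability, can be sketched as a weighted random draw. The example deliberately ignores method diversity, evidence quality, and uncertainty, whose weighting is not described here; `state` and `pick` are hypothetical, and the random number is passed in so the behavior is easy to check.

```go
package main

import (
	"fmt"
	"math/rand"
)

// state keeps only the two State fields this sketch needs.
type state struct {
	Value       any
	Probability float64
}

// pick selects a state by walking the cumulative probabilities. r must be in
// [0, 1); it is scaled by the total mass, so the probabilities need not sum to 1.
func pick(states []state, r float64) any {
	total := 0.0
	for _, s := range states {
		total += s.Probability
	}
	if len(states) == 0 || total <= 0 {
		return nil
	}
	target := r * total
	acc := 0.0
	for _, s := range states {
		acc += s.Probability
		if target < acc {
			return s.Value
		}
	}
	return states[len(states)-1].Value // guard against floating-point rounding
}

func main() {
	states := []state{{"heads", 0.5}, {"tails", 0.5}}
	fmt.Println(pick(states, rand.Float64()))
}
```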
func (*WaveFunction) UpdateMethodDiversity ¶ added in v0.0.2
func (wf *WaveFunction) UpdateMethodDiversity(diversity float64)
UpdateMethodDiversity allows updating the method diversity score as new verification methods are added.