Documentation
¶
Index ¶
- type BulkSearchPayload
- type BulkSearchRoutePayload
- type CertWorker
- type CertificateJob
- type ContinuousPriceGraphPayload
- type ContinuousSweepConfig
- type ContinuousSweepRunner
- func (r *ContinuousSweepRunner) GetConfig() ContinuousSweepConfig
- func (r *ContinuousSweepRunner) GetStatus() db.SweepStatusResponse
- func (r *ContinuousSweepRunner) Pause()
- func (r *ContinuousSweepRunner) PauseAndAutoResumeAfterQueueDrain(queueName string)
- func (r *ContinuousSweepRunner) RestartSweep()
- func (r *ContinuousSweepRunner) Resume()
- func (r *ContinuousSweepRunner) SetConfig(config ContinuousSweepConfig)
- func (r *ContinuousSweepRunner) SkipRoute()
- func (r *ContinuousSweepRunner) Start() error
- func (r *ContinuousSweepRunner) Stop()
- type Cronner
- type FlightSearchPayload
- type LeaderElector
- type Manager
- func (m *Manager) GetQueue() queue.Queue
- func (m *Manager) GetScheduler() *Scheduler
- func (m *Manager) GetSweepRunner() *ContinuousSweepRunner
- func (m *Manager) SetSweepRunner(runner *ContinuousSweepRunner)
- func (m *Manager) Start()
- func (m *Manager) Stop()
- func (m *Manager) WorkerStatuses() []WorkerStatus
- type PacingMode
- type PriceGraphSweepPayload
- type Scheduler
- type Worker
- type WorkerStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BulkSearchPayload ¶
// BulkSearchPayload groups the parameters of a bulk search: route endpoints,
// departure/return date windows, passenger counts and fare options.
// NOTE(review): no upstream doc comment exists; the summary above is read off
// the field names only and should be confirmed against the consuming code.
//
// Only Carriers, BulkSearchID and JobID carry JSON tags (all omitempty). The
// remaining fields are untagged, so encoding/json uses the Go field names as
// keys.
type BulkSearchPayload struct {
// NOTE(review): both singular (Origin/Destination) and plural
// (Origins/Destinations) endpoints are declared; which set takes precedence
// is not visible from this declaration — confirm with the consumer.
Origin string
Destination string
Origins []string
Destinations []string
DepartureDateFrom time.Time
DepartureDateTo time.Time
ReturnDateFrom time.Time
ReturnDateTo time.Time
// NOTE(review): unit is not stated here (presumably days) — confirm.
TripLength int
Adults int
Children int
InfantsLap int
InfantsSeat int
TripType string
Class string // Held as a string (changed from an earlier type) so it can be JSON-unmarshaled.
Stops string // Held as a string (changed from an earlier type) so it can be JSON-unmarshaled.
Currency string
// The three tagged fields below are omitted from encoded JSON when empty/zero.
Carriers []string `json:"carriers,omitempty"`
BulkSearchID int `json:"bulk_search_id,omitempty"`
JobID int `json:"job_id,omitempty"`
}
type BulkSearchRoutePayload ¶
// BulkSearchRoutePayload represents a single route in a fanned-out bulk
// search. Each route is processed independently by any available worker.
//
// Every field has an explicit snake_case JSON key. Children, InfantsLap,
// InfantsSeat and Carriers are tagged omitempty, so zero/empty values are left
// out of the encoded JSON; all other fields are always encoded.
type BulkSearchRoutePayload struct {
// NOTE(review): presumably the ID of the parent bulk search this route was
// fanned out from — confirm.
BulkSearchID int `json:"bulk_search_id"`
TotalRoutes int `json:"total_routes"` // Total routes in the parent bulk search
Origin string `json:"origin"`
Destination string `json:"destination"`
DepartureDateFrom time.Time `json:"departure_date_from"`
DepartureDateTo time.Time `json:"departure_date_to"`
// NOTE(review): unit is not stated here (presumably days) — confirm.
TripLength int `json:"trip_length"`
TripType string `json:"trip_type"`
Class string `json:"class"`
Stops string `json:"stops"`
Currency string `json:"currency"`
Adults int `json:"adults"`
Children int `json:"children,omitempty"`
InfantsLap int `json:"infants_lap,omitempty"`
InfantsSeat int `json:"infants_seat,omitempty"`
Carriers []string `json:"carriers,omitempty"`
}
BulkSearchRoutePayload represents a single route in a fanned-out bulk search. Each route is processed independently by any available worker.
type CertWorker ¶
type CertWorker struct {
ID int
JobChannel chan CertificateJob
WorkerPool chan chan CertificateJob
// contains filtered or unexported fields
}
CertWorker handles certificate issuance and renewal using a Cloudflare DNS challenge.
func (*CertWorker) ProcessJob ¶
func (cw *CertWorker) ProcessJob(ctx context.Context, job CertificateJob) error
ProcessJob handles certificate issuance/renewal workflow
func (*CertWorker) Start ¶
func (cw *CertWorker) Start()
Start begins listening for certificate jobs
func (*CertWorker) Stop ¶
func (cw *CertWorker) Stop()
Stop signals the worker to stop processing jobs
type CertificateJob ¶
// CertificateJob represents a TLS certificate management task.
// The struct declares no JSON tags.
type CertificateJob struct {
Domain string
// NOTE(review): the accepted ChallengeType values are not visible from this
// declaration — confirm against the code that processes the job.
ChallengeType string
// NOTE(review): CloudflareToken looks like an API credential; presumably it
// should be kept out of logs — confirm.
CloudflareToken string
CloudflareZoneID string
// NOTE(review): presumably requests renewal even when the current
// certificate is still valid — confirm.
ForceRenewal bool
}
CertificateJob represents a TLS certificate management task
type ContinuousPriceGraphPayload ¶
// ContinuousPriceGraphPayload groups one origin/destination pair with a date
// range and fare options. Every field has an explicit snake_case JSON key and
// none is tagged omitempty, so zero values are always encoded.
// NOTE(review): no upstream doc comment exists; presumably this is the
// per-route payload used by the continuous sweep — confirm.
type ContinuousPriceGraphPayload struct {
Origin string `json:"origin"`
Destination string `json:"destination"`
RangeStartDate time.Time `json:"range_start_date"`
RangeEndDate time.Time `json:"range_end_date"`
// NOTE(review): unit is not stated here (presumably days) — confirm.
TripLength int `json:"trip_length"`
Class string `json:"class"`
Stops string `json:"stops"`
Adults int `json:"adults"`
Currency string `json:"currency"`
}
type ContinuousSweepConfig ¶
// ContinuousSweepConfig holds configuration for continuous sweeps.
// The struct declares no JSON tags.
type ContinuousSweepConfig struct {
TripLengths []int
DepartureWindowDays int
Class string
Stops string
Adults int
Currency string
// PacingMode is one of the package's PacingMode constants
// (PacingModeAdaptive = "adaptive", PacingModeFixed = "fixed").
PacingMode PacingMode
// NOTE(review): how TargetDurationHours and MinDelayMs interact with each
// pacing mode is not visible from this declaration; the names suggest hours
// and milliseconds respectively — confirm.
TargetDurationHours int
MinDelayMs int
// NOTE(review): presumably restricts the sweep to international routes —
// confirm.
InternationalOnly bool
}
ContinuousSweepConfig holds configuration for continuous sweeps
func DefaultContinuousSweepConfig ¶
func DefaultContinuousSweepConfig() ContinuousSweepConfig
DefaultContinuousSweepConfig returns the default configuration
type ContinuousSweepRunner ¶
type ContinuousSweepRunner struct {
// contains filtered or unexported fields
}
ContinuousSweepRunner manages continuous price graph sweeps
func NewContinuousSweepRunner ¶
func NewContinuousSweepRunner( postgresDB db.PostgresDB, queue queue.Queue, notifier *notify.NTFYClient, config ContinuousSweepConfig, ) *ContinuousSweepRunner
NewContinuousSweepRunner creates a new continuous sweep runner
func (*ContinuousSweepRunner) GetConfig ¶
func (r *ContinuousSweepRunner) GetConfig() ContinuousSweepConfig
GetConfig returns a copy of the current configuration.
func (*ContinuousSweepRunner) GetStatus ¶
func (r *ContinuousSweepRunner) GetStatus() db.SweepStatusResponse
GetStatus returns the current status
func (*ContinuousSweepRunner) Pause ¶
func (r *ContinuousSweepRunner) Pause()
Pause pauses the sweep; a paused sweep can be continued with Resume.
func (*ContinuousSweepRunner) PauseAndAutoResumeAfterQueueDrain ¶
func (r *ContinuousSweepRunner) PauseAndAutoResumeAfterQueueDrain(queueName string)
func (*ContinuousSweepRunner) RestartSweep ¶
func (r *ContinuousSweepRunner) RestartSweep()
RestartSweep restarts the sweep from the beginning
func (*ContinuousSweepRunner) Resume ¶
func (r *ContinuousSweepRunner) Resume()
Resume resumes a paused sweep
func (*ContinuousSweepRunner) SetConfig ¶
func (r *ContinuousSweepRunner) SetConfig(config ContinuousSweepConfig)
SetConfig updates the configuration (safe to call while running)
func (*ContinuousSweepRunner) SkipRoute ¶
func (r *ContinuousSweepRunner) SkipRoute()
SkipRoute skips the current route and moves to the next one
func (*ContinuousSweepRunner) Start ¶
func (r *ContinuousSweepRunner) Start() error
Start begins the continuous sweep process. Note: This method creates its own long-lived context independent of HTTP requests.
func (*ContinuousSweepRunner) Stop ¶
func (r *ContinuousSweepRunner) Stop()
Stop gracefully stops the continuous sweep
type Cronner ¶
// Cronner defines the interface for cron operations needed by the scheduler.
// This allows mocking the cron dependency in tests. NewScheduler's
// documentation states that a default cron.Cron instance is created when a
// nil Cronner is passed.
type Cronner interface {
// NOTE(review): presumably starts the underlying cron runner, mirroring
// cron.Cron — confirm.
Start()
Stop() context.Context // cron.Stop() returns a context
// AddFunc takes a schedule spec string and a function, and returns a
// cron.EntryID plus an error.
// NOTE(review): the accepted spec syntax is defined by the cron
// implementation, not by this interface — confirm.
AddFunc(spec string, cmd func()) (cron.EntryID, error)
}
Cronner defines the interface for cron operations needed by the scheduler. This allows mocking the cron dependency in tests.
type FlightSearchPayload ¶
// FlightSearchPayload groups the parameters of a single flight search: one
// origin/destination pair, departure and return dates, passenger counts and
// fare options. (*Worker).StoreFlightOffers takes it as its payload argument.
// NOTE(review): no upstream doc comment exists; the summary above is read off
// the field names only — confirm.
//
// No field carries a JSON tag, so encoding/json uses the Go field names as
// keys.
type FlightSearchPayload struct {
Origin string
Destination string
DepartureDate time.Time
ReturnDate time.Time
Adults int
Children int
InfantsLap int
InfantsSeat int
TripType string
Class string // Held as a string (changed from an earlier type) so it can be JSON-unmarshaled.
Stops string // Held as a string (changed from an earlier type) so it can be JSON-unmarshaled.
Currency string
}
type LeaderElector ¶
type LeaderElector struct {
// contains filtered or unexported fields
}
LeaderElector manages distributed leader election using Redis. Only the leader instance runs the scheduler to prevent duplicate job execution.
func NewLeaderElector ¶
func NewLeaderElector( redisClient *redis.Client, lockKey string, lockTTL time.Duration, renewInterval time.Duration, onBecomeLeader func(), onLoseLeader func(), ) *LeaderElector
NewLeaderElector creates a new leader elector. onBecomeLeader is called when this instance acquires leadership. onLoseLeader is called when this instance loses leadership.
func (*LeaderElector) InstanceID ¶
func (le *LeaderElector) InstanceID() string
InstanceID returns the unique identifier for this instance.
func (*LeaderElector) IsLeader ¶
func (le *LeaderElector) IsLeader() bool
IsLeader returns whether this instance currently holds leadership.
func (*LeaderElector) Start ¶
func (le *LeaderElector) Start()
Start begins the leader election loop. It runs in a goroutine and periodically attempts to acquire or renew leadership.
func (*LeaderElector) Stop ¶
func (le *LeaderElector) Stop()
Stop releases leadership (if held) and stops the election loop.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages a pool of workers
func NewManager ¶
func NewManager(queue queue.Queue, redisClient *redis.Client, postgresDB db.PostgresDB, neo4jDB db.Neo4jDatabase, workerConfig config.WorkerConfig, flightConfig config.FlightConfig, dealConfig config.DealConfig) *Manager
NewManager creates a new worker manager. If redisClient is provided, leader election is enabled for the scheduler. If redisClient is nil, the scheduler runs on every instance (legacy behavior).
func (*Manager) GetScheduler ¶
func (m *Manager) GetScheduler() *Scheduler
GetScheduler returns the scheduler instance
func (*Manager) GetSweepRunner ¶
func (m *Manager) GetSweepRunner() *ContinuousSweepRunner
GetSweepRunner returns the continuous sweep runner instance
func (*Manager) SetSweepRunner ¶
func (m *Manager) SetSweepRunner(runner *ContinuousSweepRunner)
SetSweepRunner sets the continuous sweep runner instance
func (*Manager) Start ¶
func (m *Manager) Start()
Start starts the worker pool and scheduler. If leader election is enabled, only the leader instance runs the scheduler.
func (*Manager) Stop ¶
func (m *Manager) Stop()
Stop stops the worker pool and scheduler. If leader election is enabled, it releases leadership first.
func (*Manager) WorkerStatuses ¶
func (m *Manager) WorkerStatuses() []WorkerStatus
WorkerStatuses returns a snapshot of current worker metrics.
type PacingMode ¶
type PacingMode string
PacingMode defines how delays between queries are calculated
const (
PacingModeAdaptive PacingMode = "adaptive"
PacingModeFixed PacingMode = "fixed"
)
type PriceGraphSweepPayload ¶
// PriceGraphSweepPayload defines the data needed to execute a price graph
// sweep. It is the payload type accepted by (*Scheduler).EnqueuePriceGraphSweep.
//
// SweepID, JobID, TripLengths, Class, Classes and RateLimitMillis are tagged
// omitempty and are left out of the encoded JSON when zero/empty; all other
// fields are always encoded.
type PriceGraphSweepPayload struct {
SweepID int `json:"sweep_id,omitempty"`
JobID int `json:"job_id,omitempty"`
Origins []string `json:"origins"`
Destinations []string `json:"destinations"`
DepartureDateFrom time.Time `json:"departure_date_from"`
DepartureDateTo time.Time `json:"departure_date_to"`
// NOTE(review): unit is not stated here (presumably days) — confirm.
TripLengths []int `json:"trip_lengths,omitempty"`
TripType string `json:"trip_type"`
// Class is kept for backward compatibility; prefer Classes for multi-cabin sweeps.
// NOTE(review): behavior when both Class and Classes are set is not visible
// from this declaration — confirm.
Class string `json:"class,omitempty"`
Classes []string `json:"classes,omitempty"`
Stops string `json:"stops"`
Adults int `json:"adults"`
Children int `json:"children"`
InfantsLap int `json:"infants_lap"`
InfantsSeat int `json:"infants_seat"`
Currency string `json:"currency"`
// NOTE(review): the name suggests a delay in milliseconds between requests —
// confirm.
RateLimitMillis int `json:"rate_limit_millis,omitempty"`
}
PriceGraphSweepPayload defines the data needed to execute a price graph sweep
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler schedules jobs to be executed at specific times
func NewScheduler ¶
NewScheduler creates a new scheduler instance. It accepts a Cronner interface; if nil, a default cron.Cron instance is created.
func (*Scheduler) EnqueuePriceGraphSweep ¶
func (s *Scheduler) EnqueuePriceGraphSweep(ctx context.Context, payload PriceGraphSweepPayload) (int, error)
EnqueuePriceGraphSweep enqueues a price graph sweep job and returns the sweep ID
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker is the main worker type; it is retained for flight search operations.
func NewWorker ¶
func NewWorker(postgresDB db.PostgresDB, neo4jDB db.Neo4jDatabase) *Worker
NewWorker creates a new worker instance
func (*Worker) StoreFlightInNeo4j ¶
func (w *Worker) StoreFlightInNeo4j(ctx context.Context, offer flights.FullOffer, class string) error
StoreFlightInNeo4j stores flight data in Neo4j for graph analysis (Exported for testing)
func (*Worker) StoreFlightOffers ¶
func (w *Worker) StoreFlightOffers(ctx context.Context, payload FlightSearchPayload, offers []flights.FullOffer, priceRange *flights.PriceRange) error
StoreFlightOffers stores flight offers in the database (Exported for testing)