worker

package
v0.0.0-...-4cdf813 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BulkSearchPayload

type BulkSearchPayload struct {
	Origin            string
	Destination       string
	Origins           []string
	Destinations      []string
	DepartureDateFrom time.Time
	DepartureDateTo   time.Time
	ReturnDateFrom    time.Time
	ReturnDateTo      time.Time
	TripLength        int
	Adults            int
	Children          int
	InfantsLap        int
	InfantsSeat       int
	TripType          string
	Class             string // Changed to string for JSON unmarshal
	Stops             string // Changed to string for JSON unmarshal
	Currency          string
	Carriers          []string `json:"carriers,omitempty"`
	BulkSearchID      int      `json:"bulk_search_id,omitempty"`
	JobID             int      `json:"job_id,omitempty"`
}

type BulkSearchRoutePayload

type BulkSearchRoutePayload struct {
	BulkSearchID      int       `json:"bulk_search_id"`
	TotalRoutes       int       `json:"total_routes"` // Total routes in the parent bulk search
	Origin            string    `json:"origin"`
	Destination       string    `json:"destination"`
	DepartureDateFrom time.Time `json:"departure_date_from"`
	DepartureDateTo   time.Time `json:"departure_date_to"`
	TripLength        int       `json:"trip_length"`
	TripType          string    `json:"trip_type"`
	Class             string    `json:"class"`
	Stops             string    `json:"stops"`
	Currency          string    `json:"currency"`
	Adults            int       `json:"adults"`
	Children          int       `json:"children,omitempty"`
	InfantsLap        int       `json:"infants_lap,omitempty"`
	InfantsSeat       int       `json:"infants_seat,omitempty"`
	Carriers          []string  `json:"carriers,omitempty"`
}

BulkSearchRoutePayload represents a single route in a fanned-out bulk search. Each route is processed independently by any available worker.

type CertWorker

type CertWorker struct {
	ID         int
	JobChannel chan CertificateJob
	WorkerPool chan chan CertificateJob
	// contains filtered or unexported fields
}

CertWorker handles certificate issuance/renewal with Cloudflare DNS challenge

func (*CertWorker) ProcessJob

func (cw *CertWorker) ProcessJob(ctx context.Context, job CertificateJob) error

ProcessJob handles certificate issuance/renewal workflow

func (*CertWorker) Start

func (cw *CertWorker) Start()

Start begins listening for certificate jobs

func (*CertWorker) Stop

func (cw *CertWorker) Stop()

Stop signals the worker to stop processing jobs

type CertificateJob

type CertificateJob struct {
	Domain           string
	ChallengeType    string
	CloudflareToken  string
	CloudflareZoneID string
	ForceRenewal     bool
}

CertificateJob represents a TLS certificate management task

type ContinuousPriceGraphPayload

type ContinuousPriceGraphPayload struct {
	Origin         string    `json:"origin"`
	Destination    string    `json:"destination"`
	RangeStartDate time.Time `json:"range_start_date"`
	RangeEndDate   time.Time `json:"range_end_date"`
	TripLength     int       `json:"trip_length"`
	Class          string    `json:"class"`
	Stops          string    `json:"stops"`
	Adults         int       `json:"adults"`
	Currency       string    `json:"currency"`
}

type ContinuousSweepConfig

type ContinuousSweepConfig struct {
	TripLengths         []int
	DepartureWindowDays int
	Class               string
	Stops               string
	Adults              int
	Currency            string
	PacingMode          PacingMode
	TargetDurationHours int
	MinDelayMs          int
	InternationalOnly   bool
}

ContinuousSweepConfig holds configuration for continuous sweeps

func DefaultContinuousSweepConfig

func DefaultContinuousSweepConfig() ContinuousSweepConfig

DefaultContinuousSweepConfig returns the default configuration

type ContinuousSweepRunner

type ContinuousSweepRunner struct {
	// contains filtered or unexported fields
}

ContinuousSweepRunner manages continuous price graph sweeps

func NewContinuousSweepRunner

func NewContinuousSweepRunner(
	postgresDB db.PostgresDB,
	queue queue.Queue,
	notifier *notify.NTFYClient,
	config ContinuousSweepConfig,
) *ContinuousSweepRunner

NewContinuousSweepRunner creates a new continuous sweep runner

func (*ContinuousSweepRunner) GetConfig

GetConfig returns a copy of the current configuration.

func (*ContinuousSweepRunner) GetStatus

GetStatus returns the current status

func (*ContinuousSweepRunner) Pause

func (r *ContinuousSweepRunner) Pause()

Pause pauses the sweep (can be resumed)

func (*ContinuousSweepRunner) PauseAndAutoResumeAfterQueueDrain

func (r *ContinuousSweepRunner) PauseAndAutoResumeAfterQueueDrain(queueName string)

func (*ContinuousSweepRunner) RestartSweep

func (r *ContinuousSweepRunner) RestartSweep()

RestartSweep restarts the sweep from the beginning

func (*ContinuousSweepRunner) Resume

func (r *ContinuousSweepRunner) Resume()

Resume resumes a paused sweep

func (*ContinuousSweepRunner) SetConfig

func (r *ContinuousSweepRunner) SetConfig(config ContinuousSweepConfig)

SetConfig updates the configuration (safe to call while running)

func (*ContinuousSweepRunner) SkipRoute

func (r *ContinuousSweepRunner) SkipRoute()

SkipRoute skips the current route and moves to the next one

func (*ContinuousSweepRunner) Start

func (r *ContinuousSweepRunner) Start() error

Start begins the continuous sweep process. Note: This method creates its own long-lived context independent of HTTP requests.

func (*ContinuousSweepRunner) Stop

func (r *ContinuousSweepRunner) Stop()

Stop gracefully stops the continuous sweep

type Cronner

type Cronner interface {
	Start()
	Stop() context.Context // cron.Stop() returns a context
	AddFunc(spec string, cmd func()) (cron.EntryID, error)
}

Cronner defines the interface for cron operations needed by the scheduler. This allows mocking the cron dependency in tests.

type FlightSearchPayload

type FlightSearchPayload struct {
	Origin        string
	Destination   string
	DepartureDate time.Time
	ReturnDate    time.Time
	Adults        int
	Children      int
	InfantsLap    int
	InfantsSeat   int
	TripType      string
	Class         string // Changed to string for JSON unmarshal
	Stops         string // Changed to string for JSON unmarshal
	Currency      string
}

type LeaderElector

type LeaderElector struct {
	// contains filtered or unexported fields
}

LeaderElector manages distributed leader election using Redis. Only the leader instance runs the scheduler to prevent duplicate job execution.

func NewLeaderElector

func NewLeaderElector(
	redisClient *redis.Client,
	lockKey string,
	lockTTL time.Duration,
	renewInterval time.Duration,
	onBecomeLeader func(),
	onLoseLeader func(),
) *LeaderElector

NewLeaderElector creates a new leader elector. onBecomeLeader is called when this instance acquires leadership. onLoseLeader is called when this instance loses leadership.

func (*LeaderElector) InstanceID

func (le *LeaderElector) InstanceID() string

InstanceID returns the unique identifier for this instance.

func (*LeaderElector) IsLeader

func (le *LeaderElector) IsLeader() bool

IsLeader returns whether this instance currently holds leadership.

func (*LeaderElector) Start

func (le *LeaderElector) Start()

Start begins the leader election loop. It runs in a goroutine and periodically attempts to acquire or renew leadership.

func (*LeaderElector) Stop

func (le *LeaderElector) Stop()

Stop releases leadership (if held) and stops the election loop.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages a pool of workers

func NewManager

func NewManager(queue queue.Queue, redisClient *redis.Client, postgresDB db.PostgresDB, neo4jDB db.Neo4jDatabase, workerConfig config.WorkerConfig, flightConfig config.FlightConfig, dealConfig config.DealConfig) *Manager

NewManager creates a new worker manager. If redisClient is provided, leader election is enabled for the scheduler. If redisClient is nil, the scheduler runs on every instance (legacy behavior).

func (*Manager) GetQueue

func (m *Manager) GetQueue() queue.Queue

func (*Manager) GetScheduler

func (m *Manager) GetScheduler() *Scheduler

GetScheduler returns the scheduler instance

func (*Manager) GetSweepRunner

func (m *Manager) GetSweepRunner() *ContinuousSweepRunner

GetSweepRunner returns the continuous sweep runner instance

func (*Manager) SetSweepRunner

func (m *Manager) SetSweepRunner(runner *ContinuousSweepRunner)

SetSweepRunner sets the continuous sweep runner instance

func (*Manager) Start

func (m *Manager) Start()

Start starts the worker pool and scheduler. If leader election is enabled, only the leader instance runs the scheduler.

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops the worker pool and scheduler. If leader election is enabled, it releases leadership first.

func (*Manager) WorkerStatuses

func (m *Manager) WorkerStatuses() []WorkerStatus

WorkerStatuses returns a snapshot of current worker metrics.

type PacingMode

type PacingMode string

PacingMode defines how delays between queries are calculated

const (
	PacingModeAdaptive PacingMode = "adaptive"
	PacingModeFixed    PacingMode = "fixed"
)

type PriceGraphSweepPayload

type PriceGraphSweepPayload struct {
	SweepID           int       `json:"sweep_id,omitempty"`
	JobID             int       `json:"job_id,omitempty"`
	Origins           []string  `json:"origins"`
	Destinations      []string  `json:"destinations"`
	DepartureDateFrom time.Time `json:"departure_date_from"`
	DepartureDateTo   time.Time `json:"departure_date_to"`
	TripLengths       []int     `json:"trip_lengths,omitempty"`
	TripType          string    `json:"trip_type"`
	// Class is kept for backward compatibility; prefer Classes for multi-cabin sweeps.
	Class           string   `json:"class,omitempty"`
	Classes         []string `json:"classes,omitempty"`
	Stops           string   `json:"stops"`
	Adults          int      `json:"adults"`
	Children        int      `json:"children"`
	InfantsLap      int      `json:"infants_lap"`
	InfantsSeat     int      `json:"infants_seat"`
	Currency        string   `json:"currency"`
	RateLimitMillis int      `json:"rate_limit_millis,omitempty"`
}

PriceGraphSweepPayload defines the data needed to execute a price graph sweep

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler schedules jobs to be executed at specific times

func NewScheduler

func NewScheduler(queue queue.Queue, postgresDB db.PostgresDB, cronner Cronner) *Scheduler

NewScheduler creates a new scheduler instance. It accepts a Cronner interface; if nil, a default cron.Cron instance is created.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(payload []byte) error

AddJob adds a job to the scheduler

func (*Scheduler) EnqueuePriceGraphSweep

func (s *Scheduler) EnqueuePriceGraphSweep(ctx context.Context, payload PriceGraphSweepPayload) (int, error)

EnqueuePriceGraphSweep enqueues a price graph sweep job and returns the sweep ID

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start starts the scheduler

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop stops the scheduler

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Main Worker struct remains for flight search operations

func NewWorker

func NewWorker(postgresDB db.PostgresDB, neo4jDB db.Neo4jDatabase) *Worker

NewWorker creates a new worker instance

func (*Worker) StoreFlightInNeo4j

func (w *Worker) StoreFlightInNeo4j(ctx context.Context, offer flights.FullOffer, class string) error

StoreFlightInNeo4j stores flight data in Neo4j for graph analysis (Exported for testing)

func (*Worker) StoreFlightOffers

func (w *Worker) StoreFlightOffers(ctx context.Context, payload FlightSearchPayload, offers []flights.FullOffer, priceRange *flights.PriceRange) error

StoreFlightOffers stores flight offers in the database (Exported for testing)

type WorkerStatus

type WorkerStatus struct {
	ID            int    `json:"id"`
	Status        string `json:"status"`
	CurrentJob    string `json:"current_job,omitempty"`
	ProcessedJobs int    `json:"processed_jobs"`
	Uptime        int64  `json:"uptime"` // seconds
}

WorkerStatus is a snapshot of worker metrics exposed via the API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL