package server

Version: v0.0.0-...-8ab4017 (Latest)
Published: Feb 28, 2026 | License: MIT | Imports: 53 | Imported by: 0

Warning: This package is not in the latest version of its module.

Documentation

Constants

const (
	OidBool        int32 = 16
	OidBytea       int32 = 17
	OidChar        int32 = 18 // "char" - single-byte internal type
	OidName        int32 = 19 // name - 64-byte internal type for identifiers
	OidInt8        int32 = 20 // bigint
	OidInt2        int32 = 21 // smallint
	OidInt4        int32 = 23 // integer
	OidText        int32 = 25
	OidOid         int32 = 26
	OidFloat4      int32 = 700  // real
	OidFloat8      int32 = 701  // double precision
	OidBpchar      int32 = 1042 // blank-padded char
	OidVarchar     int32 = 1043
	OidDate        int32 = 1082
	OidTime        int32 = 1083
	OidTimestamp   int32 = 1114
	OidTimestamptz int32 = 1184
	OidInterval    int32 = 1186
	OidNumeric     int32 = 1700
	OidUUID        int32 = 2950
	OidTimetz      int32 = 1266
	OidJSON        int32 = 114
	OidJSONB       int32 = 3802

	// Array OIDs
	OidBoolArray        int32 = 1000
	OidInt2Array        int32 = 1005
	OidInt4Array        int32 = 1007
	OidTextArray        int32 = 1009
	OidVarcharArray     int32 = 1015
	OidInt8Array        int32 = 1016
	OidFloat4Array      int32 = 1021
	OidFloat8Array      int32 = 1022
	OidTimestampArray   int32 = 1115
	OidDateArray        int32 = 1182
	OidTimeArray        int32 = 1183
	OidTimestamptzArray int32 = 1185
	OidIntervalArray    int32 = 1187
	OidNumericArray     int32 = 1231
	OidTimetzArray      int32 = 1270
	OidUUIDArray        int32 = 2951
)

PostgreSQL type OIDs

const (
	ExitSuccess     = 0  // Clean disconnect
	ExitError       = 1  // Error (crash, protocol error)
	ExitAuthFailure = 10 // Authentication failure (triggers rate limit update)
)

Exit codes for child processes
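A supervising parent process might branch on these codes when a child exits. The following is a minimal sketch, assuming the constant values listed above; `describeExit` is a hypothetical helper for illustration and is not part of this package.

```go
package main

import "fmt"

// Values mirror the exit codes documented above.
const (
	ExitSuccess     = 0
	ExitError       = 1
	ExitAuthFailure = 10
)

// describeExit is a hypothetical helper that maps a child's exit code to the
// action a supervising parent might take.
func describeExit(code int) string {
	switch code {
	case ExitSuccess:
		return "clean disconnect"
	case ExitAuthFailure:
		return "auth failure: update rate limiter"
	default:
		// ExitError and any unexpected code are treated as an error.
		return "error"
	}
}

func main() {
	for _, code := range []int{ExitSuccess, ExitError, ExitAuthFailure} {
		fmt.Printf("%d: %s\n", code, describeExit(code))
	}
}
```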

const MaxGRPCMessageSize = 1 << 30 // 1GB

MaxGRPCMessageSize is the max gRPC message size for Flight SQL communication. DuckDB query results can easily exceed the default 4MB limit.

Variables

var ErrWorkerDead = errors.New("flight worker is dead")

ErrWorkerDead is returned when the backing worker process has crashed.

Functions

func AttachDuckLake

func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}) error

AttachDuckLake attaches a DuckLake catalog if configured (but does NOT set it as default). Call setDuckLakeDefault after creating per-connection views in memory.main. This is a standalone function so it can be reused by control plane workers.

func BeginRateLimitedAuthAttempt

func BeginRateLimitedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) (release func(), rejectReason string)

BeginRateLimitedAuthAttempt enforces rate-limit policy before an auth attempt. The returned release function must be called once the attempt is complete.

func BuildDuckDBCopyFromSQL

func BuildDuckDBCopyFromSQL(tableName, columnList, filePath string, opts *CopyFromOptions) string

BuildDuckDBCopyFromSQL generates a DuckDB COPY FROM statement

func ConfigureDBConnection

func ConfigureDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, username string, serverStartTime time.Time, serverVersion string) error

ConfigureDBConnection initializes an existing DuckDB connection with pg_catalog, information_schema, and DuckLake catalog attachment.

func CreateDBConnection

func CreateDBConnection(cfg Config, duckLakeSem chan struct{}, username string, serverStartTime time.Time, serverVersion string) (*sql.DB, error)

CreateDBConnection creates a DuckDB connection for a client session. It uses an in-memory database as an anchor for DuckLake attachment (the actual data lives in RDS/S3). This is a standalone function so it can be reused by both the server and control plane workers. serverStartTime is the time the top-level server process started; it may differ from processStartTime in process isolation mode, where each child has its own processStartTime. serverVersion is the version of the top-level server/control-plane process.

func CreatePassthroughDBConnection

func CreatePassthroughDBConnection(cfg Config, duckLakeSem chan struct{}, username string, serverStartTime time.Time, serverVersion string) (*sql.DB, error)

CreatePassthroughDBConnection creates a DuckDB connection without pg_catalog or information_schema initialization. DuckLake is still attached if configured so passthrough users can access the same data. This is used for passthrough users who send DuckDB-native SQL and don't need the PostgreSQL compatibility layer.

func EnsureCertificates

func EnsureCertificates(certFile, keyFile string) error

EnsureCertificates checks whether the TLS certificate and key files exist and generates a self-signed pair if they do not. Per the signature, it returns only an error; the caller already holds the certFile and keyFile paths.

func GenerateSecretKey

func GenerateSecretKey() int32

GenerateSecretKey generates a cryptographically random secret key for cancel requests.
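One way to produce such a key is to read four bytes from the operating system's CSPRNG. This is a sketch only: `generateSecretKey` is a hypothetical stand-in, and unlike the documented function it also returns an error so the failure path is visible.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
)

// generateSecretKey is a hypothetical stand-in: it reads 4 bytes from
// crypto/rand and reinterprets them as an int32.
func generateSecretKey() (int32, error) {
	var b [4]byte
	if _, err := rand.Read(b[:]); err != nil {
		return 0, err
	}
	return int32(binary.BigEndian.Uint32(b[:])), nil
}

func main() {
	key, err := generateSecretKey()
	if err != nil {
		panic(err)
	}
	fmt.Println("secret key:", key)
}
```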

func InitMinimalServer

func InitMinimalServer(s *Server, cfg Config, queryCancelCh <-chan struct{})

InitMinimalServer initializes a Server struct with minimal fields for use in control plane worker sessions.

func IsEmptyQuery

func IsEmptyQuery(query string) bool

IsEmptyQuery checks if a query contains only semicolons, whitespace, and/or SQL comments. PostgreSQL returns EmptyQueryResponse for queries like ";", "-- ping", "/* */", etc.
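The check described above can be sketched as a small scanner. `isEmptyQuery` below is a hypothetical re-implementation, not the package's code. It assumes block comments are not nested and treats an unterminated block comment as non-empty.

```go
package main

import (
	"fmt"
	"strings"
)

// isEmptyQuery reports whether q contains only whitespace, semicolons,
// "--" line comments, and "/* */" block comments.
// Assumptions: block comments do not nest; an unterminated block comment
// makes the query non-empty.
func isEmptyQuery(q string) bool {
	for len(q) > 0 {
		switch {
		case strings.HasPrefix(q, "--"):
			// Line comment: skip to the end of the line or of the input.
			i := strings.IndexByte(q, '\n')
			if i < 0 {
				return true
			}
			q = q[i+1:]
		case strings.HasPrefix(q, "/*"):
			// Block comment: skip past the closing "*/".
			i := strings.Index(q[2:], "*/")
			if i < 0 {
				return false
			}
			q = q[2+i+2:]
		case q[0] == ';' || q[0] == ' ' || q[0] == '\t' || q[0] == '\n' || q[0] == '\r':
			q = q[1:]
		default:
			return false
		}
	}
	return true
}

func main() {
	for _, q := range []string{";", "-- ping", "/* */", "SELECT 1"} {
		fmt.Printf("%q empty=%v\n", q, isEmptyQuery(q))
	}
}
```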

func LoadExtensions

func LoadExtensions(db *sql.DB, extensions []string) error

LoadExtensions installs and loads DuckDB extensions. This is a standalone function so it can be reused by control plane workers. Extension strings can include a source, e.g. "cache_httpfs FROM community". INSTALL uses the full string; LOAD uses just the extension name.

NOTE: Extension names come from trusted server config, not user input.
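The INSTALL/LOAD split described above could look like the following. `extensionStatements` is a hypothetical helper written for illustration; it assumes the extension name is the first whitespace-separated token of the configured string.

```go
package main

import (
	"fmt"
	"strings"
)

// extensionStatements is a hypothetical helper that turns one configured
// extension string into its INSTALL and LOAD statements.
func extensionStatements(ext string) (install, load string) {
	ext = strings.TrimSpace(ext)
	name := ext
	if fields := strings.Fields(ext); len(fields) > 0 {
		name = fields[0] // "cache_httpfs FROM community" -> "cache_httpfs"
	}
	return "INSTALL " + ext, "LOAD " + name
}

func main() {
	install, load := extensionStatements("cache_httpfs FROM community")
	fmt.Println(install)
	fmt.Println(load)
}
```

Because the strings are concatenated into SQL, this pattern is only appropriate for trusted configuration, as the note above says.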

func NewClientConn

func NewClientConn(s *Server, conn net.Conn, reader *bufio.Reader, writer *bufio.Writer,
	username, database, applicationName string, executor QueryExecutor, pid, secretKey int32, workerID int) *clientConn

NewClientConn creates a clientConn with pre-initialized fields for use by the control plane worker. The returned value is opaque (*clientConn) but can be used with SendInitialParams and RunMessageLoop.

func ParseMemoryBytes

func ParseMemoryBytes(s string) uint64

ParseMemoryBytes parses a DuckDB memory size string (e.g., "4GB", "512MB", "1.5GB") into bytes. Supports both integer and fractional values. Returns 0 if the string is empty, invalid, or "0GB".
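A parser with this contract might look like the sketch below. `parseMemoryBytes` is hypothetical, and the unit multipliers are an assumption: the sketch treats KB/MB/GB/TB as binary multiples (1 KB = 1024 bytes), which may differ from what the package or DuckDB actually uses.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// parseMemoryBytes is a hypothetical sketch.
// Assumption: units are binary multiples (1KB = 1024 bytes).
// It returns 0 for empty, invalid, or non-positive input.
func parseMemoryBytes(s string) uint64 {
	s = strings.ToUpper(strings.TrimSpace(s))
	units := []struct {
		suffix string
		mult   float64
	}{
		{"TB", 1 << 40}, {"GB", 1 << 30}, {"MB", 1 << 20}, {"KB", 1 << 10},
	}
	for _, u := range units {
		if !strings.HasSuffix(s, u.suffix) {
			continue
		}
		num := strings.TrimSpace(strings.TrimSuffix(s, u.suffix))
		n, err := strconv.ParseFloat(num, 64)
		if err != nil || math.IsNaN(n) || math.IsInf(n, 0) || n <= 0 {
			return 0
		}
		return uint64(n * u.mult)
	}
	return 0
}

func main() {
	for _, s := range []string{"4GB", "512MB", "1.5GB", "0GB", ""} {
		fmt.Printf("%q -> %d\n", s, parseMemoryBytes(s))
	}
}
```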

func ProcessVersion

func ProcessVersion() string

ProcessVersion returns the version string for this process.

func ReadMessage

func ReadMessage(r io.Reader) (byte, []byte, error)

func ReadStartupMessage

func ReadStartupMessage(r io.Reader) (map[string]string, error)

func RecordFailedAuthAttempt

func RecordFailedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) bool

RecordFailedAuthAttempt records auth telemetry and updates rate-limit state. Returns true when this failure causes the source IP to be banned.

func RecordSuccessfulAuthAttempt

func RecordSuccessfulAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr)

RecordSuccessfulAuthAttempt clears failure tracking after successful auth.

func RunChildMode

func RunChildMode()

RunChildMode is the entry point for child worker processes. It reconstructs the TCP connection from FD 3, completes TLS handshake, authenticates the user, creates a DuckDB connection, and runs the message loop.

Configuration is read from stdin as JSON (more secure than env vars for passwords).

Exit codes:

  • 0: Success (clean disconnect)
  • 1: Error (crash, protocol error)
  • 10: Authentication failure

func RunMessageLoop

func RunMessageLoop(cc *clientConn) error

RunMessageLoop runs the main message loop for a client connection. It cancels the connection context when the loop exits, ensuring in-flight query contexts (and any gRPC calls derived from them) are cancelled promptly.

func RunShell

func RunShell(cfg Config)

RunShell starts an interactive SQL shell with a fully initialized DuckDB connection. It uses the same CreateDBConnection path as the PostgreSQL server, so extensions, DuckLake, and pg_catalog views are all available.

func SendInitialParams

func SendInitialParams(cc *clientConn)

SendInitialParams sends the initial parameter status messages and backend key data.

func SetProcessVersion

func SetProcessVersion(v string)

SetProcessVersion sets the version string for this process. Called from main().

func StartCredentialRefresh

func StartCredentialRefresh(execer sqlExecer, dlCfg DuckLakeConfig) func()

StartCredentialRefresh starts a background goroutine that periodically refreshes S3 credentials for long-lived DuckDB connections using the credential_chain provider. This prevents credential expiration when running on EC2 with IAM instance roles, STS assume-role, or other temporary credential sources.

The execer parameter accepts either *sql.DB (standalone mode) or *sql.Conn (worker mode where the pool's only connection is pinned by the session).

Note: ExecContext serializes behind any running query (pool contention for *sql.DB, internal mutex for *sql.Conn). This means credentials are refreshed between queries, not during them. A query that runs longer than the credential TTL (~6h for instance roles) could still fail if DuckDB makes S3 requests with stale cached credentials.

Returns a stop function that cancels the refresh goroutine. The caller must call the stop function when the connection is closed to prevent goroutine leaks. If credential refresh is not needed (static credentials, no S3, etc.), returns a no-op.
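The start/stop contract described above follows a common Go pattern: a ticker-driven goroutine plus an idempotent stop function. The sketch below illustrates only that pattern. `startRefresh` and its parameters are hypothetical, and the refresh callback stands in for whatever SQL the real function executes.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// startRefresh is a hypothetical sketch: it runs refresh on every tick until
// the returned stop function is called. stop is safe to call more than once
// and waits for the goroutine to exit, so nothing leaks.
func startRefresh(interval time.Duration, refresh func()) (stop func()) {
	done := make(chan struct{})
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				refresh()
			case <-done:
				return
			}
		}
	}()
	var once sync.Once
	return func() {
		once.Do(func() { close(done) })
		<-exited
	}
}

func main() {
	ticks := make(chan struct{}, 1)
	stop := startRefresh(10*time.Millisecond, func() {
		select {
		case ticks <- struct{}{}:
		default: // drop the signal if one is already pending
		}
	})
	<-ticks // wait for at least one refresh
	stop()
	stop() // second call is a no-op
	fmt.Println("refreshed at least once; goroutine stopped")
}
```

A caller would typically pair the returned function with `defer` right after the connection is created.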

func SystemMemoryBytes

func SystemMemoryBytes() uint64

SystemMemoryBytes returns total physical memory in bytes, cached after first call. Returns 0 if the system memory cannot be detected (e.g., on non-Linux systems).

func ValidateMemoryLimit

func ValidateMemoryLimit(v string) bool

ValidateMemoryLimit checks that a memory_limit string is a valid DuckDB size value.

func ValidateUserPassword

func ValidateUserPassword(users map[string]string, username, password string) bool

ValidateUserPassword validates username/password without leaking user existence via credential-compare timing differences.
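One way to reduce timing differences between "unknown user" and "wrong password" is to always run a constant-time comparison, using a dummy value when the user does not exist. The sketch below is an assumption about the technique, not the package's implementation; `validateUserPassword` and the dummy string are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
)

// validateUserPassword is a hypothetical sketch. Both values are hashed to a
// fixed length first because subtle.ConstantTimeCompare returns immediately
// when the lengths differ. For an unknown user the comparison still runs
// against a dummy value, so both paths do similar work.
func validateUserPassword(users map[string]string, username, password string) bool {
	expected, found := users[username]
	if !found {
		expected = "dummy-password-for-unknown-user"
	}
	want := sha256.Sum256([]byte(expected))
	got := sha256.Sum256([]byte(password))
	match := subtle.ConstantTimeCompare(want[:], got[:]) == 1
	// found is checked last so an unknown user can never match the dummy.
	return found && match
}

func main() {
	users := map[string]string{"alice": "s3cret"}
	fmt.Println(validateUserPassword(users, "alice", "s3cret"))
	fmt.Println(validateUserPassword(users, "alice", "wrong"))
	fmt.Println(validateUserPassword(users, "bob", "s3cret"))
}
```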

func WriteAuthCleartextPassword

func WriteAuthCleartextPassword(w io.Writer) error

func WriteAuthOK

func WriteAuthOK(w io.Writer) error

func WriteBackendKeyData

func WriteBackendKeyData(w io.Writer, pid, secretKey int32) error

func WriteErrorResponse

func WriteErrorResponse(w io.Writer, severity, code, message string) error

func WriteParameterStatus

func WriteParameterStatus(w io.Writer, name, value string) error

func WriteReadyForQuery

func WriteReadyForQuery(w io.Writer, txStatus byte) error
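The Read/Write helpers above carry no descriptions, but their names match messages in the PostgreSQL v3 wire protocol. As a sketch, assuming they follow the standard framing (a one-byte message type, then a four-byte big-endian length that counts itself but not the type byte, then the payload), a round trip looks like this. `writeMessage`, `readMessage`, and `roundTrip` are illustrative names, not this package's functions.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeMessage frames a payload as: type byte, int32 length, payload.
func writeMessage(w io.Writer, msgType byte, payload []byte) error {
	header := make([]byte, 5)
	header[0] = msgType
	binary.BigEndian.PutUint32(header[1:], uint32(len(payload)+4))
	if _, err := w.Write(header); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readMessage reads one framed message and returns its type and payload.
// A production reader would also cap the length before allocating.
func readMessage(r io.Reader) (byte, []byte, error) {
	header := make([]byte, 5)
	if _, err := io.ReadFull(r, header); err != nil {
		return 0, nil, err
	}
	length := binary.BigEndian.Uint32(header[1:])
	if length < 4 {
		return 0, nil, fmt.Errorf("invalid message length %d", length)
	}
	payload := make([]byte, length-4)
	if _, err := io.ReadFull(r, payload); err != nil {
		return 0, nil, err
	}
	return header[0], payload, nil
}

// roundTrip writes a message into a buffer and reads it back.
func roundTrip(msgType byte, payload []byte) (byte, []byte, error) {
	var buf bytes.Buffer
	if err := writeMessage(&buf, msgType, payload); err != nil {
		return 0, nil, err
	}
	return readMessage(&buf)
}

func main() {
	// ReadyForQuery: type 'Z' with one status byte ('I' = idle).
	typ, payload, err := roundTrip('Z', []byte{'I'})
	if err != nil {
		panic(err)
	}
	fmt.Printf("type=%c payload=%q\n", typ, payload)
}
```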

Types

type ACMEManager

type ACMEManager struct {
	// contains filtered or unexported fields
}

ACMEManager wraps autocert.Manager to provide Let's Encrypt TLS certificates. It starts an HTTP listener (port 80 by default) for HTTP-01 challenge validation.

func NewACMEManager

func NewACMEManager(domain, email, cacheDir, httpAddr string) (*ACMEManager, error)

NewACMEManager creates a new ACME manager for the given domain. It starts an HTTP listener on the specified address (default ":80") for HTTP-01 challenges. cacheDir is used to persist certificates across restarts.

func (*ACMEManager) Close

func (a *ACMEManager) Close() error

Close gracefully shuts down the HTTP challenge listener. Safe to call multiple times.

func (*ACMEManager) TLSConfig

func (a *ACMEManager) TLSConfig() *tls.Config

TLSConfig returns a tls.Config that uses ACME for certificate management. The GetCertificate callback dynamically obtains/renews certificates.

type BackendKey

type BackendKey struct {
	Pid       int32
	SecretKey int32
}

BackendKey uniquely identifies a backend connection for cancel requests

type ChildConfig

type ChildConfig struct {
	// Connection info (RemoteAddr is known at spawn time; Username/Database are read after TLS)
	RemoteAddr string `json:"remote_addr"`

	// Server config
	DataDir     string   `json:"data_dir"`
	Extensions  []string `json:"extensions"`
	IdleTimeout int64    `json:"idle_timeout"` // nanoseconds

	// TLS config
	TLSCertFile string `json:"tls_cert_file"`
	TLSKeyFile  string `json:"tls_key_file"`

	// DuckLake config
	DuckLake DuckLakeConfig `json:"ducklake"`

	// Authentication - map of username -> password
	// Child will look up after reading username from startup message
	Users map[string]string `json:"users"`

	// Backend key (pre-generated by parent for cancel request routing)
	// BackendPid is set to child's actual PID after fork
	BackendSecretKey int32 `json:"backend_secret_key"`

	// ServerStartTime is the parent server's start time (Unix nanoseconds).
	// Used to distinguish server uptime from child process uptime.
	ServerStartTime int64 `json:"server_start_time"`

	// ServerVersion is the parent server's version string.
	// Used to distinguish control_plane_version() from worker_version().
	ServerVersion string `json:"server_version,omitempty"`
}

ChildConfig contains all configuration needed by a child worker process. It is passed from parent to child via the DUCKGRES_CHILD_CONFIG env var as JSON.
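To illustrate the JSON form, the sketch below decodes a few of the fields using the struct tags shown above and converts the nanosecond integers into Go time types. `childConfig`, `parseChildConfig`, and the sample values are hypothetical; only the JSON keys come from the struct definition.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// childConfig mirrors a subset of ChildConfig's fields and JSON tags.
type childConfig struct {
	RemoteAddr      string            `json:"remote_addr"`
	IdleTimeout     int64             `json:"idle_timeout"` // nanoseconds
	Users           map[string]string `json:"users"`
	ServerStartTime int64             `json:"server_start_time"` // Unix nanoseconds
}

// parseChildConfig is a hypothetical helper that decodes the JSON form.
func parseChildConfig(raw string) (childConfig, error) {
	var cfg childConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

// idleTimeout converts the nanosecond count into a time.Duration.
func (c childConfig) idleTimeout() time.Duration { return time.Duration(c.IdleTimeout) }

// serverStart converts Unix nanoseconds into a time.Time.
func (c childConfig) serverStart() time.Time { return time.Unix(0, c.ServerStartTime) }

func main() {
	const raw = `{"remote_addr":"10.0.0.5:51234","idle_timeout":60000000000,` +
		`"users":{"alice":"pw"},"server_start_time":1700000000000000000}`
	cfg, err := parseChildConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.RemoteAddr, cfg.idleTimeout(), cfg.serverStart().UTC().Format(time.RFC3339))
}
```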

type ChildProcess

type ChildProcess struct {
	PID        int
	Cmd        *exec.Cmd
	Username   string
	RemoteAddr string
	BackendKey BackendKey
	StartTime  time.Time
	// contains filtered or unexported fields
}

ChildProcess represents a spawned child worker process

type ChildTracker

type ChildTracker struct {
	// contains filtered or unexported fields
}

ChildTracker manages spawned child worker processes

func NewChildTracker

func NewChildTracker() *ChildTracker

NewChildTracker creates a new child tracker

func (*ChildTracker) Add

func (ct *ChildTracker) Add(child *ChildProcess)

Add registers a new child process

func (*ChildTracker) Count

func (ct *ChildTracker) Count() int

Count returns the number of active child processes

func (*ChildTracker) FindByBackendKey

func (ct *ChildTracker) FindByBackendKey(key BackendKey) *ChildProcess

FindByBackendKey finds a child process by its backend key (for cancel requests)

func (*ChildTracker) Get

func (ct *ChildTracker) Get(pid int) *ChildProcess

Get returns a child process by PID

func (*ChildTracker) Remove

func (ct *ChildTracker) Remove(pid int) *ChildProcess

Remove unregisters a child process by PID

func (*ChildTracker) SignalAll

func (ct *ChildTracker) SignalAll(sig syscall.Signal)

SignalAll sends a signal to all child processes

func (*ChildTracker) WaitAll

func (ct *ChildTracker) WaitAll() <-chan struct{}

WaitAll returns a channel that is closed when all children have exited. Caller should call this after SignalAll to wait for graceful shutdown.

NOTE: This method creates a new goroutine each time it's called. It captures a snapshot of current children at call time - children added after the call won't be waited on. For typical shutdown scenarios, call this once after SignalAll.
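The snapshot behaviour in the note above can be sketched as follows. `tracker` is a hypothetical stand-in for ChildTracker in which each child is represented only by a channel that is closed when the child exits; the real type tracks processes.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical stand-in for ChildTracker.
type tracker struct {
	mu       sync.Mutex
	children map[int]chan struct{} // pid -> closed when the child exits
}

// waitAll snapshots the current children under the lock, then waits for them
// in a new goroutine. Children added after the snapshot are not waited on.
func (t *tracker) waitAll() <-chan struct{} {
	t.mu.Lock()
	snapshot := make([]chan struct{}, 0, len(t.children))
	for _, done := range t.children {
		snapshot = append(snapshot, done)
	}
	t.mu.Unlock()

	all := make(chan struct{})
	go func() {
		for _, done := range snapshot {
			<-done
		}
		close(all)
	}()
	return all
}

func main() {
	tr := &tracker{children: map[int]chan struct{}{
		101: make(chan struct{}),
		102: make(chan struct{}),
	}}
	all := tr.waitAll()
	for _, done := range tr.children {
		close(done) // simulate each child exiting
	}
	<-all
	fmt.Println("all children exited")
}
```

During shutdown, a caller would usually select on the returned channel and a timeout so that a stuck child cannot block the process forever.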

type ColumnTyper

type ColumnTyper interface {
	DatabaseTypeName() string
}

ColumnTyper provides type name information for a database column. *sql.ColumnType satisfies this interface.

type Config

type Config struct {
	Host string
	Port int
	// FlightPort enables Arrow Flight SQL ingress on the control plane.
	// 0 disables Flight ingress.
	FlightPort int

	// FlightSessionIdleTTL controls how long an idle Flight auth session is kept
	// before being reaped.
	FlightSessionIdleTTL time.Duration

	// FlightSessionReapInterval controls how frequently idle Flight auth sessions
	// are scanned and reaped.
	FlightSessionReapInterval time.Duration

	// FlightHandleIdleTTL controls stale prepared/query handle cleanup inside a
	// Flight auth session.
	FlightHandleIdleTTL time.Duration

	// FlightSessionTokenTTL controls the absolute lifetime of issued
	// x-duckgres-session tokens. Expired tokens are rejected and require
	// a fresh bootstrap request.
	FlightSessionTokenTTL time.Duration
	DataDir               string
	Users                 map[string]string // username -> password

	// TLS configuration (required unless ACME is configured)
	TLSCertFile string // Path to TLS certificate file
	TLSKeyFile  string // Path to TLS private key file

	// ACME/Let's Encrypt configuration (alternative to static TLS cert/key)
	ACMEDomain   string // Domain for ACME certificate (e.g., "decisive-mongoose-wine.us.duckgres.com")
	ACMEEmail    string // Contact email for Let's Encrypt notifications
	ACMECacheDir string // Directory for cached certificates (default: "./certs/acme")

	// Rate limiting configuration
	RateLimit RateLimitConfig

	// Extensions to load on database initialization
	Extensions []string

	// DuckLake configuration
	DuckLake DuckLakeConfig

	// Graceful shutdown timeout (default: 30s)
	ShutdownTimeout time.Duration

	// IdleTimeout is the maximum time a connection can be idle before being closed.
	// This prevents accumulation of zombie connections from clients that disconnect
	// uncleanly. Default: 24 hours. Set to a negative value (e.g., -1) to disable.
	IdleTimeout time.Duration

	// ProcessIsolation enables spawning each client connection in a separate OS process.
	// This prevents DuckDB C++ crashes from taking down the entire server.
	// When enabled, rate limiting and cancel requests are handled by the parent process,
	// while TLS, authentication, and query execution happen in child processes.
	ProcessIsolation bool

	// MemoryLimit is the DuckDB memory_limit per session (e.g., "4GB").
	// If empty, auto-detected from system memory.
	MemoryLimit string

	// Threads is the DuckDB threads per session.
	// If zero, defaults to runtime.NumCPU().
	Threads int

	// MemoryBudget is the total memory available for all DuckDB sessions (e.g., "24GB").
	// Used in control-plane mode for dynamic per-session memory allocation.
	// If empty, defaults to 75% of system RAM.
	MemoryBudget string

	// MemoryRebalance enables dynamic per-connection memory reallocation in control-plane mode.
	// When enabled, the memory budget is redistributed across all active sessions on every
	// connect/disconnect. When disabled (default), each session gets a static allocation
	// of budget/max_workers at creation time.
	MemoryRebalance bool

	// MaxWorkers is the maximum number of worker processes in control-plane mode.
	// 0 means unlimited.
	MaxWorkers int

	// MinWorkers is the number of pre-warmed worker processes at startup in control-plane mode.
	// 0 means no pre-warming (workers spawn on demand).
	MinWorkers int

	// PassthroughUsers are users that bypass the SQL transpiler and pg_catalog initialization.
	// Queries from these users go directly to DuckDB without any PostgreSQL compatibility layer.
	PassthroughUsers map[string]bool
}

type CopyFromOptions

type CopyFromOptions struct {
	TableName  string
	ColumnList string // Empty string or "(col1, col2, ...)"
	Delimiter  string
	HasHeader  bool
	NullString string
	Quote      string // Quote character (default " for CSV)
	Escape     string // Escape character (default same as Quote)
	IsBinary   bool   // True if FORMAT binary
}

CopyFromOptions contains parsed options from a COPY FROM STDIN command

func ParseCopyFromOptions

func ParseCopyFromOptions(query string) (*CopyFromOptions, error)

ParseCopyFromOptions extracts options from a COPY FROM STDIN command

type CopyToOptions

type CopyToOptions struct {
	Source    string // Table name or (SELECT query)
	Delimiter string
	HasHeader bool
	IsQuery   bool // True if Source is a query in parentheses
}

CopyToOptions contains parsed options from a COPY TO STDOUT command

func ParseCopyToOptions

func ParseCopyToOptions(query string) (*CopyToOptions, error)

ParseCopyToOptions extracts options from a COPY TO STDOUT command

type DuckLakeConfig

type DuckLakeConfig struct {
	// MetadataStore is the connection string for the DuckLake metadata database
	// Format: "postgres:host=<host> user=<user> password=<password> dbname=<db>"
	MetadataStore string

	// ObjectStore is the S3-compatible storage path for DuckLake data files
	// Format: "s3://bucket/path/" for S3/MinIO
	// If not specified, uses DataPath for local storage
	ObjectStore string

	// DataPath is the local file system path for DuckLake data files
	// Used when ObjectStore is not set (for local/non-S3 storage)
	DataPath string

	// S3 credential provider: "config" (explicit credentials) or "credential_chain" (AWS SDK chain)
	// Default: "config" if S3AccessKey is set, otherwise "credential_chain"
	S3Provider string

	// S3 configuration for "config" provider (explicit credentials for MinIO or S3)
	S3Endpoint  string // e.g., "localhost:9000" for MinIO
	S3AccessKey string // S3 access key ID
	S3SecretKey string // S3 secret access key
	S3Region    string // S3 region (default: us-east-1)
	S3UseSSL    bool   // Use HTTPS for S3 connections (default: false for MinIO)
	S3URLStyle  string // "path" or "vhost" (default: "path" for MinIO compatibility)

	// S3 configuration for "credential_chain" provider (AWS SDK credential chain)
	// Chain specifies which credential sources to check, semicolon-separated
	// Options: env, config, sts, sso, instance, process
	// Default: checks all sources in AWS SDK order
	S3Chain   string // e.g., "env;config" to check env vars then config files
	S3Profile string // AWS profile name to use (for "config" chain)
}

DuckLakeConfig configures DuckLake catalog attachment
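The S3Provider default described in the struct comments can be expressed as a small function. `effectiveS3Provider` is a hypothetical helper that restates the documented rule; it is not part of this package.

```go
package main

import "fmt"

// effectiveS3Provider is a hypothetical helper that applies the documented
// default: an explicit provider wins; otherwise "config" when an access key
// is set, and "credential_chain" when it is not.
func effectiveS3Provider(provider, accessKey string) string {
	if provider != "" {
		return provider
	}
	if accessKey != "" {
		return "config"
	}
	return "credential_chain"
}

func main() {
	fmt.Println(effectiveS3Provider("", "AKIAEXAMPLE")) // access key set
	fmt.Println(effectiveS3Provider("", ""))            // nothing set
	fmt.Println(effectiveS3Provider("credential_chain", "AKIAEXAMPLE"))
}
```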

type ExecResult

type ExecResult interface {
	RowsAffected() (int64, error)
}

ExecResult represents the result of a non-query execution.

type FlightExecutor

type FlightExecutor struct {
	// contains filtered or unexported fields
}

FlightExecutor implements QueryExecutor backed by an Arrow Flight SQL client. It routes queries to a duckdb-service worker process over a Unix socket.

func NewFlightExecutor

func NewFlightExecutor(addr, bearerToken, sessionToken string) (*FlightExecutor, error)

NewFlightExecutor creates a FlightExecutor connected to the given address. addr should be "unix:///path/to/socket" for Unix sockets or "host:port" for TCP. bearerToken is the authentication token for the duckdb-service. sessionToken is the session identifier for the x-duckgres-session header.

func NewFlightExecutorFromClient

func NewFlightExecutorFromClient(client *flightsql.Client, sessionToken string) *FlightExecutor

NewFlightExecutorFromClient creates a FlightExecutor that shares an existing Flight SQL client. The client is NOT closed when this executor is closed. This avoids creating a new gRPC connection per session.

func (*FlightExecutor) Close

func (e *FlightExecutor) Close() error

func (*FlightExecutor) ConnContext

func (e *FlightExecutor) ConnContext(ctx context.Context) (RawConn, error)

func (*FlightExecutor) Exec

func (e *FlightExecutor) Exec(query string, args ...any) (ExecResult, error)

func (*FlightExecutor) ExecContext

func (e *FlightExecutor) ExecContext(ctx context.Context, query string, args ...any) (result ExecResult, err error)

func (*FlightExecutor) IsDead

func (e *FlightExecutor) IsDead() bool

IsDead reports whether this executor has been marked dead.

func (*FlightExecutor) MarkDead

func (e *FlightExecutor) MarkDead()

MarkDead marks this executor's backing worker as dead. All subsequent RPC calls will return ErrWorkerDead without touching the (possibly closed) gRPC client.
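The dead-flag behaviour can be sketched with an atomic boolean that every call checks before doing any work. `executor` and `ping` below are hypothetical stand-ins for FlightExecutor and its RPC methods, and `atomic.Bool` requires Go 1.19 or later.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errWorkerDead = errors.New("flight worker is dead")

// executor is a hypothetical stand-in for FlightExecutor.
type executor struct {
	dead atomic.Bool
}

func (e *executor) markDead()    { e.dead.Store(true) }
func (e *executor) isDead() bool { return e.dead.Load() }

// ping checks the dead flag first, so a dead worker is reported without
// touching the underlying client.
func (e *executor) ping() error {
	if e.isDead() {
		return errWorkerDead
	}
	return nil // a real implementation would issue the RPC here
}

func main() {
	e := &executor{}
	fmt.Println(e.ping())
	e.markDead()
	fmt.Println(errors.Is(e.ping(), errWorkerDead))
}
```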

func (*FlightExecutor) PingContext

func (e *FlightExecutor) PingContext(ctx context.Context) error

func (*FlightExecutor) Query

func (e *FlightExecutor) Query(query string, args ...any) (RowSet, error)

func (*FlightExecutor) QueryContext

func (e *FlightExecutor) QueryContext(ctx context.Context, query string, args ...any) (rs RowSet, err error)

type FlightRowSet

type FlightRowSet struct {
	// contains filtered or unexported fields
}

FlightRowSet wraps an Arrow Flight RecordBatch reader to implement RowSet.

func (*FlightRowSet) Close

func (r *FlightRowSet) Close() error

func (*FlightRowSet) ColumnTypes

func (r *FlightRowSet) ColumnTypes() ([]ColumnTyper, error)

func (*FlightRowSet) Columns

func (r *FlightRowSet) Columns() ([]string, error)

func (*FlightRowSet) Err

func (r *FlightRowSet) Err() error

func (*FlightRowSet) Next

func (r *FlightRowSet) Next() bool

func (*FlightRowSet) Scan

func (r *FlightRowSet) Scan(dest ...any) error

type LocalExecutor

type LocalExecutor struct {
	// contains filtered or unexported fields
}

LocalExecutor wraps *sql.DB to implement QueryExecutor for local DuckDB access.

func NewLocalExecutor

func NewLocalExecutor(db *sql.DB) *LocalExecutor

NewLocalExecutor creates a new LocalExecutor wrapping the given *sql.DB.

func (*LocalExecutor) Close

func (e *LocalExecutor) Close() error

func (*LocalExecutor) ConnContext

func (e *LocalExecutor) ConnContext(ctx context.Context) (RawConn, error)

func (*LocalExecutor) DB

func (e *LocalExecutor) DB() *sql.DB

DB returns the underlying *sql.DB (for credential refresh and other direct access).

func (*LocalExecutor) Exec

func (e *LocalExecutor) Exec(query string, args ...any) (ExecResult, error)

func (*LocalExecutor) ExecContext

func (e *LocalExecutor) ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error)

func (*LocalExecutor) PingContext

func (e *LocalExecutor) PingContext(ctx context.Context) error

func (*LocalExecutor) Query

func (e *LocalExecutor) Query(query string, args ...any) (RowSet, error)

func (*LocalExecutor) QueryContext

func (e *LocalExecutor) QueryContext(ctx context.Context, query string, args ...any) (RowSet, error)

type LocalRowSet

type LocalRowSet struct {
	// contains filtered or unexported fields
}

LocalRowSet wraps *sql.Rows to implement RowSet.

func (*LocalRowSet) Close

func (r *LocalRowSet) Close() error

func (*LocalRowSet) ColumnTypes

func (r *LocalRowSet) ColumnTypes() ([]ColumnTyper, error)

func (*LocalRowSet) Columns

func (r *LocalRowSet) Columns() ([]string, error)

func (*LocalRowSet) Err

func (r *LocalRowSet) Err() error

func (*LocalRowSet) Next

func (r *LocalRowSet) Next() bool

func (*LocalRowSet) Scan

func (r *LocalRowSet) Scan(dest ...any) error

type OrderedMapValue

type OrderedMapValue struct {
	Keys   []any
	Values []any
}

OrderedMapValue represents a DuckDB MAP as parallel key/value slices, preserving insertion order from Arrow MAP arrays. Using parallel slices instead of a Go map avoids panics on non-comparable key types (e.g., []byte from BLOB keys) and preserves DuckDB's MAP ordering.
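To see why parallel slices help: inserting a `[]byte` key into a `map[any]any` panics at run time because slices are not hashable, whereas a linear scan over `Keys` can compare keys by content. The sketch below shows such a scan. `orderedMapValue` copies the struct's shape, and `lookup` is a hypothetical helper that uses `reflect.DeepEqual`, so keys match only when both type and content are equal.

```go
package main

import (
	"fmt"
	"reflect"
)

// orderedMapValue copies the shape of OrderedMapValue.
type orderedMapValue struct {
	Keys   []any
	Values []any
}

// lookup is a hypothetical helper: a linear scan in insertion order.
// reflect.DeepEqual compares []byte keys by content and never panics on
// non-comparable types.
func (m orderedMapValue) lookup(key any) (any, bool) {
	for i, k := range m.Keys {
		if reflect.DeepEqual(k, key) {
			return m.Values[i], true
		}
	}
	return nil, false
}

func main() {
	m := orderedMapValue{
		Keys:   []any{[]byte("blob-key"), "text-key"},
		Values: []any{1, 2},
	}
	v, ok := m.lookup([]byte("blob-key"))
	fmt.Println(v, ok)
	_, ok = m.lookup("missing")
	fmt.Println(ok)
}
```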

type QueryExecutor

type QueryExecutor interface {
	QueryContext(ctx context.Context, query string, args ...any) (RowSet, error)
	ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error)
	Query(query string, args ...any) (RowSet, error)
	Exec(query string, args ...any) (ExecResult, error)
	ConnContext(ctx context.Context) (RawConn, error)
	PingContext(ctx context.Context) error
	Close() error
}

QueryExecutor abstracts database query execution, allowing both local (*sql.DB) and remote (Arrow Flight SQL) backends.

type RateLimitConfig

type RateLimitConfig struct {
	// MaxFailedAttempts is the maximum number of failed auth attempts before banning
	MaxFailedAttempts int
	// FailedAttemptWindow is the time window for counting failed attempts
	FailedAttemptWindow time.Duration
	// BanDuration is how long to ban an IP after exceeding max failed attempts
	BanDuration time.Duration
	// MaxConnectionsPerIP is the max concurrent connections from a single IP (0 = unlimited)
	MaxConnectionsPerIP int
	// MaxConnections is the total max concurrent connections (0 = unlimited)
	MaxConnections int
}

RateLimitConfig configures rate limiting behavior
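How the three failure-related fields interact can be sketched with a sliding window of timestamps per IP. This is an illustration under stated assumptions, not the package's algorithm: `limiter` is hypothetical, it bans as soon as the count within the window reaches the maximum, and it is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// limiter is a hypothetical sketch of failed-attempt tracking.
type limiter struct {
	maxFailed int           // plays the role of MaxFailedAttempts
	window    time.Duration // plays the role of FailedAttemptWindow
	ban       time.Duration // plays the role of BanDuration
	failures  map[string][]time.Time
	bannedTil map[string]time.Time
}

func newLimiter(maxFailed int, window, ban time.Duration) *limiter {
	return &limiter{
		maxFailed: maxFailed,
		window:    window,
		ban:       ban,
		failures:  make(map[string][]time.Time),
		bannedTil: make(map[string]time.Time),
	}
}

// recordFailure notes a failed attempt at time now and reports whether the IP
// is banned as a result.
func (l *limiter) recordFailure(ip string, now time.Time) bool {
	// Keep only failures that are still inside the window.
	recent := l.failures[ip][:0]
	for _, ts := range l.failures[ip] {
		if now.Sub(ts) < l.window {
			recent = append(recent, ts)
		}
	}
	recent = append(recent, now)
	l.failures[ip] = recent
	if len(recent) >= l.maxFailed {
		l.bannedTil[ip] = now.Add(l.ban)
		delete(l.failures, ip)
		return true
	}
	return false
}

// isBanned reports whether the IP's ban is still in effect at time now.
func (l *limiter) isBanned(ip string, now time.Time) bool {
	return now.Before(l.bannedTil[ip])
}

func main() {
	l := newLimiter(3, time.Minute, 5*time.Minute)
	now := time.Now()
	for i := 0; i < 3; i++ {
		banned := l.recordFailure("203.0.113.7", now.Add(time.Duration(i)*time.Second))
		fmt.Printf("failure %d banned=%v\n", i+1, banned)
	}
	fmt.Println("still banned after 1m:", l.isBanned("203.0.113.7", now.Add(time.Minute)))
}
```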

func DefaultRateLimitConfig

func DefaultRateLimitConfig() RateLimitConfig

DefaultRateLimitConfig returns sensible defaults for rate limiting

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter tracks and limits connections per IP

func NewRateLimiter

func NewRateLimiter(cfg RateLimitConfig) *RateLimiter

NewRateLimiter creates a new rate limiter with the given config

func (*RateLimiter) CheckConnection

func (rl *RateLimiter) CheckConnection(addr net.Addr) string

CheckConnection checks whether a connection from the given address should be allowed. It returns an error message if the connection should be rejected, or an empty string if it is allowed.

func (*RateLimiter) IsBanned

func (rl *RateLimiter) IsBanned(addr net.Addr) bool

IsBanned checks if an IP is currently banned

func (*RateLimiter) RecordFailedAuth

func (rl *RateLimiter) RecordFailedAuth(addr net.Addr) bool

RecordFailedAuth records a failed authentication attempt. It returns true if the IP is now banned.

func (*RateLimiter) RecordSuccessfulAuth

func (rl *RateLimiter) RecordSuccessfulAuth(addr net.Addr)

RecordSuccessfulAuth clears failed attempts for an IP after successful auth

func (*RateLimiter) RegisterConnection

func (rl *RateLimiter) RegisterConnection(addr net.Addr) bool

RegisterConnection records a new connection from the given address. It returns true if the connection is allowed, false otherwise.

func (*RateLimiter) UnregisterConnection

func (rl *RateLimiter) UnregisterConnection(addr net.Addr)

UnregisterConnection decrements the active connection count for an IP

type RawConn

type RawConn interface {
	Raw(func(any) error) error
	Close() error
}

RawConn provides access to the underlying driver connection. *sql.Conn satisfies this interface.

type RowSet

type RowSet interface {
	Columns() ([]string, error)
	ColumnTypes() ([]ColumnTyper, error)
	Next() bool
	Scan(dest ...any) error
	Close() error
	Err() error
}

RowSet represents a set of rows from a query result.

type Server

type Server struct {
	// contains filtered or unexported fields
}

func New

func New(cfg Config) (*Server, error)

func (*Server) ActiveConnections

func (s *Server) ActiveConnections() int64

ActiveConnections returns the number of active connections

func (*Server) CancelQuery

func (s *Server) CancelQuery(key BackendKey) bool

CancelQuery cancels a running query by its backend key. Returns true if a query was found and cancelled, false otherwise.

func (*Server) CancelQueryBySignal

func (s *Server) CancelQueryBySignal(key BackendKey) bool

CancelQueryBySignal sends SIGUSR1 to a child process to cancel its current query. Returns true if the signal was sent successfully.

func (*Server) Close

func (s *Server) Close() error

func (*Server) ListenAndServe

func (s *Server) ListenAndServe() error

func (*Server) RegisterQuery

func (s *Server) RegisterQuery(key BackendKey, cancel context.CancelFunc)

RegisterQuery registers a cancel function for a backend key. This allows the query to be cancelled via a cancel request from another connection.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown performs a graceful shutdown with the given context

func (*Server) UnregisterQuery

func (s *Server) UnregisterQuery(key BackendKey)

UnregisterQuery removes the cancel function for a backend key. This should be called when a query completes (successfully or with error).
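The register/cancel/unregister lifecycle described above amounts to a mutex-protected map from backend key to cancel function. The sketch below shows that idea. `registry` and its methods are hypothetical stand-ins for the Server's internal bookkeeping, which is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// backendKey copies the shape of BackendKey.
type backendKey struct {
	Pid       int32
	SecretKey int32
}

// registry is a hypothetical stand-in for the Server's cancel bookkeeping.
type registry struct {
	mu      sync.Mutex
	cancels map[backendKey]context.CancelFunc
}

func (r *registry) register(key backendKey, cancel context.CancelFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.cancels == nil {
		r.cancels = make(map[backendKey]context.CancelFunc)
	}
	r.cancels[key] = cancel
}

func (r *registry) unregister(key backendKey) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.cancels, key)
}

// cancelQuery calls the registered cancel function, if any, and reports
// whether one was found. Both Pid and SecretKey must match.
func (r *registry) cancelQuery(key backendKey) bool {
	r.mu.Lock()
	cancel, ok := r.cancels[key]
	r.mu.Unlock()
	if ok {
		cancel()
	}
	return ok
}

func main() {
	var reg registry
	key := backendKey{Pid: 42, SecretKey: 12345}

	ctx, cancel := context.WithCancel(context.Background())
	reg.register(key, cancel)
	defer reg.unregister(key) // always unregister when the query finishes

	fmt.Println("wrong secret:", reg.cancelQuery(backendKey{Pid: 42, SecretKey: 1}))
	fmt.Println("right key:", reg.cancelQuery(key), "ctx err:", ctx.Err())
}
```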

type TypeInfo

type TypeInfo struct {
	OID    int32
	Size   int16 // -1 for variable length
	Typmod int32 // -1 = no modifier; for NUMERIC: ((precision << 16) | scale) + 4
}

TypeInfo contains PostgreSQL type information
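The NUMERIC typmod formula in the Typmod comment can be worked through in code. The helper names below are hypothetical, and the sketch assumes a non-negative scale that fits in 16 bits.

```go
package main

import "fmt"

// numericTypmod packs precision and scale using the formula from the Typmod
// comment: ((precision << 16) | scale) + 4.
func numericTypmod(precision, scale int32) int32 {
	return ((precision << 16) | scale) + 4
}

// decodeNumericTypmod reverses numericTypmod. ok is false when the value
// carries no modifier (for example -1).
func decodeNumericTypmod(typmod int32) (precision, scale int32, ok bool) {
	if typmod < 4 {
		return 0, 0, false
	}
	v := typmod - 4
	return v >> 16, v & 0xFFFF, true
}

func main() {
	tm := numericTypmod(10, 2) // NUMERIC(10,2)
	p, s, ok := decodeNumericTypmod(tm)
	fmt.Println(tm, p, s, ok)
}
```

For NUMERIC(10,2) the arithmetic is (10 << 16) = 655360, OR 2 gives 655362, plus 4 gives 655366.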
