Documentation
¶
Index ¶
- Constants
- Variables
- func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}) error
- func BeginRateLimitedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) (release func(), rejectReason string)
- func BuildDuckDBCopyFromSQL(tableName, columnList, filePath string, opts *CopyFromOptions) string
- func ConfigureDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, username string, ...) error
- func CreateDBConnection(cfg Config, duckLakeSem chan struct{}, username string, ...) (*sql.DB, error)
- func CreatePassthroughDBConnection(cfg Config, duckLakeSem chan struct{}, username string, ...) (*sql.DB, error)
- func EnsureCertificates(certFile, keyFile string) error
- func GenerateSecretKey() int32
- func InitMinimalServer(s *Server, cfg Config, queryCancelCh <-chan struct{})
- func IsEmptyQuery(query string) bool
- func LoadExtensions(db *sql.DB, extensions []string) error
- func NewClientConn(s *Server, conn net.Conn, reader *bufio.Reader, writer *bufio.Writer, ...) *clientConn
- func ParseMemoryBytes(s string) uint64
- func ProcessVersion() string
- func ReadMessage(r io.Reader) (byte, []byte, error)
- func ReadStartupMessage(r io.Reader) (map[string]string, error)
- func RecordFailedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) bool
- func RecordSuccessfulAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr)
- func RunChildMode()
- func RunMessageLoop(cc *clientConn) error
- func RunShell(cfg Config)
- func SendInitialParams(cc *clientConn)
- func SetProcessVersion(v string)
- func StartCredentialRefresh(execer sqlExecer, dlCfg DuckLakeConfig) func()
- func SystemMemoryBytes() uint64
- func ValidateMemoryLimit(v string) bool
- func ValidateUserPassword(users map[string]string, username, password string) bool
- func WriteAuthCleartextPassword(w io.Writer) error
- func WriteAuthOK(w io.Writer) error
- func WriteBackendKeyData(w io.Writer, pid, secretKey int32) error
- func WriteErrorResponse(w io.Writer, severity, code, message string) error
- func WriteParameterStatus(w io.Writer, name, value string) error
- func WriteReadyForQuery(w io.Writer, txStatus byte) error
- type ACMEManager
- type BackendKey
- type ChildConfig
- type ChildProcess
- type ChildTracker
- func (ct *ChildTracker) Add(child *ChildProcess)
- func (ct *ChildTracker) Count() int
- func (ct *ChildTracker) FindByBackendKey(key BackendKey) *ChildProcess
- func (ct *ChildTracker) Get(pid int) *ChildProcess
- func (ct *ChildTracker) Remove(pid int) *ChildProcess
- func (ct *ChildTracker) SignalAll(sig syscall.Signal)
- func (ct *ChildTracker) WaitAll() <-chan struct{}
- type ColumnTyper
- type Config
- type CopyFromOptions
- type CopyToOptions
- type DuckLakeConfig
- type ExecResult
- type FlightExecutor
- func (e *FlightExecutor) Close() error
- func (e *FlightExecutor) ConnContext(ctx context.Context) (RawConn, error)
- func (e *FlightExecutor) Exec(query string, args ...any) (ExecResult, error)
- func (e *FlightExecutor) ExecContext(ctx context.Context, query string, args ...any) (result ExecResult, err error)
- func (e *FlightExecutor) IsDead() bool
- func (e *FlightExecutor) MarkDead()
- func (e *FlightExecutor) PingContext(ctx context.Context) error
- func (e *FlightExecutor) Query(query string, args ...any) (RowSet, error)
- func (e *FlightExecutor) QueryContext(ctx context.Context, query string, args ...any) (rs RowSet, err error)
- type FlightRowSet
- type LocalExecutor
- func (e *LocalExecutor) Close() error
- func (e *LocalExecutor) ConnContext(ctx context.Context) (RawConn, error)
- func (e *LocalExecutor) DB() *sql.DB
- func (e *LocalExecutor) Exec(query string, args ...any) (ExecResult, error)
- func (e *LocalExecutor) ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error)
- func (e *LocalExecutor) PingContext(ctx context.Context) error
- func (e *LocalExecutor) Query(query string, args ...any) (RowSet, error)
- func (e *LocalExecutor) QueryContext(ctx context.Context, query string, args ...any) (RowSet, error)
- type LocalRowSet
- type OrderedMapValue
- type QueryExecutor
- type RateLimitConfig
- type RateLimiter
- func (rl *RateLimiter) CheckConnection(addr net.Addr) string
- func (rl *RateLimiter) IsBanned(addr net.Addr) bool
- func (rl *RateLimiter) RecordFailedAuth(addr net.Addr) bool
- func (rl *RateLimiter) RecordSuccessfulAuth(addr net.Addr)
- func (rl *RateLimiter) RegisterConnection(addr net.Addr) bool
- func (rl *RateLimiter) UnregisterConnection(addr net.Addr)
- type RawConn
- type RowSet
- type Server
- func (s *Server) ActiveConnections() int64
- func (s *Server) CancelQuery(key BackendKey) bool
- func (s *Server) CancelQueryBySignal(key BackendKey) bool
- func (s *Server) Close() error
- func (s *Server) ListenAndServe() error
- func (s *Server) RegisterQuery(key BackendKey, cancel context.CancelFunc)
- func (s *Server) Shutdown(ctx context.Context) error
- func (s *Server) UnregisterQuery(key BackendKey)
- type TypeInfo
Constants ¶
const ( OidBool int32 = 16 OidBytea int32 = 17 OidChar int32 = 18 // "char" - single-byte internal type OidName int32 = 19 // name - 64-byte internal type for identifiers OidInt8 int32 = 20 // bigint OidInt2 int32 = 21 // smallint OidInt4 int32 = 23 // integer OidText int32 = 25 OidOid int32 = 26 OidFloat4 int32 = 700 // real OidFloat8 int32 = 701 // double precision OidBpchar int32 = 1042 // blank-padded char OidVarchar int32 = 1043 OidDate int32 = 1082 OidTime int32 = 1083 OidTimestamp int32 = 1114 OidTimestamptz int32 = 1184 OidInterval int32 = 1186 OidNumeric int32 = 1700 OidUUID int32 = 2950 OidTimetz int32 = 1266 OidJSON int32 = 114 OidJSONB int32 = 3802 // Array OIDs OidBoolArray int32 = 1000 OidInt2Array int32 = 1005 OidInt4Array int32 = 1007 OidTextArray int32 = 1009 OidVarcharArray int32 = 1015 OidInt8Array int32 = 1016 OidFloat4Array int32 = 1021 OidFloat8Array int32 = 1022 OidTimestampArray int32 = 1115 OidDateArray int32 = 1182 OidTimeArray int32 = 1183 OidTimestamptzArray int32 = 1185 OidIntervalArray int32 = 1187 OidNumericArray int32 = 1231 OidTimetzArray int32 = 1270 OidUUIDArray int32 = 2951 )
PostgreSQL type OIDs
const ( ExitSuccess = 0 // Clean disconnect ExitError = 1 // Error (crash, protocol error) ExitAuthFailure = 10 // Authentication failure (triggers rate limit update) )
Exit codes for child processes
const MaxGRPCMessageSize = 1 << 30 // 1GB
MaxGRPCMessageSize is the max gRPC message size for Flight SQL communication. DuckDB query results can easily exceed the default 4MB limit.
Variables ¶
var ErrWorkerDead = errors.New("flight worker is dead")
ErrWorkerDead is returned when the backing worker process has crashed.
Functions ¶
func AttachDuckLake ¶
func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}) error
AttachDuckLake attaches a DuckLake catalog if configured (but does NOT set it as default). Call setDuckLakeDefault after creating per-connection views in memory.main. This is a standalone function so it can be reused by control plane workers.
func BeginRateLimitedAuthAttempt ¶
func BeginRateLimitedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) (release func(), rejectReason string)
BeginRateLimitedAuthAttempt enforces rate-limit policy before an auth attempt. The returned release function must be called once the attempt is complete.
func BuildDuckDBCopyFromSQL ¶
func BuildDuckDBCopyFromSQL(tableName, columnList, filePath string, opts *CopyFromOptions) string
BuildDuckDBCopyFromSQL generates a DuckDB COPY FROM statement
func ConfigureDBConnection ¶
func ConfigureDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, username string, serverStartTime time.Time, serverVersion string) error
ConfigureDBConnection initializes an existing DuckDB connection with pg_catalog, information_schema, and DuckLake catalog attachment.
func CreateDBConnection ¶
func CreateDBConnection(cfg Config, duckLakeSem chan struct{}, username string, serverStartTime time.Time, serverVersion string) (*sql.DB, error)
CreateDBConnection creates a DuckDB connection for a client session. It uses an in-memory database as an anchor for DuckLake attachment (actual data lives in RDS/S3). This is a standalone function so it can be reused by both the server and control plane workers. serverStartTime is the time the top-level server process started (may differ from processStartTime in process isolation mode where each child has its own processStartTime). serverVersion is the version of the top-level server/control-plane process.
func CreatePassthroughDBConnection ¶
func CreatePassthroughDBConnection(cfg Config, duckLakeSem chan struct{}, username string, serverStartTime time.Time, serverVersion string) (*sql.DB, error)
CreatePassthroughDBConnection creates a DuckDB connection without pg_catalog or information_schema initialization. DuckLake is still attached if configured so passthrough users can access the same data. This is used for passthrough users who send DuckDB-native SQL and don't need the PostgreSQL compatibility layer.
func EnsureCertificates ¶
EnsureCertificates checks if the TLS certificate and key files exist, and generates self-signed ones if not. It returns only an error; the certFile and keyFile paths are supplied by the caller.
func GenerateSecretKey ¶
func GenerateSecretKey() int32
GenerateSecretKey generates a cryptographically random secret key for cancel requests.
func InitMinimalServer ¶
InitMinimalServer initializes a Server struct with minimal fields for use in control plane worker sessions.
func IsEmptyQuery ¶
IsEmptyQuery checks if a query contains only semicolons, whitespace, and/or SQL comments. PostgreSQL returns EmptyQueryResponse for queries like ";", "-- ping", "/* */", etc.
func LoadExtensions ¶
LoadExtensions installs and loads DuckDB extensions. This is a standalone function so it can be reused by control plane workers. Extension strings can include a source, e.g. "cache_httpfs FROM community". INSTALL uses the full string; LOAD uses just the extension name.
NOTE: Extension names come from trusted server config, not user input.
func NewClientConn ¶
func NewClientConn(s *Server, conn net.Conn, reader *bufio.Reader, writer *bufio.Writer, username, database, applicationName string, executor QueryExecutor, pid, secretKey int32, workerID int) *clientConn
NewClientConn creates a clientConn with pre-initialized fields for use by the control plane worker. The returned value is opaque (*clientConn) but can be used with SendInitialParams and RunMessageLoop.
func ParseMemoryBytes ¶
ParseMemoryBytes parses a DuckDB memory size string (e.g., "4GB", "512MB", "1.5GB") into bytes. Supports both integer and fractional values. Returns 0 if the string is empty, invalid, or "0GB".
func ProcessVersion ¶
func ProcessVersion() string
ProcessVersion returns the version string for this process.
func RecordFailedAuthAttempt ¶
func RecordFailedAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr) bool
RecordFailedAuthAttempt records auth telemetry and updates rate-limit state. Returns true when this failure causes the source IP to be banned.
func RecordSuccessfulAuthAttempt ¶
func RecordSuccessfulAuthAttempt(rateLimiter *RateLimiter, remoteAddr net.Addr)
RecordSuccessfulAuthAttempt clears failure tracking after successful auth.
func RunChildMode ¶
func RunChildMode()
RunChildMode is the entry point for child worker processes. It reconstructs the TCP connection from FD 3, completes TLS handshake, authenticates the user, creates a DuckDB connection, and runs the message loop.
Configuration is read from stdin as JSON (more secure than env vars for passwords).
Exit codes:
- 0: Success (clean disconnect)
- 1: Error (crash, protocol error)
- 10: Authentication failure
func RunMessageLoop ¶
func RunMessageLoop(cc *clientConn) error
RunMessageLoop runs the main message loop for a client connection. It cancels the connection context when the loop exits, ensuring in-flight query contexts (and any gRPC calls derived from them) are cancelled promptly.
func RunShell ¶
func RunShell(cfg Config)
RunShell starts an interactive SQL shell with a fully initialized DuckDB connection. It uses the same CreateDBConnection path as the PostgreSQL server, so extensions, DuckLake, and pg_catalog views are all available.
func SendInitialParams ¶
func SendInitialParams(cc *clientConn)
SendInitialParams sends the initial parameter status messages and backend key data.
func SetProcessVersion ¶
func SetProcessVersion(v string)
SetProcessVersion sets the version string for this process. Called from main().
func StartCredentialRefresh ¶
func StartCredentialRefresh(execer sqlExecer, dlCfg DuckLakeConfig) func()
StartCredentialRefresh starts a background goroutine that periodically refreshes S3 credentials for long-lived DuckDB connections using the credential_chain provider. This prevents credential expiration when running on EC2 with IAM instance roles, STS assume-role, or other temporary credential sources.
The execer parameter accepts either *sql.DB (standalone mode) or *sql.Conn (worker mode where the pool's only connection is pinned by the session).
Note: ExecContext serializes behind any running query (pool contention for *sql.DB, internal mutex for *sql.Conn). This means credentials are refreshed between queries, not during them. A query that runs longer than the credential TTL (~6h for instance roles) could still fail if DuckDB makes S3 requests with stale cached credentials.
Returns a stop function that cancels the refresh goroutine. The caller must call the stop function when the connection is closed to prevent goroutine leaks. If credential refresh is not needed (static credentials, no S3, etc.), returns a no-op.
func SystemMemoryBytes ¶
func SystemMemoryBytes() uint64
SystemMemoryBytes returns total physical memory in bytes, cached after first call. Returns 0 if the system memory cannot be detected (e.g., on non-Linux systems).
func ValidateMemoryLimit ¶
ValidateMemoryLimit checks that a memory_limit string is a valid DuckDB size value.
func ValidateUserPassword ¶
ValidateUserPassword validates username/password without leaking user existence via credential-compare timing differences.
func WriteAuthOK ¶
func WriteErrorResponse ¶
Types ¶
type ACMEManager ¶
type ACMEManager struct {
// contains filtered or unexported fields
}
ACMEManager wraps autocert.Manager to provide Let's Encrypt TLS certificates. It starts an HTTP listener (on port 80 by default) for HTTP-01 challenge validation.
func NewACMEManager ¶
func NewACMEManager(domain, email, cacheDir, httpAddr string) (*ACMEManager, error)
NewACMEManager creates a new ACME manager for the given domain. It starts an HTTP listener on the specified address (default ":80") for HTTP-01 challenges. cacheDir is used to persist certificates across restarts.
func (*ACMEManager) Close ¶
func (a *ACMEManager) Close() error
Close gracefully shuts down the HTTP challenge listener. Safe to call multiple times.
func (*ACMEManager) TLSConfig ¶
func (a *ACMEManager) TLSConfig() *tls.Config
TLSConfig returns a tls.Config that uses ACME for certificate management. The GetCertificate callback dynamically obtains/renews certificates.
type BackendKey ¶
BackendKey uniquely identifies a backend connection for cancel requests
type ChildConfig ¶
type ChildConfig struct {
// Connection info (RemoteAddr is known at spawn time; Username/Database are read after TLS)
RemoteAddr string `json:"remote_addr"`
// Server config
DataDir string `json:"data_dir"`
Extensions []string `json:"extensions"`
IdleTimeout int64 `json:"idle_timeout"` // nanoseconds
// TLS config
TLSCertFile string `json:"tls_cert_file"`
TLSKeyFile string `json:"tls_key_file"`
// DuckLake config
DuckLake DuckLakeConfig `json:"ducklake"`
// Authentication - map of username -> password
// Child will look up after reading username from startup message
Users map[string]string `json:"users"`
// Backend key (pre-generated by parent for cancel request routing)
// BackendPid is set to child's actual PID after fork
BackendSecretKey int32 `json:"backend_secret_key"`
// ServerStartTime is the parent server's start time (Unix nanoseconds).
// Used to distinguish server uptime from child process uptime.
ServerStartTime int64 `json:"server_start_time"`
// ServerVersion is the parent server's version string.
// Used to distinguish control_plane_version() from worker_version().
ServerVersion string `json:"server_version,omitempty"`
}
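ChildConfig contains all configuration needed by a child worker process. It is serialized as JSON and passed from parent to child on the child's stdin (see RunChildMode) rather than through an environment variable, so that passwords are not exposed via the environment.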
type ChildProcess ¶
type ChildProcess struct {
PID int
Cmd *exec.Cmd
Username string
RemoteAddr string
BackendKey BackendKey
StartTime time.Time
// contains filtered or unexported fields
}
ChildProcess represents a spawned child worker process
type ChildTracker ¶
type ChildTracker struct {
// contains filtered or unexported fields
}
ChildTracker manages spawned child worker processes
func NewChildTracker ¶
func NewChildTracker() *ChildTracker
NewChildTracker creates a new child tracker
func (*ChildTracker) Add ¶
func (ct *ChildTracker) Add(child *ChildProcess)
Add registers a new child process
func (*ChildTracker) Count ¶
func (ct *ChildTracker) Count() int
Count returns the number of active child processes
func (*ChildTracker) FindByBackendKey ¶
func (ct *ChildTracker) FindByBackendKey(key BackendKey) *ChildProcess
FindByBackendKey finds a child process by its backend key (for cancel requests)
func (*ChildTracker) Get ¶
func (ct *ChildTracker) Get(pid int) *ChildProcess
Get returns a child process by PID
func (*ChildTracker) Remove ¶
func (ct *ChildTracker) Remove(pid int) *ChildProcess
Remove unregisters a child process by PID
func (*ChildTracker) SignalAll ¶
func (ct *ChildTracker) SignalAll(sig syscall.Signal)
SignalAll sends a signal to all child processes
func (*ChildTracker) WaitAll ¶
func (ct *ChildTracker) WaitAll() <-chan struct{}
WaitAll returns a channel that is closed when all children have exited. Caller should call this after SignalAll to wait for graceful shutdown.
NOTE: This method creates a new goroutine each time it's called. It captures a snapshot of current children at call time - children added after the call won't be waited on. For typical shutdown scenarios, call this once after SignalAll.
type ColumnTyper ¶
type ColumnTyper interface {
DatabaseTypeName() string
}
ColumnTyper provides type name information for a database column. *sql.ColumnType satisfies this interface.
type Config ¶
type Config struct {
Host string
Port int
// FlightPort enables Arrow Flight SQL ingress on the control plane.
// 0 disables Flight ingress.
FlightPort int
// FlightSessionIdleTTL controls how long an idle Flight auth session is kept
// before being reaped.
FlightSessionIdleTTL time.Duration
// FlightSessionReapInterval controls how frequently idle Flight auth sessions
// are scanned and reaped.
FlightSessionReapInterval time.Duration
// FlightHandleIdleTTL controls stale prepared/query handle cleanup inside a
// Flight auth session.
FlightHandleIdleTTL time.Duration
// FlightSessionTokenTTL controls the absolute lifetime of issued
// x-duckgres-session tokens. Expired tokens are rejected and require
// a fresh bootstrap request.
FlightSessionTokenTTL time.Duration
DataDir string
Users map[string]string // username -> password
// TLS configuration (required unless ACME is configured)
TLSCertFile string // Path to TLS certificate file
TLSKeyFile string // Path to TLS private key file
// ACME/Let's Encrypt configuration (alternative to static TLS cert/key)
ACMEDomain string // Domain for ACME certificate (e.g., "decisive-mongoose-wine.us.duckgres.com")
ACMEEmail string // Contact email for Let's Encrypt notifications
ACMECacheDir string // Directory for cached certificates (default: "./certs/acme")
// Rate limiting configuration
RateLimit RateLimitConfig
// Extensions to load on database initialization
Extensions []string
// DuckLake configuration
DuckLake DuckLakeConfig
// Graceful shutdown timeout (default: 30s)
ShutdownTimeout time.Duration
// IdleTimeout is the maximum time a connection can be idle before being closed.
// This prevents accumulation of zombie connections from clients that disconnect
// uncleanly. Default: 24 hours. Set to a negative value (e.g., -1) to disable.
IdleTimeout time.Duration
// ProcessIsolation enables spawning each client connection in a separate OS process.
// This prevents DuckDB C++ crashes from taking down the entire server.
// When enabled, rate limiting and cancel requests are handled by the parent process,
// while TLS, authentication, and query execution happen in child processes.
ProcessIsolation bool
// MemoryLimit is the DuckDB memory_limit per session (e.g., "4GB").
// If empty, auto-detected from system memory.
MemoryLimit string
// Threads is the DuckDB threads per session.
// If zero, defaults to runtime.NumCPU().
Threads int
// MemoryBudget is the total memory available for all DuckDB sessions (e.g., "24GB").
// Used in control-plane mode for dynamic per-session memory allocation.
// If empty, defaults to 75% of system RAM.
MemoryBudget string
// MemoryRebalance enables dynamic per-connection memory reallocation in control-plane mode.
// When enabled, the memory budget is redistributed across all active sessions on every
// connect/disconnect. When disabled (default), each session gets a static allocation
// of budget/max_workers at creation time.
MemoryRebalance bool
// MaxWorkers is the maximum number of worker processes in control-plane mode.
// 0 means unlimited.
MaxWorkers int
// MinWorkers is the number of pre-warmed worker processes at startup in control-plane mode.
// 0 means no pre-warming (workers spawn on demand).
MinWorkers int
// PassthroughUsers are users that bypass the SQL transpiler and pg_catalog initialization.
// Queries from these users go directly to DuckDB without any PostgreSQL compatibility layer.
PassthroughUsers map[string]bool
}
type CopyFromOptions ¶
type CopyFromOptions struct {
TableName string
ColumnList string // Empty string or "(col1, col2, ...)"
Delimiter string
HasHeader bool
NullString string
Quote string // Quote character (default " for CSV)
Escape string // Escape character (default same as Quote)
IsBinary bool // True if FORMAT binary
}
CopyFromOptions contains parsed options from a COPY FROM STDIN command
func ParseCopyFromOptions ¶
func ParseCopyFromOptions(query string) (*CopyFromOptions, error)
ParseCopyFromOptions extracts options from a COPY FROM STDIN command
type CopyToOptions ¶
type CopyToOptions struct {
Source string // Table name or (SELECT query)
Delimiter string
HasHeader bool
IsQuery bool // True if Source is a query in parentheses
}
CopyToOptions contains parsed options from a COPY TO STDOUT command
func ParseCopyToOptions ¶
func ParseCopyToOptions(query string) (*CopyToOptions, error)
ParseCopyToOptions extracts options from a COPY TO STDOUT command
type DuckLakeConfig ¶
type DuckLakeConfig struct {
// MetadataStore is the connection string for the DuckLake metadata database
// Format: "postgres:host=<host> user=<user> password=<password> dbname=<db>"
MetadataStore string
// ObjectStore is the S3-compatible storage path for DuckLake data files
// Format: "s3://bucket/path/" for S3/MinIO
// If not specified, uses DataPath for local storage
ObjectStore string
// DataPath is the local file system path for DuckLake data files
// Used when ObjectStore is not set (for local/non-S3 storage)
DataPath string
// S3 credential provider: "config" (explicit credentials) or "credential_chain" (AWS SDK chain)
// Default: "config" if S3AccessKey is set, otherwise "credential_chain"
S3Provider string
// S3 configuration for "config" provider (explicit credentials for MinIO or S3)
S3Endpoint string // e.g., "localhost:9000" for MinIO
S3AccessKey string // S3 access key ID
S3SecretKey string // S3 secret access key
S3Region string // S3 region (default: us-east-1)
S3UseSSL bool // Use HTTPS for S3 connections (default: false for MinIO)
S3URLStyle string // "path" or "vhost" (default: "path" for MinIO compatibility)
// S3 configuration for "credential_chain" provider (AWS SDK credential chain)
// Chain specifies which credential sources to check, semicolon-separated
// Options: env, config, sts, sso, instance, process
// Default: checks all sources in AWS SDK order
S3Chain string // e.g., "env;config" to check env vars then config files
S3Profile string // AWS profile name to use (for "config" chain)
}
DuckLakeConfig configures DuckLake catalog attachment
type ExecResult ¶
ExecResult represents the result of a non-query execution.
type FlightExecutor ¶
type FlightExecutor struct {
// contains filtered or unexported fields
}
FlightExecutor implements QueryExecutor backed by an Arrow Flight SQL client. It routes queries to a duckdb-service worker process, typically over a Unix socket (NewFlightExecutor also accepts "host:port" TCP addresses).
func NewFlightExecutor ¶
func NewFlightExecutor(addr, bearerToken, sessionToken string) (*FlightExecutor, error)
NewFlightExecutor creates a FlightExecutor connected to the given address. addr should be "unix:///path/to/socket" for Unix sockets or "host:port" for TCP. bearerToken is the authentication token for the duckdb-service. sessionToken is the session identifier for the x-duckgres-session header.
func NewFlightExecutorFromClient ¶
func NewFlightExecutorFromClient(client *flightsql.Client, sessionToken string) *FlightExecutor
NewFlightExecutorFromClient creates a FlightExecutor that shares an existing Flight SQL client. The client is NOT closed when this executor is closed. This avoids creating a new gRPC connection per session.
func (*FlightExecutor) Close ¶
func (e *FlightExecutor) Close() error
func (*FlightExecutor) ConnContext ¶
func (e *FlightExecutor) ConnContext(ctx context.Context) (RawConn, error)
func (*FlightExecutor) Exec ¶
func (e *FlightExecutor) Exec(query string, args ...any) (ExecResult, error)
func (*FlightExecutor) ExecContext ¶
func (e *FlightExecutor) ExecContext(ctx context.Context, query string, args ...any) (result ExecResult, err error)
func (*FlightExecutor) IsDead ¶
func (e *FlightExecutor) IsDead() bool
IsDead reports whether this executor has been marked dead.
func (*FlightExecutor) MarkDead ¶
func (e *FlightExecutor) MarkDead()
MarkDead marks this executor's backing worker as dead. All subsequent RPC calls will return ErrWorkerDead without touching the (possibly closed) gRPC client.
func (*FlightExecutor) PingContext ¶
func (e *FlightExecutor) PingContext(ctx context.Context) error
func (*FlightExecutor) Query ¶
func (e *FlightExecutor) Query(query string, args ...any) (RowSet, error)
func (*FlightExecutor) QueryContext ¶
type FlightRowSet ¶
type FlightRowSet struct {
// contains filtered or unexported fields
}
FlightRowSet wraps an Arrow Flight RecordBatch reader to implement RowSet.
func (*FlightRowSet) Close ¶
func (r *FlightRowSet) Close() error
func (*FlightRowSet) ColumnTypes ¶
func (r *FlightRowSet) ColumnTypes() ([]ColumnTyper, error)
func (*FlightRowSet) Columns ¶
func (r *FlightRowSet) Columns() ([]string, error)
func (*FlightRowSet) Err ¶
func (r *FlightRowSet) Err() error
func (*FlightRowSet) Next ¶
func (r *FlightRowSet) Next() bool
func (*FlightRowSet) Scan ¶
func (r *FlightRowSet) Scan(dest ...any) error
type LocalExecutor ¶
type LocalExecutor struct {
// contains filtered or unexported fields
}
LocalExecutor wraps *sql.DB to implement QueryExecutor for local DuckDB access.
func NewLocalExecutor ¶
func NewLocalExecutor(db *sql.DB) *LocalExecutor
NewLocalExecutor creates a new LocalExecutor wrapping the given *sql.DB.
func (*LocalExecutor) Close ¶
func (e *LocalExecutor) Close() error
func (*LocalExecutor) ConnContext ¶
func (e *LocalExecutor) ConnContext(ctx context.Context) (RawConn, error)
func (*LocalExecutor) DB ¶
func (e *LocalExecutor) DB() *sql.DB
DB returns the underlying *sql.DB (for credential refresh and other direct access).
func (*LocalExecutor) Exec ¶
func (e *LocalExecutor) Exec(query string, args ...any) (ExecResult, error)
func (*LocalExecutor) ExecContext ¶
func (e *LocalExecutor) ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error)
func (*LocalExecutor) PingContext ¶
func (e *LocalExecutor) PingContext(ctx context.Context) error
func (*LocalExecutor) Query ¶
func (e *LocalExecutor) Query(query string, args ...any) (RowSet, error)
func (*LocalExecutor) QueryContext ¶
type LocalRowSet ¶
type LocalRowSet struct {
// contains filtered or unexported fields
}
LocalRowSet wraps *sql.Rows to implement RowSet.
func (*LocalRowSet) Close ¶
func (r *LocalRowSet) Close() error
func (*LocalRowSet) ColumnTypes ¶
func (r *LocalRowSet) ColumnTypes() ([]ColumnTyper, error)
func (*LocalRowSet) Columns ¶
func (r *LocalRowSet) Columns() ([]string, error)
func (*LocalRowSet) Err ¶
func (r *LocalRowSet) Err() error
func (*LocalRowSet) Next ¶
func (r *LocalRowSet) Next() bool
func (*LocalRowSet) Scan ¶
func (r *LocalRowSet) Scan(dest ...any) error
type OrderedMapValue ¶
OrderedMapValue represents a DuckDB MAP as parallel key/value slices, preserving insertion order from Arrow MAP arrays. Using parallel slices instead of a Go map avoids panics on non-comparable key types (e.g., []byte from BLOB keys) and preserves DuckDB's MAP ordering.
type QueryExecutor ¶
type QueryExecutor interface {
QueryContext(ctx context.Context, query string, args ...any) (RowSet, error)
ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error)
Query(query string, args ...any) (RowSet, error)
Exec(query string, args ...any) (ExecResult, error)
ConnContext(ctx context.Context) (RawConn, error)
PingContext(ctx context.Context) error
Close() error
}
QueryExecutor abstracts database query execution, allowing both local (*sql.DB) and remote (Arrow Flight SQL) backends.
type RateLimitConfig ¶
type RateLimitConfig struct {
// MaxFailedAttempts is the maximum number of failed auth attempts before banning
MaxFailedAttempts int
// FailedAttemptWindow is the time window for counting failed attempts
FailedAttemptWindow time.Duration
// BanDuration is how long to ban an IP after exceeding max failed attempts
BanDuration time.Duration
// MaxConnectionsPerIP is the max concurrent connections from a single IP (0 = unlimited)
MaxConnectionsPerIP int
// MaxConnections is the total max concurrent connections (0 = unlimited)
MaxConnections int
}
RateLimitConfig configures rate limiting behavior
func DefaultRateLimitConfig ¶
func DefaultRateLimitConfig() RateLimitConfig
DefaultRateLimitConfig returns sensible defaults for rate limiting
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter tracks and limits connections per IP
func NewRateLimiter ¶
func NewRateLimiter(cfg RateLimitConfig) *RateLimiter
NewRateLimiter creates a new rate limiter with the given config
func (*RateLimiter) CheckConnection ¶
func (rl *RateLimiter) CheckConnection(addr net.Addr) string
CheckConnection checks if a connection from the given address should be allowed. It returns an error message if the connection should be rejected, or an empty string if it is allowed.
func (*RateLimiter) IsBanned ¶
func (rl *RateLimiter) IsBanned(addr net.Addr) bool
IsBanned checks if an IP is currently banned
func (*RateLimiter) RecordFailedAuth ¶
func (rl *RateLimiter) RecordFailedAuth(addr net.Addr) bool
RecordFailedAuth records a failed authentication attempt. It returns true if the IP is now banned.
func (*RateLimiter) RecordSuccessfulAuth ¶
func (rl *RateLimiter) RecordSuccessfulAuth(addr net.Addr)
RecordSuccessfulAuth clears failed attempts for an IP after successful auth
func (*RateLimiter) RegisterConnection ¶
func (rl *RateLimiter) RegisterConnection(addr net.Addr) bool
RegisterConnection records a new connection from the given address. It returns true if the connection is allowed, false otherwise.
func (*RateLimiter) UnregisterConnection ¶
func (rl *RateLimiter) UnregisterConnection(addr net.Addr)
UnregisterConnection decrements the active connection count for an IP
type RawConn ¶
RawConn provides access to the underlying driver connection. *sql.Conn satisfies this interface.
type RowSet ¶
type RowSet interface {
Columns() ([]string, error)
ColumnTypes() ([]ColumnTyper, error)
Next() bool
Scan(dest ...any) error
Close() error
Err() error
}
RowSet represents a set of rows from a query result.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func (*Server) ActiveConnections ¶
ActiveConnections returns the number of active connections
func (*Server) CancelQuery ¶
func (s *Server) CancelQuery(key BackendKey) bool
CancelQuery cancels a running query by its backend key. Returns true if a query was found and cancelled, false otherwise.
func (*Server) CancelQueryBySignal ¶
func (s *Server) CancelQueryBySignal(key BackendKey) bool
CancelQueryBySignal sends SIGUSR1 to a child process to cancel its current query. Returns true if the signal was sent successfully.
func (*Server) ListenAndServe ¶
func (*Server) RegisterQuery ¶
func (s *Server) RegisterQuery(key BackendKey, cancel context.CancelFunc)
RegisterQuery registers a cancel function for a backend key. This allows the query to be cancelled via a cancel request from another connection.
func (*Server) UnregisterQuery ¶
func (s *Server) UnregisterQuery(key BackendKey)
UnregisterQuery removes the cancel function for a backend key. This should be called when a query completes (successfully or with error).