Documentation
¶
Overview ¶
CLAUDE:SUMMARY HTTP gateway exposing local connectivity handlers — mount on any router to serve cross-process calls. CLAUDE:DEPENDS net/http, io, encoding/json CLAUDE:EXPORTS Gateway
Package connectivity provides a smart service router that dispatches calls either locally (in-memory function call, ~0.01ms) or remotely (QUIC/HTTP, ~50ms) based on a SQLite routes table reloaded at runtime.
This implements the "Job as Library" pattern: you code as a monolith, deploy as microservices, and switch between the two by updating one SQL row.
router := connectivity.New()
router.RegisterTransport("quic", myQuicFactory)
router.RegisterLocal("billing", billingService.Process)
go router.Watch(ctx, db, 200*time.Millisecond)
// Caller doesn't know or care whether this is local or remote:
resp, err := router.Call(ctx, "billing", payload)
The routes table in SQLite decides the strategy. Change it at runtime and the next Call picks up the new route — zero downtime, zero restart.
Example ¶
package main
import (
"context"
"database/sql"
"fmt"
"log/slog"
"os"
"github.com/hazyhaar/pkg/connectivity"
_ "modernc.org/sqlite"
)
func main() {
// 1. Open an in-memory SQLite database for the routes table.
db, err := sql.Open("sqlite", ":memory:")
if err != nil {
slog.Error("open db", "error", err)
os.Exit(1)
}
defer db.Close()
if err := connectivity.Init(db); err != nil {
slog.Error("init connectivity", "error", err)
os.Exit(1)
}
// 2. Create router and register a local handler.
router := connectivity.New()
defer router.Close()
router.RegisterLocal("billing", func(ctx context.Context, payload []byte) ([]byte, error) {
return []byte("billed:" + string(payload)), nil
})
// 3. Register the HTTP transport factory for remote calls.
router.RegisterTransport("http", connectivity.HTTPFactory())
// 4. Configure routes in SQLite: billing runs locally.
db.Exec(`INSERT INTO routes (service_name, strategy) VALUES ('billing', 'local')`)
// 5. Load routes.
if err := router.Reload(context.Background(), db); err != nil {
slog.Error("reload routes", "error", err)
os.Exit(1)
}
// 6. Call the service — routed locally.
resp, err := router.Call(context.Background(), "billing", []byte("$100"))
if err != nil {
slog.Error("call billing", "error", err)
os.Exit(1)
}
fmt.Println(string(resp))
// 7. Switch to noop — disable the service with zero downtime.
db.Exec(`UPDATE routes SET strategy='noop' WHERE service_name='billing'`)
router.Reload(context.Background(), db)
resp, err = router.Call(context.Background(), "billing", []byte("$200"))
if err != nil {
slog.Error("call billing noop", "error", err)
os.Exit(1)
}
fmt.Println(resp == nil)
}
Output: billed:$100 true
Example (CircuitBreaker) ¶
package main
import (
"context"
"fmt"
"time"
"github.com/hazyhaar/pkg/connectivity"
_ "modernc.org/sqlite"
)
func main() {
cb := connectivity.NewCircuitBreaker(
connectivity.WithBreakerThreshold(2),
connectivity.WithBreakerResetTimeout(100*time.Millisecond),
)
failingHandler := func(ctx context.Context, payload []byte) ([]byte, error) {
return nil, fmt.Errorf("service down")
}
wrapped := connectivity.WithCircuitBreaker(cb, "payments")(failingHandler)
// First two calls fail and trip the breaker.
wrapped(context.Background(), nil)
wrapped(context.Background(), nil)
// Third call is rejected by the circuit breaker.
_, err := wrapped(context.Background(), nil)
fmt.Println(err)
}
Output: connectivity: circuit open: payments
Example (HttpFactory) ¶
package main
import (
"encoding/json"
"fmt"
"log/slog"
"os"
"github.com/hazyhaar/pkg/connectivity"
_ "modernc.org/sqlite"
)
func main() {
f := connectivity.HTTPFactory()
cfg := json.RawMessage(`{"timeout_ms": 5000, "content_type": "application/json"}`)
handler, closeFn, err := f("https://api.example.com/v1", cfg)
if err != nil {
slog.Error("create http factory", "error", err)
os.Exit(1)
}
defer closeFn()
_ = handler // Use handler via router.Call or directly.
fmt.Println("HTTP factory created successfully")
}
Output: HTTP factory created successfully
Example (Middleware) ¶
package main
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/hazyhaar/pkg/connectivity"
_ "modernc.org/sqlite"
)
func main() {
router := connectivity.New()
defer router.Close()
// A handler that echoes the payload.
echo := func(ctx context.Context, payload []byte) ([]byte, error) {
return payload, nil
}
// Wrap with middleware chain: recovery → timeout → logging.
wrapped := connectivity.Chain(
connectivity.Recovery(nil),
connectivity.Timeout(5*time.Second),
)(echo)
resp, err := wrapped(context.Background(), []byte("hello"))
if err != nil {
slog.Error("call wrapped handler", "error", err)
os.Exit(1)
}
fmt.Println(string(resp))
}
Output: hello
Index ¶
- Constants
- func Init(db *sql.DB) error
- func OpenDB(path string) (*sql.DB, error)
- type Admin
- func (a *Admin) DeleteRoute(ctx context.Context, serviceName string) error
- func (a *Admin) GetRoute(ctx context.Context, serviceName string) (*RouteRow, error)
- func (a *Admin) ListRoutes(ctx context.Context) ([]RouteRow, error)
- func (a *Admin) SetStrategy(ctx context.Context, serviceName, strategy string) error
- func (a *Admin) UpsertRoute(ctx context.Context, serviceName, strategy, endpoint string, ...) error
- type BreakerOption
- type BreakerState
- type CircuitBreaker
- type ErrCallTimeout
- type ErrCircuitOpen
- type ErrFactoryFailed
- type ErrNoFactory
- type ErrPanic
- type ErrServiceNotFound
- type HTTPOpt
- type Handler
- type HandlerMiddleware
- func Chain(mws ...HandlerMiddleware) HandlerMiddleware
- func Logging(logger *slog.Logger) HandlerMiddleware
- func Recovery(logger *slog.Logger) HandlerMiddleware
- func Timeout(d time.Duration) HandlerMiddleware
- func WithCallLogging(logger *slog.Logger, service string) HandlerMiddleware
- func WithCircuitBreaker(cb *CircuitBreaker, service string) HandlerMiddleware
- func WithFallback(local Handler, service string, logger *slog.Logger) HandlerMiddleware
- func WithObservability(mm *observability.MetricsManager, service, strategy string) HandlerMiddleware
- func WithRetry(maxRetries int, baseBackoff time.Duration, logger *slog.Logger) HandlerMiddleware
- func WithTimeout(defaultTimeout time.Duration) HandlerMiddleware
- type Option
- type RouteRow
- type Router
- func (r *Router) Call(ctx context.Context, service string, payload []byte) ([]byte, error)
- func (r *Router) Close() error
- func (r *Router) Gateway() http.Handler
- func (r *Router) Inspect(service string) (info ServiceInfo, ok bool)
- func (r *Router) ListServices() iter.Seq[ServiceInfo]
- func (r *Router) RegisterLocal(service string, h Handler)
- func (r *Router) RegisterTransport(protocol string, f TransportFactory)
- func (r *Router) Reload(ctx context.Context, db *sql.DB) error
- func (r *Router) Watch(ctx context.Context, db *sql.DB, interval time.Duration)
- type ServiceInfo
- type TransportFactory
Examples ¶
Constants ¶
const Schema = `` /* 575-byte string literal not displayed */
Schema defines the routes table that drives the smart router. Each row maps a service name to a dispatch strategy.
Strategies:
- "local": dispatch to an in-memory Handler registered via RegisterLocal.
- "quic": dispatch via the QUIC transport factory.
- "http": dispatch via the HTTP transport factory.
- "mcp": dispatch via the MCP transport factory.
- "dbsync": dispatch via the dbsync transport factory.
- "embed": dispatch via the embed transport factory (horosembed.EmbedFactory).
- "noop": silently succeed without doing anything (feature flag / disable).
The config column holds per-route JSON (timeouts, retry policy, etc.). Any UPDATE to this table automatically increments PRAGMA data_version, which the Watch loop detects to trigger a hot-reload.
Variables ¶
This section is empty.
Functions ¶
func OpenDB ¶
OpenDB opens a SQLite database at path with production-safe pragmas:
- journal_mode=WAL: concurrent reads during writes
- busy_timeout=5000: wait up to 5s for locks instead of immediate SQLITE_BUSY
- foreign_keys=ON: enforce FK constraints
The caller must blank-import the SQLite driver:
import _ "modernc.org/sqlite"
Use this instead of sql.Open for any database that will be shared between Admin writes, Router.Reload reads, and Watch polling.
Types ¶
type Admin ¶
type Admin struct {
// contains filtered or unexported fields
}
Admin provides CRUD operations on the routes table, suitable for exposure as MCP tools so an LLM can administer routes at runtime.
All mutations go through SQLite, so the Watch loop automatically picks up changes — no need to call Reload manually.
func NewAdmin ¶
NewAdmin creates an Admin backed by the given routes database. The database must have the routes schema applied (via Init).
func (*Admin) DeleteRoute ¶
DeleteRoute removes a route from the routes table. The watcher will detect the change and close any associated handler.
func (*Admin) ListRoutes ¶
ListRoutes returns all routes from the SQLite table.
func (*Admin) SetStrategy ¶
SetStrategy changes only the strategy of an existing route. Useful for quick enable/disable: set to "noop" to disable, "local" to re-enable with zero downtime.
func (*Admin) UpsertRoute ¶
func (a *Admin) UpsertRoute(ctx context.Context, serviceName, strategy, endpoint string, config json.RawMessage) error
UpsertRoute inserts or updates a route in the routes table. On conflict (same service_name), strategy, endpoint, and config are updated; updated_at is refreshed by the trigger. The watcher will detect the change and trigger a Reload automatically.
type BreakerOption ¶
type BreakerOption func(*CircuitBreaker)
BreakerOption configures a CircuitBreaker.
func WithBreakerClock ¶
func WithBreakerClock(fn func() time.Time) BreakerOption
WithBreakerClock sets a custom clock function (for testing).
func WithBreakerHalfOpenMax ¶
func WithBreakerHalfOpenMax(n int) BreakerOption
WithBreakerHalfOpenMax sets how many consecutive successes in half-open are needed to close the breaker.
func WithBreakerResetTimeout ¶
func WithBreakerResetTimeout(d time.Duration) BreakerOption
WithBreakerResetTimeout sets how long the breaker stays open before transitioning to half-open.
func WithBreakerThreshold ¶
func WithBreakerThreshold(n int) BreakerOption
WithBreakerThreshold sets the failure count that trips the breaker open.
type BreakerState ¶
type BreakerState int
BreakerState represents the circuit breaker state.
const ( BreakerClosed BreakerState = iota // Normal operation, calls pass through. BreakerOpen // Calls rejected immediately. BreakerHalfOpen // One probe call allowed to test recovery. )
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern per service. Thread-safe: all state transitions use a mutex.
func NewCircuitBreaker ¶
func NewCircuitBreaker(opts ...BreakerOption) *CircuitBreaker
NewCircuitBreaker creates a breaker with sensible defaults: 5 failures to open, 30s reset timeout, 2 successes to close from half-open.
func (*CircuitBreaker) Allow ¶
func (cb *CircuitBreaker) Allow() bool
Allow checks whether a call is allowed. Returns false if the breaker is open and the reset timeout has not elapsed.
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed call.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful call.
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset()
Reset forces the breaker back to closed state.
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State() BreakerState
State returns the current breaker state.
type ErrCallTimeout ¶
type ErrCallTimeout struct {
Service string
}
ErrCallTimeout is returned when a remote call exceeds its configured timeout_ms from the route's config JSON.
func (*ErrCallTimeout) Error ¶
func (e *ErrCallTimeout) Error() string
type ErrCircuitOpen ¶
type ErrCircuitOpen struct {
Service string
}
ErrCircuitOpen is returned when the circuit breaker for a service is open, rejecting the call without attempting the remote handler.
func (*ErrCircuitOpen) Error ¶
func (e *ErrCircuitOpen) Error() string
type ErrFactoryFailed ¶
ErrFactoryFailed is returned when a TransportFactory returns an error while building a handler for a route.
func (*ErrFactoryFailed) Error ¶
func (e *ErrFactoryFailed) Error() string
func (*ErrFactoryFailed) Unwrap ¶
func (e *ErrFactoryFailed) Unwrap() error
type ErrNoFactory ¶
ErrNoFactory is returned during Reload when a route's strategy has no registered TransportFactory.
func (*ErrNoFactory) Error ¶
func (e *ErrNoFactory) Error() string
type ErrPanic ¶
type ErrPanic struct {
Value any
}
ErrPanic wraps a recovered panic value as an error.
type ErrServiceNotFound ¶
type ErrServiceNotFound struct {
Service string
}
ErrServiceNotFound is returned when Call targets a service with no route and no local handler.
func (*ErrServiceNotFound) Error ¶
func (e *ErrServiceNotFound) Error() string
type HTTPOpt ¶
type HTTPOpt func(*httpFactoryOpts)
HTTPOpt configures HTTPFactory behavior.
func AllowInternal ¶
func AllowInternal() HTTPOpt
AllowInternal disables the SSRF guard, permitting localhost and private IPs as endpoints. Use this for trusted inter-service calls on the same machine (e.g. siftrag → veille via Gateway on 127.0.0.1).
type Handler ¶
Handler is a transport-agnostic service function: bytes in, bytes out. Both local Go functions and remote RPC clients implement this signature.
type HandlerMiddleware ¶
HandlerMiddleware wraps a Handler, adding cross-cutting behaviour (logging, timeout, recovery, metrics) without changing the signature.
func Chain ¶
func Chain(mws ...HandlerMiddleware) HandlerMiddleware
Chain composes middlewares left-to-right: the first middleware in the slice is the outermost wrapper (executed first on the request path).
chain := Chain(logging, timeout, recovery) wrapped := chain(baseHandler)
func Logging ¶
func Logging(logger *slog.Logger) HandlerMiddleware
Logging returns a middleware that logs every call with its duration.
func Recovery ¶
func Recovery(logger *slog.Logger) HandlerMiddleware
Recovery returns a middleware that catches panics in downstream handlers and converts them into errors instead of crashing the process.
func Timeout ¶
func Timeout(d time.Duration) HandlerMiddleware
Timeout returns a middleware that enforces a maximum call duration. If the context deadline is exceeded, the handler's goroutine keeps running (Go has no goroutine cancellation), but the caller gets an immediate context.DeadlineExceeded error.
func WithCallLogging ¶
func WithCallLogging(logger *slog.Logger, service string) HandlerMiddleware
WithCallLogging returns a HandlerMiddleware that uses slog for structured call logging with duration, payload size and error details.
func WithCircuitBreaker ¶
func WithCircuitBreaker(cb *CircuitBreaker, service string) HandlerMiddleware
WithCircuitBreaker returns a HandlerMiddleware that wraps calls with a circuit breaker. When the breaker is open, calls are rejected immediately with ErrCircuitOpen.
func WithFallback ¶
func WithFallback(local Handler, service string, logger *slog.Logger) HandlerMiddleware
WithFallback returns a HandlerMiddleware that falls back to a local handler when the primary (remote) handler fails. This enables graceful degradation: if the remote service is down, the call is retried locally.
The fallback is only attempted if the local handler is non-nil. Context cancellation errors are NOT retried locally — they indicate the caller gave up, not that the remote failed.
func WithObservability ¶
func WithObservability(mm *observability.MetricsManager, service, strategy string) HandlerMiddleware
WithObservability returns a HandlerMiddleware that records call duration as a metric and logs errors via the observability package.
It emits a "connectivity.call.duration_ms" metric for every call and a "connectivity.call.error" metric on failures. Labels include the service name and strategy.
func WithRetry ¶
WithRetry returns a HandlerMiddleware that retries failed calls with exponential backoff. It respects context cancellation between retries.
Parameters:
- maxRetries: maximum number of retry attempts (0 = no retry)
- baseBackoff: initial wait between retries, doubled each attempt
- logger: used to log retry attempts (may be nil for silent retries)
func WithTimeout ¶
func WithTimeout(defaultTimeout time.Duration) HandlerMiddleware
WithTimeout returns a HandlerMiddleware that applies a per-call timeout derived from the route config's timeout_ms field. If timeout_ms is zero or absent, the provided default is used. A zero default disables the timeout entirely.
type Option ¶
type Option func(*Router)
Option configures a Router.
func WithLogger ¶
WithLogger sets a custom logger for the router.
type RouteRow ¶
type RouteRow struct {
ServiceName string `json:"service_name"`
Strategy string `json:"strategy"`
Endpoint string `json:"endpoint,omitempty"`
Config json.RawMessage `json:"config,omitempty"`
UpdatedAt int64 `json:"updated_at"`
}
RouteRow represents a single row from the routes table.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router dispatches service calls based on SQLite configuration. Thread-safe: reads use RLock, reloads use full Lock.
func New ¶
New creates a Router with no routes. Register transports and local handlers, then call Watch to start hot-reloading from SQLite.
func (*Router) Call ¶
Call dispatches a service call. The resolution order is:
- Noop route — silently succeeds (feature flag / service disabled).
- Explicit remote route (from SQLite) — if strategy is "quic", "http", etc.
- Local handler — if strategy is "local" or no remote route exists.
- Error — service not routable.
Callers never need to know whether the call is local or remote.
func (*Router) Gateway ¶
Gateway returns an http.Handler that exposes local handlers over HTTP. Incoming POST requests are dispatched to the matching local handler:
POST /{service_name} → router.Call(ctx, service_name, body)
Mount it on a chi router or http.ServeMux:
r.Mount("/connectivity", router.Gateway())
This is the server-side counterpart of HTTPFactory: one service mounts the Gateway, another service calls it via HTTPFactory routes.
func (*Router) Inspect ¶
func (r *Router) Inspect(service string) (info ServiceInfo, ok bool)
Inspect returns detailed information about a single service. Returns ok=false if the service is not registered in any form.
func (*Router) ListServices ¶
func (r *Router) ListServices() iter.Seq[ServiceInfo]
ListServices returns an iterator over all services known to the router. This includes services with remote routes (from SQLite) and services with local-only handlers (registered via RegisterLocal).
func (*Router) RegisterLocal ¶
RegisterLocal registers an in-memory handler for a service. This is the "Job as Library" side: the function lives in the same binary. If the routes table says strategy="local" for this service, Call dispatches here with zero network overhead.
func (*Router) RegisterTransport ¶
func (r *Router) RegisterTransport(protocol string, f TransportFactory)
RegisterTransport registers a factory for a transport protocol. Example protocols: "quic", "http", "grpc", "mcp". The factory is called during Reload when a route uses this protocol.
func (*Router) Reload ¶
Reload reads the routes table and rebuilds the remote handler map. Routes with strategy "local" or "noop" do not create remote handlers. Only routes whose (strategy, endpoint, config) changed are rebuilt, preserving existing connections for unchanged routes.
func (*Router) Watch ¶
Watch polls PRAGMA data_version on the database at the given interval. When the version changes (meaning any write occurred), it triggers a Reload.
data_version is auto-incremented by SQLite on any write — no triggers needed. This is the same proven pattern used by the mcprt tool registry.
Watch blocks until ctx is cancelled. Run it in a goroutine:
go router.Watch(ctx, db, 200*time.Millisecond)
type ServiceInfo ¶
type ServiceInfo struct {
Name string `json:"name"`
Strategy string `json:"strategy"`
Endpoint string `json:"endpoint"`
HasLocal bool `json:"has_local"`
}
ServiceInfo describes a routed service as seen by the router at a point in time. The struct is a snapshot; the router may have reloaded since this was created.
type TransportFactory ¶
type TransportFactory func(endpoint string, config json.RawMessage) (handler Handler, close func(), err error)
TransportFactory creates a Handler for a given remote endpoint. It receives the endpoint URL (e.g. "quic://10.0.0.5:443") and any per-route config JSON. The returned close function is called when the route is removed or replaced during hot-reload; it may be nil if no cleanup is needed.
func HTTPFactory ¶
func HTTPFactory(opts ...HTTPOpt) TransportFactory
HTTPFactory creates Handlers that POST the payload to a remote HTTP endpoint. It supports per-route timeout and content-type from the config JSON column.
SSRF prevention: the endpoint URL is validated against private/loopback addresses at factory creation time, unless AllowInternal() is passed.
Register it with:
router.RegisterTransport("http", connectivity.HTTPFactory())
router.RegisterTransport("http", connectivity.HTTPFactory(connectivity.AllowInternal()))
func MCPFactory ¶
func MCPFactory() TransportFactory
MCPFactory creates Handlers that dispatch calls as MCP tool invocations over QUIC. The payload is unmarshalled as a JSON map of tool arguments. The endpoint is a QUIC address (e.g. "10.0.0.5:4433").
The route config JSON must include "tool_name" to specify which MCP tool to call. Example config:
{"tool_name": "billing_process", "insecure_tls": true}
Register it with:
router.RegisterTransport("mcp", connectivity.MCPFactory())