connectivity

package
v0.0.0-...-37fe60a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 18 Imported by: 0

README

connectivity — smart service routing with hot-reload

connectivity dispatches service calls to local (in-memory) or remote (HTTP, MCP-over-QUIC) handlers based on a SQLite routes table. Changing a row switches a service from monolith to microservice with zero downtime.

router.Call(ctx, "billing", payload)
        │
        ▼
  ┌─────────────┐    routes table
  │   Router     │◄── watch (PRAGMA data_version)
  └──────┬──────┘
         │ strategy?
    ┌────┼────┬────────┐
    ▼    ▼    ▼        ▼
  local  http  mcp    noop
  (mem) (POST) (QUIC)  (∅)

Quick start

router := connectivity.New(
    connectivity.WithLogger(logger),
)
router.RegisterLocal("billing", billingHandler)
router.RegisterTransport("http", connectivity.HTTPFactory())
router.RegisterTransport("mcp", connectivity.MCPFactory())

connectivity.Init(db)
go connectivity.Watch(ctx, db, 2*time.Second)

result, err := router.Call(ctx, "billing", payload)

Routes table

CREATE TABLE routes (
    service_name TEXT PRIMARY KEY,
    strategy     TEXT NOT NULL,  -- local, http, mcp, quic, dbsync, noop
    endpoint     TEXT,
    config       TEXT,           -- JSON, transport-specific
    updated_at   INTEGER
);
Strategy Dispatch
local In-memory function call
http HTTP POST to endpoint (SSRF-validated)
mcp MCP tool call over QUIC
noop Silently returns nil

Middleware

Handler middlewares compose via Chain:

handler = connectivity.Chain(
    connectivity.Logging(logger),
    connectivity.Timeout(5*time.Second),
    connectivity.WithRetry(3, 500*time.Millisecond, logger),
    connectivity.WithCircuitBreaker(cb, "billing"),
    connectivity.WithFallback(localHandler, "billing", logger),
)(handler)

Circuit breaker

cb := connectivity.NewCircuitBreaker(
    connectivity.WithBreakerThreshold(5),      // 5 failures → open
    connectivity.WithBreakerResetTimeout(30*time.Second),
    connectivity.WithBreakerHalfOpenMax(2),     // 2 successes → closed
)

States: Closed → Open (on threshold) → HalfOpen (after timeout) → Closed (on success).

Exported API

Symbol Description
Router Service dispatcher with local/remote routing
Handler func(ctx, payload []byte) ([]byte, error)
TransportFactory Creates handlers from endpoint + config
HTTPFactory() HTTP POST transport with SSRF guard
MCPFactory() MCP-over-QUIC transport
CircuitBreaker Three-state circuit breaker
Chain(mws...) Compose handler middlewares
WithRetry, WithFallback, WithCircuitBreaker Resilience middlewares

Documentation

Overview

CLAUDE:SUMMARY HTTP gateway exposing local connectivity handlers — mount on any router to serve cross-process calls. CLAUDE:DEPENDS net/http, io, encoding/json CLAUDE:EXPORTS Gateway

Package connectivity provides a smart service router that dispatches calls either locally (in-memory function call, ~0.01ms) or remotely (QUIC/HTTP, ~50ms) based on a SQLite routes table reloaded at runtime.

This implements the "Job as Library" pattern: you code as a monolith, deploy as microservices, and switch between the two by updating one SQL row.

router := connectivity.New()
router.RegisterTransport("quic", myQuicFactory)
router.RegisterLocal("billing", billingService.Process)
go router.Watch(ctx, db, 200*time.Millisecond)

// Caller doesn't know or care whether this is local or remote:
resp, err := router.Call(ctx, "billing", payload)

The routes table in SQLite decides the strategy. Change it at runtime and the next Call picks up the new route — zero downtime, zero restart.

Example
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"os"

	"github.com/hazyhaar/pkg/connectivity"
	_ "modernc.org/sqlite"
)

func main() {
	// 1. Open an in-memory SQLite database for the routes table.
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		slog.Error("open db", "error", err)
		os.Exit(1)
	}
	defer db.Close()

	if err := connectivity.Init(db); err != nil {
		slog.Error("init connectivity", "error", err)
		os.Exit(1)
	}

	// 2. Create router and register a local handler.
	router := connectivity.New()
	defer router.Close()

	router.RegisterLocal("billing", func(ctx context.Context, payload []byte) ([]byte, error) {
		return []byte("billed:" + string(payload)), nil
	})

	// 3. Register the HTTP transport factory for remote calls.
	router.RegisterTransport("http", connectivity.HTTPFactory())

	// 4. Configure routes in SQLite: billing runs locally.
	db.Exec(`INSERT INTO routes (service_name, strategy) VALUES ('billing', 'local')`)

	// 5. Load routes.
	if err := router.Reload(context.Background(), db); err != nil {
		slog.Error("reload routes", "error", err)
		os.Exit(1)
	}

	// 6. Call the service — routed locally.
	resp, err := router.Call(context.Background(), "billing", []byte("$100"))
	if err != nil {
		slog.Error("call billing", "error", err)
		os.Exit(1)
	}
	fmt.Println(string(resp))

	// 7. Switch to noop — disable the service with zero downtime.
	db.Exec(`UPDATE routes SET strategy='noop' WHERE service_name='billing'`)
	router.Reload(context.Background(), db)

	resp, err = router.Call(context.Background(), "billing", []byte("$200"))
	if err != nil {
		slog.Error("call billing noop", "error", err)
		os.Exit(1)
	}
	fmt.Println(resp == nil)

}
Output:

billed:$100
true
Example (CircuitBreaker)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hazyhaar/pkg/connectivity"
	_ "modernc.org/sqlite"
)

func main() {
	cb := connectivity.NewCircuitBreaker(
		connectivity.WithBreakerThreshold(2),
		connectivity.WithBreakerResetTimeout(100*time.Millisecond),
	)

	failingHandler := func(ctx context.Context, payload []byte) ([]byte, error) {
		return nil, fmt.Errorf("service down")
	}

	wrapped := connectivity.WithCircuitBreaker(cb, "payments")(failingHandler)

	// First two calls fail and trip the breaker.
	wrapped(context.Background(), nil)
	wrapped(context.Background(), nil)

	// Third call is rejected by the circuit breaker.
	_, err := wrapped(context.Background(), nil)
	fmt.Println(err)
}
Output:

connectivity: circuit open: payments
Example (HttpFactory)
package main

import (
	"encoding/json"
	"fmt"
	"log/slog"
	"os"

	"github.com/hazyhaar/pkg/connectivity"
	_ "modernc.org/sqlite"
)

func main() {
	f := connectivity.HTTPFactory()
	cfg := json.RawMessage(`{"timeout_ms": 5000, "content_type": "application/json"}`)

	handler, closeFn, err := f("https://api.example.com/v1", cfg)
	if err != nil {
		slog.Error("create http factory", "error", err)
		os.Exit(1)
	}
	defer closeFn()

	_ = handler // Use handler via router.Call or directly.
	fmt.Println("HTTP factory created successfully")
}
Output:

HTTP factory created successfully
Example (Middleware)
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/hazyhaar/pkg/connectivity"
	_ "modernc.org/sqlite"
)

func main() {
	router := connectivity.New()
	defer router.Close()

	// A handler that echoes the payload.
	echo := func(ctx context.Context, payload []byte) ([]byte, error) {
		return payload, nil
	}

	// Wrap with middleware chain: recovery → timeout → logging.
	wrapped := connectivity.Chain(
		connectivity.Recovery(nil),
		connectivity.Timeout(5*time.Second),
	)(echo)

	resp, err := wrapped(context.Background(), []byte("hello"))
	if err != nil {
		slog.Error("call wrapped handler", "error", err)
		os.Exit(1)
	}
	fmt.Println(string(resp))
}
Output:

hello

Index

Examples

Constants

View Source
const Schema = `` /* 575-byte string literal not displayed */

Schema defines the routes table that drives the smart router. Each row maps a service name to a dispatch strategy.

Strategies:

  • "local": dispatch to an in-memory Handler registered via RegisterLocal.
  • "quic": dispatch via the QUIC transport factory.
  • "http": dispatch via the HTTP transport factory.
  • "mcp": dispatch via the MCP transport factory.
  • "dbsync": dispatch via the dbsync transport factory.
  • "embed": dispatch via the embed transport factory (horosembed.EmbedFactory).
  • "noop": silently succeed without doing anything (feature flag / disable).

The config column holds per-route JSON (timeouts, retry policy, etc.). Any UPDATE to this table automatically increments PRAGMA data_version, which the Watch loop detects to trigger a hot-reload.

Variables

This section is empty.

Functions

func Init

func Init(db *sql.DB) error

Init creates the routes table if it doesn't exist.

func OpenDB

func OpenDB(path string) (*sql.DB, error)

OpenDB opens a SQLite database at path with production-safe pragmas:

  • journal_mode=WAL: concurrent reads during writes
  • busy_timeout=5000: wait up to 5s for locks instead of immediate SQLITE_BUSY
  • foreign_keys=ON: enforce FK constraints

The caller must blank-import the SQLite driver:

import _ "modernc.org/sqlite"

Use this instead of sql.Open for any database that will be shared between Admin writes, Router.Reload reads, and Watch polling.

Types

type Admin

type Admin struct {
	// contains filtered or unexported fields
}

Admin provides CRUD operations on the routes table, suitable for exposure as MCP tools so an LLM can administer routes at runtime.

All mutations go through SQLite, so the Watch loop automatically picks up changes — no need to call Reload manually.

func NewAdmin

func NewAdmin(db *sql.DB) *Admin

NewAdmin creates an Admin backed by the given routes database. The database must have the routes schema applied (via Init).

func (*Admin) DeleteRoute

func (a *Admin) DeleteRoute(ctx context.Context, serviceName string) error

DeleteRoute removes a route from the routes table. The watcher will detect the change and close any associated handler.

func (*Admin) GetRoute

func (a *Admin) GetRoute(ctx context.Context, serviceName string) (*RouteRow, error)

GetRoute returns a single route by service name.

func (*Admin) ListRoutes

func (a *Admin) ListRoutes(ctx context.Context) ([]RouteRow, error)

ListRoutes returns all routes from the SQLite table.

func (*Admin) SetStrategy

func (a *Admin) SetStrategy(ctx context.Context, serviceName, strategy string) error

SetStrategy changes only the strategy of an existing route. Useful for quick enable/disable: set to "noop" to disable, "local" to re-enable with zero downtime.

func (*Admin) UpsertRoute

func (a *Admin) UpsertRoute(ctx context.Context, serviceName, strategy, endpoint string, config json.RawMessage) error

UpsertRoute inserts or updates a route in the routes table. On conflict (same service_name), strategy, endpoint, and config are updated; updated_at is refreshed by the trigger. The watcher will detect the change and trigger a Reload automatically.

type BreakerOption

type BreakerOption func(*CircuitBreaker)

BreakerOption configures a CircuitBreaker.

func WithBreakerClock

func WithBreakerClock(fn func() time.Time) BreakerOption

WithBreakerClock sets a custom clock function (for testing).

func WithBreakerHalfOpenMax

func WithBreakerHalfOpenMax(n int) BreakerOption

WithBreakerHalfOpenMax sets how many consecutive successes in half-open are needed to close the breaker.

func WithBreakerResetTimeout

func WithBreakerResetTimeout(d time.Duration) BreakerOption

WithBreakerResetTimeout sets how long the breaker stays open before transitioning to half-open.

func WithBreakerThreshold

func WithBreakerThreshold(n int) BreakerOption

WithBreakerThreshold sets the failure count that trips the breaker open.

type BreakerState

type BreakerState int

BreakerState represents the circuit breaker state.

const (
	BreakerClosed   BreakerState = iota // Normal operation, calls pass through.
	BreakerOpen                         // Calls rejected immediately.
	BreakerHalfOpen                     // One probe call allowed to test recovery.
)

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern per service. Thread-safe: all state transitions use a mutex.

func NewCircuitBreaker

func NewCircuitBreaker(opts ...BreakerOption) *CircuitBreaker

NewCircuitBreaker creates a breaker with sensible defaults: 5 failures to open, 30s reset timeout, 2 successes to close from half-open.

func (*CircuitBreaker) Allow

func (cb *CircuitBreaker) Allow() bool

Allow checks whether a call is allowed. Returns false if the breaker is open and the reset timeout has not elapsed.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed call.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful call.

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset forces the breaker back to closed state.

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() BreakerState

State returns the current breaker state.

type ErrCallTimeout

type ErrCallTimeout struct {
	Service string
}

ErrCallTimeout is returned when a remote call exceeds its configured timeout_ms from the route's config JSON.

func (*ErrCallTimeout) Error

func (e *ErrCallTimeout) Error() string

type ErrCircuitOpen

type ErrCircuitOpen struct {
	Service string
}

ErrCircuitOpen is returned when the circuit breaker for a service is open, rejecting the call without attempting the remote handler.

func (*ErrCircuitOpen) Error

func (e *ErrCircuitOpen) Error() string

type ErrFactoryFailed

type ErrFactoryFailed struct {
	Service  string
	Strategy string
	Endpoint string
	Cause    error
}

ErrFactoryFailed is returned when a TransportFactory returns an error while building a handler for a route.

func (*ErrFactoryFailed) Error

func (e *ErrFactoryFailed) Error() string

func (*ErrFactoryFailed) Unwrap

func (e *ErrFactoryFailed) Unwrap() error

type ErrNoFactory

type ErrNoFactory struct {
	Service  string
	Strategy string
}

ErrNoFactory is returned during Reload when a route's strategy has no registered TransportFactory.

func (*ErrNoFactory) Error

func (e *ErrNoFactory) Error() string

type ErrPanic

type ErrPanic struct {
	Value any
}

ErrPanic wraps a recovered panic value as an error.

func (*ErrPanic) Error

func (e *ErrPanic) Error() string

type ErrServiceNotFound

type ErrServiceNotFound struct {
	Service string
}

ErrServiceNotFound is returned when Call targets a service with no route and no local handler.

func (*ErrServiceNotFound) Error

func (e *ErrServiceNotFound) Error() string

type HTTPOpt

type HTTPOpt func(*httpFactoryOpts)

HTTPOpt configures HTTPFactory behavior.

func AllowInternal

func AllowInternal() HTTPOpt

AllowInternal disables the SSRF guard, permitting localhost and private IPs as endpoints. Use this for trusted inter-service calls on the same machine (e.g. siftrag → veille via Gateway on 127.0.0.1).

type Handler

type Handler func(ctx context.Context, payload []byte) ([]byte, error)

Handler is a transport-agnostic service function: bytes in, bytes out. Both local Go functions and remote RPC clients implement this signature.

type HandlerMiddleware

type HandlerMiddleware func(next Handler) Handler

HandlerMiddleware wraps a Handler, adding cross-cutting behaviour (logging, timeout, recovery, metrics) without changing the signature.

func Chain

Chain composes middlewares left-to-right: the first middleware in the slice is the outermost wrapper (executed first on the request path).

chain := Chain(logging, timeout, recovery)
wrapped := chain(baseHandler)

func Logging

func Logging(logger *slog.Logger) HandlerMiddleware

Logging returns a middleware that logs every call with its duration.

func Recovery

func Recovery(logger *slog.Logger) HandlerMiddleware

Recovery returns a middleware that catches panics in downstream handlers and converts them into errors instead of crashing the process.

func Timeout

func Timeout(d time.Duration) HandlerMiddleware

Timeout returns a middleware that enforces a maximum call duration. If the context deadline is exceeded, the handler's goroutine keeps running (Go has no goroutine cancellation), but the caller gets an immediate context.DeadlineExceeded error.

func WithCallLogging

func WithCallLogging(logger *slog.Logger, service string) HandlerMiddleware

WithCallLogging returns a HandlerMiddleware that uses slog for structured call logging with duration, payload size and error details.

func WithCircuitBreaker

func WithCircuitBreaker(cb *CircuitBreaker, service string) HandlerMiddleware

WithCircuitBreaker returns a HandlerMiddleware that wraps calls with a circuit breaker. When the breaker is open, calls are rejected immediately with ErrCircuitOpen.

func WithFallback

func WithFallback(local Handler, service string, logger *slog.Logger) HandlerMiddleware

WithFallback returns a HandlerMiddleware that falls back to a local handler when the primary (remote) handler fails. This enables graceful degradation: if the remote service is down, the call is retried locally.

The fallback is only attempted if the local handler is non-nil. Context cancellation errors are NOT retried locally — they indicate the caller gave up, not that the remote failed.

func WithObservability

func WithObservability(mm *observability.MetricsManager, service, strategy string) HandlerMiddleware

WithObservability returns a HandlerMiddleware that records call duration as a metric and logs errors via the observability package.

It emits a "connectivity.call.duration_ms" metric for every call and a "connectivity.call.error" metric on failures. Labels include the service name and strategy.

func WithRetry

func WithRetry(maxRetries int, baseBackoff time.Duration, logger *slog.Logger) HandlerMiddleware

WithRetry returns a HandlerMiddleware that retries failed calls with exponential backoff. It respects context cancellation between retries.

Parameters:

  • maxRetries: maximum number of retry attempts (0 = no retry)
  • baseBackoff: initial wait between retries, doubled each attempt
  • logger: used to log retry attempts (may be nil for silent retries)

func WithTimeout

func WithTimeout(defaultTimeout time.Duration) HandlerMiddleware

WithTimeout returns a HandlerMiddleware that applies a per-call timeout derived from the route config's timeout_ms field. If timeout_ms is zero or absent, the provided default is used. A zero default disables the timeout entirely.

type Option

type Option func(*Router)

Option configures a Router.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger sets a custom logger for the router.

type RouteRow

type RouteRow struct {
	ServiceName string          `json:"service_name"`
	Strategy    string          `json:"strategy"`
	Endpoint    string          `json:"endpoint,omitempty"`
	Config      json.RawMessage `json:"config,omitempty"`
	UpdatedAt   int64           `json:"updated_at"`
}

RouteRow represents a single row from the routes table.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router dispatches service calls based on SQLite configuration. Thread-safe: reads use RLock, reloads use full Lock.

func New

func New(opts ...Option) *Router

New creates a Router with no routes. Register transports and local handlers, then call Watch to start hot-reloading from SQLite.

func (*Router) Call

func (r *Router) Call(ctx context.Context, service string, payload []byte) ([]byte, error)

Call dispatches a service call. The resolution order is:

  1. Noop route — silently succeeds (feature flag / service disabled).
  2. Explicit remote route (from SQLite) — if strategy is "quic", "http", etc.
  3. Local handler — if strategy is "local" or no remote route exists.
  4. Error — service not routable.

Callers never need to know whether the call is local or remote.

func (*Router) Close

func (r *Router) Close() error

Close shuts down all remote handlers.

func (*Router) Gateway

func (r *Router) Gateway() http.Handler

Gateway returns an http.Handler that exposes local handlers over HTTP. Incoming POST requests are dispatched to the matching local handler:

POST /{service_name}  →  router.Call(ctx, service_name, body)

Mount it on a chi router or http.ServeMux:

r.Mount("/connectivity", router.Gateway())

This is the server-side counterpart of HTTPFactory: one service mounts the Gateway, another service calls it via HTTPFactory routes.

func (*Router) Inspect

func (r *Router) Inspect(service string) (info ServiceInfo, ok bool)

Inspect returns detailed information about a single service. Returns ok=false if the service is not registered in any form.

func (*Router) ListServices

func (r *Router) ListServices() iter.Seq[ServiceInfo]

ListServices returns an iterator over all services known to the router. This includes services with remote routes (from SQLite) and services with local-only handlers (registered via RegisterLocal).

func (*Router) RegisterLocal

func (r *Router) RegisterLocal(service string, h Handler)

RegisterLocal registers an in-memory handler for a service. This is the "Job as Library" side: the function lives in the same binary. If the routes table says strategy="local" for this service, Call dispatches here with zero network overhead.

func (*Router) RegisterTransport

func (r *Router) RegisterTransport(protocol string, f TransportFactory)

RegisterTransport registers a factory for a transport protocol. Example protocols: "quic", "http", "grpc", "mcp". The factory is called during Reload when a route uses this protocol.

func (*Router) Reload

func (r *Router) Reload(ctx context.Context, db *sql.DB) error

Reload reads the routes table and rebuilds the remote handler map. Routes with strategy "local" or "noop" do not create remote handlers. Only routes whose (strategy, endpoint, config) changed are rebuilt, preserving existing connections for unchanged routes.

func (*Router) Watch

func (r *Router) Watch(ctx context.Context, db *sql.DB, interval time.Duration)

Watch polls PRAGMA data_version on the database at the given interval. When the version changes (meaning any write occurred), it triggers a Reload.

data_version is auto-incremented by SQLite on any write — no triggers needed. This is the same proven pattern used by the mcprt tool registry.

Watch blocks until ctx is cancelled. Run it in a goroutine:

go router.Watch(ctx, db, 200*time.Millisecond)

type ServiceInfo

type ServiceInfo struct {
	Name     string `json:"name"`
	Strategy string `json:"strategy"`
	Endpoint string `json:"endpoint"`
	HasLocal bool   `json:"has_local"`
}

ServiceInfo describes a routed service as seen by the router at a point in time. The struct is a snapshot; the router may have reloaded since this was created.

type TransportFactory

type TransportFactory func(endpoint string, config json.RawMessage) (handler Handler, close func(), err error)

TransportFactory creates a Handler for a given remote endpoint. It receives the endpoint URL (e.g. "quic://10.0.0.5:443") and any per-route config JSON. The returned close function is called when the route is removed or replaced during hot-reload; it may be nil if no cleanup is needed.

func HTTPFactory

func HTTPFactory(opts ...HTTPOpt) TransportFactory

HTTPFactory creates Handlers that POST the payload to a remote HTTP endpoint. It supports per-route timeout and content-type from the config JSON column.

SSRF prevention: the endpoint URL is validated against private/loopback addresses at factory creation time, unless AllowInternal() is passed.

Register it with:

router.RegisterTransport("http", connectivity.HTTPFactory())
router.RegisterTransport("http", connectivity.HTTPFactory(connectivity.AllowInternal()))

func MCPFactory

func MCPFactory() TransportFactory

MCPFactory creates Handlers that dispatch calls as MCP tool invocations over QUIC. The payload is unmarshalled as a JSON map of tool arguments. The endpoint is a QUIC address (e.g. "10.0.0.5:4433").

The route config JSON must include "tool_name" to specify which MCP tool to call. Example config:

{"tool_name": "billing_process", "insecure_tls": true}

Register it with:

router.RegisterTransport("mcp", connectivity.MCPFactory())

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL