Documentation
Overview ¶
Package niro is a streaming-first LLM runtime for Go.
Niro provides a minimal, composable architecture for building real-time AI systems. It is designed for low-latency, multimodal streaming pipelines — not request/response wrappers.
Core Concepts ¶
- Frame: Universal unit of data (text tokens, audio, image, video, tool calls)
- Stream: Backpressure-aware, cancellable sequence of Frames with usage tracking
- [Processor]: Composable stream transformer (the building block)
- [Pipeline]: Concurrent chain of Processors with automatic lifecycle
- Provider: LLM backend interface (OpenAI, Anthropic, Google, Bedrock, or custom)
- [Hook]: Telemetry / observability interface for tracing every generation
Quick Start ¶
provider := openai.New(os.Getenv("OPENAI_API_KEY"))
stream, err := provider.Generate(ctx, &niro.Request{
	Model: "gpt-4o",
	Messages: []niro.Message{
		niro.UserText("Hello!"),
	},
})
if err != nil {
	log.Fatal(err)
}
// Print each text delta as it arrives.
for stream.Next(ctx) {
	fmt.Print(stream.Frame().Text)
}
// Errors set on the stream by the provider are visible via stream.Err().
if err := stream.Err(); err != nil {
	log.Fatal(err)
}
usage := stream.Usage()
fmt.Printf("tokens: %d in, %d out\n", usage.InputTokens, usage.OutputTokens)
Design Principles ¶
- Streaming-first, not streaming-compatible
- Minimal abstractions, maximum control
- Zero magic
- Composable pipelines
- Backpressure-aware
- Low allocations
- Go idiomatic
- Production-first
Index ¶
- Constants
- Variables
- func AttachCacheContext(ctx context.Context, hint CacheHint, engine CacheEngine) context.Context
- func CollectText(ctx context.Context, s *Stream) (string, error)
- func DefaultScrubber(key string, val any) any
- func Forward(ctx context.Context, src *Stream, dst *Emitter) error
- func IsAuthError(err error) bool
- func IsRateLimited(err error) bool
- func IsRetryable(err error) bool
- func IsTimeout(err error) bool
- func JSONMarshal(v any) ([]byte, error)
- func JSONMarshalIndent(v any, prefix, indent string) ([]byte, error)
- func JSONUnmarshal(data []byte, v any) error
- func JSONValid(data []byte) bool
- func LogDebug(ctx context.Context, msg string, args ...any)
- func LogError(ctx context.Context, msg string, args ...any)
- func LogInfo(ctx context.Context, msg string, args ...any)
- func LogWarn(ctx context.Context, msg string, args ...any)
- func NewStream(bufSize int) (*Stream, *Emitter)
- func NormalizeCacheOptions(req *Request, normalizer PrefixNormalizer) (CacheHint, *Error)
- func PutResponseMeta(m *ResponseMeta)
- func PutUsage(u *Usage)
- func ResetLogger()
- func SetCacheUsageDetail(u *Usage, attempted, hit, write bool, cachedInputTokens int, ...)
- func SetJSON(lib *JSONLibrary)
- func SetLogger(l Logger)
- func SetScrubber(s Scrubber)
- func Temp(v float64) *float64
- func TopKVal(v int) *int
- func TopPVal(v float64) *float64
- func WithCacheEngine(ctx context.Context, engine CacheEngine) context.Context
- func WithCacheHint(ctx context.Context, hint CacheHint) context.Context
- func WithGenerationTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
- type BytePool
- type CacheCapabilities
- type CacheCapableProvider
- type CacheEngine
- type CacheHint
- type CacheMode
- type CacheOptions
- type CacheScope
- type DefaultPrefixNormalizer
- type DiscardHandler
- type Emitter
- type Error
- type ErrorCode
- type ExperimentalFrame
- type Frame
- func AudioFrame(data []byte, mime string) Frame
- func AudioFramePooled(bp *BytePool, data []byte, mime string) Frame
- func Collect(ctx context.Context, s *Stream) ([]Frame, error)
- func ControlFrame(sig Signal) Frame
- func CustomFrame(c *ExperimentalFrame) Frame
- func ImageFrame(data []byte, mime string) Frame
- func ImageFramePooled(bp *BytePool, data []byte, mime string) Frame
- func TextFrame(s string) Frame
- func ToolCallFrame(call *ToolCall) Frame
- func ToolResultFrame(result *ToolResult) Frame
- func UsageFrame(u *Usage) Frame
- func VideoFrame(data []byte, mime string) Frame
- func VideoFramePooled(bp *BytePool, data []byte, mime string) Frame
- type JSONDecoder
- type JSONEncoder
- type JSONLibrary
- type Kind
- type Level
- type Logger
- type Message
- type Options
- type Part
- func AudioPart(data []byte, mime string) Part
- func CustomPart(custom *ExperimentalFrame) Part
- func ImagePart(data []byte, mime string) Part
- func ImageURLPart(url, mime string) Part
- func TextPart(s string) Part
- func ToolCallPart(call *ToolCall) Part
- func ToolResultPart(result *ToolResult) Part
- func VideoPart(data []byte, mime string) Part
- type PrefixNormalizer
- type PrefixNormalizerFunc
- type Provider
- type ProviderFunc
- type RealtimeConfig
- type RealtimeProvider
- type RealtimeSession
- type Request
- type ResponseMeta
- type Role
- type STTFunc
- type STTProvider
- type STTRequest
- type Scrubber
- type Signal
- type Stream
- type TTSFunc
- type TTSProvider
- type TTSRequest
- type TimeoutConfig
- type TimeoutProvider
- type Tool
- type ToolCall
- type ToolChoice
- type ToolResult
- type Usage
- type VADConfig
Constants ¶
const (
	// AudioPCM8k is 8 kHz 16-bit mono PCM: telephony grade (PSTN / G.711-compatible).
	AudioPCM8k = "audio/pcm;rate=8000;bits=16;channels=1"
	// AudioPCM16k is 16 kHz 16-bit mono PCM: standard ASR / Nova Sonic input.
	AudioPCM16k = "audio/pcm;rate=16000;bits=16;channels=1"
	// AudioPCM24k is 24 kHz 16-bit mono PCM: Nova Sonic output, OpenAI Realtime.
	AudioPCM24k = "audio/pcm;rate=24000;bits=16;channels=1"
	// AudioPCM44k is 44.1 kHz 16-bit mono PCM: CD quality.
	AudioPCM44k = "audio/pcm;rate=44100;bits=16;channels=1"
	// AudioPCM48k is 48 kHz 16-bit mono PCM: WebRTC / studio quality.
	AudioPCM48k = "audio/pcm;rate=48000;bits=16;channels=1"
)
Audio format MIME types
These constants encode sample rate, bit depth, and channel count in a single MIME string carried by Frame.Mime / Part.Mime. All formats are raw uncompressed PCM (little-endian signed 16-bit, mono) unless noted.
Use them with AudioFrame and AudioPart so consumers can decode without additional out-of-band configuration.
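Because the format travels inside the MIME string, a consumer can recover it with the standard library alone. The following is a minimal sketch, not part of niro: `pcmFormat`, `parsePCM`, and `bytesPerSecond` are hypothetical helpers introduced here for illustration.

```go
package main

import (
	"fmt"
	"mime"
	"strconv"
)

// pcmFormat holds the parameters encoded in a raw-PCM MIME string.
// Hypothetical type for this sketch; niro does not export it.
type pcmFormat struct {
	Rate, Bits, Channels int
}

// parsePCM extracts rate, bits, and channels from a MIME string such as
// "audio/pcm;rate=16000;bits=16;channels=1".
func parsePCM(s string) (pcmFormat, error) {
	mediaType, params, err := mime.ParseMediaType(s)
	if err != nil {
		return pcmFormat{}, err
	}
	if mediaType != "audio/pcm" {
		return pcmFormat{}, fmt.Errorf("not raw PCM: %q", mediaType)
	}
	var f pcmFormat
	fields := map[string]*int{"rate": &f.Rate, "bits": &f.Bits, "channels": &f.Channels}
	for name, dst := range fields {
		n, err := strconv.Atoi(params[name])
		if err != nil {
			return pcmFormat{}, fmt.Errorf("bad %s parameter: %w", name, err)
		}
		*dst = n
	}
	return f, nil
}

// bytesPerSecond is the raw data rate implied by the format.
func (f pcmFormat) bytesPerSecond() int {
	return f.Rate * f.Bits / 8 * f.Channels
}

func main() {
	f, err := parsePCM("audio/pcm;rate=16000;bits=16;channels=1")
	if err != nil {
		panic(err)
	}
	fmt.Println(f.Rate, f.Bits, f.Channels, f.bytesPerSecond())
}
```

The data rate is useful for sizing playback or jitter buffers: at 16 kHz, 16-bit mono, one second of audio is 32000 bytes.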
const (
	// UsageReasoningTokens is the standard key for reasoning token count.
	UsageReasoningTokens = "reasoning_tokens"
	// UsageReasoningCost is the standard key for provider-reported reasoning cost units.
	UsageReasoningCost = "reasoning_cost"
	// UsageCacheAttempted is 0|1 indicating cache was attempted.
	UsageCacheAttempted = "cache_attempted"
	// UsageCacheHit is 0|1 indicating cache hit occurred.
	UsageCacheHit = "cache_hit"
	// UsageCacheWrite is 0|1 indicating cache write occurred.
	UsageCacheWrite = "cache_write"
	// UsageCachedInputTokens is the number of prompt tokens served from cache.
	UsageCachedInputTokens = "cached_input_tokens"
	// UsageCacheLatencySavedMS is integer milliseconds saved by cache.
	UsageCacheLatencySavedMS = "cache_latency_saved_ms"
)
const (
	// AudioOGGOpus is OGG-encapsulated Opus: ElevenLabs default, web-friendly.
	AudioOGGOpus = "audio/ogg;codecs=opus"
	// AudioMP3 is MPEG Layer 3: universal playback support.
	AudioMP3 = "audio/mpeg"
	// AudioAAC is AAC in MP4 container: mobile-friendly.
	AudioAAC = "audio/aac"
	// AudioFLAC is lossless FLAC: high-quality archival.
	AudioFLAC = "audio/flac"
	// AudioWAV is RIFF/WAV: uncompressed, widely supported.
	AudioWAV = "audio/wav"
	// AudioPCMU8k is G.711 μ-law at 8kHz (telephony/PSTN).
	AudioPCMU8k = "audio/pcmu;rate=8000"
	// AudioPCMA8k is G.711 A-law at 8kHz (telephony/PSTN).
	AudioPCMA8k = "audio/pcma;rate=8000"
)
Encoded audio MIME constants
These complement the raw PCM constants in frame.go. Use them for TTS output and STT input when dealing with compressed formats.
Variables ¶
var (
	ErrClosed             = NewError(ErrCodeStreamClosed, "stream closed")
	ErrNoStructuredOutput = NewError(ErrCodeNoStructuredOutput, "no structured output")
	ErrContextCancelled   = NewError(ErrCodeContextCancelled, "context cancelled")
)
var DefaultBytePool = pool.DefaultBytePool
DefaultBytePool is the process-wide byte pool. Providers and processors should use this unless they need isolation.
Functions ¶
func AttachCacheContext ¶
AttachCacheContext stores cache metadata in one context.WithValue call.
func CollectText ¶
CollectText reads all text frames and concatenates their content. Uses a byte buffer with pre-allocated capacity.
func DefaultScrubber ¶
DefaultScrubber redacts values whose keys contain substrings associated with authentication material or cardholder data, satisfying PCI-DSS Requirements 3.4 and 10.3. Keys are matched by case-insensitive substring search.
Redacted key patterns:
authorization, api_key, apikey, password, passwd, secret, token, credential, auth, bearer, pan, card, cvv, cvc, ssn.
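The matching rule can be sketched in a few lines. This is an illustration of case-insensitive substring matching over the patterns listed above, not the library's implementation; in particular the `redacted` placeholder is an assumption, since the value DefaultScrubber actually substitutes is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// sensitive lists the key patterns from the documentation above.
var sensitive = []string{
	"authorization", "api_key", "apikey", "password", "passwd", "secret",
	"token", "credential", "auth", "bearer", "pan", "card", "cvv", "cvc", "ssn",
}

// redacted is a placeholder chosen for this sketch (assumption).
const redacted = "[REDACTED]"

// scrub returns redacted when key contains any sensitive pattern,
// ignoring case, and returns val unchanged otherwise.
func scrub(key string, val any) any {
	k := strings.ToLower(key)
	for _, p := range sensitive {
		if strings.Contains(k, p) {
			return redacted
		}
	}
	return val
}

func main() {
	fmt.Println(scrub("X-Api_Key", "sk-123")) // key contains "api_key"
	fmt.Println(scrub("model", "gpt-4o"))     // no pattern matches
	fmt.Println(scrub("company", "acme"))     // key contains "pan"
}
```

Substring matching errs on the side of redaction: an innocuous key such as "company" or "author" also matches, which is worth keeping in mind when choosing attribute names.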
func Forward ¶
Forward reads all frames from src and emits them to dst. Useful for connecting streams in custom Processors.
func IsAuthError ¶
IsAuthError reports whether an error is an authentication error.
func IsRateLimited ¶
IsRateLimited reports whether an error is a rate limit error.
func IsRetryable ¶
IsRetryable reports whether an error is retryable.
func JSONMarshal ¶
JSONMarshal marshals v using the configured JSON library.
func JSONMarshalIndent ¶
JSONMarshalIndent marshals v with indentation using the configured library.
func JSONUnmarshal ¶
JSONUnmarshal unmarshals data into v using the configured JSON library.
func LogDebug ¶
LogDebug emits a DEBUG record. Zero overhead when DEBUG is disabled.
PCI-DSS: DEBUG MUST NOT be enabled in production or PCI-scoped environments.
func LogError ¶
LogError emits an ERROR record for failures requiring operator attention.
PCI-DSS Req 10.2.4: authentication failures are always emitted at ERROR.
func LogWarn ¶
LogWarn emits a WARN record for transient failures and retries.
PCI-DSS Req 10.2: warn events are audit signals; retain logs for 12 months.
func NewStream ¶
NewStream creates a paired Stream and Emitter.
bufSize controls the channel buffer size between writer and reader. A bufSize of 0 means fully synchronous (unbuffered) — the writer blocks until the reader consumes each frame. Larger values allow the writer to get ahead, trading memory for throughput.
Typical values:
- 0: telephony / real-time (minimal latency)
- 16: general streaming (good default)
- 64: batch-style processing
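The effect of bufSize can be illustrated with a plain Go channel, assuming the Stream is backed by a channel of that capacity as described above. The helper `sentWithoutReader` is hypothetical and exists only for this sketch.

```go
package main

import "fmt"

// sentWithoutReader reports how many values a writer can send into a channel
// with the given buffer size before a send would block, with no reader attached.
func sentWithoutReader(bufSize, attempts int) int {
	ch := make(chan int, bufSize)
	sent := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			sent++
		default:
			// The send would block here: this is the backpressure point.
			return sent
		}
	}
	return sent
}

func main() {
	for _, n := range []int{0, 16, 64} {
		fmt.Printf("bufSize=%d: writer gets %d frames ahead\n", n, sentWithoutReader(n, 100))
	}
}
```

With a buffer of 0 the writer cannot get ahead at all, which keeps latency minimal; larger buffers let it run ahead by exactly the buffer size.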
func NormalizeCacheOptions ¶
func NormalizeCacheOptions(req *Request, normalizer PrefixNormalizer) (CacheHint, *Error)
NormalizeCacheOptions validates and normalizes cache metadata.
Key ownership is enforced to avoid cross-tenant data leakage: finalKey = Request.Client + ":" + CacheOptions.Key.
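As a sketch of the namespacing rule above: the function `finalKey` below is hypothetical, and rejecting an empty client is an assumption made for this example rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// finalKey applies the documented rule finalKey = client + ":" + key.
// Rejecting an empty client is an assumption of this sketch.
func finalKey(client, key string) (string, error) {
	if client == "" {
		return "", errors.New("cache key requires a client identifier")
	}
	return client + ":" + key, nil
}

func main() {
	a, _ := finalKey("tenant-a", "system-prompt-v1")
	b, _ := finalKey("tenant-b", "system-prompt-v1")
	// The same user-chosen key maps to distinct cache entries per client.
	fmt.Println(a)
	fmt.Println(b)
}
```

Prefixing with the client identifier means two tenants that pick the same key can never read each other's cached prefix.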
func PutResponseMeta ¶
func PutResponseMeta(m *ResponseMeta)
PutResponseMeta returns a ResponseMeta to the pool.
func ResetLogger ¶
func ResetLogger()
ResetLogger removes any override installed by SetLogger, restoring live delegation to slog.Default.
func SetCacheUsageDetail ¶
func SetCacheUsageDetail(u *Usage, attempted, hit, write bool, cachedInputTokens int, latencySavedMS int)
SetCacheUsageDetail writes canonical provider-agnostic cache metrics.
func SetJSON ¶
func SetJSON(lib *JSONLibrary)
SetJSON replaces the JSON implementation used by Niro. If lib is nil, the stdlib encoding/json implementation is used. Any nil fields are filled with stdlib defaults.
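The nil-filling behavior can be pictured as follows. This is a self-contained sketch with a trimmed-down, hypothetical `jsonLibrary` type (three of the six fields) and a hypothetical `withDefaults` helper; it does not reproduce niro's internals.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonLibrary mirrors a subset of the JSONLibrary fields for illustration.
type jsonLibrary struct {
	Marshal   func(v any) ([]byte, error)
	Unmarshal func(data []byte, v any) error
	Valid     func(data []byte) bool
}

// withDefaults returns a fully populated library: a nil lib yields the
// stdlib implementation, and any nil field falls back to its stdlib default.
func withDefaults(lib *jsonLibrary) jsonLibrary {
	out := jsonLibrary{Marshal: json.Marshal, Unmarshal: json.Unmarshal, Valid: json.Valid}
	if lib == nil {
		return out
	}
	if lib.Marshal != nil {
		out.Marshal = lib.Marshal
	}
	if lib.Unmarshal != nil {
		out.Unmarshal = lib.Unmarshal
	}
	if lib.Valid != nil {
		out.Valid = lib.Valid
	}
	return out
}

func main() {
	// Only Marshal is overridden; Unmarshal and Valid come from encoding/json.
	custom := &jsonLibrary{
		Marshal: func(v any) ([]byte, error) { return []byte(`"custom"`), nil },
	}
	lib := withDefaults(custom)
	b, _ := lib.Marshal(42)
	fmt.Println(string(b), lib.Valid([]byte(`{"ok":true}`)))
}
```

The practical consequence is that a caller can swap in a faster Marshal without having to supply every other function.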
func SetLogger ¶
func SetLogger(l Logger)
SetLogger replaces the library-wide logger shared by all niro packages. It is safe to call concurrently at any time.
Pass nil to install Discard (suppress all output). Call ResetLogger to restore live slog.Default delegation.
// JSON output at debug level (use only in non-PCI environments):
niro.SetLogger(niro.NewSlogAdapter(slog.New(
slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}),
)))
// Suppress all output:
niro.SetLogger(niro.Discard())
// Custom backend (zap, zerolog, …):
niro.SetLogger(&myZapAdapter{l: zapLogger})
func SetScrubber ¶
func SetScrubber(s Scrubber)
SetScrubber installs a global Scrubber applied to all niro log helpers. Pass nil to remove the installed scrubber (default: none).
niro.SetScrubber(niro.DefaultScrubber)
func WithCacheEngine ¶
func WithCacheEngine(ctx context.Context, engine CacheEngine) context.Context
WithCacheEngine stores an optional CacheEngine in context.
func WithCacheHint ¶
WithCacheHint stores normalized cache metadata in context.
func WithGenerationTimeout ¶
func WithGenerationTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
WithGenerationTimeout returns a context with a generation timeout applied.
Types ¶
type BytePool ¶
BytePool is an alias for pool.BytePool. See github.com/alexedtionweb/niro-stream/pool for direct usage without the niro import.
type CacheCapabilities ¶
type CacheCapabilities struct {
SupportsPrefix bool
SupportsExplicitKeys bool
SupportsTTL bool
SupportsBypass bool
}
CacheCapabilities describes what a provider can honor.
func ProviderCacheCaps ¶
func ProviderCacheCaps(p Provider) CacheCapabilities
ProviderCacheCaps returns provider cache capabilities if exposed.
func (CacheCapabilities) SupportsHint ¶
func (c CacheCapabilities) SupportsHint(h CacheHint) bool
SupportsHint returns whether these capabilities can honor a require-level hint.
type CacheCapableProvider ¶
type CacheCapableProvider interface {
Provider
CacheCaps() CacheCapabilities
}
CacheCapableProvider optionally exposes provider cache capabilities. This is additive and keeps the Provider interface backward-compatible.
type CacheEngine ¶
type CacheEngine interface {
ResolvePrefixHash(ctx context.Context, req *Request, scope CacheScope) (key string, ok bool, err error)
StorePrefix(ctx context.Context, key string, scope CacheScope, ttl time.Duration, meta map[string]string) error
LookupPrefix(ctx context.Context, key string, scope CacheScope) (meta map[string]string, ok bool, err error)
}
CacheEngine is an optional pluggable local cache extension point. Core runtime works without any engine; providers may consume this via context.
func GetCacheEngine ¶
func GetCacheEngine(ctx context.Context) (CacheEngine, bool)
GetCacheEngine retrieves an optional CacheEngine from context.
type CacheHint ¶
type CacheHint struct {
Mode CacheMode
Scope CacheScope
Key string
TTL time.Duration
PrefixHash string
}
CacheHint is normalized cache metadata attached to request context. It is derived once per Runtime.Generate call and reused for retries.
type CacheMode ¶
type CacheMode uint8
CacheMode defines cache intent semantics.
const (
	// CacheAuto lets the provider choose best-effort cache behavior.
	CacheAuto CacheMode = iota
	// CachePrefer requests cache usage but does not fail if unsupported.
	CachePrefer
	// CacheRequire fails the request if cache semantics cannot be applied.
	CacheRequire
	// CacheBypass explicitly disables cache usage for this request.
	CacheBypass
)
type CacheOptions ¶
type CacheOptions struct {
Mode CacheMode
Key string // Optional deterministic key (namespaced by tenant/client)
TTL time.Duration // Hint only; providers may ignore
Scope CacheScope
}
CacheOptions declares cache intent in a provider-agnostic way.
Zero value means best-effort prefix cache with no explicit key/ttl hint.
type CacheScope ¶
type CacheScope uint8
CacheScope defines the logical cache scope.
const (
	// CacheScopePrefix caches reusable prompt prefixes.
	// This is the only stable provider-agnostic scope currently supported.
	CacheScopePrefix CacheScope = iota
)
type DefaultPrefixNormalizer ¶
type DefaultPrefixNormalizer struct{}
DefaultPrefixNormalizer normalizes model + effective messages as JSON.
func (DefaultPrefixNormalizer) NormalizePrefix ¶
func (DefaultPrefixNormalizer) NormalizePrefix(req *Request) ([]byte, error)
func (DefaultPrefixNormalizer) WritePrefixHash ¶
func (DefaultPrefixNormalizer) WritePrefixHash(h hash.Hash, req *Request) error
type DiscardHandler ¶
type DiscardHandler struct{}
DiscardHandler is a slog.Handler that silently discards all records. Use it when you need a silent *slog.Logger for NewSlogAdapter:
niro.SetLogger(niro.NewSlogAdapter(slog.New(niro.DiscardHandler{})))
For most cases, prefer Discard directly.
type Emitter ¶
type Emitter struct {
// contains filtered or unexported fields
}
Emitter writes Frames into a Stream. It is the write half of a Stream pipe.
Contract: do not call Emit after Close or Error. The Pipeline ensures this automatically for Processors.
func (*Emitter) Close ¶
func (e *Emitter) Close()
Close closes the stream. Safe to call multiple times.
func (*Emitter) Emit ¶
Emit sends a Frame into the stream. Blocks if the stream buffer is full (backpressure). Returns an error if the context is canceled or the stream is closed.
Emit is safe to call concurrently with Close — a concurrent close returns ErrClosed instead of panicking.
func (*Emitter) Error ¶
Error sets an error on the stream and closes it. The error is visible to the reader via Stream.Err().
func (*Emitter) SetResponse ¶
func (e *Emitter) SetResponse(meta *ResponseMeta)
SetResponse stores provider metadata on the stream. Call this before Close, typically after all frames are emitted.
type Error ¶
type Error struct {
Code ErrorCode
Message string
Err error // underlying error for error chaining
Provider string // which provider failed (if applicable)
RequestID string // trace ID for debugging
Retryable bool // whether the operation can be safely retried
StatusCode int // HTTP status code (if applicable)
}
Error represents a detailed error from Niro or a provider.
func WrapErrorf ¶
WrapErrorf wraps an error with a formatted message.
func (*Error) LogValue ¶
LogValue implements slog.LogValuer so *Error emits structured attributes when passed to any slog call:
slog.Error("generate failed", "err", err)
// → err.code=429 err.provider=google err.message="Quota exceeded" err.retryable=true
func (*Error) WithProvider ¶
WithProvider adds provider context to an error.
func (*Error) WithRequestID ¶
WithRequestID adds request/trace ID context to an error.
func (*Error) WithStatusCode ¶
WithStatusCode adds HTTP status code context to an error.
type ErrorCode ¶
type ErrorCode int
ErrorCode categorizes runtime errors for proper handling.
const (
	// Client errors (4xx)
	ErrCodeInvalidRequest       ErrorCode = 400
	ErrCodeAuthenticationFailed ErrorCode = 401
	ErrCodeModelNotFound        ErrorCode = 404
	ErrCodeInvalidModel         ErrorCode = 422
	ErrCodeInsufficientQuota    ErrorCode = 429

	// Server errors (5xx)
	ErrCodeProviderError ErrorCode = 500
	ErrCodeRateLimited   ErrorCode = 509
	ErrCodeTimeout       ErrorCode = 504
	ErrCodeInternalError ErrorCode = 510

	// Niro-specific errors (6xx)
	ErrCodeStreamClosed       ErrorCode = 600
	ErrCodeNoStructuredOutput ErrorCode = 601
	ErrCodeInvalidSchema      ErrorCode = 602
	ErrCodeContextCancelled   ErrorCode = 603
	ErrCodeStreamError        ErrorCode = 604
)
func ConvertHTTPStatusToCode ¶
ConvertHTTPStatusToCode maps HTTP status codes to ErrorCode.
type ExperimentalFrame ¶
ExperimentalFrame carries provider-specific data without expanding core kinds. Type is an application/provider-defined discriminator (e.g. "reasoning_summary").
type Frame ¶
type Frame struct {
Kind Kind // Discriminator — always check this first
Text string // Token text (KindText)
Data []byte // Binary payload (KindAudio, KindImage, KindVideo)
Mime string // MIME type for Data (e.g. "audio/pcm", "image/png")
Tool *ToolCall // Tool call request (KindToolCall)
Result *ToolResult // Tool call result (KindToolResult)
Usage *Usage // Token usage (KindUsage) — emitted by providers at end of stream
Custom *ExperimentalFrame // Provider-specific/experimental payload (KindCustom)
Signal Signal // Control signal (KindControl)
}
Frame is the fundamental unit of data flowing through a Niro pipeline.
Frame is a tagged union optimized for the common case: text tokens. For text, only Kind and Text are populated — zero allocations beyond the string header. Binary payloads (audio, image, video) use the Data and Mime fields. Tool interactions use Tool and Result.
Frames are passed by value through channels. They are small and most fields are zero for any given Kind.
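Consuming a tagged union like this usually means switching on Kind and touching only the fields that Kind populates. The sketch below uses local `Kind` and `Frame` types that copy a subset of the definitions above so that it runs on its own; `summarize` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// Kind and Frame mirror a subset of the documented types for this sketch.
type Kind uint8

const (
	KindText Kind = iota + 1
	KindAudio
)

type Frame struct {
	Kind Kind
	Text string
	Data []byte
	Mime string
}

// summarize concatenates text frames and counts audio payload bytes,
// reading only the fields that belong to each Kind.
func summarize(frames []Frame) (text string, audioBytes int) {
	var sb strings.Builder
	for _, f := range frames {
		switch f.Kind {
		case KindText:
			sb.WriteString(f.Text)
		case KindAudio:
			audioBytes += len(f.Data)
		}
	}
	return sb.String(), audioBytes
}

func main() {
	frames := []Frame{
		{Kind: KindText, Text: "Hello, "},
		{Kind: KindAudio, Data: make([]byte, 320), Mime: "audio/pcm"},
		{Kind: KindText, Text: "world"},
	}
	text, n := summarize(frames)
	fmt.Printf("%q %d\n", text, n)
}
```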
func AudioFrame ¶
AudioFrame creates a Frame carrying audio data.
func AudioFramePooled ¶
AudioFramePooled creates an audio Frame using a pooled buffer. The data is copied into a buffer from the pool. The caller should call [BytePool.Put] on Frame.Data after consumption.
func Collect ¶
Collect reads all frames from a Stream into a slice. Pre-allocates capacity when possible to minimize slice growth.
func ControlFrame ¶
ControlFrame creates a Frame carrying a control signal.
func CustomFrame ¶
func CustomFrame(c *ExperimentalFrame) Frame
CustomFrame creates a Frame carrying an experimental/provider-specific payload.
func ImageFrame ¶
ImageFrame creates a Frame carrying image data.
func ImageFramePooled ¶
ImageFramePooled creates an image Frame using a pooled buffer.
func ToolCallFrame ¶
ToolCallFrame creates a Frame carrying a tool call request.
func ToolResultFrame ¶
func ToolResultFrame(result *ToolResult) Frame
ToolResultFrame creates a Frame carrying a tool call result.
func UsageFrame ¶
UsageFrame creates a Frame carrying token usage data.
func VideoFrame ¶
VideoFrame creates a Frame carrying video data.
type JSONDecoder ¶
JSONDecoder is the minimal interface required by JSON decoders. Compatible with encoding/json and the same JSON libraries supported by Fiber.
func JSONNewDecoder ¶
func JSONNewDecoder(r io.Reader) JSONDecoder
JSONNewDecoder returns a new decoder using the configured JSON library.
type JSONEncoder ¶
JSONEncoder is the minimal interface required by JSON encoders. Compatible with encoding/json and the same JSON libraries supported by Fiber.
func JSONNewEncoder ¶
func JSONNewEncoder(w io.Writer) JSONEncoder
JSONNewEncoder returns a new encoder using the configured JSON library.
type JSONLibrary ¶
type JSONLibrary struct {
Marshal func(v any) ([]byte, error)
Unmarshal func(data []byte, v any) error
Valid func(data []byte) bool
NewEncoder func(w io.Writer) JSONEncoder
NewDecoder func(r io.Reader) JSONDecoder
MarshalIndent func(v any, prefix, indent string) ([]byte, error)
}
JSONLibrary defines the JSON functions used by Niro.
Compatible with the same libraries supported by Fiber:
- encoding/json (stdlib)
- github.com/goccy/go-json
- github.com/bytedance/sonic
- github.com/segmentio/encoding/json
- github.com/json-iterator/go
Users can call SetJSON to swap the implementation globally.
type Kind ¶
type Kind uint8
Kind identifies the type of data a Frame carries.
const (
	KindText       Kind = iota + 1 // Text token (the hot path)
	KindAudio                      // Audio chunk (PCM, opus, etc.)
	KindImage                      // Image data (PNG, JPEG, etc.)
	KindVideo                      // Video frame
	KindToolCall                   // Tool invocation request from LLM
	KindToolResult                 // Tool invocation result
	KindUsage                      // Token usage report
	KindCustom                     // Experimental/provider-specific payload
	KindControl                    // Pipeline control signal
)
type Level ¶
type Level int
Level is the severity of a log record.
Values are intentionally identical to slog.Level so adapters convert between the two types with a zero-cost integer cast: slog.Level(niroLevel).
PCI-DSS guidance for each level is documented on each constant below.
const (
	// LevelDebug is for verbose, per-request diagnostics.
	//
	// PCI-DSS Requirement 10: DEBUG MUST NOT be enabled in production or any
	// PCI-scoped environment. Debug output may include request or response
	// payloads that could expose cardholder data or authentication credentials.
	LevelDebug Level = -4

	// LevelInfo records normal operational events (provider selection, model
	// routing). Safe for production.
	LevelInfo Level = 0

	// LevelWarn records transient, recoverable conditions: retries, rate-limit
	// back-off, degraded provider state. Always safe for production.
	//
	// PCI-DSS Requirement 10.2: repeated warn events are audit signals; ensure
	// your log aggregator retains them for the required 12-month period.
	LevelWarn Level = 4

	// LevelError records unrecoverable failures requiring operator attention.
	// Always safe for production.
	//
	// PCI-DSS Requirement 10.2.4: authentication failures MUST be logged at
	// this level or above.
	LevelError Level = 8
)
type Logger ¶
type Logger interface {
// Enabled reports whether records at level would be processed.
// MUST be cheap: no allocation, no lock, branch-predictor friendly.
// niro calls Enabled before constructing expensive args; a false return
// is a guaranteed zero-cost exit from the log helper.
Enabled(ctx context.Context, level Level) bool
// Log emits a structured record. niro guarantees Enabled(ctx,level)==true.
// args is a flat alternating (string key, any value) list.
Log(ctx context.Context, level Level, msg string, args ...any)
}
Logger is the minimal structured-logging interface used internally by niro. Any logging backend can be adapted by implementing two methods.
Key-value pairs in args follow the slog alternating-key-value convention (string, any, string, any, …). niro's own call sites use only plain string keys and standard Go values — no slog.Attr — so non-slog adapters require no special-case handling.
Implementations MUST be safe for concurrent use.
PCI-DSS guarantee ¶
niro only ever passes pre-approved, non-sensitive attributes to the logger: error codes, retry counts, durations, and request IDs. Request/response content and credentials are NEVER passed to the logger. Install a Scrubber as defence-in-depth if your application extends niro log call sites.
Adapter recipes ¶
// slog (built-in, zero boilerplate):
niro.SetLogger(niro.NewSlogAdapter(slog.Default()))
// zap (levels are Debug=-1, Info=0, Warn=1, Error=2, hence level/4):
niro.SetLogger(&zapAdapter{l: zapLogger})
// Enabled: l.Core().Enabled(zapcore.Level(level/4))
// Log: l.Sugar().Logw(zapcore.Level(level/4), msg, args...)
// zerolog:
niro.SetLogger(&zerologAdapter{l: &zerologLogger})
// Enabled: l.GetLevel() <= zerolog.Level(level/4+1)
// Log: l.WithLevel(...).Fields(args).Msg(msg)
func Discard ¶
func Discard() Logger
Discard returns a Logger that silently drops all records. Enabled always returns false so callers never build args.
niro.SetLogger(niro.Discard())
func GetLogger ¶
func GetLogger() Logger
GetLogger returns the active library Logger.
Before any SetLogger call (or after ResetLogger) this returns a live adapter over slog.Default: changes applied via slog.SetDefault are automatically visible here with no extra configuration.
type Message ¶
Message represents a single message in a conversation. A message contains one or more Parts, enabling multimodal content: a user message can carry text alongside images, audio, or video.
func AssistantText ¶
AssistantText creates an assistant text message. Useful for injecting assistant-turn prefills.
func ToolErrorMessage ¶
ToolErrorMessage creates a tool error result message.
func ToolMessage ¶
ToolMessage creates a tool result message.
type Options ¶
type Options struct {
MaxTokens int // Maximum output tokens
Temperature *float64 // Sampling temperature
TopP *float64 // Nucleus sampling
TopK *int // Top-K sampling (Anthropic, Google)
FrequencyPenalty *float64 // Frequency penalty (OpenAI)
PresencePenalty *float64 // Presence penalty (OpenAI)
Stop []string // Stop sequences
// Cache configures provider-agnostic input cache intent.
// Nil means cache is disabled with zero hot-path overhead.
Cache *CacheOptions
// ExperimentalReasoning enables provider-specific reasoning extensions.
// Providers may emit KindCustom frames (summaries/traces) when enabled.
ExperimentalReasoning bool
}
Options controls LLM generation parameters. Pointer fields distinguish "not set" from zero values.
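The pointer convention matters because 0 is a legitimate temperature. The sketch below shows the idea with a hypothetical `options` struct, a generic `ptr` helper (similar in spirit to the Temp, TopPVal, and TopKVal helpers listed in the index), and a hypothetical `effectiveTemperature` function.

```go
package main

import "fmt"

// ptr returns a pointer to a copy of v, handy for optional fields.
func ptr[T any](v T) *T { return &v }

// options mirrors one pointer field of Options for this sketch.
type options struct {
	Temperature *float64
}

// effectiveTemperature returns the caller's value when set, including an
// explicit zero, and the fallback only when the field is nil.
func effectiveTemperature(o options, fallback float64) float64 {
	if o.Temperature == nil {
		return fallback
	}
	return *o.Temperature
}

func main() {
	fmt.Println(effectiveTemperature(options{}, 0.7))                      // not set
	fmt.Println(effectiveTemperature(options{Temperature: ptr(0.0)}, 0.7)) // explicit zero
}
```

With a plain float64 field the two calls in main would be indistinguishable, and a request for deterministic sampling would silently become the default.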
type Part ¶
type Part struct {
Kind Kind
// Text content (KindText)
Text string
// Binary content (KindAudio, KindImage, KindVideo)
Data []byte
Mime string // MIME type for Data
// URL reference for remote content (alternative to Data).
// Providers that support URL references will use this directly;
// others will fetch and inline the data.
URL string
// Tool call (KindToolCall) — for assistant messages
Tool *ToolCall
// Tool result (KindToolResult) — for tool messages
Result *ToolResult
// Provider-specific/experimental payload (KindCustom)
Custom *ExperimentalFrame
}
Part is a content segment within a Message. Each Part carries exactly one kind of content.
func CustomPart ¶
func CustomPart(custom *ExperimentalFrame) Part
CustomPart creates a Part carrying an experimental/provider-specific payload.
func ImageURLPart ¶
ImageURLPart creates an image content Part from a URL.
func ToolCallPart ¶
ToolCallPart creates a tool call Part (for assistant messages).
func ToolResultPart ¶
func ToolResultPart(result *ToolResult) Part
ToolResultPart creates a tool result Part (for tool messages).
type PrefixNormalizer ¶
PrefixNormalizer creates deterministic bytes for prefix hashing. Implementations should avoid non-deterministic ordering.
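A deterministic normalizer can be sketched by marshaling fixed-order structs and hashing the result. Everything named below (`message`, `prefix`, `normalizePrefix`, `prefixHash`) is hypothetical, and SHA-256 is an assumption; the hash used by niro is not stated here.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// message and prefix are hypothetical shapes for this sketch. Struct fields
// marshal in declaration order, which keeps the byte output stable.
type message struct {
	Role string `json:"role"`
	Text string `json:"text"`
}

type prefix struct {
	Model    string    `json:"model"`
	Messages []message `json:"messages"`
}

// normalizePrefix produces the deterministic bytes to be hashed.
func normalizePrefix(model string, msgs []message) ([]byte, error) {
	return json.Marshal(prefix{Model: model, Messages: msgs})
}

// prefixHash returns a hex-encoded SHA-256 of the normalized bytes.
func prefixHash(model string, msgs []message) (string, error) {
	b, err := normalizePrefix(model, msgs)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	msgs := []message{{Role: "system", Text: "You are helpful."}}
	h1, _ := prefixHash("gpt-4o", msgs)
	h2, _ := prefixHash("gpt-4o", msgs)
	fmt.Println(h1 == h2, len(h1))
}
```

Structs and slices give a stable encoding for free. If a normalizer must include map-shaped data, encoding/json sorts map keys when marshaling, but hand-rolled serializers need to sort explicitly.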
type PrefixNormalizerFunc ¶
PrefixNormalizerFunc adapts a function to PrefixNormalizer.
func (PrefixNormalizerFunc) NormalizePrefix ¶
func (f PrefixNormalizerFunc) NormalizePrefix(req *Request) ([]byte, error)
type Provider ¶
Provider generates streaming LLM responses.
This is the primary interface for integrating LLM backends. Implementations must return a Stream that emits Frames as they arrive from the model — not after the full response.
Provider implementations should:
- Emit KindText frames for each text token delta
- Emit KindToolCall frames for completed tool calls
- Emit KindUsage frames with token counts (consumed automatically by Stream)
- Optionally emit KindCustom frames for provider-specific extensions
- Set ResponseMeta via Emitter.SetResponse before closing
- Respect context cancellation
Built-in: provider/openai, provider/anthropic, provider/google, provider/bedrock. Custom: implement this interface or use ProviderFunc.
type ProviderFunc ¶
ProviderFunc adapts a plain function to the Provider interface. Useful for ad-hoc providers, testing, and bring-your-own-model.
mock := niro.ProviderFunc(func(ctx context.Context, req *niro.Request) (*niro.Stream, error) {
	s, e := niro.NewStream(0)
	go func() {
		defer e.Close()
		e.Emit(ctx, niro.TextFrame("hello from mock"))
	}()
	return s, nil
})
type RealtimeConfig ¶
type RealtimeConfig struct {
// Model identifier. If empty, the provider's default model is used.
// Nova Sonic: "amazon.nova-sonic-v1:0"
// OpenAI: "gpt-4o-realtime-preview"
Model string
// SystemPrompt is sent as the initial SYSTEM instruction.
SystemPrompt string
// Voice selects the TTS synthesis voice.
// Nova Sonic: "matthew", "tiffany", "amy", "brian"
// OpenAI: "alloy", "echo", "fable", "nova", "onyx", "shimmer"
Voice string
// InputFormat is the MIME type of audio sent via Send.
// Defaults to AudioPCM16k for Nova Sonic, AudioPCM24k for OpenAI.
// Use the AudioPCM* constants.
InputFormat string
// OutputFormat is the requested audio MIME type received via Recv.
// Defaults to AudioPCM24k (supported by all providers).
OutputFormat string
// Tools available for the model to call.
// Received as KindToolCall frames from Recv.
Tools []Tool
// ToolChoice controls how the model selects tools.
ToolChoice ToolChoice
// VAD configures server-side Voice Activity Detection.
// When nil, VAD is not configured; the caller signals turn boundaries
// manually via Send(ControlFrame(SignalEOT)).
VAD *VADConfig
// Options controls generation parameters (temperature, max tokens, etc.)
Options Options
}
RealtimeConfig configures a bidirectional speech session.
type RealtimeProvider ¶
type RealtimeProvider interface {
Session(ctx context.Context, cfg RealtimeConfig) (RealtimeSession, error)
}
RealtimeProvider creates long-lived bidirectional speech sessions. Unlike Provider (unidirectional request-response), RealtimeProvider supports continuous audio streaming in both directions simultaneously — essential for voice assistants, telephony agents, and live translation.
Built-in implementations:
- provider/bedrock [SonicProvider] — Amazon Nova Sonic (16 kHz in, 24 kHz out)
- provider/realtime Provider — OpenAI Realtime API (24 kHz in/out)
type RealtimeSession ¶
type RealtimeSession interface {
// Send sends a frame to the model.
//
// Supported frame kinds:
// - KindAudio raw PCM audio chunk from the microphone
// - KindText text message (provider-dependent support)
// - KindToolResult result of a tool call received via Recv
// - KindControl
// SignalEOT end of user turn; model generates a response
// SignalAbort cancel an in-progress model response (barge-in)
Send(ctx context.Context, f Frame) error
// Recv returns the read-only output stream from the model.
// Call once; the stream remains open for the full session lifetime.
//
// Frame kinds emitted:
// - KindAudio synthesized speech
// - KindText transcript of the model's speech (when available)
// - KindToolCall tool invocation request; respond with Send(ToolResultFrame)
// - KindControl
// SignalFlush barge-in detected; clear the audio playback buffer
// SignalEOT model finished speaking this turn
// - KindUsage token/audio usage at end of turn
Recv() *Stream
// Close terminates the session and releases all resources.
// Safe to call multiple times and from any goroutine.
Close() error
// Err returns the first error that caused the session to fail.
// Returns nil if the session closed cleanly.
Err() error
}
RealtimeSession is a live bidirectional speech session.
Send and Recv operate concurrently. Run the audio-sending loop in a separate goroutine from the receiving loop.
sess, err := provider.Session(ctx, niro.RealtimeConfig{
SystemPrompt: "You are a helpful voice assistant.",
})
if err != nil {
log.Fatal(err)
}
defer sess.Close()
// --- send goroutine ---
go func() {
for chunk := range mic.Chunks() {
if err := sess.Send(ctx, niro.AudioFrame(chunk, niro.AudioPCM16k)); err != nil {
return
}
}
// Signal end of user turn; model will respond.
sess.Send(ctx, niro.ControlFrame(niro.SignalEOT))
}()
// --- receive loop ---
out := sess.Recv()
for out.Next(ctx) {
f := out.Frame()
switch f.Kind {
case niro.KindAudio:
speaker.Play(f.Data) // PCM24k synthesized speech
case niro.KindText:
fmt.Print(f.Text) // transcript (when available)
case niro.KindToolCall:
result := executeTool(f.Tool)
sess.Send(ctx, niro.ToolResultFrame(&niro.ToolResult{
CallID: f.Tool.ID,
Content: result,
}))
case niro.KindControl:
if f.Signal == niro.SignalFlush {
speaker.ClearBuffer() // barge-in: user interrupted
}
}
}
type Request ¶
type Request struct {
// Client selects a logical provider/client at runtime.
//
// This is used by multi-tenant routers (e.g. MultiTenantProvider)
// to pick the underlying SDK client/provider for this request.
//
// Example values: "tenant-a-openai", "enterprise-bedrock-usw2".
//
// If empty, router-specific fallbacks apply (context/default client).
Client string
// Model identifier (e.g. "gpt-4o", "claude-sonnet-4-5", "gemini-2.0-flash").
// If empty, the provider's default model is used.
Model string
// SystemPrompt is a convenience field for a single system message.
// Prepended to Messages automatically. If you need multiple system
// messages or interleaved system turns, use Messages directly.
SystemPrompt string
// Messages is the conversation history.
// Multimodal: messages can contain text, images, audio, video.
Messages []Message
// Tools available for the LLM to call.
Tools []Tool
// ToolChoice controls how the model selects tools.
// Default is ToolChoiceAuto.
ToolChoice ToolChoice
// ResponseFormat controls the output format.
// Supported values depend on the provider:
// - "" (default): plain text
// - "json": JSON output
// - "json_schema": structured output (use with ResponseSchema)
ResponseFormat string
// ResponseSchema is a JSON Schema for structured output.
// Only used when ResponseFormat is "json_schema".
ResponseSchema json.RawMessage
// Options controls generation parameters.
Options Options
// Extra carries provider-specific configuration.
// Each provider documents its accepted types (typically a RequestHook
// function that receives the raw SDK params). Providers ignore
// unrecognized types. Use for per-request SDK customization not
// covered by the common Options.
//
// stream, err := llm.Generate(ctx, &niro.Request{
// Messages: msgs,
// Extra: openai.RequestHook(func(p *oai.ChatCompletionNewParams) {
// p.LogProbs = oai.Bool(true)
// }),
// })
Extra any
}
Request contains everything needed to call an LLM.
func (*Request) EffectiveMessages ¶
EffectiveMessages returns the final message list including any SystemPrompt prepended as a system message.
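The prepending behavior described above can be sketched as follows. This is a minimal illustration and not niro's implementation: `message`, `request`, and `effectiveMessages` are hypothetical stand-ins that mirror only the fields involved, and the "system" role string is an assumption.

```go
package main

import "fmt"

// message is a hypothetical stand-in for niro.Message, reduced to a role and text.
type message struct {
	Role string
	Text string
}

// request mirrors only the two Request fields relevant to this sketch.
type request struct {
	SystemPrompt string
	Messages     []message
}

// effectiveMessages returns Messages with SystemPrompt prepended as a
// system message. A new slice is built so the caller's slice is not modified.
func effectiveMessages(r request) []message {
	if r.SystemPrompt == "" {
		return r.Messages
	}
	out := make([]message, 0, len(r.Messages)+1)
	out = append(out, message{Role: "system", Text: r.SystemPrompt})
	return append(out, r.Messages...)
}

func main() {
	r := request{
		SystemPrompt: "Be brief.",
		Messages:     []message{{Role: "user", Text: "Hello!"}},
	}
	for _, m := range effectiveMessages(r) {
		fmt.Printf("%s: %s\n", m.Role, m.Text)
	}
}
```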
type ResponseMeta ¶
type ResponseMeta struct {
// Model actually used (may differ from requested if provider aliases).
Model string
// FinishReason indicates why generation stopped.
// Common values: "stop", "length", "tool_calls", "content_filter".
FinishReason string
// ID is the provider-assigned response ID.
ID string
// Usage is the token usage for this generation.
Usage Usage
// Provider-specific metadata (opaque).
ProviderMeta map[string]any
}
ResponseMeta carries metadata about a completed generation. It is available via Stream.Response() once the stream has been fully consumed.
func GetResponseMeta ¶
func GetResponseMeta() *ResponseMeta
GetResponseMeta returns a ResponseMeta from the pool, reset to zero values.
type STTFunc ¶
type STTFunc func(ctx context.Context, req *STTRequest) (*Stream, error)
STTFunc adapts a plain function to the STTProvider interface.
func (STTFunc) Transcribe ¶
type STTProvider ¶
type STTProvider interface {
Transcribe(ctx context.Context, req *STTRequest) (*Stream, error)
}
STTProvider transcribes audio to text.
The returned Stream emits KindText frames for transcript segments. Interim (partial) results and final results are both KindText; use [STTMeta] in Frame.Extra or provider conventions to distinguish them.
Built-in: provider/elevenlabs, provider/googlespeech. Custom: implement this interface or use STTFunc.
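STTFunc follows the same function-adapter idiom as http.HandlerFunc in the standard library: a function type gains a method so that a plain function satisfies the interface. The sketch below shows the idiom with hypothetical local types (`sttProvider`, `sttFunc`, `sttRequest`, and a `transcript` slice standing in for the *Stream of KindText frames); it does not import niro and the fake transcription is invented for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// transcript stands in for the *Stream of KindText frames returned by the real interface.
type transcript []string

// sttRequest mirrors a small subset of STTRequest.
type sttRequest struct {
	Audio    []byte
	Language string
}

// sttProvider is a local copy of the interface shape.
type sttProvider interface {
	Transcribe(ctx context.Context, req *sttRequest) (transcript, error)
}

// sttFunc adapts a plain function to sttProvider.
type sttFunc func(ctx context.Context, req *sttRequest) (transcript, error)

// Transcribe calls the underlying function, which is all the adapter needs to do.
func (f sttFunc) Transcribe(ctx context.Context, req *sttRequest) (transcript, error) {
	return f(ctx, req)
}

// fakeSTT returns a provider built from a closure; handy as a test double.
func fakeSTT() sttProvider {
	return sttFunc(func(ctx context.Context, req *sttRequest) (transcript, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		return transcript{fmt.Sprintf("%d bytes (%s)", len(req.Audio), req.Language)}, nil
	})
}

// demo runs one transcription against the fake provider.
func demo() (transcript, error) {
	req := &sttRequest{Audio: make([]byte, 320), Language: "en"}
	return fakeSTT().Transcribe(context.Background(), req)
}

func main() {
	out, err := demo()
	fmt.Println(out, err)
}
```

The same shape applies to TTSFunc and TTSProvider.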
type STTRequest ¶
type STTRequest struct {
// Audio is the input audio data.
// For single-shot transcription: provide the full audio bytes.
// For streaming: use AudioStream instead.
Audio []byte
// AudioStream is a Stream of [KindAudio] frames for streaming transcription.
// When set, Audio is ignored.
AudioStream *Stream
// InputFormat is the MIME type of the input audio (e.g. AudioPCM16k, AudioOGGOpus).
// Required so the provider knows how to decode.
InputFormat string
// Model selects the STT model (provider-specific).
// If empty, the provider's default model is used.
Model string
// Language is a BCP-47 language tag hint.
Language string
// InterimResults requests partial/interim transcripts in addition to final ones.
InterimResults bool
// Extra carries provider-specific configuration.
Extra any
}
STTRequest contains everything needed for a speech-to-text call.
type Scrubber ¶
Scrubber sanitises log attribute values before they reach the Logger.
PCI-DSS Requirements 3.4 and 10.3 prohibit logging PANs, credentials, and authentication tokens in clear text. Install a Scrubber as a defence-in-depth layer. niro itself never passes sensitive values to the logger; the Scrubber protects against application code that adds attrs to niro log call sites.
The function receives each attribute key and value; it returns the value to emit, a masked replacement such as "[REDACTED]", or nil to drop the field.
Install via SetScrubber. DefaultScrubber covers the most common PCI-DSS sensitive key patterns. The scrubber is only invoked when the logger is enabled for the record's level — zero overhead when the level is off.
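A custom scrubber might look like the sketch below. It assumes the Scrubber type has the same shape as DefaultScrubber, func(key string, val any) any; the key list and the "debug_payload" rule are hypothetical, and the call to SetScrubber is omitted because its exact signature is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// sensitive lists key fragments treated as secret. The list is illustrative only.
var sensitive = []string{"password", "token", "api_key", "card_number"}

// maskSecrets returns the value unchanged, a masked replacement for
// sensitive keys, or nil to drop the field entirely.
func maskSecrets(key string, val any) any {
	k := strings.ToLower(key)
	for _, s := range sensitive {
		if strings.Contains(k, s) {
			return "[REDACTED]"
		}
	}
	if k == "debug_payload" {
		return nil // drop bulky or risky fields instead of masking them
	}
	return val
}

func main() {
	fmt.Println(maskSecrets("auth_token", "sk-123"))
	fmt.Println(maskSecrets("model", "gpt-4o"))
	fmt.Println(maskSecrets("debug_payload", "raw bytes"))
}
```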
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream reads Frames from a pipeline stage.
Stream follows the bufio.Scanner iteration pattern:
for stream.Next(ctx) {
f := stream.Frame()
// process f
}
if err := stream.Err(); err != nil {
// handle error
}
usage := stream.Usage()
Streams respect context cancellation and propagate errors from the writing side (Emitter). Usage data is accumulated automatically from KindUsage frames.
func StreamFromSlice ¶
StreamFromSlice creates a Stream pre-loaded with the given frames. The stream is immediately closed after all frames are buffered. Useful for testing and providing static input.
func (*Stream) Chan ¶
Chan exposes the underlying channel for use in select statements. Advanced use only — prefer Next for standard iteration.
func (*Stream) Err ¶
Err returns the first error encountered during iteration. Returns nil on clean end-of-stream.
func (*Stream) Frame ¶
Frame returns the current Frame. Must only be called after Next returns true.
func (*Stream) Next ¶
Next advances the Stream to the next Frame. Returns false when the stream is exhausted, an error occurs, or the context is canceled.
KindUsage frames are consumed automatically and accumulated in the Usage — they are not returned to the caller.
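To make the Next contract concrete, here is a minimal sketch of a channel-backed iterator that stops on cancellation and absorbs usage frames instead of returning them. It is an illustration of the documented behavior under simplifying assumptions, not niro's Stream: the `stream`, `frame`, and `kind` types are hypothetical, and usage is reduced to a single token counter.

```go
package main

import (
	"context"
	"fmt"
)

type kind int

const (
	kindText kind = iota
	kindUsage
)

// frame is a reduced stand-in for niro.Frame.
type frame struct {
	Kind   kind
	Text   string
	Tokens int // set only on kindUsage frames
}

// stream iterates over a channel in the bufio.Scanner style.
type stream struct {
	ch     <-chan frame
	cur    frame
	err    error
	tokens int
}

// Next advances to the next non-usage frame. It returns false when the
// channel is closed or the context is canceled.
func (s *stream) Next(ctx context.Context) bool {
	for {
		select {
		case <-ctx.Done():
			s.err = ctx.Err()
			return false
		case f, ok := <-s.ch:
			if !ok {
				return false // clean end of stream
			}
			if f.Kind == kindUsage {
				s.tokens += f.Tokens // accumulate, do not surface to the caller
				continue
			}
			s.cur = f
			return true
		}
	}
}

func (s *stream) Frame() frame { return s.cur }
func (s *stream) Err() error   { return s.err }
func (s *stream) Tokens() int  { return s.tokens }

// collect drains a pre-loaded stream and returns the text, token count, and error.
func collect(frames ...frame) (string, int, error) {
	ch := make(chan frame, len(frames))
	for _, f := range frames {
		ch <- f
	}
	close(ch)
	s := &stream{ch: ch}
	var text string
	for s.Next(context.Background()) {
		text += s.Frame().Text
	}
	return text, s.Tokens(), s.Err()
}

func main() {
	text, tokens, err := collect(
		frame{Text: "Hel"},
		frame{Kind: kindUsage, Tokens: 7},
		frame{Text: "lo"},
	)
	fmt.Println(text, tokens, err)
}
```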
func (*Stream) Response ¶
func (s *Stream) Response() *ResponseMeta
Response returns provider metadata set by the Emitter. Available after the stream is fully consumed.
type TTSFunc ¶
type TTSFunc func(ctx context.Context, req *TTSRequest) (*Stream, error)
TTSFunc adapts a plain function to the TTSProvider interface.
func (TTSFunc) Synthesize ¶
type TTSProvider ¶
type TTSProvider interface {
Synthesize(ctx context.Context, req *TTSRequest) (*Stream, error)
}
TTSProvider synthesizes speech from text.
Unlike Provider (which operates on message turns), TTSProvider is a simple text→audio streaming interface. The returned Stream emits KindAudio frames as chunks arrive from the synthesis engine.
Built-in: provider/elevenlabs, provider/googlespeech. Custom: implement this interface or use TTSFunc.
type TTSRequest ¶
type TTSRequest struct {
// Text is the input to synthesize.
Text string
// Voice selects the synthesis voice (provider-specific).
// If empty, the provider's default voice is used.
Voice string
// Model selects the TTS model (provider-specific).
// If empty, the provider's default model is used.
Model string
// Language is a BCP-47 language tag (e.g. "en", "es", "de").
// Not all providers require this.
Language string
// OutputFormat is the desired output MIME type (e.g. AudioOGGOpus, AudioMP3).
// If empty, the provider picks its default format.
OutputFormat string
// Speed is a playback speed multiplier. 1.0 = normal.
// Not all providers support this.
Speed float64
// Extra carries provider-specific configuration.
// Each provider documents its accepted types.
Extra any
}
TTSRequest contains everything needed for a text-to-speech call.
type TimeoutConfig ¶
type TimeoutConfig struct {
// GenerationTimeout is the max time for a single generation request
GenerationTimeout time.Duration
// FrameTimeout is the max time to wait for the next frame
FrameTimeout time.Duration
// ToolTimeout is the max time to wait for a tool execution result
ToolTimeout time.Duration
}
TimeoutConfig configures timeout behavior.
func DefaultTimeoutConfig ¶
func DefaultTimeoutConfig() TimeoutConfig
DefaultTimeoutConfig returns sensible defaults:
- 5 minutes for generation
- 30 seconds for each frame
- 1 minute for tool execution
func TelephonyTimeoutConfig ¶
func TelephonyTimeoutConfig() TimeoutConfig
TelephonyTimeoutConfig returns timeouts tuned for real-time voice pipelines where the full response must fit inside a conversational turn:
- GenerationTimeout: 8 s — total budget for a single voice turn
- FrameTimeout: 300 ms — barge-in detection window; stalled tokens abort the stream before the user notices a freeze
- ToolTimeout: 5 s — tools must resolve within the voice-turn budget
These values are intentionally conservative. Adjust per deployment after measuring p99 TTFT and tool-execution latencies in production.
type TimeoutProvider ¶
type TimeoutProvider struct {
// contains filtered or unexported fields
}
TimeoutProvider wraps a Provider with generation timeout enforcement.
The timeout covers the entire generation lifecycle: from the initial API call through streaming until the last frame is consumed.
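One way to picture a timeout that spans the whole lifecycle is a single context deadline shared by the initial call and every subsequent frame read. The sketch below uses only the standard library; `generate` is a hypothetical stand-in for a provider, and nothing here reflects how TimeoutProvider is actually implemented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// generate is a hypothetical provider: it emits n tokens, pausing for
// delay before each one, and stops as soon as ctx is done.
func generate(ctx context.Context, n int, delay time.Duration) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case <-ctx.Done():
				return
			case <-time.After(delay):
			}
			select {
			case <-ctx.Done():
				return
			case out <- fmt.Sprintf("tok%d", i):
			}
		}
	}()
	return out
}

// consumeWithTimeout applies one deadline to the call and to all streaming.
// It returns the number of tokens received and the context error, if any.
func consumeWithTimeout(timeout time.Duration, n int, delay time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	got := 0
	for range generate(ctx, n, delay) {
		got++
	}
	if got < n {
		return got, ctx.Err() // the generator stopped early because ctx ended
	}
	return got, nil
}

// timedOut reports whether err is a deadline expiry.
func timedOut(err error) bool { return errors.Is(err, context.DeadlineExceeded) }

// fastRun finishes well inside its budget; slowRun cannot.
func fastRun() (int, error) { return consumeWithTimeout(time.Second, 3, time.Millisecond) }
func slowRun() (int, error) { return consumeWithTimeout(20*time.Millisecond, 3, time.Hour) }

func main() {
	fmt.Println(fastRun())
	n, err := slowRun()
	fmt.Println(n, timedOut(err))
}
```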
func NewTimeoutProvider ¶
func NewTimeoutProvider(p Provider, timeout time.Duration) *TimeoutProvider
NewTimeoutProvider creates a Provider that enforces generation timeouts.
type Tool ¶
type Tool struct {
Name string // Function name
Description string // Human-readable description
Parameters json.RawMessage // JSON Schema for parameters
}
Tool defines a tool that can be provided to an LLM.
type ToolCall ¶
type ToolCall struct {
ID string // Provider-assigned call ID
Name string // Tool function name
Args json.RawMessage // Arguments as JSON
}
ToolCall represents an LLM's request to invoke a tool.
type ToolChoice ¶
type ToolChoice string
ToolChoice controls how the model selects tools.
const (
ToolChoiceAuto ToolChoice = "auto" // Model decides (default)
ToolChoiceNone ToolChoice = "none" // Never call tools
ToolChoiceRequired ToolChoice = "required" // Must call at least one tool
)
func ToolChoiceFunc ¶
func ToolChoiceFunc(name string) ToolChoice
ToolChoiceFunc forces the model to call a specific tool.
func (ToolChoice) Validate ¶
func (tc ToolChoice) Validate() error
Validate checks if ToolChoice is valid when tools are present.
type ToolResult ¶
type ToolResult struct {
CallID string // Matches ToolCall.ID
Content string // Result content (may be JSON or plain text)
IsError bool // Whether this result represents an error
}
ToolResult represents the outcome of a tool invocation.
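The relationship between ToolCall and ToolResult can be illustrated with a small dispatcher: decode Args, run the tool, and echo the call ID back in the result. The local `toolCall` and `toolResult` types mirror the documented fields; the "get_weather" tool, its argument shape, and the `execute` helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toolCall and toolResult mirror the documented struct fields.
type toolCall struct {
	ID   string
	Name string
	Args json.RawMessage
}

type toolResult struct {
	CallID  string
	Content string
	IsError bool
}

// weatherArgs is the argument shape of a hypothetical "get_weather" tool.
type weatherArgs struct {
	City string `json:"city"`
}

// execute dispatches on the tool name and always echoes the call ID back,
// reporting failures through IsError rather than a Go error.
func execute(call toolCall) toolResult {
	res := toolResult{CallID: call.ID}
	switch call.Name {
	case "get_weather":
		var a weatherArgs
		if err := json.Unmarshal(call.Args, &a); err != nil {
			res.Content, res.IsError = "invalid arguments: "+err.Error(), true
			return res
		}
		body, err := json.Marshal(map[string]string{"city": a.City, "forecast": "sunny"})
		if err != nil {
			res.Content, res.IsError = err.Error(), true
			return res
		}
		res.Content = string(body)
	default:
		res.Content, res.IsError = "unknown tool: "+call.Name, true
	}
	return res
}

func main() {
	call := toolCall{ID: "call_1", Name: "get_weather", Args: json.RawMessage(`{"city":"Oslo"}`)}
	fmt.Printf("%+v\n", execute(call))
}
```

Returning failures as a result with IsError set lets the model see the error text and recover, instead of aborting the session.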
func (*ToolResult) Validate ¶
func (tr *ToolResult) Validate() error
Validate checks if a ToolResult is valid.
type Usage ¶
type Usage struct {
InputTokens int // Prompt tokens
OutputTokens int // Completion tokens
TotalTokens int // InputTokens + OutputTokens (some providers report directly)
// Provider-specific detail (optional).
// E.g. cached tokens, audio tokens, reasoning tokens.
Detail map[string]int
}
Usage tracks token consumption for a generation.
func GetUsage ¶
func GetUsage() *Usage
GetUsage returns a Usage from the pool, reset to zero values. Call PutUsage when done.
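The get/reset/put cycle is the usual sync.Pool pattern. The sketch below shows it with a local `usage` type and hypothetical `getUsage`/`putUsage` helpers; whether niro resets on get or on put is not stated here, so resetting on get is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// usage mirrors the documented Usage fields.
type usage struct {
	InputTokens  int
	OutputTokens int
	TotalTokens  int
	Detail       map[string]int
}

var usagePool = sync.Pool{New: func() any { return new(usage) }}

// getUsage returns a zeroed usage, reusing a pooled value when one is available.
func getUsage() *usage {
	u := usagePool.Get().(*usage)
	*u = usage{} // reset so stale counts never leak between generations
	return u
}

// putUsage hands the value back; the caller must not use it afterwards.
func putUsage(u *usage) {
	if u != nil {
		usagePool.Put(u)
	}
}

func main() {
	u := getUsage()
	u.InputTokens, u.OutputTokens = 12, 30
	u.TotalTokens = u.InputTokens + u.OutputTokens
	fmt.Println(u.TotalTokens)
	putUsage(u)
}
```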
type VADConfig ¶
type VADConfig struct {
// Threshold is the activation confidence (0.0–1.0).
// Higher values reduce false positives at the cost of onset latency.
// Default: 0.5
Threshold float64
// PrefixPaddingMs is the pre-speech audio prepended to each utterance
// to avoid clipping word onset. Default: 300 ms.
PrefixPaddingMs int
// SilenceDurationMs is the post-speech silence before turn-end is
// triggered. Default: 200 ms.
SilenceDurationMs int
}
VADConfig configures server-side Voice Activity Detection.
When enabled, the provider automatically detects speech boundaries and emits KindControl frames:
- SignalFlush on speech start (user is talking; clear playback buffer)
- SignalEOT on speech end (user stopped talking; model will respond)
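The defaults documented on the VADConfig fields can be applied with a small helper like the one below. This is a sketch only: `withDefaults` and `validate` are hypothetical, and treating a zero field as "use the default" is an assumption, since the documentation does not say how zero values are interpreted.

```go
package main

import "fmt"

// vadConfig mirrors the documented VADConfig fields.
type vadConfig struct {
	Threshold         float64
	PrefixPaddingMs   int
	SilenceDurationMs int
}

// withDefaults fills zero-valued fields with the documented defaults
// (0.5, 300 ms, 200 ms). Zero meaning "unset" is an assumption.
func withDefaults(c vadConfig) vadConfig {
	if c.Threshold == 0 {
		c.Threshold = 0.5
	}
	if c.PrefixPaddingMs == 0 {
		c.PrefixPaddingMs = 300
	}
	if c.SilenceDurationMs == 0 {
		c.SilenceDurationMs = 200
	}
	return c
}

// validate checks the documented 0.0-1.0 threshold range and rejects negative durations.
func validate(c vadConfig) error {
	if c.Threshold < 0 || c.Threshold > 1 {
		return fmt.Errorf("threshold %v is outside 0.0-1.0", c.Threshold)
	}
	if c.PrefixPaddingMs < 0 || c.SilenceDurationMs < 0 {
		return fmt.Errorf("durations must not be negative")
	}
	return nil
}

func main() {
	// A longer silence window trades response latency for fewer false turn ends.
	cfg := withDefaults(vadConfig{SilenceDurationMs: 500})
	fmt.Printf("%+v %v\n", cfg, validate(cfg))
}
```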
Directories ¶
| Path | Synopsis |
|---|---|
| component | Package component provides the Component interface and ComponentHost for managing pluggable runtime extensions with lifecycle management. |
| hook | Package hook provides the Hook interface for observing LLM generation lifecycle events: start, end, per-frame, tool calls, and errors. |
| internal | |
| internal/sse | Package sse provides a minimal Server-Sent Events reader. |
| middleware | Package middleware provides provider wrappers: Cache, Retry, Timeout, and Tracing. |
| orchestrate | Package orchestrate provides concurrent workflow primitives: Fan (parallel merge), Race (first wins), and Sequence (chained). |
| pipe | Package pipe provides the Processor interface and built-in processors for transforming Frame streams in a Pipeline. |
| pool | Package pool provides a size-class byte buffer pool that eliminates allocations on media-heavy hot paths (audio chunks, image tiles, video frames). |
| provider | |
| provider/compat | Package compat implements a Niro Provider using raw HTTP + SSE. |
| provider/anthropic (module) | |
| provider/bedrock (module) | |
| provider/elevenlabs (module) | |
| provider/google (module) | |
| provider/openai (module) | |
| provider/realtime (module) | |
| registry | Package registry provides named provider registration and routing. |
| runtime | Package runtime manages the lifecycle of a Provider with optional hooks and a post-processing Pipeline. |
| transport | Package transport provides production-grade HTTP transport and client configurations tuned for high-concurrency LLM API traffic. |