sampling

package
v0.1.0-rc4
Published: Feb 4, 2026 | License: Apache-2.0 | Imports: 11 | Imported by: 0

Documentation

Types

type Float64SerDe

type Float64SerDe struct{}

Float64SerDe provides serialization for float64 (8 bytes per item).

func (Float64SerDe) DeserializeFromBytes

func (s Float64SerDe) DeserializeFromBytes(data []byte, numItems int) ([]float64, error)

func (Float64SerDe) SerializeToBytes

func (s Float64SerDe) SerializeToBytes(items []float64) ([]byte, error)

func (Float64SerDe) SizeOfItem

func (s Float64SerDe) SizeOfItem() int
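
As a rough illustration of what a fixed-width, 8-bytes-per-item encoding for float64 can look like, here is a minimal standalone sketch. The helper names encodeFloat64s and decodeFloat64s are hypothetical, and the little-endian byte order is an assumption; the documentation above does not state which byte order Float64SerDe uses.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

// encodeFloat64s writes each value as 8 bytes (IEEE 754 bit pattern).
// Little-endian order is an assumption, not taken from the package docs.
func encodeFloat64s(items []float64) []byte {
	out := make([]byte, 8*len(items))
	for i, v := range items {
		binary.LittleEndian.PutUint64(out[8*i:], math.Float64bits(v))
	}
	return out
}

// decodeFloat64s reads numItems values back, failing if data is too short.
func decodeFloat64s(data []byte, numItems int) ([]float64, error) {
	if numItems < 0 || len(data) < 8*numItems {
		return nil, errors.New("not enough bytes for the requested number of items")
	}
	out := make([]float64, numItems)
	for i := range out {
		out[i] = math.Float64frombits(binary.LittleEndian.Uint64(data[8*i:]))
	}
	return out, nil
}

func main() {
	raw := encodeFloat64s([]float64{1.5, -2.25})
	back, err := decodeFloat64s(raw, 2)
	fmt.Println(len(raw), back, err)
}
```

The explicit numItems argument mirrors the DeserializeFromBytes signature: the caller, not the byte slice, decides how many items to read.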

type Int32SerDe

type Int32SerDe struct{}

Int32SerDe provides serialization for int32 (4 bytes per item).

func (Int32SerDe) DeserializeFromBytes

func (s Int32SerDe) DeserializeFromBytes(data []byte, numItems int) ([]int32, error)

func (Int32SerDe) SerializeToBytes

func (s Int32SerDe) SerializeToBytes(items []int32) ([]byte, error)

func (Int32SerDe) SizeOfItem

func (s Int32SerDe) SizeOfItem() int

type Int64SerDe

type Int64SerDe struct{}

Int64SerDe provides serialization for int64 (8 bytes per item).

func (Int64SerDe) DeserializeFromBytes

func (s Int64SerDe) DeserializeFromBytes(data []byte, numItems int) ([]int64, error)

func (Int64SerDe) SerializeToBytes

func (s Int64SerDe) SerializeToBytes(items []int64) ([]byte, error)

func (Int64SerDe) SizeOfItem

func (s Int64SerDe) SizeOfItem() int

type ItemsSerDe

type ItemsSerDe[T any] interface {
	// SerializeToBytes converts items to a byte slice.
	SerializeToBytes(items []T) ([]byte, error)

	// DeserializeFromBytes converts bytes back to items.
	// numItems specifies how many items to read from the data.
	DeserializeFromBytes(data []byte, numItems int) ([]T, error)

	// SizeOfItem returns the size in bytes for a single item.
	// Returns -1 for variable-length types (like string).
	SizeOfItem() int
}

ItemsSerDe defines the interface for serializing and deserializing items. Users must implement this interface for custom types. Built-in implementations are provided for common types (int64, int32, string, float64).
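
A custom implementation might look like the following sketch. The interface is reproduced from the declaration above so the file compiles on its own; Point and PointSerDe are hypothetical names, and the little-endian layout is an arbitrary choice made for this example.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// ItemsSerDe is copied from the package documentation so this example is self-contained.
type ItemsSerDe[T any] interface {
	SerializeToBytes(items []T) ([]byte, error)
	DeserializeFromBytes(data []byte, numItems int) ([]T, error)
	SizeOfItem() int
}

// Point is a hypothetical custom item type.
type Point struct{ X, Y int32 }

// PointSerDe is a hypothetical implementation: two little-endian int32 values per item.
type PointSerDe struct{}

func (PointSerDe) SerializeToBytes(items []Point) ([]byte, error) {
	out := make([]byte, 0, 8*len(items))
	for _, p := range items {
		out = binary.LittleEndian.AppendUint32(out, uint32(p.X))
		out = binary.LittleEndian.AppendUint32(out, uint32(p.Y))
	}
	return out, nil
}

func (PointSerDe) DeserializeFromBytes(data []byte, numItems int) ([]Point, error) {
	if numItems < 0 || len(data) < 8*numItems {
		return nil, errors.New("pointserde: not enough bytes")
	}
	out := make([]Point, numItems)
	for i := range out {
		out[i].X = int32(binary.LittleEndian.Uint32(data[8*i:]))
		out[i].Y = int32(binary.LittleEndian.Uint32(data[8*i+4:]))
	}
	return out, nil
}

// SizeOfItem reports a fixed width; a variable-length type would return -1.
func (PointSerDe) SizeOfItem() int { return 8 }

// Compile-time check that PointSerDe satisfies the interface.
var _ ItemsSerDe[Point] = PointSerDe{}

func main() {
	var serde ItemsSerDe[Point] = PointSerDe{}
	raw, _ := serde.SerializeToBytes([]Point{{1, -2}, {3, 4}})
	back, err := serde.DeserializeFromBytes(raw, 2)
	fmt.Println(len(raw), back, err)
}
```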

type ReservoirItemsSketch

type ReservoirItemsSketch[T any] struct {
	// contains filtered or unexported fields
}

ReservoirItemsSketch provides a uniform random sample of items from a stream of unknown size using the reservoir sampling algorithm.

The algorithm works in two phases:

  • Initial phase (n < k): all items are stored
  • Steady state (n >= k): each new item replaces a random item with probability k/n

This ensures that, once n >= k, every item seen so far has the same probability k/n of being in the sample, where n is the number of items seen.
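
The two phases can be sketched with the classic reservoir sampling update shown below. This is a simplified standalone illustration, not the package's implementation; the reservoir type, its fields, and the seeded math/rand source are assumptions made for the example.

```go
package main

import (
	"fmt"
	"math/rand"
)

// reservoir is a hypothetical, simplified stand-in for the sketch.
type reservoir struct {
	k     int
	n     int64 // number of items seen so far
	items []int
	rng   *rand.Rand
}

func newReservoir(k int, seed int64) *reservoir {
	return &reservoir{
		k:     k,
		items: make([]int, 0, k),
		rng:   rand.New(rand.NewSource(seed)),
	}
}

func (r *reservoir) update(item int) {
	r.n++
	// Initial phase: the reservoir is not full yet, so store every item.
	if len(r.items) < r.k {
		r.items = append(r.items, item)
		return
	}
	// Steady state: draw j uniformly from [0, n). With probability k/n
	// it falls inside the reservoir, and slot j is overwritten.
	if j := r.rng.Int63n(r.n); j < int64(r.k) {
		r.items[j] = item
	}
}

func main() {
	r := newReservoir(5, 42)
	for i := 0; i < 1000; i++ {
		r.update(i)
	}
	fmt.Println(r.n, len(r.items), r.items)
}
```

Drawing a single index in [0, n) covers both decisions at once: whether the new item is accepted, and which slot it replaces.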

func NewReservoirItemsSketch

func NewReservoirItemsSketch[T any](
	k int, opts ...ReservoirItemsSketchOptionFunc,
) (*ReservoirItemsSketch[T], error)

NewReservoirItemsSketch creates a new reservoir sketch with the given capacity k.

func NewReservoirItemsSketchFromSlice

func NewReservoirItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) (*ReservoirItemsSketch[T], error)

NewReservoirItemsSketchFromSlice deserializes a sketch from a byte slice.

func (*ReservoirItemsSketch[T]) Copy

func (s *ReservoirItemsSketch[T]) Copy() *ReservoirItemsSketch[T]

Copy returns a deep copy of the sketch.

func (*ReservoirItemsSketch[T]) DownsampledCopy

func (s *ReservoirItemsSketch[T]) DownsampledCopy(newK int) (*ReservoirItemsSketch[T], error)

DownsampledCopy returns a copy with a reduced reservoir size. If newK >= the current K, it returns a regular copy.

func (*ReservoirItemsSketch[T]) EstimateSubsetSum

func (s *ReservoirItemsSketch[T]) EstimateSubsetSum(predicate func(T) bool) (SampleSubsetSummary, error)

EstimateSubsetSum computes an estimated subset sum over the entire stream for items matching a given predicate. It provides a lower bound, an estimate, and an upper bound using a target of 2 standard deviations.

NOTE: This is technically a heuristic method, and it tries to err on the conservative side.

The predicate selects the items to count. The returned SampleSubsetSummary contains the estimate, the lower and upper bounds, and the total sketch weight.
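
For intuition, the point estimate for a uniform sample can be obtained by scaling the fraction of matching samples up to the stream length. The sketch below shows only that scaling step under this assumption; it does not reproduce the heuristic the package uses for the lower and upper bounds, and estimateSubsetSum is a hypothetical helper name.

```go
package main

import "fmt"

// estimateSubsetSum scales the fraction of samples matching the predicate
// by n, the total number of items seen in the stream.
// Hypothetical helper: the bounds computed by the package are not modeled here.
func estimateSubsetSum[T any](samples []T, n int64, predicate func(T) bool) float64 {
	if len(samples) == 0 {
		return 0
	}
	matches := 0
	for _, s := range samples {
		if predicate(s) {
			matches++
		}
	}
	return float64(n) * float64(matches) / float64(len(samples))
}

func main() {
	samples := []int{1, 2, 3, 4}
	isEven := func(v int) bool { return v%2 == 0 }
	// Half of the samples match, so about half of a 100-item stream is estimated to match.
	fmt.Println(estimateSubsetSum(samples, 100, isEven))
}
```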

func (*ReservoirItemsSketch[T]) ImplicitSampleWeight

func (s *ReservoirItemsSketch[T]) ImplicitSampleWeight() float64

ImplicitSampleWeight returns N/K when in sampling mode, or 1.0 in exact mode.
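
The rule reads naturally as a small function. The sketch below assumes "exact mode" means the sketch has seen no more than K items, so every item is still retained; implicitSampleWeight is a hypothetical standalone helper, not the method itself.

```go
package main

import "fmt"

// implicitSampleWeight returns how many stream items each retained sample
// stands for. Assumption: exact mode is n <= k, where all items are retained.
func implicitSampleWeight(n int64, k int) float64 {
	if n <= int64(k) {
		return 1.0
	}
	return float64(n) / float64(k)
}

func main() {
	fmt.Println(implicitSampleWeight(50, 100))   // exact mode
	fmt.Println(implicitSampleWeight(1000, 100)) // sampling mode
}
```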

func (*ReservoirItemsSketch[T]) IsEmpty

func (s *ReservoirItemsSketch[T]) IsEmpty() bool

IsEmpty returns true if no items have been seen.

func (*ReservoirItemsSketch[T]) K

func (s *ReservoirItemsSketch[T]) K() int

K returns the maximum reservoir capacity.

func (*ReservoirItemsSketch[T]) N

func (s *ReservoirItemsSketch[T]) N() int64

N returns the total number of items seen by the sketch.

func (*ReservoirItemsSketch[T]) NumSamples

func (s *ReservoirItemsSketch[T]) NumSamples() int

NumSamples returns the number of items currently in the reservoir.

func (*ReservoirItemsSketch[T]) Reset

func (s *ReservoirItemsSketch[T]) Reset()

Reset clears the sketch while preserving capacity k.

func (*ReservoirItemsSketch[T]) Samples

func (s *ReservoirItemsSketch[T]) Samples() []T

Samples returns a copy of the items in the reservoir.

func (*ReservoirItemsSketch[T]) String

func (s *ReservoirItemsSketch[T]) String() string

String returns a human-readable summary of the sketch, without items.

func (*ReservoirItemsSketch[T]) ToSlice

func (s *ReservoirItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error)

ToSlice serializes the sketch to a byte slice.

func (*ReservoirItemsSketch[T]) Update

func (s *ReservoirItemsSketch[T]) Update(item T)

Update adds an item to the sketch using the reservoir sampling algorithm.

type ReservoirItemsSketchOptionFunc

type ReservoirItemsSketchOptionFunc func(*reservoirItemsSketchOptions)

ReservoirItemsSketchOptionFunc defines a functional option for configuring reservoirItemsSketchOptions.

func WithReservoirItemsSketchResizeFactor

func WithReservoirItemsSketchResizeFactor(rf ResizeFactor) ReservoirItemsSketchOptionFunc

WithReservoirItemsSketchResizeFactor sets the resize factor for the internal array.
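
The functional-option pattern behind this signature can be sketched as follows. All names here (growth, sketchOptions, optionFunc, withResizeFactor, buildOptions) are hypothetical stand-ins, and the default of 8 is chosen arbitrarily for the example; the package's actual default is not stated above.

```go
package main

import "fmt"

// growth and sketchOptions are hypothetical stand-ins for ResizeFactor
// and the package's unexported options struct.
type growth int

type sketchOptions struct {
	resizeFactor growth
}

// optionFunc mutates the options struct, like ReservoirItemsSketchOptionFunc.
type optionFunc func(*sketchOptions)

func withResizeFactor(rf growth) optionFunc {
	return func(o *sketchOptions) { o.resizeFactor = rf }
}

// buildOptions starts from a default (8 here, an assumption) and applies
// each option in order, so later options override earlier ones.
func buildOptions(opts ...optionFunc) sketchOptions {
	o := sketchOptions{resizeFactor: 8}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(buildOptions().resizeFactor)
	fmt.Println(buildOptions(withResizeFactor(2)).resizeFactor)
}
```

Because the options are variadic, a constructor following this pattern can be called with no options at all, which keeps the common case short.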

type ReservoirItemsUnion

type ReservoirItemsUnion[T any] struct {
	// contains filtered or unexported fields
}

ReservoirItemsUnion enables merging of multiple ReservoirItemsSketch instances. This is useful for distributed sampling where each node maintains a local sketch, and the results are merged to get a global sample.

The union maintains statistical correctness by:

  • Dynamically choosing the merge direction (the lighter sketch merges into the heavier one)
  • Using weighted sampling with the correct probability formula
  • Preserving the smaller K when merging sketches in sampling mode

func NewReservoirItemsUnion

func NewReservoirItemsUnion[T any](maxK int) (*ReservoirItemsUnion[T], error)

NewReservoirItemsUnion creates a new union with the specified maximum k.

func NewReservoirItemsUnionFromSlice

func NewReservoirItemsUnionFromSlice[T any](data []byte, serde ItemsSerDe[T]) (*ReservoirItemsUnion[T], error)

NewReservoirItemsUnionFromSlice deserializes a union from a byte slice.

func (*ReservoirItemsUnion[T]) MaxK

func (u *ReservoirItemsUnion[T]) MaxK() int

MaxK returns the maximum k for this union.

func (*ReservoirItemsUnion[T]) Reset

func (u *ReservoirItemsUnion[T]) Reset()

Reset clears the union.

func (*ReservoirItemsUnion[T]) Result

func (u *ReservoirItemsUnion[T]) Result() (*ReservoirItemsSketch[T], error)

Result returns a copy of the internal sketch.

func (*ReservoirItemsUnion[T]) String

func (u *ReservoirItemsUnion[T]) String() string

String returns a human-readable summary of the union.

func (*ReservoirItemsUnion[T]) ToSlice

func (u *ReservoirItemsUnion[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error)

ToSlice serializes the union to a byte slice.

func (*ReservoirItemsUnion[T]) Update

func (u *ReservoirItemsUnion[T]) Update(item T)

Update adds a single item to the union.

func (*ReservoirItemsUnion[T]) UpdateFromRaw

func (u *ReservoirItemsUnion[T]) UpdateFromRaw(n int64, k int, items []T) error

UpdateFromRaw creates a sketch from raw components and merges it into the union. This is useful in distributed environments. The items slice is used directly, not copied.

func (*ReservoirItemsUnion[T]) UpdateSketch

func (u *ReservoirItemsUnion[T]) UpdateSketch(sketch *ReservoirItemsSketch[T]) error

UpdateSketch merges another sketch into the union. This implements Java's update(ReservoirItemsSketch) with twoWayMergeInternal logic.

type ResizeFactor

type ResizeFactor int

ResizeFactor controls how the internal array grows. Note: Go's append resizes slices automatically, so this type is kept for API compatibility with the Java version and can be removed if it is not needed. TODO: In Java, this is abstracted into a common package. Consider whether it should be moved to a common package here in the future.

const (
	ResizeX1 ResizeFactor = 1
	ResizeX2 ResizeFactor = 2
	ResizeX4 ResizeFactor = 4
	ResizeX8 ResizeFactor = 8
)

type Sample

type Sample[T any] struct {
	Item   T
	Weight float64
}

Sample represents a weighted sample item.

type SampleSubsetSummary

type SampleSubsetSummary struct {
	LowerBound        float64
	Estimate          float64
	UpperBound        float64
	TotalSketchWeight float64
}

SampleSubsetSummary is a simple object that captures the results of a subset sum query on a sampling sketch.

type StringSerDe

type StringSerDe struct{}

StringSerDe provides serialization for string (variable length: 4-byte length prefix + content).

func (StringSerDe) DeserializeFromBytes

func (s StringSerDe) DeserializeFromBytes(data []byte, numItems int) ([]string, error)

func (StringSerDe) SerializeToBytes

func (s StringSerDe) SerializeToBytes(items []string) ([]byte, error)

func (StringSerDe) SizeOfItem

func (s StringSerDe) SizeOfItem() int
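
The "4-byte length prefix + content" layout can be illustrated with the standalone sketch below. The helper names are hypothetical, and little-endian prefixes are an assumption; the documentation above does not specify the byte order used by StringSerDe.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeStrings writes each string as a 4-byte length followed by its bytes.
// Little-endian prefixes are an assumption, not taken from the package docs.
func encodeStrings(items []string) []byte {
	var out []byte
	for _, s := range items {
		out = binary.LittleEndian.AppendUint32(out, uint32(len(s)))
		out = append(out, s...)
	}
	return out
}

// decodeStrings reads numItems length-prefixed strings from data.
func decodeStrings(data []byte, numItems int) ([]string, error) {
	if numItems < 0 {
		return nil, errors.New("negative item count")
	}
	out := make([]string, 0, numItems)
	for i := 0; i < numItems; i++ {
		if len(data) < 4 {
			return nil, errors.New("truncated length prefix")
		}
		n := binary.LittleEndian.Uint32(data)
		data = data[4:]
		if uint64(len(data)) < uint64(n) {
			return nil, errors.New("truncated string content")
		}
		out = append(out, string(data[:n]))
		data = data[n:]
	}
	return out, nil
}

func main() {
	raw := encodeStrings([]string{"héllo", "", "go"})
	back, err := decodeStrings(raw, 3)
	fmt.Println(len(raw), back, err)
}
```

Because items have no fixed width, a decoder like this has to walk the buffer item by item, which is why a variable-length SerDe reports -1 from SizeOfItem.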

type VarOptItemsSketch

type VarOptItemsSketch[T any] struct {
	// contains filtered or unexported fields
}

VarOptItemsSketch implements variance-optimal weighted sampling.

This sketch samples weighted items from a stream with optimal variance for subset sum estimation. The algorithm maintains two regions:

  • H region (heavy): Items with weight >= tau (stored in a min-heap)
  • R region (reservoir): Items with weight < tau (sampled proportionally)

The array layout is:

  • H region: [0, h)
  • gap/M region: [h, h+m)
  • R region: [h+m, h+m+r)

In steady state, m=0 and h+r=k. The gap slot at position h is used during updates.

When all weights are equal (e.g., 1.0), this reduces to standard reservoir sampling.

Reference: Cohen et al., "Efficient Stream Sampling for Variance-Optimal Estimation of Subset Sums", SIAM J. Comput. 40(5): 1402-1431, 2011.

func NewVarOptItemsSketch

func NewVarOptItemsSketch[T any](k int, opts ...VarOptOption) (*VarOptItemsSketch[T], error)

func (*VarOptItemsSketch[T]) All

func (s *VarOptItemsSketch[T]) All() iter.Seq[Sample[T]]

All returns an iterator over all samples with their adjusted weights. For items in the H region, the weight is the original weight. For items in the R region, the weight is tau (totalWeightR / r).
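
The weight rule can be sketched with plain slices. The Sample type mirrors the one documented above; adjustedSamples and its parameters are hypothetical, and the sketch returns a slice rather than an iter.Seq to stay minimal.

```go
package main

import "fmt"

// Sample mirrors the documented Sample type.
type Sample[T any] struct {
	Item   T
	Weight float64
}

// adjustedSamples is a hypothetical helper: H items keep their own weights,
// while every R item is assigned tau = totalWeightR / r.
func adjustedSamples[T any](hItems []T, hWeights []float64, rItems []T, totalWeightR float64) []Sample[T] {
	out := make([]Sample[T], 0, len(hItems)+len(rItems))
	for i, it := range hItems {
		out = append(out, Sample[T]{Item: it, Weight: hWeights[i]})
	}
	if len(rItems) > 0 {
		tau := totalWeightR / float64(len(rItems))
		for _, it := range rItems {
			out = append(out, Sample[T]{Item: it, Weight: tau})
		}
	}
	return out
}

func main() {
	// One heavy item of weight 10, and two reservoir items sharing a total weight of 6.
	for _, s := range adjustedSamples([]string{"a"}, []float64{10}, []string{"b", "c"}, 6) {
		fmt.Println(s.Item, s.Weight)
	}
}
```

With this rule the adjusted weights sum to the total weight of the H items plus totalWeightR.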

func (*VarOptItemsSketch[T]) H

func (s *VarOptItemsSketch[T]) H() int

H returns the number of items in the H (heavy) region.

func (*VarOptItemsSketch[T]) IsEmpty

func (s *VarOptItemsSketch[T]) IsEmpty() bool

IsEmpty returns true if the sketch has not processed any items.

func (*VarOptItemsSketch[T]) K

func (s *VarOptItemsSketch[T]) K() int

K returns the configured maximum sample size.

func (*VarOptItemsSketch[T]) N

func (s *VarOptItemsSketch[T]) N() int64

N returns the total number of items processed by the sketch.

func (*VarOptItemsSketch[T]) NumSamples

func (s *VarOptItemsSketch[T]) NumSamples() int

NumSamples returns the number of items currently retained in the sketch.

func (*VarOptItemsSketch[T]) R

func (s *VarOptItemsSketch[T]) R() int

R returns the number of items in the R (reservoir) region.

func (*VarOptItemsSketch[T]) Reset

func (s *VarOptItemsSketch[T]) Reset()

Reset clears the sketch to its initial empty state while preserving k.

func (*VarOptItemsSketch[T]) TotalWeightR

func (s *VarOptItemsSketch[T]) TotalWeightR() float64

TotalWeightR returns the total weight of items in the R region.

func (*VarOptItemsSketch[T]) Update

func (s *VarOptItemsSketch[T]) Update(item T, weight float64) error

Update adds an item with the given weight to the sketch. Weight must be positive and finite.

type VarOptOption

type VarOptOption func(*varOptConfig)

func WithResizeFactor

func WithResizeFactor(rf ResizeFactor) VarOptOption
