Documentation ¶
Index ¶
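- type Float64SerDe
- func (s Float64SerDe) DeserializeFromBytes(data []byte, numItems int) ([]float64, error)
- func (s Float64SerDe) SerializeToBytes(items []float64) ([]byte, error)
- func (s Float64SerDe) SizeOfItem() int
- type Int32SerDe
- func (s Int32SerDe) DeserializeFromBytes(data []byte, numItems int) ([]int32, error)
- func (s Int32SerDe) SerializeToBytes(items []int32) ([]byte, error)
- func (s Int32SerDe) SizeOfItem() int
- type Int64SerDe
- func (s Int64SerDe) DeserializeFromBytes(data []byte, numItems int) ([]int64, error)
- func (s Int64SerDe) SerializeToBytes(items []int64) ([]byte, error)
- func (s Int64SerDe) SizeOfItem() int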
- type ItemsSerDe
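- type ReservoirItemsSketch
- func NewReservoirItemsSketch[T any](k int, opts ...ReservoirItemsSketchOptionFunc) (*ReservoirItemsSketch[T], error)
- func NewReservoirItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) (*ReservoirItemsSketch[T], error)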
- func (s *ReservoirItemsSketch[T]) Copy() *ReservoirItemsSketch[T]
- func (s *ReservoirItemsSketch[T]) DownsampledCopy(newK int) (*ReservoirItemsSketch[T], error)
- func (s *ReservoirItemsSketch[T]) EstimateSubsetSum(predicate func(T) bool) (SampleSubsetSummary, error)
- func (s *ReservoirItemsSketch[T]) ImplicitSampleWeight() float64
- func (s *ReservoirItemsSketch[T]) IsEmpty() bool
- func (s *ReservoirItemsSketch[T]) K() int
- func (s *ReservoirItemsSketch[T]) N() int64
- func (s *ReservoirItemsSketch[T]) NumSamples() int
- func (s *ReservoirItemsSketch[T]) Reset()
- func (s *ReservoirItemsSketch[T]) Samples() []T
- func (s *ReservoirItemsSketch[T]) String() string
- func (s *ReservoirItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error)
- func (s *ReservoirItemsSketch[T]) Update(item T)
- type ReservoirItemsSketchOptionFunc
- func WithReservoirItemsSketchResizeFactor(rf ResizeFactor) ReservoirItemsSketchOptionFunc
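- type ReservoirItemsUnion
- func NewReservoirItemsUnion[T any](maxK int) (*ReservoirItemsUnion[T], error)
- func NewReservoirItemsUnionFromSlice[T any](data []byte, serde ItemsSerDe[T]) (*ReservoirItemsUnion[T], error)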
- func (u *ReservoirItemsUnion[T]) MaxK() int
- func (u *ReservoirItemsUnion[T]) Reset()
- func (u *ReservoirItemsUnion[T]) Result() (*ReservoirItemsSketch[T], error)
- func (u *ReservoirItemsUnion[T]) String() string
- func (u *ReservoirItemsUnion[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error)
- func (u *ReservoirItemsUnion[T]) Update(item T)
- func (u *ReservoirItemsUnion[T]) UpdateFromRaw(n int64, k int, items []T) error
- func (u *ReservoirItemsUnion[T]) UpdateSketch(sketch *ReservoirItemsSketch[T]) error
- type ResizeFactor
- type Sample
- type SampleSubsetSummary
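- type StringSerDe
- func (s StringSerDe) DeserializeFromBytes(data []byte, numItems int) ([]string, error)
- func (s StringSerDe) SerializeToBytes(items []string) ([]byte, error)
- func (s StringSerDe) SizeOfItem() int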
- type VarOptItemsSketch
- func NewVarOptItemsSketch[T any](k int, opts ...VarOptOption) (*VarOptItemsSketch[T], error)
- func (s *VarOptItemsSketch[T]) All() iter.Seq[Sample[T]]
- func (s *VarOptItemsSketch[T]) H() int
- func (s *VarOptItemsSketch[T]) IsEmpty() bool
- func (s *VarOptItemsSketch[T]) K() int
- func (s *VarOptItemsSketch[T]) N() int64
- func (s *VarOptItemsSketch[T]) NumSamples() int
- func (s *VarOptItemsSketch[T]) R() int
- func (s *VarOptItemsSketch[T]) Reset()
- func (s *VarOptItemsSketch[T]) TotalWeightR() float64
- func (s *VarOptItemsSketch[T]) Update(item T, weight float64) error
- type VarOptOption
- func WithResizeFactor(rf ResizeFactor) VarOptOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Float64SerDe ¶
type Float64SerDe struct{}
Float64SerDe provides serialization for float64 (8 bytes per item).
func (Float64SerDe) DeserializeFromBytes ¶
func (s Float64SerDe) DeserializeFromBytes(data []byte, numItems int) ([]float64, error)
func (Float64SerDe) SerializeToBytes ¶
func (s Float64SerDe) SerializeToBytes(items []float64) ([]byte, error)
func (Float64SerDe) SizeOfItem ¶
func (s Float64SerDe) SizeOfItem() int
type Int32SerDe ¶
type Int32SerDe struct{}
Int32SerDe provides serialization for int32 (4 bytes per item).
func (Int32SerDe) DeserializeFromBytes ¶
func (s Int32SerDe) DeserializeFromBytes(data []byte, numItems int) ([]int32, error)
func (Int32SerDe) SerializeToBytes ¶
func (s Int32SerDe) SerializeToBytes(items []int32) ([]byte, error)
func (Int32SerDe) SizeOfItem ¶
func (s Int32SerDe) SizeOfItem() int
type Int64SerDe ¶
type Int64SerDe struct{}
Int64SerDe provides serialization for int64 (8 bytes per item).
func (Int64SerDe) DeserializeFromBytes ¶
func (s Int64SerDe) DeserializeFromBytes(data []byte, numItems int) ([]int64, error)
func (Int64SerDe) SerializeToBytes ¶
func (s Int64SerDe) SerializeToBytes(items []int64) ([]byte, error)
func (Int64SerDe) SizeOfItem ¶
func (s Int64SerDe) SizeOfItem() int
type ItemsSerDe ¶
type ItemsSerDe[T any] interface {
	// SerializeToBytes converts items to a byte slice.
	SerializeToBytes(items []T) ([]byte, error)
	// DeserializeFromBytes converts bytes back to items.
	// numItems specifies how many items to read from the data.
	DeserializeFromBytes(data []byte, numItems int) ([]T, error)
	// SizeOfItem returns the size in bytes for a single item.
	// Returns -1 for variable-length types (like string).
	SizeOfItem() int
}
ItemsSerDe defines the interface for serializing and deserializing items. Users must implement this interface for custom types. Built-in implementations are provided for common types (int64, int32, string, float64).
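As an illustration of implementing ItemsSerDe for a type that has no built-in serde, the sketch below handles uint16. It redeclares the interface locally so that it compiles on its own. Uint16SerDe is hypothetical, and little-endian byte order is an assumption, since the byte order used by the built-in serdes is not stated here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// ItemsSerDe mirrors the interface documented above so this example is self-contained.
type ItemsSerDe[T any] interface {
	SerializeToBytes(items []T) ([]byte, error)
	DeserializeFromBytes(data []byte, numItems int) ([]T, error)
	SizeOfItem() int
}

// Uint16SerDe is a hypothetical fixed-width serde (2 bytes per item, little-endian assumed).
type Uint16SerDe struct{}

// Compile-time check that Uint16SerDe satisfies ItemsSerDe[uint16].
var _ ItemsSerDe[uint16] = Uint16SerDe{}

func (Uint16SerDe) SerializeToBytes(items []uint16) ([]byte, error) {
	out := make([]byte, 2*len(items))
	for i, v := range items {
		binary.LittleEndian.PutUint16(out[2*i:], v)
	}
	return out, nil
}

func (Uint16SerDe) DeserializeFromBytes(data []byte, numItems int) ([]uint16, error) {
	// Reject negative counts and inputs too short for the requested items.
	if numItems < 0 || len(data) < 2*numItems {
		return nil, errors.New("insufficient data for requested number of items")
	}
	items := make([]uint16, numItems)
	for i := range items {
		items[i] = binary.LittleEndian.Uint16(data[2*i:])
	}
	return items, nil
}

// SizeOfItem returns a fixed width, since uint16 is not variable-length.
func (Uint16SerDe) SizeOfItem() int { return 2 }

func main() {
	var serde ItemsSerDe[uint16] = Uint16SerDe{}
	b, _ := serde.SerializeToBytes([]uint16{1, 258, 65535})
	back, err := serde.DeserializeFromBytes(b, 3)
	fmt.Println(len(b), back, err) // 6 [1 258 65535] <nil>
}
```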
type ReservoirItemsSketch ¶
type ReservoirItemsSketch[T any] struct {
	// contains filtered or unexported fields
}
ReservoirItemsSketch provides a uniform random sample of items from a stream of unknown size using the reservoir sampling algorithm.
The algorithm works in two phases:
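- Initial phase (fewer than k items seen): every incoming item is stored.
- Steady state (at least k items seen): the n-th item, counting from 1, is kept with probability k/n and, if kept, replaces a uniformly chosen item in the reservoir.
As a result, once n >= k items have been seen, every one of them has the same probability k/n of being in the sample.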
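The two phases above correspond to the classic Algorithm R. The minimal standalone sketch below illustrates them; the reservoir type and its methods are hypothetical and are not the package's ReservoirItemsSketch, and the fixed seed exists only for reproducibility.

```go
package main

import (
	"fmt"
	"math/rand"
)

// reservoir is a hypothetical, minimal uniform reservoir sampler (Algorithm R).
type reservoir[T any] struct {
	k       int
	n       int64
	samples []T
	rng     *rand.Rand
}

func newReservoir[T any](k int, seed int64) *reservoir[T] {
	return &reservoir[T]{k: k, samples: make([]T, 0, k), rng: rand.New(rand.NewSource(seed))}
}

func (r *reservoir[T]) update(item T) {
	r.n++
	if len(r.samples) < r.k {
		// Initial phase: keep every item until the reservoir is full.
		r.samples = append(r.samples, item)
		return
	}
	// Steady state: j is uniform on [0, n), so j < k happens with probability k/n,
	// and when it does, j is uniform over the k slots.
	j := r.rng.Int63n(r.n)
	if j < int64(r.k) {
		r.samples[j] = item
	}
}

func main() {
	r := newReservoir[int](5, 42)
	for i := 0; i < 1000; i++ {
		r.update(i)
	}
	fmt.Println(r.n, len(r.samples)) // 1000 5
}
```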
func NewReservoirItemsSketch ¶
func NewReservoirItemsSketch[T any](k int, opts ...ReservoirItemsSketchOptionFunc) (*ReservoirItemsSketch[T], error)
NewReservoirItemsSketch creates a new reservoir sketch with the given capacity k.
func NewReservoirItemsSketchFromSlice ¶
func NewReservoirItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) (*ReservoirItemsSketch[T], error)
NewReservoirItemsSketchFromSlice deserializes a sketch from a byte slice.
func (*ReservoirItemsSketch[T]) Copy ¶
func (s *ReservoirItemsSketch[T]) Copy() *ReservoirItemsSketch[T]
Copy returns a deep copy of the sketch.
func (*ReservoirItemsSketch[T]) DownsampledCopy ¶
func (s *ReservoirItemsSketch[T]) DownsampledCopy(newK int) (*ReservoirItemsSketch[T], error)
DownsampledCopy returns a copy with a reduced reservoir size. If newK >= current K, returns a regular copy.
func (*ReservoirItemsSketch[T]) EstimateSubsetSum ¶
func (s *ReservoirItemsSketch[T]) EstimateSubsetSum(predicate func(T) bool) (SampleSubsetSummary, error)
EstimateSubsetSum estimates, over the entire stream, the number of items that match the given predicate (every stream item has weight 1, so this count is the subset sum). It returns a lower bound, an estimate, and an upper bound, with the bounds targeting 2 standard deviations.
NOTE: This is technically a heuristic method, and it tries to err on the conservative side.
The predicate selects which items belong to the subset. The returned SampleSubsetSummary contains the estimate, the lower and upper bounds, and the total sketch weight.
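A natural point estimate is the matching fraction of the sample scaled by N, which equals the number of matching samples times ImplicitSampleWeight when the reservoir is full. The package's own bound computation is not described here, so the standalone sketch below substitutes a Wilson score interval with z = 2; Summary and estimateSubsetSum are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
)

// Summary mirrors the fields of SampleSubsetSummary for this standalone example.
type Summary struct {
	LowerBound, Estimate, UpperBound, TotalSketchWeight float64
}

// estimateSubsetSum scales the matching fraction of the sample by n. In exact mode
// (all n items retained) the answer is exact; otherwise the bounds come from a
// Wilson score interval with z = 2 (a stand-in, not the package's method).
func estimateSubsetSum[T any](samples []T, n int64, pred func(T) bool) Summary {
	m := len(samples)
	if m == 0 {
		return Summary{}
	}
	matches := 0
	for _, s := range samples {
		if pred(s) {
			matches++
		}
	}
	total := float64(n)
	if int64(m) >= n {
		// Exact mode: every stream item is in the sample.
		c := float64(matches)
		return Summary{c, c, c, total}
	}
	p := float64(matches) / float64(m)
	const z = 2.0
	mf := float64(m)
	denom := 1 + z*z/mf
	center := (p + z*z/(2*mf)) / denom
	half := z / denom * math.Sqrt(p*(1-p)/mf+z*z/(4*mf*mf))
	// The interval contains p mathematically; Min/Max guard against rounding,
	// and the outer clamps keep the fractions within [0, 1].
	lo := math.Max(0, math.Min(center-half, p))
	hi := math.Min(1, math.Max(center+half, p))
	return Summary{lo * total, p * total, hi * total, total}
}

func main() {
	samples := []int{3, 8, 12, 15, 21, 22, 30, 41, 44, 50}
	s := estimateSubsetSum(samples, 10000, func(v int) bool { return v%2 == 0 })
	fmt.Printf("%.0f <= %.0f <= %.0f (total %.0f)\n",
		s.LowerBound, s.Estimate, s.UpperBound, s.TotalSketchWeight)
}
```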
func (*ReservoirItemsSketch[T]) ImplicitSampleWeight ¶
func (s *ReservoirItemsSketch[T]) ImplicitSampleWeight() float64
ImplicitSampleWeight returns N/K in sampling mode (N > K), meaning each retained item stands for N/K stream items, or 1.0 in exact mode (N <= K).
func (*ReservoirItemsSketch[T]) IsEmpty ¶
func (s *ReservoirItemsSketch[T]) IsEmpty() bool
IsEmpty returns true if no items have been seen.
func (*ReservoirItemsSketch[T]) K ¶
func (s *ReservoirItemsSketch[T]) K() int
K returns the maximum reservoir capacity.
func (*ReservoirItemsSketch[T]) N ¶
func (s *ReservoirItemsSketch[T]) N() int64
N returns the total number of items seen by the sketch.
func (*ReservoirItemsSketch[T]) NumSamples ¶
func (s *ReservoirItemsSketch[T]) NumSamples() int
NumSamples returns the number of items currently in the reservoir.
func (*ReservoirItemsSketch[T]) Reset ¶
func (s *ReservoirItemsSketch[T]) Reset()
Reset clears the sketch while preserving capacity k.
func (*ReservoirItemsSketch[T]) Samples ¶
func (s *ReservoirItemsSketch[T]) Samples() []T
Samples returns a copy of the items in the reservoir.
func (*ReservoirItemsSketch[T]) String ¶
func (s *ReservoirItemsSketch[T]) String() string
String returns a human-readable summary of the sketch, excluding the sampled items.
func (*ReservoirItemsSketch[T]) ToSlice ¶
func (s *ReservoirItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error)
ToSlice serializes the sketch to a byte slice.
func (*ReservoirItemsSketch[T]) Update ¶
func (s *ReservoirItemsSketch[T]) Update(item T)
Update adds an item to the sketch using the reservoir sampling algorithm.
type ReservoirItemsSketchOptionFunc ¶
type ReservoirItemsSketchOptionFunc func(*reservoirItemsSketchOptions)
ReservoirItemsSketchOptionFunc defines a functional option for configuring a ReservoirItemsSketch; each option modifies the unexported reservoirItemsSketchOptions.
func WithReservoirItemsSketchResizeFactor ¶
func WithReservoirItemsSketchResizeFactor(rf ResizeFactor) ReservoirItemsSketchOptionFunc
WithReservoirItemsSketchResizeFactor sets the resize factor for the internal array.
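These option types follow Go's functional-options pattern. The standalone sketch below shows the mechanics; options, optionFunc, withResizeFactor, and newOptions are hypothetical stand-ins for the unexported reservoirItemsSketchOptions and its exported helpers, and the ResizeX8 default is an assumption.

```go
package main

import "fmt"

type ResizeFactor int

const (
	ResizeX1 ResizeFactor = 1
	ResizeX2 ResizeFactor = 2
	ResizeX4 ResizeFactor = 4
	ResizeX8 ResizeFactor = 8
)

// options is a hypothetical stand-in for the unexported reservoirItemsSketchOptions.
type options struct {
	resizeFactor ResizeFactor
}

// optionFunc plays the role of ReservoirItemsSketchOptionFunc.
type optionFunc func(*options)

// withResizeFactor plays the role of WithReservoirItemsSketchResizeFactor.
func withResizeFactor(rf ResizeFactor) optionFunc {
	return func(o *options) { o.resizeFactor = rf }
}

// newOptions starts from defaults (ResizeX8 is assumed here) and applies each
// option in order, so later options override earlier ones.
func newOptions(opts ...optionFunc) options {
	o := options{resizeFactor: ResizeX8}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	def := newOptions()
	custom := newOptions(withResizeFactor(ResizeX2))
	fmt.Println(def.resizeFactor, custom.resizeFactor) // 8 2
}
```

The variadic signature lets callers omit options entirely while new options can be added later without breaking existing calls.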
type ReservoirItemsUnion ¶
type ReservoirItemsUnion[T any] struct {
	// contains filtered or unexported fields
}
ReservoirItemsUnion enables merging of multiple ReservoirItemsSketch instances. This is useful for distributed sampling where each node maintains a local sketch, and the results are merged to get a global sample.
The union maintains statistical correctness by:
- Dynamically choosing the merge direction (the lighter sketch is merged into the heavier one)
- Using weighted sampling with the correct probability formula
- Preserving the smaller K when merging sketches in sampling mode
func NewReservoirItemsUnion ¶
func NewReservoirItemsUnion[T any](maxK int) (*ReservoirItemsUnion[T], error)
NewReservoirItemsUnion creates a new union with the specified maximum k.
func NewReservoirItemsUnionFromSlice ¶
func NewReservoirItemsUnionFromSlice[T any](data []byte, serde ItemsSerDe[T]) (*ReservoirItemsUnion[T], error)
NewReservoirItemsUnionFromSlice deserializes a union from a byte slice.
func (*ReservoirItemsUnion[T]) MaxK ¶
func (u *ReservoirItemsUnion[T]) MaxK() int
MaxK returns the maximum k for this union.
func (*ReservoirItemsUnion[T]) Reset ¶
func (u *ReservoirItemsUnion[T]) Reset()
Reset clears the union.
func (*ReservoirItemsUnion[T]) Result ¶
func (u *ReservoirItemsUnion[T]) Result() (*ReservoirItemsSketch[T], error)
Result returns a copy of the internal sketch.
func (*ReservoirItemsUnion[T]) String ¶
func (u *ReservoirItemsUnion[T]) String() string
String returns a human-readable summary of the union.
func (*ReservoirItemsUnion[T]) ToSlice ¶
func (u *ReservoirItemsUnion[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error)
ToSlice serializes the union to a byte slice.
func (*ReservoirItemsUnion[T]) Update ¶
func (u *ReservoirItemsUnion[T]) Update(item T)
Update adds a single item to the union.
func (*ReservoirItemsUnion[T]) UpdateFromRaw ¶
func (u *ReservoirItemsUnion[T]) UpdateFromRaw(n int64, k int, items []T) error
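UpdateFromRaw builds a sketch from raw components (the number of items seen n, the capacity k, and the sampled items) and merges it into the union. This is useful in distributed environments. The items slice is used directly rather than copied, so the caller should not modify it afterwards.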
func (*ReservoirItemsUnion[T]) UpdateSketch ¶
func (u *ReservoirItemsUnion[T]) UpdateSketch(sketch *ReservoirItemsSketch[T]) error
UpdateSketch merges another sketch into the union. It mirrors the Java implementation's update(ReservoirItemsSketch), including its twoWayMergeInternal logic.
type ResizeFactor ¶
type ResizeFactor int
ResizeFactor controls how the internal array grows. Because Go's slice append already resizes automatically, this type is kept mainly for API compatibility with the Java version and can be removed if it is not needed. TODO: In Java, this is abstracted into a common package; consider moving it to a common package in the future.
const (
	ResizeX1 ResizeFactor = 1
	ResizeX2 ResizeFactor = 2
	ResizeX4 ResizeFactor = 4
	ResizeX8 ResizeFactor = 8
)
type SampleSubsetSummary ¶
type SampleSubsetSummary struct {
LowerBound float64
Estimate float64
UpperBound float64
TotalSketchWeight float64
}
SampleSubsetSummary is a simple object that captures the results of a subset sum query on a sampling sketch.
type StringSerDe ¶
type StringSerDe struct{}
StringSerDe provides serialization for string (variable length: 4-byte length prefix + content).
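The variable-length layout described above (a 4-byte length prefix followed by the string's bytes) can be sketched as a pair of helpers. encodeStrings and decodeStrings are hypothetical, and little-endian length prefixes are an assumption.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeStrings writes each string as a 4-byte little-endian length followed by its bytes.
func encodeStrings(items []string) []byte {
	size := 0
	for _, s := range items {
		size += 4 + len(s)
	}
	out := make([]byte, 0, size)
	for _, s := range items {
		out = binary.LittleEndian.AppendUint32(out, uint32(len(s)))
		out = append(out, s...)
	}
	return out
}

// decodeStrings reads numItems length-prefixed strings, rejecting truncated input.
func decodeStrings(data []byte, numItems int) ([]string, error) {
	if numItems < 0 {
		return nil, errors.New("numItems must be non-negative")
	}
	items := make([]string, 0, numItems)
	off := 0
	for i := 0; i < numItems; i++ {
		if len(data)-off < 4 {
			return nil, errors.New("truncated length prefix")
		}
		n := int(binary.LittleEndian.Uint32(data[off:]))
		off += 4
		if n < 0 || len(data)-off < n {
			return nil, errors.New("truncated string content")
		}
		items = append(items, string(data[off:off+n]))
		off += n
	}
	return items, nil
}

func main() {
	b := encodeStrings([]string{"go", "", "héllo"})
	back, err := decodeStrings(b, 3)
	fmt.Println(len(b), len(back), back[2], err) // 20 3 héllo <nil>
}
```

Because the length prefix counts bytes rather than runes, multi-byte UTF-8 characters such as "é" are handled without special cases.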
func (StringSerDe) DeserializeFromBytes ¶
func (s StringSerDe) DeserializeFromBytes(data []byte, numItems int) ([]string, error)
func (StringSerDe) SerializeToBytes ¶
func (s StringSerDe) SerializeToBytes(items []string) ([]byte, error)
func (StringSerDe) SizeOfItem ¶
func (s StringSerDe) SizeOfItem() int
type VarOptItemsSketch ¶
type VarOptItemsSketch[T any] struct {
	// contains filtered or unexported fields
}
VarOptItemsSketch implements variance-optimal weighted sampling.
This sketch samples weighted items from a stream with optimal variance for subset sum estimation. The algorithm maintains two regions:
- H region (heavy): Items with weight >= tau (stored in a min-heap)
- R region (reservoir): Items with weight < tau (sampled proportionally)
The array layout is:
- H region: [0, h)
- gap/M region: [h, h+m)
- R region: [h+m, h+m+r)
In steady state, m = 0 and h + r = k. The gap slot at position h is used during updates.
When all weights are equal (e.g., 1.0), this reduces to standard reservoir sampling.
Reference: Cohen et al., "Efficient Stream Sampling for Variance-Optimal Estimation of Subset Sums", SIAM J. Comput. 40(5): 1402-1431, 2011.
func NewVarOptItemsSketch ¶
func NewVarOptItemsSketch[T any](k int, opts ...VarOptOption) (*VarOptItemsSketch[T], error)
func (*VarOptItemsSketch[T]) All ¶
func (s *VarOptItemsSketch[T]) All() iter.Seq[Sample[T]]
All returns an iterator over all samples with their adjusted weights. For items in the H region, the weight is the original weight. For items in the R region, the weight is tau (totalWeightR / r).
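The adjusted-weight rule described for All can be sketched on plain slices. The weighted type stands in for Sample[T], whose fields are not shown in this documentation, and adjustedWeights is hypothetical. By construction, summing the adjusted weights of all retained items gives the sum of the H weights plus totalWeightR.

```go
package main

import "fmt"

// weighted is a hypothetical stand-in for Sample[T]: an item with its adjusted weight.
type weighted[T any] struct {
	Item   T
	Weight float64
}

// adjustedWeights returns H items with their original weights and R items with
// the common weight tau = totalWeightR / r.
func adjustedWeights[T any](h []weighted[T], r []T, totalWeightR float64) []weighted[T] {
	out := append([]weighted[T](nil), h...)
	if len(r) == 0 {
		return out
	}
	tau := totalWeightR / float64(len(r))
	for _, item := range r {
		out = append(out, weighted[T]{item, tau})
	}
	return out
}

func main() {
	h := []weighted[string]{{"big", 50}, {"bigger", 80}}
	r := []string{"a", "b", "c", "d"}
	total := 0.0
	for _, s := range adjustedWeights(h, r, 20) {
		total += s.Weight
	}
	fmt.Println(total) // 150
}
```

To estimate a subset sum, one would add up the adjusted weights of only the retained items that match a predicate.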
func (*VarOptItemsSketch[T]) H ¶
func (s *VarOptItemsSketch[T]) H() int
H returns the number of items in the H (heavy) region.
func (*VarOptItemsSketch[T]) IsEmpty ¶
func (s *VarOptItemsSketch[T]) IsEmpty() bool
IsEmpty returns true if the sketch has not processed any items.
func (*VarOptItemsSketch[T]) K ¶
func (s *VarOptItemsSketch[T]) K() int
K returns the configured maximum sample size.
func (*VarOptItemsSketch[T]) N ¶
func (s *VarOptItemsSketch[T]) N() int64
N returns the total number of items processed by the sketch.
func (*VarOptItemsSketch[T]) NumSamples ¶
func (s *VarOptItemsSketch[T]) NumSamples() int
NumSamples returns the number of items currently retained in the sketch.
func (*VarOptItemsSketch[T]) R ¶
func (s *VarOptItemsSketch[T]) R() int
R returns the number of items in the R (reservoir) region.
func (*VarOptItemsSketch[T]) Reset ¶
func (s *VarOptItemsSketch[T]) Reset()
Reset clears the sketch to its initial empty state while preserving k.
func (*VarOptItemsSketch[T]) TotalWeightR ¶
func (s *VarOptItemsSketch[T]) TotalWeightR() float64
TotalWeightR returns the total weight of items in the R region.
func (*VarOptItemsSketch[T]) Update ¶
func (s *VarOptItemsSketch[T]) Update(item T, weight float64) error
Update adds an item with the given weight to the sketch. Weight must be positive and finite.
type VarOptOption ¶
type VarOptOption func(*varOptConfig)
func WithResizeFactor ¶
func WithResizeFactor(rf ResizeFactor) VarOptOption