xsync

package
v1.1.0
Published: Sep 15, 2025 | License: Apache-2.0 | Imports: 10 | Imported by: 3

README

xsync

Concurrent data structures for Go. The package aims to provide more scalable alternatives to some of the data structures in the standard sync package, along with a few additional ones.

Covered with tests following the approach described here.

Benchmarks

Benchmark results may be found here. I'd like to thank @felixge who kindly ran the benchmarks on a beefy multicore machine.

Also, a non-scientific, unfair benchmark comparing Java's j.u.c.ConcurrentHashMap and xsync.Map is available here.

Usage

The latest xsync major version is v4. Note that the import path of this package, shown below, carries no /v4 suffix:

import (
	"github.com/fufuok/cache/xsync"
)

The minimum required Go version is 1.24.

Note for pre-v4 users: the main change between v3 and v4 is removal of non-generic data structures and some improvements in Map API. The old *Of types are kept as type aliases for the renamed data structures to simplify the migration, e.g. MapOf is an alias for Map. While the API has some breaking changes, the migration should be trivial.

Counter

A Counter is a striped int64 counter inspired by the j.u.c.a.LongAdder class from the Java standard library.

c := xsync.NewCounter()
// increment and decrement the counter
c.Inc()
c.Dec()
// read the current value
v := c.Value()

Counter performs better than a single atomically updated int64 counter in high contention scenarios.
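
The striping idea can be illustrated with the standard library alone. The sketch below is not the xsync implementation: stripedCounter, the fixed stripe count, the caller-supplied hint, and the 64-byte cache-line size are all assumptions made for illustration. Each writer updates one of several padded atomic cells, and a read sums all of them.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stripe is one cell of the counter. The padding assumes 64-byte
// cache lines so that neighbouring stripes do not share a line.
type stripe struct {
	n atomic.Int64
	_ [56]byte
}

// stripedCounter is a hypothetical, simplified striped counter.
type stripedCounter struct {
	stripes []stripe
}

func newStripedCounter(n int) *stripedCounter {
	return &stripedCounter{stripes: make([]stripe, n)}
}

// Add applies delta to the stripe selected by hint (must be >= 0).
// Writers with different hints touch different cache lines.
func (c *stripedCounter) Add(hint int, delta int64) {
	c.stripes[hint%len(c.stripes)].n.Add(delta)
}

// Value sums all stripes. Under concurrent updates the result may
// miss some of the latest operations.
func (c *stripedCounter) Value() int64 {
	var sum int64
	for i := range c.stripes {
		sum += c.stripes[i].n.Load()
	}
	return sum
}

// countConcurrently increments one counter from several goroutines
// and returns the total once all of them have finished.
func countConcurrently(workers, perWorker int) int64 {
	c := newStripedCounter(8)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				c.Add(id, 1)
			}
		}(w)
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countConcurrently(16, 1000)) // prints 16000
}
```

The trade-off is visible in Value: writes become cheaper under contention, while reads have to visit every stripe.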

Map

A Map is a concurrent hash-table-based map. It follows the interface of sync.Map with a number of valuable extensions like Compute or Size.

m := xsync.NewMap[string, string]()
m.Store("foo", "bar")
v, ok := m.Load("foo")
s := m.Size()

Map uses a modified version of Cache-Line Hash Table (CLHT) data structure: https://github.com/LPD-EPFL/CLHT

CLHT is built around the idea of organizing the hash table in cache-line-sized buckets, so that on all modern CPUs update operations complete with minimal cache-line transfer. Also, Get operations are obstruction-free and involve no writes to shared memory, hence no mutexes or any other sort of locks. Due to this design, in all considered scenarios Map outperforms sync.Map. Map also uses cooperative parallel rehashing: this means that the goroutines executing write operations may participate in a concurrent rehashing instead of waiting for it to finish.

Apart from CLHT, Map borrows ideas from Java's j.u.c.ConcurrentHashMap (immutable K/V pair structs instead of atomic snapshots) and C++'s absl::flat_hash_map (meta memory and SWAR-based lookups).

Besides the Range method available for map iteration, there is also a ToPlainMap utility function that converts a Map to a built-in Go map:

m := xsync.NewMap[int, int]()
m.Store(42, 42)
pm := xsync.ToPlainMap(m)

Map uses Go's built-in hash function, which is seeded to resist hash-flooding denial-of-service (DoS) attacks. The default hash function is maphash.Comparable, so each map instance gets its own seed and the hash function uses that seed when calculating hash codes.

Earlier versions of Map spawned additional goroutines to speed up resizing the hash table, and the WithSerialResize option disabled that behavior. The option is now deprecated: resizing happens cooperatively, without starting any additional goroutines.

m := xsync.NewMap[int, int](xsync.WithSerialResize())
// WithSerialResize is deprecated; resizing is cooperative
// and starts no additional goroutines either way
for i := 0; i < 10000; i++ {
	m.Store(i, i)
}

UMPSCQueue

A UMPSCQueue is an unbounded multi-producer single-consumer concurrent queue. This means that multiple goroutines can publish items to the queue, while at most one goroutine may consume them. Unlike bounded queues, this one places no limit on the queue capacity.

q := xsync.NewUMPSCQueue[string]()
// producer inserts an item into the queue; doesn't block
// safe to invoke from multiple goroutines
q.Enqueue("bar")
// consumer obtains an item from the queue; blocks while the queue is empty
// must be called from a single goroutine
item := q.Dequeue() // string

UMPSCQueue is meant to serve as a replacement for a channel. However, crucially, it has infinite capacity. This is a very bad idea in many cases as it means that it never exhibits backpressure. In other words, if nothing is consuming elements from the queue, it will eventually consume all available memory and crash the process. However, there are also cases where this is desired behavior as it means the queue will dynamically allocate more memory to store temporary bursts, allowing producers to never block while the consumer catches up.

The backing data structure is represented as a singly linked list of large segments. Each segment is a slice of T along with a corresponding sync.WaitGroup for each index. Producers use an atomic counter to determine the unique index in the segment where they will write their value, and mark the corresponding wait group as done after having written the value. The consumer simply keeps track of the index it wants to read and waits for the corresponding wait group to complete. Neither operation acquires a lock and therefore performs quite well under highly contentious loads.

Note however that because no locks are acquired, it is unsafe for multiple goroutines to consume from the queue. Consumers must explicitly synchronize between themselves. This allows setups with a single consumer to never acquire a lock, significantly speeding up consumption.

SPSCQueue

A SPSCQueue is a bounded single-producer single-consumer concurrent queue. This means that at most one goroutine may publish items to the queue and at most one goroutine may consume them.

q := xsync.NewSPSCQueue[string](1024)
// producer inserts an item into the queue
// optimistic insertion attempt; doesn't block
inserted := q.TryEnqueue("bar")
// consumer obtains an item from the queue
// optimistic obtain attempt; doesn't block
item, ok := q.TryDequeue() // string

The queue is based on the data structure from this article: https://rigtorp.se/ringbuffer/. The idea is to reduce CPU cache coherency traffic by keeping cached copies of the read and write indexes used by the producer and the consumer respectively.

Make sure to implement a proper back-off strategy to handle failed optimistic operation attempts. The most basic back-off would be calling runtime.Gosched().
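
A basic back-off loop of that kind might look like the sketch below. To keep it runnable with the standard library only, it uses a hypothetical channel-backed chanQueue as a stand-in that exposes the same TryEnqueue/TryDequeue method shapes; the enqueue and dequeue helpers and the tryQueue interface are likewise illustrative names, not part of xsync.

```go
package main

import (
	"fmt"
	"runtime"
)

// tryQueue mirrors the TryEnqueue/TryDequeue method set shown above.
type tryQueue[T any] interface {
	TryEnqueue(item T) bool
	TryDequeue() (item T, ok bool)
}

// chanQueue is a hypothetical stand-in backed by a buffered channel.
type chanQueue[T any] struct{ ch chan T }

func (q chanQueue[T]) TryEnqueue(item T) bool {
	select {
	case q.ch <- item:
		return true
	default: // queue is full
		return false
	}
}

func (q chanQueue[T]) TryDequeue() (item T, ok bool) {
	select {
	case item = <-q.ch:
		return item, true
	default: // queue is empty
		return item, false
	}
}

// enqueue retries the optimistic insert, yielding the processor
// between failed attempts.
func enqueue[T any](q tryQueue[T], item T) {
	for !q.TryEnqueue(item) {
		runtime.Gosched()
	}
}

// dequeue retries the optimistic removal in the same way.
func dequeue[T any](q tryQueue[T]) T {
	for {
		if item, ok := q.TryDequeue(); ok {
			return item
		}
		runtime.Gosched()
	}
}

// transfer sends 1..n through a tiny queue from one producer to one
// consumer and returns the sum observed by the consumer.
func transfer(n int) int {
	q := chanQueue[int]{ch: make(chan int, 2)}
	done := make(chan int)
	go func() {
		sum := 0
		for i := 0; i < n; i++ {
			sum += dequeue[int](q)
		}
		done <- sum
	}()
	for i := 1; i <= n; i++ {
		enqueue[int](q, i)
	}
	return <-done
}

func main() {
	fmt.Println(transfer(100)) // prints 5050
}
```

A production back-off would likely escalate from yielding to short sleeps when attempts keep failing; that refinement is left out here.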

MPMCQueue

A MPMCQueue is a bounded multi-producer multi-consumer concurrent queue.

q := xsync.NewMPMCQueue[string](1024)
// producer optimistically inserts an item into the queue
// optimistic insertion attempt; doesn't block
inserted := q.TryEnqueue("bar")
// consumer obtains an item from the queue
// optimistic obtain attempt; doesn't block
item, ok := q.TryDequeue() // string

The queue is based on the algorithm from the MPMCQueue C++ library (https://github.com/rigtorp/MPMCQueue), which in turn references D.Vyukov's MPMC queue. According to the following classification, the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers.

The idea of the algorithm is to allow parallelism for concurrent producers and consumers by introducing the notion of tickets, i.e. values of two counters, one for producers and one for consumers. An atomic increment of one of those counters is the only noticeable contention point in queue operations. The rest of the operation avoids contention on writes thanks to the turn-based read/write access for each of the queue items.
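
The ticket and turn mechanics can be sketched with the standard library as follows. This is a simplified illustration along the lines of the referenced C++ library, not the xsync source: ticketQueue and its field names are hypothetical, the ticket counters are advanced with compare-and-swap so that the Try operations can fail without blocking, and no cache-line padding is applied.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// slot holds one item plus a turn number. An even turn 2*k means the
// slot is free for the k-th round of writes; an odd turn 2*k+1 means
// it holds an item written in round k.
type slot[T any] struct {
	turn atomic.Uint64
	item T
}

// ticketQueue is a hypothetical bounded MPMC queue sketch.
type ticketQueue[T any] struct {
	slots     []slot[T]
	enqTicket atomic.Uint64 // next ticket for producers
	deqTicket atomic.Uint64 // next ticket for consumers
}

// newTicketQueue requires capacity > 0.
func newTicketQueue[T any](capacity int) *ticketQueue[T] {
	return &ticketQueue[T]{slots: make([]slot[T], capacity)}
}

func (q *ticketQueue[T]) TryEnqueue(item T) bool {
	n := uint64(len(q.slots))
	t := q.enqTicket.Load()
	for {
		s := &q.slots[t%n]
		round := t / n
		if s.turn.Load() == 2*round {
			// The slot is free for this round: claim the ticket.
			if q.enqTicket.CompareAndSwap(t, t+1) {
				s.item = item
				s.turn.Store(2*round + 1) // publish to consumers
				return true
			}
			t = q.enqTicket.Load()
		} else {
			prev := t
			t = q.enqTicket.Load()
			if t == prev {
				return false // queue is full
			}
		}
	}
}

func (q *ticketQueue[T]) TryDequeue() (item T, ok bool) {
	n := uint64(len(q.slots))
	t := q.deqTicket.Load()
	for {
		s := &q.slots[t%n]
		round := t / n
		if s.turn.Load() == 2*round+1 {
			if q.deqTicket.CompareAndSwap(t, t+1) {
				item = s.item
				var zero T
				s.item = zero             // drop the reference
				s.turn.Store(2*round + 2) // free for the next round
				return item, true
			}
			t = q.deqTicket.Load()
		} else {
			prev := t
			t = q.deqTicket.Load()
			if t == prev {
				return item, false // queue is empty
			}
		}
	}
}

func main() {
	q := newTicketQueue[string](2)
	fmt.Println(q.TryEnqueue("a"), q.TryEnqueue("b"), q.TryEnqueue("c"))
	v, ok := q.TryDequeue()
	fmt.Println(v, ok)
}
```

Because each slot carries its own turn, a producer and a consumer working on different slots write to different memory locations; only the two ticket counters are shared by everyone.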

In essence, MPMCQueue is a specialized queue for scenarios where there are multiple concurrent producers and consumers of a single queue running on a large multicore machine.

To get the optimal performance, you may want to set the queue size to be large enough, say, an order of magnitude greater than the number of producers/consumers, to allow producers and consumers to progress with their queue operations in parallel most of the time.

Other than that, make sure to implement a proper back-off strategy to handle failed optimistic operation attempts. The most basic back-off would be calling runtime.Gosched().

RBMutex

A RBMutex is a reader-biased reader/writer mutual exclusion lock. The lock can be held by many readers or a single writer.

mu := xsync.NewRBMutex()
// reader lock calls return a token
t := mu.RLock()
// the token must be later used to unlock the mutex
mu.RUnlock(t)
// writer locks are the same as in sync.RWMutex
mu.Lock()
mu.Unlock()

RBMutex is based on a modified version of BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf

The idea of the algorithm is to build on top of an existing reader-writer mutex and introduce a fast path for readers. On the fast path, reader lock attempts are sharded over an internal array based on the reader identity (a token in the case of Go). This means that readers do not contend over a single atomic counter as they do in, say, sync.RWMutex, which allows for better scalability as the number of cores grows.

Hence, by design, RBMutex is a specialized mutex for scenarios, such as caches, where the vast majority of locks are acquired by readers and write lock acquire attempts are infrequent. In such scenarios, RBMutex should perform better than sync.RWMutex on large multicore machines.

RBMutex extends sync.RWMutex internally and uses it as the "reader bias disabled" fallback, so the same semantics apply. The only noticeable difference is the reader token, which RLock returns and RUnlock requires.

Apart from blocking methods, RBMutex also has methods for optimistic locking:

mu := xsync.NewRBMutex()
if locked, t := mu.TryRLock(); locked {
	// critical reader section...
	mu.RUnlock(t)
}
if mu.TryLock() {
	// critical writer section...
	mu.Unlock()
}

License

Licensed under Apache v2.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ToPlainMap

func ToPlainMap[K comparable, V any](m *Map[K, V]) map[K]V

ToPlainMap returns a native map with a copy of xsync Map's contents. The copied xsync Map should not be modified while this call is made. If the copied Map is modified, the copying behavior is the same as in the Range method.

func WithGrowOnly

func WithGrowOnly() func(*MapConfig)

WithGrowOnly configures new Map instance to be grow-only. This means that the underlying hash table grows in capacity when new keys are added, but does not shrink when keys are deleted. The only exception to this rule is the Clear method which shrinks the hash table back to the initial capacity.

func WithPresize

func WithPresize(sizeHint int) func(*MapConfig)

WithPresize configures new Map instance with capacity enough to hold sizeHint entries. The capacity is treated as the minimal capacity meaning that the underlying hash table will never shrink to a smaller capacity. If sizeHint is zero or negative, the value is ignored.
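
The option functions above follow the functional options pattern: each returns a func(*MapConfig) that NewMap applies to its configuration. Since MapConfig's fields are unexported, the sketch below uses a hypothetical mapConfig with invented field names purely to show how such options compose; it says nothing about the real struct layout.

```go
package main

import "fmt"

// mapConfig is a hypothetical stand-in for MapConfig.
// The field names are invented for this illustration.
type mapConfig struct {
	sizeHint int
	growOnly bool
}

// withPresize mimics the documented rule that a zero or negative
// sizeHint is ignored.
func withPresize(sizeHint int) func(*mapConfig) {
	return func(c *mapConfig) {
		if sizeHint > 0 {
			c.sizeHint = sizeHint
		}
	}
}

// withGrowOnly marks the configuration as grow-only.
func withGrowOnly() func(*mapConfig) {
	return func(c *mapConfig) {
		c.growOnly = true
	}
}

// buildConfig applies the options in order, the way a variadic
// constructor would.
func buildConfig(options ...func(*mapConfig)) mapConfig {
	var c mapConfig
	for _, opt := range options {
		opt(&c)
	}
	return c
}

func main() {
	c := buildConfig(withPresize(1024), withGrowOnly(), withPresize(-1))
	fmt.Printf("%+v\n", c) // prints {sizeHint:1024 growOnly:true}
}
```

With the real package the call shape is the same, for example passing WithPresize and WithGrowOnly together to NewMap.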

func WithSerialResize deprecated

func WithSerialResize() func(*MapConfig)

Deprecated: map resizing now happens cooperatively, without starting any additional goroutines.

Types

type ComputeOp

type ComputeOp int
const (
	// CancelOp signals to Compute to not do anything as a result
	// of executing the lambda. If the entry was not present in
	// the map, nothing happens, and if it was present, the
	// returned value is ignored.
	CancelOp ComputeOp = iota
	// UpdateOp signals to Compute to update the entry to the
	// value returned by the lambda, creating it if necessary.
	UpdateOp
	// DeleteOp signals to Compute to always delete the entry
	// from the map.
	DeleteOp
)

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

A Counter is a striped int64 counter.

Should be preferred over a single atomically updated int64 counter in high contention scenarios.

A Counter must not be copied after first use.

func NewCounter

func NewCounter() *Counter

NewCounter creates a new Counter instance.

func (*Counter) Add

func (c *Counter) Add(delta int64)

Add adds the delta to the counter.

func (*Counter) Dec

func (c *Counter) Dec()

Dec decrements the counter by 1.

func (*Counter) Inc

func (c *Counter) Inc()

Inc increments the counter by 1.

func (*Counter) Reset

func (c *Counter) Reset()

Reset resets the counter to zero. This method should only be used when it is known that there are no concurrent modifications of the counter.

func (*Counter) Value

func (c *Counter) Value() int64

Value returns the current counter value. The returned value may not include all of the latest operations in presence of concurrent modifications of the counter.

type MPMCQueue

type MPMCQueue[I any] struct {
	// contains filtered or unexported fields
}

A MPMCQueue is a bounded multi-producer multi-consumer concurrent queue.

MPMCQueue instances must be created with NewMPMCQueue function. A MPMCQueue must not be copied after first use.

Based on the data structure from the following C++ library: https://github.com/rigtorp/MPMCQueue

func NewMPMCQueue

func NewMPMCQueue[I any](capacity int) *MPMCQueue[I]

NewMPMCQueue creates a new MPMCQueue instance with the given capacity.

func NewMPMCQueueOf deprecated

func NewMPMCQueueOf[I any](capacity int) *MPMCQueue[I]

Deprecated: use NewMPMCQueue.

func (*MPMCQueue[I]) TryDequeue

func (q *MPMCQueue[I]) TryDequeue() (item I, ok bool)

TryDequeue retrieves and removes the item from the head of the queue. Does not block and returns immediately. The ok result indicates that the queue isn't empty and an item was retrieved.

func (*MPMCQueue[I]) TryEnqueue

func (q *MPMCQueue[I]) TryEnqueue(item I) bool

TryEnqueue inserts the given item into the queue. Does not block and returns immediately. The result indicates that the queue isn't full and the item was inserted.

type MPMCQueueOf deprecated

type MPMCQueueOf[I any] = MPMCQueue[I]

Deprecated: use MPMCQueue.

type Map

type Map[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Map is like a Go map[K]V but is safe for concurrent use by multiple goroutines without additional locking or coordination. It follows the interface of sync.Map with a number of valuable extensions like Compute or Size.

A Map must not be copied after first use.

Map uses a modified version of Cache-Line Hash Table (CLHT) data structure: https://github.com/LPD-EPFL/CLHT

CLHT is built around the idea of organizing the hash table in cache-line-sized buckets, so that on all modern CPUs update operations complete with at most one cache-line transfer. Also, Get operations involve no writes to memory, as well as no mutexes or any other sort of locks. Due to this design, in all considered scenarios Map outperforms sync.Map.

Map also borrows ideas from Java's j.u.c.ConcurrentHashMap (immutable K/V pair structs instead of atomic snapshots) and C++'s absl::flat_hash_map (meta memory and SWAR-based lookups).

func NewMap

func NewMap[K comparable, V any](options ...func(*MapConfig)) *Map[K, V]

NewMap creates a new Map instance configured with the given options.

func NewMapOf deprecated

func NewMapOf[K comparable, V any](options ...func(*MapConfig)) *Map[K, V]

Deprecated: use NewMap.

func (*Map[K, V]) Clear

func (m *Map[K, V]) Clear()

Clear deletes all keys and values currently stored in the map.

func (*Map[K, V]) Compute

func (m *Map[K, V]) Compute(
	key K,
	valueFn func(oldValue V, loaded bool) (newValue V, op ComputeOp),
) (actual V, ok bool)

Compute either sets the computed new value for the key, deletes the value for the key, or does nothing, based on the returned ComputeOp. When the op returned by valueFn is UpdateOp, the value is updated to the new value. If it is DeleteOp, the entry is removed from the map altogether. And finally, if the op is CancelOp then the entry is left as-is. In other words, if it did not already exist, it is not created, and if it did exist, it is not updated. This is useful to synchronously execute some operation on the value without incurring the cost of updating the map every time. The ok result indicates whether the entry is present in the map after the compute operation. The actual result contains the value of the map if a corresponding entry is present, or the zero value otherwise. See the example for a few use cases.

This call locks a hash table bucket while the compute function is executed. This means that modifications of other entries in the bucket are blocked until valueFn returns. Keep this in mind when the function includes long-running operations.
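
The documented contract of the three ops can be modelled with a plain mutex-guarded map. The sketch below is a reference model of the semantics described above, not how Map is implemented: modelMap and the lower-case op names are hypothetical, and a single mutex replaces per-bucket locking.

```go
package main

import (
	"fmt"
	"sync"
)

// computeOp mirrors the three documented operations.
type computeOp int

const (
	cancelOp computeOp = iota // leave the entry as-is
	updateOp                  // store the returned value
	deleteOp                  // remove the entry
)

// modelMap is a hypothetical reference model guarded by one mutex.
type modelMap[K comparable, V any] struct {
	mu sync.Mutex
	m  map[K]V
}

func newModelMap[K comparable, V any]() *modelMap[K, V] {
	return &modelMap[K, V]{m: make(map[K]V)}
}

// Compute runs valueFn under the lock and applies the returned op.
// ok reports whether the entry is present afterwards; actual is the
// entry's value if present, or the zero value otherwise.
func (mm *modelMap[K, V]) Compute(
	key K,
	valueFn func(oldValue V, loaded bool) (newValue V, op computeOp),
) (actual V, ok bool) {
	mm.mu.Lock()
	defer mm.mu.Unlock()
	old, loaded := mm.m[key]
	newValue, op := valueFn(old, loaded)
	switch op {
	case updateOp:
		mm.m[key] = newValue
		return newValue, true
	case deleteOp:
		delete(mm.m, key)
		var zero V
		return zero, false
	default: // cancelOp: nothing is created or updated
		return old, loaded
	}
}

func main() {
	m := newModelMap[string, int]()

	// Increment, creating the entry on first use.
	inc := func(old int, loaded bool) (int, computeOp) {
		return old + 1, updateOp
	}
	m.Compute("hits", inc)
	v, ok := m.Compute("hits", inc)
	fmt.Println(v, ok) // prints 2 true

	// Delete the entry once it reaches a threshold, else cancel.
	v, ok = m.Compute("hits", func(old int, loaded bool) (int, computeOp) {
		if loaded && old >= 2 {
			return 0, deleteOp
		}
		return old, cancelOp
	})
	fmt.Println(v, ok) // prints 0 false
}
```

The cancel branch is what makes read-modify-maybe-write logic cheap: the function inspects the current value atomically with respect to other writers and only pays for a store when it decides one is needed.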

func (*Map[K, V]) Delete

func (m *Map[K, V]) Delete(key K)

Delete deletes the value for a key.

func (*Map[K, V]) Load

func (m *Map[K, V]) Load(key K) (value V, ok bool)

Load returns the value stored in the map for a key, or zero value of type V if no value is present. The ok result indicates whether value was found in the map.

func (*Map[K, V]) LoadAndDelete

func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool)

LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.

func (*Map[K, V]) LoadAndStore

func (m *Map[K, V]) LoadAndStore(key K, value V) (actual V, loaded bool)

LoadAndStore stores the new value for the key and returns the existing value, if present. The loaded result is true if the existing value was loaded, false otherwise.

func (*Map[K, V]) LoadOrCompute

func (m *Map[K, V]) LoadOrCompute(
	key K,
	valueFn func() (newValue V, cancel bool),
) (value V, loaded bool)

LoadOrCompute returns the existing value for the key if present. Otherwise, it tries to compute the value using the provided function and, if successful, stores and returns the computed value. The loaded result is true if the value was loaded, or false if computed. If valueFn returns true as the cancel value, the computation is cancelled and the zero value for type V is returned.

This call locks a hash table bucket while the compute function is executed. This means that modifications of other entries in the bucket are blocked until valueFn returns. Keep this in mind when the function includes long-running operations.

func (*Map[K, V]) LoadOrStore

func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*Map[K, V]) Range

func (m *Map[K, V]) Range(f func(key K, value V) bool)

Range calls f sequentially for each key and value present in the map. If f returns false, range stops the iteration.

Range does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but if the value for any key is stored or deleted concurrently, Range may reflect any mapping for that key from any point during the Range call.

It is safe to modify the map while iterating over it, including entry creation, modification and deletion. However, the concurrent modification rules apply, i.e. the changes may not be reflected in the subsequently iterated entries.

func (*Map[K, V]) Size

func (m *Map[K, V]) Size() int

Size returns the current size of the map.

func (*Map[K, V]) Stats

func (m *Map[K, V]) Stats() MapStats

Stats returns statistics for the Map. Just like other map methods, this one is thread-safe. Yet it's an O(N) operation, so it should be used only for diagnostics or debugging purposes.

func (*Map[K, V]) Store

func (m *Map[K, V]) Store(key K, value V)

Store sets the value for a key.

type MapConfig

type MapConfig struct {
	// contains filtered or unexported fields
}

MapConfig defines configurable Map options.

type MapOf deprecated

type MapOf[K comparable, V any] = Map[K, V]

Deprecated: use Map

type MapStats

type MapStats struct {
	// RootBuckets is the number of root buckets in the hash table.
	// Each bucket holds a few entries.
	RootBuckets int
	// TotalBuckets is the total number of buckets in the hash table,
	// including root and their chained buckets. Each bucket holds
	// a few entries.
	TotalBuckets int
	// EmptyBuckets is the number of buckets that hold no entries.
	EmptyBuckets int
	// Capacity is the Map capacity, i.e. the total number of
	// entries that all buckets can physically hold. This number
	// does not consider the load factor.
	Capacity int
	// Size is the exact number of entries stored in the map.
	Size int
	// Counter is the number of entries stored in the map according
	// to the internal atomic counter. In case of concurrent map
	// modifications this number may be different from Size.
	Counter int
	// CounterLen is the number of internal atomic counter stripes.
	// This number may grow with the map capacity to improve
	// multithreaded scalability.
	CounterLen int
	// MinEntries is the minimum number of entries per a chain of
	// buckets, i.e. a root bucket and its chained buckets.
	MinEntries int
	// MaxEntries is the maximum number of entries per a chain of
	// buckets, i.e. a root bucket and its chained buckets.
	MaxEntries int
	// TotalGrowths is the number of times the hash table grew.
	TotalGrowths int64
	// TotalShrinks is the number of times the hash table shrank.
	TotalShrinks int64
}

MapStats is Map statistics.

Warning: map statistics are intended to be used for diagnostic purposes, not for production code. This means that breaking changes may be introduced into this struct even between minor releases.

func (*MapStats) ToString

func (s *MapStats) ToString() string

ToString returns string representation of map stats.

type RBMutex

type RBMutex struct {
	// contains filtered or unexported fields
}

A RBMutex is a reader-biased reader/writer mutual exclusion lock. The lock can be held by many readers or a single writer. The zero value for a RBMutex is an unlocked mutex.

A RBMutex must not be copied after first use.

RBMutex is based on a modified version of BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf

RBMutex is a specialized mutex for scenarios, such as caches, where the vast majority of locks are acquired by readers and write lock acquire attempts are infrequent. In such scenarios, RBMutex performs better than sync.RWMutex on large multicore machines.

RBMutex extends sync.RWMutex internally and uses it as the "reader bias disabled" fallback, so the same semantics apply. The only noticeable difference is the reader token, which RLock returns and RUnlock requires.

func NewRBMutex

func NewRBMutex() *RBMutex

NewRBMutex creates a new RBMutex instance.

func (*RBMutex) Lock

func (mu *RBMutex) Lock()

Lock locks mu for writing. If the lock is already locked for reading or writing, Lock blocks until the lock is available.

func (*RBMutex) RLock

func (mu *RBMutex) RLock() *RToken

RLock locks mu for reading and returns a reader token. The token must be passed to the later RUnlock call.

Should not be used for recursive read locking; a blocked Lock call excludes new readers from acquiring the lock.

func (*RBMutex) RUnlock

func (mu *RBMutex) RUnlock(t *RToken)

RUnlock undoes a single RLock call. A reader token obtained from the RLock call must be provided. RUnlock does not affect other simultaneous readers. A panic is raised if mu is not locked for reading on entry to RUnlock.

func (*RBMutex) TryLock

func (mu *RBMutex) TryLock() bool

TryLock tries to lock mu for writing without blocking.

func (*RBMutex) TryRLock

func (mu *RBMutex) TryRLock() (bool, *RToken)

TryRLock tries to lock mu for reading without blocking. When TryRLock succeeds, it returns true and a reader token. On failure, it returns false.

func (*RBMutex) Unlock

func (mu *RBMutex) Unlock()

Unlock unlocks mu for writing. A panic is raised if mu is not locked for writing on entry to Unlock.

As with RWMutex, a locked RBMutex is not associated with a particular goroutine. One goroutine may RLock (Lock) a RBMutex and then arrange for another goroutine to RUnlock (Unlock) it.

type RToken

type RToken struct {
	// contains filtered or unexported fields
}

RToken is a reader lock token.

type SPSCQueue

type SPSCQueue[I any] struct {
	// contains filtered or unexported fields
}

A SPSCQueue is a bounded single-producer single-consumer concurrent queue. This means that at most one goroutine may publish items to the queue and at most one goroutine may consume them.

SPSCQueue instances must be created with NewSPSCQueue function. A SPSCQueue must not be copied after first use.

Based on the data structure from the following article: https://rigtorp.se/ringbuffer/

func NewSPSCQueue

func NewSPSCQueue[I any](capacity int) *SPSCQueue[I]

NewSPSCQueue creates a new SPSCQueue instance with the given capacity.

func NewSPSCQueueOf deprecated

func NewSPSCQueueOf[I any](capacity int) *SPSCQueue[I]

Deprecated: use NewSPSCQueue.

func (*SPSCQueue[I]) TryDequeue

func (q *SPSCQueue[I]) TryDequeue() (item I, ok bool)

TryDequeue retrieves and removes the item from the head of the queue. Does not block and returns immediately. The ok result indicates that the queue isn't empty and an item was retrieved.

func (*SPSCQueue[I]) TryEnqueue

func (q *SPSCQueue[I]) TryEnqueue(item I) bool

TryEnqueue inserts the given item into the queue. Does not block and returns immediately. The result indicates that the queue isn't full and the item was inserted.

type SPSCQueueOf deprecated

type SPSCQueueOf[I any] = SPSCQueue[I]

Deprecated: use SPSCQueue.

type UMPSCQueue

type UMPSCQueue[T any] struct {
	// contains filtered or unexported fields
}

A UMPSCQueue is an unbounded multi-producer single-consumer concurrent queue. It is meant to serve as a replacement for a channel. However, crucially, it has infinite capacity. This is a very bad idea in many cases as it means that it never exhibits backpressure. In other words, if nothing is consuming elements from the queue, it will eventually consume all available memory and crash the process. However, there are also cases where this is desired behavior as it means the queue will dynamically allocate more memory to store temporary bursts, allowing producers to never block while the consumer catches up.

Note however that because no locks are acquired, it is unsafe for multiple goroutines to consume from the queue. Consumers must explicitly synchronize between themselves.

func NewUMPSCQueue

func NewUMPSCQueue[T any]() *UMPSCQueue[T]

NewUMPSCQueue creates a new UMPSCQueue instance.

func (*UMPSCQueue[T]) Dequeue

func (q *UMPSCQueue[T]) Dequeue() T

Dequeue returns the next value in the queue, blocking if it is empty. It is not safe to invoke Dequeue from multiple goroutines.

func (*UMPSCQueue[T]) Enqueue

func (q *UMPSCQueue[T]) Enqueue(value T)

Enqueue writes the given value to the queue. It never blocks and is safe to be called by multiple goroutines concurrently.
