Documentation
¶
Overview ¶
Package simultaneously provides utilities for running functions concurrently with controlled parallelism. It handles context cancellation, panic recovery, and error aggregation automatically.
Index ¶
- Variables
- func Do(maxConcurrent int, f ...func(ctx context.Context) error) error
- func DoCtx(ctx context.Context, maxConcurrent int, ...) error
- func DoCtxWithExecutor(ctx context.Context, exec Executor, ...) error
- func DoWithExecutor(exec Executor, callback ...func(ctx context.Context) error) error
- func FlatMapGoMap[InKey comparable, InVal any, OutKey comparable, OutVal any](maxConcurrent int, input map[InKey]InVal, ...) (map[OutKey]OutVal, error)
- func FlatMapGoMapCtx[InKey comparable, InVal any, OutKey comparable, OutVal any](ctx context.Context, maxConcurrent int, input map[InKey]InVal, ...) (result map[OutKey]OutVal, err error)
- func FlatMapGoMapCtxWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any](ctx context.Context, exec Executor, input map[InKey]InVal, ...) (map[OutKey]OutVal, error)
- func FlatMapGoMapWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any](exec Executor, input map[InKey]InVal, ...) (map[OutKey]OutVal, error)
- func FlatMapMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](maxConcurrent int, input maps.Map[InKey, InVal], ...) (maps.Map[OutKey, OutVal], error)
- func FlatMapMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](ctx context.Context, maxConcurrent int, input maps.Map[InKey, InVal], ...) (result maps.Map[OutKey, OutVal], err error)
- func FlatMapMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](ctx context.Context, exec Executor, input maps.Map[InKey, InVal], ...) (maps.Map[OutKey, OutVal], error)
- func FlatMapMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](exec Executor, input maps.Map[InKey, InVal], ...) (maps.Map[OutKey, OutVal], error)
- func FlatMapOrderedMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](maxConcurrent int, input maps.OrderedMap[InKey, InVal], ...) (maps.OrderedMap[OutKey, OutVal], error)
- func FlatMapOrderedMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](ctx context.Context, maxConcurrent int, input maps.OrderedMap[InKey, InVal], ...) (result maps.OrderedMap[OutKey, OutVal], err error)
- func FlatMapOrderedMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](ctx context.Context, exec Executor, input maps.OrderedMap[InKey, InVal], ...) (maps.OrderedMap[OutKey, OutVal], error)
- func FlatMapOrderedMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](exec Executor, input maps.OrderedMap[InKey, InVal], ...) (maps.OrderedMap[OutKey, OutVal], error)
- func FlatMapOrderedSet[InElem Collectable[InElem], OutElem Collectable[OutElem]](maxConcurrent int, input set.OrderedSet[InElem], ...) (set.OrderedSet[OutElem], error)
- func FlatMapOrderedSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]](ctx context.Context, maxConcurrent int, input set.OrderedSet[InElem], ...) (result set.OrderedSet[OutElem], err error)
- func FlatMapOrderedSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](ctx context.Context, exec Executor, input set.OrderedSet[InElem], ...) (set.OrderedSet[OutElem], error)
- func FlatMapOrderedSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](exec Executor, input set.OrderedSet[InElem], ...) (set.OrderedSet[OutElem], error)
- func FlatMapSet[InElem Collectable[InElem], OutElem Collectable[OutElem]](maxConcurrent int, input set.Set[InElem], ...) (set.Set[OutElem], error)
- func FlatMapSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]](ctx context.Context, maxConcurrent int, input set.Set[InElem], ...) (result set.Set[OutElem], err error)
- func FlatMapSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](ctx context.Context, exec Executor, input set.Set[InElem], ...) (set.Set[OutElem], error)
- func FlatMapSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](exec Executor, input set.Set[InElem], ...) (set.Set[OutElem], error)
- func FlatMapSlice[Input, Output any](maxConcurrent int, values []Input, ...) ([]Output, error)
- func FlatMapSliceCtx[Input, Output any](ctx context.Context, maxConcurrent int, values []Input, ...) (result []Output, err error)
- func FlatMapSliceCtxWithExecutor[Input, Output any](ctx context.Context, exec Executor, values []Input, ...) ([]Output, error)
- func FlatMapSliceWithExecutor[Input, Output any](exec Executor, values []Input, ...) ([]Output, error)
- func MapGoMap[InKey comparable, InVal any, OutKey comparable, OutVal any](maxConcurrent int, input map[InKey]InVal, ...) (map[OutKey]OutVal, error)
- func MapGoMapCtx[InKey comparable, InVal any, OutKey comparable, OutVal any](ctx context.Context, maxConcurrent int, input map[InKey]InVal, ...) (result map[OutKey]OutVal, err error)
- func MapGoMapCtxWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any](ctx context.Context, exec Executor, input map[InKey]InVal, ...) (map[OutKey]OutVal, error)
- func MapGoMapWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any](exec Executor, input map[InKey]InVal, ...) (map[OutKey]OutVal, error)
- func MapMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](maxConcurrent int, input maps.Map[InKey, InVal], ...) (maps.Map[OutKey, OutVal], error)
- func MapMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](ctx context.Context, maxConcurrent int, input maps.Map[InKey, InVal], ...) (result maps.Map[OutKey, OutVal], err error)
- func MapMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](ctx context.Context, exec Executor, input maps.Map[InKey, InVal], ...) (maps.Map[OutKey, OutVal], error)
- func MapMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](exec Executor, input maps.Map[InKey, InVal], ...) (maps.Map[OutKey, OutVal], error)
- func MapOrderedMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](maxConcurrent int, input maps.OrderedMap[InKey, InVal], ...) (maps.OrderedMap[OutKey, OutVal], error)
- func MapOrderedMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](ctx context.Context, maxConcurrent int, input maps.OrderedMap[InKey, InVal], ...) (result maps.OrderedMap[OutKey, OutVal], err error)
- func MapOrderedMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](ctx context.Context, exec Executor, input maps.OrderedMap[InKey, InVal], ...) (maps.OrderedMap[OutKey, OutVal], error)
- func MapOrderedMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any](exec Executor, input maps.OrderedMap[InKey, InVal], ...) (maps.OrderedMap[OutKey, OutVal], error)
- func MapOrderedSet[InElem Collectable[InElem], OutElem Collectable[OutElem]](maxConcurrent int, input set.OrderedSet[InElem], ...) (set.OrderedSet[OutElem], error)
- func MapOrderedSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]](ctx context.Context, maxConcurrent int, input set.OrderedSet[InElem], ...) (set.OrderedSet[OutElem], error)
- func MapOrderedSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](ctx context.Context, exec Executor, input set.OrderedSet[InElem], ...) (set.OrderedSet[OutElem], error)
- func MapOrderedSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](exec Executor, input set.OrderedSet[InElem], ...) (set.OrderedSet[OutElem], error)
- func MapSet[InElem Collectable[InElem], OutElem Collectable[OutElem]](maxConcurrent int, input set.Set[InElem], ...) (set.Set[OutElem], error)
- func MapSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]](ctx context.Context, maxConcurrent int, input set.Set[InElem], ...) (result set.Set[OutElem], err error)
- func MapSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](ctx context.Context, exec Executor, input set.Set[InElem], ...) (set.Set[OutElem], error)
- func MapSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]](exec Executor, input set.Set[InElem], ...) (set.Set[OutElem], error)
- func MapSlice[Input, Output any](maxConcurrent int, values []Input, ...) ([]Output, error)
- func MapSliceCtx[Input, Output any](ctx context.Context, maxConcurrent int, values []Input, ...) (result []Output, err error)
- func MapSliceCtxWithExecutor[Input, Output any](ctx context.Context, exec Executor, values []Input, ...) ([]Output, error)
- func MapSliceWithExecutor[Input, Output any](exec Executor, values []Input, ...) ([]Output, error)
- type Collectable
- type Executor
Constants ¶
This section is empty.
Variables ¶
var ErrExecutorClosed = errors.New("executor is closed")
ErrExecutorClosed is returned when attempting to execute functions on a closed executor.
Functions ¶
func Do ¶
Do runs the given functions in parallel and returns the first error encountered. See SimultaneouslyCtx for more information.
func DoCtx ¶
func DoCtx(ctx context.Context, maxConcurrent int, callback ...func(ctx context.Context) error) error
DoCtx runs the given functions in parallel and returns the first error encountered. If no error is encountered, it returns nil. In the event that an error happens, all other functions are canceled (via their context) to hopefully save on CPU cycles. It's up to the individual functions to check their context and return early if they are canceled.
The maxConcurrent parameter is used to limit the number of functions that run at the same time. If maxConcurrent is less than 1, all functions will run at the same time.
Panics that occur within the callback functions are automatically recovered and converted to errors. This prevents a single panicking function from crashing the entire process.
func DoCtxWithExecutor ¶
func DoCtxWithExecutor(ctx context.Context, exec Executor, callback ...func(ctx context.Context) error) error
DoCtxWithExecutor runs the given functions in parallel using a custom executor. This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused. All other behavior matches DoCtx including context cancellation, panic recovery, and error handling.
func DoWithExecutor ¶
DoWithExecutor runs the given functions in parallel using a custom executor. See DoCtxWithExecutor for more information.
func FlatMapGoMap ¶
func FlatMapGoMap[InKey comparable, InVal any, OutKey comparable, OutVal any]( maxConcurrent int, input map[InKey]InVal, transform func(ctx context.Context, key InKey, val InVal) (map[OutKey]OutVal, error), ) (map[OutKey]OutVal, error)
FlatMapGoMap transforms a standard Go map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).
Unlike MapGoMap which produces one output entry per input entry, FlatMapGoMap allows each transform to return an entire map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapGoMapCtx.
Returns nil if the input map is nil. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).
Example:
// Expand each entry into multiple entries
input := map[string]int{"a": 2, "b": 3}
output, err := FlatMapGoMap(2, input, func(ctx context.Context, k string, v int) (map[string]int, error) {
// Create v entries for each input entry
result := make(map[string]int)
for i := 0; i < v; i++ {
result[fmt.Sprintf("%s%d", k, i)] = i
}
return result, nil
})
// output: map[string]int{"a0": 0, "a1": 1, "b0": 0, "b1": 1, "b2": 2}
func FlatMapGoMapCtx ¶
func FlatMapGoMapCtx[InKey comparable, InVal any, OutKey comparable, OutVal any]( ctx context.Context, maxConcurrent int, input map[InKey]InVal, transform func(ctx context.Context, key InKey, val InVal) (map[OutKey]OutVal, error), ) (result map[OutKey]OutVal, err error)
FlatMapGoMapCtx transforms a standard Go map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).
This is the context-aware version of FlatMapGoMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
Unlike MapGoMapCtx which produces one output entry per input entry, FlatMapGoMapCtx allows each transform to return an entire map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := FlatMapGoMapCtx(ctx, 2, input, func(ctx context.Context, k string, v int) (map[string]int, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
result := make(map[string]int)
for i := 0; i < v; i++ {
result[fmt.Sprintf("%s%d", k, i)] = i
}
return result, nil
})
func FlatMapGoMapCtxWithExecutor ¶
func FlatMapGoMapCtxWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any]( ctx context.Context, exec Executor, input map[InKey]InVal, transform func(ctx context.Context, key InKey, val InVal) (map[OutKey]OutVal, error), ) (map[OutKey]OutVal, error)
FlatMapGoMapCtxWithExecutor transforms a standard Go map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
Unlike MapGoMapCtxWithExecutor which produces one output entry per input entry, FlatMapGoMapCtxWithExecutor allows each transform to return an entire map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := FlatMapGoMapCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, k string, v int) (map[string]int, error) {
result := make(map[string]int)
for i := 0; i < v; i++ {
result[fmt.Sprintf("%s%d", k, i)] = i
}
return result, nil
})
func FlatMapGoMapWithExecutor ¶
func FlatMapGoMapWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any]( exec Executor, input map[InKey]InVal, transform func(ctx context.Context, key InKey, val InVal) (map[OutKey]OutVal, error), ) (map[OutKey]OutVal, error)
FlatMapGoMapWithExecutor transforms a standard Go map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor. See FlatMapGoMapCtxWithExecutor for more information.
func FlatMapMap ¶
func FlatMapMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( maxConcurrent int, input maps.Map[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (maps.Map[OutKey, OutVal], error), ) (maps.Map[OutKey, OutVal], error)
FlatMapMap transforms an amp-common Map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).
Unlike MapMap which produces one output entry per input entry, FlatMapMap allows each transform to return an entire Map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.
This function is similar to FlatMapGoMap but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapMapCtx.
Returns nil if the input map is nil. The output map uses the same hash function as the input. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).
Example:
// Expand each entry into multiple entries
input := maps.NewHashMap[MyKey, int](hashing.Sha256)
output, err := FlatMapMap(2, input, func(ctx context.Context, k MyKey, v int) (maps.Map[MyKey, int], error) {
result := maps.NewHashMap[MyKey, int](hashing.Sha256)
for i := 0; i < v; i++ {
result.Add(MyKey{ID: fmt.Sprintf("%s-%d", k.ID, i)}, i)
}
return result, nil
})
func FlatMapMapCtx ¶
func FlatMapMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( ctx context.Context, maxConcurrent int, input maps.Map[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (maps.Map[OutKey, OutVal], error), ) (result maps.Map[OutKey, OutVal], err error)
FlatMapMapCtx transforms an amp-common Map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).
This is the context-aware version of FlatMapMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
Unlike MapMapCtx which produces one output entry per input entry, FlatMapMapCtx allows each transform to return an entire Map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.
This function is similar to FlatMapGoMapCtx but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. The output map uses the same hash function as the input. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).
Thread-safety: The output map is built with a mutex to handle concurrent additions from all the flattened results, ensuring thread-safe construction even when transforms execute in parallel.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := FlatMapMapCtx(ctx, 2, input, func(ctx context.Context, k MyKey, v int) (maps.Map[MyKey, int], error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
result := maps.NewHashMap[MyKey, int](hashing.Sha256)
for i := 0; i < v; i++ {
result.Add(MyKey{ID: fmt.Sprintf("%s-%d", k.ID, i)}, i)
}
return result, nil
})
func FlatMapMapCtxWithExecutor ¶
func FlatMapMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( ctx context.Context, exec Executor, input maps.Map[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (maps.Map[OutKey, OutVal], error), ) (maps.Map[OutKey, OutVal], error)
FlatMapMapCtxWithExecutor transforms an amp-common Map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
Unlike MapMapCtxWithExecutor which produces one output entry per input entry, FlatMapMapCtxWithExecutor allows each transform to return an entire Map of results, which are then merged into the final output map. This is useful when one input entry should expand into multiple output entries.
This function is similar to FlatMapGoMapCtxWithExecutor but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. The output map uses the same hash function as the input. If multiple transforms produce the same output key, later entries overwrite earlier ones (the order is non-deterministic due to parallelism).
Thread-safety: The output map is built with a mutex to handle concurrent additions from all the flattened results, ensuring thread-safe construction even when transforms execute in parallel.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := FlatMapMapCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, k MyKey, v int) (maps.Map[MyKey, int], error) {
result := maps.NewHashMap[MyKey, int](hashing.Sha256)
for i := 0; i < v; i++ {
result.Add(MyKey{ID: fmt.Sprintf("%s-%d", k.ID, i)}, i)
}
return result, nil
})
func FlatMapMapWithExecutor ¶
func FlatMapMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( exec Executor, input maps.Map[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (maps.Map[OutKey, OutVal], error), ) (maps.Map[OutKey, OutVal], error)
FlatMapMapWithExecutor transforms an amp-common Map by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor. See FlatMapMapCtxWithExecutor for more information.
func FlatMapOrderedMap ¶
func FlatMapOrderedMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( maxConcurrent int, input maps.OrderedMap[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (maps.OrderedMap[OutKey, OutVal], error), ) (maps.OrderedMap[OutKey, OutVal], error)
FlatMapOrderedMap transforms an OrderedMap by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).
Unlike FlatMapMap, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output map's insertion order, even though transforms execute in parallel.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapOrderedMapCtx.
Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].
Example:
input := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
input.Add(maps.Key[string]{Key: "a"}, 2)
input.Add(maps.Key[string]{Key: "b"}, 3)
output, err := FlatMapOrderedMap(2, input,
func(ctx context.Context, k maps.Key[string], v int) (maps.OrderedMap[maps.Key[string], int], error) {
result := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
for i := 0; i < v; i++ {
key := maps.Key[string]{Key: fmt.Sprintf("%s%d", k.Key, i)}
result.Add(key, i)
}
return result, nil
})
// output: a0->0, a1->1, b0->0, b1->1, b2->2 (in this order)
func FlatMapOrderedMapCtx ¶
func FlatMapOrderedMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( ctx context.Context, maxConcurrent int, input maps.OrderedMap[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (maps.OrderedMap[OutKey, OutVal], error), ) (result maps.OrderedMap[OutKey, OutVal], err error)
FlatMapOrderedMapCtx transforms an OrderedMap by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening).
This is the context-aware version of FlatMapOrderedMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
Unlike FlatMapMapCtx, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output map's insertion order, even though transforms execute in parallel.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].
Thread-safety: Results are collected in parallel with a mutex, then flattened and added to the output map in the original insertion order to preserve ordering semantics.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := FlatMapOrderedMapCtx(ctx, 2, input,
func(ctx context.Context, k maps.Key[string], v int) (maps.OrderedMap[maps.Key[string], int], error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
result := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
for i := 0; i < v; i++ {
result.Add(maps.Key[string]{Key: fmt.Sprintf("%s%d", k.Key, i)}, i)
}
return result, nil
})
func FlatMapOrderedMapCtxWithExecutor ¶
func FlatMapOrderedMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( ctx context.Context, exec Executor, input maps.OrderedMap[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (maps.OrderedMap[OutKey, OutVal], error), ) (maps.OrderedMap[OutKey, OutVal], error)
FlatMapOrderedMapCtxWithExecutor transforms an OrderedMap by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
Unlike FlatMapMapCtxWithExecutor, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output map's insertion order, even though transforms execute in parallel.
Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].
Thread-safety: Results are collected in parallel with a mutex, then flattened and added to the output map in the original insertion order to preserve ordering semantics.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := FlatMapOrderedMapCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, k maps.Key[string], v int) (maps.OrderedMap[maps.Key[string], int], error) {
result := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
for i := 0; i < v; i++ {
result.Add(maps.Key[string]{Key: fmt.Sprintf("%s%d", k.Key, i)}, i)
}
return result, nil
})
func FlatMapOrderedMapWithExecutor ¶
func FlatMapOrderedMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( exec Executor, input maps.OrderedMap[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (maps.OrderedMap[OutKey, OutVal], error), ) (maps.OrderedMap[OutKey, OutVal], error)
FlatMapOrderedMapWithExecutor transforms an OrderedMap by applying a transform function to each key-value pair in parallel, where each transform can produce multiple output entries (flattening), using a custom executor. See FlatMapOrderedMapCtxWithExecutor for more information.
func FlatMapOrderedSet ¶
func FlatMapOrderedSet[InElem Collectable[InElem], OutElem Collectable[OutElem]]( maxConcurrent int, input set.OrderedSet[InElem], transform func(ctx context.Context, elem InElem) (set.OrderedSet[OutElem], error), ) (set.OrderedSet[OutElem], error)
FlatMapOrderedSet transforms an OrderedSet by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening).
Unlike FlatMapSet, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output set's insertion order, even though transforms execute in parallel.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each element in the input set. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapOrderedSetCtx.
Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].
Example:
input := set.NewOrderedSet[hashing.HashableString](hashing.Sha256)
input.Add(hashing.HashableString("ab"))
input.Add(hashing.HashableString("cd"))
output, err := FlatMapOrderedSet(2, input,
func(ctx context.Context, s hashing.HashableString) (set.OrderedSet[hashing.HashableString], error) {
result := set.NewOrderedSet[hashing.HashableString](hashing.Sha256)
for _, ch := range string(s) {
result.Add(hashing.HashableString(string(ch)))
}
return result, nil
})
// output: "a", "b", "c", "d" (in this order)
func FlatMapOrderedSetCtx ¶
func FlatMapOrderedSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]]( ctx context.Context, maxConcurrent int, input set.OrderedSet[InElem], transform func(ctx context.Context, elem InElem) (set.OrderedSet[OutElem], error), ) (result set.OrderedSet[OutElem], err error)
FlatMapOrderedSetCtx transforms an OrderedSet by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening).
This is the context-aware version of FlatMapOrderedSet. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
Unlike FlatMapSetCtx, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output set's insertion order, even though transforms execute in parallel.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].
Thread-safety: Results are collected in parallel using FlatMapSlice, then flattened and added to the output set in the original insertion order to preserve ordering semantics.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := FlatMapOrderedSetCtx(ctx, 2, input,
func(ctx context.Context, s hashing.HashableString) (set.OrderedSet[hashing.HashableString], error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
result := set.NewOrderedSet[hashing.HashableString](hashing.Sha256)
for _, ch := range string(s) {
result.Add(hashing.HashableString(string(ch)))
}
return result, nil
})
func FlatMapOrderedSetCtxWithExecutor ¶
func FlatMapOrderedSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]]( ctx context.Context, exec Executor, input set.OrderedSet[InElem], transform func(ctx context.Context, elem InElem) (set.OrderedSet[OutElem], error), ) (set.OrderedSet[OutElem], error)
FlatMapOrderedSetCtxWithExecutor transforms an OrderedSet by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening), using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
Unlike FlatMapSetCtxWithExecutor, this function preserves order. Results from inputs[i] appear before results from inputs[i+1] in the output set's insertion order, even though transforms execute in parallel.
Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: all results from inputs[i] appear before all results from inputs[i+1].
Thread-safety: Results are collected in parallel using FlatMapSliceCtxWithExecutor, then flattened and added to the output set in the original insertion order to preserve ordering semantics.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := FlatMapOrderedSetCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, s hashing.HashableString) (set.OrderedSet[hashing.HashableString], error) {
result := set.NewOrderedSet[hashing.HashableString](hashing.Sha256)
for _, ch := range string(s) {
result.Add(hashing.HashableString(string(ch)))
}
return result, nil
})
func FlatMapOrderedSetWithExecutor ¶
func FlatMapOrderedSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]]( exec Executor, input set.OrderedSet[InElem], transform func(ctx context.Context, elem InElem) (set.OrderedSet[OutElem], error), ) (set.OrderedSet[OutElem], error)
FlatMapOrderedSetWithExecutor transforms an OrderedSet by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening), using a custom executor. See FlatMapOrderedSetCtxWithExecutor for more information.
func FlatMapSet ¶
func FlatMapSet[InElem Collectable[InElem], OutElem Collectable[OutElem]]( maxConcurrent int, input set.Set[InElem], transform func(ctx context.Context, elem InElem) (set.Set[OutElem], error), ) (set.Set[OutElem], error)
FlatMapSet transforms a Set by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening).
Unlike MapSet which produces one output element per input element, FlatMapSet allows each transform to return an entire Set of results, which are then merged into the final output set. This is useful when one input element should expand into multiple output elements.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each element in the input set. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use FlatMapSetCtx.
Returns nil if the input set is nil. The output set uses the same hash function as the input. If multiple transforms produce the same output element, duplicates are automatically handled by the set semantics.
Example:
// Expand each string into its individual characters
input := set.NewSet[hashing.HashableString](hashing.Sha256)
input.Add(hashing.HashableString("hi"))
output, err := FlatMapSet(2, input,
func(ctx context.Context, s hashing.HashableString) (set.Set[hashing.HashableString], error) {
result := set.NewSet[hashing.HashableString](hashing.Sha256)
for _, ch := range string(s) {
result.Add(hashing.HashableString(string(ch)))
}
return result, nil
})
// output contains: "h", "i"
func FlatMapSetCtx ¶
func FlatMapSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]]( ctx context.Context, maxConcurrent int, input set.Set[InElem], transform func(ctx context.Context, elem InElem) (set.Set[OutElem], error), ) (result set.Set[OutElem], err error)
FlatMapSetCtx transforms a Set by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening).
This is the context-aware version of FlatMapSet. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
Unlike MapSetCtx which produces one output element per input element, FlatMapSetCtx allows each transform to return an entire Set of results, which are then merged into the final output set. This is useful when one input element should expand into multiple output elements.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input set is nil. The output set uses the same hash function as the input. If multiple transforms produce the same output element, duplicates are automatically handled by the set semantics.
Thread-safety: The output set is built with a mutex to handle concurrent additions from all the flattened results, ensuring thread-safe construction even when transforms execute in parallel.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := FlatMapSetCtx(ctx, 2, input,
func(ctx context.Context, s hashing.HashableString) (set.Set[hashing.HashableString], error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
result := set.NewSet[hashing.HashableString](hashing.Sha256)
for _, ch := range string(s) {
result.Add(hashing.HashableString(string(ch)))
}
return result, nil
})
func FlatMapSetCtxWithExecutor ¶
func FlatMapSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]]( ctx context.Context, exec Executor, input set.Set[InElem], transform func(ctx context.Context, elem InElem) (set.Set[OutElem], error), ) (set.Set[OutElem], error)
FlatMapSetCtxWithExecutor transforms a Set by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening), using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
Unlike MapSetCtxWithExecutor which produces one output element per input element, FlatMapSetCtxWithExecutor allows each transform to return an entire Set of results, which are then merged into the final output set.
The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input set is nil. The output set uses the same hash function as the input. If multiple transforms produce the same output element, duplicates are automatically handled by the set semantics.
Thread-safety: The output set is built with a mutex to handle concurrent additions from all the flattened results, ensuring thread-safe construction even when transforms execute in parallel.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := FlatMapSetCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, s hashing.HashableString) (set.Set[hashing.HashableString], error) {
result := set.NewSet[hashing.HashableString](hashing.Sha256)
for _, ch := range string(s) {
result.Add(hashing.HashableString(string(ch)))
}
return result, nil
})
func FlatMapSetWithExecutor ¶
func FlatMapSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]]( exec Executor, input set.Set[InElem], transform func(ctx context.Context, elem InElem) (set.Set[OutElem], error), ) (set.Set[OutElem], error)
FlatMapSetWithExecutor transforms a Set by applying a transform function to each element in parallel, where each transform can produce multiple output elements (flattening), using a custom executor. See FlatMapSetCtxWithExecutor for more information.
func FlatMapSlice ¶
func FlatMapSlice[Input, Output any]( maxConcurrent int, values []Input, transform func(ctx context.Context, value Input) ([]Output, error), ) ([]Output, error)
FlatMapSlice transforms a slice of values in parallel where each input produces zero or more outputs, then flattens the results into a single slice. See FlatMapSliceCtx for more information.
func FlatMapSliceCtx ¶
func FlatMapSliceCtx[Input, Output any]( ctx context.Context, maxConcurrent int, values []Input, transform func(ctx context.Context, value Input) ([]Output, error), ) (result []Output, err error)
FlatMapSliceCtx transforms a slice of values in parallel where each input produces zero or more outputs, then flattens the results into a single slice. This is useful when each input element needs to be expanded into multiple output elements.
The maxConcurrent parameter limits the number of concurrent transformations. If maxConcurrent is less than 1, all transformations will run at the same time.
If any transformation returns an error, all remaining transformations are canceled (via their context) and the first error is returned. The output slice will be nil.
Panics that occur within the transformation function are automatically recovered and converted to errors. Order is preserved: results from values[i] appear before results from values[i+1] in the flattened output.
Example:
words := []string{"hello", "world"}
chars, err := FlatMapSliceCtx(ctx, 2, words, func(ctx context.Context, word string) ([]rune, error) {
return []rune(word), nil
})
// chars = ['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']
func FlatMapSliceCtxWithExecutor ¶
func FlatMapSliceCtxWithExecutor[Input, Output any]( ctx context.Context, exec Executor, values []Input, transform func(ctx context.Context, value Input) ([]Output, error), ) ([]Output, error)
FlatMapSliceCtxWithExecutor transforms a slice of values in parallel where each input produces zero or more outputs, then flattens the results into a single slice, using a custom executor for concurrency control.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
If any transformation returns an error, all remaining transformations are canceled (via their context) and the first error is returned. The output slice will be nil.
Panics that occur within the transformation function are automatically recovered and converted to errors. Order is preserved: results from values[i] appear before results from values[i+1] in the flattened output.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
words := []string{"hello", "world"}
chars, err := FlatMapSliceCtxWithExecutor(ctx, exec, words, func(ctx context.Context, word string) ([]rune, error) {
return []rune(word), nil
})
// chars = ['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']
func FlatMapSliceWithExecutor ¶
func FlatMapSliceWithExecutor[Input, Output any]( exec Executor, values []Input, transform func(ctx context.Context, value Input) ([]Output, error), ) ([]Output, error)
FlatMapSliceWithExecutor transforms a slice of values in parallel where each input produces zero or more outputs, then flattens the results into a single slice, using a custom executor. See FlatMapSliceCtxWithExecutor for more information.
func MapGoMap ¶
func MapGoMap[InKey comparable, InVal any, OutKey comparable, OutVal any]( maxConcurrent int, input map[InKey]InVal, transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (map[OutKey]OutVal, error)
MapGoMap transforms a standard Go map by applying a transform function to each key-value pair in parallel, producing a new map with potentially different key and value types.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapGoMapCtx.
Returns nil if the input map is nil. The output map may have fewer entries if the transform produces duplicate keys (later entries overwrite earlier ones).
Example:
// Convert map[string]int to map[int]string in parallel
input := map[string]int{"a": 1, "b": 2, "c": 3}
output, err := MapGoMap(2, input, func(ctx context.Context, k string, v int) (int, string, error) {
return v, strings.ToUpper(k), nil
})
// output: map[int]string{1: "A", 2: "B", 3: "C"}
func MapGoMapCtx ¶
func MapGoMapCtx[InKey comparable, InVal any, OutKey comparable, OutVal any]( ctx context.Context, maxConcurrent int, input map[InKey]InVal, transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (result map[OutKey]OutVal, err error)
MapGoMapCtx transforms a standard Go map by applying a transform function to each key-value pair in parallel, producing a new map with potentially different key and value types.
This is the context-aware version of MapGoMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. The output map may have fewer entries if the transform produces duplicate keys (later entries overwrite earlier ones).
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := MapGoMapCtx(ctx, 2, input, func(ctx context.Context, k string, v int) (int, string, error) {
// Check context cancellation
select {
case <-ctx.Done():
return 0, "", ctx.Err()
default:
}
return v, strings.ToUpper(k), nil
})
func MapGoMapCtxWithExecutor ¶
func MapGoMapCtxWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any]( ctx context.Context, exec Executor, input map[InKey]InVal, transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (map[OutKey]OutVal, error)
MapGoMapCtxWithExecutor transforms a standard Go map by applying a transform function to each key-value pair in parallel, producing a new map with potentially different key and value types, using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. The output map may have fewer entries if the transform produces duplicate keys (later entries overwrite earlier ones).
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := MapGoMapCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, k string, v int) (int, string, error) {
return v, strings.ToUpper(k), nil
})
func MapGoMapWithExecutor ¶
func MapGoMapWithExecutor[InKey comparable, InVal any, OutKey comparable, OutVal any]( exec Executor, input map[InKey]InVal, transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (map[OutKey]OutVal, error)
MapGoMapWithExecutor transforms a standard Go map by applying a transform function to each key-value pair in parallel, producing a new map with potentially different key and value types, using a custom executor. See MapGoMapCtxWithExecutor for more information.
func MapMap ¶
func MapMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( maxConcurrent int, input maps.Map[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (maps.Map[OutKey, OutVal], error)
MapMap transforms an amp-common Map by applying a transform function to each key-value pair in parallel, producing a new Map with potentially different key and value types.
This function is similar to MapGoMap but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapMapCtx.
Returns nil if the input map is nil. The output map uses the same hash function as the input. The output map may have fewer entries if the transform produces duplicate keys.
Example:
// Transform map entries while preserving map type
input := maps.NewHashMap[MyKey, int](hashing.Sha256)
output, err := MapMap(2, input, func(ctx context.Context, k MyKey, v int) (MyKey, string, error) {
return k, strconv.Itoa(v), nil
})
func MapMapCtx ¶
func MapMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( ctx context.Context, maxConcurrent int, input maps.Map[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (result maps.Map[OutKey, OutVal], err error)
MapMapCtx transforms an amp-common Map by applying a transform function to each key-value pair in parallel, producing a new Map with potentially different key and value types.
This is the context-aware version of MapMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
This function is similar to MapGoMapCtx but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. The output map uses the same hash function as the input. The output map may have fewer entries if the transform produces duplicate keys.
Thread-safety: The output map is built with a mutex to handle concurrent additions, ensuring thread-safe construction even when transforms execute in parallel.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := MapMapCtx(ctx, 2, input, func(ctx context.Context, k MyKey, v int) (MyKey, string, error) {
select {
case <-ctx.Done():
return MyKey{}, "", ctx.Err()
default:
}
return k, strconv.Itoa(v), nil
})
func MapMapCtxWithExecutor ¶
func MapMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( ctx context.Context, exec Executor, input maps.Map[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (maps.Map[OutKey, OutVal], error)
MapMapCtxWithExecutor transforms an amp-common Map by applying a transform function to each key-value pair in parallel, producing a new Map with potentially different key and value types, using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
This function is similar to MapGoMapCtxWithExecutor but works with amp-common Map types instead of standard Go maps. Keys must implement the Collectable interface (hashable and comparable).
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. The output map uses the same hash function as the input. The output map may have fewer entries if the transform produces duplicate keys.
Thread-safety: The output map is built with a mutex to handle concurrent additions, ensuring thread-safe construction even when transforms execute in parallel.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := MapMapCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, k MyKey, v int) (MyKey, string, error) {
return k, strconv.Itoa(v), nil
})
func MapMapWithExecutor ¶
func MapMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( exec Executor, input maps.Map[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (maps.Map[OutKey, OutVal], error)
MapMapWithExecutor transforms an amp-common Map by applying a transform function to each key-value pair in parallel, producing a new Map with potentially different key and value types, using a custom executor. See MapMapCtxWithExecutor for more information.
func MapOrderedMap ¶
func MapOrderedMap[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( maxConcurrent int, input maps.OrderedMap[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (maps.OrderedMap[OutKey, OutVal], error)
MapOrderedMap transforms an OrderedMap by applying a transform function to each key-value pair in parallel, producing a new OrderedMap with potentially different key and value types.
Unlike MapMap, this function preserves the insertion order of entries. The output map will have entries in the same order as the input map, even though transforms execute in parallel.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapOrderedMapCtx.
Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.
Example:
input := maps.NewOrderedHashMap[maps.Key[string], int](hashing.Sha256)
input.Add(maps.Key[string]{Key: "first"}, 1)
input.Add(maps.Key[string]{Key: "second"}, 2)
output, err := MapOrderedMap(2, input,
func(ctx context.Context, k maps.Key[string], v int) (maps.Key[string], string, error) {
return k, strconv.Itoa(v), nil
})
// output has entries in order: "first" -> "1", "second" -> "2"
func MapOrderedMapCtx ¶
func MapOrderedMapCtx[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( ctx context.Context, maxConcurrent int, input maps.OrderedMap[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (result maps.OrderedMap[OutKey, OutVal], err error)
MapOrderedMapCtx transforms an OrderedMap by applying a transform function to each key-value pair in parallel, producing a new OrderedMap with potentially different key and value types.
This is the context-aware version of MapOrderedMap. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
Unlike MapMapCtx, this function preserves the insertion order of entries. The output map will have entries in the same order as the input map, even though transforms execute in parallel.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each entry in the input map with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.
Thread-safety: Results are collected in parallel with a mutex, then added to the output map in the original insertion order to preserve ordering semantics.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := MapOrderedMapCtx(ctx, 2, input,
func(ctx context.Context, k maps.Key[string], v int) (maps.Key[string], string, error) {
select {
case <-ctx.Done():
return maps.Key[string]{}, "", ctx.Err()
default:
}
return k, strconv.Itoa(v), nil
})
func MapOrderedMapCtxWithExecutor ¶
func MapOrderedMapCtxWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( ctx context.Context, exec Executor, input maps.OrderedMap[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (maps.OrderedMap[OutKey, OutVal], error)
MapOrderedMapCtxWithExecutor transforms an OrderedMap by applying a transform function to each key-value pair in parallel, producing a new OrderedMap with potentially different key and value types, using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
Unlike MapMapCtxWithExecutor, this function preserves the insertion order of entries. The output map will have entries in the same order as the input map, even though transforms execute in parallel.
Returns nil if the input map is nil. The output map uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.
Thread-safety: Results are collected in parallel with a mutex, then added to the output map in the original insertion order to preserve ordering semantics.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := MapOrderedMapCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, k maps.Key[string], v int) (maps.Key[string], string, error) {
return k, strconv.Itoa(v), nil
})
func MapOrderedMapWithExecutor ¶
func MapOrderedMapWithExecutor[InKey Collectable[InKey], InVal any, OutKey Collectable[OutKey], OutVal any]( exec Executor, input maps.OrderedMap[InKey, InVal], transform func(ctx context.Context, key InKey, val InVal) (OutKey, OutVal, error), ) (maps.OrderedMap[OutKey, OutVal], error)
MapOrderedMapWithExecutor transforms an OrderedMap by applying a transform function to each key-value pair in parallel, producing a new OrderedMap with potentially different key and value types, using a custom executor. See MapOrderedMapCtxWithExecutor for more information.
func MapOrderedSet ¶
func MapOrderedSet[InElem Collectable[InElem], OutElem Collectable[OutElem]]( maxConcurrent int, input set.OrderedSet[InElem], transform func(ctx context.Context, elem InElem) (OutElem, error), ) (set.OrderedSet[OutElem], error)
MapOrderedSet transforms an OrderedSet by applying a transform function to each element in parallel, producing a new OrderedSet with potentially different element types.
Unlike MapSet, this function preserves the insertion order of elements. The output set will have elements in the same order as the input set, even though transforms execute in parallel.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each element in the input set. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapOrderedSetCtx.
Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.
Example:
input := set.NewOrderedSet[hashing.HashableInt](hashing.Sha256)
input.Add(hashing.HashableInt(1))
input.Add(hashing.HashableInt(2))
input.Add(hashing.HashableInt(3))
output, err := MapOrderedSet(2, input,
func(ctx context.Context, v hashing.HashableInt) (hashing.HashableString, error) {
return hashing.HashableString(strconv.Itoa(int(v))), nil
})
// output has elements in order: "1", "2", "3"
func MapOrderedSetCtx ¶
func MapOrderedSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]]( ctx context.Context, maxConcurrent int, input set.OrderedSet[InElem], transform func(ctx context.Context, elem InElem) (OutElem, error), ) (set.OrderedSet[OutElem], error)
MapOrderedSetCtx transforms an OrderedSet by applying a transform function to each element in parallel, producing a new OrderedSet with potentially different element types.
This is the context-aware version of MapOrderedSet. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
Unlike MapSetCtx, this function preserves the insertion order of elements. The output set will have elements in the same order as the input set, even though transforms execute in parallel.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.
Thread-safety: Results are collected in parallel using MapSlice, then added to the output set in the original insertion order to preserve ordering semantics.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := MapOrderedSetCtx(ctx, 2, input,
func(ctx context.Context, v hashing.HashableInt) (hashing.HashableString, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
}
return hashing.HashableString(strconv.Itoa(int(v))), nil
})
func MapOrderedSetCtxWithExecutor ¶
func MapOrderedSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]]( ctx context.Context, exec Executor, input set.OrderedSet[InElem], transform func(ctx context.Context, elem InElem) (OutElem, error), ) (set.OrderedSet[OutElem], error)
MapOrderedSetCtxWithExecutor transforms an OrderedSet by applying a transform function to each element in parallel, producing a new OrderedSet with potentially different element types, using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
Unlike MapSetCtxWithExecutor, this function preserves the insertion order of elements. The output set will have elements in the same order as the input set, even though transforms execute in parallel.
Returns nil if the input set is nil. The output set uses the same hash function as the input. Order preservation: outputs[i] corresponds to inputs[i] in insertion order.
Thread-safety: Results are collected in parallel using MapSliceCtxWithExecutor, then added to the output set in the original insertion order to preserve ordering semantics.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := MapOrderedSetCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, v hashing.HashableInt) (hashing.HashableString, error) {
return hashing.HashableString(strconv.Itoa(int(v))), nil
})
func MapOrderedSetWithExecutor ¶
func MapOrderedSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]]( exec Executor, input set.OrderedSet[InElem], transform func(ctx context.Context, elem InElem) (OutElem, error), ) (set.OrderedSet[OutElem], error)
MapOrderedSetWithExecutor transforms an OrderedSet by applying a transform function to each element in parallel, producing a new OrderedSet with potentially different element types, using a custom executor. See MapOrderedSetCtxWithExecutor for more information.
func MapSet ¶
func MapSet[InElem Collectable[InElem], OutElem Collectable[OutElem]]( maxConcurrent int, input set.Set[InElem], transform func(ctx context.Context, elem InElem) (OutElem, error), ) (set.Set[OutElem], error)
MapSet transforms a Set by applying a transform function to each element in parallel, producing a new Set with potentially different element types.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each element in the input set. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
This is the non-context version that uses context.Background(). For context-aware operations with cancellation support, use MapSetCtx.
Returns nil if the input set is nil. The output set uses the same hash function as the input. The output set may have fewer elements if the transform produces duplicate elements.
Example:
// Transform string set to int set by converting to lengths in parallel
input := set.NewSet[hashing.HashableString](hashing.Sha256)
input.Add(hashing.HashableString("hello"))
input.Add(hashing.HashableString("world"))
output, err := MapSet(2, input, func(ctx context.Context, s hashing.HashableString) (hashing.HashableInt, error) {
return hashing.HashableInt(len(s)), nil
})
// output contains: HashableInt(5) for both "hello" and "world"
func MapSetCtx ¶
func MapSetCtx[InElem Collectable[InElem], OutElem Collectable[OutElem]]( ctx context.Context, maxConcurrent int, input set.Set[InElem], transform func(ctx context.Context, elem InElem) (OutElem, error), ) (result set.Set[OutElem], err error)
MapSetCtx transforms a Set by applying a transform function to each element in parallel, producing a new Set with potentially different element types.
This is the context-aware version of MapSet. The provided context can be used to cancel the operation or set deadlines. If the context is canceled, the operation stops immediately and returns the context's error.
The maxConcurrent parameter limits the number of concurrent transform operations. Set to 0 for unlimited concurrency (bounded only by available goroutines).
The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input set is nil. The output set uses the same hash function as the input. The output set may have fewer elements if the transform produces duplicate elements.
Thread-safety: The output set is built with a mutex to handle concurrent additions, ensuring thread-safe construction even when transforms execute in parallel.
Example:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
output, err := MapSetCtx(ctx, 2, input,
func(ctx context.Context, s hashing.HashableString) (hashing.HashableInt, error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
return hashing.HashableInt(len(s)), nil
})
func MapSetCtxWithExecutor ¶
func MapSetCtxWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]]( ctx context.Context, exec Executor, input set.Set[InElem], transform func(ctx context.Context, elem InElem) (OutElem, error), ) (set.Set[OutElem], error)
MapSetCtxWithExecutor transforms a Set by applying a transform function to each element in parallel, producing a new Set with potentially different element types, using a custom executor.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
The transform function is called for each element in the input set with the provided context. If any transform returns an error, the operation stops and returns that error immediately, canceling any remaining transforms.
Returns nil if the input set is nil. The output set uses the same hash function as the input. The output set may have fewer elements if the transform produces duplicate elements.
Thread-safety: The output set is built with a mutex to handle concurrent additions, ensuring thread-safe construction even when transforms execute in parallel.
Example:
exec := NewDefaultExecutor(2)
defer exec.Close()
output, err := MapSetCtxWithExecutor(ctx, exec, input,
func(ctx context.Context, s hashing.HashableString) (hashing.HashableInt, error) {
return hashing.HashableInt(len(s)), nil
})
func MapSetWithExecutor ¶
func MapSetWithExecutor[InElem Collectable[InElem], OutElem Collectable[OutElem]]( exec Executor, input set.Set[InElem], transform func(ctx context.Context, elem InElem) (OutElem, error), ) (set.Set[OutElem], error)
MapSetWithExecutor transforms a Set by applying a transform function to each element in parallel, producing a new Set with potentially different element types, using a custom executor. See MapSetCtxWithExecutor for more information.
func MapSlice ¶
func MapSlice[Input, Output any]( maxConcurrent int, values []Input, transform func(ctx context.Context, value Input) (Output, error), ) ([]Output, error)
MapSlice transforms a slice of values in parallel by applying a function to each element. See MapSliceCtx for more information.
func MapSliceCtx ¶
func MapSliceCtx[Input, Output any]( ctx context.Context, maxConcurrent int, values []Input, transform func(ctx context.Context, value Input) (Output, error), ) (result []Output, err error)
MapSliceCtx transforms a slice of values in parallel by applying a function to each element. It returns a new slice containing the transformed values in the same order as the input.
The maxConcurrent parameter limits the number of concurrent transformations. If maxConcurrent is less than 1, all transformations will run at the same time.
If any transformation returns an error, all remaining transformations are canceled (via their context) and the first error is returned. The output slice will be nil.
Panics that occur within the transformation function are automatically recovered and converted to errors. Order is preserved: outputs[i] corresponds to values[i].
Example:
numbers := []int{1, 2, 3, 4, 5}
doubled, err := MapSliceCtx(ctx, 2, numbers, func(ctx context.Context, n int) (int, error) {
return n * 2, nil
})
// doubled = [2, 4, 6, 8, 10]
func MapSliceCtxWithExecutor ¶
func MapSliceCtxWithExecutor[Input, Output any]( ctx context.Context, exec Executor, values []Input, transform func(ctx context.Context, value Input) (Output, error), ) ([]Output, error)
MapSliceCtxWithExecutor transforms a slice of values in parallel by applying a function to each element, using a custom executor for concurrency control instead of creating a new one.
This is useful when you want to reuse an executor across multiple batches of work or when you need custom execution behavior. The executor is not closed by this function, allowing it to be reused.
If any transformation returns an error, all remaining transformations are canceled (via their context) and the first error is returned. The output slice will be nil.
Panics that occur within the transformation function are automatically recovered and converted to errors. Order is preserved: outputs[i] corresponds to values[i].
Example:
exec := NewDefaultExecutor(2) // Create reusable executor with max concurrency of 2
defer exec.Close()
numbers := []int{1, 2, 3, 4, 5}
doubled, err := MapSliceCtxWithExecutor(ctx, exec, numbers, func(ctx context.Context, n int) (int, error) {
return n * 2, nil
})
// doubled = [2, 4, 6, 8, 10]
func MapSliceWithExecutor ¶
func MapSliceWithExecutor[Input, Output any]( exec Executor, values []Input, transform func(ctx context.Context, value Input) (Output, error), ) ([]Output, error)
MapSliceWithExecutor transforms a slice of values in parallel by applying a function to each element, using a custom executor for concurrency control. See MapSliceCtxWithExecutor for more information.
Types ¶
type Collectable ¶
type Collectable[T any] = collectable.Collectable[T]
Collectable is a type alias for collectable.Collectable, providing a shorter name for use in this package. It represents types that can be both hashed and compared for equality, which is required for use as map keys in amp-common maps.
type Executor ¶
type Executor interface {
// GoContext executes fn asynchronously using the provided context, calling done with the result.
// If the executor is closed or the context is canceled, done is called with the appropriate error.
GoContext(ctx context.Context, fn func(context.Context) error, done func(error))
// Go executes fn asynchronously using a background context, calling done with the result.
// This is a convenience wrapper around GoContext that uses context.Background().
Go(fn func(context.Context) error, done func(error))
// Close shuts down the executor, preventing new executions and waiting for in-flight operations.
// Returns ErrExecutorClosed if the executor is already closed.
Close() error
}
Executor manages concurrent execution of functions with a configurable concurrency limit. It provides methods to execute functions asynchronously while respecting resource constraints.
func NewDefaultExecutor ¶
NewDefaultExecutor creates a new executor with the specified concurrency limit.
The executor manages parallel execution of functions while respecting the maxConcurrent limit. It uses a semaphore-based approach to control how many functions can run simultaneously.
This creates an executor with a fixed concurrency limit that does not adapt to the size of input data. Use this when you want consistent concurrency across multiple batches with varying sizes.
For single-use transformations, prefer the base functions (MapSlice, Do, etc.) which create optimally-sized internal executors automatically that adapt to your data size.
Parameters:
- maxConcurrent: Maximum number of functions that can execute concurrently. If less than 1, defaults to 1 (sequential execution).
Returns:
- An Executor that can be used with DoWithExecutor, MapSliceWithExecutor, and other *WithExecutor variant functions.
The executor must be closed when no longer needed to release resources:
exec := NewDefaultExecutor(5) defer exec.Close()
Example usage:
// Create executor with max 3 concurrent operations
exec := NewDefaultExecutor(3)
defer exec.Close()
// Use with DoWithExecutor to reuse across multiple batches
batch1 := []func(context.Context) error{...}
batch2 := []func(context.Context) error{...}
if err := DoWithExecutor(exec, batch1...); err != nil {
return err
}
if err := DoWithExecutor(exec, batch2...); err != nil {
return err
}
Executor reuse is beneficial when processing multiple batches of work as it avoids the overhead of creating and destroying executors repeatedly.