rtkv

Version: v1.0.0
Published: Feb 9, 2025 License: AGPL-3.0-or-later Imports: 10 Imported by: 0

README

rtkv

Small wrapper around go-redis to get and set data in Redis by ID, with a separate index for time-ordered retrieval. Suitable for large data sets.

Fetching Ranges

There are 2 methods to fetch entities by time range:

FetchPage()

Locally retrieves the keys of the entities to fetch, then yields the results using MGET. There is no read consistency when fetching pages. In very high concurrency contexts where timestamps are updated often, or entities are often deleted, there is a non-zero chance of getting fewer items than the requested page size, or a page containing a more recent version of an entity, one that reflects a state outside of the requested time window. This is an intentional trade-off, as fetching pages across different contexts will not be consistent anyway. If you need consistent pages, use FetchPageConsistent().
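
To make the trade-off concrete, here is a minimal in-memory sketch of a two-step read. It is not the package's code and does not talk to Redis: the `store` type, `racyPage`, and the integer scores are all assumptions made for illustration, and skipping missing keys in step 2 is a modelling choice.

```go
package main

import (
	"fmt"
	"sort"
)

// store is a toy stand-in for the layout described above (hypothetical):
// an index of key -> last-modified score, plus a separate key -> value map.
type store struct {
	index map[string]int64  // plays the role of the time index
	data  map[string][]byte // plays the role of the stored values
}

// newStore seeds three entities with scores 1, 2 and 3.
func newStore() *store {
	s := &store{index: map[string]int64{}, data: map[string][]byte{}}
	for i, k := range []string{"b", "c", "d"} {
		s.index[k] = int64(i + 1)
		s.data[k] = []byte(k + "-v1")
	}
	return s
}

// keysInRange is step 1: select keys with from <= score <= to, oldest first.
func (s *store) keysInRange(from, to int64) []string {
	var keys []string
	for k, score := range s.index {
		if score >= from && score <= to {
			keys = append(keys, k)
		}
	}
	sort.Slice(keys, func(i, j int) bool { return s.index[keys[i]] < s.index[keys[j]] })
	return keys
}

// values is step 2: look up each key, skipping keys that no longer exist.
func (s *store) values(keys []string) [][]byte {
	var out [][]byte
	for _, k := range keys {
		if v, ok := s.data[k]; ok {
			out = append(out, v)
		}
	}
	return out
}

// del removes an entity from both the index and the data map.
func (s *store) del(key string) {
	delete(s.index, key)
	delete(s.data, key)
}

// racyPage runs step 1, lets a simulated concurrent writer act, then runs step 2.
func racyPage(s *store, from, to int64, between func()) [][]byte {
	keys := s.keysInRange(from, to)
	between() // nothing prevents another client from writing here
	return s.values(keys)
}

func main() {
	s := newStore()
	page := racyPage(s, 1, 3, func() { s.del("c") })
	fmt.Println(len(page)) // prints 2: one fewer than the three keys selected in step 1
}
```

In the same model, a writer that updates an entity between the two steps makes the page carry the newer value even though its new timestamp falls outside the requested window.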

The byte slices yielded by the iterator returned by this method are not safe for reuse.

FetchPageConsistent()

Uses a Lua script to ensure that selecting a time range and retrieving the data happen in one atomic operation. The byte slices yielded are not safe for reuse. A single consistent scan cannot yield more than a little over 5k results.

Benchmarks

These benchmarks show the difference between the two methods of fetching pages. They use the Paginate function to fetch a total of 100k records in 20 pages of 5k, against a single Redis instance. Consistent fetching delegates more work (and memory usage) to Redis, which may affect your scaling considerations. To save you some math: dividing the ns/op figures below by 20 pages, fetching a single page of 5k records takes about 2.9 ms to 3.0 ms and 8.5 ms respectively.

goos: linux
goarch: amd64
pkg: github.com/johnknl/rtkv
cpu: AMD Ryzen 9 5950X 16-Core Processor
BenchmarkRedisTKV_FetchPage
BenchmarkRedisTKV_FetchPage/Default
BenchmarkRedisTKV_FetchPage/Default-32               195          57347413 ns/op        69601861 B/op     400833 allocs/op
BenchmarkRedisTKV_FetchPage/Default-32               214          59313912 ns/op        69601491 B/op     400832 allocs/op
BenchmarkRedisTKV_FetchPage/Default-32               205          60582640 ns/op        69601354 B/op     400831 allocs/op
BenchmarkRedisTKV_FetchPage/Consistent
BenchmarkRedisTKV_FetchPage/Consistent-32             73         169228474 ns/op        59909809 B/op     200513 allocs/op
BenchmarkRedisTKV_FetchPage/Consistent-32             73         169711133 ns/op        59909780 B/op     200514 allocs/op
BenchmarkRedisTKV_FetchPage/Consistent-32             74         169084674 ns/op        59909592 B/op     200513 allocs/op
PASS
ok      github.com/johnknl/rtkv 103.021s

Documentation / Usage

Documentation and usage examples are available on pkg.go.dev.

Documentation

Constants

const (
	// DelimUnit is the ASCII unit separator character.
	// Use this as a delimiter for keys if you need
	// a non-printable character. Safest choice.
	DelimUnit = "\x1f"

	// DelimPipe is ASCII pipe character.
	// Use this as a separator in keys if you need
	// a printable character. May be easier to debug.
	DelimPipe = "|"
)

Variables

var ErrUnexpectedScriptResult = errors.New("unexpected result from lua script")

Functions

func Paginate

func Paginate(
	ctx context.Context,
	pageFn PageFunc,
	from, to *time.Time,
	offset, limit int,
) (iter.Seq2[[]byte, error], error)

Types

type BulkSetRecord

type BulkSetRecord struct {
	LastModified time.Time
	ID           []string
	Data         []byte
}

type PageFunc

type PageFunc func(
	ctx context.Context,
	from, to *time.Time,
	offset, limit int,
) (iter.Seq2[[]byte, error], int64, error)

type RedisTKV

type RedisTKV struct {
	// contains filtered or unexported fields
}

RedisTKV is a k/v store backed by Redis. It uses a sorted set to keep track of last modified time and enable range queries.

Example
ctx := context.Background()
store := rtkv.NewRedisTKV(rtkv.DelimUnit, "example", newGoRedisClient(0))
now := time.Now()

// Set the value of entity "a", "a"
existed, err := store.Set(ctx, []byte(`{"id": "a"}`), now, "a", "a")

fmt.Println(existed, err)

// Get the value of id "a" ([]byte(nil))
val, err := store.Get(ctx, "a")

fmt.Printf("%#v %v\n", val, err)

// Get the value of id "a", "a" ({"id": "a"}, <nil>)
val, err = store.Get(ctx, "a", "a")
fmt.Printf("%s %v\n", val, err)

// Bulk set some entities
_ = store.BulkSet(ctx, []rtkv.BulkSetRecord{
	{Data: []byte(`{"id": "b"}`), ID: []string{"a", "b", "b"}, LastModified: now.Add(-time.Minute)},
	{Data: []byte(`{"id": "c"}`), ID: []string{"a", "b", "c"}, LastModified: now.Add(-2 * time.Minute)},
	{Data: []byte(`{"id": "d"}`), ID: []string{"a", "b", "d"}, LastModified: now.Add(-3 * time.Minute)},
	{Data: []byte(`{"id": "e"}`), ID: []string{"a", "b", "e"}, LastModified: now.Add(-4 * time.Hour)},
})

// Get max 2 entities from a range that matches 3 in a set of 5 (oldest first)
from := now.Add(-3 * time.Minute)
to := now.Add(-time.Minute)
iterator, total, err := store.FetchPage(ctx, &from, &to, 0, 2)

fmt.Println(
	"err:", err,
	"total:", total,
)

for data, err := range iterator {
	fmt.Println(string(data), err)
}
Output:

false <nil>
[]byte(nil) <nil>
{"id": "a"} <nil>
err: <nil> total: 3
{"id": "d"} <nil>
{"id": "c"} <nil>

func NewRedisTKV

func NewRedisTKV(idDelimiter, namespace string, c *redis.Client) *RedisTKV

NewRedisTKV creates a new RedisTKV instance. The namespace is used to prefix keys in Redis.

The `idDelimiter` argument is used as a namespace delimiter and to pack composite IDs into a single key.

The `namespace` argument prevents key collisions between different entity types.
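
The exact key layout is not documented here, so the sketch below only illustrates the idea of packing a namespace and a composite ID with a delimiter, and why a non-printable delimiter is the safer choice. `packKey` is a hypothetical helper and the layout (`namespace`, delimiter, ID parts joined by the delimiter) is an assumption; the package's real keys may look different.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	delimUnit = "\x1f" // same value as DelimUnit
	delimPipe = "|"    // same value as DelimPipe
)

// packKey is a hypothetical illustration of joining a namespace and the
// parts of a composite ID into a single key using one delimiter.
func packKey(delim, namespace string, id ...string) string {
	return namespace + delim + strings.Join(id, delim)
}

func main() {
	// With a printable delimiter, ID parts that contain it can collide.
	fmt.Println(packKey(delimPipe, "example", "a|b", "c") == packKey(delimPipe, "example", "a", "b|c")) // prints true

	// The unit separator is unlikely to appear in an ID, so the same inputs stay distinct.
	fmt.Println(packKey(delimUnit, "example", "a|b", "c") == packKey(delimUnit, "example", "a", "b|c")) // prints false

	// %q makes the non-printable delimiter visible when debugging.
	fmt.Printf("%q\n", packKey(delimUnit, "example", "a", "b")) // prints "example\x1fa\x1fb"
}
```

Whichever delimiter is chosen, ID parts that can contain it would need to be rejected or escaped before being stored.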

func (*RedisTKV) BulkSet

func (r *RedisTKV) BulkSet(ctx context.Context, records []BulkSetRecord) error

BulkSet sets multiple entities in the store.

func (*RedisTKV) Delete

func (r *RedisTKV) Delete(ctx context.Context, id ...string) error

func (*RedisTKV) Exists

func (r *RedisTKV) Exists(ctx context.Context, id ...string) (bool, error)

func (*RedisTKV) FetchPage

func (r *RedisTKV) FetchPage(
	ctx context.Context,
	from, to *time.Time,
	offset, limit int,
) (iter.Seq2[[]byte, error], int64, error)

func (*RedisTKV) FetchPageConsistent

func (r *RedisTKV) FetchPageConsistent(
	ctx context.Context,
	from, to *time.Time,
	offset, limit int,
) (iter.Seq2[[]byte, error], int64, error)

func (*RedisTKV) Get

func (r *RedisTKV) Get(ctx context.Context, id ...string) ([]byte, error)

Get an entity by ID.

func (*RedisTKV) Set

func (r *RedisTKV) Set(ctx context.Context, data []byte, lastModified time.Time, id ...string) (bool, error)

Set an entity in the store by ID. If the entity already exists, it will be overwritten. Returns true if the entity already existed.
