# Distributed Mutex

A Redis-based distributed mutex implementation for coordinating access to shared resources across multiple application instances.

## Overview

This package provides a distributed locking mechanism using Redis as the coordination backend. It is built on top of the [redislock](https://github.com/bsm/redislock) library and provides a simple interface for acquiring and releasing locks across distributed systems.

## Features
- **Distributed Locking**: Coordinate access to shared resources across multiple application instances
- **Automatic Retry**: Built-in retry logic with a configurable backoff strategy
- **Thread-Safe**: Safe for concurrent use within a single application instance
- **Formatted Keys**: Support for formatted lock keys via `Acquiref` and `Releasef`
- **Logging**: Integrated zap logging for debugging and monitoring
## Installation

The package is already included in the project. The required dependency (`github.com/bsm/redislock`) is installed automatically via Go modules.

## Interface

```go
type Adapter interface {
	// Acquire blocks until the lock for key is obtained or retries are exhausted.
	Acquire(ctx context.Context, key string)
	// Acquiref is like Acquire but builds the key from a format string.
	Acquiref(ctx context.Context, format string, a ...any)
	// Release releases the lock previously acquired for key.
	Release(ctx context.Context, key string)
	// Releasef is like Release but builds the key from a format string.
	Releasef(ctx context.Context, format string, a ...any)
}
```
## Usage

### Basic Example

```go
import (
	"context"

	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"

	"codeberg.org/mapleapps/monorepo/cloud/maplepress-backend/pkg/distributedmutex"
)

// Create a Redis client.
redisClient := redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

// Create a logger.
logger, _ := zap.NewProduction()

// Create the distributed mutex adapter.
mutex := distributedmutex.NewAdapter(logger, redisClient)

// Acquire a lock.
ctx := context.Background()
mutex.Acquire(ctx, "my-resource-key")

// ... perform operations on the protected resource ...

// Release the lock.
mutex.Release(ctx, "my-resource-key")

// Acquire a lock with a formatted key.
tenantID := "tenant-123"
resourceID := "resource-456"
mutex.Acquiref(ctx, "tenant:%s:resource:%s", tenantID, resourceID)

// ... perform operations ...

mutex.Releasef(ctx, "tenant:%s:resource:%s", tenantID, resourceID)
```
### Integration with Dependency Injection (Wire)

```go
// In your Wire provider set
wire.NewSet(
	distributedmutex.ProvideDistributedMutexAdapter,
	// ... other providers
)

// Use in your application
func NewMyService(mutex distributedmutex.Adapter) *MyService {
	return &MyService{
		mutex: mutex,
	}
}
```
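The provider itself presumably just wraps `NewAdapter` with the constructor signature shown in the basic example above. A minimal sketch; the real provider lives in the package and may differ:

```go
import (
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)

// Hypothetical sketch of the provider, not the package's actual source.
func ProvideDistributedMutexAdapter(logger *zap.Logger, redisClient *redis.Client) Adapter {
	return NewAdapter(logger, redisClient)
}
```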
## Configuration

### Lock Duration

The default lock duration (TTL) is 1 minute. Locks expire automatically after this time, which prevents deadlocks when a holder crashes without releasing.

### Retry Strategy

- **Retry Interval**: 250ms
- **Max Retries**: 20 attempts
- **Total Max Wait Time**: ~5 seconds (20 × 250ms)

If a lock cannot be obtained after all retries, an error is logged and `Acquire` returns without blocking indefinitely. A sketch of how these settings map onto redislock options follows.
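This is not the package's actual source, but a minimal sketch of how a 1-minute TTL with linear, limited retries can be expressed with the redislock API (the constant and function names here are illustrative assumptions):

```go
import (
	"context"
	"time"

	"github.com/bsm/redislock"
	"github.com/redis/go-redis/v9"
)

// Sketch only: names and structure are assumptions, not the real implementation.
const (
	lockTTL       = 1 * time.Minute        // locks auto-expire after this
	retryInterval = 250 * time.Millisecond // wait between attempts
	maxRetries    = 20                     // ~5s of total waiting
)

func obtain(ctx context.Context, client *redis.Client, key string) (*redislock.Lock, error) {
	locker := redislock.New(client)
	return locker.Obtain(ctx, key, lockTTL, &redislock.Options{
		// Retry every 250ms, at most 20 times, before giving up.
		RetryStrategy: redislock.LimitRetry(
			redislock.LinearBackoff(retryInterval), maxRetries),
	})
}
```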
## Best Practices

- **Always Release Locks**: Ensure locks are released even in error cases by using `defer`:

  ```go
  mutex.Acquire(ctx, "my-key")
  defer mutex.Release(ctx, "my-key")
  ```

- **Use Descriptive Keys**: Use clear, hierarchical key names:

  ```go
  // Good
  mutex.Acquire(ctx, "tenant:123:user:456:update")

  // Not ideal
  mutex.Acquire(ctx, "lock1")
  ```

- **Keep Critical Sections Short**: Minimize the time locks are held to improve concurrency.

- **Handle Timeouts**: Use a context with a timeout for critical operations:

  ```go
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()
  mutex.Acquire(ctx, "my-key")
  ```

- **Avoid Nested Locks**: Be careful when acquiring multiple locks to avoid deadlocks; see the lock-ordering sketch after this list.
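If holding multiple locks is unavoidable, one standard way to prevent circular waits is to acquire them in a single, globally consistent order. A minimal sketch of that pattern; the helper names are illustrative, not part of the package:

```go
import (
	"context"
	"sort"

	"codeberg.org/mapleapps/monorepo/cloud/maplepress-backend/pkg/distributedmutex"
)

// acquireInOrder takes multiple locks in sorted key order so that every
// caller contends for them in the same sequence, preventing circular waits.
func acquireInOrder(ctx context.Context, mutex distributedmutex.Adapter, keys ...string) {
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)
	for _, k := range sorted {
		mutex.Acquire(ctx, k)
	}
}

// releaseAll releases the locks; order does not matter on release.
func releaseAll(ctx context.Context, mutex distributedmutex.Adapter, keys ...string) {
	for _, k := range keys {
		mutex.Release(ctx, k)
	}
}
```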
## Logging

The adapter logs the following events:

- **Debug**: Lock acquisition and release operations
- **Error**: Failed lock acquisitions, timeout errors, and release failures
- **Warn**: Attempts to release non-existent locks
## Thread Safety

The adapter is safe for concurrent use within a single application instance. It uses an internal mutex to protect its map of lock instances from concurrent access by multiple goroutines; a sketch of that layout follows.
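For illustration, a minimal sketch of what such an internal layout might look like. The type and field names are assumptions, not the package's actual source; the Warn/Error log behavior follows the Logging section above:

```go
import (
	"context"
	"sync"

	"github.com/bsm/redislock"
	"go.uber.org/zap"
)

// adapterImpl is a hypothetical layout, not the real implementation.
type adapterImpl struct {
	logger *zap.Logger
	locker *redislock.Client

	mu    sync.Mutex                 // guards the locks map below
	locks map[string]*redislock.Lock // held locks, keyed by lock key
}

// Release looks up the held lock under mu, removes it from the map,
// and releases it in Redis.
func (a *adapterImpl) Release(ctx context.Context, key string) {
	a.mu.Lock()
	lock, ok := a.locks[key]
	delete(a.locks, key)
	a.mu.Unlock()

	if !ok {
		a.logger.Warn("release of non-existent lock", zap.String("key", key))
		return
	}
	if err := lock.Release(ctx); err != nil {
		a.logger.Error("failed to release lock", zap.String("key", key), zap.Error(err))
	}
}
```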
## Error Handling

The current implementation logs errors but does not return them. Consider this when using the adapter:

- Lock acquisition failures are logged but don't panic
- The application continues running even if locks fail
- Check logs for lock-related issues in production
## Limitations

- **Lock Duration**: Locks automatically expire after 1 minute
- **No Lock Extension**: Extending a lock's duration is not currently supported; a sketch of what this could look like follows
- **No Deadlock Detection**: Deadlocks must be prevented manually
- **Redis Dependency**: Requires a running Redis instance
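For reference, the underlying redislock library does expose a `Refresh` method that resets a held lock's TTL, so extension support could be layered on top. A hedged sketch; the helper is hypothetical, only `Refresh` is real redislock API:

```go
import (
	"context"
	"time"

	"github.com/bsm/redislock"
)

// extendLock is a hypothetical helper, not part of the current Adapter
// interface. It resets the TTL of an already-held redislock lock.
func extendLock(ctx context.Context, lock *redislock.Lock, ttl time.Duration) error {
	// Refresh returns redislock.ErrNotObtained if the lock has already
	// expired or its token no longer matches.
	return lock.Refresh(ctx, ttl, nil)
}
```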
## Example Use Cases

### Preventing Duplicate Processing

```go
func ProcessJob(ctx context.Context, jobID string, mutex distributedmutex.Adapter) {
	lockKey := fmt.Sprintf("job:processing:%s", jobID)
	mutex.Acquire(ctx, lockKey)
	defer mutex.Release(ctx, lockKey)

	// Process the job - only one instance handles it at a time
	// (within the lock TTL)
	// ...
}
```

### Coordinating Resource Updates

```go
func UpdateTenantSettings(ctx context.Context, tenantID string, mutex distributedmutex.Adapter) error {
	mutex.Acquiref(ctx, "tenant:%s:settings:update", tenantID)
	defer mutex.Releasef(ctx, "tenant:%s:settings:update", tenantID)

	// Safe to update tenant settings
	// ...
	return nil
}
```

### Rate Limiting Operations

```go
func RateLimitedOperation(ctx context.Context, userID string, mutex distributedmutex.Adapter) {
	lockKey := fmt.Sprintf("ratelimit:user:%s", userID)
	mutex.Acquire(ctx, lockKey)
	defer mutex.Release(ctx, lockKey)

	// Perform the rate-limited operation
	// ...
}
```
## Troubleshooting

### Lock Not Acquired

**Problem**: Locks are not being acquired (errors in the logs)

**Solutions**:

- Verify Redis is running and accessible
- Check network connectivity to Redis
- Ensure Redis has sufficient memory
- Check for Redis errors in logs

### Lock Contention

**Problem**: Frequent lock acquisition failures due to contention

**Solutions**:

- Reduce critical section duration
- Use more specific lock keys to reduce contention
- Consider increasing retry limits if appropriate
- Review application architecture for excessive locking

### Memory Leaks

**Problem**: Lock instances accumulating in memory

**Solutions**:

- Ensure every `Acquire` call has a corresponding `Release` call
- Use `defer` to guarantee lock release
- Monitor the lock instance map size in production