bc0977f1c9
Follow #31908. The main refactor is that it has removed the returned context of `Lock`. The returned context of `Lock` in old code is to provide a way to let callers know that they have lost the lock. But in most cases, callers shouldn't cancel what they are doing even it has lost the lock. And the design would confuse developers and make them use it incorrectly. See the discussion history: https://github.com/go-gitea/gitea/pull/31813#discussion_r1732041513 and https://github.com/go-gitea/gitea/pull/31813#discussion_r1734078998 It's a breaking change, but since the new module hasn't been used yet, I think it's OK to not add the `pr/breaking` label. ## Design principles It's almost copied from #31908, but with some changes. ### Use spinlock even in memory implementation (unchanged) In actual use cases, users may cancel requests. `sync.Mutex` will block the goroutine until the lock is acquired even if the request is canceled. And the spinlock is more suitable for this scenario since it's possible to give up the lock acquisition. Although the spinlock consumes more CPU resources, I think it's acceptable in most cases. ### Do not expose the mutex to callers (unchanged) If we expose the mutex to callers, it's possible for callers to reuse the mutex, which causes more complexity. For example: ```go lock := GetLocker(key) lock.Lock() // ... // even if the lock is unlocked, we cannot GC the lock, // since the caller may still use it again. lock.Unlock() lock.Lock() // ... lock.Unlock() // callers have to GC the lock manually. RemoveLocker(key) ``` That's why https://github.com/go-gitea/gitea/pull/31813#discussion_r1721200549 In this PR, we only expose `ReleaseFunc` to callers. So callers just need to call `ReleaseFunc` to release the lock, and do not need to care about the lock's lifecycle. ```go release, err := locker.Lock(ctx, key) if err != nil { return err } // ... release() // if callers want to lock again, they have to re-acquire the lock. release, err := locker.Lock(ctx, key) // ... ``` In this way, it's also much easier for redis implementation to extend the mutex automatically, so that callers do not need to care about the lock's lifecycle. See also https://github.com/go-gitea/gitea/pull/31813#discussion_r1722659743 ### Use "release" instead of "unlock" (unchanged) For "unlock", it has the meaning of "unlock an acquired lock". So it's not acceptable to call "unlock" when failed to acquire the lock, or call "unlock" multiple times. It causes more complexity for callers to decide whether to call "unlock" or not. So we use "release" instead of "unlock" to make it clear. Whether the lock is acquired or not, callers can always call "release", and it's also safe to call "release" multiple times. But the code DO NOT expect callers to not call "release" after acquiring the lock. If callers forget to call "release", it will cause resource leak. That's why it's always safe to call "release" without extra checks: to avoid callers to forget to call it. ### Acquired locks could be lost, but the callers shouldn't stop Unlike `sync.Mutex` which will be locked forever once acquired until calling `Unlock`, for distributed lock, the acquired lock could be lost. For example, the caller has acquired the lock, and it holds the lock for a long time since auto-extending is working for redis. However, it lost the connection to the redis server, and it's impossible to extend the lock anymore. In #31908, it will cancel the context to make the operation stop, but it's not safe. Many operations are not revert-able. If they have been interrupted, then the instance goes corrupted. So `Lock` won't return `ctx` anymore in this PR. ### Multiple ways to use the lock 1. Regular way ```go release, err := Lock(ctx, key) if err != nil { return err } defer release() // ... ``` 2. Early release ```go release, err := Lock(ctx, key) if err != nil { return err } defer release() // ... // release the lock earlier release() // continue to do something else // ... ``` 3. Functional way ```go if err := LockAndDo(ctx, key, func(ctx context.Context) error { // ... return nil }); err != nil { return err } ```
138 lines
3.2 KiB
Go
138 lines
3.2 KiB
Go
// Copyright 2024 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package globallock
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/modules/nosql"
|
|
|
|
"github.com/go-redsync/redsync/v4"
|
|
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
|
|
)
|
|
|
|
const redisLockKeyPrefix = "gitea:globallock:"
|
|
|
|
// redisLockExpiry is the default expiry time for a lock.
|
|
// Define it as a variable to make it possible to change it in tests.
|
|
var redisLockExpiry = 30 * time.Second
|
|
|
|
type redisLocker struct {
|
|
rs *redsync.Redsync
|
|
|
|
mutexM sync.Map
|
|
closed atomic.Bool
|
|
extendWg sync.WaitGroup
|
|
}
|
|
|
|
var _ Locker = &redisLocker{}
|
|
|
|
func NewRedisLocker(connection string) Locker {
|
|
l := &redisLocker{
|
|
rs: redsync.New(
|
|
goredis.NewPool(
|
|
nosql.GetManager().GetRedisClient(connection),
|
|
),
|
|
),
|
|
}
|
|
|
|
l.extendWg.Add(1)
|
|
l.startExtend()
|
|
|
|
return l
|
|
}
|
|
|
|
func (l *redisLocker) Lock(ctx context.Context, key string) (ReleaseFunc, error) {
|
|
return l.lock(ctx, key, 0)
|
|
}
|
|
|
|
func (l *redisLocker) TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error) {
|
|
f, err := l.lock(ctx, key, 1)
|
|
|
|
var (
|
|
errTaken *redsync.ErrTaken
|
|
errNodeTaken *redsync.ErrNodeTaken
|
|
)
|
|
if errors.As(err, &errTaken) || errors.As(err, &errNodeTaken) {
|
|
return false, f, nil
|
|
}
|
|
return err == nil, f, err
|
|
}
|
|
|
|
// Close closes the locker.
|
|
// It will stop extending the locks and refuse to acquire new locks.
|
|
// In actual use, it is not necessary to call this function.
|
|
// But it's useful in tests to release resources.
|
|
// It could take some time since it waits for the extending goroutine to finish.
|
|
func (l *redisLocker) Close() error {
|
|
l.closed.Store(true)
|
|
l.extendWg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (l *redisLocker) lock(ctx context.Context, key string, tries int) (ReleaseFunc, error) {
|
|
if l.closed.Load() {
|
|
return func() {}, fmt.Errorf("locker is closed")
|
|
}
|
|
|
|
options := []redsync.Option{
|
|
redsync.WithExpiry(redisLockExpiry),
|
|
}
|
|
if tries > 0 {
|
|
options = append(options, redsync.WithTries(tries))
|
|
}
|
|
mutex := l.rs.NewMutex(redisLockKeyPrefix+key, options...)
|
|
if err := mutex.LockContext(ctx); err != nil {
|
|
return func() {}, err
|
|
}
|
|
|
|
l.mutexM.Store(key, mutex)
|
|
|
|
releaseOnce := sync.Once{}
|
|
return func() {
|
|
releaseOnce.Do(func() {
|
|
l.mutexM.Delete(key)
|
|
|
|
// It's safe to ignore the error here,
|
|
// if it failed to unlock, it will be released automatically after the lock expires.
|
|
// Do not call mutex.UnlockContext(ctx) here, or it will fail to release when ctx has timed out.
|
|
_, _ = mutex.Unlock()
|
|
})
|
|
}, nil
|
|
}
|
|
|
|
func (l *redisLocker) startExtend() {
|
|
if l.closed.Load() {
|
|
l.extendWg.Done()
|
|
return
|
|
}
|
|
|
|
toExtend := make([]*redsync.Mutex, 0)
|
|
l.mutexM.Range(func(_, value any) bool {
|
|
m := value.(*redsync.Mutex)
|
|
|
|
// Extend the lock if it is not expired.
|
|
// Although the mutex will be removed from the map before it is released,
|
|
// it still can be expired because of a failed extension.
|
|
// If it happens, it does not need to be extended anymore.
|
|
if time.Now().After(m.Until()) {
|
|
return true
|
|
}
|
|
|
|
toExtend = append(toExtend, m)
|
|
return true
|
|
})
|
|
for _, v := range toExtend {
|
|
// If it failed to extend, it will be released automatically after the lock expires.
|
|
_, _ = v.Extend()
|
|
}
|
|
|
|
time.AfterFunc(redisLockExpiry/2, l.startExtend)
|
|
}
|