Add last-in-one-out queue
Add an abstract queue that outputs only the last-queued item. Items that were queued before the last one will be dropped. This will be used for handling the last-rendered images. Any images that are rendered while an image is already uploading should be ignored, except the last one.
This commit is contained in:
parent
34f1cc076c
commit
3d20a89bf5
79
pkg/last_in_one_out_queue/last_in_one_out_queue.go
Normal file
79
pkg/last_in_one_out_queue/last_in_one_out_queue.go
Normal file
@ -0,0 +1,79 @@
|
||||
package last_in_one_out_queue
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
const pollPeriod = 250 * time.Millisecond
|
||||
|
||||
type LastInOneOutQueue[T any] struct {
|
||||
queuedItem *T
|
||||
ch chan T
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func New[T any]() *LastInOneOutQueue[T] {
|
||||
return &LastInOneOutQueue[T]{
|
||||
queuedItem: nil,
|
||||
ch: make(chan T),
|
||||
mutex: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (q *LastInOneOutQueue[T]) Run(ctx context.Context) {
|
||||
log.Trace().Msg("last-in-one-out queue starting")
|
||||
defer log.Trace().Msg("last-in-one-out queue stopping")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(pollPeriod):
|
||||
// Periodically try pushing the queued item into the channel.
|
||||
q.tryPushingItem()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue puts the item in the queue. It replaces any previously-queued item.
|
||||
func (q *LastInOneOutQueue[T]) Enqueue(item T) {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
q.queuedItem = &item
|
||||
q.tryPushingItem_unsafe()
|
||||
}
|
||||
|
||||
// Item returns the channel on which queued items can be dequeued.
|
||||
func (q *LastInOneOutQueue[T]) Item() <-chan T {
|
||||
return q.ch
|
||||
}
|
||||
|
||||
func (q *LastInOneOutQueue[T]) tryPushingItem() {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
|
||||
if q.queuedItem == nil {
|
||||
return
|
||||
}
|
||||
|
||||
q.tryPushingItem_unsafe()
|
||||
}
|
||||
|
||||
// tryPushingItem_unsafe tries to push the queued item.
|
||||
// It assumes that q.queuedItem is not nil, and doesn't obtain the mutex.
|
||||
func (q *LastInOneOutQueue[T]) tryPushingItem_unsafe() {
|
||||
// It's fine if pushing to the channel fails; this means that the receiving
|
||||
// end of the queue isn't ready to process another item yet.
|
||||
select {
|
||||
case q.ch <- *q.queuedItem:
|
||||
q.queuedItem = nil
|
||||
log.Trace().Msg("last-in-one-out queue: queued item is being handled")
|
||||
default:
|
||||
}
|
||||
}
|
78
pkg/last_in_one_out_queue/last_in_one_out_queue_test.go
Normal file
78
pkg/last_in_one_out_queue/last_in_one_out_queue_test.go
Normal file
@ -0,0 +1,78 @@
|
||||
package last_in_one_out_queue
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
q := New[string]()
|
||||
select {
|
||||
case <-q.Item():
|
||||
t.Fatal("a new queue shouldn't hold an item")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueueAndGet(t *testing.T) {
|
||||
q := New[string]()
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
q.Enqueue("hey")
|
||||
}()
|
||||
|
||||
select {
|
||||
case item := <-q.Item():
|
||||
assert.Equal(t, "hey", item)
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Error("enqueueing while waiting for an item should push it immediately")
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestQueueMultiple(t *testing.T) {
|
||||
q := New[string]()
|
||||
|
||||
q.Enqueue("hey")
|
||||
q.Enqueue("these are multiple items")
|
||||
q.Enqueue("the last one should be the")
|
||||
q.Enqueue("winner")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
q.Run(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
case item := <-q.Item():
|
||||
func() {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
assert.Nil(t, q.queuedItem,
|
||||
"after popping an item of the queue, the queue should be empty")
|
||||
}()
|
||||
assert.Equal(t, "winner", item)
|
||||
|
||||
case <-time.After(10 * pollPeriod):
|
||||
t.Error("timeout waiting for item")
|
||||
}
|
||||
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
Loading…
Reference in New Issue
Block a user