diff --git a/pkg/last_in_one_out_queue/last_in_one_out_queue.go b/pkg/last_in_one_out_queue/last_in_one_out_queue.go new file mode 100644 index 00000000..acb79626 --- /dev/null +++ b/pkg/last_in_one_out_queue/last_in_one_out_queue.go @@ -0,0 +1,79 @@ +package last_in_one_out_queue + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "sync" + "time" + + "github.com/rs/zerolog/log" +) + +const pollPeriod = 250 * time.Millisecond + +type LastInOneOutQueue[T any] struct { + queuedItem *T + ch chan T + mutex sync.Mutex +} + +func New[T any]() *LastInOneOutQueue[T] { + return &LastInOneOutQueue[T]{ + queuedItem: nil, + ch: make(chan T), + mutex: sync.Mutex{}, + } +} + +func (q *LastInOneOutQueue[T]) Run(ctx context.Context) { + log.Trace().Msg("last-in-one-out queue starting") + defer log.Trace().Msg("last-in-one-out queue stopping") + + for { + select { + case <-ctx.Done(): + return + case <-time.After(pollPeriod): + // Periodically try pushing the queued item into the channel. + q.tryPushingItem() + } + } +} + +// Enqueue puts the item in the queue. It replaces any previously-queued item. +func (q *LastInOneOutQueue[T]) Enqueue(item T) { + q.mutex.Lock() + defer q.mutex.Unlock() + q.queuedItem = &item + q.tryPushingItem_unsafe() +} + +// Item returns the channel on which queued items can be dequeued. +func (q *LastInOneOutQueue[T]) Item() <-chan T { + return q.ch +} + +func (q *LastInOneOutQueue[T]) tryPushingItem() { + q.mutex.Lock() + defer q.mutex.Unlock() + + if q.queuedItem == nil { + return + } + + q.tryPushingItem_unsafe() +} + +// tryPushingItem_unsafe tries to push the queued item. +// It assumes that q.queuedItem is not nil, and doesn't obtain the mutex. +func (q *LastInOneOutQueue[T]) tryPushingItem_unsafe() { + // It's fine if pushing to the channel fails; this means that the receiving + // end of the queue isn't ready to process another item yet. + select { + case q.ch <- *q.queuedItem: + q.queuedItem = nil + log.Trace().Msg("last-in-one-out queue: queued item is being handled") + default: + } +} diff --git a/pkg/last_in_one_out_queue/last_in_one_out_queue_test.go b/pkg/last_in_one_out_queue/last_in_one_out_queue_test.go new file mode 100644 index 00000000..32c3ead2 --- /dev/null +++ b/pkg/last_in_one_out_queue/last_in_one_out_queue_test.go @@ -0,0 +1,78 @@ +package last_in_one_out_queue + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNew(t *testing.T) { + q := New[string]() + select { + case <-q.Item(): + t.Fatal("a new queue shouldn't hold an item") + default: + } +} + +func TestQueueAndGet(t *testing.T) { + q := New[string]() + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(10 * time.Millisecond) + q.Enqueue("hey") + }() + + select { + case item := <-q.Item(): + assert.Equal(t, "hey", item) + case <-time.After(200 * time.Millisecond): + t.Error("enqueueing while waiting for an item should push it immediately") + } + + wg.Wait() +} + +func TestQueueMultiple(t *testing.T) { + q := New[string]() + + q.Enqueue("hey") + q.Enqueue("these are multiple items") + q.Enqueue("the last one should be the") + q.Enqueue("winner") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + q.Run(ctx) + }() + + select { + case item := <-q.Item(): + func() { + q.mutex.Lock() + defer q.mutex.Unlock() + assert.Nil(t, q.queuedItem, + "after popping an item of the queue, the queue should be empty") + }() + assert.Equal(t, "winner", item) + + case <-time.After(10 * pollPeriod): + t.Error("timeout waiting for item") + } + + cancel() + wg.Wait() +}