231 lines
6.2 KiB
Go
231 lines
6.2 KiB
Go
package kv
|
|
|
|
import (
|
|
"encoding/gob"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
)
|
|
|
|
// Store provides an in-memory key/value store which is persisted to
|
|
// a file. The file handle itself is not kept locked for the duration; it is
|
|
// only locked during load and save, to make it concurrency friendly. When
|
|
// saving, the store uses optimistic locking to determine whether the db on disk
|
|
// has been modified by another process; in which case it loads the latest
|
|
// version and re-applies modifications made during this session. This means
|
|
// the Lost Update db concurrency issue is possible; so don't use this if you
|
|
// need more DB integrity than Read Committed isolation levels.
|
|
type Store struct {
|
|
// Locks the entire store
|
|
mu sync.RWMutex
|
|
filename string
|
|
log []change
|
|
|
|
// This is the persistent data
|
|
// version for optimistic locking, this field is incremented with every Save()
|
|
version int64
|
|
db map[string]interface{}
|
|
}
|
|
|
|
// operation identifies the kind of modification recorded in the change log.
type operation int

const (
	// setOperation records that a key was given a value.
	setOperation operation = iota
	// removeOperation records that a key was deleted.
	removeOperation
)
|
|
|
|
type change struct {
|
|
op operation
|
|
key string
|
|
value interface{}
|
|
}
|
|
|
|
// NewStore creates a new key/value store and initialises it with contents from
|
|
// the named file, if it exists
|
|
func NewStore(filepath string) (*Store, error) {
|
|
kv := &Store{filename: filepath, db: make(map[string]interface{})}
|
|
return kv, kv.loadAndMergeIfNeeded()
|
|
}
|
|
|
|
// Set updates the key/value store in memory
|
|
// Changes are not persisted until you call Save()
|
|
func (k *Store) Set(key string, value interface{}) {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
k.db[key] = value
|
|
k.logChange(setOperation, key, value)
|
|
}
|
|
|
|
// Remove removes the key and its value from the store in memory
|
|
// Changes are not persisted until you call Save()
|
|
func (k *Store) Remove(key string) {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
delete(k.db, key)
|
|
k.logChange(removeOperation, key, nil)
|
|
}
|
|
|
|
// RemoveAll removes all entries from the store
|
|
// These changes are not persisted until you call Save()
|
|
func (k *Store) RemoveAll() {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
// Log all changes
|
|
for key, _ := range k.db {
|
|
k.logChange(removeOperation, key, nil)
|
|
}
|
|
k.db = make(map[string]interface{})
|
|
}
|
|
|
|
// Visit walks through the entire store via a function; return false from
|
|
// your visitor function to halt the walk
|
|
func (k *Store) Visit(cb func(string, interface{}) bool) {
|
|
// Read-only lock
|
|
k.mu.RLock()
|
|
defer k.mu.RUnlock()
|
|
|
|
for k, v := range k.db {
|
|
if !cb(k, v) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Append a change to the log; mutex must already be locked
|
|
func (k *Store) logChange(op operation, key string, value interface{}) {
|
|
k.log = append(k.log, change{op, key, value})
|
|
}
|
|
|
|
// Get retrieves a value from the store, or nil if it is not present
|
|
func (k *Store) Get(key string) interface{} {
|
|
// Read-only lock
|
|
k.mu.RLock()
|
|
defer k.mu.RUnlock()
|
|
|
|
// zero value of interface{} is nil so this does what we want
|
|
return k.db[key]
|
|
}
|
|
|
|
// Save persists the changes made to disk
|
|
// If any changes have been written by other code they will be merged
|
|
func (k *Store) Save() error {
|
|
k.mu.Lock()
|
|
defer k.mu.Unlock()
|
|
|
|
// Short-circuit if we have no changes
|
|
if len(k.log) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// firstly peek at version; open read/write to keep lock between check & write
|
|
f, err := os.OpenFile(k.filename, os.O_RDWR|os.O_CREATE, 0664)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
// Only try to merge if > 0 bytes, ignore empty files (decoder will fail)
|
|
if stat, _ := f.Stat(); stat.Size() > 0 {
|
|
k.loadAndMergeReaderIfNeeded(f)
|
|
// Now we overwrite the file
|
|
f.Seek(0, os.SEEK_SET)
|
|
f.Truncate(0)
|
|
}
|
|
|
|
k.version++
|
|
|
|
enc := gob.NewEncoder(f)
|
|
if err := enc.Encode(k.version); err != nil {
|
|
return fmt.Errorf("Error while writing version data to %v: %v", k.filename, err)
|
|
}
|
|
if err := enc.Encode(k.db); err != nil {
|
|
return fmt.Errorf("Error while writing new key/value data to %v: %v", k.filename, err)
|
|
}
|
|
// Clear log now that it's saved
|
|
k.log = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
// Reads as little as possible from the passed in file to determine if the
|
|
// contents are different from the version already held. If so, reads the
|
|
// contents and merges with any outstanding changes. If not, stops early without
|
|
// reading the rest of the file
|
|
func (k *Store) loadAndMergeIfNeeded() error {
|
|
stat, err := os.Stat(k.filename)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return nil // missing is OK
|
|
}
|
|
return err
|
|
}
|
|
// Do nothing if empty file
|
|
if stat.Size() == 0 {
|
|
return nil
|
|
}
|
|
|
|
f, err := os.OpenFile(k.filename, os.O_RDONLY, 0664)
|
|
if err == nil {
|
|
defer f.Close()
|
|
return k.loadAndMergeReaderIfNeeded(f)
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// As loadAndMergeIfNeeded but lets caller decide how to manage file handles
|
|
func (k *Store) loadAndMergeReaderIfNeeded(f io.Reader) error {
|
|
var versionOnDisk int64
|
|
// Decode *only* the version field to check whether anyone else has
|
|
// modified the db; gob serializes structs in order so it will always be 1st
|
|
dec := gob.NewDecoder(f)
|
|
err := dec.Decode(&versionOnDisk)
|
|
if err != nil {
|
|
return fmt.Errorf("Problem checking version of key/value data from %v: %v", k.filename, err)
|
|
}
|
|
// Totally uninitialised Version == 0, saved versions are always >=1
|
|
if versionOnDisk != k.version {
|
|
// Reload data & merge
|
|
var dbOnDisk map[string]interface{}
|
|
err = dec.Decode(&dbOnDisk)
|
|
if err != nil {
|
|
return fmt.Errorf("Problem reading updated key/value data from %v: %v", k.filename, err)
|
|
}
|
|
k.reapplyChanges(dbOnDisk)
|
|
k.version = versionOnDisk
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// reapplyChanges replays the changes made since the last load onto baseDb
|
|
// and stores the result as our own DB
|
|
func (k *Store) reapplyChanges(baseDb map[string]interface{}) {
|
|
for _, change := range k.log {
|
|
switch change.op {
|
|
case setOperation:
|
|
baseDb[change.key] = change.value
|
|
case removeOperation:
|
|
delete(baseDb, change.key)
|
|
}
|
|
}
|
|
// Note, log is not cleared here, that only happens on Save since it's a
|
|
// list of unsaved changes
|
|
k.db = baseDb
|
|
|
|
}
|
|
|
|
// RegisterTypeForStorage makes a custom type (e.g. a struct) usable as a
// value in the key/value store by registering it with the gob package. Call
// it with a value of the type before passing such values to Store.Set();
// primitive types do not need it.
func RegisterTypeForStorage(val interface{}) {
	gob.Register(val)
}
|