// Source: git-lfs/tools/kv/keyvaluestore.go (snapshot 2016-12-12 10:26:12 +00:00; 231 lines, 6.2 KiB, Go)

package kv
import (
"encoding/gob"
"fmt"
"io"
"os"
"sync"
)
// Store provides an in-memory key/value store which is persisted to a file.
//
// No handle to the file is kept open between calls: the file is only opened
// while loading (NewStore) and while saving (Save). Concurrent use of a single
// Store is serialized by an internal RWMutex. Across writers, Save uses
// optimistic concurrency: the file begins with a version number, and if that
// differs from the version this Store last saw, the on-disk data is reloaded
// and this session's unsaved changes are replayed on top of it before the
// store is written back.
//
// NOTE(review): no OS-level file lock is taken, so two processes saving at the
// same time can still overwrite each other's changes (the Lost Update
// problem). Don't use this if you need more integrity than roughly
// Read Committed isolation.
type Store struct {
	// mu guards all of the fields below.
	mu sync.RWMutex
	// filename is the path of the backing file.
	filename string
	// log holds the changes made since the last successful Save; they are
	// replayed onto newer on-disk data when merging.
	log []change
	// version is the version last loaded from or written to disk. Save
	// writes the next version number, so saved versions are always >= 1.
	version int64
	// db is the current key/value data.
	db map[string]interface{}
}
// operation identifies the kind of mutation recorded in a change.
type operation int

const (
	// setOperation records that a key was assigned a value.
	setOperation operation = iota
	// removeOperation records that a key was deleted.
	removeOperation
)
// change records one unsaved mutation of the store. Entries are appended to
// Store.log and replayed, in order, by reapplyChanges when newer on-disk data
// is merged in.
type change struct {
	// op is the kind of mutation (setOperation or removeOperation).
	op operation
	// key is the key that was set or removed.
	key string
	// value is the assigned value for setOperation; callers pass nil for
	// removeOperation.
	value interface{}
}
// NewStore returns a key/value store backed by the file at filepath, loaded
// with that file's contents when it exists and is non-empty. A missing or
// empty file yields an empty store. The returned *Store is non-nil even when
// loading fails and the error is non-nil.
func NewStore(filepath string) (*Store, error) {
	store := &Store{
		filename: filepath,
		db:       make(map[string]interface{}),
	}
	err := store.loadAndMergeIfNeeded()
	return store, err
}
// Set assigns value to key in memory and records the assignment in the
// pending-change log. Nothing is written to disk until Save() is called.
func (k *Store) Set(key string, value interface{}) {
	k.mu.Lock()
	defer k.mu.Unlock()

	k.db[key] = value
	k.logChange(setOperation, key, value)
}
// Remove deletes key (and its value) from memory and records the deletion in
// the pending-change log. Removing an absent key is not an error, but it is
// still logged. Nothing is written to disk until Save() is called.
func (k *Store) Remove(key string) {
	k.mu.Lock()
	defer k.mu.Unlock()

	delete(k.db, key)
	k.logChange(removeOperation, key, nil)
}
// RemoveAll removes every entry from the store in memory. A removal is logged
// for each key currently held, so the deletions are also replayed when Save
// merges with newer on-disk data. These changes are not persisted until you
// call Save().
//
// NOTE(review): only keys present in memory right now are logged. Keys that
// another writer adds to the file before the next Save will survive the merge.
func (k *Store) RemoveAll() {
	k.mu.Lock()
	defer k.mu.Unlock()

	for key := range k.db {
		k.logChange(removeOperation, key, nil)
	}
	k.db = make(map[string]interface{})
}
// Visit calls cb for each key/value pair in the store, in Go's unspecified
// map iteration order. Return false from cb to halt the walk early.
//
// cb runs while the store's read lock is held. It must therefore not call
// Set, Remove, RemoveAll or Save on the same Store: those take the write lock
// and would block forever. Per the sync.RWMutex documentation, nested read
// locking (e.g. calling Get from cb) can also deadlock if a writer is waiting.
func (k *Store) Visit(cb func(string, interface{}) bool) {
	k.mu.RLock()
	defer k.mu.RUnlock()

	for key, value := range k.db {
		if !cb(key, value) {
			return
		}
	}
}
// logChange appends a record of one mutation to the pending-change log.
// The caller must already hold k.mu for writing.
func (k *Store) logChange(op operation, key string, value interface{}) {
	entry := change{op: op, key: key, value: value}
	k.log = append(k.log, entry)
}
// Get returns the value stored under key, or nil when the key is absent.
// Note that a key explicitly set to nil is indistinguishable from a missing one.
func (k *Store) Get(key string) interface{} {
	k.mu.RLock()
	defer k.mu.RUnlock()

	value, found := k.db[key]
	if !found {
		return nil
	}
	return value
}
// Save persists the store to disk. If any changes have been written to the
// file by other code since it was last read, they are merged first: the
// on-disk data becomes the base, and this session's pending changes are
// replayed on top of it.
//
// If there are no pending changes, Save returns immediately without touching
// the file. The in-memory version and change log are only updated after the
// new contents have been written and the file closed successfully.
//
// NOTE(review): no OS-level file lock is held between the version check and
// the write, so concurrent saves from different processes can still race.
func (k *Store) Save() error {
	k.mu.Lock()
	defer k.mu.Unlock()

	// Short-circuit if we have no changes
	if len(k.log) == 0 {
		return nil
	}

	// Open read/write so one handle serves both the version check and the rewrite.
	f, err := os.OpenFile(k.filename, os.O_RDWR|os.O_CREATE, 0664)
	if err != nil {
		return err
	}

	written, err := k.rewriteLocked(f)
	if err != nil {
		// The write error is the one worth reporting.
		_ = f.Close()
		return err
	}
	// Close explicitly: for a file we wrote to, a Close error can mean the
	// data never reached disk, so don't commit the new version in that case.
	if err := f.Close(); err != nil {
		return err
	}

	k.version = written
	// Clear log now that it's saved
	k.log = nil
	return nil
}

// rewriteLocked merges any newer data already in f into the store, then
// overwrites f with the next version number followed by the full map, both
// gob-encoded. It returns the version it wrote. The caller must hold k.mu for
// writing and is responsible for committing the returned version to k.version
// and clearing k.log once f has been closed successfully.
func (k *Store) rewriteLocked(f *os.File) (int64, error) {
	stat, err := f.Stat()
	if err != nil {
		return 0, err
	}
	// Only try to merge if > 0 bytes; the gob decoder fails on empty input.
	if stat.Size() > 0 {
		// Deliberately best-effort: if the existing contents can't be decoded,
		// they are overwritten with this store's data rather than failing the save.
		_ = k.loadAndMergeReaderIfNeeded(f)
		// Rewind and discard the old contents before rewriting.
		if _, err := f.Seek(0, io.SeekStart); err != nil {
			return 0, err
		}
		if err := f.Truncate(0); err != nil {
			return 0, err
		}
	}

	next := k.version + 1
	enc := gob.NewEncoder(f)
	if err := enc.Encode(next); err != nil {
		return 0, fmt.Errorf("Error while writing version data to %v: %v", k.filename, err)
	}
	if err := enc.Encode(k.db); err != nil {
		return 0, fmt.Errorf("Error while writing new key/value data to %v: %v", k.filename, err)
	}
	return next, nil
}
// loadAndMergeIfNeeded loads k.filename into the store, merging it with any
// pending changes via loadAndMergeReaderIfNeeded. A missing file (including
// one removed between the size check and the open) or an empty file is not an
// error and leaves the store untouched.
//
// It does not take k.mu. In SOURCE it is only called from NewStore, before the
// Store is shared.
func (k *Store) loadAndMergeIfNeeded() error {
	stat, err := os.Stat(k.filename)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // missing is OK
		}
		return err
	}
	// Do nothing if the file is empty; the gob decoder would fail on it.
	if stat.Size() == 0 {
		return nil
	}

	f, err := os.Open(k.filename)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // removed since the Stat above; still "missing is OK"
		}
		return err
	}
	defer f.Close()

	return k.loadAndMergeReaderIfNeeded(f)
}
// loadAndMergeReaderIfNeeded checks whether the data in f differs from the
// version already held, and merges it if so.
//
// It first decodes only the leading version number. If that equals k.version,
// it stops without decoding the map that follows. Otherwise it decodes the
// map, replays this session's pending changes on top of it (reapplyChanges),
// and adopts the on-disk version.
//
// The caller owns f and decides how to manage (and close) it. Note that gob
// may buffer reads, so more bytes than the version itself can be consumed
// from f.
func (k *Store) loadAndMergeReaderIfNeeded(f io.Reader) error {
	var versionOnDisk int64
	// Save writes the version and the map as two consecutive top-level gob
	// values, so decoding one value yields just the version.
	dec := gob.NewDecoder(f)
	if err := dec.Decode(&versionOnDisk); err != nil {
		return fmt.Errorf("Problem checking version of key/value data from %v: %v", k.filename, err)
	}

	// A totally uninitialised store has version 0, while saved versions are
	// always >= 1, so a fresh store always takes the merge path.
	if versionOnDisk == k.version {
		return nil
	}

	// Reload data & merge
	var dbOnDisk map[string]interface{}
	if err := dec.Decode(&dbOnDisk); err != nil {
		return fmt.Errorf("Problem reading updated key/value data from %v: %v", k.filename, err)
	}
	k.reapplyChanges(dbOnDisk)
	k.version = versionOnDisk
	return nil
}
// reapplyChanges replays every pending change in k.log, oldest first, onto
// baseDb (modifying it in place) and then adopts baseDb as the store's map.
// The log itself is deliberately left intact. It is the list of unsaved
// changes and is only cleared by a successful Save.
func (k *Store) reapplyChanges(baseDb map[string]interface{}) {
	for i := range k.log {
		c := k.log[i]
		if c.op == setOperation {
			baseDb[c.key] = c.value
		} else if c.op == removeOperation {
			delete(baseDb, c.key)
		}
	}
	k.db = baseDb
}
// RegisterTypeForStorage makes a custom type (e.g. a struct) storable in the
// key/value store. Values are held as interface{} and persisted with gob, so
// any concrete type other than the built-in primitives must be registered
// before it is passed to Store.Set() and saved or loaded.
func RegisterTypeForStorage(val interface{}) {
	// Registration is process-wide and keyed by the value's concrete type.
	gob.Register(val)
}