Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions store/kv/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package kv

import (
"container/list"
"sync"
)

// LRU Cache implementation
type LRUCache struct {
capacity int
cache map[string]*list.Element
list *list.List
mu sync.Mutex
}

type entry struct {
key string
value []byte
}

// NewLRUCache creates a new LRU cache
func NewLRUCache(capacity int) *LRUCache {
return &LRUCache{
capacity: capacity,
cache: make(map[string]*list.Element),
list: list.New(),
}
}

// Get retrieves an item from the cache
func (c *LRUCache) Get(key string) ([]byte, bool) {
c.mu.Lock()
defer c.mu.Unlock()

if elem, found := c.cache[key]; found {
c.list.MoveToFront(elem)
return elem.Value.(*entry).value, true
}
return nil, false
}

// Put adds an item to the cache
func (c *LRUCache) Put(key string, value []byte) {
c.mu.Lock()
defer c.mu.Unlock()

if elem, found := c.cache[key]; found {
c.list.MoveToFront(elem)
elem.Value.(*entry).value = value
return
}

if c.list.Len() >= c.capacity {
// Evict the least recently used item
elem := c.list.Back()
c.list.Remove(elem)
delete(c.cache, elem.Value.(*entry).key)
}

newEntry := &entry{key, value}
elem := c.list.PushFront(newEntry)
c.cache[key] = elem
}

88 changes: 88 additions & 0 deletions store/kv/kv_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,67 @@ import (
"github.com/tigrisdata/tigris/internal"
)

// Add a global or instance-level cache
var cache *LRUCache

// Initialize the cache
func init() {
cache = NewLRUCache(1000) // Set capacity to 1000 items
}

// Get retrieves a value from the store with cache support
func (kv *KVStore) Get(key string) ([]byte, error) {
// Check the cache first
if value, found := cache.Get(key); found {
return value, nil
}

// Fallback to the underlying store
value, err := kv.backend.Get(key)
if err != nil {
return nil, err
}

// Store the result in the cache
cache.Put(key, value)
return value, nil
}

// Set stores a value in the store and updates the cache
func (kv *KVStore) Set(key string, value []byte) error {
// Update the cache
cache.Put(key, value)

// Update the underlying store
return kv.backend.Set(key, value)
}

// Initialize TTL-enabled cache
var ttlCache *TTLCache

func init() {
ttlCache = NewTTLCache(1000, time.Minute) // Capacity 1000 items, cleanup every 1 minute
}

// Get with TTL support
func (kv *KVStore) Get(key string) ([]byte, error) {
if value, found := ttlCache.Get(key); found {
return value, nil
}
value, err := kv.backend.Get(key)
if err != nil {
return nil, err
}
ttlCache.Put(key, value, 5*time.Minute) // Default 5 minutes TTL
return value, nil
}

// Set with TTL support
func (kv *KVStore) SetWithTTL(key string, value []byte, ttl time.Duration) error {
ttlCache.Put(key, value, ttl)
return kv.backend.Set(key, value)
}

type KeyValueTxStore struct {
*fdbkv
}
Expand Down Expand Up @@ -144,3 +205,30 @@ func (i *KeyValueIterator) Err() error {

return i.baseIterator.Err()
}

// BatchGet retrieves multiple keys at once
func (kv *KVStore) BatchGet(keys []string) (map[string][]byte, error) {
results := make(map[string][]byte)
missedKeys := []string{}

for _, key := range keys {
if value, found := cache.Get(key); found {
results[key] = value
} else {
missedKeys = append(missedKeys, key)
}
}

if len(missedKeys) > 0 {
backendResults, err := kv.backend.BatchGet(missedKeys)
if err != nil {
return nil, err
}
for k, v := range backendResults {
cache.Put(k, v)
results[k] = v
}
}
return results, nil
}

67 changes: 67 additions & 0 deletions store/kv/ttl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package kv

import (
"sync"
"time"
)

// TTLCache extends LRUCache to support TTL
type TTLCache struct {
cache *LRUCache
expiries map[string]time.Time
mu sync.Mutex
ttlWorker *time.Ticker
}

// NewTTLCache creates a TTL-enabled cache
func NewTTLCache(capacity int, cleanupInterval time.Duration) *TTLCache {
c := &TTLCache{
cache: NewLRUCache(capacity),
expiries: make(map[string]time.Time),
ttlWorker: time.NewTicker(cleanupInterval),
}

go c.cleanup()
return c
}

// Put adds an item with TTL
func (c *TTLCache) Put(key string, value []byte, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

expiry := time.Now().Add(ttl)
c.cache.Put(key, value)
c.expiries[key] = expiry
}

// Get retrieves an item, checking TTL
func (c *TTLCache) Get(key string) ([]byte, bool) {
c.mu.Lock()
defer c.mu.Unlock()

if expiry, exists := c.expiries[key]; exists {
if time.Now().After(expiry) {
// Item expired, remove it
c.cache.Remove(key)
delete(c.expiries, key)
return nil, false
}
}
return c.cache.Get(key)
}

// Cleanup removes expired items
func (c *TTLCache) cleanup() {
for range c.ttlWorker.C {
c.mu.Lock()
for key, expiry := range c.expiries {
if time.Now().After(expiry) {
c.cache.Remove(key)
delete(c.expiries, key)
}
}
c.mu.Unlock()
}
}