Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/cache/attestationbundle/attestationbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
ttl = 5 * 24 * time.Hour
maxBytes = 100 * 1024 * 1024 // 100 MB
bucket = "chainloop-attestation-bundles"
description = "Cache for attestation bundles"
)
Expand All @@ -40,6 +41,7 @@ type Cache struct {
func New(ctx context.Context, rc *natsconn.ReloadableConnection, logger log.Logger) (*Cache, error) {
opts := []cache.Option{
cache.WithTTL(ttl),
cache.WithMaxBytes(maxBytes),
cache.WithDescription(description),
}

Expand Down
16 changes: 7 additions & 9 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,9 @@ type Logger interface {
Errorw(keyvals ...any)
}

// defaultMaxSize is a sensible upper bound on in-memory cache entries
// to prevent unbounded growth. 0 means no LRU eviction (TTL-only).
const defaultMaxSize = 1000

type config struct {
ttl time.Duration
maxSize int
maxBytes int64
logger Logger
natsConn *nats.Conn
bucketName string
Expand Down Expand Up @@ -75,6 +71,12 @@ func WithNATS(conn *nats.Conn, bucketName string) Option {
}
}

// WithMaxBytes sets the maximum total size (in bytes) for the NATS KV bucket.
// When the limit is reached, NATS discards the oldest entries. Ignored for in-memory backend.
func WithMaxBytes(n int64) Option {
return func(c *config) { c.maxBytes = n }
}

// WithDescription sets the NATS KV bucket description. Ignored for in-memory backend.
func WithDescription(desc string) Option {
return func(c *config) { c.description = desc }
Expand Down Expand Up @@ -108,10 +110,6 @@ func New[T any](opts ...Option) (Cache[T], error) {
return newNATSKV[T](cfg)
}

if cfg.maxSize == 0 {
cfg.maxSize = defaultMaxSize
}

return newMemoryCache[T](cfg), nil
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/cache/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ import (
"github.com/hashicorp/golang-lru/v2/expirable"
)

// defaultMaxSize is a sensible upper bound on in-memory cache entries
// to prevent unbounded growth. 0 means no LRU eviction (TTL-only).
const defaultMaxSize = 1000

type memoryCache[T any] struct {
lru *expirable.LRU[string, T]
logger Logger
}

func newMemoryCache[T any](cfg *config) *memoryCache[T] {
cfg.logger.Infow("msg", "cache: using in-memory LRU backend", "ttl", cfg.ttl, "maxSize", cfg.maxSize)
cfg.logger.Infow("msg", "cache: using in-memory LRU backend", "ttl", cfg.ttl, "maxSize", defaultMaxSize)
return &memoryCache[T]{
lru: expirable.NewLRU[string, T](cfg.maxSize, nil, cfg.ttl),
lru: expirable.NewLRU[string, T](defaultMaxSize, nil, cfg.ttl),
logger: cfg.logger,
}
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/cache/natskv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"sync"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -65,12 +66,29 @@ func (c *natsKVCache[T]) initBucket() error {
Bucket: c.bucket,
Description: c.cfg.description,
TTL: c.cfg.ttl,
MaxBytes: c.cfg.maxBytes,
Storage: jetstream.MemoryStorage,
})
if err != nil {
return err
}

// NATS KV hardcodes DiscardNew on the backing stream, which rejects writes
// when MaxBytes is reached. For cache use-cases we want DiscardOld so that
// the oldest entries are evicted automatically to make room for new ones.
if c.cfg.maxBytes > 0 {
streamName := fmt.Sprintf("KV_%s", c.bucket)
stream, err := js.Stream(context.Background(), streamName)
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Cache init now hard-fails on backing stream info/update errors when MaxBytes is enabled, introducing a new startup compatibility/availability regression.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/cache/natskv.go, line 81:

<comment>Cache init now hard-fails on backing stream info/update errors when MaxBytes is enabled, introducing a new startup compatibility/availability regression.</comment>

<file context>
@@ -72,6 +73,22 @@ func (c *natsKVCache[T]) initBucket() error {
+	// the oldest entries are evicted automatically to make room for new ones.
+	if c.cfg.maxBytes > 0 {
+		streamName := fmt.Sprintf("KV_%s", c.bucket)
+		stream, err := js.Stream(context.Background(), streamName)
+		if err != nil {
+			return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err)
</file context>
Fix with Cubic

if err != nil {
return fmt.Errorf("cache: failed to get backing stream %s: %w", streamName, err)
}
cfg := stream.CachedInfo().Config
cfg.Discard = jetstream.DiscardOld
if _, err := js.UpdateStream(context.Background(), cfg); err != nil {
return fmt.Errorf("cache: failed to set DiscardOld on stream %s: %w", streamName, err)
}
}

c.mu.Lock()
c.kv = kv
c.mu.Unlock()
Expand Down
34 changes: 34 additions & 0 deletions pkg/cache/natskv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,40 @@ func TestNATSKV_NilKVGracefulDegradation(t *testing.T) {
require.NoError(t, nkv.Purge(ctx))
}

func TestNATSKV_MaxBytesEvictsOldEntries(t *testing.T) {
nc := startEmbeddedNATS(t)

// With MaxBytes set, the backing stream is updated to DiscardOld so that
// the oldest entries are evicted when the bucket is full.
const maxBytes int64 = 10 * 1024
c, err := New[[]byte](
WithTTL(time.Minute),
WithNATS(nc, "test-maxbytes"),
WithMaxBytes(maxBytes),
)
require.NoError(t, err)

ctx := context.Background()
payload := make([]byte, 1024)

// Write 20 entries, well beyond the 10 KiB limit
for i := range 20 {
key := "key-" + strings.Repeat("x", i)
require.NoError(t, c.Set(ctx, key, payload), "Set should not fail even when bucket is full")
}

// The latest entry should still be retrievable
lastKey := "key-" + strings.Repeat("x", 19)
_, ok, err := c.Get(ctx, lastKey)
require.NoError(t, err)
assert.True(t, ok, "most recent entry should still be in the cache")

// The earliest entries should have been evicted
_, ok, err = c.Get(ctx, "key-")
require.NoError(t, err)
assert.False(t, ok, "oldest entry should have been evicted")
}

func TestNew_WithNATSReturnsNATSBackend(t *testing.T) {
nc := startEmbeddedNATS(t)
c, err := New[string](
Expand Down
2 changes: 2 additions & 0 deletions pkg/cache/policyevalbundle/policyevalbundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
ttl = 24 * time.Hour
maxBytes = 100 * 1024 * 1024 // 100 MB
bucket = "chainloop-policy-eval-bundles"
description = "Cache for policy evaluation bundles from CAS"
)
Expand All @@ -40,6 +41,7 @@ type Cache struct {
func New(ctx context.Context, rc *natsconn.ReloadableConnection, logger log.Logger) (*Cache, error) {
opts := []cache.Option{
cache.WithTTL(ttl),
cache.WithMaxBytes(maxBytes),
cache.WithDescription(description),
}

Expand Down
Loading