Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions core/services/workflows/artifacts/v2/file_module_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,37 @@ func TestStore_Overwrite(t *testing.T) {
assert.Equal(t, []byte("new"), got)
}

func TestStore_emptyWorkflowID(t *testing.T) {
t.Parallel()
s, err := NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)

binary := []byte("data")
require.NoError(t, s.StoreModule("", binary, "v1"))

p, ver, ok, err := s.GetModule("")
require.NoError(t, err)
require.True(t, ok)
assert.Equal(t, "v1", ver)
got, err := os.ReadFile(p)
require.NoError(t, err)
assert.Equal(t, binary, got)
}

func TestStore_nilBinary(t *testing.T) {
t.Parallel()
s, err := NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)

require.NoError(t, s.StoreModule("wf-nil", nil, "v1"))
p, _, ok, err := s.GetModule("wf-nil")
require.NoError(t, err)
require.True(t, ok)
got, err := os.ReadFile(p)
require.NoError(t, err)
assert.Empty(t, got)
}

func TestStore_MissingModule(t *testing.T) {
s, err := NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package v2

import (
"fmt"
"testing"
"time"

"github.com/jonboulle/clockwork"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
)

func benchLoadedModule(wfID string) *EvictableModule {
em := NewEvictableModule(&fakeModule{}, &host.ModuleConfig{}, nil, wfID, "", nil, nil, 1024)
em.started.Store(true)
return em
}

func BenchmarkModuleLRU_Register(b *testing.B) {
clock := clockwork.NewFakeClock()
lru := NewModuleLRU(clock, WithIdleTimeout(time.Hour))
em := benchLoadedModule("wf-bench")

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
wfID := fmt.Sprintf("wf-%d", i)
lru.Register(wfID, em)
}
}

func BenchmarkModuleLRU_Contains(b *testing.B) {
clock := clockwork.NewFakeClock()
lru := NewModuleLRU(clock, WithIdleTimeout(time.Hour))
const n = 256
for i := 0; i < n; i++ {
wfID := fmt.Sprintf("wf-%d", i)
lru.Register(wfID, benchLoadedModule(wfID))
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
lru.Contains(fmt.Sprintf("wf-%d", i%n))
}
}

// BenchmarkModuleLRU_reap_cap measures cap enforcement cost (sort.Slice over loaded entries).
// Complexity is O(n log n) which is acceptable for our current scale (hundreds to low thousands of modules) and 30s reap interval.
func BenchmarkModuleLRU_reap_cap(b *testing.B) {
sizes := []int{8, 64, 256}
for _, n := range sizes {
b.Run(fmt.Sprintf("loaded=%d", n), func(b *testing.B) {
clock := clockwork.NewFakeClock()
reap := make(chan time.Time, 1)
done := make(chan struct{}, 1)
capLimit := n / 2
if capLimit < 1 {
capLimit = 1
}
lru := NewModuleLRU(clock,
WithMaxLoadedModules(capLimit),
WithIdleTimeout(time.Hour),
WithReapTicker(reap),
WithOnReaped(done),
)
lru.Start()
defer lru.Close()

for j := 0; j < n; j++ {
wfID := fmt.Sprintf("wf-%d", j)
em := benchLoadedModule(wfID)
em.lastUsed.Store(clock.Now().Add(-time.Duration(j) * time.Second).UnixNano())
lru.Register(wfID, em)
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
reap <- clock.Now()
<-done
}
})
}
}
234 changes: 234 additions & 0 deletions core/services/workflows/syncer/v2/evictable_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,85 @@ func newLRUModule(t *testing.T, store artifacts.SerialisedModuleStore, wfID stri
return em
}

func newTestLRU(t *testing.T, maxLoaded int) (*clockwork.FakeClock, *ModuleLRU, chan time.Time, chan struct{}) {
t.Helper()
clock := clockwork.NewFakeClock()
reap := make(chan time.Time, 4)
done := make(chan struct{}, 4)
lru := NewModuleLRU(clock,
WithMaxLoadedModules(maxLoaded),
WithIdleTimeout(time.Hour),
WithReapTicker(reap),
WithOnReaped(done),
)
lru.Start()
t.Cleanup(lru.Close)
return clock, lru, reap, done
}

func triggerLRUReap(t *testing.T, clock *clockwork.FakeClock, reap chan time.Time, done chan struct{}) {
t.Helper()
reap <- clock.Now()
<-done
}

func TestLRU_AtCapacity_noEvictionUntilOver(t *testing.T) {
t.Parallel()
clock, lru, reap, done := newTestLRU(t, 2)

store, err := artifacts.NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)

m1 := newLRUModule(t, store, "wf-1")
m1.lastUsed.Store(clock.Now().Add(-3 * time.Minute).UnixNano())
m2 := newLRUModule(t, store, "wf-2")
m2.lastUsed.Store(clock.Now().Add(-2 * time.Minute).UnixNano())

lru.Register("wf-1", m1)
lru.Register("wf-2", m2)
triggerLRUReap(t, clock, reap, done)

assert.True(t, m1.IsLoaded(), "at capacity: oldest should not be evicted yet")
assert.True(t, m2.IsLoaded(), "at capacity: newest should remain loaded")

m3 := newLRUModule(t, store, "wf-3")
m3.lastUsed.Store(clock.Now().Add(-1 * time.Minute).UnixNano())
lru.Register("wf-3", m3)
triggerLRUReap(t, clock, reap, done)

assert.False(t, m1.IsLoaded(), "over capacity: least recently used should be evicted")
assert.True(t, m2.IsLoaded())
assert.True(t, m3.IsLoaded())
}

func TestLRU_RecencyBump_changesEvictionVictim(t *testing.T) {
t.Parallel()
clock, lru, reap, done := newTestLRU(t, 2)

store, err := artifacts.NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)

mA := newLRUModule(t, store, "wf-A")
mA.lastUsed.Store(clock.Now().Add(-10 * time.Minute).UnixNano())
mB := newLRUModule(t, store, "wf-B")
mB.lastUsed.Store(clock.Now().Add(-1 * time.Minute).UnixNano())

lru.Register("wf-A", mA)
lru.Register("wf-B", mB)

_, err = mA.Execute(context.Background(), &sdkpb.ExecuteRequest{}, nil)
require.NoError(t, err)

mC := newLRUModule(t, store, "wf-C")
mC.lastUsed.Store(clock.Now().UnixNano())
lru.Register("wf-C", mC)
triggerLRUReap(t, clock, reap, done)

assert.True(t, mA.IsLoaded(), "wf-A was touched recently and should survive")
assert.False(t, mB.IsLoaded(), "wf-B should be evicted after wf-A recency bump")
assert.True(t, mC.IsLoaded())
}

func TestLRU_EvictsIdleModule(t *testing.T) {
clock := clockwork.NewFakeClock()
reapTicker := make(chan time.Time, 1)
Expand Down Expand Up @@ -902,6 +981,161 @@ func TestLRU_EvictionOrder(t *testing.T) {
}
}

func TestLRU_MaxLoaded_zero_disablesCapEnforcement(t *testing.T) {
t.Parallel()
clock, lru, reap, done := newTestLRU(t, 0)

store, err := artifacts.NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)

modules := make([]*EvictableModule, 3)
for i := 0; i < 3; i++ {
wfID := string(rune('A' + i))
modules[i] = newLRUModule(t, store, wfID)
modules[i].lastUsed.Store(clock.Now().Add(-time.Duration(i+1) * time.Minute).UnixNano())
lru.Register(wfID, modules[i])
}
triggerLRUReap(t, clock, reap, done)

for i, m := range modules {
assert.True(t, m.IsLoaded(), "module %d: maxLoaded=0 must not enforce a loaded cap", i)
}
}

func TestLRU_Register_duplicateWorkflowID_replaces(t *testing.T) {
t.Parallel()
_, lru, _, _ := newTestLRU(t, 10)

store, err := artifacts.NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)

em1 := newLRUModule(t, store, "wf-1")
em2 := newLRUModule(t, store, "wf-1")
lru.Register("wf-1", em1)
lru.Register("wf-1", em2)

lru.mu.Lock()
got := lru.modules["wf-1"]
lru.mu.Unlock()
require.Equal(t, em2, got)
assert.True(t, lru.Contains("wf-1"))
}

func TestLRU_ConcurrentReapAndRegister(t *testing.T) {
t.Parallel()
clock := clockwork.NewFakeClock()
reap := make(chan time.Time, 64)
done := make(chan struct{}, 64)
lru := NewModuleLRU(clock,
WithMaxLoadedModules(5),
WithIdleTimeout(time.Hour),
WithReapTicker(reap),
WithOnReaped(done),
)
lru.Start()
defer lru.Close()

store, err := artifacts.NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)

const workers = 20
type entry struct {
wfID string
em *EvictableModule
}
entries := make([]entry, workers)
for i := 0; i < workers; i++ {
wfID := string(rune('A' + i))
entries[i] = entry{wfID: wfID, em: newLRUModule(t, store, wfID)}
}

var wg sync.WaitGroup
for i := range entries {
wg.Add(1)
go func(idx int) {
defer wg.Done()
e := entries[idx]
lru.Register(e.wfID, e.em)
if idx%3 == 0 {
reap <- clock.Now()
<-done
}
lru.Contains(e.wfID)
}(i)
}
wg.Wait()

lru.mu.Lock()
n := len(lru.modules)
lru.mu.Unlock()
assert.Positive(t, n)
}

func TestEvictable_Execute_L1_hit(t *testing.T) {
t.Parallel()
inner := modulemocks.NewModuleV2(t)
inner.EXPECT().Start()
inner.EXPECT().Execute(mock.Anything, mock.Anything, mock.Anything).Return(&sdkpb.ExecutionResult{}, nil)
inner.EXPECT().Close()

store, err := artifacts.NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)
require.NoError(t, store.StoreModule("wf-test", []byte("fake-binary"), ""))
cs := &countingStore{SerialisedModuleStore: store}

em := NewEvictableModule(inner, &host.ModuleConfig{}, cs, "wf-test", "", nil, nil, int64(len("fake-binary")))
em.Start()
t.Cleanup(em.Close)

_, err = em.Execute(context.Background(), &sdkpb.ExecuteRequest{}, nil)
require.NoError(t, err)
assert.True(t, em.IsLoaded())
assert.Equal(t, int32(0), cs.getModuleCalls.Load(), "L1 hit must not read from disk")
}

func TestEvictable_Evict_then_reloadWithoutDisk(t *testing.T) {
t.Parallel()
inner := modulemocks.NewModuleV2(t)
inner.EXPECT().Execute(mock.Anything, mock.Anything, mock.Anything).Return(&sdkpb.ExecutionResult{}, nil).Once()
inner.EXPECT().Close()

factory := func(_ context.Context, _ *host.ModuleConfig, _ []byte, _ ...func(*host.ModuleConfig)) (host.ModuleV2, error) {
t.Fatal("factory must not run when weak L2 resurrects after Evict")
return nil, nil
}

store, err := artifacts.NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)
require.NoError(t, store.StoreModule("wf-test", []byte("fake-binary"), ""))
cs := &countingStore{SerialisedModuleStore: store}

em := NewEvictableModule(inner, &host.ModuleConfig{}, cs, "wf-test", "", factory, nil, int64(len("fake-binary")))
em.started.Store(true)
t.Cleanup(em.Close)

em.Evict()
assert.False(t, em.IsLoaded())

_, err = em.Execute(context.Background(), &sdkpb.ExecuteRequest{}, nil)
require.NoError(t, err)
assert.True(t, em.IsLoaded())
assert.Equal(t, int32(0), cs.getModuleCalls.Load(), "weak L2 reload must not touch disk")
}

func TestEvictable_emptyWorkflowID_diskMiss(t *testing.T) {
t.Parallel()
store, err := artifacts.NewFileModuleStore(t.TempDir(), false)
require.NoError(t, err)

em := NewEvictableModule(nil, &host.ModuleConfig{}, store, "", "", nil, nil, 0)
em.started.Store(true)

_, err = em.Execute(context.Background(), &sdkpb.ExecuteRequest{}, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "no cached binary")
assert.False(t, em.IsLoaded())
}

// --- Weak reference (L2 cache) tests ---

// TestEvictable_WeakRefHitAfterEvict verifies that Evict drops only the strong
Expand Down
Loading