diff --git a/core/services/workflows/artifacts/v2/file_module_store_test.go b/core/services/workflows/artifacts/v2/file_module_store_test.go index 1998864372c..c5c97b6cdf7 100644 --- a/core/services/workflows/artifacts/v2/file_module_store_test.go +++ b/core/services/workflows/artifacts/v2/file_module_store_test.go @@ -43,6 +43,37 @@ func TestStore_Overwrite(t *testing.T) { assert.Equal(t, []byte("new"), got) } +func TestStore_emptyWorkflowID(t *testing.T) { + t.Parallel() + s, err := NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + + binary := []byte("data") + require.NoError(t, s.StoreModule("", binary, "v1")) + + p, ver, ok, err := s.GetModule("") + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, "v1", ver) + got, err := os.ReadFile(p) + require.NoError(t, err) + assert.Equal(t, binary, got) +} + +func TestStore_nilBinary(t *testing.T) { + t.Parallel() + s, err := NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + + require.NoError(t, s.StoreModule("wf-nil", nil, "v1")) + p, _, ok, err := s.GetModule("wf-nil") + require.NoError(t, err) + require.True(t, ok) + got, err := os.ReadFile(p) + require.NoError(t, err) + assert.Empty(t, got) +} + func TestStore_MissingModule(t *testing.T) { s, err := NewFileModuleStore(t.TempDir(), false) require.NoError(t, err) diff --git a/core/services/workflows/syncer/v2/evictable_module_bench_test.go b/core/services/workflows/syncer/v2/evictable_module_bench_test.go new file mode 100644 index 00000000000..3f40e4a7d12 --- /dev/null +++ b/core/services/workflows/syncer/v2/evictable_module_bench_test.go @@ -0,0 +1,85 @@ +package v2 + +import ( + "fmt" + "testing" + "time" + + "github.com/jonboulle/clockwork" + + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" +) + +func benchLoadedModule(wfID string) *EvictableModule { + em := NewEvictableModule(&fakeModule{}, &host.ModuleConfig{}, nil, wfID, "", nil, nil, 1024) + em.started.Store(true) + return em +} + +func BenchmarkModuleLRU_Register(b *testing.B) { + clock := clockwork.NewFakeClock() + lru := NewModuleLRU(clock, WithIdleTimeout(time.Hour)) + em := benchLoadedModule("wf-bench") + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + wfID := fmt.Sprintf("wf-%d", i) + lru.Register(wfID, em) + } +} + +func BenchmarkModuleLRU_Contains(b *testing.B) { + clock := clockwork.NewFakeClock() + lru := NewModuleLRU(clock, WithIdleTimeout(time.Hour)) + const n = 256 + for i := 0; i < n; i++ { + wfID := fmt.Sprintf("wf-%d", i) + lru.Register(wfID, benchLoadedModule(wfID)) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + lru.Contains(fmt.Sprintf("wf-%d", i%n)) + } +} + +// BenchmarkModuleLRU_reap_cap measures cap enforcement cost (sort.Slice over loaded entries). +// Complexity is O(n log n) which is acceptable for our current scale (hundreds to low thousands of modules) and 30s reap interval. +func BenchmarkModuleLRU_reap_cap(b *testing.B) { + sizes := []int{8, 64, 256} + for _, n := range sizes { + b.Run(fmt.Sprintf("loaded=%d", n), func(b *testing.B) { + clock := clockwork.NewFakeClock() + reap := make(chan time.Time, 1) + done := make(chan struct{}, 1) + capLimit := n / 2 + if capLimit < 1 { + capLimit = 1 + } + lru := NewModuleLRU(clock, + WithMaxLoadedModules(capLimit), + WithIdleTimeout(time.Hour), + WithReapTicker(reap), + WithOnReaped(done), + ) + lru.Start() + defer lru.Close() + + for j := 0; j < n; j++ { + wfID := fmt.Sprintf("wf-%d", j) + em := benchLoadedModule(wfID) + em.lastUsed.Store(clock.Now().Add(-time.Duration(j) * time.Second).UnixNano()) + lru.Register(wfID, em) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + reap <- clock.Now() + <-done + } + }) + } +} diff --git a/core/services/workflows/syncer/v2/evictable_module_test.go b/core/services/workflows/syncer/v2/evictable_module_test.go index deafe36d29c..a150b84f131 100644 --- a/core/services/workflows/syncer/v2/evictable_module_test.go +++ b/core/services/workflows/syncer/v2/evictable_module_test.go @@ -684,6 +684,85 @@ func newLRUModule(t *testing.T, store artifacts.SerialisedModuleStore, wfID stri return em } +func newTestLRU(t *testing.T, maxLoaded int) (*clockwork.FakeClock, *ModuleLRU, chan time.Time, chan struct{}) { + t.Helper() + clock := clockwork.NewFakeClock() + reap := make(chan time.Time, 4) + done := make(chan struct{}, 4) + lru := NewModuleLRU(clock, + WithMaxLoadedModules(maxLoaded), + WithIdleTimeout(time.Hour), + WithReapTicker(reap), + WithOnReaped(done), + ) + lru.Start() + t.Cleanup(lru.Close) + return clock, lru, reap, done +} + +func triggerLRUReap(t *testing.T, clock *clockwork.FakeClock, reap chan time.Time, done chan struct{}) { + t.Helper() + reap <- clock.Now() + <-done +} + +func TestLRU_AtCapacity_noEvictionUntilOver(t *testing.T) { + t.Parallel() + clock, lru, reap, done := newTestLRU(t, 2) + + store, err := artifacts.NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + + m1 := newLRUModule(t, store, "wf-1") + m1.lastUsed.Store(clock.Now().Add(-3 * time.Minute).UnixNano()) + m2 := newLRUModule(t, store, "wf-2") + m2.lastUsed.Store(clock.Now().Add(-2 * time.Minute).UnixNano()) + + lru.Register("wf-1", m1) + lru.Register("wf-2", m2) + triggerLRUReap(t, clock, reap, done) + + assert.True(t, m1.IsLoaded(), "at capacity: oldest should not be evicted yet") + assert.True(t, m2.IsLoaded(), "at capacity: newest should remain loaded") + + m3 := newLRUModule(t, store, "wf-3") + m3.lastUsed.Store(clock.Now().Add(-1 * time.Minute).UnixNano()) + lru.Register("wf-3", m3) + triggerLRUReap(t, clock, reap, done) + + assert.False(t, m1.IsLoaded(), "over capacity: least recently used should be evicted") + assert.True(t, m2.IsLoaded()) + assert.True(t, m3.IsLoaded()) +} + +func TestLRU_RecencyBump_changesEvictionVictim(t *testing.T) { + t.Parallel() + clock, lru, reap, done := newTestLRU(t, 2) + + store, err := artifacts.NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + + mA := newLRUModule(t, store, "wf-A") + mA.lastUsed.Store(clock.Now().Add(-10 * time.Minute).UnixNano()) + mB := newLRUModule(t, store, "wf-B") + mB.lastUsed.Store(clock.Now().Add(-1 * time.Minute).UnixNano()) + + lru.Register("wf-A", mA) + lru.Register("wf-B", mB) + + _, err = mA.Execute(context.Background(), &sdkpb.ExecuteRequest{}, nil) + require.NoError(t, err) + + mC := newLRUModule(t, store, "wf-C") + mC.lastUsed.Store(clock.Now().UnixNano()) + lru.Register("wf-C", mC) + triggerLRUReap(t, clock, reap, done) + + assert.True(t, mA.IsLoaded(), "wf-A was touched recently and should survive") + assert.False(t, mB.IsLoaded(), "wf-B should be evicted after wf-A recency bump") + assert.True(t, mC.IsLoaded()) +} + func TestLRU_EvictsIdleModule(t *testing.T) { clock := clockwork.NewFakeClock() reapTicker := make(chan time.Time, 1) @@ -902,6 +981,161 @@ func TestLRU_EvictionOrder(t *testing.T) { } } +func TestLRU_MaxLoaded_zero_disablesCapEnforcement(t *testing.T) { + t.Parallel() + clock, lru, reap, done := newTestLRU(t, 0) + + store, err := artifacts.NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + + modules := make([]*EvictableModule, 3) + for i := 0; i < 3; i++ { + wfID := string(rune('A' + i)) + modules[i] = newLRUModule(t, store, wfID) + modules[i].lastUsed.Store(clock.Now().Add(-time.Duration(i+1) * time.Minute).UnixNano()) + lru.Register(wfID, modules[i]) + } + triggerLRUReap(t, clock, reap, done) + + for i, m := range modules { + assert.True(t, m.IsLoaded(), "module %d: maxLoaded=0 must not enforce a loaded cap", i) + } +} + +func TestLRU_Register_duplicateWorkflowID_replaces(t *testing.T) { + t.Parallel() + _, lru, _, _ := newTestLRU(t, 10) + + store, err := artifacts.NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + + em1 := newLRUModule(t, store, "wf-1") + em2 := newLRUModule(t, store, "wf-1") + lru.Register("wf-1", em1) + lru.Register("wf-1", em2) + + lru.mu.Lock() + got := lru.modules["wf-1"] + lru.mu.Unlock() + require.Equal(t, em2, got) + assert.True(t, lru.Contains("wf-1")) +} + +func TestLRU_ConcurrentReapAndRegister(t *testing.T) { + t.Parallel() + clock := clockwork.NewFakeClock() + reap := make(chan time.Time, 64) + done := make(chan struct{}, 64) + lru := NewModuleLRU(clock, + WithMaxLoadedModules(5), + WithIdleTimeout(time.Hour), + WithReapTicker(reap), + WithOnReaped(done), + ) + lru.Start() + defer lru.Close() + + store, err := artifacts.NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + + const workers = 20 + type entry struct { + wfID string + em *EvictableModule + } + entries := make([]entry, workers) + for i := 0; i < workers; i++ { + wfID := string(rune('A' + i)) + entries[i] = entry{wfID: wfID, em: newLRUModule(t, store, wfID)} + } + + var wg sync.WaitGroup + for i := range entries { + wg.Add(1) + go func(idx int) { + defer wg.Done() + e := entries[idx] + lru.Register(e.wfID, e.em) + if idx%3 == 0 { + reap <- clock.Now() + <-done + } + lru.Contains(e.wfID) + }(i) + } + wg.Wait() + + lru.mu.Lock() + n := len(lru.modules) + lru.mu.Unlock() + assert.Positive(t, n) +} + +func TestEvictable_Execute_L1_hit(t *testing.T) { + t.Parallel() + inner := modulemocks.NewModuleV2(t) + inner.EXPECT().Start() + inner.EXPECT().Execute(mock.Anything, mock.Anything, mock.Anything).Return(&sdkpb.ExecutionResult{}, nil) + inner.EXPECT().Close() + + store, err := artifacts.NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + require.NoError(t, store.StoreModule("wf-test", []byte("fake-binary"), "")) + cs := &countingStore{SerialisedModuleStore: store} + + em := NewEvictableModule(inner, &host.ModuleConfig{}, cs, "wf-test", "", nil, nil, int64(len("fake-binary"))) + em.Start() + t.Cleanup(em.Close) + + _, err = em.Execute(context.Background(), &sdkpb.ExecuteRequest{}, nil) + require.NoError(t, err) + assert.True(t, em.IsLoaded()) + assert.Equal(t, int32(0), cs.getModuleCalls.Load(), "L1 hit must not read from disk") +} + +func TestEvictable_Evict_then_reloadWithoutDisk(t *testing.T) { + t.Parallel() + inner := modulemocks.NewModuleV2(t) + inner.EXPECT().Execute(mock.Anything, mock.Anything, mock.Anything).Return(&sdkpb.ExecutionResult{}, nil).Once() + inner.EXPECT().Close() + + factory := func(_ context.Context, _ *host.ModuleConfig, _ []byte, _ ...func(*host.ModuleConfig)) (host.ModuleV2, error) { + t.Fatal("factory must not run when weak L2 resurrects after Evict") + return nil, nil + } + + store, err := artifacts.NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + require.NoError(t, store.StoreModule("wf-test", []byte("fake-binary"), "")) + cs := &countingStore{SerialisedModuleStore: store} + + em := NewEvictableModule(inner, &host.ModuleConfig{}, cs, "wf-test", "", factory, nil, int64(len("fake-binary"))) + em.started.Store(true) + t.Cleanup(em.Close) + + em.Evict() + assert.False(t, em.IsLoaded()) + + _, err = em.Execute(context.Background(), &sdkpb.ExecuteRequest{}, nil) + require.NoError(t, err) + assert.True(t, em.IsLoaded()) + assert.Equal(t, int32(0), cs.getModuleCalls.Load(), "weak L2 reload must not touch disk") +} + +func TestEvictable_emptyWorkflowID_diskMiss(t *testing.T) { + t.Parallel() + store, err := artifacts.NewFileModuleStore(t.TempDir(), false) + require.NoError(t, err) + + em := NewEvictableModule(nil, &host.ModuleConfig{}, store, "", "", nil, nil, 0) + em.started.Store(true) + + _, err = em.Execute(context.Background(), &sdkpb.ExecuteRequest{}, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "no cached binary") + assert.False(t, em.IsLoaded()) +} + // --- Weak reference (L2 cache) tests --- // TestEvictable_WeakRefHitAfterEvict verifies that Evict drops only the strong