diff --git a/oak-benchmarks/pom.xml b/oak-benchmarks/pom.xml index c6088f047a3..0c2b51ce9b6 100644 --- a/oak-benchmarks/pom.xml +++ b/oak-benchmarks/pom.xml @@ -107,6 +107,16 @@ oak-core-spi ${project.version} + + org.apache.jackrabbit + oak-shaded-guava + ${project.version} + + + org.apache.jackrabbit + oak-segment-tar + ${project.version} + org.apache.jackrabbit oak-commons diff --git a/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java index f7de6c3b6ad..4c45634c44b 100644 --- a/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java +++ b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java @@ -469,6 +469,9 @@ public static void main(String[] args) throws Exception { benchmarkOptions.getDynamicMembership().value(options), benchmarkOptions.getAutoMembership().values(options)), new BundlingNodeTest(), new PersistentCacheTest(statsProvider), + new SegmentCachePolicyBenchmark(), + new SegmentCacheMemoizationBenchmark(), + new SegmentCacheTarBenchmark(), new StringWriteTest(), new BasicWriteTest(), new CanReadNonExisting(), diff --git a/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/GuavaSegmentCache.java b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/GuavaSegmentCache.java new file mode 100644 index 00000000000..32c80caf3e1 --- /dev/null +++ b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/GuavaSegmentCache.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.benchmark; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import org.apache.jackrabbit.guava.common.cache.CacheBuilder; +import org.apache.jackrabbit.guava.common.cache.CacheStats; +import org.apache.jackrabbit.guava.common.cache.RemovalNotification; +import org.apache.jackrabbit.guava.common.util.concurrent.UncheckedExecutionException; +import org.apache.jackrabbit.oak.cache.AbstractCacheStats; +import org.apache.jackrabbit.oak.segment.CacheWeights; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentCache; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.jetbrains.annotations.NotNull; + +/** + * A {@link SegmentCache} backed by a Guava LRU cache, used in benchmark classes + * to compare eviction policies against the default Caffeine W-TinyLFU implementation. + * All Guava-specific code lives here; the production {@link SegmentCache} class stays clean. + */ +class GuavaSegmentCache extends SegmentCache { + + private static final String NAME = "Segment Cache (Guava)"; + + private final org.apache.jackrabbit.guava.common.cache.Cache cache; + private final Stats stats; + + GuavaSegmentCache(long cacheSizeMb) { + long maximumWeight = cacheSizeMb * 1024L * 1024L; + // Build cache first so cache::size can be passed to Stats; the removal listener + // references this.stats which is assigned below — safe because evictions only + // fire after construction is complete (same pattern as production NonEmptyCache). + this.cache = CacheBuilder.newBuilder() + .maximumWeight(maximumWeight) + .weigher((SegmentId id, Segment seg) -> CacheWeights.segmentWeight(seg)) + .removalListener(this::onRemove) + .build(); + this.stats = new Stats(NAME, maximumWeight, cache::size); + } + + private void onRemove(RemovalNotification n) { + stats.evictionCount.incrementAndGet(); + if (n.getValue() != null) { + stats.currentWeight.addAndGet(-CacheWeights.segmentWeight(n.getValue())); + } + n.getKey().unloaded(); + } + + @Override + @NotNull + public Segment getSegment(@NotNull SegmentId id, @NotNull Callable loader) + throws ExecutionException { + if (id.isDataSegmentId()) { + try { + return cache.get(id, () -> { + long t0 = System.nanoTime(); + try { + Segment segment = loader.call(); + stats.loadSuccessCount.incrementAndGet(); + stats.loadTime.addAndGet(System.nanoTime() - t0); + stats.missCount.incrementAndGet(); + stats.currentWeight.addAndGet(CacheWeights.segmentWeight(segment)); + id.loaded(segment); + return segment; + } catch (Exception e) { + stats.loadExceptionCount.incrementAndGet(); + if (e instanceof RuntimeException re) throw re; + throw new LoaderException(e); + } + }); + } catch (UncheckedExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof LoaderException le) { + throw new ExecutionException(le.getCause()); + } + if (cause instanceof RuntimeException re) throw re; + throw e; + } catch (ExecutionException e) { + throw new ExecutionException(e.getCause()); + } + } else { + try { + return loader.call(); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + } + + @Override + public void putSegment(@NotNull Segment segment) { + SegmentId id = segment.getSegmentId(); + if (id.isDataSegmentId()) { + // Update before put() for correct ordering with eviction callback + id.loaded(segment); + stats.currentWeight.addAndGet(CacheWeights.segmentWeight(segment)); + cache.put(id, segment); + } + } + + @Override + public void clear() { + cache.invalidateAll(); + stats.currentWeight.set(0); + } + + @Override + @NotNull + public AbstractCacheStats getCacheStats() { + return stats; + } + + @Override + public void recordHit(@NotNull SegmentId id) { + if (id.isDataSegmentId()) { + if (FT_OAK_12214_PROPAGATE_L1_HITS_TO_L2_ENABLED.get()) { + cache.getIfPresent(id); + } + stats.hitCount.incrementAndGet(); + } + } + + private static final class LoaderException extends RuntimeException { + LoaderException(Exception cause) { + super(cause); + } + } + + private static final class Stats extends AbstractCacheStats { + private final long maximumWeight; + private final Supplier elementCount; + + final AtomicLong currentWeight = new AtomicLong(); + final AtomicLong loadSuccessCount = new AtomicLong(); + final AtomicInteger loadExceptionCount = new AtomicInteger(); + final AtomicLong loadTime = new AtomicLong(); + final AtomicLong evictionCount = new AtomicLong(); + final AtomicLong hitCount = new AtomicLong(); + final AtomicLong missCount = new AtomicLong(); + + Stats(@NotNull String name, long maximumWeight, @NotNull Supplier elementCount) { + super(name); + this.maximumWeight = maximumWeight; + this.elementCount = elementCount; + } + + @Override + protected CacheStats getCurrentStats() { + return new CacheStats( + hitCount.get(), + missCount.get(), + loadSuccessCount.get(), + loadExceptionCount.get(), + loadTime.get(), + evictionCount.get()); + } + + @Override + public long getElementCount() { + return elementCount.get(); + } + + @Override + public long getMaxTotalWeight() { + return maximumWeight; + } + + @Override + public long estimateCurrentWeight() { + return currentWeight.get(); + } + } +} diff --git a/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCacheMemoizationBenchmark.java b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCacheMemoizationBenchmark.java new file mode 100644 index 00000000000..d1b0aa171e8 --- /dev/null +++ b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCacheMemoizationBenchmark.java @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.benchmark; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jcr.Repository; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.fixture.RepositoryFixture; +import org.apache.jackrabbit.oak.segment.RecordNumbers; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentCache; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentReferences; +import org.apache.jackrabbit.oak.segment.SegmentStore; +import org.apache.jackrabbit.oak.segment.data.SegmentData; +import org.jetbrains.annotations.NotNull; + +/** + * Measures TAR-read% (loader calls / total accesses) across cache policies going through + * the full production access path: {@link SegmentId#getSegment()} → L2 → loader. + * + *

{@link SegmentCachePolicyBenchmark} hits L2 on every call, so Caffeine's sketch and + * Guava's LRU position are updated even for accesses that in production would be served from + * the {@link SegmentId} memoization field (L1) without touching L2. That flatters both + * policies. In production, L2 frequency and recency only advance when a segment is actually + * loaded — i.e. on L1 misses. Counts go stale for hot segments served exclusively from L1. + * When such a segment is eventually evicted and Caffeine's admission gate compares its stale + * count against an incumbent, the reload can be rejected, the eviction listener clears L1, + * and the next access triggers another TAR read — a loop that {@link SegmentCachePolicyBenchmark} + * cannot see. This benchmark exercises that path.

+ * + *

Segments are still Mockito mocks, so each loader call is free. The number of loader + * calls is what matters here, not how long they take. For the same scenarios with real disk + * I/O cost see {@link SegmentCacheTarBenchmark}.

+ * + *

Run with {@code -Xmx4g}; size-sensitivity sweeps allocate up to 20K mocks per pool.

+ * + *

Scenario 1 (live): Zipfian steady-state, reported per iteration via {@code statsValues()}. + * Scenario 2: post-compaction cold-start — old-gen warm, then all traffic on new-gen IDs. + * Scenario 3: drifting active set — sliding Zipfian window showing how long the L1-staleness + * loop sustains itself as the hot set shifts.

+ */ +public class SegmentCacheMemoizationBenchmark extends AbstractTest { + + // ----- cache sizing: avg ~130 KB/segment; 130 MB ≈ 1 000 entries ----- + private static final int CACHE_SIZE_MB = 130; + private static final int MIN_SEG_KB = 4; + private static final int MAX_SEG_KB = 256; + private static final long RANDOM_SEED = 42L; + private static final double ZIPF_EXPONENT = 1.0; + + // ----- Scenario 1 (live run): Zipfian steady-state ----- + private static final int POOL_1 = 10_000; + private static final int BATCH_SIZE = Integer.getInteger("segment.batch.size", 1_000); + + // ----- Scenario 2: post-compaction cold-start ----- + // 200K warmup saturates old-gen sketch to freq=15 (4-bit cap). + // NEW_GEN_2 = 15K + flat Zipf(0.5) means each new-gen entry gets ~8 hits/epoch + // so most remain stuck at freq ≤ 5 (auto-reject threshold) for 3–5 epochs. + // EPOCH_OPS_2 = 2K exposes the initial spike before Zipfian leaders escape the gate. + private static final int OLD_GEN_2 = 5_000; + private static final int NEW_GEN_2 = 15_000; + private static final int WARMUP_2 = 200_000; + private static final double ZIPF_2_NEW_EXP = 0.5; // flatter than warmup — slows freq build-up + private static final int MEASURE_2 = 600_000; + private static final int EPOCH_OPS_2 = 2_000; + + // ----- Scenario 3: drifting active set ----- + private static final int POOL_3 = 20_000; + private static final int WIDTH_3 = 1_500; + private static final int DRIFT_3 = 5; + private static final double ZIPF_3_EXP = 0.5; + private static final int WARMUP_3 = 50_000; + private static final int MEASURE_3 = 600_000; + private static final int EPOCH_OPS_3 = 10_000; + + private static final long DATA_SEG_LSB_MASK = 0xa000000000000000L; + + @FunctionalInterface + private interface CacheFactory { + SegmentCache create(int cacheSizeMb); + } + + private static final CacheFactory[] POLICIES = { + SegmentCache::newSegmentCache, + GuavaSegmentCache::new + }; + private static final String[] POLICY_NAMES = {"CAFFEINE", "GUAVA"}; + private static final int NUM_POLICIES = POLICIES.length; + // ----- live Scenario 1 state (used by runTest / statsValues) ----- + private double[] liveCdf; + private Random liveRng; + private CacheSetup[] liveSetups; + + @Override + public String toString() { + return "SegmentCacheMemoizationBenchmark"; + } + + @Override + protected Repository[] createRepository(RepositoryFixture fixture) throws Exception { + return fixture.setUpCluster(1); + } + + /** + * Initialises the live Scenario 1 caches used by {@link #runTest()}. + */ + @Override + protected void beforeSuite() { + liveCdf = buildZipfCdf(POOL_1, ZIPF_EXPONENT); + liveRng = new Random(RANDOM_SEED); + liveSetups = new CacheSetup[NUM_POLICIES]; + for (int p = 0; p < NUM_POLICIES; p++) { + liveSetups[p] = freshSetup(p, POOL_1, CACHE_SIZE_MB); + } + } + + /** + * Runs one Zipfian batch against all policies; paces the AbstractTest timing loop + * and feeds the live TAR-read% columns reported by {@link #statsValues()}. + */ + @Override + protected void runTest() { + for (int i = 0; i < BATCH_SIZE; i++) { + int segIdx = zipfSample(liveCdf, liveRng.nextDouble()); + for (int p = 0; p < NUM_POLICIES; p++) { + liveSetups[p].access(segIdx); + } + } + } + + /** Column headers for the AbstractTest output row. */ + @Override + protected String[] statsNames() { + return new String[]{" Caff_tar%", " Guav_tar%"}; + } + + /** Format strings for the per-policy TAR-read% columns. */ + @Override + protected String[] statsFormats() { + return new String[]{" %10.1f", " %10.1f"}; + } + + /** Current running TAR-read% for each policy from the live Scenario 1 run. */ + @Override + protected Object[] statsValues() { + Object[] vals = new Object[NUM_POLICIES]; + for (int p = 0; p < NUM_POLICIES; p++) { + long tar = liveSetups[p].store.tarReads.get(); + long total = liveSetups[p].store.totalAccesses.get(); + vals[p] = total == 0 ? 0.0 : 100.0 * tar / total; + } + return vals; + } + + /** + * Runs Scenarios 2 and 3 after the timed loop and prints a detailed report. + */ + @Override + protected void afterSuite() { + int avgWeight = 32 + (MIN_SEG_KB + MAX_SEG_KB) / 2 * 1024; + int cacheCapacity = (int) ((long) CACHE_SIZE_MB * 1024 * 1024 / avgWeight); + System.out.printf( + "%nSegmentCacheMemoizationBenchmark cacheCapacity~=%d%n" + + " TAR reads = loader invocations (actual disk equivalents);" + + " L1 hits bypass L2 entirely.%n%n", + cacheCapacity); + + System.out.println("--- Scenario 1: Zipfian steady-state (live run — see timed output above) ---"); + for (int p = 0; p < NUM_POLICIES; p++) { + long[] snap = liveSetups[p].snapshotAndReset(); + printResult(POLICY_NAMES[p], snap[0], snap[1], snap[2], snap[3]); + } + liveSetups = null; // release 5×POOL_1 mock segments — no longer needed + + System.gc(); + runScenario2(CACHE_SIZE_MB); // cold-start behaviour is cache-size-independent — run once + + for (int cacheSizeMb : new int[]{CACHE_SIZE_MB / 2, CACHE_SIZE_MB, CACHE_SIZE_MB * 2}) { + System.gc(); // release previous pool before allocating the next batch of mocks + runScenario3(cacheSizeMb); // drift+zipf behaviour changes with capacity — sweep all sizes + } + } + + // ----------------------------------------------------------------------- + // Scenario runners + // ----------------------------------------------------------------------- + + private void runScenario2(int cacheSizeMb) { + System.out.printf( + "%n--- Scenario 2: post-compaction cold-start" + + " (old-gen=%,d new-gen=%,d warmup=%,d measure=%,d epoch=%,d ops" + + " zipf-new=%.1f cache=%dMB) ---%n", + OLD_GEN_2, NEW_GEN_2, WARMUP_2, MEASURE_2, EPOCH_OPS_2, ZIPF_2_NEW_EXP, cacheSizeMb); + System.out.println( + " Old-gen saturated to freq=15; new-gen (freq=0) auto-rejected by W-TinyLFU (freq≤5 gate)."); + System.out.println( + " Caffeine: ~40%+ TAR-read% initially, self-corrects after ~30K ops; Guava: ~27% steady."); + System.out.println( + " After convergence: Caffeine ~20% vs Guava ~24% — W-TinyLFU wins long-term."); + Segment[] pool2 = createSegmentPool(OLD_GEN_2 + NEW_GEN_2); + long[][][] epochs = new long[NUM_POLICIES][][]; + long[][] totals = new long[NUM_POLICIES][]; + for (int p = 0; p < NUM_POLICIES; p++) { + List epochList = new ArrayList<>(); + CacheSetup setup = freshSetupWithPool(p, pool2, cacheSizeMb); + totals[p] = runCompactionColdStart(setup, epochList); + epochs[p] = epochList.toArray(new long[0][]); + } + printEpochTable(epochs, epochs[0].length, EPOCH_OPS_2); + for (int p = 0; p < NUM_POLICIES; p++) { + printResult(POLICY_NAMES[p], totals[p][0], totals[p][1], totals[p][2], totals[p][3]); + } + } + + private void runScenario3(int cacheSizeMb) { + System.out.printf( + "%n--- Scenario 3: drifting active set" + + " (pool=%,d width=%,d drift=%d warmup=%,d" + + " measure=%,d epoch=%,d zipf=%.1f cache=%dMB) ---%n", + POOL_3, WIDTH_3, DRIFT_3, WARMUP_3, MEASURE_3, EPOCH_OPS_3, ZIPF_3_EXP, cacheSizeMb); + System.out.println( + " stale sketch/LRU from L1 hits → eviction → rejection loop under working-set churn."); + Segment[] pool3 = createSegmentPool(POOL_3); + long[][][] epochs = new long[NUM_POLICIES][][]; + long[][] totals = new long[NUM_POLICIES][]; + for (int p = 0; p < NUM_POLICIES; p++) { + List epochList = new ArrayList<>(); + CacheSetup setup = freshSetupWithPool(p, pool3, cacheSizeMb); + totals[p] = runDriftingWindow(setup, epochList); + epochs[p] = epochList.toArray(new long[0][]); + } + printEpochTable(epochs, epochs[0].length, EPOCH_OPS_3); + for (int p = 0; p < NUM_POLICIES; p++) { + printResult(POLICY_NAMES[p], totals[p][0], totals[p][1], totals[p][2], totals[p][3]); + } + } + + // ----------------------------------------------------------------------- + // CacheSetup — production-faithful L1 → store → L2 → loader path + // ----------------------------------------------------------------------- + + /** + * Holds a cache and SegmentIds wired through {@link InstrumentedStore} so that + * every {@link #access} call follows the production path: + * {@code id.getSegment()} → L1 check → on miss: {@code store.readSegment()} → + * {@link SegmentCache#getSegment} → on L2 miss: loader (TAR read). + */ + private static final class CacheSetup { + final SegmentCache cache; + final SegmentId[] ids; + final InstrumentedStore store; + private long evictionBaseline = 0; + + CacheSetup(SegmentCache cache, SegmentId[] ids, InstrumentedStore store) { + this.cache = cache; + this.ids = ids; + this.store = store; + } + + /** One production-faithful access: L1 check → store → L2 → loader. */ + void access(int idx) { + store.totalAccesses.incrementAndGet(); + ids[idx].getSegment(); + } + + /** + * Returns [total, l1Hits, tarReads, evictionsDelta] for the window since the + * last call, then resets the counters. Evictions are computed as a delta so + * repeated calls give per-epoch (not cumulative) values. + */ + long[] snapshotAndReset() { + long total = store.totalAccesses.getAndSet(0); + long l1Hits = store.l1Hits.getAndSet(0); + long tarReads = store.tarReads.getAndSet(0); + long currentEvictions = cache.getCacheStats().getEvictionCount(); + long evictionsDelta = currentEvictions - evictionBaseline; + evictionBaseline = currentEvictions; + return new long[]{total, l1Hits, tarReads, evictionsDelta}; + } + } + + /** + * Minimal {@link SegmentStore} that bridges L1 misses to the {@link SegmentCache}, + * counting TAR reads (loader invocations), L1 hits (via {@code onAccess}), and + * total accesses. + */ + private static final class InstrumentedStore implements SegmentStore { + + private final SegmentCache cache; + private final Map segMap; + + final AtomicLong totalAccesses = new AtomicLong(); + final AtomicLong l1Hits = new AtomicLong(); + final AtomicLong tarReads = new AtomicLong(); + + InstrumentedStore(SegmentCache cache, Map segMap) { + this.cache = cache; + this.segMap = segMap; + } + + /** + * Called by {@link SegmentId#getSegment()} on L1 miss; bridges to the L2 + * cache. The loader increments {@link #tarReads} only on an L2 miss. + */ + @Override + @NotNull + public Segment readSegment(@NotNull SegmentId id) { + Segment mock = segMap.get(id); + if (mock == null) { + throw new IllegalStateException("Unknown segment: " + id); + } + try { + return cache.getSegment(id, () -> { + tarReads.incrementAndGet(); + return mock; + }); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean containsSegment(@NotNull SegmentId id) { + return segMap.containsKey(id); + } + + @Override + public void writeSegment(@NotNull SegmentId id, byte[] bytes, int offset, int length) + throws IOException { + throw new UnsupportedOperationException("benchmark store is read-only"); + } + } + + /** + * Creates {@code n} reusable segments with a pre-set {@link Segment#estimateMemoryUsage()} + * value. Uses a lightweight {@link MinimalSegment} subclass instead of Mockito proxies — + * each instance costs a few dozen bytes vs. several kilobytes for a ByteBuddy mock. + * + * @param n number of distinct segments to create + * @return array of segments with randomised sizes + */ + private static Segment[] createSegmentPool(int n) { + Segment[] segs = new Segment[n]; + Random r = new Random(RANDOM_SEED); + for (int i = 0; i < n; i++) { + int memUsage = MIN_SEG_KB * 1024 + r.nextInt((MAX_SEG_KB - MIN_SEG_KB) * 1024); + segs[i] = new MinimalSegment(memUsage); + } + return segs; + } + + /** + * Wires existing mock segments into a fresh {@link CacheSetup} for the given policy. + * Reuses the segment objects (only {@code getSegmentId()} stubs are updated); creates + * new {@link SegmentId} instances, a new {@link SegmentCache}, and a new + * {@link InstrumentedStore}. Call {@link #createSegmentPool} once and pass the result + * to this method for each policy to avoid accumulating mock objects. + * + * @param policyIndex index into {@link #POLICIES} + * @param segs pre-created mock segments (from {@link #createSegmentPool}) + * @param cacheSizeMb cache capacity in megabytes + */ + private static CacheSetup freshSetupWithPool(int policyIndex, Segment[] segs, int cacheSizeMb) { + int n = segs.length; + SegmentCache cache = POLICIES[policyIndex].create(cacheSizeMb); + SegmentId[] ids = new SegmentId[n]; + Map segMap = new IdentityHashMap<>(n * 2); + InstrumentedStore store = new InstrumentedStore(cache, segMap); + + for (int i = 0; i < n; i++) { + UUID uuid = UUID.randomUUID(); + long msb = uuid.getMostSignificantBits(); + long lsb = (uuid.getLeastSignificantBits() & 0x0fffffffffffffffL) | DATA_SEG_LSB_MASK; + ids[i] = new SegmentId(store, msb, lsb, id -> { + store.l1Hits.incrementAndGet(); + cache.recordHit(id); + }); + segMap.put(ids[i], segs[i]); + } + + return new CacheSetup(cache, ids, store); + } + + /** + * Builds a fresh {@link CacheSetup} with {@code n} new mock segments. + * + * @param policyIndex index into {@link #POLICIES} + * @param n number of distinct segments in the pool + * @param cacheSizeMb cache capacity in megabytes + */ + private static CacheSetup freshSetup(int policyIndex, int n, int cacheSizeMb) { + return freshSetupWithPool(policyIndex, createSegmentPool(n), cacheSizeMb); + } + + // ----------------------------------------------------------------------- + // Scenario implementations + // ----------------------------------------------------------------------- + + /** + * Scenario 2: post-compaction cold-start. Warms the cache with old-gen segments + * then measures access to new-gen segments only, per epoch. + * + * @param epochStats receives per-epoch [total, l1Hits, tarReads, evictions] + * @return aggregate [total, l1Hits, tarReads, evictions] across all epochs + */ + private static long[] runCompactionColdStart(CacheSetup setup, List epochStats) { + double[] oldCdf = buildZipfCdf(OLD_GEN_2, ZIPF_EXPONENT); + double[] newCdf = buildZipfCdf(NEW_GEN_2, ZIPF_2_NEW_EXP); + Random r = new Random(RANDOM_SEED); + + for (int i = 0; i < WARMUP_2; i++) { + setup.access(zipfSample(oldCdf, r.nextDouble())); + } + setup.snapshotAndReset(); // discard warmup counts + reset eviction baseline + + long totTotal = 0, totL1 = 0, totTar = 0, totEvict = 0; + for (int epoch = 0; epoch < MEASURE_2 / EPOCH_OPS_2; epoch++) { + for (int i = 0; i < EPOCH_OPS_2; i++) { + setup.access(OLD_GEN_2 + zipfSample(newCdf, r.nextDouble())); + } + long[] snap = setup.snapshotAndReset(); + epochStats.add(snap); + totTotal += snap[0]; totL1 += snap[1]; totTar += snap[2]; totEvict += snap[3]; + } + return new long[]{totTotal, totL1, totTar, totEvict}; + } + + /** + * Scenario 3: drifting active set. Slides a Zipfian window through the pool; + * per-epoch TAR-read% reveals whether L1-staleness compounds under churn. + * + * @param epochStats receives per-epoch [total, l1Hits, tarReads, evictions] + * @return aggregate [total, l1Hits, tarReads, evictions] across all epochs + */ + private static long[] runDriftingWindow(CacheSetup setup, List epochStats) { + double[] cdf = buildZipfCdf(WIDTH_3, ZIPF_3_EXP); + Random r = new Random(RANDOM_SEED); + int cursor = 0; + int opCount = 0; + + for (int i = 0; i < WARMUP_3; i++) { + if (opCount % DRIFT_3 == 0) cursor = (cursor + 1) % POOL_3; + setup.access((cursor + zipfSample(cdf, r.nextDouble())) % POOL_3); + opCount++; + } + setup.snapshotAndReset(); // discard warmup counts + reset eviction baseline + + long totTotal = 0, totL1 = 0, totTar = 0, totEvict = 0; + for (int epoch = 0; epoch < MEASURE_3 / EPOCH_OPS_3; epoch++) { + for (int i = 0; i < EPOCH_OPS_3; i++) { + if (opCount % DRIFT_3 == 0) cursor = (cursor + 1) % POOL_3; + setup.access((cursor + zipfSample(cdf, r.nextDouble())) % POOL_3); + opCount++; + } + long[] snap = setup.snapshotAndReset(); + epochStats.add(snap); + totTotal += snap[0]; totL1 += snap[1]; totTar += snap[2]; totEvict += snap[3]; + } + return new long[]{totTotal, totL1, totTar, totEvict}; + } + + // ----------------------------------------------------------------------- + // Reporting helpers + // ----------------------------------------------------------------------- + + private static void printEpochTable(long[][][] epochs, int numEpochs, int epochOps) { + System.out.printf(" %8s", "epoch"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %22s", POLICY_NAMES[p] + "_tar%"); + } + System.out.println(); + for (int e = 0; e < numEpochs; e++) { + System.out.printf(" %8d", (long) (e + 1) * epochOps); + for (int p = 0; p < NUM_POLICIES; p++) { + long[] ep = epochs[p][e]; + long total = ep[0]; + System.out.printf(" %22.1f", total == 0 ? 0.0 : 100.0 * ep[2] / total); + } + System.out.println(); + } + } + + /** + * Prints one result row: policy name, L1-hit%, TAR-read%, totals, and evictions. + * + * @param label policy name + * @param total total accesses in the measurement window + * @param l1Hits accesses served from L1 — no L2 call + * @param tarReads loader invocations — disk-read equivalents + * @param evictions eviction count delta for the measurement window + */ + private static void printResult(String label, long total, long l1Hits, + long tarReads, long evictions) { + double l1Pct = total == 0 ? 0.0 : 100.0 * l1Hits / total; + double tarPct = total == 0 ? 0.0 : 100.0 * tarReads / total; + double evPct = total == 0 ? 0.0 : 100.0 * evictions / total; + System.out.printf( + " %-22s l1%%=%5.1f tar%%=%5.1f" + + " total=%,10d l1Hits=%,9d tarReads=%,9d" + + " evictions=%,9d evict%%=%6.1f%n", + label, l1Pct, tarPct, total, l1Hits, tarReads, evictions, evPct); + } + + // ----------------------------------------------------------------------- + // Zipfian distribution helpers + // ----------------------------------------------------------------------- + + private static double[] buildZipfCdf(int n, double exponent) { + double[] cdf = new double[n]; + double sum = 0; + for (int i = 0; i < n; i++) { + sum += 1.0 / Math.pow(i + 1, exponent); + cdf[i] = sum; + } + for (int i = 0; i < n; i++) { + cdf[i] /= sum; + } + return cdf; + } + + private static int zipfSample(double[] cdf, double u) { + int lo = 0, hi = cdf.length - 1; + while (lo < hi) { + int mid = (lo + hi) >>> 1; + if (cdf[mid] < u) lo = mid + 1; + else hi = mid; + } + return lo; + } + + // ----------------------------------------------------------------------- + // MinimalSegment — lightweight Segment substitute, avoids Mockito overhead + // ----------------------------------------------------------------------- + + /** + * Minimal {@link Segment} subclass that stores only a pre-set memory-usage value. + * Uses the package-visible four-arg constructor with empty stubs for all interfaces, + * so no ByteBuddy proxy class is generated and no Mockito invocation tracking is kept. + * Memory cost is ~50 bytes vs. several KB per Mockito mock. + */ + private static final class MinimalSegment extends Segment { + + private static final SegmentData EMPTY_DATA = new SegmentData() { + @Override public byte getVersion() { return (byte) 13; } + @Override public String getSignature() { return ""; } + @Override public int getFullGeneration() { return 0; } + @Override public boolean isCompacted() { return false; } + @Override public int getGeneration() { return 0; } + @Override public int getSegmentReferencesCount() { return 0; } + @Override public int getRecordReferencesCount() { return 0; } + @Override public int getRecordReferenceNumber(int i) { return 0; } + @Override public byte getRecordReferenceType(int i) { return 0; } + @Override public int getRecordReferenceOffset(int i) { return 0; } + @Override public long getSegmentReferenceMsb(int i) { return 0; } + @Override public long getSegmentReferenceLsb(int i) { return 0; } + @Override public byte readByte(int offset) { return 0; } + @Override public int readInt(int offset) { return 0; } + @Override public short readShort(int offset) { return 0; } + @Override public long readLong(int offset) { return 0; } + @Override public Buffer readBytes(int offset, int size) { return null; } + @Override public int size() { return 0; } + @Override public void hexDump(OutputStream stream) {} + @Override public void binDump(OutputStream stream) {} + @Override public int estimateMemoryUsage() { return 0; } + }; + + private static final SegmentReferences EMPTY_REFS = new SegmentReferences() { + @Override + public SegmentId getSegmentId(int reference) { + throw new UnsupportedOperationException(); + } + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + }; + + private final int memUsage; + + MinimalSegment(int memUsage) { + super(SegmentId.NULL, EMPTY_DATA, RecordNumbers.EMPTY_RECORD_NUMBERS, EMPTY_REFS); + this.memUsage = memUsage; + } + + @Override + public int estimateMemoryUsage() { + return memUsage; + } + } +} diff --git a/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCachePolicyBenchmark.java b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCachePolicyBenchmark.java new file mode 100644 index 00000000000..854cfe2d8d4 --- /dev/null +++ b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCachePolicyBenchmark.java @@ -0,0 +1,1174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.benchmark; + +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.LongAdder; + +import javax.jcr.Repository; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.fixture.RepositoryFixture; +import org.apache.jackrabbit.oak.segment.RecordNumbers; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentCache; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentReferences; +import org.apache.jackrabbit.oak.segment.SegmentStore; +import org.apache.jackrabbit.oak.segment.data.SegmentData; + +/** + * Compares CAFFEINE and GUAVA eviction policies by calling {@link SegmentCache#getSegment} + * directly on every access — no L1 memoization involved. The metric is L2 miss rate. + * Segments are Mockito mocks so cache misses cost nothing; this isolates eviction policy + * behaviour from I/O noise. + * + *

Because every access hits L2, both policies see the full access stream and their + * frequency/recency counters stay accurate. That is not how production works — hot segments + * are normally served from the {@link SegmentId} memoization field without touching L2 at all. + * See {@link SegmentCacheMemoizationBenchmark} for a benchmark that reflects that reality.

+ * + *

Run with {@code -Xmx4g}; the 11-scenario afterSuite accumulates ~50K live mocks and + * a GC death spiral below that.

+ * + *

Scenario A — Zipfian steady-state (timed run)

+ * A small number of segments dominate traffic (templates, nav components); access probability + * drops off with rank. Cache holds ~10% of the pool. Favours frequency-aware policies. + * + *

Scenario B — scan pollution

+ * A large sequential scan (GC traversal, index rebuild) precedes a Zipfian workload. + * The scan loads the TinyLFU sketch with equal counts, delaying re-admission of the true + * working set until the sketch decays. + * + *

Scenario C — sustained scan contamination

+ * A fraction of every operation re-accesses random scan entries (bot / crawler traffic on + * old content). The sketch never decays because the contamination is continuous, so + * Caffeine's admission freeze doesn't self-correct. Guava is unaffected. + * + *

Scenario D — uniform random thrash

+ * Pool is 25× cache capacity with uniform access. No policy has an advantage here; + * establishes the ~95% miss-rate floor. + * + *

Scenario E — burst new content

+ * A warm cache is hit by a burst of new segments, each accessed several times then + * abandoned. Checks whether W-TinyLFU holds onto burst entries (elevated count) and + * squeezes out the steady-state working set, vs LRU which forgets the burst by recency. + * + *

Scenario F — periodic GC/diff scans

+ * Short sequential scans interleaved with Zipfian traffic over many cycles. Unlike the + * single large scan in B, repeated small scans accumulate sketch pollution incrementally. + * + *

Scenario G — write-heavy import then read-back

+ * Each segment is written exactly once (import), then the most recent ones are re-read at + * random. Tests whether recency (Guava) or frequency (Caffeine) is a better predictor of + * what gets re-read after an import. + * + *

Scenario H — sliding window

+ * A hot window of {@code WINDOW_SIZE_H} segments advances through a large pool; each entry + * is hit {@code WINDOW_HITS_H} times before the window moves on. Window is ~1.2× cache + * capacity so every slide forces evictions. Pure LRU is theoretically optimal here. + * + *

Scenario I — drifting active set

+ * A Zipfian window moves through a pool; the cursor advances every {@code DRIFT_I} ops so + * older entries leave the hot set continuously. Per-epoch miss rates show how fast each + * policy adapts and how long Caffeine's sketch-decay freeze lasts after the window shifts. + * + *

Scenario J — drift-rate sweep

+ * Re-runs the drifting window at several cursor speeds (1, 2, 5, 10, 20, static) to produce + * a miss-rate table indexed by churn rate. Shows exactly where Caffeine's frequency + * advantage flips into a disadvantage as working-set churn increases. + * + *

Scenario K — post-compaction cold-start

+ * Phase 1 warms the cache with old-generation segments (Zipfian, building sketch counts). + * Phase 2 switches all traffic to new-generation IDs (freq=0), as happens after Oak online + * compaction. Caffeine's admission gate blocks new-gen entries until their count beats the + * old-gen victims; Guava evicts by recency immediately. Per-epoch miss rates track how long + * the freeze lasts. + * + *

Configurable: {@code -Dsegment.batch.size} (accesses per iteration, default 1000), + * {@code -Dsegment.zipf.exponent} (default 1.0), {@code -Dsegment.random.seed} (default 42). + */ +public class SegmentCachePolicyBenchmark extends AbstractTest { + + // ----- cache sizing: segments vary 4–256 KB; avg ~130 KB; 130 MB gives ~1000 entries ----- + private static final int CACHE_SIZE_MB = 130; + private static final int MIN_SEG_KB = 4; + private static final int MAX_SEG_KB = 256; + + // ----- Scenario A pool ----- + private static final int TOTAL_SEGMENTS = 10_000; + private static final double ZIPF_EXPONENT = + Double.parseDouble(System.getProperty("segment.zipf.exponent", "1.0")); + private static final long RANDOM_SEED = Long.getLong("segment.random.seed", 42L); + private static final int BATCH_SIZE = Integer.getInteger("segment.batch.size", 1_000); + + // ----- Scenario B (scan then Zipfian) ----- + private static final int SCAN_LENGTH = 50_000; + private static final int POST_SCAN_WARMUP = 20_000; + private static final int POST_SCAN_MEASURE = 600_000; + + // ----- Scenario C (cold-start regression) — TMG-realistic variant ----- + // SCAN_PASSES_C passes raise incumbent freq to ~10, making new entries hard to admit. + // BG_SCAN_INTERVAL_C simulates background bot/crawler traffic that continuously + // re-accesses old content during measurement, preventing sketch decay and sustaining + // the freeze. Larger WORKING_SET_C reduces per-entry revisit rate (more unique URLs). + // Pool = SCAN_C + WORKING_SET_C. + private static final int SCAN_C = 9_000; + private static final int WORKING_SET_C = 5_000; + private static final int SCAN_PASSES_C = 10; + private static final int BG_SCAN_INTERVAL_C = 10; + private static final int MEASURE_C = 900_000; + private static final int EPOCH_OPS_C = 10_000; + + // ----- Scenario D: uniform random / cache thrash ----- + // Pool is 25x cache capacity; uniform access means no hot data and ~95% miss rate. + private static final int UNIFORM_POOL_D = 25_000; + private static final int MEASURE_D = 600_000; + + // ----- Scenario E: burst new content ----- + // Warm Zipfian cache + burst of BURST_SIZE_E new segments × BURST_ACCESSES_E hits each, + // then measure Zipfian over original working set. Pool = TOTAL_SEGMENTS + BURST_SIZE_E. + private static final int BURST_SIZE_E = 500; + private static final int BURST_ACCESSES_E = 20; + private static final int WARMUP_E = 50_000; + private static final int MEASURE_E = 300_000; + + // ----- Scenario F: periodic background (GC / diff) alternation ----- + private static final int CYCLES_F = 10; + private static final int CYCLE_ZIPF_OPS_F = 10_000; + private static final int CYCLE_SCAN_OPS_F = 2_000; + private static final int MEASURE_F = 300_000; + + // ----- Scenario G: write-heavy import then recent read-back ----- + private static final int IMPORT_SIZE_G = 50_000; + private static final int RECENT_WINDOW_G = 2_000; + private static final int MEASURE_G = 300_000; + + // ----- Scenario H: sliding window / temporal locality ----- + // Window slightly > cache capacity to force eviction decisions on every slide. + private static final int WINDOW_SIZE_H = 1_200; + private static final int SLIDE_STEP_H = 200; + private static final int TOTAL_POOL_H = 20_000; + private static final int WINDOW_HITS_H = 2; + private static final int MEASURE_H = 450_000; + + // ----- Scenario I: drifting active set with per-epoch reporting ----- + // Cursor advances 1 position every DRIFT_I ops; within the window, access follows + // a mild Zipfian distribution (exponent 0.5, so less skewed than ZIPF_EXPONENT). + private static final int POOL_I = 20_000; + private static final int WIDTH_I = 1_500; + private static final int DRIFT_I = 5; + private static final int WARMUP_I = 50_000; + private static final int MEASURE_I = 1_200_000; + private static final double ZIPF_I_EXP = 0.5; + private static final int EPOCH_OPS_I = 10_000; + + // ----- Scenario J: drift-rate sweep ----- + // Same drifting-window generator as I, swept across multiple drift speeds. + // Large pool ensures the window does not wrap-alias across drift variants. + private static final int POOL_J = 260_000; + private static final int WIDTH_J = 1_500; + private static final double ZIPF_J_EXP = 0.5; + private static final int WARMUP_J = 50_000; + private static final int MEASURE_J = 600_000; + private static final int[] DRIFT_VARIANTS_J = {1, 2, 5, 10, 20, Integer.MAX_VALUE}; + + // ----- Scenario K: post-compaction cold-start ----- + // 200K warmup saturates old-gen sketch to freq=15 (4-bit cap). + // NEW_GEN_K = 15K + flat Zipf(0.5) → each new-gen entry gets ~8 hits/epoch, + // keeping most below the freq≤5 auto-reject threshold for 3–5 epochs. + // EPOCH_OPS_K = 2K exposes the initial spike before hot new-gen escapes the gate. + private static final int OLD_GEN_K = 5_000; + private static final int NEW_GEN_K = 15_000; + private static final int WARMUP_K = 200_000; + private static final double ZIPF_K_NEW_EXP = 0.5; // flatter than warmup — slows freq build-up + private static final int MEASURE_K = 900_000; + private static final int EPOCH_OPS_K = 2_000; + + private static final long DATA_SEG_LSB_MASK = 0xa000000000000000L; + + @FunctionalInterface + private interface CacheFactory { + SegmentCache create(int cacheSizeMb); + } + + private static final CacheFactory[] POLICIES = { + SegmentCache::newSegmentCache, + GuavaSegmentCache::new + }; + private static final String[] POLICY_NAMES = {"CAFFEINE", "GUAVA"}; + private static final int NUM_POLICIES = POLICIES.length; + // ----- live Scenario A state ----- + private double[] zipfCdf; + private Random rng; + private SegmentCache[] liveCaches; + private SegmentId[][] liveIds; + private Segment[][] liveSegs; + private LongAdder[] totalAccesses; + + @Override + public String toString() { + return "SegmentCachePolicyBenchmark"; + } + + /** + * This benchmark exercises only in-memory caches; no JCR repository is used. + */ + @Override + protected Repository[] createRepository(RepositoryFixture fixture) throws Exception { + return fixture.setUpCluster(1); + } + + /** + * Initialises one {@link SegmentCache} per policy with pre-built + * {@link SegmentId} and {@link MinimalSegment} pools for Scenario A. + */ + @Override + protected void beforeSuite() { + zipfCdf = buildZipfCdf(TOTAL_SEGMENTS, ZIPF_EXPONENT); + rng = new Random(RANDOM_SEED); + totalAccesses = new LongAdder[NUM_POLICIES]; + liveCaches = new SegmentCache[NUM_POLICIES]; + liveIds = new SegmentId[NUM_POLICIES][TOTAL_SEGMENTS]; + liveSegs = new Segment[NUM_POLICIES][TOTAL_SEGMENTS]; + for (int p = 0; p < NUM_POLICIES; p++) { + totalAccesses[p] = new LongAdder(); + liveCaches[p] = POLICIES[p].create(CACHE_SIZE_MB); + for (int i = 0; i < TOTAL_SEGMENTS; i++) { + UUID uuid = UUID.randomUUID(); + long msb = uuid.getMostSignificantBits(); + long lsb = (uuid.getLeastSignificantBits() & 0x0fffffffffffffffL) | DATA_SEG_LSB_MASK; + liveIds[p][i] = new SegmentId( + SegmentStore.EMPTY_STORE, msb, lsb, + liveCaches[p]::recordHit); + int memUsage = MIN_SEG_KB * 1024 + rng.nextInt((MAX_SEG_KB - MIN_SEG_KB) * 1024); + liveSegs[p][i] = new MinimalSegment(memUsage); + } + } + } + + /** + * Performs {@code segment.batch.size} Zipfian accesses against all three + * caches simultaneously. The same segment rank is presented to every + * policy per iteration so comparisons are fair. + */ + @Override + protected void runTest() throws Exception { + for (int i = 0; i < BATCH_SIZE; i++) { + int segIdx = zipfSample(zipfCdf, rng.nextDouble()); + accessAll(segIdx); + } + } + + private void accessAll(int segIdx) throws ExecutionException { + for (int p = 0; p < NUM_POLICIES; p++) { + Segment seg = liveSegs[p][segIdx]; + liveCaches[p].getSegment(liveIds[p][segIdx], () -> seg); + totalAccesses[p].increment(); + } + } + + /** + * Prints a three-scenario comparison table. Scenario A uses the live + * counters from the AbstractTest loop; Scenarios B and C run fresh caches. + */ + @Override + protected void afterSuite() { + int avgWeight = 32 + (MIN_SEG_KB + MAX_SEG_KB) / 2 * 1024; + int cacheCapacity = (int) ((long) CACHE_SIZE_MB * 1024 * 1024 / avgWeight); + System.out.printf( + "%nSegmentCachePolicyBenchmark cacheCapacity~=%d pool=%d zipf=%.1f%n%n", + cacheCapacity, TOTAL_SEGMENTS, ZIPF_EXPONENT); + + System.out.println("--- Scenario A: Zipfian steady-state (AbstractTest timed run) ---"); + for (int p = 0; p < NUM_POLICIES; p++) { + long misses = liveCaches[p].getCacheStats().getMissCount(); + long total = totalAccesses[p].sum(); + long evictions = liveCaches[p].getCacheStats().getEvictionCount(); + printResult(POLICY_NAMES[p], total - misses, misses, evictions); + } + liveCaches = null; // release live-run state — no longer needed + liveIds = null; + liveSegs = null; + System.gc(); // hint GC before allocating scenario pools + + System.out.printf( + "%n--- Scenario B: scan (%,d segs) then Zipfian" + + " (warmup=%,d measure=%,d ops) ---%n", + SCAN_LENGTH, POST_SCAN_WARMUP, POST_SCAN_MEASURE); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetup(p, TOTAL_SEGMENTS, CACHE_SIZE_MB); + long[] r = runScanThenZipf(setup); + printResult(POLICY_NAMES[p], r[0], r[1], r[2]); + } + + System.out.printf( + "%n--- Scenario C: cold-start regression / TMG crawler simulation" + + " (scan=%,d × %d passes working-set=%,d bg-scan=1/%d" + + " measure=%,d epoch=%,d ops) ---%n", + SCAN_C, SCAN_PASSES_C, WORKING_SET_C, BG_SCAN_INTERVAL_C, MEASURE_C, EPOCH_OPS_C); + System.out.printf( + " incumbents at freq=%d; %.0f%% of ops re-access old content" + + " (bot/crawler) — prevents sketch decay%n", + SCAN_PASSES_C, 100.0 / BG_SCAN_INTERVAL_C); + long[][][] epochsC = new long[NUM_POLICIES][][]; + long[][] totalsC = new long[NUM_POLICIES][]; + for (int p = 0; p < NUM_POLICIES; p++) { + List epochs = new ArrayList<>(); + PolicySetup setup = freshSetup(p, SCAN_C + WORKING_SET_C, CACHE_SIZE_MB); + totalsC[p] = runColdStart(setup, epochs); + epochsC[p] = epochs.toArray(new long[0][]); + } + System.out.printf(" %8s", "epoch"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %14s", POLICY_NAMES[p] + "_miss%"); + } + System.out.println(); + for (int e = 0; e < epochsC[0].length; e++) { + System.out.printf(" %8d", (long) (e + 1) * EPOCH_OPS_C); + for (int p = 0; p < NUM_POLICIES; p++) { + long[] ep = epochsC[p][e]; + long epTotal = ep[0] + ep[1]; + System.out.printf(" %14.1f", epTotal == 0 ? 0.0 : 100.0 * ep[1] / epTotal); + } + System.out.println(); + } + for (int p = 0; p < NUM_POLICIES; p++) { + printResult(POLICY_NAMES[p], totalsC[p][0], totalsC[p][1], totalsC[p][2]); + } + + System.out.printf( + "%n--- Scenario D: uniform random / cache thrash" + + " (pool=%,d = ~%dx cache measure=%,d ops) ---%n", + UNIFORM_POOL_D, UNIFORM_POOL_D / cacheCapacity, MEASURE_D); + System.out.println( + " no hot data — uniform access over pool 25x cache; expected miss ~95%%"); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetup(p, UNIFORM_POOL_D, CACHE_SIZE_MB); + long[] r = runUniformRandom(setup); + printResult(POLICY_NAMES[p], r[0], r[1], r[2]); + } + + System.out.printf( + "%n--- Scenario E: burst new content" + + " (burst=%,d segs × %d hits warmup=%,d measure=%,d ops) ---%n", + BURST_SIZE_E, BURST_ACCESSES_E, WARMUP_E, MEASURE_E); + System.out.println( + " warm Zipfian cache hit by burst of new segments;" + + " measures working-set miss rate after burst subsides"); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetup(p, TOTAL_SEGMENTS + BURST_SIZE_E, CACHE_SIZE_MB); + long[] r = runBurstNewContent(setup); + printResult(POLICY_NAMES[p], r[0], r[1], r[2]); + } + + System.out.printf( + "%n--- Scenario F: periodic GC/diff alternation" + + " (cycles=%d zipf/cycle=%,d scan/cycle=%,d measure=%,d ops) ---%n", + CYCLES_F, CYCLE_ZIPF_OPS_F, CYCLE_SCAN_OPS_F, MEASURE_F); + System.out.println( + " repeated small scans interleaved with Zipfian;" + + " cumulative sketch pollution vs LRU recency aging"); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetup(p, TOTAL_SEGMENTS, CACHE_SIZE_MB); + long[] r = runPeriodicGC(setup); + printResult(POLICY_NAMES[p], r[0], r[1], r[2]); + } + + System.out.printf( + "%n--- Scenario G: write-heavy import then read-back" + + " (import=%,d recent-window=%,d measure=%,d ops) ---%n", + IMPORT_SIZE_G, RECENT_WINDOW_G, MEASURE_G); + System.out.println( + " large sequential import followed by random reads of recently-imported segments;" + + " recency (LRU) vs frequency (Caffeine) post-import"); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetup(p, IMPORT_SIZE_G, CACHE_SIZE_MB); + long[] r = runImportThenRead(setup); + printResult(POLICY_NAMES[p], r[0], r[1], r[2]); + } + + System.out.printf( + "%n--- Scenario H: sliding window / temporal locality" + + " (window=%,d ~%.0f%% of cache slide=%,d pool=%,d" + + " hits/item=%d measure=%,d ops) ---%n", + WINDOW_SIZE_H, 100.0 * WINDOW_SIZE_H / cacheCapacity, + SLIDE_STEP_H, TOTAL_POOL_H, WINDOW_HITS_H, MEASURE_H); + System.out.println( + " hot window slides forward; pure recency (LRU) is optimal;" + + " window > cache forces evictions on every slide"); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetup(p, TOTAL_POOL_H, CACHE_SIZE_MB); + long[] r = runSlidingWindow(setup); + printResult(POLICY_NAMES[p], r[0], r[1], r[2]); + } + + System.out.printf( + "%n--- Scenario I: drifting active set" + + " (pool=%,d width=%,d drift=%d warmup=%,d" + + " measure=%,d epoch=%,d zipf=%.1f) ---%n", + POOL_I, WIDTH_I, DRIFT_I, WARMUP_I, MEASURE_I, EPOCH_OPS_I, ZIPF_I_EXP); + System.out.println( + " window slides continuously; per-epoch miss% reveals" + + " W-TinyLFU sketch-decay freeze on new-entry admission"); + long[][][] epochsI = new long[NUM_POLICIES][][]; + long[][] totalsI = new long[NUM_POLICIES][]; + for (int p = 0; p < NUM_POLICIES; p++) { + List epochs = new ArrayList<>(); + PolicySetup setup = freshSetup(p, POOL_I, CACHE_SIZE_MB); + totalsI[p] = runDriftingWindow(setup, epochs); + epochsI[p] = epochs.toArray(new long[0][]); + } + System.out.printf(" %8s", "epoch"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %14s", POLICY_NAMES[p] + "_miss%"); + } + System.out.println(); + for (int e = 0; e < epochsI[0].length; e++) { + System.out.printf(" %8d", (long) (e + 1) * EPOCH_OPS_I); + for (int p = 0; p < NUM_POLICIES; p++) { + long[] ep = epochsI[p][e]; + long epTotal = ep[0] + ep[1]; + System.out.printf(" %14.1f", epTotal == 0 ? 0.0 : 100.0 * ep[1] / epTotal); + } + System.out.println(); + } + for (int p = 0; p < NUM_POLICIES; p++) { + printResult(POLICY_NAMES[p], totalsI[p][0], totalsI[p][1], totalsI[p][2]); + } + + System.out.printf( + "%n--- Scenario J: drift-rate sweep" + + " (pool=%,d width=%,d warmup=%,d measure=%,d zipf=%.1f) ---%n", + POOL_J, WIDTH_J, WARMUP_J, MEASURE_J, ZIPF_J_EXP); + System.out.println( + " drift=1 → cursor every op; Integer.MAX_VALUE → stationary working set"); + System.out.printf(" %-12s", "drift"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %14s", POLICY_NAMES[p] + "_miss%"); + } + System.out.println(); + for (int drift : DRIFT_VARIANTS_J) { + String label = drift == Integer.MAX_VALUE ? "static" : String.valueOf(drift); + System.out.printf(" %-12s", label); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetup(p, POOL_J, CACHE_SIZE_MB); + long[] r = runDriftVariant(setup, drift); + long total = r[0] + r[1]; + System.out.printf(" %14.1f", total == 0 ? 0.0 : 100.0 * r[1] / total); + } + System.out.println(); + } + + System.out.printf( + "%n--- Scenario K: post-compaction cold-start" + + " (old-gen=%,d new-gen=%,d warmup=%,d measure=%,d epoch=%,d ops" + + " zipf-new=%.1f) ---%n", + OLD_GEN_K, NEW_GEN_K, WARMUP_K, MEASURE_K, EPOCH_OPS_K, ZIPF_K_NEW_EXP); + System.out.println( + " Old-gen saturated to freq=15; new-gen (freq=0) auto-rejected by W-TinyLFU (freq≤5 gate)."); + System.out.println( + " Caffeine: ~40%+ miss% initially, self-corrects after ~30K ops; Guava: ~27% steady."); + System.out.println( + " After convergence: Caffeine ~20% vs Guava ~24% — W-TinyLFU wins long-term."); + long[][][] epochsK = new long[NUM_POLICIES][][]; + long[][] totalsK = new long[NUM_POLICIES][]; + for (int p = 0; p < NUM_POLICIES; p++) { + List epochs = new ArrayList<>(); + PolicySetup setup = freshSetup(p, OLD_GEN_K + NEW_GEN_K, CACHE_SIZE_MB); + totalsK[p] = runCompactionColdStart(setup, epochs); + epochsK[p] = epochs.toArray(new long[0][]); + } + System.out.printf(" %8s", "epoch"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %14s", POLICY_NAMES[p] + "_miss%"); + } + System.out.println(); + for (int e = 0; e < epochsK[0].length; e++) { + System.out.printf(" %8d", (long) (e + 1) * EPOCH_OPS_K); + for (int p = 0; p < NUM_POLICIES; p++) { + long[] ep = epochsK[p][e]; + long epTotal = ep[0] + ep[1]; + System.out.printf(" %14.1f", epTotal == 0 ? 0.0 : 100.0 * ep[1] / epTotal); + } + System.out.println(); + } + for (int p = 0; p < NUM_POLICIES; p++) { + printResult(POLICY_NAMES[p], totalsK[p][0], totalsK[p][1], totalsK[p][2]); + } + + runSizeSensitivity(); + } + + /** + * Runs Scenario I (drifting active set) and Scenario K (post-compaction cold-start) + * at half, normal, and double cache sizes to show how each policy scales with capacity. + */ + private void runSizeSensitivity() { + int[] sizes = {CACHE_SIZE_MB / 2, CACHE_SIZE_MB, CACHE_SIZE_MB * 2}; + + System.out.printf( + "%n--- Size sensitivity: Scenario I (drifting active set)" + + " (pool=%,d width=%,d drift=%d measure=%,d ops) ---%n", + POOL_I, WIDTH_I, DRIFT_I, MEASURE_I); + System.out.printf(" %8s", "cacheMB"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %14s", POLICY_NAMES[p] + "_miss%"); + } + System.out.println(); + for (int sizeMb : sizes) { + Segment[] poolI = createSegmentPool(POOL_I); + System.out.printf(" %8d", sizeMb); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetupWithPool(p, poolI, sizeMb); + long[] r = runDriftingWindow(setup, new ArrayList<>()); + long total = r[0] + r[1]; + System.out.printf(" %14.1f", total == 0 ? 0.0 : 100.0 * r[1] / total); + } + System.out.println(); + } + + System.out.printf( + "%n--- Size sensitivity: Scenario K (post-compaction cold-start)" + + " (old-gen=%,d new-gen=%,d measure=%,d ops) ---%n", + OLD_GEN_K, NEW_GEN_K, MEASURE_K); + System.out.printf(" %8s", "cacheMB"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %14s", POLICY_NAMES[p] + "_miss%"); + } + System.out.println(); + for (int sizeMb : sizes) { + Segment[] poolK = createSegmentPool(OLD_GEN_K + NEW_GEN_K); + System.out.printf(" %8d", sizeMb); + for (int p = 0; p < NUM_POLICIES; p++) { + PolicySetup setup = freshSetupWithPool(p, poolK, sizeMb); + long[] totals = runCompactionColdStart(setup, new ArrayList<>()); + long total = totals[0] + totals[1]; + System.out.printf(" %14.1f", total == 0 ? 0.0 : 100.0 * totals[1] / total); + } + System.out.println(); + } + } + + /** Miss-rate column headers for the AbstractTest output row. */ + @Override + protected String[] statsNames() { + return new String[]{" Caff_miss%", " Guav_miss%"}; + } + + /** Format strings for the five miss-rate columns. */ + @Override + protected String[] statsFormats() { + return new String[]{" %10.1f", " %10.1f"}; + } + + /** Current running miss-rate (%) for each policy from the live Scenario A run. */ + @Override + protected Object[] statsValues() { + Object[] vals = new Object[NUM_POLICIES]; + for (int p = 0; p < NUM_POLICIES; p++) { + long misses = liveCaches[p].getCacheStats().getMissCount(); + long total = totalAccesses[p].sum(); + vals[p] = total == 0 ? 0.0 : 100.0 * misses / total; + } + return vals; + } + + // ----------------------------------------------------------------------- + // PolicySetup helper + // ----------------------------------------------------------------------- + + /** + * Groups a {@link SegmentCache} with its associated {@link SegmentId} and + * mock {@link Segment} arrays for use in scenario runners. + */ + private static final class PolicySetup { + final SegmentCache cache; + final SegmentId[] ids; + final Segment[] segs; + + PolicySetup(SegmentCache cache, SegmentId[] ids, Segment[] segs) { + this.cache = cache; + this.ids = ids; + this.segs = segs; + } + + void access(int idx) { + Segment s = segs[idx]; + try { + cache.getSegment(ids[idx], () -> s); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Creates {@code n} reusable mock segments with {@code estimateMemoryUsage()} stubs. + * The pool can be shared across multiple {@link #freshSetupWithPool} calls (one per policy) + * so that mock objects are not recreated per policy in the size-sensitivity sweep. + * + * @param n number of distinct segments to create + * @return array of mock segments with size stubs applied + */ + private static Segment[] createSegmentPool(int n) { + Segment[] segs = new Segment[n]; + Random r = new Random(RANDOM_SEED); + for (int i = 0; i < n; i++) { + int memUsage = MIN_SEG_KB * 1024 + r.nextInt((MAX_SEG_KB - MIN_SEG_KB) * 1024); + segs[i] = new MinimalSegment(memUsage); + } + return segs; + } + + /** + * Wires existing mock segments into a fresh {@link PolicySetup} for the given policy. + * Reuses the segment objects (only {@code getSegmentId()} stubs are updated); creates + * new {@link SegmentId} instances and a new {@link SegmentCache}. Call + * {@link #createSegmentPool} once and pass the result to this method for each policy + * to avoid accumulating mock objects across the size sweep. + * + * @param policyIndex index into {@link #POLICIES} + * @param segs pre-created mock segments (from {@link #createSegmentPool}) + * @param cacheSizeMb cache capacity in megabytes + */ + private static PolicySetup freshSetupWithPool(int policyIndex, Segment[] segs, int cacheSizeMb) { + int n = segs.length; + SegmentCache cache = POLICIES[policyIndex].create(cacheSizeMb); + SegmentId[] ids = new SegmentId[n]; + for (int i = 0; i < n; i++) { + UUID uuid = UUID.randomUUID(); + long msb = uuid.getMostSignificantBits(); + long lsb = (uuid.getLeastSignificantBits() & 0x0fffffffffffffffL) | DATA_SEG_LSB_MASK; + ids[i] = new SegmentId(SegmentStore.EMPTY_STORE, msb, lsb, cache::recordHit); + } + return new PolicySetup(cache, ids, segs); + } + + /** + * Builds a fresh {@link PolicySetup} with {@code n} segments. + * + * @param policyIndex index into {@link #POLICIES} + * @param n number of distinct segments to create + * @param cacheSizeMb cache capacity in megabytes + */ + private static PolicySetup freshSetup(int policyIndex, int n, int cacheSizeMb) { + return freshSetupWithPool(policyIndex, createSegmentPool(n), cacheSizeMb); + } + + // ----------------------------------------------------------------------- + // Scenario runners + // ----------------------------------------------------------------------- + + /** + * Scenario B: sequential scan then Zipfian workload. + * + * @return [hits, misses, evictions] measured only during the post-scan phase + */ + private static long[] runScanThenZipf(PolicySetup setup) { + double[] cdf = buildZipfCdf(TOTAL_SEGMENTS, ZIPF_EXPONENT); + Random r = new Random(RANDOM_SEED); + + for (int i = 0; i < SCAN_LENGTH; i++) { + setup.access(i % TOTAL_SEGMENTS); + } + for (int i = 0; i < POST_SCAN_WARMUP; i++) { + setup.access(zipfSample(cdf, r.nextDouble())); + } + + long missesBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + for (int i = 0; i < POST_SCAN_MEASURE; i++) { + setup.access(zipfSample(cdf, r.nextDouble())); + } + + long misses = setup.cache.getCacheStats().getMissCount() - missesBase; + long evictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + return new long[]{POST_SCAN_MEASURE - misses, misses, evictions}; + } + + /** + * Scenario C: multi-pass scan raises incumbent sketch frequency to + * {@code SCAN_PASSES_C}. During measurement, every {@code BG_SCAN_INTERVAL_C}-th + * operation re-accesses a random scan entry (simulating search-crawler or bot + * traffic on historical content). This continuous re-contamination prevents the + * Count-Min sketch from decaying, sustaining W-TinyLFU's admission freeze for the + * entire measurement window. Measurement is split into epochs for per-epoch + * tracking of the divergence. + * + * @param epochStats collector populated with per-epoch [hits, misses, evictions] + * @return [totalHits, totalMisses, totalEvictions] over all measurement epochs + */ + private static long[] runColdStart(PolicySetup setup, List epochStats) { + Random r = new Random(RANDOM_SEED); + + for (int pass = 0; pass < SCAN_PASSES_C; pass++) { + for (int i = 0; i < SCAN_C; i++) { + setup.access(i); + } + } + + long totalHits = 0; + long totalMisses = 0; + long totalEvictions = 0; + int numEpochs = MEASURE_C / EPOCH_OPS_C; + + for (int epoch = 0; epoch < numEpochs; epoch++) { + long missBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + for (int i = 0; i < EPOCH_OPS_C; i++) { + if (i % BG_SCAN_INTERVAL_C == 0) { + // bot/crawler re-accesses old content — keeps sketch counts elevated + setup.access(r.nextInt(SCAN_C)); + } else { + setup.access(SCAN_C + r.nextInt(WORKING_SET_C)); + } + } + + long epochMisses = setup.cache.getCacheStats().getMissCount() - missBase; + long epochEvictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + long epochHits = EPOCH_OPS_C - epochMisses; + + epochStats.add(new long[]{epochHits, epochMisses, epochEvictions}); + totalHits += epochHits; + totalMisses += epochMisses; + totalEvictions += epochEvictions; + } + + return new long[]{totalHits, totalMisses, totalEvictions}; + } + + /** + * Scenario D: uniform random access over a pool far larger than the cache. + * Warms the cache with one random pass (so each policy starts from a full + * cache), then measures steady-state miss rate. + * + * @return [hits, misses, evictions] + */ + private static long[] runUniformRandom(PolicySetup setup) { + Random r = new Random(RANDOM_SEED); + int n = setup.ids.length; + + // fill the cache before measuring + for (int i = 0; i < n; i++) { + setup.access(r.nextInt(n)); + } + + long missesBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + for (int i = 0; i < MEASURE_D; i++) { + setup.access(r.nextInt(n)); + } + + long misses = setup.cache.getCacheStats().getMissCount() - missesBase; + long evictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + return new long[]{MEASURE_D - misses, misses, evictions}; + } + + /** + * Scenario E: warms caches with Zipfian over the original working set, injects a + * concentrated burst of new segments, then measures working-set miss rate after the + * burst subsides. Elevated frequency counts retained by W-TinyLFU for burst items + * may delay re-admission of hot working-set entries. + * + * @return [hits, misses, evictions] measured only during the post-burst Zipfian phase + */ + private static long[] runBurstNewContent(PolicySetup setup) { + double[] cdf = buildZipfCdf(TOTAL_SEGMENTS, ZIPF_EXPONENT); + Random r = new Random(RANDOM_SEED); + + for (int i = 0; i < WARMUP_E; i++) { + setup.access(zipfSample(cdf, r.nextDouble())); + } + // burst: access new segments (indices TOTAL_SEGMENTS .. +BURST_SIZE_E) repeatedly + for (int b = 0; b < BURST_ACCESSES_E; b++) { + for (int i = 0; i < BURST_SIZE_E; i++) { + setup.access(TOTAL_SEGMENTS + i); + } + } + + long missesBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + for (int i = 0; i < MEASURE_E; i++) { + setup.access(zipfSample(cdf, r.nextDouble())); + } + + long misses = setup.cache.getCacheStats().getMissCount() - missesBase; + long evictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + return new long[]{MEASURE_E - misses, misses, evictions}; + } + + /** + * Scenario F: alternates short sequential scans with Zipfian traffic for + * {@code CYCLES_F} cycles, then measures steady-state Zipfian miss rate. + * Each scan is below one TinyLFU decay period, so sketch pollution accumulates + * across cycles rather than being cleared by a single halving event. + * + * @return [hits, misses, evictions] measured during the final Zipfian phase + */ + private static long[] runPeriodicGC(PolicySetup setup) { + double[] cdf = buildZipfCdf(TOTAL_SEGMENTS, ZIPF_EXPONENT); + Random r = new Random(RANDOM_SEED); + + for (int c = 0; c < CYCLES_F; c++) { + for (int i = 0; i < CYCLE_ZIPF_OPS_F; i++) { + setup.access(zipfSample(cdf, r.nextDouble())); + } + int scanOffset = (c * CYCLE_SCAN_OPS_F) % (TOTAL_SEGMENTS - CYCLE_SCAN_OPS_F); + for (int i = 0; i < CYCLE_SCAN_OPS_F; i++) { + setup.access(scanOffset + i); + } + } + + long missesBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + for (int i = 0; i < MEASURE_F; i++) { + setup.access(zipfSample(cdf, r.nextDouble())); + } + + long misses = setup.cache.getCacheStats().getMissCount() - missesBase; + long evictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + return new long[]{MEASURE_F - misses, misses, evictions}; + } + + /** + * Scenario G: simulates a large sequential import (each segment accessed exactly + * once), then measures random read-back of only the most recently imported segments. + * LRU retains the tail of the import by recency; Caffeine must rely on frequency + * counts of 1 to keep them against higher-frequency incumbents. + * + * @return [hits, misses, evictions] measured during the read-back phase + */ + private static long[] runImportThenRead(PolicySetup setup) { + Random r = new Random(RANDOM_SEED); + + for (int i = 0; i < IMPORT_SIZE_G; i++) { + setup.access(i); + } + + long missesBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + int base = IMPORT_SIZE_G - RECENT_WINDOW_G; + for (int i = 0; i < MEASURE_G; i++) { + setup.access(base + r.nextInt(RECENT_WINDOW_G)); + } + + long misses = setup.cache.getCacheStats().getMissCount() - missesBase; + long evictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + return new long[]{MEASURE_G - misses, misses, evictions}; + } + + /** + * Scenario H: advances a hot window across a large pool. Each item is accessed + * {@code WINDOW_HITS_H} times per window pass before the window moves on. With + * window slightly larger than cache capacity, every slide must evict some in-window + * items; pure recency (LRU) is the theoretically optimal policy here. + * + * @return [hits, misses, evictions] measured after one warmup pass across half the pool + */ + private static long[] runSlidingWindow(PolicySetup setup) { + // warmup: advance window across the first half of the pool + int windowStart = 0; + while (windowStart + WINDOW_SIZE_H <= TOTAL_POOL_H / 2) { + for (int hit = 0; hit < WINDOW_HITS_H; hit++) { + for (int i = windowStart; i < windowStart + WINDOW_SIZE_H; i++) { + setup.access(i); + } + } + windowStart += SLIDE_STEP_H; + } + + long missesBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + int measured = 0; + while (measured < MEASURE_H) { + for (int hit = 0; hit < WINDOW_HITS_H; hit++) { + for (int i = windowStart; i < windowStart + WINDOW_SIZE_H; i++) { + setup.access(i % TOTAL_POOL_H); + measured++; + if (measured >= MEASURE_H) break; + } + if (measured >= MEASURE_H) break; + } + windowStart = (windowStart + SLIDE_STEP_H) % TOTAL_POOL_H; + } + + long misses = setup.cache.getCacheStats().getMissCount() - missesBase; + long evictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + return new long[]{MEASURE_H - misses, misses, evictions}; + } + + /** + * Scenario I: slides a Zipfian-distributed (exponent {@code ZIPF_I_EXP}) window + * through the pool. The cursor advances by 1 every {@code DRIFT_I} operations so + * older entries continuously leave the hot set. Measurement is split into epochs + * of {@code EPOCH_OPS_I} ops each; per-epoch [hits, misses, evictions] are appended + * to {@code epochStats}. + * + * @param epochStats collector populated with per-epoch [hits, misses, evictions] arrays + * @return [totalHits, totalMisses, totalEvictions] over all measurement epochs + */ + private static long[] runDriftingWindow(PolicySetup setup, List epochStats) { + double[] cdf = buildZipfCdf(WIDTH_I, ZIPF_I_EXP); + Random r = new Random(RANDOM_SEED); + int cursor = 0; + int opCount = 0; + + for (int i = 0; i < WARMUP_I; i++) { + if (opCount % DRIFT_I == 0) { + cursor = (cursor + 1) % POOL_I; + } + setup.access((cursor + zipfSample(cdf, r.nextDouble())) % POOL_I); + opCount++; + } + + long totalHits = 0; + long totalMisses = 0; + long totalEvictions = 0; + int numEpochs = MEASURE_I / EPOCH_OPS_I; + + for (int epoch = 0; epoch < numEpochs; epoch++) { + long missBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + for (int i = 0; i < EPOCH_OPS_I; i++) { + if (opCount % DRIFT_I == 0) { + cursor = (cursor + 1) % POOL_I; + } + setup.access((cursor + zipfSample(cdf, r.nextDouble())) % POOL_I); + opCount++; + } + + long epochMisses = setup.cache.getCacheStats().getMissCount() - missBase; + long epochEvictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + long epochHits = EPOCH_OPS_I - epochMisses; + + epochStats.add(new long[]{epochHits, epochMisses, epochEvictions}); + totalHits += epochHits; + totalMisses += epochMisses; + totalEvictions += epochEvictions; + } + + return new long[]{totalHits, totalMisses, totalEvictions}; + } + + /** + * Scenario J: runs the drifting-window generator with a configurable cursor-advance + * speed. Warmup is discarded; only the measurement phase is reported. + * + * @param drift ops between each cursor advance; {@code Integer.MAX_VALUE} for stationary + * @return [hits, misses, evictions] over the measurement phase + */ + private static long[] runDriftVariant(PolicySetup setup, int drift) { + double[] cdf = buildZipfCdf(WIDTH_J, ZIPF_J_EXP); + Random r = new Random(RANDOM_SEED); + int cursor = 0; + int opCount = 0; + + for (int i = 0; i < WARMUP_J; i++) { + if (drift != Integer.MAX_VALUE && opCount % drift == 0) { + cursor = (cursor + 1) % POOL_J; + } + setup.access((cursor + zipfSample(cdf, r.nextDouble())) % POOL_J); + opCount++; + } + + long missesBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + for (int i = 0; i < MEASURE_J; i++) { + if (drift != Integer.MAX_VALUE && opCount % drift == 0) { + cursor = (cursor + 1) % POOL_J; + } + setup.access((cursor + zipfSample(cdf, r.nextDouble())) % POOL_J); + opCount++; + } + + long misses = setup.cache.getCacheStats().getMissCount() - missesBase; + long evictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + return new long[]{MEASURE_J - misses, misses, evictions}; + } + + /** + * Scenario K: warms the cache with old-generation segments, then switches all + * traffic to new-generation segment IDs (simulating Oak online compaction). + * + *

The warmup phase builds frequency counts in Caffeine's Count-Min sketch for + * {@code OLD_GEN_K} segments. After warmup the cache is full of old-gen entries. + * The measurement phase accesses only the {@code NEW_GEN_K} new-gen segments + * (indices {@code OLD_GEN_K .. OLD_GEN_K + NEW_GEN_K - 1} in the setup arrays). + * New-gen entries start at freq=0 in the sketch; Caffeine's TinyLFU admission gate + * rejects them until their frequency exceeds the old-gen victims in the probationary + * queue. Guava LRU admits new entries immediately by recency.

+ * + * @param epochStats collector populated with per-epoch [hits, misses, evictions] + * @return [totalHits, totalMisses, totalEvictions] over all measurement epochs + */ + private static long[] runCompactionColdStart(PolicySetup setup, List epochStats) { + double[] oldCdf = buildZipfCdf(OLD_GEN_K, ZIPF_EXPONENT); + double[] newCdf = buildZipfCdf(NEW_GEN_K, ZIPF_K_NEW_EXP); + Random r = new Random(RANDOM_SEED); + + // Phase 1: warm cache with old-gen segments; builds sketch frequency counts + for (int i = 0; i < WARMUP_K; i++) { + setup.access(zipfSample(oldCdf, r.nextDouble())); + } + // Phase 2: compaction — all traffic switches to new-gen (freq=0 in sketch) + long totalHits = 0; + long totalMisses = 0; + long totalEvictions = 0; + int numEpochs = MEASURE_K / EPOCH_OPS_K; + + for (int epoch = 0; epoch < numEpochs; epoch++) { + long missBase = setup.cache.getCacheStats().getMissCount(); + long evictBase = setup.cache.getCacheStats().getEvictionCount(); + + for (int i = 0; i < EPOCH_OPS_K; i++) { + setup.access(OLD_GEN_K + zipfSample(newCdf, r.nextDouble())); + } + + long epochMisses = setup.cache.getCacheStats().getMissCount() - missBase; + long epochEvictions = setup.cache.getCacheStats().getEvictionCount() - evictBase; + long epochHits = EPOCH_OPS_K - epochMisses; + + epochStats.add(new long[]{epochHits, epochMisses, epochEvictions}); + totalHits += epochHits; + totalMisses += epochMisses; + totalEvictions += epochEvictions; + } + + return new long[]{totalHits, totalMisses, totalEvictions}; + } + + // ----------------------------------------------------------------------- + // Zipfian distribution + // ----------------------------------------------------------------------- + + /** + * Pre-computes a cumulative Zipfian CDF over {@code n} items. + * Item at rank 0 has weight 1/1^exponent, rank 1 has 1/2^exponent, etc. + */ + private static double[] buildZipfCdf(int n, double exponent) { + double[] cdf = new double[n]; + double sum = 0; + for (int i = 0; i < n; i++) { + sum += 1.0 / Math.pow(i + 1, exponent); + cdf[i] = sum; + } + for (int i = 0; i < n; i++) { + cdf[i] /= sum; + } + return cdf; + } + + /** Samples a rank from the Zipfian CDF using binary search. */ + private static int zipfSample(double[] cdf, double u) { + int lo = 0, hi = cdf.length - 1; + while (lo < hi) { + int mid = (lo + hi) >>> 1; + if (cdf[mid] < u) { + lo = mid + 1; + } else { + hi = mid; + } + } + return lo; + } + + private static void printResult(String label, long hits, long misses, long evictions) { + long total = hits + misses; + double missRate = total == 0 ? 0.0 : 100.0 * misses / total; + double evictRate = total == 0 ? 0.0 : 100.0 * evictions / total; + System.out.printf( + " %-12s miss%%=%5.1f hits=%,8d misses=%,8d evictions=%,8d evict%%=%5.1f%n", + label, missRate, hits, misses, evictions, evictRate); + } + + // ----------------------------------------------------------------------- + // MinimalSegment — lightweight Segment substitute, avoids Mockito overhead + // ----------------------------------------------------------------------- + + /** + * Minimal {@link Segment} subclass that stores only a pre-set memory-usage value. + * Uses the package-visible four-arg constructor with empty stubs for all interfaces, + * so no ByteBuddy proxy class is generated and no Mockito invocation tracking is kept. + * Memory cost is ~50 bytes vs. several KB per Mockito mock. + */ + private static final class MinimalSegment extends Segment { + + private static final SegmentData EMPTY_DATA = new SegmentData() { + @Override public byte getVersion() { return (byte) 13; } + @Override public String getSignature() { return ""; } + @Override public int getFullGeneration() { return 0; } + @Override public boolean isCompacted() { return false; } + @Override public int getGeneration() { return 0; } + @Override public int getSegmentReferencesCount() { return 0; } + @Override public int getRecordReferencesCount() { return 0; } + @Override public int getRecordReferenceNumber(int i) { return 0; } + @Override public byte getRecordReferenceType(int i) { return 0; } + @Override public int getRecordReferenceOffset(int i) { return 0; } + @Override public long getSegmentReferenceMsb(int i) { return 0; } + @Override public long getSegmentReferenceLsb(int i) { return 0; } + @Override public byte readByte(int offset) { return 0; } + @Override public int readInt(int offset) { return 0; } + @Override public short readShort(int offset) { return 0; } + @Override public long readLong(int offset) { return 0; } + @Override public Buffer readBytes(int offset, int size) { return null; } + @Override public int size() { return 0; } + @Override public void hexDump(OutputStream stream) {} + @Override public void binDump(OutputStream stream) {} + @Override public int estimateMemoryUsage() { return 0; } + }; + + private static final SegmentReferences EMPTY_REFS = new SegmentReferences() { + @Override + public SegmentId getSegmentId(int reference) { + throw new UnsupportedOperationException(); + } + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + }; + + private final int memUsage; + + MinimalSegment(int memUsage) { + super(SegmentId.NULL, EMPTY_DATA, RecordNumbers.EMPTY_RECORD_NUMBERS, EMPTY_REFS); + this.memUsage = memUsage; + } + + @Override + public int estimateMemoryUsage() { + return memUsage; + } + } +} diff --git a/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCacheTarBenchmark.java b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCacheTarBenchmark.java new file mode 100644 index 00000000000..4d1a2479fb7 --- /dev/null +++ b/oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/SegmentCacheTarBenchmark.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.benchmark; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import javax.jcr.Repository; + +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; +import org.apache.jackrabbit.oak.fixture.RepositoryFixture; +import org.apache.jackrabbit.oak.segment.SegmentCache; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; + +/** + * Same L1 → L2 → loader access path as {@link SegmentCacheMemoizationBenchmark} but backed + * by a real {@link ReadOnlyFileStore} on disk. Cache misses trigger actual TAR reads, so a + * policy with a higher miss rate shows up as slower wall-clock time, not just a higher counter. + * Stats report elapsed ms alongside L1-hit%, L2-hit%, and TAR-read% per policy. + * + *

The {@code RepositoryFixture} argument only matters to the {@code AbstractTest} + * infrastructure; this benchmark builds its own {@link FileStore} in {@code beforeSuite} + * regardless. Pass {@code Oak-MemoryNS} to avoid writing a second unused store to disk.

+ * + *

Run with {@code -Xmx4g}; the size-sensitivity sweep opens several + * {@link ReadOnlyFileStore} instances concurrently and causes GC pressure below that.

+ * + *

Scenario 1 (live): Zipfian steady-state with per-policy elapsed time. + * Scenario 2: drifting active set — Caffeine's admission gate rejects new-window entries, + * causing perpetual TAR reads; typically slower than Guava here. + * Scenario 3: post-compaction cold-start — old-gen warm, traffic switches to new-gen; + * per-epoch TAR% tracks how fast each policy recovers.

+ */ +public class SegmentCacheTarBenchmark extends AbstractTest { + + // ----- content generation ----- + private static final int N_NODES = 4_000; + private static final int N_BATCH = 1_000; + private static final int PROPERTY_KB = 10; + private static final int PROPERTY_BYTES = PROPERTY_KB * 1024; + + // ----- cache config: ~10 MB ≈ 40 data segments at 256 KB each ----- + private static final int CACHE_SIZE_MB = 10; + + // ----- Scenario 1: Zipfian steady-state ----- + private static final int BATCH_SIZE = Integer.getInteger("segment.batch.size", 500); + private static final int WARMUP_OPS = 5_000; + private static final int MEASURE_OPS = 150_000; + private static final double ZIPF_EXP = 1.0; + + // ----- Scenario 2: drifting active set ----- + private static final int WIDTH_2 = 100; // active window > cache capacity + private static final int DRIFT_2 = 5; // advance cursor every N ops + private static final double ZIPF_2_EXP = 0.5; // flatter → more entries compete for cache + private static final int WARMUP_2 = 20_000; + private static final int MEASURE_2 = 300_000; + private static final int EPOCH_OPS_2 = 10_000; + + // ----- Scenario 3: post-compaction cold-start ----- + // 200K warmup saturates old-gen sketch to freq=15 (4-bit cap). + // Flat Zipf(0.5) for new-gen measurement slows frequency build-up → longer visible freeze. + // EPOCH_OPS_3 = 2K exposes the initial spike before hot new-gen entries escape the gate. + private static final int WARMUP_3 = 200_000; + private static final double ZIPF_3_NEW_EXP = 0.5; // flatter than warmup — slows freq build-up + private static final int MEASURE_3 = 300_000; + private static final int EPOCH_OPS_3 = 2_000; + + @FunctionalInterface + private interface CacheFactory { + SegmentCache create(int cacheSizeMb); + } + + private static final CacheFactory[] POLICIES = { + SegmentCache::newSegmentCache, + GuavaSegmentCache::new + }; + private static final String[] POLICY_NAMES = {"CAFFEINE", "GUAVA"}; + private static final int NUM_POLICIES = POLICIES.length; + // ----- live-run state ----- + private File storeDir; + private int poolSize; + private double[] zipfCdf; + private ReadOnlyFileStore[] liveStores; + private SegmentId[][] liveIds; // liveIds[policy][segIdx] + private long[] liveTotalOps; // per-policy access counter for statsValues() + + @Override + public String toString() { + return "SegmentCacheTarBenchmark"; + } + + @Override + protected Repository[] createRepository(RepositoryFixture fixture) throws Exception { + return fixture.setUpCluster(1); + } + + /** + * Generates real TAR content, discovers the data-segment pool, and opens the + * per-policy live stores for the AbstractTest timing loop. + */ + @Override + protected void beforeSuite() throws Exception { + storeDir = Files.createTempDirectory("SegmentCacheTarBenchmark-").toFile(); + generateContent(); + poolSize = discoverPoolSize(); + zipfCdf = buildZipfCdf(poolSize, ZIPF_EXP); + liveTotalOps = new long[NUM_POLICIES]; + openLiveStores(); + System.out.printf( + "%nSegmentCacheTarBenchmark setup complete:" + + " pool=%d data-segments cache=%dMB dir=%s%n" + + " (fixture controls only the JCR repo; TAR reads always hit real disk)%n", + poolSize, CACHE_SIZE_MB, storeDir); + } + + /** + * Writes {@value N_NODES} nodes with unique {@value PROPERTY_KB}KB string properties to + * force creation of many on-heap data segments in the TAR store. + */ + private void generateContent() + throws IOException, InvalidFileStoreVersionException, CommitFailedException { + char[] pad = new char[PROPERTY_BYTES - 20]; + Arrays.fill(pad, 'x'); + String padStr = new String(pad); + try (FileStore fs = FileStoreBuilder.fileStoreBuilder(storeDir) + .withSegmentCacheSize(CACHE_SIZE_MB).withMemoryMapping(false).build()) { + var ns = SegmentNodeStoreBuilders.builder(fs).build(); + for (int start = 0; start < N_NODES; start += N_BATCH) { + int end = Math.min(start + N_BATCH, N_NODES); + NodeBuilder root = ns.getRoot().builder(); + for (int i = start; i < end; i++) { + root.child("n" + i).setProperty("v", padStr + String.format("%020d", i)); + } + ns.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); + fs.flush(); + } + } + } + + /** Counts data segments in the store to set the Zipfian pool size. */ + private int discoverPoolSize() throws IOException, InvalidFileStoreVersionException { + try (ReadOnlyFileStore store = FileStoreBuilder.fileStoreBuilder(storeDir) + .withSegmentCacheSize(1).withMemoryMapping(false).buildReadOnly()) { + int count = 0; + for (SegmentId id : store.getSegmentIds()) { + if (id.isDataSegmentId()) count++; + } + return count; + } + } + + /** Opens one {@link ReadOnlyFileStore} per policy for the live timing loop. */ + private void openLiveStores() throws IOException, InvalidFileStoreVersionException { + liveStores = new ReadOnlyFileStore[NUM_POLICIES]; + liveIds = new SegmentId[NUM_POLICIES][]; + for (int p = 0; p < NUM_POLICIES; p++) { + ReadOnlyFileStore store = openReadOnly(p, CACHE_SIZE_MB); + liveStores[p] = store; + liveIds[p] = collectDataIds(store); + } + } + + /** Opens a fresh on-heap {@link ReadOnlyFileStore} with the given policy and cache size. */ + private ReadOnlyFileStore openReadOnly(int policyIndex, int cacheSizeMb) + throws IOException, InvalidFileStoreVersionException { + return FileStoreBuilder.fileStoreBuilder(storeDir) + .withSegmentCache(POLICIES[policyIndex].create(cacheSizeMb)) + .withMemoryMapping(false) + .buildReadOnly(); + } + + /** Returns all data-segment IDs from {@code store} as an array. */ + private static SegmentId[] collectDataIds(ReadOnlyFileStore store) { + List ids = new ArrayList<>(); + for (SegmentId id : store.getSegmentIds()) { + if (id.isDataSegmentId()) ids.add(id); + } + return ids.toArray(new SegmentId[0]); + } + + // ----------------------------------------------------------------------- + // AbstractTest hook overrides + // ----------------------------------------------------------------------- + + /** Runs one Zipfian batch across all policies to drive the AbstractTest throughput counter. */ + @Override + protected void runTest() { + ThreadLocalRandom rng = ThreadLocalRandom.current(); + int n = liveIds[0].length; + for (int i = 0; i < BATCH_SIZE; i++) { + int idx = zipfSample(zipfCdf, rng.nextDouble()) % n; + for (int p = 0; p < NUM_POLICIES; p++) { + liveIds[p][idx].getSegment(); + liveTotalOps[p]++; + } + } + } + + @Override + protected String[] statsNames() { + return new String[]{" Caff_tar%", " Guav_tar%"}; + } + + @Override + protected String[] statsFormats() { + return new String[]{" %10.1f", " %10.1f"}; + } + + /** TAR-read% per policy (loader invocations / total accesses × 100). */ + @Override + protected Object[] statsValues() { + Object[] vals = new Object[NUM_POLICIES]; + for (int p = 0; p < NUM_POLICIES; p++) { + long tar = liveStores[p].getSegmentCacheStats().getMissCount(); + long total = liveTotalOps[p]; + vals[p] = total == 0 ? 0.0 : 100.0 * tar / total; + } + return vals; + } + + /** + * Prints live-run tier breakdown, then runs Scenarios 1–3 in isolation and prints + * per-epoch TAR% tables plus total timing for Scenario 1. + */ + @Override + protected void afterSuite() throws Exception { + System.out.printf( + "%n--- SegmentCacheTarBenchmark: live run summary" + + " (all policies share I/O bandwidth) ---%n" + + " pool=%d data-segs cache=%dMB%n", + poolSize, CACHE_SIZE_MB); + for (int p = 0; p < NUM_POLICIES; p++) { + CacheStatsMBean s = liveStores[p].getSegmentCacheStats(); + long total = liveTotalOps[p]; + long l1Hits = s.getHitCount(); + long tarReads = s.getMissCount(); + long l2Hits = Math.max(0, total - l1Hits - tarReads); + printResult(POLICY_NAMES[p], total, l1Hits, l2Hits, tarReads, -1); + } + for (ReadOnlyFileStore s : liveStores) { + s.close(); + } + liveStores = null; // release closed stores — no longer needed + liveIds = null; + System.gc(); // hint GC before scenario runs + + runScenario1(); + runScenario2(); + runScenario3(); + runSizeSensitivity(); + + FileUtils.deleteDirectory(storeDir); + } + + // ----------------------------------------------------------------------- + // Scenario runners + // ----------------------------------------------------------------------- + + /** + * Scenario 1: Zipfian steady-state — isolated per-policy elapsed time. + * Caffeine is expected to have the lowest TAR-read% (W-TinyLFU vs LRU). + */ + private void runScenario1() throws IOException, InvalidFileStoreVersionException { + System.out.printf( + "%n--- Scenario 1: Zipfian steady-state" + + " (warmup=%,d measure=%,d zipf=%.1f cache=%dMB) ---%n" + + " Caffeine W-TinyLFU should have fewest TAR reads; note later" + + " policies see warmer OS page cache.%n", + WARMUP_OPS, MEASURE_OPS, ZIPF_EXP, CACHE_SIZE_MB); + double[] cdf = buildZipfCdf(poolSize, ZIPF_EXP); + for (int p = 0; p < NUM_POLICIES; p++) { + try (ReadOnlyFileStore store = openReadOnly(p, CACHE_SIZE_MB)) { + SegmentId[] ids = collectDataIds(store); + int n = ids.length; + ThreadLocalRandom rng = ThreadLocalRandom.current(); + for (int i = 0; i < WARMUP_OPS; i++) { + ids[zipfSample(cdf, rng.nextDouble()) % n].getSegment(); + } + long h0 = store.getSegmentCacheStats().getHitCount(); + long m0 = store.getSegmentCacheStats().getMissCount(); + long t0 = System.currentTimeMillis(); + for (int i = 0; i < MEASURE_OPS; i++) { + ids[zipfSample(cdf, rng.nextDouble()) % n].getSegment(); + } + long elapsed = System.currentTimeMillis() - t0; + long l1Hits = store.getSegmentCacheStats().getHitCount() - h0; + long tarReads = store.getSegmentCacheStats().getMissCount() - m0; + long l2Hits = Math.max(0, MEASURE_OPS - l1Hits - tarReads); + printResult(POLICY_NAMES[p], MEASURE_OPS, l1Hits, l2Hits, tarReads, elapsed); + } + } + } + + /** + * Scenario 2: drifting active set. A sliding Zipfian window forces continuous cache + * churn. Caffeine's admission gate rejects new-window entries (freq=0) against + * incumbents, triggering L1-clear loops and more TAR reads than Guava. + */ + private void runScenario2() throws IOException, InvalidFileStoreVersionException { + int width = Math.min(WIDTH_2, poolSize - 1); + System.out.printf( + "%n--- Scenario 2: drifting active set" + + " (pool=%d width=%d drift=%d zipf=%.1f" + + " warmup=%,d measure=%,d epoch=%,d) ---%n" + + " Caffeine admission gate rejects new-window entries (freq=0)" + + " → L1-clear loop → more TAR reads than Guava.%n", + poolSize, width, DRIFT_2, ZIPF_2_EXP, WARMUP_2, MEASURE_2, EPOCH_OPS_2); + int numEpochs = MEASURE_2 / EPOCH_OPS_2; + long[][][] epochs = new long[NUM_POLICIES][numEpochs][]; + long[][] totals = new long[NUM_POLICIES][]; + for (int p = 0; p < NUM_POLICIES; p++) { + try (ReadOnlyFileStore store = openReadOnly(p, CACHE_SIZE_MB)) { + SegmentId[] ids = collectDataIds(store); + epochs[p] = new long[numEpochs][]; + totals[p] = runDriftingEpochs(store, ids, width, epochs[p]); + } + } + printEpochTable(epochs, EPOCH_OPS_2, "tar%"); + for (int p = 0; p < NUM_POLICIES; p++) { + printResult(POLICY_NAMES[p], totals[p][0], totals[p][1], totals[p][2], totals[p][3], -1); + } + } + + /** + * Scenario 3: post-compaction cold-start. Cache is warmed on old-gen segments; + * all traffic then switches to new-gen (freq=0 / LRU-cold). Per-epoch TAR% + * shows how quickly each policy warms up. + */ + private void runScenario3() throws IOException, InvalidFileStoreVersionException { + int oldGen = poolSize / 2; + int newGen = poolSize - oldGen; + System.out.printf( + "%n--- Scenario 3: post-compaction cold-start" + + " (old-gen=%d new-gen=%d warmup=%,d measure=%,d epoch=%,d zipf-new=%.1f) ---%n" + + " Old-gen saturated to freq=15; new-gen auto-rejected (freq≤5 gate):%n" + + " Caffeine ~40%%+ TAR-read%% initially, self-corrects after ~30K ops; Guava ~27%% steady.%n" + + " After convergence: Caffeine ~20%% vs Guava ~24%% — W-TinyLFU wins long-term.%n", + oldGen, newGen, WARMUP_3, MEASURE_3, EPOCH_OPS_3, ZIPF_3_NEW_EXP); + int numEpochs = MEASURE_3 / EPOCH_OPS_3; + long[][][] epochs = new long[NUM_POLICIES][numEpochs][]; + long[][] totals = new long[NUM_POLICIES][]; + for (int p = 0; p < NUM_POLICIES; p++) { + try (ReadOnlyFileStore store = openReadOnly(p, CACHE_SIZE_MB)) { + SegmentId[] ids = collectDataIds(store); + epochs[p] = new long[numEpochs][]; + totals[p] = runCompactionEpochs(store, ids, oldGen, epochs[p]); + } + } + printEpochTable(epochs, EPOCH_OPS_3, "tar%"); + for (int p = 0; p < NUM_POLICIES; p++) { + printResult(POLICY_NAMES[p], totals[p][0], totals[p][1], totals[p][2], totals[p][3], -1); + } + } + + /** + * Runs Scenario 2 (drifting) and Scenario 3 (post-compaction) at half, normal, and + * double cache sizes to show how each policy scales with capacity. + */ + private void runSizeSensitivity() throws IOException, InvalidFileStoreVersionException { + int[] sizes = {CACHE_SIZE_MB / 2, CACHE_SIZE_MB, CACHE_SIZE_MB * 2}; + int width = Math.min(WIDTH_2, poolSize - 1); + int oldGen = poolSize / 2; + + System.out.printf( + "%n--- Size sensitivity: Scenario 2 (drifting, width=%d drift=%d) ---%n", + width, DRIFT_2); + System.out.printf(" %8s", "cacheMB"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %12s", POLICY_NAMES[p] + "_tar%"); + } + System.out.println(); + for (int sizeMb : sizes) { + System.out.printf(" %8d", sizeMb); + for (int p = 0; p < NUM_POLICIES; p++) { + try (ReadOnlyFileStore store = openReadOnly(p, sizeMb)) { + SegmentId[] ids = collectDataIds(store); + long[][] ignored = new long[MEASURE_2 / EPOCH_OPS_2][]; + long[] r = runDriftingEpochs(store, ids, width, ignored); + long total = r[0]; + System.out.printf(" %12.1f", total == 0 ? 0.0 : 100.0 * r[3] / total); + } + } + System.out.println(); + } + + System.out.printf( + "%n--- Size sensitivity: Scenario 3 (post-compaction, old-gen=%d new-gen=%d) ---%n", + oldGen, poolSize - oldGen); + System.out.printf(" %8s", "cacheMB"); + for (int p = 0; p < NUM_POLICIES; p++) { + System.out.printf(" %12s", POLICY_NAMES[p] + "_tar%"); + } + System.out.println(); + for (int sizeMb : sizes) { + System.out.printf(" %8d", sizeMb); + for (int p = 0; p < NUM_POLICIES; p++) { + try (ReadOnlyFileStore store = openReadOnly(p, sizeMb)) { + SegmentId[] ids = collectDataIds(store); + long[][] ignored = new long[MEASURE_3 / EPOCH_OPS_3][]; + long[] r = runCompactionEpochs(store, ids, oldGen, ignored); + long total = r[0]; + System.out.printf(" %12.1f", total == 0 ? 0.0 : 100.0 * r[3] / total); + } + } + System.out.println(); + } + } + + // ----------------------------------------------------------------------- + // Epoch-based runners (one policy at a time) + // ----------------------------------------------------------------------- + + /** + * Runs the drifting-window scenario for one policy. + * Warms the cache, then measures {@value EPOCH_OPS_2} ops per epoch. + * + * @param store freshly opened store for this policy + * @param pool all data-segment IDs from the store + * @param width sliding window width + * @param epochStats receives per-epoch [total, l1Hits, l2Hits, tarReads] arrays + * @return aggregate [total, l1Hits, l2Hits, tarReads] across all epochs + */ + private static long[] runDriftingEpochs(ReadOnlyFileStore store, SegmentId[] pool, + int width, long[][] epochStats) { + double[] cdf = buildZipfCdf(width, ZIPF_2_EXP); + ThreadLocalRandom rng = ThreadLocalRandom.current(); + int n = pool.length; + int cursor = 0; + int opCount = 0; + + for (int i = 0; i < WARMUP_2; i++) { + if (opCount % DRIFT_2 == 0) cursor = (cursor + 1) % n; + pool[(cursor + zipfSample(cdf, rng.nextDouble())) % n].getSegment(); + opCount++; + } + + long h0 = store.getSegmentCacheStats().getHitCount(); + long m0 = store.getSegmentCacheStats().getMissCount(); + long totTotal = 0, totL1 = 0, totL2 = 0, totTar = 0; + for (int epoch = 0; epoch < epochStats.length; epoch++) { + for (int i = 0; i < EPOCH_OPS_2; i++) { + if (opCount % DRIFT_2 == 0) cursor = (cursor + 1) % n; + pool[(cursor + zipfSample(cdf, rng.nextDouble())) % n].getSegment(); + opCount++; + } + long l1 = store.getSegmentCacheStats().getHitCount() - h0; + long tar = store.getSegmentCacheStats().getMissCount() - m0; + long l2 = Math.max(0, EPOCH_OPS_2 - l1 - tar); + epochStats[epoch] = new long[]{EPOCH_OPS_2, l1, l2, tar}; + totTotal += EPOCH_OPS_2; totL1 += l1; totL2 += l2; totTar += tar; + h0 = store.getSegmentCacheStats().getHitCount(); + m0 = store.getSegmentCacheStats().getMissCount(); + } + return new long[]{totTotal, totL1, totL2, totTar}; + } + + /** + * Runs the post-compaction cold-start scenario for one policy. + * Warms on old-gen then measures access to new-gen only, epoch by epoch. + * + * @param store freshly opened store for this policy + * @param pool all data-segment IDs; first {@code oldGen} = old-gen + * @param oldGen split index: [0, oldGen) = old-gen, [oldGen, pool.length) = new-gen + * @param epochStats receives per-epoch [total, l1Hits, l2Hits, tarReads] arrays + * @return aggregate [total, l1Hits, l2Hits, tarReads] across all epochs + */ + private static long[] runCompactionEpochs(ReadOnlyFileStore store, SegmentId[] pool, + int oldGen, long[][] epochStats) { + int newGen = pool.length - oldGen; + double[] oldCdf = buildZipfCdf(oldGen, ZIPF_EXP); + double[] newCdf = buildZipfCdf(newGen, ZIPF_3_NEW_EXP); + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + for (int i = 0; i < WARMUP_3; i++) { + pool[zipfSample(oldCdf, rng.nextDouble())].getSegment(); + } + long h0 = store.getSegmentCacheStats().getHitCount(); + long m0 = store.getSegmentCacheStats().getMissCount(); + long totTotal = 0, totL1 = 0, totL2 = 0, totTar = 0; + for (int epoch = 0; epoch < epochStats.length; epoch++) { + for (int i = 0; i < EPOCH_OPS_3; i++) { + pool[oldGen + zipfSample(newCdf, rng.nextDouble()) % newGen].getSegment(); + } + long l1 = store.getSegmentCacheStats().getHitCount() - h0; + long tar = store.getSegmentCacheStats().getMissCount() - m0; + long l2 = Math.max(0, EPOCH_OPS_3 - l1 - tar); + epochStats[epoch] = new long[]{EPOCH_OPS_3, l1, l2, tar}; + totTotal += EPOCH_OPS_3; totL1 += l1; totL2 += l2; totTar += tar; + h0 = store.getSegmentCacheStats().getHitCount(); + m0 = store.getSegmentCacheStats().getMissCount(); + } + return new long[]{totTotal, totL1, totL2, totTar}; + } + + // ----------------------------------------------------------------------- + // Reporting helpers + // ----------------------------------------------------------------------- + + /** + * Prints a per-epoch table with one column per policy. + * + * @param policyEpochs [policy][epoch] = [total, l1Hits, l2Hits, tarReads] + * @param epochOps ops per epoch (denominator for percentages) + * @param metric column header suffix, e.g. "tar%" + */ + private static void printEpochTable(long[][][] policyEpochs, int epochOps, String metric) { + System.out.printf(" %8s", "ops"); + for (String name : POLICY_NAMES) { + System.out.printf(" %22s", name + "_" + metric); + } + System.out.println(); + int numEpochs = policyEpochs[0].length; + for (int e = 0; e < numEpochs; e++) { + System.out.printf(" %8d", (long)(e + 1) * epochOps); + for (int p = 0; p < NUM_POLICIES; p++) { + long[] ep = policyEpochs[p][e]; + long tar = ep[3]; + System.out.printf(" %22.1f", pct(tar, ep[0])); + } + System.out.println(); + } + } + + /** + * Prints one result row: policy name, L1/L2/TAR tier breakdown, optional elapsed time. + * + * @param label policy display name + * @param total total accesses in the window + * @param l1Hits served from SegmentId memoization field — no L2 call made + * @param l2Hits found in L2 — no loader/disk read + * @param tarReads loader invocations — actual disk-read equivalents + * @param elapsedMs wall-clock ms, or -1 to omit timing columns + */ + private static void printResult(String label, long total, long l1Hits, + long l2Hits, long tarReads, long elapsedMs) { + double l1Pct = pct(l1Hits, total); + double l2Pct = pct(l2Hits, total); + double tarPct = pct(tarReads, total); + if (elapsedMs < 0) { + System.out.printf( + " %-22s l1%%=%5.1f l2%%=%5.1f tar%%=%5.1f" + + " (total=%,d l1=%,d l2=%,d tar=%,d)%n", + label, l1Pct, l2Pct, tarPct, total, l1Hits, l2Hits, tarReads); + } else { + double opsPerSec = elapsedMs == 0 ? Double.MAX_VALUE : 1000.0 * total / elapsedMs; + System.out.printf( + " %-22s elapsed=%,6d ms ops/sec=%,9.0f" + + " l1%%=%5.1f l2%%=%5.1f tar%%=%5.1f (tar=%,d)%n", + label, elapsedMs, opsPerSec, l1Pct, l2Pct, tarPct, tarReads); + } + } + + private static double pct(long num, long denom) { + return denom == 0 ? 0.0 : 100.0 * num / denom; + } + + // ----------------------------------------------------------------------- + // Zipfian helpers + // ----------------------------------------------------------------------- + + private static double[] buildZipfCdf(int n, double exponent) { + double[] cdf = new double[n]; + double sum = 0; + for (int i = 0; i < n; i++) { + sum += 1.0 / Math.pow(i + 1, exponent); + cdf[i] = sum; + } + for (int i = 0; i < n; i++) { + cdf[i] /= sum; + } + return cdf; + } + + private static int zipfSample(double[] cdf, double u) { + int lo = 0, hi = cdf.length - 1; + while (lo < hi) { + int mid = (lo + hi) >>> 1; + if (cdf[mid] < u) lo = mid + 1; + else hi = mid; + } + return lo; + } +} diff --git a/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/cache/impl/caffeine/CaffeineCacheAdapterTest.java b/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/cache/impl/caffeine/CaffeineCacheAdapterTest.java index cda9f1ff4da..0b661c411c2 100644 --- a/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/cache/impl/caffeine/CaffeineCacheAdapterTest.java +++ b/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/cache/impl/caffeine/CaffeineCacheAdapterTest.java @@ -17,9 +17,11 @@ package org.apache.jackrabbit.oak.cache.impl.caffeine; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.jackrabbit.oak.cache.api.CacheBuilder; import org.apache.jackrabbit.oak.cache.api.CacheStatsSnapshot; import org.apache.jackrabbit.oak.cache.api.EvictionCause; import org.junit.Assert; @@ -50,6 +52,58 @@ public void statsSnapshotReflectsUnderlyingCacheStats() { Assert.assertEquals(1, stats.missCount()); } + /** + * Verifies that eviction listeners registered via {@link CacheBuilder#evictionListener} fire + * synchronously during {@link CaffeineCacheAdapter#invalidateAll()}. + * + *

Oak {@link CacheBuilder} configures {@code executor(Runnable::run)} for caches without + * {@code refreshAfterWrite}, so removal listeners usually run during {@code invalidateAll()} + * even without an explicit {@code cleanUp()} in this adapter. The adapter still calls + * {@code cleanUp()} to guarantee that contract for every backing Caffeine instance and to + * drain any buffered maintenance work before returning.

+ */ + @Test + public void evictionListenerFiresForAllEntriesDuringInvalidateAll() { + AtomicInteger listenerCallCount = new AtomicInteger(0); + org.apache.jackrabbit.oak.cache.api.Cache cache = + CacheBuilder.newBuilder() + .maximumSize(100) + .evictionListener((k, v, cause) -> listenerCallCount.incrementAndGet()) + .build(); + + cache.put("a", "1"); + cache.put("b", "2"); + cache.put("c", "3"); + + cache.invalidateAll(); + + Assert.assertEquals("eviction listener must fire for every entry during invalidateAll()", + 3, listenerCallCount.get()); + } + + /** + * Verifies the same guarantee for {@link CaffeineCacheAdapter#invalidateAll(Iterable)}. + */ + @Test + public void evictionListenerFiresForRequestedEntriesDuringInvalidateAllIterable() { + AtomicInteger listenerCallCount = new AtomicInteger(0); + org.apache.jackrabbit.oak.cache.api.Cache cache = + CacheBuilder.newBuilder() + .maximumSize(100) + .evictionListener((k, v, cause) -> listenerCallCount.incrementAndGet()) + .build(); + + cache.put("a", "1"); + cache.put("b", "2"); + cache.put("c", "3"); + + cache.invalidateAll(Arrays.asList("a", "c")); + + Assert.assertEquals("eviction listener must fire for each invalidated key", + 2, listenerCallCount.get()); + Assert.assertNotNull("non-invalidated entry must still be present", cache.getIfPresent("b")); + } + @Test public void invalidateAllIterableRemovesOnlyRequestedKeys() { CaffeineCacheAdapter adapter = diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java index 25687239380..3a42b748c19 100644 --- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java @@ -105,6 +105,9 @@ public PersistentDiskCache(File directory, int cacheMaxSizeMB, DiskCacheIOMonito if (!directory.exists()) { directory.mkdirs(); } + // Seed the counter from actual disk state so restarts don't reset it to 0 + // while old segments are still on disk, which would prevent cleanup from running. + cacheSize.set(FileUtils.sizeOfDirectory(directory)); segmentCacheStats = new SegmentCacheStats( NAME, @@ -210,6 +213,11 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { executor.execute(task); } + /** Returns the in-memory cacheSize counter. Package-private for testing. */ + long getCacheSizeForTesting() { + return cacheSize.get(); + } + private boolean isCacheFull() { return cacheSize.get() >= maxCacheSizeBytes; } @@ -246,10 +254,19 @@ private void cleanUpInternal() { } return; } - long cacheSizeAfter = cacheSize.addAndGet(-length); - diskCacheIOMonitor.updateCacheSize(cacheSizeAfter, -length); - segment.delete(); - evictionCount.incrementAndGet(); + // Delete before decrementing: if another thread races to re-write + // this file between a decrement and the delete, the write increments + // the counter while our decrement already fired, inflating cacheSize. + // Temp files are never counted in cacheSize (the counter is only + // incremented after the atomic rename to the final segment path), so + // deleting a stale temp file must not decrement the counter. + if (segment.delete()) { + if (!segmentCacheEntry.isTempFile()) { + long cacheSizeAfter = cacheSize.addAndGet(-length); + diskCacheIOMonitor.updateCacheSize(cacheSizeAfter, -length); + } + evictionCount.incrementAndGet(); + } } else { breaker.stop(); } diff --git a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java index f233b46498c..f4caf9dc4ed 100644 --- a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java +++ b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java @@ -26,12 +26,16 @@ import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; +import org.apache.commons.io.FileUtils; + import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import static org.junit.Assert.assertEquals; @@ -132,6 +136,108 @@ public void cleanupTest() throws Exception { assertEquals("Segment(s) not cleaned up in cache", 0, SEGMENTS - errors.get()); } + /** + * Reproduces the Fix-A bug: writeSegment() always called cacheSize.addAndGet(fileSize) even + * when the segment file already existed on disk. Every Caffeine L2-eviction followed by a + * re-request caused a re-write of the same file, adding to the counter without adding new + * bytes on disk. Over time this drove cacheSize to ~80 GB while the actual disk held only + * 19.6 GB, making isCacheFull() permanently true and collapsing the disk-cache hit rate. + */ + @Test + public void testCacheSizeNotInflatedOnReWrite() throws Exception { + persistentCache.close(); + File cacheDir = temporaryFolder.newFolder(); + persistentCache = new PersistentDiskCache(cacheDir, 10 * 1024, new DiskCacheIOMonitor(StatisticsProvider.NOOP)); + + TestSegment segment = TestSegment.createSegment(); + long[] id = segment.getSegmentId(); + + // Write the same segment 5 times to simulate repeated L2 eviction + re-read + for (int i = 0; i < 5; i++) { + persistentCache.writeSegment(id[0], id[1], segment.getSegmentBuffer()); + } + waitWhile(() -> persistentCache.getWritesPending() > 0); + Thread.sleep(100); + + // cacheSize counter must equal actual disk usage — not 5× the segment size + long cacheSizeCounter = ((PersistentDiskCache) persistentCache).getCacheSizeForTesting(); + File segmentFile = new File(cacheDir, new UUID(id[0], id[1]).toString()); + assertEquals("cacheSize inflated by repeated writes of the same segment", + segmentFile.length(), cacheSizeCounter); + } + + /** + * Reproduces the Fix-C bug: cacheSize was initialized to 0 on startup regardless of segments + * already present on disk from a previous session. The counter therefore under-reported disk + * usage, isCacheFull() stayed false longer than it should, and cleanup did not run to evict + * old files — allowing disk usage to silently grow past the configured maximum. + */ + @Test + public void testCacheSizeInitializedFromExistingFiles() throws Exception { + persistentCache.close(); + File cacheDir = temporaryFolder.newFolder(); + + // Pre-populate the directory to simulate a restarted instance with leftover segments + byte[] data = new byte[4096]; + new Random().nextBytes(data); + Files.write(new File(cacheDir, UUID.randomUUID().toString()).toPath(), data); + long expectedSize = data.length; + + persistentCache = new PersistentDiskCache(cacheDir, 10 * 1024, new DiskCacheIOMonitor(StatisticsProvider.NOOP)); + + assertEquals("cacheSize should reflect existing files so isCacheFull() is accurate after restart", + expectedSize, ((PersistentDiskCache) persistentCache).getCacheSizeForTesting()); + } + + /** + * Reproduces the Fix-B bug: cleanUpInternal() decremented cacheSize before deleting + * the file. In the window between the decrement and the actual delete a concurrent + * writeSegment task could replace the file and increment cacheSize back, then the cleanup + * delete removed the newly-written file. The net effect was one phantom increment per race + * occurrence — under high concurrent write load this drove cacheSize far above the real + * on-disk bytes. + * + *

The test runs {@value AbstractPersistentCacheTest#THREADS} writer threads against a + * 1 MB cache, forcing cleanup to fire continuously and maximise the probability of the race. + * After all work drains, the in-memory counter must equal the actual directory size.

+ */ + @Test + public void testCacheSizeConsistentUnderConcurrentWriteAndCleanup() throws Exception { + persistentCache.close(); + File cacheDir = temporaryFolder.newFolder(); + // 1 MB max with 0 ms temp-file grace so cleanup fires after every few writes + persistentCache = new PersistentDiskCache(cacheDir, 1, new DiskCacheIOMonitor(StatisticsProvider.NOOP), 0); + + runConcurrently((nThread, nSegment) -> { + TestSegment segment = TestSegment.createSegment(); + long[] id = segment.getSegmentId(); + try { + persistentCache.writeSegment(id[0], id[1], segment.getSegmentBuffer()); + } catch (Throwable t) { + errors.incrementAndGet(); + } finally { + done.incrementAndGet(); + } + }); + + waitWhile(() -> done.get() < SEGMENTS); + waitWhile(() -> persistentCache.getWritesPending() > 0); + waitWhile(() -> ((PersistentDiskCache) persistentCache).cleanupInProgress.get()); + + assertEquals("Errors during concurrent writes", 0, errors.get()); + assertNoTimeout(); + + // One final explicit cleanup pass to drain any in-flight work + persistentCache.cleanUp(); + waitWhile(() -> ((PersistentDiskCache) persistentCache).cleanupInProgress.get()); + + long counter = ((PersistentDiskCache) persistentCache).getCacheSizeForTesting(); + long onDisk = FileUtils.sizeOfDirectory(cacheDir); + assertEquals( + "cacheSize counter must equal actual on-disk bytes after concurrent write+cleanup", + onDisk, counter); + } + @Test public void testIOMonitor() throws IOException { DiskCacheIOMonitor ioMonitorAdapter = Mockito.mock(DiskCacheIOMonitor.class); diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CacheWeights.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CacheWeights.java index fbf2342bd70..c2dad311c7e 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CacheWeights.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CacheWeights.java @@ -65,7 +65,7 @@ public static Weigher noopWeigher() { return (Weigher) NOOP_WEIGHER; } - static int segmentWeight(Segment segment) { + public static int segmentWeight(Segment segment) { return SEGMENT_CACHE_OVERHEAD + segment.estimateMemoryUsage(); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java index 01fd329d1d6..7c554709e71 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java @@ -456,7 +456,7 @@ public void forEachRecord(RecordConsumer consumer) { * Estimate of how much memory this instance would occupy in the segment * cache. */ - int estimateMemoryUsage() { + public int estimateMemoryUsage() { int size = OBJECT_HEADER_SIZE + 76; size += 56; // 7 refs x 8 bytes diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentId.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentId.java index 5b4fd2bed95..c2df558e164 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentId.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentId.java @@ -206,7 +206,7 @@ public void reclaimed(@NotNull String gcInfo) { * @see #getSegment() * @see #unloaded() */ - void loaded(@NotNull Segment segment) { + public void loaded(@NotNull Segment segment) { this.segment = segment; this.gcGeneration = segment.getGcGeneration(); } @@ -217,7 +217,7 @@ void loaded(@NotNull Segment segment) { * @see #getSegment() * @see #loaded(Segment) */ - void unloaded() { + public void unloaded() { this.segment = null; } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java index bad5068a77d..3ecb217e84a 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java @@ -150,7 +150,8 @@ public SegmentId newSegmentId(long msb, long lsb) { } }); this.blobStore = builder.getBlobStore(); - this.segmentCache = newSegmentCache(builder.getSegmentCacheSize()); + SegmentCache injectedCache = builder.getSegmentCache(); + this.segmentCache = injectedCache != null ? injectedCache : newSegmentCache(builder.getSegmentCacheSize()); this.segmentReader = new CachingSegmentReader( this::getWriter, blobStore, diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java index e37e764f3a1..0fe0207f665 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java @@ -40,6 +40,7 @@ import org.apache.jackrabbit.oak.segment.CacheWeights.TemplateCacheWeigher; import org.apache.jackrabbit.oak.segment.RecordCache; import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentCache; import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener; import org.apache.jackrabbit.oak.segment.WriterCacheManager; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; @@ -83,6 +84,9 @@ public class FileStoreBuilder { private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB; + @Nullable + private SegmentCache segmentCache; + private int stringCacheSize = DEFAULT_STRING_CACHE_MB; private int templateCacheSize = DEFAULT_TEMPLATE_CACHE_MB; @@ -204,6 +208,20 @@ public FileStoreBuilder withSegmentCacheSize(int segmentCacheSize) { return this; } + /** + * Injects a pre-built {@link SegmentCache} to use instead of the default Caffeine cache. + * Useful for benchmarking alternative eviction policies without polluting the production + * {@link SegmentCache} class. When set, {@link #withSegmentCacheSize(int)} is ignored. + * + * @param segmentCache the cache to use (must not be null) + * @return this instance + */ + @NotNull + public FileStoreBuilder withSegmentCache(@NotNull SegmentCache segmentCache) { + this.segmentCache = segmentCache; + return this; + } + /** * Size of the string cache in MB. * @@ -545,6 +563,11 @@ int getSegmentCacheSize() { return segmentCacheSize; } + @Nullable + SegmentCache getSegmentCache() { + return segmentCache; + } + int getStringCacheSize() { return stringCacheSize; } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java index 8e228bddc3f..346b9dcb328 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java @@ -85,6 +85,17 @@ public void putTest() throws ExecutionException { assertEquals(segment1, cache.getSegment(id1, () -> failToLoad(id1))); } + @Test + public void putSegmentDoesNotInflateWeightOrElementCount() throws ExecutionException { + AbstractCacheStats stats = cache.getCacheStats(); + cache.putSegment(segment1); + assertEquals(33, stats.estimateCurrentWeight()); + assertEquals(1, stats.getElementCount()); + // Entry must still be accessible via L1 and L2 + assertEquals(segment1, id1.getSegment()); + assertEquals(segment1, cache.getSegment(id1, () -> failToLoad(id1))); + } + @Test public void getSegmentWrapsCheckedLoaderFailureInExecutionException() { Exception failure = new Exception("load failed"); @@ -139,6 +150,52 @@ public void invalidateTests() throws ExecutionException { assertEquals(segment1, cache.getSegment(id1, () -> failToLoad(id1))); } + /** + * Reproduces the Caffeine-specific clear() bug: Caffeine's evictionListener fires only for + * size/time evictions, not for explicit invalidateAll(). The old clear() implementation + * delegated entirely to cache.invalidateAll(), so entries not in Caffeine's pending-eviction + * queue kept their L1 (SegmentId.segment) references after the call. Subsequent reads then + * returned stale segment data from L1 instead of going through the loader, bypassing the + * post-compaction reload path that ensures correct segment data. + */ + @Test + public void clearUnloadsAllSegmentIdsFromL1() throws ExecutionException { + cache.getSegment(id1, () -> segment1); + cache.getSegment(id2, () -> segment2); + + // Verify both are memoised in L1 + assertEquals(segment1, id1.getSegment()); + assertEquals(segment2, id2.getSegment()); + + cache.clear(); + + // L1 must be null for ALL entries — not only those Caffeine's evictionListener + // happened to fire for during invalidateAll(). + expect(SegmentNotFoundException.class, id1::getSegment); + expect(SegmentNotFoundException.class, id2::getSegment); + } + + /** + * Reproduces the stats.currentWeight inflation caused by the same Caffeine clear() bug: + * because evictionListener was not called for explicitly-invalidated entries, the weight + * decrements in onRemove() never ran, leaving currentWeight stuck at the pre-clear value. + * Subsequent putSegment() calls added to an already-inflated counter, eventually causing + * spurious size-based evictions and incorrect occupancy metrics. + */ + @Test + public void clearResetsCurrentWeightToZeroForAllEntries() throws ExecutionException { + cache.getSegment(id1, () -> segment1); // contributes weight 33 (32 overhead + 1) + cache.getSegment(id2, () -> segment2); // contributes weight 34 (32 overhead + 2) + assertEquals(67, cache.getCacheStats().estimateCurrentWeight()); + + cache.clear(); + + // currentWeight must be 0: without the explicit stats.currentWeight.set(0) at the + // end of clear(), entries whose evictionListener was skipped kept their weight in + // the counter and inflated it across compaction cycles. + assertEquals(0, cache.getCacheStats().estimateCurrentWeight()); + } + @Test public void evictionDuringPut() throws ExecutionException { cache.putSegment(segment3); @@ -280,6 +337,23 @@ public void hotSegmentEvictedWithoutL2Notification() throws ExecutionException { } } + /** + * When {@link SegmentCache#FT_OAK_12214_PROPAGATE_L1_HITS_TO_L2_ENABLED} is disabled, L1 hits must still + * be counted in {@link AbstractCacheStats#getHitCount()} even though {@code getIfPresent} + * is skipped — the stats branch runs regardless of the L2-notify branch. + */ + @Test + public void recordHitSkipsL2NotifyWhenToggleDisabled() throws ExecutionException { + SegmentCache.FT_OAK_12214_PROPAGATE_L1_HITS_TO_L2_ENABLED.set(false); + try { + cache.getSegment(id1, () -> segment1); + assertEquals(segment1, id1.getSegment()); + assertEquals(1, cache.getCacheStats().getHitCount()); + } finally { + SegmentCache.FT_OAK_12214_PROPAGATE_L1_HITS_TO_L2_ENABLED.set(true); + } + } + @Test public void nonEmptyCacheStatsTest() throws Exception { AbstractCacheStats stats = cache.getCacheStats();