diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
index 25687239380..d9666de93ca 100644
--- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
@@ -112,6 +112,7 @@ public PersistentDiskCache(File directory, int cacheMaxSizeMB, DiskCacheIOMonito
                 () -> Long.valueOf(directory.listFiles().length),
                 () -> FileUtils.sizeOfDirectory(directory),
                 () -> evictionCount.get());
+        segmentCacheStats.setWriteDiscardCountSupplier(discardCount::get);
     }
 
     @Override
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
index 204484e23c1..7c547333714 100644
--- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
@@ -74,6 +74,7 @@ public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeco
                 redisSocketTimeout, null, redisDBIndex, null);
         this.segmentCacheStats = new SegmentCacheStats(NAME, this::getRedisMaxMemory, this::getCacheElementCount,
                 this::getCurrentWeight, this::getNumberOfEvictedKeys);
+        this.segmentCacheStats.setWriteDiscardCountSupplier(discardCount::get);
     }
 
     private long getCacheElementCount() {
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
index f89d414c944..85a5dd0261a 100644
--- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
@@ -24,6 +24,7 @@
 import org.apache.jackrabbit.oak.commons.pio.Closer;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.segment.spi.monitor.RoleStatisticsProvider;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
 import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
 import org.apache.jackrabbit.oak.spi.toggle.FeatureToggle;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
@@ -101,6 +102,12 @@ private PersistentCache createPersistentCache(Configuration configuration, Close
                     new FeatureToggle(PersistentDiskCache.FT_OAK_12212, PersistentDiskCache.FT_OAK_12212_SKIP_MISSING_FILE_CHECK),
                     Collections.emptyMap()));
 
+            // OAK-12282: expose the bounded write-queue kill switch. The flag is only read when a
+            // cache is constructed, so changing it has no effect on executors that already exist.
+            registerCloseable(osgiWhiteboard.register(FeatureToggle.class,
+                    new FeatureToggle(AbstractPersistentCache.FT_OAK_12282, AbstractPersistentCache.FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED),
+                    Collections.emptyMap()));
+
             CacheStatsMBean diskCacheStatsMBean = persistentDiskCache.getCacheStats();
             registerCloseable(registerMBean(CacheStatsMBean.class, diskCacheStatsMBean, CacheStats.TYPE, diskCacheStatsMBean.getName()));
 
@@ -120,6 +127,12 @@ private PersistentCache createPersistentCache(Configuration configuration, Close
                     configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
             closer.register(redisCache);
 
+            // OAK-12282: expose the bounded write-queue kill switch (same toggle as the disk cache branch).
+            // NOTE(review): redisCache is already constructed here — confirm the flag can be set early enough.
+            registerCloseable(osgiWhiteboard.register(FeatureToggle.class,
+                    new FeatureToggle(AbstractPersistentCache.FT_OAK_12282, AbstractPersistentCache.FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED),
+                    Collections.emptyMap()));
+
             CacheStatsMBean redisCacheStatsMBean = redisCache.getCacheStats();
             registerCloseable(registerMBean(CacheStatsMBean.class, redisCacheStatsMBean, CacheStats.TYPE, redisCacheStatsMBean.getName()));
 
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
index e60ae43867c..62031f18958 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
@@ -31,27 +31,86 @@
 import java.io.Closeable;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
     private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);
 
     public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10);
+
+    /**
+     * Capacity of the bounded write queue, taken from the {@code oak.segment.cache.writeQueueSize}
+     * system property (default {@code THREADS * 100}). Clamped to at least 1 because
+     * {@link LinkedBlockingQueue} rejects a non-positive capacity with an exception.
+     */
+    public static final int WRITE_QUEUE_SIZE =
+            Math.max(1, Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100));
 
     protected ExecutorService executor;
     protected AtomicLong cacheSize = new AtomicLong(0);
     protected PersistentCache nextCache;
     protected final Set writesPending;
+
+    /** Number of write tasks the executor rejected instead of running them. */
+    protected final AtomicLong discardCount = new AtomicLong();
 
     protected SegmentCacheStats segmentCacheStats;
 
+    /**
+     * Name of the feature toggle for the OAK-12282 bounded write-queue fix.
+     * See {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED}.
+     */
+    public static final String FT_OAK_12282 = "FT_OAK-12282";
+
+    /**
+     * Whether the bounded write queue introduced in OAK-12282 is active.
+     * <p>
+     * When {@code true} (default), the executor uses a queue bounded to
+     * {@link #WRITE_QUEUE_SIZE} and discards write tasks when it is full,
+     * preventing OOM under high write load. Every discarded task is counted in
+     * {@link #getWriteDiscardCount()}. This is safe because the disk
+     * cache is an optimisation only — a dropped write means the segment is
+     * fetched from remote storage on the next read.
+     * <p>
+     * Set to {@code false} via the {@link org.apache.jackrabbit.oak.spi.toggle.FeatureToggle}
+     * registered with the Whiteboard to revert to the pre-fix unbounded queue.
+     * <b>Note:</b> the flag is read once, in the constructor, so changing it
+     * only affects cache instances created afterwards. NOTE(review): this static
+     * defaults to {@code true} on class load, so confirm the toggle is applied
+     * before the caches are constructed.
+     */
+    public static final AtomicBoolean FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED = new AtomicBoolean(true);
+
     public AbstractPersistentCache() {
-        executor = Executors.newFixedThreadPool(THREADS);
+        // Formerly Executors.newFixedThreadPool() was used here, which creates an unbounded
+        // LinkedBlockingQueue — allowing unlimited segment buffers to pile up in memory under
+        // high write load. The bounded queue (gated by FT_OAK_12282) prevents OOM by dropping
+        // write tasks when full; this is safe because the disk cache is an optimisation only.
+        BlockingQueue<Runnable> writeQueue = FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED.get()
+                ? new LinkedBlockingQueue<>(WRITE_QUEUE_SIZE)
+                : new LinkedBlockingQueue<>();
+        executor = new ThreadPoolExecutor(
+                THREADS, THREADS,
+                0L, TimeUnit.MILLISECONDS,
+                writeQueue,
+                (r, e) -> {
+                    // The handler is invoked both when the queue is full and when the
+                    // executor has been shut down; report the two cases separately.
+                    discardCount.incrementAndGet();
+                    if (e.isShutdown()) {
+                        logger.debug("Segment write task discarded: executor is shut down");
+                    } else {
+                        logger.debug("Segment write task discarded: write queue full (capacity={})", WRITE_QUEUE_SIZE);
+                    }
+                });
         writesPending = ConcurrentHashMap.newKeySet();
     }
 
@@ -148,4 +207,11 @@ public void close() {
     public int getWritesPending() {
         return writesPending.size();
     }
+
+    /**
+     * Returns how many write tasks the executor has rejected since this cache was created.
+     */
+    public long getWriteDiscardCount() {
+        return discardCount.get();
+    }
 }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java
index 541e31aa82c..0d09bd1aef9 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java
@@ -54,6 +54,10 @@ public class SegmentCacheStats extends AbstractCacheStats {
     @NotNull
     final AtomicLong missCount = new AtomicLong();
 
+    /** Supplies the number of discarded cache writes; reports zero until a supplier is installed. */
+    @NotNull
+    private Supplier<Long> writeDiscardCountSupplier = () -> 0L;
+
     public SegmentCacheStats(@NotNull String name,
                              @NotNull Supplier maximumWeight,
                              @NotNull Supplier elementCount,
@@ -78,6 +82,22 @@ protected CacheStats getCurrentStats() {
         );
     }
 
+    /**
+     * Installs the supplier that reports how many cache writes were discarded.
+     *
+     * @param supplier source of the discard count; must not be {@code null}
+     */
+    public void setWriteDiscardCountSupplier(@NotNull Supplier<Long> supplier) {
+        this.writeDiscardCountSupplier = requireNonNull(supplier);
+    }
+
+    /**
+     * Returns the current value reported by the installed write-discard supplier.
+     */
+    public long getWriteDiscardCount() {
+        return writeDiscardCountSupplier.get();
+    }
+
     @Override
     public long getElementCount() {
         return elementCount.get();
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java
index 0ad0171f1b9..c3cf43215fe 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 @Internal(since = "1.0.0")
-@Version("6.1.0")
+@Version("6.2.0")
 package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
 
 import org.apache.jackrabbit.oak.commons.annotations.Internal;