Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public PersistentDiskCache(File directory, int cacheMaxSizeMB, DiskCacheIOMonito
() -> Long.valueOf(directory.listFiles().length),
() -> FileUtils.sizeOfDirectory(directory),
() -> evictionCount.get());
segmentCacheStats.setWriteDiscardCountSupplier(() -> discardCount.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeco
redisSocketTimeout, null, redisDBIndex, null);
this.segmentCacheStats = new SegmentCacheStats(NAME, this::getRedisMaxMemory, this::getCacheElementCount,
this::getCurrentWeight, this::getNumberOfEvictedKeys);
this.segmentCacheStats.setWriteDiscardCountSupplier(() -> discardCount.get());
}

private long getCacheElementCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.jackrabbit.oak.commons.pio.Closer;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.segment.spi.monitor.RoleStatisticsProvider;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import org.apache.jackrabbit.oak.spi.toggle.FeatureToggle;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
Expand Down Expand Up @@ -101,6 +102,12 @@ private PersistentCache createPersistentCache(Configuration configuration, Close
new FeatureToggle(PersistentDiskCache.FT_OAK_12212, PersistentDiskCache.FT_OAK_12212_SKIP_MISSING_FILE_CHECK),
Collections.emptyMap()));

// OAK-12282: expose the bounded write-queue kill switch. Changing this
// flag requires a process restart since the executor is created at startup.
registerCloseable(osgiWhiteboard.register(FeatureToggle.class,
new FeatureToggle(AbstractPersistentCache.FT_OAK_12282, AbstractPersistentCache.FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED),
Collections.emptyMap()));

CacheStatsMBean diskCacheStatsMBean = persistentDiskCache.getCacheStats();
registerCloseable(registerMBean(CacheStatsMBean.class, diskCacheStatsMBean, CacheStats.TYPE, diskCacheStatsMBean.getName()));

Expand All @@ -120,6 +127,11 @@ private PersistentCache createPersistentCache(Configuration configuration, Close
configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
closer.register(redisCache);

// OAK-12282: expose the bounded write-queue kill switch. Requires a restart to take effect.
registerCloseable(osgiWhiteboard.register(FeatureToggle.class,
new FeatureToggle(AbstractPersistentCache.FT_OAK_12282, AbstractPersistentCache.FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED),
Collections.emptyMap()));

CacheStatsMBean redisCacheStatsMBean = redisCache.getCacheStats();
registerCloseable(registerMBean(CacheStatsMBean.class, redisCacheStatsMBean, CacheStats.TYPE, redisCacheStatsMBean.getName()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,68 @@
import java.io.Closeable;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);

public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10);
public static final int WRITE_QUEUE_SIZE = Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100);

protected ExecutorService executor;
protected AtomicLong cacheSize = new AtomicLong(0);
protected PersistentCache nextCache;
protected final Set<String> writesPending;
protected AtomicLong discardCount = new AtomicLong();

protected SegmentCacheStats segmentCacheStats;

/**
* Name of the feature toggle for the OAK-12282 bounded write-queue fix.
* See {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED}.
*/
public static final String FT_OAK_12282 = "FT_OAK-12282";

/**
* Whether the bounded write queue introduced in OAK-12282 is active.
* <p>
* When {@code true} (default), the executor uses a queue bounded to
* {@link #WRITE_QUEUE_SIZE} and discards write tasks when it is full
* (counting each discard), preventing OOM under high write load. This is
* safe because the persistent cache is an optimisation only — a dropped
* write means the segment is fetched from remote storage on the next read.
* <p>
* Set to {@code false} via the {@link org.apache.jackrabbit.oak.spi.toggle.FeatureToggle}
* registered with the Whiteboard to revert to the pre-fix unbounded queue.
* <strong>Note:</strong> changing this flag requires a process restart, as
* the executor is created at startup.
*/
public static final AtomicBoolean FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED = new AtomicBoolean(true);

/**
 * Creates the write executor and the set used to track pending writes.
 * <p>
 * The executor runs exactly {@link #THREADS} threads. Its work queue is bounded to
 * {@link #WRITE_QUEUE_SIZE} when {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED} is
 * {@code true} at construction time, and unbounded otherwise. The flag is read only
 * here, so later changes do not affect an already constructed cache.
 */
public AbstractPersistentCache() {
    // Executors.newFixedThreadPool() is intentionally not used: it creates an unbounded
    // LinkedBlockingQueue, allowing unlimited segment buffers to pile up in memory under
    // high write load. The bounded queue (gated by FT_OAK_12282) prevents OOM by dropping
    // write tasks when full; this is safe because the cache is an optimisation only.
    final BlockingQueue<Runnable> writeQueue = FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED.get()
            ? new LinkedBlockingQueue<>(WRITE_QUEUE_SIZE)
            : new LinkedBlockingQueue<>();
    executor = new ThreadPoolExecutor(
            THREADS, THREADS,
            0L, TimeUnit.MILLISECONDS,
            writeQueue,
            // Rejection handler: count and drop the task instead of throwing
            // RejectedExecutionException at the submitter. Note that ThreadPoolExecutor
            // also invokes this handler for tasks submitted after shutdown.
            (r, e) -> {
                discardCount.incrementAndGet();
                logger.debug("Segment write task discarded: write queue full (capacity={})", WRITE_QUEUE_SIZE);
            });
    writesPending = ConcurrentHashMap.newKeySet();
}

Expand Down Expand Up @@ -148,4 +189,8 @@ public void close() {
/**
 * Reports how many entries are currently held in the pending-writes set.
 *
 * @return the current size of {@code writesPending}
 */
public int getWritesPending() {
    final int pending = writesPending.size();
    return pending;
}

/**
 * Reports how many write tasks the executor's rejection handler has counted as discarded.
 *
 * @return the current value of {@code discardCount}
 */
public long getWriteDiscardCount() {
    final long discarded = discardCount.get();
    return discarded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class SegmentCacheStats extends AbstractCacheStats {
@NotNull
final AtomicLong missCount = new AtomicLong();

@NotNull
private Supplier<Long> writeDiscardCountSupplier = () -> 0L;

public SegmentCacheStats(@NotNull String name,
@NotNull Supplier<Long> maximumWeight,
@NotNull Supplier<Long> elementCount,
Expand All @@ -78,6 +81,14 @@ protected CacheStats getCurrentStats() {
);
}

/**
 * Replaces the supplier consulted by {@link #getWriteDiscardCount()}.
 *
 * @param supplier source of the write-discard count; passed through
 *                 {@code requireNonNull} before being stored
 */
public void setWriteDiscardCountSupplier(@NotNull Supplier<Long> supplier) {
    final Supplier<Long> checked = requireNonNull(supplier);
    this.writeDiscardCountSupplier = checked;
}

/**
 * Returns the write-discard count reported by the configured supplier.
 * Until a supplier is installed, the field's default supplier yields {@code 0}.
 *
 * @return the value currently produced by {@code writeDiscardCountSupplier}
 */
public long getWriteDiscardCount() {
    final Long reported = writeDiscardCountSupplier.get();
    return reported;
}

@Override
public long getElementCount() {
return elementCount.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
@Internal(since = "1.0.0")
@Version("6.1.0")
@Version("6.2.0")
package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;

import org.apache.jackrabbit.oak.commons.annotations.Internal;
Expand Down
Loading