Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticRetryPolicy;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticIndexProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexStatistics;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConfig;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConstants;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceMBeanImpl;
Expand Down Expand Up @@ -227,6 +228,9 @@ private void activate(BundleContext bundleContext, Config config) {
oakRegs.add(whiteboard.register(FeatureToggle.class,
new FeatureToggle(ElasticConnection.FT_OAK_12234, ElasticConnection.FT_OAK_12234_DISABLE),
emptyMap()));
oakRegs.add(whiteboard.register(FeatureToggle.class,
new FeatureToggle(ElasticIndexStatistics.FT_OAK_12248, ElasticIndexStatistics.FT_OAK_12248_ENABLE),
emptyMap()));
if (System.getProperty(QueryEngineSettings.OAK_INFERENCE_ENABLED) != null) {
this.isInferenceEnabled = Boolean.parseBoolean(System.getProperty(QueryEngineSettings.OAK_INFERENCE_ENABLED));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;

import org.apache.jackrabbit.oak.cache.api.CacheBuilder;
Expand All @@ -33,6 +35,8 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import co.elastic.clients.elasticsearch._types.Bytes;
import co.elastic.clients.elasticsearch.cat.indices.IndicesRecord;
Expand All @@ -53,6 +57,17 @@
*/
public class ElasticIndexStatistics implements IndexStatistics {

private static final Logger LOG = LoggerFactory.getLogger(ElasticIndexStatistics.class);

public static final String FT_OAK_12248 = "FT_OAK-12248";
/**
* When {@code true}, a 404 from Elasticsearch (alias not found) is treated as an expected
* empty-index condition: statistics return 0, the query returns an empty cursor, and INFO is
* logged instead of ERROR. Requires OAK-12249 lazy provisioning to be meaningful.
* Disabled by default.
*/
public static final AtomicBoolean FT_OAK_12248_ENABLE = new AtomicBoolean(false);

private static final String MAX_SIZE = "oak.elastic.statsMaxSize";
private static final Long MAX_SIZE_DEFAULT = 10000L;
private static final String EXPIRE_SECONDS = "oak.elastic.statsExpireSeconds";
Expand All @@ -64,7 +79,6 @@ public class ElasticIndexStatistics implements IndexStatistics {
private final ElasticIndexDefinition indexDefinition;
private final LoadingCache<StatsRequestDescriptor, Integer> countCache;
private final LoadingCache<StatsRequestDescriptor, StatsResponse> statsCache;

ElasticIndexStatistics(@NotNull ElasticConnection elasticConnection,
@NotNull ElasticIndexDefinition indexDefinition) {
this(elasticConnection, indexDefinition, null, null);
Expand All @@ -89,7 +103,7 @@ public class ElasticIndexStatistics implements IndexStatistics {
*/
@Override
public int numDocs() {
return countCache.get(new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias()));
return getOrRefetchDocCount(null, null);
}

/**
Expand All @@ -98,69 +112,62 @@ public int numDocs() {
*/
@Override
public int getDocCountFor(String field) {
String elasticField = ElasticIndexUtils.fieldName(field);
return countCache.get(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias(), elasticField, null)
);
return getOrRefetchDocCount(ElasticIndexUtils.fieldName(field), null);
}

/**
* Returns the approximate number of documents for the {@code query} in the remote index bound to the
* {@code ElasticIndexDefinition}.
*/
public int getDocCountFor(Query query) {
return countCache.get(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias(), null, query)
);
return getOrRefetchDocCount(null, query);
}

private int getOrRefetchDocCount(@Nullable String field, @Nullable Query query) {
return countCache.get(new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias(), field, query));
}

/**
* Returns the approximate size in bytes for the primary shards of the remote index bound to the
* {@code ElasticIndexDefinition}.
*/
public long primaryStoreSize() {
return statsCache.get(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).primaryStoreSize;
return getOrRefetchStats().primaryStoreSize;
}

/**
* Returns the approximate size in bytes for the remote index bound to the {@code ElasticIndexDefinition}, including
* primary shards and replica shards.
*/
public long storeSize() {
return statsCache.get(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).storeSize;
return getOrRefetchStats().storeSize;
}

/**
* Returns the creation date for the remote index bound to the {@code ElasticIndexDefinition}.
*/
public long creationDate() {
return statsCache.get(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).creationDate;
return getOrRefetchStats().creationDate;
}

/**
* Returns the number of low level lucene documents for the remote index bound to the
* {@code ElasticIndexDefinition}. This document count includes hidden nested documents.
*/
public int luceneNumDocs() {
return statsCache.get(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).luceneDocsCount;
return getOrRefetchStats().luceneDocsCount;
}

/**
* Returns the number of deleted low level lucene documents for the remote index bound to the
* {@code ElasticIndexDefinition}. This document count includes hidden nested documents.
*/
public int luceneNumDeletedDocs() {
return statsCache.get(
new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias())
).luceneDocsDeleted;
return getOrRefetchStats().luceneDocsDeleted;
}

private StatsResponse getOrRefetchStats() {
return statsCache.get(new StatsRequestDescriptor(elasticConnection, indexDefinition.getIndexAlias()));
}

static LoadingCache<StatsRequestDescriptor, Integer> setupCountCache(long maxSize, long expireSeconds, long refreshSeconds, @Nullable Clock clock) {
Expand Down Expand Up @@ -195,7 +202,15 @@ static class CountCacheLoader implements CacheLoader<StatsRequestDescriptor, Int

@Override
public @NotNull Integer load(@NotNull StatsRequestDescriptor countRequestDescriptor) throws IOException {
return count(countRequestDescriptor);
try {
return count(countRequestDescriptor);
} catch (ElasticsearchException ee) {
if (ee.status() == 404 && FT_OAK_12248_ENABLE.get()) {
LOG.info("ES alias not found for index {} — treating as empty (OAK-12248)", countRequestDescriptor.index);
return 0;
}
throw ee;
}
}

private int count(StatsRequestDescriptor crd) throws IOException {
Expand All @@ -216,7 +231,15 @@ static class StatsCacheLoader implements CacheLoader<StatsRequestDescriptor, Sta

@Override
public @NotNull StatsResponse load(@NotNull StatsRequestDescriptor countRequestDescriptor) throws IOException {
return stats(countRequestDescriptor);
try {
return stats(countRequestDescriptor);
} catch (ElasticsearchException ee) {
if (ee.status() == 404 && FT_OAK_12248_ENABLE.get()) {
LOG.info("ES alias not found for index {} — returning empty stats (OAK-12248)", countRequestDescriptor.index);
return StatsResponse.NO_ALIAS_STATS;
}
throw ee;
}
}

private StatsResponse stats(StatsRequestDescriptor crd) throws IOException {
Expand Down Expand Up @@ -292,6 +315,8 @@ private String internalQuery() {

static class StatsResponse {

static final StatsResponse NO_ALIAS_STATS = new StatsResponse(0, 0, -1, 0, 0);

final long storeSize;
final long primaryStoreSize;
final long creationDate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexNode;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexStatistics;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResultRowAsyncIterator;
import org.apache.jackrabbit.oak.plugins.index.search.IndexNode;
Expand All @@ -33,6 +34,7 @@
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;
import java.util.function.Predicate;

Expand Down Expand Up @@ -120,6 +122,14 @@ public Cursor query(IndexPlan plan, NodeState rootState) {
ElasticQueryIterator itr;
ElasticIndexNode indexNode = acquireIndexNode(plan);
try {
if (ElasticIndexStatistics.FT_OAK_12248_ENABLE.get() && indexNode.getIndexStatistics().numDocs() == 0) {
final String explainQuery = requestHandler.baseQuery().toString();
return new ElasticQueryIterator() {
@Override public boolean hasNext() { return false; }
@Override public FulltextResultRow next() { throw new NoSuchElementException(); }
@Override public String explain() { return explainQuery; }
};
}
if (requestHandler.requiresSpellCheck()) {
itr = new ElasticSpellcheckIterator(indexNode, requestHandler, responseHandler);
} else if (requestHandler.requiresSuggestion()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index.elastic;

import org.apache.jackrabbit.oak.api.Tree;
import org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
import org.junit.After;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Tests for OAK-12248: graceful handling of ES 404 (alias not found) during query.
*/
public class ElasticGraceful404QueryTest extends ElasticAbstractQueryTest {

private static final String QUERY = "SELECT * FROM [nt:base] WHERE [ghost] IS NOT NULL OPTION(TRAVERSAL FAIL)";

@After
public void resetToggle() {
ElasticIndexStatistics.FT_OAK_12248_ENABLE.set(false);
}

private Tree provisionIndex() throws Exception {
Tree test = root.getTree("/").addChild("test");
test.addChild("a").setProperty("ghost", "value");
test.addChild("b").setProperty("ghost", "value");
root.commit();

IndexDefinitionBuilder builder = createIndex("ghost");
builder.indexRule("nt:base").property("ghost").propertyIndex();
Tree index = setIndex("ghostIndex", builder);
root.commit();

assertEventually(() -> {
assertTrue(exists(index));
assertEquals(2, countDocuments(index));
});
return index;
}

private void deleteAlias(Tree index) throws Exception {
esConnection.getClient().indices().delete(i -> i
.index(getElasticIndexDefinition(index).getIndexAlias() + "*"));
}

@Test
public void queryOnMissingAlias_withToggleOff_failsWithTraversalError() throws Exception {
Tree index = provisionIndex();
deleteAlias(index);

List<String> results = executeQuery(QUERY, SQL2);

assertEquals(1, results.size());
assertTrue("Expected traversal-fail error when toggle is off and alias is missing",
results.get(0).contains("Traversal"));
}

@Test
public void queryOnMissingAlias_withToggleOn_returnsEmpty() throws Exception {
Tree index = provisionIndex();
deleteAlias(index);
ElasticIndexStatistics.FT_OAK_12248_ENABLE.set(true);

List<String> results = executeQuery(QUERY, SQL2);

assertEquals("Expected empty result set when alias is missing and toggle is on",
0, results.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.index.elastic;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.CountRequest;
import co.elastic.clients.elasticsearch.core.CountResponse;
Expand Down Expand Up @@ -66,6 +67,7 @@ public void setUp() {
@After
public void releaseMocks() throws Exception {
closeable.close();
ElasticIndexStatistics.FT_OAK_12248_ENABLE.set(false);
}

@Test
Expand Down Expand Up @@ -142,4 +144,45 @@ public void cachedStatistics() throws Exception {
assertEquals(5000, indexStatistics.getDocCountFor(Query.of(qf -> qf.matchAll(mf -> mf.boost(100F)))));
verify(elasticClientMock, times(5)).count(any(CountRequest.class));
}

@Test
public void numDocsReturnsZeroOn404WithToggleEnabled() throws Exception {
ElasticIndexStatistics.FT_OAK_12248_ENABLE.set(true);
LoadingCache<ElasticIndexStatistics.StatsRequestDescriptor, Integer> cache =
ElasticIndexStatistics.setupCountCache(100, 10 * 60, 60, null);
ElasticIndexStatistics stats =
new ElasticIndexStatistics(elasticConnectionMock, indexDefinitionMock, cache, null);

ElasticsearchException notFound = mock(ElasticsearchException.class);
when(notFound.status()).thenReturn(404);
when(elasticClientMock.count(any(CountRequest.class))).thenThrow(notFound);

assertEquals(0, stats.numDocs());
verify(elasticClientMock).count(any(CountRequest.class));
}

@Test
public void numDocsRecoverAfterCacheExpiry() throws Exception {
ElasticIndexStatistics.FT_OAK_12248_ENABLE.set(true);
Clock.Virtual clock = new Clock.Virtual();
LoadingCache<ElasticIndexStatistics.StatsRequestDescriptor, Integer> cache =
ElasticIndexStatistics.setupCountCache(100, 10 * 60, 60, clock);
ElasticIndexStatistics stats =
new ElasticIndexStatistics(elasticConnectionMock, indexDefinitionMock, cache, null);

ElasticsearchException notFound = mock(ElasticsearchException.class);
when(notFound.status()).thenReturn(404);
CountResponse countResponse = mock(CountResponse.class);
when(countResponse.count()).thenReturn(5L);
when(elasticClientMock.count(any(CountRequest.class)))
.thenThrow(notFound)
.thenReturn(countResponse);

assertEquals(0, stats.numDocs());

clock.waitFor(Duration.ofMinutes(11));

assertEquals(5, stats.numDocs());
verify(elasticClientMock, times(2)).count(any(CountRequest.class));
}
}
Loading