AbstractElasticsearchIndex.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.elasticsearch.impl;
import static org.opencastproject.util.data.functions.Misc.chuck;
import org.opencastproject.elasticsearch.api.SearchIndex;
import org.opencastproject.elasticsearch.api.SearchIndexException;
import org.opencastproject.elasticsearch.api.SearchMetadata;
import org.opencastproject.elasticsearch.api.SearchQuery;
import org.opencastproject.elasticsearch.api.SearchResult;
import org.opencastproject.elasticsearch.api.SearchResultItem;
import org.opencastproject.util.requests.SortCriterion;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.ScriptSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
/**
* A search index implementation based on ElasticSearch.
*/
public abstract class AbstractElasticsearchIndex implements SearchIndex {
/** Logging facility */
private static final Logger logger = LoggerFactory.getLogger(AbstractElasticsearchIndex.class);
/**
 * The Elasticsearch maximum results window size. Used as the default page size and as the upper bound of
 * offset + limit when search requests are built.
 */
private static final int ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW = Integer.MAX_VALUE;
/** The Elasticsearch term aggregation size (maximum number of terms requested per field) */
private static final int ELASTICSEARCH_TERM_AGGREGATION_SIZE = 10000;
/** Configuration key defining the hostname of an external Elasticsearch server */
public static final String ELASTICSEARCH_SERVER_HOSTNAME_KEY = "org.opencastproject.elasticsearch.server.hostname";
/** Configuration key defining the scheme (http/https) of an external Elasticsearch server */
public static final String ELASTICSEARCH_SERVER_SCHEME_KEY = "org.opencastproject.elasticsearch.server.scheme";
/** Configuration key defining the port of an external Elasticsearch server */
public static final String ELASTICSEARCH_SERVER_PORT_KEY = "org.opencastproject.elasticsearch.server.port";
/** Configuration key defining the username of an external Elasticsearch server */
public static final String ELASTICSEARCH_USERNAME_KEY = "org.opencastproject.elasticsearch.username";
/** Configuration key defining the password of an external Elasticsearch server */
public static final String ELASTICSEARCH_PASSWORD_KEY = "org.opencastproject.elasticsearch.password";
/** Default port of an external Elasticsearch server */
private static final int ELASTICSEARCH_SERVER_PORT_DEFAULT = 9200;
/** Default hostname of an external Elasticsearch server */
private static final String ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT = "localhost";
/** Default scheme of an external Elasticsearch server */
private static final String ELASTICSEARCH_SERVER_SCHEME_DEFAULT = "http";
/** Identifier of the root entry, i.e. the document in each sub index that stores the index version */
private static final String ROOT_ID = "root";
/** The index identifier; it is the prefix of every sub index name */
private String indexIdentifier = null;
/** Key of the component property holding the index identifier */
private static final String INDEX_IDENTIFIER_PROPERTY = "index.identifier";
/** Index identifier used when the component property is missing or blank */
private static final String DEFAULT_INDEX_IDENTIFIER = "opencast";
/** The index name; only used for log messages and {@link #getIndexName()} in this class */
private String indexName = null;
/** Key of the component property holding the index name */
private static final String INDEX_NAME_PROPERTY = "index.name";
/** Index name used when the component property is missing or blank */
private static final String DEFAULT_INDEX_NAME = "Elasticsearch";
/** The high level client; created lazily by {@link #init(int)} */
private RestHighLevelClient client = null;
/** The version number; stays at -1 until {@link #init(int)} has been called */
private int indexVersion = -1;
/** The path to the index settings; set to the trimmed value of the "karaf.etc" property on activation */
protected String indexSettingsPath;
/** Hostname of an external Elasticsearch server to connect to. */
private String externalServerHostname = ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT;
/** Scheme of an external Elasticsearch server to connect to. */
private String externalServerScheme = ELASTICSEARCH_SERVER_SCHEME_DEFAULT;
/** Port of an external Elasticsearch server to connect to */
private int externalServerPort = ELASTICSEARCH_SERVER_PORT_DEFAULT;
/** Username of an external Elasticsearch server to connect to; {@code null} if not configured. */
private String username;
/** Password of an external Elasticsearch server to connect to; {@code null} if not configured. */
private String password;
/** Defines how long to wait (in ms) between retries, when the connection to OpenSearch failed on startup */
private int retryDelayOnStartup;
/** Key of the component property holding the startup retry delay */
private static final String RETRY_DELAY_ON_STARTUP = "retry.delay.on.startup";
/** Startup retry delay (in ms) used when the component property is missing or not a number */
private static final int DEFAULT_RETRY_DELAY_ON_STARTUP = 10000;
/**
 * Returns an array of document types for the index. For every one of these, a sub index named
 * {@code <index identifier>_<type>} is created and the corresponding document type definition
 * ({@code <type>-mapping.json}) will be loaded.
 *
 * @return the document types
 */
public abstract String[] getDocumentTypes();
/**
 * OSGi callback to activate this component instance.
 * <p>
 * Reads the index identifier from the component properties and the server connection settings (hostname, scheme,
 * port, username, password) from the bundle context. Missing or blank values fall back to their defaults.
 *
 * @param properties
 *          The configuration
 * @param bundleContext
 *          the bundle context
 * @throws ComponentException
 *           if the Karaf configuration path cannot be determined or the configured server port is not a number
 */
public void activate(Map<String, Object> properties, BundleContext bundleContext) throws ComponentException {
  indexIdentifier = StringUtils.defaultIfBlank((String) properties.get(INDEX_IDENTIFIER_PROPERTY),
      DEFAULT_INDEX_IDENTIFIER);
  logger.info("Index identifier set to {}.", indexIdentifier);

  indexSettingsPath = StringUtils.trimToNull(bundleContext.getProperty("karaf.etc"));
  if (indexSettingsPath == null) {
    throw new ComponentException("Could not determine Karaf configuration path");
  }

  externalServerHostname = StringUtils.defaultIfBlank(
      bundleContext.getProperty(ELASTICSEARCH_SERVER_HOSTNAME_KEY), ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT);
  externalServerScheme = StringUtils.defaultIfBlank(
      bundleContext.getProperty(ELASTICSEARCH_SERVER_SCHEME_KEY), ELASTICSEARCH_SERVER_SCHEME_DEFAULT);

  // A malformed port is a configuration error: report it as a ComponentException that names the offending
  // value instead of letting a bare NumberFormatException escape from the activation callback.
  final String configuredPort = StringUtils.trimToNull(bundleContext.getProperty(ELASTICSEARCH_SERVER_PORT_KEY));
  if (configuredPort == null) {
    externalServerPort = ELASTICSEARCH_SERVER_PORT_DEFAULT;
  } else {
    try {
      externalServerPort = Integer.parseInt(configuredPort);
    } catch (NumberFormatException e) {
      throw new ComponentException("Invalid value '" + configuredPort + "' for "
          + ELASTICSEARCH_SERVER_PORT_KEY + ", expected a port number", e);
    }
  }

  username = StringUtils.trimToNull(bundleContext.getProperty(ELASTICSEARCH_USERNAME_KEY));
  password = StringUtils.trimToNull(bundleContext.getProperty(ELASTICSEARCH_PASSWORD_KEY));
}
/**
 * OSGi callback for configuration changes.
 * <p>
 * Updates the index name and the startup retry delay from the given component properties.
 *
 * @param properties
 *          The configuration
 * @throws IllegalArgumentException
 *           if the configured retry delay is not greater than 0
 */
public void modified(Map<String, Object> properties) {
  final String configuredName = (String) properties.get(INDEX_NAME_PROPERTY);
  indexName = StringUtils.defaultIfBlank(configuredName, DEFAULT_INDEX_NAME);
  logger.info("Index name set to {}.", indexName);

  // Missing or non-numeric values fall back to the default delay
  final String configuredDelay = (String) properties.get(RETRY_DELAY_ON_STARTUP);
  retryDelayOnStartup = NumberUtils.toInt(configuredDelay, DEFAULT_RETRY_DELAY_ON_STARTUP);
  if (retryDelayOnStartup < 1) {
    throw new IllegalArgumentException(RETRY_DELAY_ON_STARTUP
        + " was wrongly configured. Value has to be greater than 0.");
  }
  logger.info("Retry delay on startup set to {} ms.", retryDelayOnStartup);
}
// Returns the version passed to init(int), or -1 if init(int) has not been called yet.
@Override
public int getIndexVersion() {
  return this.indexVersion;
}
/**
 * Deletes the sub indices of all document types and re-creates them empty.
 * <p>
 * Failures reported by Elasticsearch are logged and not propagated, so clearing stays a best-effort operation.
 *
 * @throws IOException
 *           if communicating with Elasticsearch or loading the index definitions fails
 */
@Override
public void clear() throws IOException {
  try {
    final String[] subIndices = Arrays.stream(getDocumentTypes())
        .map(this::getSubIndexIdentifier)
        .toArray(String[]::new);
    final AcknowledgedResponse delete = client.indices()
        .delete(new DeleteIndexRequest(subIndices), RequestOptions.DEFAULT);
    if (!delete.isAcknowledged()) {
      logger.error("Index '{}' could not be deleted", getIndexName());
    }
    createIndex();
  } catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.NOT_FOUND) {
      // The exception does not necessarily carry the index it refers to; fall back to the index name
      final String missingIndex = exception.getIndex() != null
          ? exception.getIndex().getName()
          : getIndexName();
      logger.error("Cannot clear non-existing index '{}'", missingIndex);
    } else {
      // Previously swallowed without a trace; keep the best-effort semantics but make the failure visible
      logger.error("Unable to clear index '{}'", getIndexName(), exception);
    }
  } catch (SearchIndexException e) {
    logger.error("Unable to re-create the index after a clear", e);
  }
}
/**
 * Indexes a single document in the sub index of its type and refreshes that index immediately.
 * <p>
 * The request is repeated whenever Elasticsearch answers with an {@link ElasticsearchStatusException}, until
 * it succeeds or the number of failed attempts exceeds {@code maxRetryAttempts}.
 *
 * @param maxRetryAttempts
 *          How often to retry update in case of ElasticsearchStatusException
 * @param retryWaitingPeriod
 *          How long to wait (in ms) between retries
 * @param document
 *          The Elasticsearch document
 * @return the query response
 *
 * @throws IOException
 *           If updating the index fails
 * @throws InterruptedException
 *           If waiting during retry is interrupted
 */
protected IndexResponse update(int maxRetryAttempts, int retryWaitingPeriod, ElasticsearchDocument document)
        throws IOException, InterruptedException {
  final IndexRequest indexRequest = new IndexRequest(getSubIndexIdentifier(document.getType()));
  indexRequest.id(document.getUID());
  indexRequest.source(document);
  indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

  int failedAttempts = 0;
  while (true) {
    try {
      final IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
      if (response != null) {
        return response;
      }
    } catch (ElasticsearchStatusException e) {
      failedAttempts++;
      if (failedAttempts > maxRetryAttempts) {
        logger.error("Could not update documents in index {}, not retrying.", getIndexName(),
            e);
        throw e;
      }
      logger.warn("Could not update documents in index {}, retrying in {} ms.", getIndexName(),
          retryWaitingPeriod, e);
      if (retryWaitingPeriod > 0) {
        Thread.sleep(retryWaitingPeriod);
      }
    }
  }
}
/**
 * Indexes several documents with one bulk request and refreshes the affected indices immediately.
 * <p>
 * The bulk request is repeated whenever Elasticsearch answers with an {@link ElasticsearchStatusException},
 * until it succeeds or the number of failed attempts exceeds {@code maxRetryAttempts}.
 *
 * @param maxRetryAttempts
 *          How often to retry update in case of ElasticsearchStatusException
 * @param retryWaitingPeriod
 *          How long to wait (in ms) between retries
 * @param documents
 *          The Elasticsearch documents
 * @return the query response
 *
 * @throws IOException
 *           If updating the index fails
 * @throws InterruptedException
 *           If waiting during retry is interrupted
 */
protected BulkResponse bulkUpdate(int maxRetryAttempts, int retryWaitingPeriod,
        List<ElasticsearchDocument> documents)
        throws IOException, InterruptedException {
  final BulkRequest bulkRequest = new BulkRequest();
  bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  for (ElasticsearchDocument document : documents) {
    final IndexRequest entry = new IndexRequest(getSubIndexIdentifier(document.getType()));
    entry.id(document.getUID());
    entry.source(document);
    bulkRequest.add(entry);
  }

  int failedAttempts = 0;
  while (true) {
    try {
      final BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
      if (response != null) {
        return response;
      }
    } catch (ElasticsearchStatusException e) {
      failedAttempts++;
      if (failedAttempts > maxRetryAttempts) {
        logger.error("Could not update documents in index {}, not retrying.", getIndexName(),
            e);
        throw e;
      }
      logger.warn("Could not update documents in index {} because of {}, retrying in {} ms.", getIndexName(),
          e.getMessage(), retryWaitingPeriod);
      if (retryWaitingPeriod > 0) {
        Thread.sleep(retryWaitingPeriod);
      }
    }
  }
}
/**
 * Removes a document from the sub index of the given type and refreshes that index immediately.
 * <p>
 * The request is repeated whenever Elasticsearch answers with an {@link ElasticsearchStatusException}, until
 * it succeeds or the number of failed attempts exceeds {@code maxRetryAttempts}.
 *
 * @param type
 *          The type of document we want to delete
 * @param id
 *          The identifier of the document
 * @param maxRetryAttempts
 *          How often to retry deletion in case of ElasticsearchStatusException
 * @param retryWaitingPeriod
 *          How long to wait (in ms) between retries
 * @return
 *          The delete response
 *
 * @throws IOException
 *           If deleting from the index fails
 * @throws InterruptedException
 *           If waiting during retry is interrupted
 */
protected DeleteResponse delete(String type, String id, int maxRetryAttempts, int retryWaitingPeriod)
        throws IOException, InterruptedException {
  final DeleteRequest deleteRequest = new DeleteRequest(getSubIndexIdentifier(type), id);
  deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

  int failedAttempts = 0;
  while (true) {
    try {
      final DeleteResponse response = getClient().delete(deleteRequest, RequestOptions.DEFAULT);
      if (response != null) {
        return response;
      }
    } catch (ElasticsearchStatusException e) {
      failedAttempts++;
      if (failedAttempts > maxRetryAttempts) {
        logger.error("Could not remove documents from index {}, not retrying.", getIndexName(),
            e);
        throw e;
      }
      logger.warn("Could not remove documents from index {} because of {}, retrying in {} ms.", getIndexName(),
          e.getMessage(), retryWaitingPeriod);
      if (retryWaitingPeriod > 0) {
        Thread.sleep(retryWaitingPeriod);
      }
    }
  }
}
/**
 * Sets up the connection to the search server and prepares the index.
 * <p>
 * Remembers the expected index version, creates the REST client if there is none yet (with basic
 * authentication when both username and password are configured), blocks until the server is reachable and
 * finally creates the sub indices.
 *
 * @param version
 *          the index version
 * @throws SearchIndexException
 *           if the index configuration cannot be loaded
 * @throws IOException
 *           if loading of settings fails
 * @throws IllegalArgumentException
 *           if the index identifier is blank.
 */
protected void init(int version)
        throws IOException, IllegalArgumentException, SearchIndexException {
  this.indexVersion = version;

  if (client == null) {
    final HttpHost server = new HttpHost(externalServerHostname, externalServerPort, externalServerScheme);
    final RestClientBuilder restClientBuilder = RestClient.builder(server);

    final boolean credentialsConfigured = username != null && password != null;
    if (credentialsConfigured) {
      final CredentialsProvider credentials = new BasicCredentialsProvider();
      credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
      restClientBuilder.setHttpClientConfigCallback(
          httpClient -> httpClient.setDefaultCredentialsProvider(credentials));
    }

    client = new RestHighLevelClient(restClientBuilder);
  }

  // Block until the server answers, then make sure all sub indices exist
  waitUntilOpensearchIsAvailable();
  createIndex();
}
/**
 * Continuously tries to connect to OpenSearch until it is reachable.
 * <p>
 * Between two attempts the thread sleeps for the configured startup retry delay. If no positive delay has been
 * configured, the default delay is used so that the loop never retries without a pause.
 *
 * @throws RuntimeException
 *           if the thread is interrupted while waiting (the interrupt flag is restored before throwing) or if
 *           the reachability check fails with an unexpected exception
 */
private void waitUntilOpensearchIsAvailable() {
  final String openSearchUrl = getOpensearchURL();
  // The delay is still 0 when modified() has not been called; never retry without a pause
  final int retryDelay = retryDelayOnStartup > 0 ? retryDelayOnStartup : DEFAULT_RETRY_DELAY_ON_STARTUP;

  logger.info("Testing connection to OpenSearch at {}", openSearchUrl);
  while (!isOpensearchReachable()) {
    logger.warn("Could not reach OpenSearch at {}. Trying again after {} ms...", openSearchUrl, retryDelay);
    try {
      Thread.sleep(retryDelay);
    } catch (InterruptedException e) {
      // Restore the interrupt status so that callers further up the stack can still observe the interruption
      Thread.currentThread().interrupt();
      throw new RuntimeException("Could not reach OpenSearch at " + openSearchUrl, e);
    }
  }
  logger.info("Connection to OpenSearch at {} tested successfully", openSearchUrl);
}
/**
 * Checks if OpenSearch is reachable by requesting the cluster health (waiting for at least yellow status).
 *
 * @return true if the cluster health is green or yellow; false if the health is anything else, if the
 *         connection could not be established, or if the server answered with an HTTP status of 500 or above
 *
 * @throws ElasticsearchException
 *           if the server answers with an error status below 500 (e.g. 401 Unauthorized)
 *
 * @throws RuntimeException
 *           if an unexpected exception occurs
 */
private boolean isOpensearchReachable() {
  try {
    final ClusterHealthRequest healthRequest = new ClusterHealthRequest();
    healthRequest.waitForYellowStatus();
    final ClusterHealthResponse health = client.cluster().health(healthRequest, RequestOptions.DEFAULT);

    switch (health.getStatus()) {
      case GREEN:
        logger.debug("Connected to OpenSearch, cluster health is {}", health.getStatus());
        return true;
      case YELLOW:
        logger.warn("Connected to OpenSearch, cluster health is {}", health.getStatus());
        return true;
      default:
        logger.debug("Connected to OpenSearch, but cluster health is {}", health.getStatus());
        return false;
    }
  } catch (ConnectException | ConnectionClosedException connectException) {
    // Expected while the server is down or still starting up, hence only logged on debug level
    logger.debug("Unable to connect to OpenSearch", connectException);
    return false;
  } catch (IOException ioException) {
    // A socket exception as cause may show up while a container is still starting; keep retrying then.
    // Every other I/O problem aborts the startup.
    if (!(ioException.getCause() instanceof java.net.SocketException)) {
      throw new RuntimeException("Couldn't connect to opensearch due to IOExceptionError", ioException);
    }
    logger.debug("Unable to connect to OpenSearch", ioException);
    return false;
  } catch (ElasticsearchException elasticsearchException) {
    final int httpStatus = elasticsearchException.status().getStatus();
    if (httpStatus < 500) {
      // E.g. HTTP 401 Unauthorized: fail the startup so that the configuration can be corrected
      logger.error("Error while testing OpenSearch connection", elasticsearchException);
      throw elasticsearchException;
    }
    logger.debug("OpenSearch health request ended with HTTP status code {}. Is OpenSearch service running?",
        httpStatus);
    return false;
  } catch (Exception e) {
    // Anything else is unexpected and aborts the startup
    throw new RuntimeException("Unable to connect to OpenSearch, unexpected exception", e);
  }
}
/**
 * Closes the client. Does nothing if no client has been created.
 *
 * @throws IOException
 *           if closing the client fails
 */
protected void close() throws IOException {
  if (client == null) {
    return;
  }
  client.close();
}
/**
 * Creates one sub index for each of the types (or mappings) returned by {@link #getDocumentTypes()}.
 *
 * @throws SearchIndexException
 *           if index and type creation fails
 * @throws IOException
 *           if loading of the type definitions fails
 * @throws IllegalArgumentException
 *           if the index identifier is blank
 */
private void createIndex() throws SearchIndexException, IOException {
  if (StringUtils.isBlank(this.indexIdentifier)) {
    throw new IllegalArgumentException("Search index identifier must be set");
  }

  final String[] documentTypes = getDocumentTypes();
  for (int i = 0; i < documentTypes.length; i++) {
    final String documentType = documentTypes[i];
    createSubIndex(documentType, getSubIndexIdentifier(documentType));
  }
}
/**
 * Creates the sub index for one document type (unless it already exists) and makes sure it carries a version
 * entry matching the version expected by this code base.
 *
 * @param type
 *          the document type; selects the mapping resource {@code <type>-mapping.json}
 * @param idxName
 *          the name of the sub index
 * @throws SearchIndexException
 *           if the index creation is not acknowledged or the stored index version differs from the expected one
 * @throws IOException
 *           if loading the index definitions or talking to the server fails
 */
private void createSubIndex(String type, String idxName) throws SearchIndexException, IOException {
  try {
    logger.debug("Trying to create index for '{}'", idxName);
    final CreateIndexRequest createRequest = new CreateIndexRequest(idxName);
    createRequest.settings(loadResources("indexSettings.json"), XContentType.JSON);
    createRequest.mapping(loadResources(type + "-mapping.json"), XContentType.JSON);
    final CreateIndexResponse createResponse = client.indices().create(createRequest, RequestOptions.DEFAULT);
    if (!createResponse.isAcknowledged()) {
      throw new SearchIndexException("Unable to create index for '" + idxName + "'");
    }
  } catch (ElasticsearchStatusException e) {
    // An already existing index is fine, everything else is a real failure
    if (!e.getDetailedMessage().contains("already_exists_exception")) {
      throw e;
    }
    logger.info("Detected existing index '{}'", idxName);
  }

  // Look up the version entry and compare it with the expected version. A failing request is taken as
  // "no version entry yet".
  // NOTE(review): getField() is used although the request does not explicitly ask for stored fields -
  // confirm that the version field is actually part of the response.
  boolean versionEntryFound = false;
  final GetRequest versionRequest = new GetRequest(idxName, ROOT_ID);
  try {
    final GetResponse versionResponse = client.get(versionRequest, RequestOptions.DEFAULT);
    if (versionResponse.isExists() && versionResponse.getField(IndexSchema.VERSION) != null) {
      final String storedVersion = versionResponse.getField(IndexSchema.VERSION).getValue().toString();
      final int actualIndexVersion = Integer.parseInt(storedVersion);
      if (actualIndexVersion != indexVersion) {
        throw new SearchIndexException(
            "Search index is at version " + actualIndexVersion + ", but codebase expects " + indexVersion);
      }
      versionEntryFound = true;
      logger.debug("Search index version is {}", indexVersion);
    }
  } catch (ElasticsearchException e) {
    logger.debug("Version index has not been created");
  }

  if (versionEntryFound) {
    return;
  }

  // No version entry yet, store the expected version
  logger.debug("Creating version index for site '{}'", idxName);
  final IndexRequest versionEntry = new IndexRequest(idxName);
  versionEntry.id(ROOT_ID);
  versionEntry.source(Collections.singletonMap(IndexSchema.VERSION, String.valueOf(indexVersion)));
  logger.debug("Index version of site '{}' is {}", idxName, indexVersion);
  client.index(versionEntry, RequestOptions.DEFAULT);
}
/**
 * Load resources from active index class resources if they exist or fall back to these classes resources as default.
 * <p>
 * The resource is looked up as {@code /elasticsearch/<filename>}, first relative to the concrete index class and
 * then relative to {@link AbstractElasticsearchIndex}.
 *
 * @param filename
 *          the name of the resource file
 * @return the string containing the resource, never {@code null}
 * @throws IOException
 *           if reading the resources fails or the resource cannot be found at all
 */
private String loadResources(final String filename) throws IOException {
  final String resourcePath = "/elasticsearch/" + filename;
  // Try loading from the index implementation first.
  // This allows index implementations to override the defaults
  for (Class<?> cls : Arrays.asList(this.getClass(), AbstractElasticsearchIndex.class)) {
    try (InputStream is = cls.getResourceAsStream(resourcePath)) {
      if (is != null) {
        final String settings = IOUtils.toString(is, StandardCharsets.UTF_8);
        logger.debug("Reading elasticsearch configuration resources from {}:\n{}", cls, settings);
        return settings;
      }
    }
  }
  // Fail with a message naming the missing resource instead of handing null to the request builders,
  // where it would only surface as an obscure NullPointerException
  throw new IOException("Elasticsearch configuration resource '" + resourcePath + "' could not be found");
}
/**
 * Creates a request for a search query based on the properties known by the search query.
 * <p>
 * Stored fields, pagination and sort orders are taken from {@code query}; the request targets the sub indices
 * of the types returned by {@code query.getTypes()}.
 *
 * @param query
 *          the search query
 * @param queryBuilder
 *          the Elasticsearch query to execute
 * @return the search request
 */
protected SearchRequest getSearchRequest(SearchQuery query, QueryBuilder queryBuilder) {
  final SearchSourceBuilder searchSource = new SearchSourceBuilder()
      .query(queryBuilder)
      .trackTotalHits(true);

  // Make sure all fields are being returned
  if (query.getFields().length > 0) {
    searchSource.storedFields(Arrays.asList(query.getFields()));
  } else {
    searchSource.storedFields(Collections.singletonList("*"));
  }

  // Pagination
  if (query.getOffset() >= 0) {
    searchSource.from(query.getOffset());
  }
  // offset + size must never exceed the result window. This also has to hold when no limit is given:
  // requesting the full window on top of an offset would exceed it.
  final int offset = Math.max(query.getOffset(), 0);
  final int maxSize = ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW - offset;
  final int limit = query.getLimit() > 0 ? Math.min(query.getLimit(), maxSize) : maxSize;
  searchSource.size(limit);

  // Sort orders
  for (Entry<String, SortCriterion.Order> sortCriterion : query.getSortOrders().entrySet()) {
    final String sortField = sortCriterion.getKey();
    logger.debug("Event sort criteria: {}", sortField);

    SortOrder sortOrder = null;
    switch (sortCriterion.getValue()) {
      case Ascending:
        sortOrder = SortOrder.ASC;
        break;
      case Descending:
        sortOrder = SortOrder.DESC;
        break;
      default:
        break;
    }
    if (sortOrder == null) {
      // Neither ascending nor descending: this criterion does not contribute to the ordering
      continue;
    }

    if ("publication".equals(sortField)) {
      // Publications are sorted with a script on the length of the publication entry in the document source
      final ScriptSortBuilder publicationSort = SortBuilders.scriptSort(
          new Script("params._source.publication.length"),
          ScriptSortBuilder.ScriptSortType.NUMBER);
      publicationSort.order(sortOrder);
      searchSource.sort(publicationSort);
    } else {
      searchSource.sort(sortField, sortOrder);
    }
  }

  // Log the search source once it is completely configured
  logger.debug("Searching for {}", searchSource);

  final String[] subIndices = Arrays.stream(query.getTypes())
      .map(this::getSubIndexIdentifier)
      .toArray(String[]::new);
  return new SearchRequest(subIndices)
      .searchType(SearchType.QUERY_THEN_FETCH)
      .preference("_local")
      .source(searchSource);
}
/**
 * Returns the configured name of this index.
 *
 * @return the index name
 */
public String getIndexName() {
  return this.indexName;
}
/*
 * Returns the total number of hits reported by the given search hits.
 *
 * This method is a workaround to avoid accessing org.apache.lucene.search.TotalHits outside this bundle.
 * Doing so would cause OSGi dependency problems. It seems to be a bug anyway that ES exposes this
 * class.
 */
protected long getTotalHits(SearchHits hits) {
return hits.getTotalHits().value;
}
/**
 * Returns the name of the sub index for the given type, which is the index identifier and the type joined by
 * an underscore.
 *
 * @param type
 *          The type to get the sub index for.
 * @return the index name
 */
protected String getSubIndexIdentifier(String type) {
  return String.join("_", this.indexIdentifier, type);
}
/**
 * Returns the high level REST client used by this index.
 *
 * @return the client, or {@code null} if it has not been created yet
 */
public RestHighLevelClient getClient() {
  return this.client;
}
/**
 * Runs a search request against the index and converts the hits into a {@link SearchResult}.
 * <p>
 * The request is repeated whenever Elasticsearch answers with an {@link ElasticsearchStatusException}, until
 * it succeeds or the number of failed attempts exceeds {@code maxRetryAttempts}. Hits that cannot be converted
 * by {@code toSearchResult} are skipped and not counted in the document count.
 *
 * @param query
 *          The query to use to find the results
 * @param request
 *          The builder to use to create the query.
 * @param toSearchResult
 *          The function to convert the results to a {@link SearchResult}
 * @param maxRetryAttempts
 *          How often to retry query in case of ElasticsearchStatusException
 * @param retryWaitingPeriod
 *          How long to wait (in ms) between retries
 * @return A {@link SearchResult} containing the relevant objects.
 *
 * @throws IOException
 *           If querying the index fails
 * @throws InterruptedException
 *           If waiting during retry is interrupted
 */
protected <T> SearchResult<T> executeQuery(SearchQuery query, SearchRequest request,
        Function<SearchMetadataCollection, T> toSearchResult, int maxRetryAttempts, int retryWaitingPeriod)
        throws IOException, InterruptedException {
  // Query the index, retrying on status exceptions
  SearchResponse response = null;
  int failedAttempts = 0;
  while (response == null) {
    try {
      response = getClient().search(request, RequestOptions.DEFAULT);
    } catch (ElasticsearchStatusException e) {
      failedAttempts++;
      if (failedAttempts > maxRetryAttempts) {
        logger.error("Could not query documents from index {}, not retrying.", getIndexName(),
            e);
        throw e;
      }
      logger.warn("Could not query documents from index {} because of {}, retrying in {} ms.", getIndexName(),
          e.getMessage(), retryWaitingPeriod);
      if (retryWaitingPeriod > 0) {
        Thread.sleep(retryWaitingPeriod);
      }
    }
  }

  // Set up the result with the overall hit count and the number of returned documents
  final SearchHits searchHits = response.getHits();
  final long totalHits = getTotalHits(searchHits);
  long documentCount = searchHits.getHits().length;
  final SearchResultImpl<T> result = new SearchResultImpl<>(query, totalHits, documentCount);
  result.setSearchTime(response.getTook().millis());

  // Convert every hit into a result item
  for (SearchHit hit : searchHits) {
    // Collect the fields of the hit as metadata
    final SearchMetadataCollection metadata = new SearchMetadataCollection(hit.getType());
    metadata.setIdentifier(hit.getId());
    for (DocumentField field : hit.getFields().values()) {
      final SearchMetadata<Object> entry = new SearchMetadataImpl<>(field.getName());
      // TODO: Add values with more care (localized, correct type etc.)
      final List<Object> values = field.getValues();
      if (values.size() > 1) {
        for (Object value : values) {
          entry.addValue(value);
        }
      } else {
        entry.addValue(field.getValue());
      }
      metadata.add(entry);
    }

    final float score = hit.getScore();

    // Let the given function create the type-specific document; hits it cannot handle are left out
    try {
      final T document = toSearchResult.apply(metadata);
      result.addResultItem(new SearchResultItemImpl<>(score, document));
    } catch (Throwable t) {
      logger.warn("Error during search result serialization: '{}'. Skipping this search result.", t.getMessage());
      documentCount--;
    }
  }

  // Only the successfully converted documents are counted
  result.setDocumentCount(documentCount);
  return result;
}
/**
 * Builds the URL of the OpenSearch service in the form {@code scheme://hostname:port}.
 *
 * @return the OpenSearch URL
 */
private String getOpensearchURL() {
  final StringBuilder url = new StringBuilder();
  url.append(this.externalServerScheme).append("://");
  url.append(this.externalServerHostname).append(":");
  url.append(this.externalServerPort);
  return url.toString();
}
/**
 * Returns all the known terms for a field (aka facets).
 * <p>
 * The terms are determined with a terms aggregation on the sub index of the given type. An
 * {@link IOException} raised by the search is rethrown through {@code chuck}.
 *
 * @param field
 *          the field name
 * @param type
 *          the document type
 * @return the list of terms
 */
public List<String> getTermsForField(String field, String type) {
  final String facetName = "terms";
  // Add size to aggregation to return all values (the default is the top ten terms with the most documents).
  // We set it to 10,000, which should be enough (the maximum is 'search.max_buckets', which defaults to 65,536).
  final AggregationBuilder aggBuilder = AggregationBuilders.terms(facetName).field(field)
      .size(ELASTICSEARCH_TERM_AGGREGATION_SIZE);
  // Only the aggregation is evaluated below, so do not let the server fetch and return any hits
  final SearchSourceBuilder searchSource = new SearchSourceBuilder()
      .size(0)
      .aggregation(aggBuilder);
  final SearchRequest searchRequest = new SearchRequest(this.getSubIndexIdentifier(type)).source(searchSource);
  try {
    final SearchResponse response = getClient().search(searchRequest, RequestOptions.DEFAULT);
    final List<String> terms = new ArrayList<>();
    final Terms aggs = response.getAggregations().get(facetName);
    for (Terms.Bucket bucket : aggs.getBuckets()) {
      terms.add(bucket.getKey().toString());
    }
    return terms;
  } catch (IOException e) {
    return chuck(e);
  }
}
}