View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  
23  package org.opencastproject.elasticsearch.impl;
24  
25  import static org.opencastproject.util.data.functions.Misc.chuck;
26  
27  import org.opencastproject.elasticsearch.api.SearchIndex;
28  import org.opencastproject.elasticsearch.api.SearchIndexException;
29  import org.opencastproject.elasticsearch.api.SearchMetadata;
30  import org.opencastproject.elasticsearch.api.SearchQuery;
31  import org.opencastproject.elasticsearch.api.SearchResult;
32  import org.opencastproject.elasticsearch.api.SearchResultItem;
33  import org.opencastproject.util.requests.SortCriterion;
34  
35  import org.apache.commons.io.IOUtils;
36  import org.apache.commons.lang3.StringUtils;
37  import org.apache.commons.lang3.math.NumberUtils;
38  import org.apache.http.ConnectionClosedException;
39  import org.apache.http.HttpHost;
40  import org.apache.http.auth.AuthScope;
41  import org.apache.http.auth.UsernamePasswordCredentials;
42  import org.apache.http.client.CredentialsProvider;
43  import org.apache.http.impl.client.BasicCredentialsProvider;
44  import org.elasticsearch.ElasticsearchException;
45  import org.elasticsearch.ElasticsearchStatusException;
46  import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
47  import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
48  import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
49  import org.elasticsearch.action.bulk.BulkRequest;
50  import org.elasticsearch.action.bulk.BulkResponse;
51  import org.elasticsearch.action.delete.DeleteRequest;
52  import org.elasticsearch.action.delete.DeleteResponse;
53  import org.elasticsearch.action.get.GetRequest;
54  import org.elasticsearch.action.get.GetResponse;
55  import org.elasticsearch.action.index.IndexRequest;
56  import org.elasticsearch.action.index.IndexResponse;
57  import org.elasticsearch.action.search.SearchRequest;
58  import org.elasticsearch.action.search.SearchResponse;
59  import org.elasticsearch.action.search.SearchType;
60  import org.elasticsearch.action.support.WriteRequest;
61  import org.elasticsearch.action.support.master.AcknowledgedResponse;
62  import org.elasticsearch.client.RequestOptions;
63  import org.elasticsearch.client.RestClient;
64  import org.elasticsearch.client.RestClientBuilder;
65  import org.elasticsearch.client.RestHighLevelClient;
66  import org.elasticsearch.client.indices.CreateIndexRequest;
67  import org.elasticsearch.client.indices.CreateIndexResponse;
68  import org.elasticsearch.cluster.health.ClusterHealthStatus;
69  import org.elasticsearch.common.document.DocumentField;
70  import org.elasticsearch.common.xcontent.XContentType;
71  import org.elasticsearch.index.query.QueryBuilder;
72  import org.elasticsearch.rest.RestStatus;
73  import org.elasticsearch.script.Script;
74  import org.elasticsearch.search.SearchHit;
75  import org.elasticsearch.search.SearchHits;
76  import org.elasticsearch.search.aggregations.AggregationBuilder;
77  import org.elasticsearch.search.aggregations.AggregationBuilders;
78  import org.elasticsearch.search.aggregations.bucket.terms.Terms;
79  import org.elasticsearch.search.builder.SearchSourceBuilder;
80  import org.elasticsearch.search.sort.ScriptSortBuilder;
81  import org.elasticsearch.search.sort.SortBuilders;
82  import org.elasticsearch.search.sort.SortOrder;
83  import org.osgi.framework.BundleContext;
84  import org.osgi.service.component.ComponentException;
85  import org.slf4j.Logger;
86  import org.slf4j.LoggerFactory;
87  
88  import java.io.IOException;
89  import java.io.InputStream;
90  import java.net.ConnectException;
91  import java.nio.charset.StandardCharsets;
92  import java.util.ArrayList;
93  import java.util.Arrays;
94  import java.util.Collections;
95  import java.util.List;
96  import java.util.Map;
97  import java.util.Map.Entry;
98  import java.util.function.Function;
99  
100 /**
101  * A search index implementation based on ElasticSearch.
102  */
103 public abstract class AbstractElasticsearchIndex implements SearchIndex {
104 
  /** Logging facility */
  private static final Logger logger = LoggerFactory.getLogger(AbstractElasticsearchIndex.class);

  /**
   * The Elasticsearch maximum results window size. Used as the upper bound for the result size when building
   * search requests.
   */
  private static final int ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW = Integer.MAX_VALUE;

  /** The Elasticsearch term aggregation size */
  private static final int ELASTICSEARCH_TERM_AGGREGATION_SIZE = 10000;

  /** Configuration key defining the hostname of an external Elasticsearch server */
  public static final String ELASTICSEARCH_SERVER_HOSTNAME_KEY = "org.opencastproject.elasticsearch.server.hostname";

  /** Configuration key defining the scheme (http/https) of an external Elasticsearch server */
  public static final String ELASTICSEARCH_SERVER_SCHEME_KEY = "org.opencastproject.elasticsearch.server.scheme";

  /** Configuration key defining the port of an external Elasticsearch server */
  public static final String ELASTICSEARCH_SERVER_PORT_KEY = "org.opencastproject.elasticsearch.server.port";

  /** Configuration key defining the username of an external Elasticsearch server */
  public static final String ELASTICSEARCH_USERNAME_KEY = "org.opencastproject.elasticsearch.username";

  /** Configuration key defining the password of an external Elasticsearch server */
  public static final String ELASTICSEARCH_PASSWORD_KEY = "org.opencastproject.elasticsearch.password";

  /** Default port of an external Elasticsearch server */
  private static final int ELASTICSEARCH_SERVER_PORT_DEFAULT = 9200;

  /** Default hostname of an external Elasticsearch server */
  private static final String ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT = "localhost";

  /** Default scheme of an external Elasticsearch server */
  private static final String ELASTICSEARCH_SERVER_SCHEME_DEFAULT = "http";

  /** Identifier of the root entry, i.e. the document that stores the index version of a sub index */
  private static final String ROOT_ID = "root";

  /** The index identifier; used as prefix for the names of the sub indices (one per document type) */
  private String indexIdentifier = null;
  /** Configuration key of the component properties defining the index identifier */
  private static final String INDEX_IDENTIFIER_PROPERTY = "index.identifier";
  /** Index identifier used when none is configured */
  private static final String DEFAULT_INDEX_IDENTIFIER = "opencast";

  /** The index name; set on configuration changes and used in log messages */
  private String indexName = null;
  /** Configuration key of the component properties defining the index name */
  private static final String INDEX_NAME_PROPERTY = "index.name";
  /** Index name used when none is configured */
  private static final String DEFAULT_INDEX_NAME = "Elasticsearch";

  /** The high level client; created lazily when the index is initialized */
  private RestHighLevelClient client = null;

  /** The version number; -1 until the index has been initialized */
  private int indexVersion = -1;

  /** The path to the index settings (the Karaf configuration directory) */
  protected String indexSettingsPath;

  /** Hostname of an external Elasticsearch server to connect to. */
  private String externalServerHostname = ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT;

  /** Scheme of an external Elasticsearch server to connect to. */
  private String externalServerScheme = ELASTICSEARCH_SERVER_SCHEME_DEFAULT;

  /** Port of an external Elasticsearch server to connect to */
  private int externalServerPort = ELASTICSEARCH_SERVER_PORT_DEFAULT;

  /** Username of an external Elasticsearch server to connect to. May be null, in which case no credentials are sent. */
  private String username;

  /** Password of an external Elasticsearch server to connect to. May be null, in which case no credentials are sent. */
  private String password;

  /** Defines how long to wait (in ms) between retries, when the connection to OpenSearch failed on startup */
  private int retryDelayOnStartup;
  /** Configuration key of the component properties defining the retry delay on startup */
  private static final String RETRY_DELAY_ON_STARTUP = "retry.delay.on.startup";
  /** Retry delay on startup (in ms) used when none is configured */
  private static final int DEFAULT_RETRY_DELAY_ON_STARTUP = 10000;
179 
  /**
   * Returns an array of document types for the index. For every one of these, a dedicated sub index is created
   * and the corresponding document type definition (<code>&lt;type&gt;-mapping.json</code>) will be loaded.
   * The same list determines which sub indices are removed when the index is cleared.
   *
   * @return the document types
   */
  public abstract String[] getDocumentTypes();
187 
188   /**
189    * OSGi callback to activate this component instance.
190    *
191    * @param properties
192    *          The configuration
193    * @param bundleContext
194    *          the bundle context
195    * @throws ComponentException
196    *           if the search index cannot be initialized
197    */
198   public void activate(Map<String, Object> properties, BundleContext bundleContext) throws ComponentException {
199     indexIdentifier = StringUtils.defaultIfBlank((String) properties
200             .get(INDEX_IDENTIFIER_PROPERTY), DEFAULT_INDEX_IDENTIFIER);
201     logger.info("Index identifier set to {}.", indexIdentifier);
202 
203     indexSettingsPath = StringUtils.trimToNull(bundleContext.getProperty("karaf.etc"));
204     if (indexSettingsPath == null) {
205       throw new ComponentException("Could not determine Karaf configuration path");
206     }
207     externalServerHostname = StringUtils
208             .defaultIfBlank(bundleContext.getProperty(ELASTICSEARCH_SERVER_HOSTNAME_KEY),
209                     ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT);
210     externalServerScheme = StringUtils
211             .defaultIfBlank(bundleContext.getProperty(ELASTICSEARCH_SERVER_SCHEME_KEY),
212                     ELASTICSEARCH_SERVER_SCHEME_DEFAULT);
213     externalServerPort = Integer.parseInt(StringUtils
214             .defaultIfBlank(bundleContext.getProperty(ELASTICSEARCH_SERVER_PORT_KEY),
215                     ELASTICSEARCH_SERVER_PORT_DEFAULT + ""));
216     username = StringUtils.trimToNull(bundleContext.getProperty(ELASTICSEARCH_USERNAME_KEY));
217     password = StringUtils.trimToNull(bundleContext.getProperty(ELASTICSEARCH_PASSWORD_KEY));
218   }
219 
220   /**
221    * OSGi callback for configuration changes.
222    *
223    * @param properties
224    *          The configuration
225    */
226   public void modified(Map<String, Object> properties) {
227     indexName = StringUtils.defaultIfBlank((String) properties.get(INDEX_NAME_PROPERTY),
228             DEFAULT_INDEX_NAME);
229     logger.info("Index name set to {}.", indexName);
230 
231     retryDelayOnStartup = NumberUtils.toInt((String) properties.get(RETRY_DELAY_ON_STARTUP),
232         DEFAULT_RETRY_DELAY_ON_STARTUP);
233     if (retryDelayOnStartup <= 0) {
234       throw new IllegalArgumentException(RETRY_DELAY_ON_STARTUP
235           + " was wrongly configured. Value has to be greater than 0.");
236     }
237     logger.info("Retry delay on startup set to {} ms.", retryDelayOnStartup);
238   }
239 
240   @Override
241   public int getIndexVersion() {
242     return indexVersion;
243   }
244 
245   @Override
246   public void clear() throws IOException {
247     try {
248       final DeleteIndexRequest request = new DeleteIndexRequest(
249               Arrays.stream(getDocumentTypes()).map(this::getSubIndexIdentifier).toArray(String[]::new));
250       final AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
251       if (!delete.isAcknowledged()) {
252         logger.error("Index '{}' could not be deleted", getIndexName());
253       }
254       createIndex();
255     } catch (ElasticsearchException exception) {
256       if (exception.status() == RestStatus.NOT_FOUND) {
257         logger.error("Cannot clear non-existing index '{}'", exception.getIndex().getName());
258       }
259     } catch (SearchIndexException e) {
260       logger.error("Unable to re-create the index after a clear", e);
261     }
262   }
263 
264   /**
265    * Posts the input document to the search index.
266    *
267    * @param maxRetryAttempts
268    *          How often to retry update in case of ElasticsearchStatusException
269    * @param retryWaitingPeriod
270    *          How long to wait (in ms) between retries
271    * @param document
272    *          The Elasticsearch document
273    * @return the query response
274    *
275    * @throws IOException
276    *         If updating the index fails
277    * @throws InterruptedException
278    *         If waiting during retry is interrupted
279    */
280   protected IndexResponse update(int maxRetryAttempts, int retryWaitingPeriod, ElasticsearchDocument document)
281           throws IOException, InterruptedException {
282 
283     final IndexRequest indexRequest = new IndexRequest(getSubIndexIdentifier(document.getType())).id(document.getUID())
284             .source(document).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
285 
286     IndexResponse indexResponse = null;
287     int retryAttempts = 0;
288     do {
289       try {
290         indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
291       } catch (ElasticsearchStatusException e) {
292         retryAttempts++;
293 
294         if (retryAttempts <= maxRetryAttempts) {
295           logger.warn("Could not update documents in index {}, retrying in {} ms.", getIndexName(),
296                   retryWaitingPeriod, e);
297           if (retryWaitingPeriod > 0) {
298             Thread.sleep(retryWaitingPeriod);
299           }
300         } else {
301           logger.error("Could not update documents in index {}, not retrying.", getIndexName(),
302                   e);
303           throw e;
304         }
305       }
306     } while (indexResponse == null);
307 
308     return indexResponse;
309   }
310 
311   /**
312    * Posts the input documents to the search index.
313    *
314    * @param maxRetryAttempts
315    *          How often to retry update in case of ElasticsearchStatusException
316    * @param retryWaitingPeriod
317    *          How long to wait (in ms) between retries
318    * @param documents
319    *          The Elasticsearch documents
320    * @return the query response
321    *
322    * @throws IOException
323    *         If updating the index fails
324    * @throws InterruptedException
325    *         If waiting during retry is interrupted
326    */
327   protected BulkResponse bulkUpdate(int maxRetryAttempts, int retryWaitingPeriod,
328       List<ElasticsearchDocument> documents)
329           throws IOException, InterruptedException {
330     BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
331 
332     for (ElasticsearchDocument document: documents) {
333       bulkRequest.add(new IndexRequest(getSubIndexIdentifier(document.getType())).id(document.getUID())
334           .source(document));
335     }
336 
337     BulkResponse bulkResponse = null;
338     int retryAttempts = 0;
339     do {
340       try {
341         bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
342       } catch (ElasticsearchStatusException e) {
343         retryAttempts++;
344 
345         if (retryAttempts <= maxRetryAttempts) {
346           logger.warn("Could not update documents in index {} because of {}, retrying in {} ms.", getIndexName(),
347                   e.getMessage(), retryWaitingPeriod);
348           if (retryWaitingPeriod > 0) {
349             Thread.sleep(retryWaitingPeriod);
350           }
351         } else {
352           logger.error("Could not update documents in index {}, not retrying.", getIndexName(),
353                   e);
354           throw e;
355         }
356       }
357     } while (bulkResponse == null);
358 
359     return bulkResponse;
360   }
361 
362   /**
363    * Delete document from index.
364    *
365    * @param type
366    *         The type of document we want to delete
367    * @param id
368    *         The identifier of the document
369    * @return
370    *         The delete response
371    *
372    * @throws IOException
373    *         If deleting from the index fails
374    * @throws InterruptedException
375    *         If waiting during retry is interrupted
376    */
377   protected DeleteResponse delete(String type, String id, int maxRetryAttempts, int retryWaitingPeriod)
378           throws IOException, InterruptedException {
379     final DeleteRequest deleteRequest = new DeleteRequest(getSubIndexIdentifier(type), id).setRefreshPolicy(
380             WriteRequest.RefreshPolicy.IMMEDIATE);
381     DeleteResponse deleteResponse = null;
382     int retryAttempts = 0;
383     do {
384       try {
385         deleteResponse = getClient().delete(deleteRequest, RequestOptions.DEFAULT);
386       } catch (ElasticsearchStatusException e) {
387         retryAttempts++;
388 
389         if (retryAttempts <= maxRetryAttempts) {
390           logger.warn("Could not remove documents from index {} because of {}, retrying in {} ms.", getIndexName(),
391                   e.getMessage(), retryWaitingPeriod);
392           if (retryWaitingPeriod > 0) {
393             Thread.sleep(retryWaitingPeriod);
394           }
395         } else {
396           logger.error("Could not remove documents from index {}, not retrying.", getIndexName(),
397                   e);
398           throw e;
399         }
400       }
401     } while (deleteResponse == null);
402 
403     return deleteResponse;
404   }
405 
406   /**
407    * Initializes an Elasticsearch node for the given index.
408    *
409    * @param version
410    *          the index version
411    * @throws SearchIndexException
412    *           if the index configuration cannot be loaded
413    * @throws IOException
414    *           if loading of settings fails
415    * @throws IllegalArgumentException
416    *           if the index identifier is blank.
417    */
418   protected void init(int version)
419           throws IOException, IllegalArgumentException, SearchIndexException {
420     this.indexVersion = version;
421 
422     if (client == null) {
423       final RestClientBuilder builder = RestClient
424           .builder(new HttpHost(externalServerHostname, externalServerPort, externalServerScheme));
425 
426       if (username != null && password != null) {
427         final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
428         credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
429         builder.setHttpClientConfigCallback(
430             httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
431       }
432 
433       client = new RestHighLevelClient(builder);
434     }
435 
436     // Test if opensearch is reachable and continuously retry if it is not
437     waitUntilOpensearchIsAvailable();
438 
439     // Create the index
440     createIndex();
441   }
442 
443   /**
444    * Continuously tries to connect to OpenSearch until it is reachable.
445    *
446    * @throws RuntimeException if an unexpected exception occurs
447    */
448   private void waitUntilOpensearchIsAvailable() {
449     String openSearchUrl = getOpensearchURL();
450     logger.info("Testing connection to OpenSearch at {}", openSearchUrl);
451     while (!isOpensearchReachable()) {
452       logger.warn("Could not reach OpenSearch at {}. Trying again after {} ms...", openSearchUrl, retryDelayOnStartup);
453       try {
454         Thread.sleep(retryDelayOnStartup);
455       } catch (InterruptedException e) {
456         throw new RuntimeException("Could not reach OpenSearch at " + openSearchUrl, e);
457       }
458     }
459     logger.info("Connection to OpenSearch at {} tested successfully", openSearchUrl);
460   }
461 
462   /**
463    * Checks if OpenSearch is reachable.
464    *
465    * @return true if OpenSearch is reachable, false otherwise
466    *
467    * @throws ElasticsearchException if an ElasticsearchException occurs, e.g. if the server returns a 4xx or 5xx error
468    *
469    * @throws RuntimeException if an unexpected exception occurs
470    */
471   private boolean isOpensearchReachable() {
472     try {
473       // test connection
474       ClusterHealthRequest request = new ClusterHealthRequest();
475       request.waitForYellowStatus();
476       ClusterHealthResponse resp = client.cluster().health(request, RequestOptions.DEFAULT);
477       if (resp.getStatus().equals(ClusterHealthStatus.GREEN)) {
478         logger.debug("Connected to OpenSearch, cluster health is {}", resp.getStatus());
479         return true;
480       } else if (resp.getStatus().equals(ClusterHealthStatus.YELLOW)) {
481         logger.warn("Connected to OpenSearch, cluster health is {}", resp.getStatus());
482         return true;
483       }
484       logger.debug("Connected to OpenSearch, but cluster health is {}", resp.getStatus());
485       return false;
486     } catch (ConnectException | ConnectionClosedException connectException) {
487       // Get thrown when we are unable to connect. Normally this should only happen when
488       // opensearch is not running or is just starting up, therefore only log the error on debug level
489       logger.debug("Unable to connect to OpenSearch", connectException);
490       return false;
491     } catch (IOException ioException) {
492       // Could be thrown when a docker container with opensearch is just starting up,
493       // so we check further if the cause is a socket exception
494       if (ioException.getCause() instanceof java.net.SocketException) {
495         // it seems like a container is starting up, we continue the loop
496         logger.debug("Unable to connect to OpenSearch", ioException);
497         return false;
498       }
499       // something different triggered an ioexception, so we fail
500       throw new RuntimeException("Couldn't connect to opensearch due to IOExceptionError", ioException);
501     } catch (ElasticsearchException elasticsearchException) {
502       if (elasticsearchException.status().getStatus() >= 500) {
503         logger.debug("OpenSearch health request ended with HTTP status code {}. Is OpenSearch service running?",
504             elasticsearchException.status().getStatus());
505         return false;
506       }
507       // An ElasticsearchException is usually thrown in case where the server returns a 4xx or 5xx error code.
508       // So for example for an HTTP 401 Unauthorized: In this case we want the startup to fail, so
509       // we get an error and have the chance to change the configuration
510       logger.error("Error while testing OpenSearch connection", elasticsearchException);
511       throw elasticsearchException;
512     } catch (Exception e) {
513       // When another exception occurs, we throw a runtime exception, so the startup of Opencast will fail
514       throw new RuntimeException("Unable to connect to OpenSearch, unexpected exception", e);
515     }
516   }
517 
518   /**
519    * Closes the client.
520    *
521    * @throws IOException
522    *           if stopping the Elasticsearch node fails
523    */
524   protected void close() throws IOException {
525     if (client != null) {
526       client.close();
527     }
528   }
529 
530   /**
531    * Prepares index to store data for the types (or mappings) as returned by {@link #getDocumentTypes()}.
532    *
533    *
534    * @throws SearchIndexException
535    *           if index and type creation fails
536    * @throws IOException
537    *           if loading of the type definitions fails
538    */
539   private void createIndex() throws SearchIndexException, IOException {
540     if (StringUtils.isBlank(this.indexIdentifier)) {
541       throw new IllegalArgumentException("Search index identifier must be set");
542     }
543 
544     for (String type : getDocumentTypes()) {
545       createSubIndex(type, getSubIndexIdentifier(type));
546     }
547   }
548 
  /**
   * Creates the sub index for a single document type (unless it already exists) and makes sure that it
   * contains a version document matching the version of this codebase.
   *
   * @param type
   *          the document type; determines which mapping resource (<code>&lt;type&gt;-mapping.json</code>) is loaded
   * @param idxName
   *          the name of the sub index to create
   * @throws SearchIndexException
   *           if the index creation is not acknowledged or the stored index version differs from the expected one
   * @throws IOException
   *           if loading the resources or talking to the server fails
   */
  private void createSubIndex(String type, String idxName) throws SearchIndexException, IOException {
    try {
      logger.debug("Trying to create index for '{}'", idxName);
      // NOTE(review): loadResources returns null if a resource is missing; presumably both files always
      // exist on the classpath -- confirm
      final CreateIndexRequest request = new CreateIndexRequest(idxName)
              .settings(loadResources("indexSettings.json"), XContentType.JSON)
              .mapping(loadResources(type + "-mapping.json"), XContentType.JSON);

      final CreateIndexResponse siteIdxResponse = client.indices().create(request, RequestOptions.DEFAULT);
      if (!siteIdxResponse.isAcknowledged()) {
        throw new SearchIndexException("Unable to create index for '" + idxName + "'");
      }
    } catch (ElasticsearchStatusException e) {
      // An already existing index is not an error; it is detected by looking at the error message
      if (e.getDetailedMessage().contains("already_exists_exception")) {
        logger.info("Detected existing index '{}'", idxName);
      } else {
        throw e;
      }
    }

    // See if the index version exists and check if it matches. The request will
    // fail if there is no version index
    boolean versionIndexExists = false;
    final GetRequest getRequest = new GetRequest(idxName, ROOT_ID);
    try {
      final GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
      if (getResponse.isExists() && getResponse.getField(IndexSchema.VERSION) != null) {
        final int actualIndexVersion = Integer.parseInt(getResponse.getField(IndexSchema.VERSION).getValue()
                .toString());
        // A version mismatch is fatal; this exception is not caught below and reaches the caller
        if (indexVersion != actualIndexVersion) {
          throw new SearchIndexException(
                  "Search index is at version " + actualIndexVersion + ", but codebase expects " + indexVersion);
        }
        versionIndexExists = true;
        logger.debug("Search index version is {}", indexVersion);
      }
    } catch (ElasticsearchException e) {
      // A failing lookup is treated like a missing version document
      logger.debug("Version index has not been created");
    }

    // The version document does not exist (or could not be read), let's create it
    if (!versionIndexExists) {
      logger.debug("Creating version index for site '{}'", idxName);
      final IndexRequest indexRequest = new IndexRequest(idxName).id(ROOT_ID)
              .source(Collections.singletonMap(IndexSchema.VERSION, indexVersion + ""));
      logger.debug("Index version of site '{}' is {}", idxName, indexVersion);
      client.index(indexRequest, RequestOptions.DEFAULT);
    }
  }
597 
598   /**
599    * Load resources from active index class resources if they exist or fall back to these classes resources as default.
600    *
601    * @return the string containing the resource
602    * @throws IOException
603    *           if reading the resources fails
604    */
605   private String loadResources(final String filename) throws IOException {
606     final String resourcePath = "/elasticsearch/" + filename;
607     // Try loading from the index implementation first.
608     // This allows index implementations to override the defaults
609     for (Class cls : Arrays.asList(this.getClass(), AbstractElasticsearchIndex.class)) {
610       try (InputStream is = cls.getResourceAsStream(resourcePath)) {
611         if (is != null) {
612           final String settings = IOUtils.toString(is, StandardCharsets.UTF_8);
613           logger.debug("Reading elasticsearch configuration resources from {}:\n{}", cls, settings);
614           return settings;
615         }
616       }
617     }
618     return null;
619   }
620 
621   /**
622    * Creates a request for a search query based on the properties known by the search query.
623    * <p>
624    * Once this query builder has been created, support for ordering needs to be configured as needed.
625    *
626    * @param query
627    *          the search query
628    * @return the request builder
629    */
630   protected SearchRequest getSearchRequest(SearchQuery query, QueryBuilder queryBuilder) {
631 
632     final SearchSourceBuilder searchSource = new SearchSourceBuilder()
633         .query(queryBuilder)
634         .trackTotalHits(true);
635 
636     // Create the actual search query
637     logger.debug("Searching for {}", searchSource);
638 
639     // Make sure all fields are being returned
640     if (query.getFields().length > 0) {
641       searchSource.storedFields(Arrays.asList(query.getFields()));
642     } else {
643       searchSource.storedFields(Collections.singletonList("*"));
644     }
645 
646     // Pagination
647     if (query.getOffset() >= 0) {
648       searchSource.from(query.getOffset());
649     }
650 
651     int limit = ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW;
652     if (query.getLimit() > 0) {
653       if (query.getOffset() > 0
654               && (long) query.getOffset() + (long) query.getLimit() > ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW) {
655         limit = ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW - query.getOffset();
656       } else {
657         limit = query.getLimit();
658       }
659     }
660     searchSource.size(limit);
661 
662     // Sort orders
663     final Map<String, SortCriterion.Order> sortCriteria = query.getSortOrders();
664     for (Entry<String, SortCriterion.Order> sortCriterion : sortCriteria.entrySet()) {
665       ScriptSortBuilder sortBuilder = null;
666       logger.debug("Event sort criteria: {}", sortCriterion.getKey());
667       if ("publication".equals(sortCriterion.getKey())) {
668         sortBuilder = SortBuilders.scriptSort(
669             new Script("params._source.publication.length"),
670             ScriptSortBuilder.ScriptSortType.NUMBER);
671       }
672       switch (sortCriterion.getValue()) {
673         case Ascending:
674           if (sortBuilder != null) {
675             sortBuilder.order(SortOrder.ASC);
676             searchSource.sort(sortBuilder);
677           } else {
678             searchSource.sort(sortCriterion.getKey(), SortOrder.ASC);
679           }
680           break;
681         case Descending:
682           if (sortBuilder != null) {
683             sortBuilder.order(SortOrder.DESC);
684             searchSource.sort(sortBuilder);
685           } else {
686             searchSource.sort(sortCriterion.getKey(), SortOrder.DESC);
687           }
688           break;
689         default:
690           break;
691       }
692     }
693     return new SearchRequest(Arrays.stream(query.getTypes()).map(this::getSubIndexIdentifier).toArray(String[]::new))
694             .searchType(SearchType.QUERY_THEN_FETCH).preference("_local").source(searchSource);
695   }
696 
697   /**
698    * Returns the name of this index.
699    *
700    * @return the index name
701    */
702   public String getIndexName() {
703     return indexName;
704   }
705 
706   /*
707    * This method is a workaround to avoid accessing org.apache.lucene.search.TotalHits outside this bundle.
708    * Doing so would cause OSGi dependency problems. It seems to be a bug anyway that ES exposes this
709    * class.
710    */
711   protected long getTotalHits(SearchHits hits) {
712     return hits.getTotalHits().value;
713   }
714 
715   /**
716    * Returns the name of the sub index for the given type.
717    *
718    * @param type
719    *          The type to get the sub index for.
720    * @return the index name
721    */
722   protected String getSubIndexIdentifier(String type) {
723     return this.indexIdentifier + "_" + type;
724   }
725 
726   public RestHighLevelClient getClient() {
727     return client;
728   }
729 
730   /**
731    * Execute a query on the index.
732    *
733    * @param query
734    *          The query to use to find the results
735    * @param request
736    *          The builder to use to create the query.
737    * @param toSearchResult
738    *          The function to convert the results to a {@link SearchResult}
739    * @param maxRetryAttempts
740    *          How often to retry query in case of ElasticsearchStatusException
741    * @param retryWaitingPeriod
742    *          How long to wait (in ms) between retries
743    * @return A {@link SearchResult} containing the relevant objects.
744    *
745    * @throws IOException
746    *         If querying the index fails
747    * @throws InterruptedException
748    *         If waiting during retry is interrupted
749    */
750   protected <T> SearchResult<T> executeQuery(SearchQuery query, SearchRequest request,
751           Function<SearchMetadataCollection, T> toSearchResult, int maxRetryAttempts, int retryWaitingPeriod)
752           throws IOException, InterruptedException {
753     // Execute the query and try to get hold of a query response
754     SearchResponse searchResponse = null;
755     int retryAttempts = 0;
756     do {
757       try {
758         searchResponse = getClient().search(request, RequestOptions.DEFAULT);
759       } catch (ElasticsearchStatusException e) {
760         retryAttempts++;
761 
762         if (retryAttempts <= maxRetryAttempts) {
763           logger.warn("Could not query documents from index {} because of {}, retrying in {} ms.", getIndexName(),
764                   e.getMessage(), retryWaitingPeriod);
765           if (retryWaitingPeriod > 0) {
766             Thread.sleep(retryWaitingPeriod);
767           }
768         } else {
769           logger.error("Could not query documents from index {}, not retrying.", getIndexName(),
770                   e);
771           throw e;
772         }
773       }
774     } while (searchResponse == null);
775 
776     // Create and configure the query result
777     long hits = getTotalHits(searchResponse.getHits());
778     long size = searchResponse.getHits().getHits().length;
779     SearchResultImpl<T> result = new SearchResultImpl<>(query, hits, size);
780     result.setSearchTime(searchResponse.getTook().millis());
781 
782     // Walk through response and create new items with title, creator, etc:
783     for (SearchHit doc : searchResponse.getHits()) {
784 
785       // Wrap the search resulting metadata
786       SearchMetadataCollection metadata = new SearchMetadataCollection(doc.getType());
787       metadata.setIdentifier(doc.getId());
788 
789       for (DocumentField field : doc.getFields().values()) {
790         String name = field.getName();
791         SearchMetadata<Object> m = new SearchMetadataImpl<>(name);
792         // TODO: Add values with more care (localized, correct type etc.)
793 
794         // Add the field values
795         if (field.getValues().size() > 1) {
796           for (Object v : field.getValues()) {
797             m.addValue(v);
798           }
799         } else {
800           m.addValue(field.getValue());
801         }
802 
803         // Add the metadata
804         metadata.add(m);
805       }
806 
807       // Get the score for this item
808       float score = doc.getScore();
809 
810       // Have the serializer in charge create a type-specific search result
811       // item
812       try {
813         T document = toSearchResult.apply(metadata);
814         SearchResultItem<T> item = new SearchResultItemImpl<>(score, document);
815         result.addResultItem(item);
816       } catch (Throwable t) {
817         logger.warn("Error during search result serialization: '{}'. Skipping this search result.", t.getMessage());
818         size--;
819       }
820     }
821 
822     // Set the number of resulting documents
823     result.setDocumentCount(size);
824 
825     return result;
826   }
827 
828   /**
829    * Construct the URL to the OpenSearch service.
830    *
831    * @return the OpenSearch URL
832    */
833   private String getOpensearchURL() {
834     return this.externalServerScheme + "://" + this.externalServerHostname + ":" + this.externalServerPort;
835   }
836 
837   /**
838    * Returns all the known terms for a field (aka facets).
839    *
840    * @param field
841    *          the field name
842    * @param type
843    *          the document type
844    * @return the list of terms
845    */
846   public List<String> getTermsForField(String field, String type) {
847     final String facetName = "terms";
848     // Add size to aggregation to return all values (the default is the top ten terms with the most documents).
849     // We set it to 10,000, which should be enough (the maximum is 'search.max_buckets', which defaults to 65,536).
850     final AggregationBuilder aggBuilder = AggregationBuilders.terms(facetName).field(field)
851             .size(ELASTICSEARCH_TERM_AGGREGATION_SIZE);
852     final SearchSourceBuilder searchSource = new SearchSourceBuilder().aggregation(aggBuilder);
853     final SearchRequest searchRequest = new SearchRequest(this.getSubIndexIdentifier(type)).source(searchSource);
854     try {
855       final SearchResponse response = getClient().search(searchRequest, RequestOptions.DEFAULT);
856 
857       final List<String> terms = new ArrayList<>();
858       final Terms aggs = response.getAggregations().get(facetName);
859 
860       for (Terms.Bucket bucket : aggs.getBuckets()) {
861         terms.add(bucket.getKey().toString());
862       }
863 
864       return terms;
865     } catch (IOException e) {
866       return chuck(e);
867     }
868   }
869 
870 }