1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.opencastproject.elasticsearch.impl;
24
25 import static org.opencastproject.util.data.functions.Misc.chuck;
26
27 import org.opencastproject.elasticsearch.api.SearchIndex;
28 import org.opencastproject.elasticsearch.api.SearchIndexException;
29 import org.opencastproject.elasticsearch.api.SearchMetadata;
30 import org.opencastproject.elasticsearch.api.SearchQuery;
31 import org.opencastproject.elasticsearch.api.SearchResult;
32 import org.opencastproject.elasticsearch.api.SearchResultItem;
33 import org.opencastproject.util.requests.SortCriterion;
34
35 import org.apache.commons.io.IOUtils;
36 import org.apache.commons.lang3.StringUtils;
37 import org.apache.commons.lang3.math.NumberUtils;
38 import org.apache.http.ConnectionClosedException;
39 import org.apache.http.HttpHost;
40 import org.apache.http.auth.AuthScope;
41 import org.apache.http.auth.UsernamePasswordCredentials;
42 import org.apache.http.client.CredentialsProvider;
43 import org.apache.http.impl.client.BasicCredentialsProvider;
44 import org.elasticsearch.ElasticsearchException;
45 import org.elasticsearch.ElasticsearchStatusException;
46 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
47 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
48 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
49 import org.elasticsearch.action.bulk.BulkRequest;
50 import org.elasticsearch.action.bulk.BulkResponse;
51 import org.elasticsearch.action.delete.DeleteRequest;
52 import org.elasticsearch.action.delete.DeleteResponse;
53 import org.elasticsearch.action.get.GetRequest;
54 import org.elasticsearch.action.get.GetResponse;
55 import org.elasticsearch.action.index.IndexRequest;
56 import org.elasticsearch.action.index.IndexResponse;
57 import org.elasticsearch.action.search.SearchRequest;
58 import org.elasticsearch.action.search.SearchResponse;
59 import org.elasticsearch.action.search.SearchType;
60 import org.elasticsearch.action.support.WriteRequest;
61 import org.elasticsearch.action.support.master.AcknowledgedResponse;
62 import org.elasticsearch.client.RequestOptions;
63 import org.elasticsearch.client.RestClient;
64 import org.elasticsearch.client.RestClientBuilder;
65 import org.elasticsearch.client.RestHighLevelClient;
66 import org.elasticsearch.client.indices.CreateIndexRequest;
67 import org.elasticsearch.client.indices.CreateIndexResponse;
68 import org.elasticsearch.cluster.health.ClusterHealthStatus;
69 import org.elasticsearch.common.document.DocumentField;
70 import org.elasticsearch.common.xcontent.XContentType;
71 import org.elasticsearch.index.query.QueryBuilder;
72 import org.elasticsearch.rest.RestStatus;
73 import org.elasticsearch.script.Script;
74 import org.elasticsearch.search.SearchHit;
75 import org.elasticsearch.search.SearchHits;
76 import org.elasticsearch.search.aggregations.AggregationBuilder;
77 import org.elasticsearch.search.aggregations.AggregationBuilders;
78 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
79 import org.elasticsearch.search.builder.SearchSourceBuilder;
80 import org.elasticsearch.search.sort.ScriptSortBuilder;
81 import org.elasticsearch.search.sort.SortBuilders;
82 import org.elasticsearch.search.sort.SortOrder;
83 import org.osgi.framework.BundleContext;
84 import org.osgi.service.component.ComponentException;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 import java.io.IOException;
89 import java.io.InputStream;
90 import java.net.ConnectException;
91 import java.nio.charset.StandardCharsets;
92 import java.util.ArrayList;
93 import java.util.Arrays;
94 import java.util.Collections;
95 import java.util.List;
96 import java.util.Map;
97 import java.util.Map.Entry;
98 import java.util.function.Function;
99
100
101
102
103 public abstract class AbstractElasticsearchIndex implements SearchIndex {
104
105
106 private static final Logger logger = LoggerFactory.getLogger(AbstractElasticsearchIndex.class);
107
108
109 private static final int ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW = Integer.MAX_VALUE;
110
111
112 private static final int ELASTICSEARCH_TERM_AGGREGATION_SIZE = 10000;
113
114
115 public static final String ELASTICSEARCH_SERVER_HOSTNAME_KEY = "org.opencastproject.elasticsearch.server.hostname";
116
117
118 public static final String ELASTICSEARCH_SERVER_SCHEME_KEY = "org.opencastproject.elasticsearch.server.scheme";
119
120
121 public static final String ELASTICSEARCH_SERVER_PORT_KEY = "org.opencastproject.elasticsearch.server.port";
122
123
124 public static final String ELASTICSEARCH_USERNAME_KEY = "org.opencastproject.elasticsearch.username";
125
126
127 public static final String ELASTICSEARCH_PASSWORD_KEY = "org.opencastproject.elasticsearch.password";
128
129
130 private static final int ELASTICSEARCH_SERVER_PORT_DEFAULT = 9200;
131
132
133 private static final String ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT = "localhost";
134
135
136 private static final String ELASTICSEARCH_SERVER_SCHEME_DEFAULT = "http";
137
138
139 private static final String ROOT_ID = "root";
140
141
142 private String indexIdentifier = null;
143 private static final String INDEX_IDENTIFIER_PROPERTY = "index.identifier";
144 private static final String DEFAULT_INDEX_IDENTIFIER = "opencast";
145
146
147 private String indexName = null;
148 private static final String INDEX_NAME_PROPERTY = "index.name";
149 private static final String DEFAULT_INDEX_NAME = "Elasticsearch";
150
151
152 private RestHighLevelClient client = null;
153
154
155 private int indexVersion = -1;
156
157
158 protected String indexSettingsPath;
159
160
161 private String externalServerHostname = ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT;
162
163
164 private String externalServerScheme = ELASTICSEARCH_SERVER_SCHEME_DEFAULT;
165
166
167 private int externalServerPort = ELASTICSEARCH_SERVER_PORT_DEFAULT;
168
169
170 private String username;
171
172
173 private String password;
174
175
176 private int retryDelayOnStartup;
177 private static final String RETRY_DELAY_ON_STARTUP = "retry.delay.on.startup";
178 private static final int DEFAULT_RETRY_DELAY_ON_STARTUP = 10000;
179
180
181
182
183
184
185
186 public abstract String[] getDocumentTypes();
187
188
189
190
191
192
193
194
195
196
197
198 public void activate(Map<String, Object> properties, BundleContext bundleContext) throws ComponentException {
199 indexIdentifier = StringUtils.defaultIfBlank((String) properties
200 .get(INDEX_IDENTIFIER_PROPERTY), DEFAULT_INDEX_IDENTIFIER);
201 logger.info("Index identifier set to {}.", indexIdentifier);
202
203 indexSettingsPath = StringUtils.trimToNull(bundleContext.getProperty("karaf.etc"));
204 if (indexSettingsPath == null) {
205 throw new ComponentException("Could not determine Karaf configuration path");
206 }
207 externalServerHostname = StringUtils
208 .defaultIfBlank(bundleContext.getProperty(ELASTICSEARCH_SERVER_HOSTNAME_KEY),
209 ELASTICSEARCH_SERVER_HOSTNAME_DEFAULT);
210 externalServerScheme = StringUtils
211 .defaultIfBlank(bundleContext.getProperty(ELASTICSEARCH_SERVER_SCHEME_KEY),
212 ELASTICSEARCH_SERVER_SCHEME_DEFAULT);
213 externalServerPort = Integer.parseInt(StringUtils
214 .defaultIfBlank(bundleContext.getProperty(ELASTICSEARCH_SERVER_PORT_KEY),
215 ELASTICSEARCH_SERVER_PORT_DEFAULT + ""));
216 username = StringUtils.trimToNull(bundleContext.getProperty(ELASTICSEARCH_USERNAME_KEY));
217 password = StringUtils.trimToNull(bundleContext.getProperty(ELASTICSEARCH_PASSWORD_KEY));
218 }
219
220
221
222
223
224
225
226 public void modified(Map<String, Object> properties) {
227 indexName = StringUtils.defaultIfBlank((String) properties.get(INDEX_NAME_PROPERTY),
228 DEFAULT_INDEX_NAME);
229 logger.info("Index name set to {}.", indexName);
230
231 retryDelayOnStartup = NumberUtils.toInt((String) properties.get(RETRY_DELAY_ON_STARTUP),
232 DEFAULT_RETRY_DELAY_ON_STARTUP);
233 if (retryDelayOnStartup <= 0) {
234 throw new IllegalArgumentException(RETRY_DELAY_ON_STARTUP
235 + " was wrongly configured. Value has to be greater than 0.");
236 }
237 logger.info("Retry delay on startup set to {} ms.", retryDelayOnStartup);
238 }
239
240 @Override
241 public int getIndexVersion() {
242 return indexVersion;
243 }
244
245 @Override
246 public void clear() throws IOException {
247 try {
248 final DeleteIndexRequest request = new DeleteIndexRequest(
249 Arrays.stream(getDocumentTypes()).map(this::getSubIndexIdentifier).toArray(String[]::new));
250 final AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
251 if (!delete.isAcknowledged()) {
252 logger.error("Index '{}' could not be deleted", getIndexName());
253 }
254 createIndex();
255 } catch (ElasticsearchException exception) {
256 if (exception.status() == RestStatus.NOT_FOUND) {
257 logger.error("Cannot clear non-existing index '{}'", exception.getIndex().getName());
258 }
259 } catch (SearchIndexException e) {
260 logger.error("Unable to re-create the index after a clear", e);
261 }
262 }
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280 protected IndexResponse update(int maxRetryAttempts, int retryWaitingPeriod, ElasticsearchDocument document)
281 throws IOException, InterruptedException {
282
283 final IndexRequest indexRequest = new IndexRequest(getSubIndexIdentifier(document.getType())).id(document.getUID())
284 .source(document).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
285
286 IndexResponse indexResponse = null;
287 int retryAttempts = 0;
288 do {
289 try {
290 indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
291 } catch (ElasticsearchStatusException e) {
292 retryAttempts++;
293
294 if (retryAttempts <= maxRetryAttempts) {
295 logger.warn("Could not update documents in index {}, retrying in {} ms.", getIndexName(),
296 retryWaitingPeriod, e);
297 if (retryWaitingPeriod > 0) {
298 Thread.sleep(retryWaitingPeriod);
299 }
300 } else {
301 logger.error("Could not update documents in index {}, not retrying.", getIndexName(),
302 e);
303 throw e;
304 }
305 }
306 } while (indexResponse == null);
307
308 return indexResponse;
309 }
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327 protected BulkResponse bulkUpdate(int maxRetryAttempts, int retryWaitingPeriod,
328 List<ElasticsearchDocument> documents)
329 throws IOException, InterruptedException {
330 BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
331
332 for (ElasticsearchDocument document: documents) {
333 bulkRequest.add(new IndexRequest(getSubIndexIdentifier(document.getType())).id(document.getUID())
334 .source(document));
335 }
336
337 BulkResponse bulkResponse = null;
338 int retryAttempts = 0;
339 do {
340 try {
341 bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
342 } catch (ElasticsearchStatusException e) {
343 retryAttempts++;
344
345 if (retryAttempts <= maxRetryAttempts) {
346 logger.warn("Could not update documents in index {} because of {}, retrying in {} ms.", getIndexName(),
347 e.getMessage(), retryWaitingPeriod);
348 if (retryWaitingPeriod > 0) {
349 Thread.sleep(retryWaitingPeriod);
350 }
351 } else {
352 logger.error("Could not update documents in index {}, not retrying.", getIndexName(),
353 e);
354 throw e;
355 }
356 }
357 } while (bulkResponse == null);
358
359 return bulkResponse;
360 }
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377 protected DeleteResponse delete(String type, String id, int maxRetryAttempts, int retryWaitingPeriod)
378 throws IOException, InterruptedException {
379 final DeleteRequest deleteRequest = new DeleteRequest(getSubIndexIdentifier(type), id).setRefreshPolicy(
380 WriteRequest.RefreshPolicy.IMMEDIATE);
381 DeleteResponse deleteResponse = null;
382 int retryAttempts = 0;
383 do {
384 try {
385 deleteResponse = getClient().delete(deleteRequest, RequestOptions.DEFAULT);
386 } catch (ElasticsearchStatusException e) {
387 retryAttempts++;
388
389 if (retryAttempts <= maxRetryAttempts) {
390 logger.warn("Could not remove documents from index {} because of {}, retrying in {} ms.", getIndexName(),
391 e.getMessage(), retryWaitingPeriod);
392 if (retryWaitingPeriod > 0) {
393 Thread.sleep(retryWaitingPeriod);
394 }
395 } else {
396 logger.error("Could not remove documents from index {}, not retrying.", getIndexName(),
397 e);
398 throw e;
399 }
400 }
401 } while (deleteResponse == null);
402
403 return deleteResponse;
404 }
405
406
407
408
409
410
411
412
413
414
415
416
417
418 protected void init(int version)
419 throws IOException, IllegalArgumentException, SearchIndexException {
420 this.indexVersion = version;
421
422 if (client == null) {
423 final RestClientBuilder builder = RestClient
424 .builder(new HttpHost(externalServerHostname, externalServerPort, externalServerScheme));
425
426 if (username != null && password != null) {
427 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
428 credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
429 builder.setHttpClientConfigCallback(
430 httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
431 }
432
433 client = new RestHighLevelClient(builder);
434 }
435
436
437 waitUntilOpensearchIsAvailable();
438
439
440 createIndex();
441 }
442
443
444
445
446
447
448 private void waitUntilOpensearchIsAvailable() {
449 String openSearchUrl = getOpensearchURL();
450 logger.info("Testing connection to OpenSearch at {}", openSearchUrl);
451 while (!isOpensearchReachable()) {
452 logger.warn("Could not reach OpenSearch at {}. Trying again after {} ms...", openSearchUrl, retryDelayOnStartup);
453 try {
454 Thread.sleep(retryDelayOnStartup);
455 } catch (InterruptedException e) {
456 throw new RuntimeException("Could not reach OpenSearch at " + openSearchUrl, e);
457 }
458 }
459 logger.info("Connection to OpenSearch at {} tested successfully", openSearchUrl);
460 }
461
462
463
464
465
466
467
468
469
470
471 private boolean isOpensearchReachable() {
472 try {
473
474 ClusterHealthRequest request = new ClusterHealthRequest();
475 request.waitForYellowStatus();
476 ClusterHealthResponse resp = client.cluster().health(request, RequestOptions.DEFAULT);
477 if (resp.getStatus().equals(ClusterHealthStatus.GREEN)) {
478 logger.debug("Connected to OpenSearch, cluster health is {}", resp.getStatus());
479 return true;
480 } else if (resp.getStatus().equals(ClusterHealthStatus.YELLOW)) {
481 logger.warn("Connected to OpenSearch, cluster health is {}", resp.getStatus());
482 return true;
483 }
484 logger.debug("Connected to OpenSearch, but cluster health is {}", resp.getStatus());
485 return false;
486 } catch (ConnectException | ConnectionClosedException connectException) {
487
488
489 logger.debug("Unable to connect to OpenSearch", connectException);
490 return false;
491 } catch (IOException ioException) {
492
493
494 if (ioException.getCause() instanceof java.net.SocketException) {
495
496 logger.debug("Unable to connect to OpenSearch", ioException);
497 return false;
498 }
499
500 throw new RuntimeException("Couldn't connect to opensearch due to IOExceptionError", ioException);
501 } catch (ElasticsearchException elasticsearchException) {
502 if (elasticsearchException.status().getStatus() >= 500) {
503 logger.debug("OpenSearch health request ended with HTTP status code {}. Is OpenSearch service running?",
504 elasticsearchException.status().getStatus());
505 return false;
506 }
507
508
509
510 logger.error("Error while testing OpenSearch connection", elasticsearchException);
511 throw elasticsearchException;
512 } catch (Exception e) {
513
514 throw new RuntimeException("Unable to connect to OpenSearch, unexpected exception", e);
515 }
516 }
517
518
519
520
521
522
523
524 protected void close() throws IOException {
525 if (client != null) {
526 client.close();
527 }
528 }
529
530
531
532
533
534
535
536
537
538
539 private void createIndex() throws SearchIndexException, IOException {
540 if (StringUtils.isBlank(this.indexIdentifier)) {
541 throw new IllegalArgumentException("Search index identifier must be set");
542 }
543
544 for (String type : getDocumentTypes()) {
545 createSubIndex(type, getSubIndexIdentifier(type));
546 }
547 }
548
549 private void createSubIndex(String type, String idxName) throws SearchIndexException, IOException {
550 try {
551 logger.debug("Trying to create index for '{}'", idxName);
552 final CreateIndexRequest request = new CreateIndexRequest(idxName)
553 .settings(loadResources("indexSettings.json"), XContentType.JSON)
554 .mapping(loadResources(type + "-mapping.json"), XContentType.JSON);
555
556 final CreateIndexResponse siteIdxResponse = client.indices().create(request, RequestOptions.DEFAULT);
557 if (!siteIdxResponse.isAcknowledged()) {
558 throw new SearchIndexException("Unable to create index for '" + idxName + "'");
559 }
560 } catch (ElasticsearchStatusException e) {
561 if (e.getDetailedMessage().contains("already_exists_exception")) {
562 logger.info("Detected existing index '{}'", idxName);
563 } else {
564 throw e;
565 }
566 }
567
568
569
570 boolean versionIndexExists = false;
571 final GetRequest getRequest = new GetRequest(idxName, ROOT_ID);
572 try {
573 final GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
574 if (getResponse.isExists() && getResponse.getField(IndexSchema.VERSION) != null) {
575 final int actualIndexVersion = Integer.parseInt(getResponse.getField(IndexSchema.VERSION).getValue()
576 .toString());
577 if (indexVersion != actualIndexVersion) {
578 throw new SearchIndexException(
579 "Search index is at version " + actualIndexVersion + ", but codebase expects " + indexVersion);
580 }
581 versionIndexExists = true;
582 logger.debug("Search index version is {}", indexVersion);
583 }
584 } catch (ElasticsearchException e) {
585 logger.debug("Version index has not been created");
586 }
587
588
589 if (!versionIndexExists) {
590 logger.debug("Creating version index for site '{}'", idxName);
591 final IndexRequest indexRequest = new IndexRequest(idxName).id(ROOT_ID)
592 .source(Collections.singletonMap(IndexSchema.VERSION, indexVersion + ""));
593 logger.debug("Index version of site '{}' is {}", idxName, indexVersion);
594 client.index(indexRequest, RequestOptions.DEFAULT);
595 }
596 }
597
598
599
600
601
602
603
604
605 private String loadResources(final String filename) throws IOException {
606 final String resourcePath = "/elasticsearch/" + filename;
607
608
609 for (Class cls : Arrays.asList(this.getClass(), AbstractElasticsearchIndex.class)) {
610 try (InputStream is = cls.getResourceAsStream(resourcePath)) {
611 if (is != null) {
612 final String settings = IOUtils.toString(is, StandardCharsets.UTF_8);
613 logger.debug("Reading elasticsearch configuration resources from {}:\n{}", cls, settings);
614 return settings;
615 }
616 }
617 }
618 return null;
619 }
620
621
622
623
624
625
626
627
628
629
630 protected SearchRequest getSearchRequest(SearchQuery query, QueryBuilder queryBuilder) {
631
632 final SearchSourceBuilder searchSource = new SearchSourceBuilder()
633 .query(queryBuilder)
634 .trackTotalHits(true);
635
636
637 logger.debug("Searching for {}", searchSource);
638
639
640 if (query.getFields().length > 0) {
641 searchSource.storedFields(Arrays.asList(query.getFields()));
642 } else {
643 searchSource.storedFields(Collections.singletonList("*"));
644 }
645
646
647 if (query.getOffset() >= 0) {
648 searchSource.from(query.getOffset());
649 }
650
651 int limit = ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW;
652 if (query.getLimit() > 0) {
653 if (query.getOffset() > 0
654 && (long) query.getOffset() + (long) query.getLimit() > ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW) {
655 limit = ELASTICSEARCH_INDEX_MAX_RESULT_WINDOW - query.getOffset();
656 } else {
657 limit = query.getLimit();
658 }
659 }
660 searchSource.size(limit);
661
662
663 final Map<String, SortCriterion.Order> sortCriteria = query.getSortOrders();
664 for (Entry<String, SortCriterion.Order> sortCriterion : sortCriteria.entrySet()) {
665 ScriptSortBuilder sortBuilder = null;
666 logger.debug("Event sort criteria: {}", sortCriterion.getKey());
667 if ("publication".equals(sortCriterion.getKey())) {
668 sortBuilder = SortBuilders.scriptSort(
669 new Script("params._source.publication.length"),
670 ScriptSortBuilder.ScriptSortType.NUMBER);
671 }
672 switch (sortCriterion.getValue()) {
673 case Ascending:
674 if (sortBuilder != null) {
675 sortBuilder.order(SortOrder.ASC);
676 searchSource.sort(sortBuilder);
677 } else {
678 searchSource.sort(sortCriterion.getKey(), SortOrder.ASC);
679 }
680 break;
681 case Descending:
682 if (sortBuilder != null) {
683 sortBuilder.order(SortOrder.DESC);
684 searchSource.sort(sortBuilder);
685 } else {
686 searchSource.sort(sortCriterion.getKey(), SortOrder.DESC);
687 }
688 break;
689 default:
690 break;
691 }
692 }
693 return new SearchRequest(Arrays.stream(query.getTypes()).map(this::getSubIndexIdentifier).toArray(String[]::new))
694 .searchType(SearchType.QUERY_THEN_FETCH).preference("_local").source(searchSource);
695 }
696
697
698
699
700
701
702 public String getIndexName() {
703 return indexName;
704 }
705
706
707
708
709
710
711 protected long getTotalHits(SearchHits hits) {
712 return hits.getTotalHits().value;
713 }
714
715
716
717
718
719
720
721
722 protected String getSubIndexIdentifier(String type) {
723 return this.indexIdentifier + "_" + type;
724 }
725
726 public RestHighLevelClient getClient() {
727 return client;
728 }
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750 protected <T> SearchResult<T> executeQuery(SearchQuery query, SearchRequest request,
751 Function<SearchMetadataCollection, T> toSearchResult, int maxRetryAttempts, int retryWaitingPeriod)
752 throws IOException, InterruptedException {
753
754 SearchResponse searchResponse = null;
755 int retryAttempts = 0;
756 do {
757 try {
758 searchResponse = getClient().search(request, RequestOptions.DEFAULT);
759 } catch (ElasticsearchStatusException e) {
760 retryAttempts++;
761
762 if (retryAttempts <= maxRetryAttempts) {
763 logger.warn("Could not query documents from index {} because of {}, retrying in {} ms.", getIndexName(),
764 e.getMessage(), retryWaitingPeriod);
765 if (retryWaitingPeriod > 0) {
766 Thread.sleep(retryWaitingPeriod);
767 }
768 } else {
769 logger.error("Could not query documents from index {}, not retrying.", getIndexName(),
770 e);
771 throw e;
772 }
773 }
774 } while (searchResponse == null);
775
776
777 long hits = getTotalHits(searchResponse.getHits());
778 long size = searchResponse.getHits().getHits().length;
779 SearchResultImpl<T> result = new SearchResultImpl<>(query, hits, size);
780 result.setSearchTime(searchResponse.getTook().millis());
781
782
783 for (SearchHit doc : searchResponse.getHits()) {
784
785
786 SearchMetadataCollection metadata = new SearchMetadataCollection(doc.getType());
787 metadata.setIdentifier(doc.getId());
788
789 for (DocumentField field : doc.getFields().values()) {
790 String name = field.getName();
791 SearchMetadata<Object> m = new SearchMetadataImpl<>(name);
792
793
794
795 if (field.getValues().size() > 1) {
796 for (Object v : field.getValues()) {
797 m.addValue(v);
798 }
799 } else {
800 m.addValue(field.getValue());
801 }
802
803
804 metadata.add(m);
805 }
806
807
808 float score = doc.getScore();
809
810
811
812 try {
813 T document = toSearchResult.apply(metadata);
814 SearchResultItem<T> item = new SearchResultItemImpl<>(score, document);
815 result.addResultItem(item);
816 } catch (Throwable t) {
817 logger.warn("Error during search result serialization: '{}'. Skipping this search result.", t.getMessage());
818 size--;
819 }
820 }
821
822
823 result.setDocumentCount(size);
824
825 return result;
826 }
827
828
829
830
831
832
833 private String getOpensearchURL() {
834 return this.externalServerScheme + "://" + this.externalServerHostname + ":" + this.externalServerPort;
835 }
836
837
838
839
840
841
842
843
844
845
846 public List<String> getTermsForField(String field, String type) {
847 final String facetName = "terms";
848
849
850 final AggregationBuilder aggBuilder = AggregationBuilders.terms(facetName).field(field)
851 .size(ELASTICSEARCH_TERM_AGGREGATION_SIZE);
852 final SearchSourceBuilder searchSource = new SearchSourceBuilder().aggregation(aggBuilder);
853 final SearchRequest searchRequest = new SearchRequest(this.getSubIndexIdentifier(type)).source(searchSource);
854 try {
855 final SearchResponse response = getClient().search(searchRequest, RequestOptions.DEFAULT);
856
857 final List<String> terms = new ArrayList<>();
858 final Terms aggs = response.getAggregations().get(facetName);
859
860 for (Terms.Bucket bucket : aggs.getBuckets()) {
861 terms.add(bucket.getKey().toString());
862 }
863
864 return terms;
865 } catch (IOException e) {
866 return chuck(e);
867 }
868 }
869
870 }