1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.elasticsearch.index;
23
24 import static org.opencastproject.util.data.functions.Misc.chuck;
25
26 import org.opencastproject.elasticsearch.api.SearchIndexException;
27 import org.opencastproject.elasticsearch.api.SearchMetadata;
28 import org.opencastproject.elasticsearch.api.SearchResult;
29 import org.opencastproject.elasticsearch.impl.AbstractElasticsearchIndex;
30 import org.opencastproject.elasticsearch.impl.ElasticsearchDocument;
31 import org.opencastproject.elasticsearch.impl.SearchMetadataCollection;
32 import org.opencastproject.elasticsearch.index.objects.event.Event;
33 import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
34 import org.opencastproject.elasticsearch.index.objects.event.EventQueryBuilder;
35 import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
36 import org.opencastproject.elasticsearch.index.objects.series.Series;
37 import org.opencastproject.elasticsearch.index.objects.series.SeriesIndexUtils;
38 import org.opencastproject.elasticsearch.index.objects.series.SeriesQueryBuilder;
39 import org.opencastproject.elasticsearch.index.objects.series.SeriesSearchQuery;
40 import org.opencastproject.elasticsearch.index.objects.theme.IndexTheme;
41 import org.opencastproject.elasticsearch.index.objects.theme.ThemeQueryBuilder;
42 import org.opencastproject.elasticsearch.index.objects.theme.ThemeSearchQuery;
43 import org.opencastproject.list.api.ListProvidersService;
44 import org.opencastproject.security.api.User;
45
46 import com.google.common.util.concurrent.Striped;
47
48 import org.apache.commons.lang3.BooleanUtils;
49 import org.apache.commons.lang3.math.NumberUtils;
50 import org.elasticsearch.action.DocWriteResponse;
51 import org.elasticsearch.action.delete.DeleteResponse;
52 import org.elasticsearch.action.search.SearchRequest;
53 import org.osgi.framework.BundleContext;
54 import org.osgi.service.component.ComponentException;
55 import org.osgi.service.component.annotations.Activate;
56 import org.osgi.service.component.annotations.Component;
57 import org.osgi.service.component.annotations.Deactivate;
58 import org.osgi.service.component.annotations.Modified;
59 import org.osgi.service.component.annotations.Reference;
60 import org.osgi.service.component.annotations.ReferenceCardinality;
61 import org.osgi.service.component.annotations.ReferencePolicy;
62 import org.osgi.service.component.annotations.ReferencePolicyOption;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 import java.io.IOException;
67 import java.util.ArrayList;
68 import java.util.List;
69 import java.util.Map;
70 import java.util.Objects;
71 import java.util.Optional;
72 import java.util.concurrent.locks.Lock;
73 import java.util.function.Function;
74
75 import javax.xml.bind.Unmarshaller;
76
77
78
79
80
81 @Component(
82 property = {
83 "service.description=Elasticsearch Index"
84 },
85 service = { ElasticsearchIndex.class }
86 )
87 public class ElasticsearchIndex extends AbstractElasticsearchIndex {
88
89
90 private int maxRetryAttemptsGet;
91 private static final String MAX_RETRY_ATTEMPTS_GET_PROPERTY = "max.retry.attempts.get";
92 private static final int DEFAULT_MAX_RETRY_ATTEMPTS_GET = 0;
93
94 private int maxRetryAttemptsUpdate;
95 private static final String MAX_RETRY_ATTEMPTS_UPDATE_PROPERTY = "max.retry.attempts.update";
96 private static final int DEFAULT_MAX_RETRY_ATTEMPTS_UPDATE = 0;
97
98 private int retryWaitingPeriodGet;
99 private static final String RETRY_WAITING_PERIOD_GET_PROPERTY = "retry.waiting.period.get";
100 private static final int DEFAULT_RETRY_WAITING_PERIOD_GET = 1000;
101
102 private int retryWaitingPeriodUpdate;
103 private static final String RETRY_WAITING_PERIOD_UPDATE_PROPERTY = "retry.waiting.period.update";
104 private static final int DEFAULT_RETRY_WAITING_PERIOD_UPDATE = 1000;
105
106
107 private static final int INDEX_VERSION = 101;
108
109
110 private static final String VERSION_DOCUMENT_TYPE = "version";
111
112 private static final String[] DOCUMENT_TYPES = new String[] {
113 Event.DOCUMENT_TYPE,
114 Series.DOCUMENT_TYPE,
115 IndexTheme.DOCUMENT_TYPE,
116 VERSION_DOCUMENT_TYPE
117 };
118
119 private static final Logger logger = LoggerFactory.getLogger(ElasticsearchIndex.class);
120
121 private final Striped<Lock> locks = Striped.lazyWeakLock(1024);
122
123 private ListProvidersService listProvidersService;
124
125 private static final String CONFIG_EPISODE_ID_ROLE = "org.opencastproject.episode.id.role.access";
126
127 private boolean episodeIdRole = false;
128
129 @Reference(
130 cardinality = ReferenceCardinality.OPTIONAL,
131 policy = ReferencePolicy.DYNAMIC,
132 policyOption = ReferencePolicyOption.GREEDY
133 )
134 public void setListProvidersService(ListProvidersService listProvidersService) {
135 this.listProvidersService = listProvidersService;
136 }
137
138 public void unsetListProvidersService(ListProvidersService listProvidersService) {
139 if (this.listProvidersService == listProvidersService) {
140 this.listProvidersService = null;
141 }
142 }
143
144
145
146
147
148
149
150
151
152
153
154 @Activate
155 public void activate(BundleContext bundleContext, Map<String, Object> properties) throws ComponentException {
156 super.activate(properties, bundleContext);
157 modified(properties);
158
159 try {
160 init(INDEX_VERSION);
161 } catch (Throwable t) {
162 throw new ComponentException("Error initializing elastic search index", t);
163 }
164
165 episodeIdRole = BooleanUtils.toBoolean(Objects.toString(
166 bundleContext.getProperty(CONFIG_EPISODE_ID_ROLE), "false"));
167 logger.debug("Usage of episode ID roles is set to {}", episodeIdRole);
168 }
169
170
171
172
173
174
175
176 @Deactivate
177 public void deactivate() throws IOException {
178 close();
179 }
180
181
182
183
184
185
186
187 @Modified
188 public void modified(Map<String, Object> properties) {
189 super.modified(properties);
190
191 maxRetryAttemptsGet = NumberUtils.toInt((String) properties.get(MAX_RETRY_ATTEMPTS_GET_PROPERTY),
192 DEFAULT_MAX_RETRY_ATTEMPTS_GET);
193 retryWaitingPeriodGet = NumberUtils.toInt((String) properties.get(RETRY_WAITING_PERIOD_GET_PROPERTY),
194 DEFAULT_RETRY_WAITING_PERIOD_GET);
195 logger.info("Max retry attempts for get requests set to {}, timeout set to {} ms.", maxRetryAttemptsGet,
196 retryWaitingPeriodGet);
197
198 maxRetryAttemptsUpdate = NumberUtils.toInt((String) properties.get(MAX_RETRY_ATTEMPTS_UPDATE_PROPERTY),
199 DEFAULT_MAX_RETRY_ATTEMPTS_UPDATE);
200 retryWaitingPeriodUpdate = NumberUtils.toInt((String) properties.get(RETRY_WAITING_PERIOD_UPDATE_PROPERTY),
201 DEFAULT_RETRY_WAITING_PERIOD_UPDATE);
202 logger.info("Max retry attempts for update requests set to {}, timeout set to {} ms.", maxRetryAttemptsUpdate,
203 retryWaitingPeriodUpdate);
204
205 if (maxRetryAttemptsGet < 0 || maxRetryAttemptsUpdate < 0 || retryWaitingPeriodGet < 0
206 || retryWaitingPeriodUpdate < 0) {
207 logger.warn("You have configured negative values for max attempts or retry periods. Is this intended? This is "
208 + "equivalent to setting those values to 0.");
209 }
210 }
211
212
213
214
215 @Override
216 public String[] getDocumentTypes() {
217 return DOCUMENT_TYPES;
218 }
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240 public Optional<Event> getEvent(String mediaPackageId, String organization, User user) throws SearchIndexException {
241 return getEvent(mediaPackageId, organization, user, maxRetryAttemptsGet, retryWaitingPeriodGet);
242 }
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264 private Optional<Event> getEvent(String mediaPackageId, String organization, User user, int maxRetryAttempts,
265 int retryWaitingPeriod) throws SearchIndexException {
266 EventSearchQuery query = new EventSearchQuery(organization, user).withoutActions().withIdentifier(mediaPackageId);
267 SearchResult<Event> searchResult = getByQuery(query, maxRetryAttempts, retryWaitingPeriod);
268 if (searchResult.getDocumentCount() == 0) {
269 return Optional.empty();
270 } else if (searchResult.getDocumentCount() == 1) {
271 return Optional.of(searchResult.getItems()[0].getSource());
272 } else {
273 throw new IllegalStateException(
274 "Multiple events with identifier " + mediaPackageId + " found in search index");
275 }
276 }
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294 public Optional<Series> getSeries(String seriesId, String organization, User user)
295 throws SearchIndexException {
296 return getSeries(seriesId, organization, user, maxRetryAttemptsGet, retryWaitingPeriodGet);
297 }
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319 private Optional<Series> getSeries(String seriesId, String organization, User user, int maxRetryAttempts,
320 int retryWaitingPeriod) throws SearchIndexException {
321 SeriesSearchQuery query = new SeriesSearchQuery(organization, user).withoutActions().withIdentifier(seriesId);
322 SearchResult<Series> searchResult = getByQuery(query, maxRetryAttempts, retryWaitingPeriod);
323 if (searchResult.getDocumentCount() == 0) {
324 return Optional.empty();
325 } else if (searchResult.getDocumentCount() == 1) {
326 return Optional.of(searchResult.getItems()[0].getSource());
327 } else {
328 throw new IllegalStateException("Multiple series with identifier " + seriesId + " found in search index");
329 }
330 }
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348 public Optional<IndexTheme> getTheme(long themeId, String organization, User user)
349 throws SearchIndexException {
350 return getTheme(themeId, organization, user, maxRetryAttemptsGet, retryWaitingPeriodGet);
351 }
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373 private Optional<IndexTheme> getTheme(long themeId, String organization, User user, int maxRetryAttempts,
374 int retryWaitingPeriod)
375 throws SearchIndexException {
376 ThemeSearchQuery query = new ThemeSearchQuery(organization, user).withIdentifier(themeId);
377 SearchResult<IndexTheme> searchResult = getByQuery(query, maxRetryAttempts, retryWaitingPeriod);
378 if (searchResult.getDocumentCount() == 0) {
379 return Optional.empty();
380 } else if (searchResult.getDocumentCount() == 1) {
381 return Optional.of(searchResult.getItems()[0].getSource());
382 } else {
383 throw new IllegalStateException("Multiple themes with identifier " + themeId + " found in search index");
384 }
385 }
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406 public Optional<Event> addOrUpdateEvent(String id, Function<Optional<Event>, Optional<Event>> updateFunction,
407 String orgId, User user) throws SearchIndexException {
408 final Lock lock = this.locks.get(id);
409 lock.lock();
410 logger.debug("Locked event '{}'", id);
411
412 try {
413 Optional<Event> eventOpt = getEvent(id, orgId, user, maxRetryAttemptsUpdate, retryWaitingPeriodUpdate);
414 Optional<Event> updatedEventOpt = updateFunction.apply(eventOpt);
415
416 if (updatedEventOpt.isPresent()) {
417 update(updatedEventOpt.get());
418 }
419 return updatedEventOpt;
420 } finally {
421 lock.unlock();
422 logger.debug("Released locked event '{}'", id);
423 }
424 }
425
426
427
428
429
430
431
432
433
434
435 private void update(Event event) throws SearchIndexException {
436 logger.debug("Adding event {} to search index", event.getIdentifier());
437
438
439 SearchMetadataCollection inputDocument = EventIndexUtils.toSearchMetadata(event, listProvidersService,
440 episodeIdRole);
441 List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
442 ElasticsearchDocument doc = new ElasticsearchDocument(inputDocument.getIdentifier(),
443 inputDocument.getDocumentType(), resourceMetadata);
444
445 try {
446 update(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, doc);
447 } catch (Throwable t) {
448 throw new SearchIndexException("Cannot write event " + event + " to index", t);
449 }
450 }
451
452
453
454
455
456
457
458
459
460
461 public void bulkEventUpdate(List<Event> eventList) throws SearchIndexException {
462 List<ElasticsearchDocument> docs = new ArrayList<>();
463 for (Event event: eventList) {
464 logger.debug("Adding event {} to search index", event.getIdentifier());
465
466 SearchMetadataCollection inputDocument = EventIndexUtils.toSearchMetadata(event, listProvidersService,
467 episodeIdRole);
468 List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
469 docs.add(new ElasticsearchDocument(inputDocument.getIdentifier(),
470 inputDocument.getDocumentType(), resourceMetadata));
471 }
472 try {
473 bulkUpdate(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, docs);
474 } catch (Throwable t) {
475 throw new SearchIndexException("Cannot write events " + eventList + " to index", t);
476 }
477 }
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494 public Optional<Series> addOrUpdateSeries(String id, Function<Optional<Series>, Optional<Series>> updateFunction,
495 String orgId, User user) throws SearchIndexException {
496 final Lock lock = this.locks.get(id);
497 lock.lock();
498 logger.debug("Locked series '{}'", id);
499
500 try {
501 Optional<Series> seriesOpt = getSeries(id, orgId, user, maxRetryAttemptsUpdate, retryWaitingPeriodUpdate);
502 Optional<Series> updatedSeriesOpt = updateFunction.apply(seriesOpt);
503 if (updatedSeriesOpt.isPresent()) {
504 update(updatedSeriesOpt.get());
505 }
506 return updatedSeriesOpt;
507 } finally {
508 lock.unlock();
509 logger.debug("Released locked series '{}'", id);
510 }
511 }
512
513
514
515
516
517
518
519
520
521
522 private void update(Series series) throws SearchIndexException {
523 logger.debug("Adding series {} to search index", series.getIdentifier());
524
525
526 SearchMetadataCollection inputDocument = SeriesIndexUtils.toSearchMetadata(series);
527 List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
528 ElasticsearchDocument doc = new ElasticsearchDocument(inputDocument.getIdentifier(),
529 inputDocument.getDocumentType(), resourceMetadata);
530
531 try {
532 update(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, doc);
533 } catch (Throwable t) {
534 throw new SearchIndexException("Cannot write series " + series + " to index", t);
535 }
536 }
537
538
539
540
541
542
543
544
545
546
547 public void bulkSeriesUpdate(List<Series> seriesList) throws SearchIndexException {
548 List<ElasticsearchDocument> docs = new ArrayList<>();
549 for (Series series: seriesList) {
550 logger.debug("Adding series {} to search index", series.getIdentifier());
551
552 SearchMetadataCollection inputDocument = SeriesIndexUtils.toSearchMetadata(series);
553 List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
554 docs.add(new ElasticsearchDocument(inputDocument.getIdentifier(),
555 inputDocument.getDocumentType(), resourceMetadata));
556 }
557 try {
558 bulkUpdate(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, docs);
559 } catch (Throwable t) {
560 throw new SearchIndexException("Cannot write series " + seriesList + " to index", t);
561 }
562 }
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579 public Optional<IndexTheme> addOrUpdateTheme(long id, Function<Optional<IndexTheme>,
580 Optional<IndexTheme>> updateFunction, String orgId, User user) throws SearchIndexException {
581 final Lock lock = this.locks.get(id);
582 lock.lock();
583 logger.debug("Locked theme '{}'", id);
584
585 try {
586 Optional<IndexTheme> themeOpt = getTheme(id, orgId, user, maxRetryAttemptsUpdate, retryWaitingPeriodUpdate);
587 Optional<IndexTheme> updatedThemeOpt = updateFunction.apply(themeOpt);
588 if (updatedThemeOpt.isPresent()) {
589 update(updatedThemeOpt.get());
590 }
591 return updatedThemeOpt;
592 } finally {
593 lock.unlock();
594 logger.debug("Released locked theme '{}'", id);
595 }
596 }
597
598
599
600
601
602
603
604
605
606
607 private void update(IndexTheme theme) throws SearchIndexException {
608 logger.debug("Adding theme {} to search index", theme.getIdentifier());
609
610
611 SearchMetadataCollection inputDocument = theme.toSearchMetadata();
612 List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
613 ElasticsearchDocument doc = new ElasticsearchDocument(inputDocument.getIdentifier(),
614 inputDocument.getDocumentType(), resourceMetadata);
615
616 try {
617 update(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, doc);
618 } catch (Throwable t) {
619 throw new SearchIndexException("Cannot write theme " + theme + " to index", t);
620 }
621 }
622
623
624
625
626
627
628
629
630
631
632 public void bulkThemeUpdate(List<IndexTheme> themeList) throws SearchIndexException {
633 List<ElasticsearchDocument> docs = new ArrayList<>();
634 for (IndexTheme theme: themeList) {
635 logger.debug("Adding theme {} to search index", theme.getIdentifier());
636
637
638 SearchMetadataCollection inputDocument = theme.toSearchMetadata();
639 List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
640 docs.add(new ElasticsearchDocument(inputDocument.getIdentifier(),
641 inputDocument.getDocumentType(), resourceMetadata));
642 }
643 try {
644 bulkUpdate(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, docs);
645 } catch (Throwable t) {
646 throw new SearchIndexException("Cannot write themes " + themeList + " to index", t);
647 }
648 }
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666 public boolean deleteEvent(String eventId, String orgId) throws SearchIndexException {
667 return delete(Event.DOCUMENT_TYPE, eventId, orgId);
668 }
669
670
671
672
673
674
675
676
677
678
679
680
681
682 public boolean deleteSeries(String seriesId, String orgId) throws SearchIndexException {
683 return delete(Series.DOCUMENT_TYPE, seriesId, orgId);
684 }
685
686
687
688
689
690
691
692
693
694
695
696
697
698 public boolean deleteTheme(String themeId, String orgId) throws SearchIndexException {
699 return delete(IndexTheme.DOCUMENT_TYPE, themeId, orgId);
700 }
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717 private boolean delete(String type, String id, String orgId) throws SearchIndexException {
718 final Lock lock = this.locks.get(id);
719 lock.lock();
720 logger.debug("Locked {} '{}'.", type, id);
721 try {
722 String idWithOrgId = id.concat(orgId);
723 logger.debug("Removing element with id '{}' from search index '{}'", idWithOrgId, getSubIndexIdentifier(type));
724
725 DeleteResponse deleteResponse = delete(type, idWithOrgId, maxRetryAttemptsUpdate, retryWaitingPeriodUpdate);
726 if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
727 logger.trace("Document {} to delete was not found on index '{}'", idWithOrgId, getSubIndexIdentifier(type));
728 return false;
729 }
730 } catch (Throwable e) {
731 throw new SearchIndexException("Cannot remove " + type + " " + id + " from index", e);
732 } finally {
733 lock.unlock();
734 logger.debug("Released locked {} '{}'.", type, id);
735 }
736 return true;
737 }
738
739
740
741
742
743
744
745
746
747
748
749
750
751 public SearchResult<Event> getByQuery(EventSearchQuery query) throws SearchIndexException {
752 return getByQuery(query, maxRetryAttemptsGet, retryWaitingPeriodGet);
753 }
754
755
756
757
758
759
760
761
762
763
764
765
766
767 private SearchResult<Event> getByQuery(EventSearchQuery query, int maxRetryAttempts, int retryWaitingPeriod)
768 throws SearchIndexException {
769 logger.debug("Searching index using event query '{}'", query);
770
771 final SearchRequest searchRequest = getSearchRequest(query, new EventQueryBuilder(query));
772
773 try {
774 final Unmarshaller unmarshaller = Event.createUnmarshaller();
775 return executeQuery(query, searchRequest, metadata -> {
776 try {
777 return EventIndexUtils.toRecordingEvent(metadata, unmarshaller);
778 } catch (IOException e) {
779 return chuck(e);
780 }
781 }, maxRetryAttempts, retryWaitingPeriod);
782 } catch (Throwable t) {
783 throw new SearchIndexException("Error querying event index", t);
784 }
785 }
786
787
788
789
790
791
792
793
794
795
796 public SearchResult<Series> getByQuery(SeriesSearchQuery query) throws SearchIndexException {
797 return getByQuery(query, maxRetryAttemptsGet, retryWaitingPeriodGet);
798 }
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813 private SearchResult<Series> getByQuery(SeriesSearchQuery query, int maxRetryAttempts, int retryWaitingPeriod)
814 throws SearchIndexException {
815 logger.debug("Searching index using series query '{}'", query);
816
817 final SearchRequest searchRequest = getSearchRequest(query, new SeriesQueryBuilder(query));
818 try {
819 final Unmarshaller unmarshaller = Series.createUnmarshaller();
820 return executeQuery(query, searchRequest, metadata -> {
821 try {
822 return SeriesIndexUtils.toSeries(metadata, unmarshaller);
823 } catch (IOException e) {
824 return chuck(e);
825 }
826 }, maxRetryAttempts, retryWaitingPeriod);
827 } catch (Throwable t) {
828 throw new SearchIndexException("Error querying series index", t);
829 }
830 }
831
832
833
834
835
836
837
838
839
840 public SearchResult<IndexTheme> getByQuery(ThemeSearchQuery query) throws SearchIndexException {
841 return getByQuery(query, maxRetryAttemptsGet, retryWaitingPeriodGet);
842 }
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857 private SearchResult<IndexTheme> getByQuery(ThemeSearchQuery query, int maxRetryAttempts, int retryWaitingPeriod)
858 throws SearchIndexException {
859 logger.debug("Searching index using theme query '{}'", query);
860
861 final SearchRequest searchRequest = getSearchRequest(query, new ThemeQueryBuilder(query));
862
863 try {
864 return executeQuery(query, searchRequest, metadata -> {
865 try {
866 return IndexTheme.fromSearchMetadata(metadata);
867 } catch (IOException e) {
868 return chuck(e);
869 }
870 }, maxRetryAttempts, retryWaitingPeriod);
871 } catch (Throwable t) {
872 throw new SearchIndexException("Error querying theme index", t);
873 }
874 }
875 }