/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
21  
22  package org.opencastproject.elasticsearch.index;
23  
24  import static org.opencastproject.util.data.functions.Misc.chuck;
25  
26  import org.opencastproject.elasticsearch.api.SearchIndexException;
27  import org.opencastproject.elasticsearch.api.SearchMetadata;
28  import org.opencastproject.elasticsearch.api.SearchResult;
29  import org.opencastproject.elasticsearch.impl.AbstractElasticsearchIndex;
30  import org.opencastproject.elasticsearch.impl.ElasticsearchDocument;
31  import org.opencastproject.elasticsearch.impl.SearchMetadataCollection;
32  import org.opencastproject.elasticsearch.index.objects.event.Event;
33  import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
34  import org.opencastproject.elasticsearch.index.objects.event.EventQueryBuilder;
35  import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
36  import org.opencastproject.elasticsearch.index.objects.series.Series;
37  import org.opencastproject.elasticsearch.index.objects.series.SeriesIndexUtils;
38  import org.opencastproject.elasticsearch.index.objects.series.SeriesQueryBuilder;
39  import org.opencastproject.elasticsearch.index.objects.series.SeriesSearchQuery;
40  import org.opencastproject.elasticsearch.index.objects.theme.IndexTheme;
41  import org.opencastproject.elasticsearch.index.objects.theme.ThemeQueryBuilder;
42  import org.opencastproject.elasticsearch.index.objects.theme.ThemeSearchQuery;
43  import org.opencastproject.list.api.ListProvidersService;
44  import org.opencastproject.security.api.User;
45  
46  import com.google.common.util.concurrent.Striped;
47  
48  import org.apache.commons.lang3.BooleanUtils;
49  import org.apache.commons.lang3.math.NumberUtils;
50  import org.elasticsearch.action.DocWriteResponse;
51  import org.elasticsearch.action.delete.DeleteResponse;
52  import org.elasticsearch.action.search.SearchRequest;
53  import org.osgi.framework.BundleContext;
54  import org.osgi.service.component.ComponentException;
55  import org.osgi.service.component.annotations.Activate;
56  import org.osgi.service.component.annotations.Component;
57  import org.osgi.service.component.annotations.Deactivate;
58  import org.osgi.service.component.annotations.Modified;
59  import org.osgi.service.component.annotations.Reference;
60  import org.osgi.service.component.annotations.ReferenceCardinality;
61  import org.osgi.service.component.annotations.ReferencePolicy;
62  import org.osgi.service.component.annotations.ReferencePolicyOption;
63  import org.slf4j.Logger;
64  import org.slf4j.LoggerFactory;
65  
66  import java.io.IOException;
67  import java.util.ArrayList;
68  import java.util.List;
69  import java.util.Map;
70  import java.util.Objects;
71  import java.util.Optional;
72  import java.util.concurrent.locks.Lock;
73  import java.util.function.Function;
74  
75  import javax.xml.bind.Unmarshaller;
76  
77  /**
78   * An index implementation based on ElasticSearch that serves the Admin UI API and the External API with data
79   * aggregated from multiple services.
80   */
81  @Component(
82          property = {
83                  "service.description=Elasticsearch Index"
84          },
85          service = { ElasticsearchIndex.class }
86  )
87  public class ElasticsearchIndex extends AbstractElasticsearchIndex {
88  
89    /** Retry configuration */
90    private int maxRetryAttemptsGet;
91    private static final String MAX_RETRY_ATTEMPTS_GET_PROPERTY = "max.retry.attempts.get";
92    private static final int DEFAULT_MAX_RETRY_ATTEMPTS_GET = 0;
93  
94    private int maxRetryAttemptsUpdate;
95    private static final String MAX_RETRY_ATTEMPTS_UPDATE_PROPERTY = "max.retry.attempts.update";
96    private static final int DEFAULT_MAX_RETRY_ATTEMPTS_UPDATE = 0;
97  
98    private int retryWaitingPeriodGet;
99    private static final String RETRY_WAITING_PERIOD_GET_PROPERTY = "retry.waiting.period.get";
100   private static final int DEFAULT_RETRY_WAITING_PERIOD_GET = 1000;
101 
102   private int retryWaitingPeriodUpdate;
103   private static final String RETRY_WAITING_PERIOD_UPDATE_PROPERTY = "retry.waiting.period.update";
104   private static final int DEFAULT_RETRY_WAITING_PERIOD_UPDATE = 1000;
105 
106   /** The required index version */
107   private static final int INDEX_VERSION = 101;
108 
109   /** The document types */
110   private static final String VERSION_DOCUMENT_TYPE = "version";
111 
112   private static final String[] DOCUMENT_TYPES = new String[] {
113       Event.DOCUMENT_TYPE,
114       Series.DOCUMENT_TYPE,
115       IndexTheme.DOCUMENT_TYPE,
116       VERSION_DOCUMENT_TYPE
117   };
118 
119   private static final Logger logger = LoggerFactory.getLogger(ElasticsearchIndex.class);
120 
121   private final Striped<Lock> locks = Striped.lazyWeakLock(1024);
122 
123   private ListProvidersService listProvidersService;
124 
125   private static final String CONFIG_EPISODE_ID_ROLE = "org.opencastproject.episode.id.role.access";
126 
127   private boolean episodeIdRole = false;
128 
129   @Reference(
130       cardinality = ReferenceCardinality.OPTIONAL,
131       policy = ReferencePolicy.DYNAMIC,
132       policyOption = ReferencePolicyOption.GREEDY
133   )
134   public void setListProvidersService(ListProvidersService listProvidersService) {
135     this.listProvidersService = listProvidersService;
136   }
137 
138   public void unsetListProvidersService(ListProvidersService listProvidersService) {
139     if (this.listProvidersService == listProvidersService) {
140       this.listProvidersService = null;
141     }
142   }
143 
144   /**
145    * OSGi callback to activate this component instance.
146    *
147    * @param bundleContext
148    *          The bundle context
149    * @param properties
150    *          The configuration
151    * @throws ComponentException
152    *          If the search index cannot be initialized
153    */
154   @Activate
155   public void activate(BundleContext bundleContext, Map<String, Object> properties) throws ComponentException {
156     super.activate(properties, bundleContext);
157     modified(properties);
158 
159     try {
160       init(INDEX_VERSION);
161     } catch (Throwable t) {
162       throw new ComponentException("Error initializing elastic search index", t);
163     }
164 
165     episodeIdRole = BooleanUtils.toBoolean(Objects.toString(
166         bundleContext.getProperty(CONFIG_EPISODE_ID_ROLE), "false"));
167     logger.debug("Usage of episode ID roles is set to {}", episodeIdRole);
168   }
169 
170   /**
171    * OSGi callback to deactivate this component.
172    *
173    * @throws IOException
174    *          If closing the index fails
175    */
176   @Deactivate
177   public void deactivate() throws IOException {
178     close();
179   }
180 
181   /**
182    * OSGi callback for configuration changes.
183    *
184    * @param properties
185    *          The configuration
186    */
187   @Modified
188   public void modified(Map<String, Object> properties) {
189     super.modified(properties);
190 
191     maxRetryAttemptsGet = NumberUtils.toInt((String) properties.get(MAX_RETRY_ATTEMPTS_GET_PROPERTY),
192             DEFAULT_MAX_RETRY_ATTEMPTS_GET);
193     retryWaitingPeriodGet = NumberUtils.toInt((String) properties.get(RETRY_WAITING_PERIOD_GET_PROPERTY),
194             DEFAULT_RETRY_WAITING_PERIOD_GET);
195     logger.info("Max retry attempts for get requests set to {}, timeout set to {} ms.", maxRetryAttemptsGet,
196             retryWaitingPeriodGet);
197 
198     maxRetryAttemptsUpdate = NumberUtils.toInt((String) properties.get(MAX_RETRY_ATTEMPTS_UPDATE_PROPERTY),
199             DEFAULT_MAX_RETRY_ATTEMPTS_UPDATE);
200     retryWaitingPeriodUpdate = NumberUtils.toInt((String) properties.get(RETRY_WAITING_PERIOD_UPDATE_PROPERTY),
201             DEFAULT_RETRY_WAITING_PERIOD_UPDATE);
202     logger.info("Max retry attempts for update requests set to {}, timeout set to {} ms.", maxRetryAttemptsUpdate,
203             retryWaitingPeriodUpdate);
204 
205     if (maxRetryAttemptsGet < 0 || maxRetryAttemptsUpdate < 0 || retryWaitingPeriodGet < 0
206             || retryWaitingPeriodUpdate < 0) {
207       logger.warn("You have configured negative values for max attempts or retry periods. Is this intended? This is "
208               + "equivalent to setting those values to 0.");
209     }
210   }
211 
212   /**
213    * @see AbstractElasticsearchIndex#getDocumentTypes()
214    */
215   @Override
216   public String[] getDocumentTypes() {
217     return DOCUMENT_TYPES;
218   }
219 
220   /*
221    * Get index objects
222    */
223 
224   /**
225    * Loads the event from the search index if it exists.
226    *
227    * @param mediaPackageId
228    *          The media package identifier
229    * @param organization
230    *          The organization
231    * @param user
232    *          The user
233    * @return the event (optional)
234    *
235    * @throws SearchIndexException
236    *          If querying the search index fails
237    * @throws IllegalStateException
238    *          If multiple events with the same identifier are found
239    */
240   public Optional<Event> getEvent(String mediaPackageId, String organization, User user) throws SearchIndexException {
241     return getEvent(mediaPackageId, organization, user, maxRetryAttemptsGet, retryWaitingPeriodGet);
242   }
243 
244   /**
245    * Loads the event from the search index if it exists.
246    *
247    * @param mediaPackageId
248    *          The media package identifier
249    * @param organization
250    *          The organization
251    * @param user
252    *          The user
253    * @param maxRetryAttempts
254    *          How often to retry query in case of ElasticsearchStatusException
255    * @param retryWaitingPeriod
256    *          How long to wait (in ms) between retries
257    * @return the event (optional)
258    *
259    * @throws SearchIndexException
260    *           If querying the search index fails
261    * @throws IllegalStateException
262    *           If multiple events with the same identifier are found
263    */
264   private Optional<Event> getEvent(String mediaPackageId, String organization, User user, int maxRetryAttempts,
265           int retryWaitingPeriod) throws SearchIndexException {
266     EventSearchQuery query = new EventSearchQuery(organization, user).withoutActions().withIdentifier(mediaPackageId);
267     SearchResult<Event> searchResult = getByQuery(query, maxRetryAttempts, retryWaitingPeriod);
268     if (searchResult.getDocumentCount() == 0) {
269       return Optional.empty();
270     } else if (searchResult.getDocumentCount() == 1) {
271       return Optional.of(searchResult.getItems()[0].getSource());
272     } else {
273       throw new IllegalStateException(
274               "Multiple events with identifier " + mediaPackageId + " found in search index");
275     }
276   }
277 
278   /**
279    * Loads the series from the search index if it exists.
280    *
281    * @param seriesId
282    *          The series identifier
283    * @param organization
284    *          The organization
285    * @param user
286    *          The user
287    * @return the series (optional)
288    *
289    * @throws SearchIndexException
290    *          If querying the search index fails
291    * @throws IllegalStateException
292    *          If multiple series with the same identifier are found
293    */
294   public Optional<Series> getSeries(String seriesId, String organization, User user)
295           throws SearchIndexException {
296     return getSeries(seriesId, organization, user, maxRetryAttemptsGet, retryWaitingPeriodGet);
297   }
298 
299   /**
300    * Loads the series from the search index if it exists.
301    *
302    * @param seriesId
303    *          The series identifier
304    * @param organization
305    *          The organization
306    * @param user
307    *          The user
308    * @param maxRetryAttempts
309    *          How often to retry query in case of ElasticsearchStatusException
310    * @param retryWaitingPeriod
311    *          How long to wait (in ms) between retries
312    * @return the series (optional)
313    *
314    * @throws SearchIndexException
315    *           If querying the search index fails
316    * @throws IllegalStateException
317    *           If multiple series with the same identifier are found
318    */
319   private Optional<Series> getSeries(String seriesId, String organization, User user, int maxRetryAttempts,
320           int retryWaitingPeriod) throws SearchIndexException {
321     SeriesSearchQuery query = new SeriesSearchQuery(organization, user).withoutActions().withIdentifier(seriesId);
322     SearchResult<Series> searchResult = getByQuery(query, maxRetryAttempts, retryWaitingPeriod);
323     if (searchResult.getDocumentCount() == 0) {
324       return Optional.empty();
325     } else if (searchResult.getDocumentCount() == 1) {
326       return Optional.of(searchResult.getItems()[0].getSource());
327     } else {
328       throw new IllegalStateException("Multiple series with identifier " + seriesId + " found in search index");
329     }
330   }
331 
332   /**
333    * Loads the theme from the search index if it exists.
334    *
335    * @param themeId
336    *          The theme identifier
337    * @param organization
338    *          The organization
339    * @param user
340    *          The user
341    * @return the theme wrapped in an optional
342    *
343    * @throws SearchIndexException
344    *          If querying the search index fails
345    * @throws IllegalStateException
346    *          If multiple themes with the same identifier are found
347    */
348   public Optional<IndexTheme> getTheme(long themeId, String organization, User user)
349           throws SearchIndexException {
350     return getTheme(themeId, organization, user, maxRetryAttemptsGet, retryWaitingPeriodGet);
351   }
352 
353   /**
354    * Loads the theme from the search index if it exists.
355    *
356    * @param themeId
357    *          The theme identifier
358    * @param organization
359    *          The organization
360    * @param user
361    *          The user
362    * @param maxRetryAttempts
363    *          How often to retry query in case of ElasticsearchStatusException
364    * @param retryWaitingPeriod
365    *          How long to wait (in ms) between retries
366    * @return the theme wrapped in an optional
367    *
368    * @throws SearchIndexException
369    *          If querying the search index fails
370    * @throws IllegalStateException
371    *          If multiple themes with the same identifier are found
372    */
373   private Optional<IndexTheme> getTheme(long themeId, String organization, User user, int maxRetryAttempts,
374           int retryWaitingPeriod)
375           throws SearchIndexException {
376     ThemeSearchQuery query = new ThemeSearchQuery(organization, user).withIdentifier(themeId);
377     SearchResult<IndexTheme> searchResult = getByQuery(query, maxRetryAttempts, retryWaitingPeriod);
378     if (searchResult.getDocumentCount() == 0) {
379       return Optional.empty();
380     } else if (searchResult.getDocumentCount() == 1) {
381       return Optional.of(searchResult.getItems()[0].getSource());
382     } else {
383       throw new IllegalStateException("Multiple themes with identifier " + themeId + " found in search index");
384     }
385   }
386 
387   /*
388    * Add or update index objects
389    */
390 
391   /**
392    * Adds or updates the event in the search index. Uses a locking mechanism to avoid issues like Lost Update.
393    *
394    * @param id
395    *          The id of the event to update
396    * @param updateFunction
397    *          The function that does the actual updating
398    * @param orgId
399    *          The organization the event belongs to
400    * @param user
401    *          The user
402    *
403    * @throws SearchIndexException
404    *          Thrown if unable to update the event.
405    */
406   public Optional<Event> addOrUpdateEvent(String id, Function<Optional<Event>, Optional<Event>> updateFunction,
407           String orgId, User user) throws SearchIndexException {
408     final Lock lock = this.locks.get(id);
409     lock.lock();
410     logger.debug("Locked event '{}'", id);
411 
412     try {
413       Optional<Event> eventOpt = getEvent(id, orgId, user, maxRetryAttemptsUpdate, retryWaitingPeriodUpdate);
414       Optional<Event> updatedEventOpt = updateFunction.apply(eventOpt);
415 
416       if (updatedEventOpt.isPresent()) {
417         update(updatedEventOpt.get());
418       }
419       return updatedEventOpt;
420     } finally {
421       lock.unlock();
422       logger.debug("Released locked event '{}'", id);
423     }
424   }
425 
426   /**
427    * Adds the recording event to the search index or updates it accordingly if it is there.
428    *
429    * @param event
430    *          The event to update
431    *
432    * @throws SearchIndexException
433    *          If the event cannot be added or updated
434    */
435   private void update(Event event) throws SearchIndexException {
436     logger.debug("Adding event {} to search index", event.getIdentifier());
437 
438     // Add the resource to the index
439     SearchMetadataCollection inputDocument = EventIndexUtils.toSearchMetadata(event, listProvidersService,
440         episodeIdRole);
441     List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
442     ElasticsearchDocument doc = new ElasticsearchDocument(inputDocument.getIdentifier(),
443             inputDocument.getDocumentType(), resourceMetadata);
444 
445     try {
446       update(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, doc);
447     } catch (Throwable t) {
448       throw new SearchIndexException("Cannot write event " + event + " to index", t);
449     }
450   }
451 
452   /**
453    * Adds the recording events to the search index or updates it accordingly if it is there.
454    *
455    * @param eventList
456    *          The events to update
457    *
458    * @throws SearchIndexException
459    *          If the events cannot be added or updated
460    */
461   public void bulkEventUpdate(List<Event> eventList) throws SearchIndexException {
462     List<ElasticsearchDocument> docs = new ArrayList<>();
463     for (Event event: eventList) {
464       logger.debug("Adding event {} to search index", event.getIdentifier());
465       // Add the resource to the index
466       SearchMetadataCollection inputDocument = EventIndexUtils.toSearchMetadata(event, listProvidersService,
467           episodeIdRole);
468       List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
469       docs.add(new ElasticsearchDocument(inputDocument.getIdentifier(),
470               inputDocument.getDocumentType(), resourceMetadata));
471     }
472     try {
473       bulkUpdate(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, docs);
474     } catch (Throwable t) {
475       throw new SearchIndexException("Cannot write events " + eventList + " to index", t);
476     }
477   }
478 
479   /**
480    * Adds or updates the series in the search index. Uses a locking mechanism to avoid issues like Lost Update.
481    *
482    * @param id
483    *          The id of the series to add
484    * @param updateFunction
485    *          The function that does the actual updating
486    * @param orgId
487    *          The organization the series belongs to
488    * @param user
489    *          The user
490    *
491    * @throws SearchIndexException
492    *          Thrown if unable to add or update the series.
493    */
494   public Optional<Series> addOrUpdateSeries(String id, Function<Optional<Series>, Optional<Series>> updateFunction,
495           String orgId, User user) throws SearchIndexException {
496     final Lock lock = this.locks.get(id);
497     lock.lock();
498     logger.debug("Locked series '{}'", id);
499 
500     try {
501       Optional<Series> seriesOpt = getSeries(id, orgId, user, maxRetryAttemptsUpdate, retryWaitingPeriodUpdate);
502       Optional<Series> updatedSeriesOpt = updateFunction.apply(seriesOpt);
503       if (updatedSeriesOpt.isPresent()) {
504         update(updatedSeriesOpt.get());
505       }
506       return updatedSeriesOpt;
507     } finally {
508       lock.unlock();
509       logger.debug("Released locked series '{}'", id);
510     }
511   }
512 
513   /**
514    * Add or update a series in the search index.
515    *
516    * @param series
517    *          The series to update
518    *
519    * @throws SearchIndexException
520    *          If the series cannot be added or updated
521    */
522   private void update(Series series) throws SearchIndexException {
523     logger.debug("Adding series {} to search index", series.getIdentifier());
524 
525     // Add the resource to the index
526     SearchMetadataCollection inputDocument = SeriesIndexUtils.toSearchMetadata(series);
527     List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
528     ElasticsearchDocument doc = new ElasticsearchDocument(inputDocument.getIdentifier(),
529             inputDocument.getDocumentType(), resourceMetadata);
530 
531     try {
532       update(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, doc);
533     } catch (Throwable t) {
534       throw new SearchIndexException("Cannot write series " + series + " to index", t);
535     }
536   }
537 
538   /**
539    * Add or update a range of series in the search index.
540    *
541    * @param seriesList
542    *          The series to update
543    *
544    * @throws SearchIndexException
545    *          If the series cannot be added or updated
546    */
547   public void bulkSeriesUpdate(List<Series> seriesList) throws SearchIndexException {
548     List<ElasticsearchDocument> docs = new ArrayList<>();
549     for (Series series: seriesList) {
550       logger.debug("Adding series {} to search index", series.getIdentifier());
551       // Add the resource to the index
552       SearchMetadataCollection inputDocument = SeriesIndexUtils.toSearchMetadata(series);
553       List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
554       docs.add(new ElasticsearchDocument(inputDocument.getIdentifier(),
555               inputDocument.getDocumentType(), resourceMetadata));
556     }
557     try {
558       bulkUpdate(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, docs);
559     } catch (Throwable t) {
560       throw new SearchIndexException("Cannot write series " + seriesList + " to index", t);
561     }
562   }
563 
564   /**
565    * Adds or updates the theme in the search index. Uses a locking mechanism to avoid issues like Lost Update.
566    *
567    * @param id
568    *          The id of the theme to update
569    * @param updateFunction
570    *          The function that does the actual updating
571    * @param orgId
572    *          The organization the theme belongs to
573    * @param user
574    *          The user
575    *
576    * @throws SearchIndexException
577    *          Thrown if unable to update the theme.
578    */
579   public Optional<IndexTheme> addOrUpdateTheme(long id, Function<Optional<IndexTheme>,
580           Optional<IndexTheme>> updateFunction, String orgId, User user) throws SearchIndexException {
581     final Lock lock = this.locks.get(id);
582     lock.lock();
583     logger.debug("Locked theme '{}'", id);
584 
585     try {
586       Optional<IndexTheme> themeOpt = getTheme(id, orgId, user, maxRetryAttemptsUpdate, retryWaitingPeriodUpdate);
587       Optional<IndexTheme> updatedThemeOpt = updateFunction.apply(themeOpt);
588       if (updatedThemeOpt.isPresent()) {
589         update(updatedThemeOpt.get());
590       }
591       return updatedThemeOpt;
592     } finally {
593       lock.unlock();
594       logger.debug("Released locked theme '{}'", id);
595     }
596   }
597 
598   /**
599    * Adds or updates the theme in the search index.
600    *
601    * @param theme
602    *          The theme to update
603    *
604    * @throws SearchIndexException
605    *          Thrown if unable to add or update the theme.
606    */
607   private void update(IndexTheme theme) throws SearchIndexException {
608     logger.debug("Adding theme {} to search index", theme.getIdentifier());
609 
610     // Add the resource to the index
611     SearchMetadataCollection inputDocument = theme.toSearchMetadata();
612     List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
613     ElasticsearchDocument doc = new ElasticsearchDocument(inputDocument.getIdentifier(),
614             inputDocument.getDocumentType(), resourceMetadata);
615 
616     try {
617       update(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, doc);
618     } catch (Throwable t) {
619       throw new SearchIndexException("Cannot write theme " + theme + " to index", t);
620     }
621   }
622 
623   /**
624    * Adds or updates the themes in the search index.
625    *
626    * @param themeList
627    *          The themes to update
628    *
629    * @throws SearchIndexException
630    *          Thrown if unable to add or update the themes.
631    */
632   public void bulkThemeUpdate(List<IndexTheme> themeList) throws SearchIndexException {
633     List<ElasticsearchDocument> docs = new ArrayList<>();
634     for (IndexTheme theme: themeList) {
635       logger.debug("Adding theme {} to search index", theme.getIdentifier());
636 
637       // Add the resource to the index
638       SearchMetadataCollection inputDocument = theme.toSearchMetadata();
639       List<SearchMetadata<?>> resourceMetadata = inputDocument.getMetadata();
640       docs.add(new ElasticsearchDocument(inputDocument.getIdentifier(),
641               inputDocument.getDocumentType(), resourceMetadata));
642     }
643     try {
644       bulkUpdate(maxRetryAttemptsUpdate, retryWaitingPeriodUpdate, docs);
645     } catch (Throwable t) {
646       throw new SearchIndexException("Cannot write themes " + themeList + " to index", t);
647     }
648   }
649 
650   /*
651    * Delete index objects
652    */
653 
654   /**
655    * Delete event from index.
656    *
657    * @param eventId
658    *         The event identifier
659    * @param orgId
660    *         The organization id
661    * @return
662    *         true if it was deleted, false if it couldn't be found
663    * @throws SearchIndexException
664    *         If there was an error during deletion
665    */
666   public boolean deleteEvent(String eventId, String orgId) throws SearchIndexException {
667     return delete(Event.DOCUMENT_TYPE, eventId, orgId);
668   }
669 
670   /**
671    * Delete series from index.
672    *
673    * @param seriesId
674    *         The series identifier
675    * @param orgId
676    *         The organization id
677    * @return
678    *         true if it was deleted, false if it couldn't be found
679    * @throws SearchIndexException
680    *         If there was an error during deletion
681    */
682   public boolean deleteSeries(String seriesId, String orgId) throws SearchIndexException {
683     return delete(Series.DOCUMENT_TYPE, seriesId, orgId);
684   }
685 
686   /**
687    * Delete theme from index.
688    *
689    * @param themeId
690    *         The theme identifier
691    * @param orgId
692    *         The organization id
693    * @return
694    *         true if it was deleted, false if it couldn't be found
695    * @throws SearchIndexException
696    *         If there was an error during deletion
697    */
698   public boolean deleteTheme(String themeId, String orgId) throws SearchIndexException {
699     return delete(IndexTheme.DOCUMENT_TYPE, themeId, orgId);
700   }
701 
702   /**
703    * Delete object from this index.
704    *
705    * @param type
706    *         The type of object we want to delete
707    * @param id
708    *         The identifier of this object
709    * @param orgId
710    *         The organization id
711    * @return
712    *         True if it was deleted, false if it couldn't be found
713    *
714    * @throws SearchIndexException
715    *         If deleting from the index fails
716    */
717   private boolean delete(String type, String id, String orgId) throws SearchIndexException {
718     final Lock lock = this.locks.get(id);
719     lock.lock();
720     logger.debug("Locked {} '{}'.", type, id);
721     try {
722       String idWithOrgId = id.concat(orgId);
723       logger.debug("Removing element with id '{}' from search index '{}'", idWithOrgId, getSubIndexIdentifier(type));
724 
725       DeleteResponse deleteResponse = delete(type, idWithOrgId, maxRetryAttemptsUpdate, retryWaitingPeriodUpdate);
726       if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
727         logger.trace("Document {} to delete was not found on index '{}'", idWithOrgId, getSubIndexIdentifier(type));
728         return false;
729       }
730     } catch (Throwable e) {
731       throw new SearchIndexException("Cannot remove " + type + " " + id + " from index", e);
732     } finally {
733       lock.unlock();
734       logger.debug("Released locked {} '{}'.", type, id);
735     }
736     return true;
737   }
738 
739   /*
740    * Get index objects by query
741    */
742 
743   /**
744    * @param query
745    *          The query to use to retrieve the events that match the query
746    * @return {@link SearchResult} collection of {@link Event} from a query.
747    *
748    * @throws SearchIndexException
749    *          Thrown if there is an error getting the results.
750    */
751   public SearchResult<Event> getByQuery(EventSearchQuery query) throws SearchIndexException {
752     return getByQuery(query, maxRetryAttemptsGet, retryWaitingPeriodGet);
753   }
754 
755   /**
756    * @param query
757    *          The query to use to retrieve the events that match the query
758    * @param maxRetryAttempts
759    *          How often to retry query in case of ElasticsearchStatusException
760    * @param retryWaitingPeriod
761    *          How long to wait (in ms) between retries
762    * @return {@link SearchResult} collection of {@link Event} from a query.
763    *
764    * @throws SearchIndexException
765    *          Thrown if there is an error getting the results.
766    */
767   private SearchResult<Event> getByQuery(EventSearchQuery query, int maxRetryAttempts, int retryWaitingPeriod)
768           throws SearchIndexException {
769     logger.debug("Searching index using event query '{}'", query);
770     // Create the request
771     final SearchRequest searchRequest = getSearchRequest(query, new EventQueryBuilder(query));
772 
773     try {
774       final Unmarshaller unmarshaller = Event.createUnmarshaller();
775       return executeQuery(query, searchRequest, metadata -> {
776         try {
777           return EventIndexUtils.toRecordingEvent(metadata, unmarshaller);
778         } catch (IOException e) {
779           return chuck(e);
780         }
781       }, maxRetryAttempts, retryWaitingPeriod);
782     } catch (Throwable t) {
783       throw new SearchIndexException("Error querying event index", t);
784     }
785   }
786 
787 
788   /**
789    * @param query
790    *          The query to use to retrieve the series that match the query
791    * @return {@link SearchResult} collection of {@link Series} from a query.
792    *
793    * @throws SearchIndexException
794    *          Thrown if there is an error getting the results.
795    */
796   public SearchResult<Series> getByQuery(SeriesSearchQuery query) throws SearchIndexException {
797     return getByQuery(query, maxRetryAttemptsGet, retryWaitingPeriodGet);
798   }
799 
800   /**
801    * @param query
802    *          The query to use to retrieve the series that match the query
803    * @param maxRetryAttempts
804    *          How often to retry query in case of ElasticsearchStatusException
805    * @param retryWaitingPeriod
806    *          How long to wait (in ms) between retries
807    *
808    * @return {@link SearchResult} collection of {@link Series} from a query.
809    *
810    * @throws SearchIndexException
811    *          Thrown if there is an error getting the results.
812    */
813   private SearchResult<Series> getByQuery(SeriesSearchQuery query, int maxRetryAttempts, int retryWaitingPeriod)
814           throws SearchIndexException {
815     logger.debug("Searching index using series query '{}'", query);
816     // Create the request
817     final SearchRequest searchRequest = getSearchRequest(query, new SeriesQueryBuilder(query));
818     try {
819       final Unmarshaller unmarshaller = Series.createUnmarshaller();
820       return executeQuery(query, searchRequest, metadata -> {
821         try {
822           return SeriesIndexUtils.toSeries(metadata, unmarshaller);
823         } catch (IOException e) {
824           return chuck(e);
825         }
826       }, maxRetryAttempts, retryWaitingPeriod);
827     } catch (Throwable t) {
828       throw new SearchIndexException("Error querying series index", t);
829     }
830   }
831 
832   /**
833    * @param query
834    *          The query to use to retrieve the themes that match the query
835    * @return {@link SearchResult} collection of {@link IndexTheme} from a query.
836    *
837    * @throws SearchIndexException
838    *          Thrown if there is an error getting the results.
839    */
840   public SearchResult<IndexTheme> getByQuery(ThemeSearchQuery query) throws SearchIndexException {
841     return getByQuery(query, maxRetryAttemptsGet, retryWaitingPeriodGet);
842   }
843 
844   /**
845    * @param query
846    *          The query to use to retrieve the themes that match the query
847    * @param maxRetryAttempts
848    *          How often to retry query in case of ElasticsearchStatusException
849    * @param retryWaitingPeriod
850    *          How long to wait (in ms) between retries
851    *
852    * @return {@link SearchResult} collection of {@link IndexTheme} from a query.
853    *
854    * @throws SearchIndexException
855    *          Thrown if there is an error getting the results.
856    */
857   private SearchResult<IndexTheme> getByQuery(ThemeSearchQuery query, int maxRetryAttempts, int retryWaitingPeriod)
858           throws SearchIndexException {
859     logger.debug("Searching index using theme query '{}'", query);
860     // Create the request
861     final SearchRequest searchRequest = getSearchRequest(query, new ThemeQueryBuilder(query));
862 
863     try {
864       return executeQuery(query, searchRequest, metadata -> {
865         try {
866           return IndexTheme.fromSearchMetadata(metadata);
867         } catch (IOException e) {
868           return chuck(e);
869         }
870       }, maxRetryAttempts, retryWaitingPeriod);
871     } catch (Throwable t) {
872       throw new SearchIndexException("Error querying theme index", t);
873     }
874   }
875 }