View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.series.impl;
23  
24  import static org.opencastproject.util.EqualsUtil.bothNotNull;
25  import static org.opencastproject.util.EqualsUtil.eqListSorted;
26  import static org.opencastproject.util.EqualsUtil.eqListUnsorted;
27  import static org.opencastproject.util.RequireUtil.notNull;
28  import static org.opencastproject.util.data.Option.some;
29  
30  import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
31  import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
32  import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
33  import org.opencastproject.elasticsearch.api.SearchIndexException;
34  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
35  import org.opencastproject.elasticsearch.index.objects.series.Series;
36  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
37  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
38  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
39  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
40  import org.opencastproject.mediapackage.EName;
41  import org.opencastproject.message.broker.api.series.SeriesItem;
42  import org.opencastproject.message.broker.api.update.SeriesUpdateHandler;
43  import org.opencastproject.metadata.dublincore.DublinCore;
44  import org.opencastproject.metadata.dublincore.DublinCoreByteFormat;
45  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
46  import org.opencastproject.metadata.dublincore.DublinCoreValue;
47  import org.opencastproject.metadata.dublincore.DublinCoreXmlFormat;
48  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
49  import org.opencastproject.metadata.dublincore.Precision;
50  import org.opencastproject.security.api.AccessControlList;
51  import org.opencastproject.security.api.AccessControlParser;
52  import org.opencastproject.security.api.Organization;
53  import org.opencastproject.security.api.OrganizationDirectoryService;
54  import org.opencastproject.security.api.SecurityService;
55  import org.opencastproject.security.api.UnauthorizedException;
56  import org.opencastproject.security.api.User;
57  import org.opencastproject.security.util.SecurityUtil;
58  import org.opencastproject.series.api.SeriesException;
59  import org.opencastproject.series.api.SeriesService;
60  import org.opencastproject.series.impl.persistence.SeriesEntity;
61  import org.opencastproject.util.NotFoundException;
62  import org.opencastproject.util.data.Option;
63  
64  import com.entwinemedia.fn.data.Opt;
65  
66  import org.apache.commons.lang3.StringUtils;
67  import org.json.simple.parser.ParseException;
68  import org.osgi.service.component.ComponentContext;
69  import org.osgi.service.component.annotations.Activate;
70  import org.osgi.service.component.annotations.Component;
71  import org.osgi.service.component.annotations.Reference;
72  import org.osgi.service.component.annotations.ReferenceCardinality;
73  import org.osgi.service.component.annotations.ReferencePolicy;
74  import org.slf4j.Logger;
75  import org.slf4j.LoggerFactory;
76  import org.xml.sax.SAXException;
77  
78  import java.io.IOException;
79  import java.nio.charset.StandardCharsets;
80  import java.util.ArrayList;
81  import java.util.Arrays;
82  import java.util.Date;
83  import java.util.HashMap;
84  import java.util.List;
85  import java.util.Map;
86  import java.util.Optional;
87  import java.util.Set;
88  import java.util.UUID;
89  import java.util.function.Function;
90  
91  import javax.xml.parsers.ParserConfigurationException;
92  
93  /**
94   * Implements {@link SeriesService}. Uses {@link SeriesServiceDatabase} for permanent storage and
95   * {@link ElasticsearchIndex} for searching.
96   */
97  @Component(
98      property = {
99          "service.description=Series Service"
100     },
101     immediate = true,
102     service = { SeriesService.class, IndexProducer.class }
103 )
104 public class SeriesServiceImpl extends AbstractIndexProducer implements SeriesService {
105 
106   /** Logging utility */
107   private static final Logger logger = LoggerFactory.getLogger(SeriesServiceImpl.class);
108 
109   private static final String THEME_PROPERTY_NAME = "theme";
110 
111   /** Persistent storage */
112   protected SeriesServiceDatabase persistence;
113 
114   /** The security service */
115   protected SecurityService securityService;
116 
117   /** The organization directory */
118   protected OrganizationDirectoryService orgDirectory;
119 
120   /** The system user name */
121   private String systemUserName;
122 
123   /** The Elasticsearch index */
124   private ElasticsearchIndex index;
125 
126   private AclServiceFactory aclServiceFactory;
127 
128   private ArrayList<SeriesUpdateHandler> updateHandlers = new ArrayList<>();
129 
130   /** OSGi callback for setting persistance. */
131   @Reference
132   public void setPersistence(SeriesServiceDatabase persistence) {
133     this.persistence = persistence;
134   }
135 
136   /** OSGi callback for setting the security service. */
137   @Reference
138   public void setSecurityService(SecurityService securityService) {
139     this.securityService = securityService;
140   }
141 
142   /** OSGi callback for setting the organization directory service. */
143   @Reference
144   public void setOrgDirectory(OrganizationDirectoryService orgDirectory) {
145     this.orgDirectory = orgDirectory;
146   }
147 
148   /** OSGi callbacks for settings and removing handlers. */
149   @Reference(
150       policy = ReferencePolicy.DYNAMIC,
151       cardinality = ReferenceCardinality.MULTIPLE,
152       unbind = "removeMessageHandler"
153   )
154   public void addMessageHandler(SeriesUpdateHandler handler) {
155     this.updateHandlers.add(handler);
156   }
157 
158   public void removeMessageHandler(SeriesUpdateHandler handler) {
159     this.updateHandlers.remove(handler);
160   }
161 
162   /** OSGi callbacks for setting the Elasticsearch index. */
163   @Reference
164   public void setElasticsearchIndex(ElasticsearchIndex index) {
165     this.index = index;
166   }
167 
168   @Reference
169   public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
170     this.aclServiceFactory = aclServiceFactory;
171   }
172 
173   /**
174    * Activates Series Service. Checks whether we are using synchronous or asynchronous indexing. If
175    * asynchronous is used, Executor service is set. If index is empty, persistent storage is queried
176    * if it contains any series. If that is the case, series are retrieved and indexed.
177    */
178   @Activate
179   public void activate(ComponentContext cc) throws Exception {
180     logger.info("Activating Series Service");
181     systemUserName = cc.getBundleContext().getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
182   }
183 
184   @Override
185   public DublinCoreCatalog updateSeries(DublinCoreCatalog dc) throws SeriesException, UnauthorizedException {
186     try {
187       for (DublinCoreCatalog dublinCore : isNew(notNull(dc, "dc"))) {
188         final String id = dublinCore.getFirst(DublinCore.PROPERTY_IDENTIFIER);
189 
190         if (!dublinCore.hasValue(DublinCore.PROPERTY_CREATED)) {
191           DublinCoreValue date = EncodingSchemeUtils.encodeDate(new Date(), Precision.Minute);
192           dublinCore.set(DublinCore.PROPERTY_CREATED, date);
193           logger.debug("Setting series creation date to '{}'", date.getValue());
194         }
195 
196         if (dublinCore.hasValue(DublinCore.PROPERTY_TITLE)) {
197           if (dublinCore.getFirst(DublinCore.PROPERTY_TITLE).length() > 255) {
198             dublinCore.set(DublinCore.PROPERTY_TITLE, dublinCore.getFirst(DublinCore.PROPERTY_TITLE).substring(0, 255));
199             logger.warn("Title was longer than 255 characters. Cutting excess off.");
200           }
201         }
202 
203         logger.debug("Updating series {}", id);
204         // update API index
205         updateSeriesMetadataInIndex(id, dublinCore);
206 
207         // Make sure store to persistence comes after index, return value can be null
208         DublinCoreCatalog updated = persistence.storeSeries(dublinCore);
209 
210         // still sent for other asynchronous updates
211         triggerEventHandlers(SeriesItem.updateCatalog(dublinCore));
212         return (updated == null) ? null : dublinCore;
213       }
214       return dc;
215     } catch (Exception e) {
216       throw new SeriesException(e);
217     }
218   }
219 
220   /** Check if <code>dc</code> is new and, if so, return an updated version ready to store. */
221   private Option<DublinCoreCatalog> isNew(DublinCoreCatalog dc) throws SeriesServiceDatabaseException {
222     final String id = dc.getFirst(DublinCore.PROPERTY_IDENTIFIER);
223     if (id != null) {
224       try {
225         return equals(persistence.getSeries(id), dc) ? Option.none() : some(dc);
226       } catch (NotFoundException e) {
227         return some(dc);
228       }
229     } else {
230       logger.info("Series Dublin Core does not contain identifier, generating one");
231       dc.set(DublinCore.PROPERTY_IDENTIFIER, UUID.randomUUID().toString());
232       return some(dc);
233     }
234   }
235 
236   @Override
237   public boolean updateAccessControl(final String seriesId, final AccessControlList accessControl)
238           throws NotFoundException, SeriesException {
239     return updateAccessControl(seriesId, accessControl, false);
240   }
241 
242   // todo method signature does not fit the three different possible return values
243   @Override
244   public boolean updateAccessControl(final String seriesId, final AccessControlList accessControl,
245           boolean overrideEpisodeAcl)
246           throws NotFoundException, SeriesException {
247     if (StringUtils.isEmpty(seriesId)) {
248       throw new IllegalArgumentException("Series ID parameter must not be null or empty.");
249     }
250     if (accessControl == null) {
251       throw new IllegalArgumentException("ACL parameter must not be null");
252     }
253     if (needsUpdate(seriesId, accessControl) || overrideEpisodeAcl) {
254       logger.debug("Updating ACL of series {}", seriesId);
255       boolean updated;
256 
257       try {
258         updated = persistence.storeSeriesAccessControl(seriesId, accessControl);
259         //update Elasticsearch index
260         updateSeriesAclInIndex(seriesId, accessControl);
261         // still sent for other asynchronous updates
262         triggerEventHandlers(SeriesItem.updateAcl(seriesId, accessControl, overrideEpisodeAcl));
263       } catch (SeriesServiceDatabaseException e) {
264         logger.error("Could not update series {} with access control rules", seriesId, e);
265         throw new SeriesException(e);
266       }
267       return updated;
268     } else {
269       // todo not the right return code
270       return true;
271     }
272   }
273 
274   /** Check if <code>acl</code> needs to be updated for the given series. */
275   private boolean needsUpdate(String seriesId, AccessControlList acl) throws SeriesException {
276     try {
277       return !equals(persistence.getAccessControlList(seriesId), acl);
278     } catch (NotFoundException e) {
279       return true;
280     } catch (SeriesServiceDatabaseException e) {
281       throw new SeriesException(e);
282     }
283   }
284 
285   /*
286    * (non-Javadoc)
287    *
288    * @see org.opencastproject.series.api.SeriesService#deleteSeries(java.lang.String)
289    */
290   @Override
291   public void deleteSeries(final String seriesID) throws SeriesException, NotFoundException {
292     try {
293       persistence.deleteSeries(seriesID);
294       // remove from Elasticsearch index
295       removeSeriesFromIndex(seriesID);
296       // still sent for other asynchronous updates
297       triggerEventHandlers(SeriesItem.delete(seriesID));
298     } catch (SeriesServiceDatabaseException e1) {
299       logger.error("Could not delete series with id {} from persistence storage", seriesID);
300       throw new SeriesException(e1);
301     }
302   }
303 
304   @Override
305   public DublinCoreCatalog getSeries(String seriesID) throws SeriesException, NotFoundException {
306     try {
307       return persistence.getSeries(seriesID);
308     } catch (SeriesServiceDatabaseException e) {
309       logger.error("Failed to execute search query: {}", e.getMessage());
310       throw new SeriesException(e);
311     }
312   }
313 
314   @Override
315   public List<org.opencastproject.series.api.Series> getAllForAdministrativeRead(
316       Date from,
317       Optional<Date> to,
318       int limit
319   ) throws SeriesException, UnauthorizedException {
320     try {
321       return persistence.getAllForAdministrativeRead(from, to, limit);
322     } catch (SeriesServiceDatabaseException e) {
323       String msg = String.format(
324           "Exception while reading all series in range %s to %s from persistence storage",
325           from,
326           to
327       );
328       throw new SeriesException(msg, e);
329     }
330   }
331 
332   public AccessControlList getSeriesAccessControl(String seriesID) throws NotFoundException, SeriesException {
333     try {
334       return persistence.getAccessControlList(seriesID);
335     } catch (SeriesServiceDatabaseException e) {
336       throw new SeriesException("Failed to execute search query", e);
337     }
338   }
339 
340   @Override
341   public int getSeriesCount() throws SeriesException {
342     try {
343       return persistence.countSeries();
344     } catch (SeriesServiceDatabaseException e) {
345       throw new SeriesException("Failed to execute search query", e);
346     }
347   }
348 
349   @Override
350   public Map<String, String> getSeriesProperties(String seriesID)
351           throws SeriesException, NotFoundException, UnauthorizedException {
352     try {
353       return persistence.getSeriesProperties(seriesID);
354     } catch (SeriesServiceDatabaseException e) {
355       throw new SeriesException(String.format("Failed to get series properties for series with id '%s'", seriesID), e);
356     }
357   }
358 
359   @Override
360   public String getSeriesProperty(String seriesID, String propertyName)
361           throws SeriesException, NotFoundException, UnauthorizedException {
362     try {
363       return persistence.getSeriesProperty(seriesID, propertyName);
364     } catch (SeriesServiceDatabaseException e) {
365       String msg = String.format(
366               "Failed to get series property for series with series id '%s' and property name '%s'",
367               seriesID,
368               propertyName
369       );
370       throw new SeriesException(msg, e);
371     }
372   }
373 
374   @Override
375   public void updateSeriesProperty(String seriesID, String propertyName, String propertyValue)
376           throws SeriesException, NotFoundException, UnauthorizedException {
377     try {
378       persistence.updateSeriesProperty(seriesID, propertyName, propertyValue);
379 
380       // update Elasticsearch index
381       if (propertyName.equals(THEME_PROPERTY_NAME)) {
382         updateThemePropertyInIndex(seriesID, Optional.ofNullable(propertyValue));
383       }
384     } catch (SeriesServiceDatabaseException e) {
385       String msg = String.format(
386               "Failed to get series property for series with series id '%s' and property name '%s' and value '%s'",
387               seriesID,
388               propertyName,
389               propertyValue
390       );
391       throw new SeriesException(msg, e);
392     }
393   }
394 
395   @Override
396   public void deleteSeriesProperty(String seriesID, String propertyName)
397           throws SeriesException, NotFoundException, UnauthorizedException {
398     try {
399       persistence.deleteSeriesProperty(seriesID, propertyName);
400 
401       // update Elasticsearch index
402       if (propertyName.equals(THEME_PROPERTY_NAME)) {
403         updateThemePropertyInIndex(seriesID, Optional.empty());
404       }
405     } catch (SeriesServiceDatabaseException e) {
406       String msg = String.format(
407               "Failed to delete series property for series with series id '%s' and property name '%s'",
408               seriesID,
409               propertyName
410       );
411       throw new SeriesException(msg, e);
412     }
413   }
414 
415   /**
416    * Define equality on DublinCoreCatalogs. Two DublinCores are considered equal if they have the same properties and if
417    * each property has the same values in the same order.
418    * <p>
419    * Note: As long as http://opencast.jira.com/browse/MH-8759 is not fixed, the encoding scheme of values is not
420    * considered.
421    * <p>
422    * Implementation Note: DublinCores should not be compared by their string serialization since the ordering of
423    * properties is not defined and cannot be guaranteed between serializations.
424    */
425   public static boolean equals(DublinCoreCatalog a, DublinCoreCatalog b) {
426     final Map<EName, List<DublinCoreValue>> av = a.getValues();
427     final Map<EName, List<DublinCoreValue>> bv = b.getValues();
428     if (av.size() == bv.size()) {
429       for (Map.Entry<EName, List<DublinCoreValue>> ave : av.entrySet()) {
430         if (!eqListSorted(ave.getValue(), bv.get(ave.getKey()))) {
431           return false;
432         }
433       }
434       return true;
435     } else {
436       return false;
437     }
438   }
439 
440   /**
441    * Define equality on AccessControlLists. Two AccessControlLists are considered equal if they contain the exact same
442    * entries no matter in which order.
443    */
444   public static boolean equals(AccessControlList a, AccessControlList b) {
445     return bothNotNull(a, b) && eqListUnsorted(a.getEntries(), b.getEntries());
446   }
447 
448   @Override
449   public Opt<Map<String, byte[]>> getSeriesElements(String seriesId) throws SeriesException {
450     try {
451       return persistence.getSeriesElements(seriesId);
452     } catch (SeriesServiceDatabaseException e) {
453       throw new SeriesException(e);
454     }
455   }
456 
457   @Override
458   public Opt<byte[]> getSeriesElementData(String seriesId, String type) throws SeriesException {
459     try {
460       return persistence.getSeriesElement(seriesId, type);
461     } catch (SeriesServiceDatabaseException e) {
462       throw new SeriesException(e);
463     }
464   }
465 
466   @Override
467   public boolean updateExtendedMetadata(String seriesId, String type, DublinCoreCatalog dc) throws SeriesException {
468     try {
469       final byte[] data = dc.toXmlString().getBytes("UTF-8");
470       boolean successful = updateSeriesElement(seriesId, type, data);
471       if (successful) {
472         updateSeriesExtendedMetadataInIndex(seriesId, dc, type);
473       }
474       return successful;
475     } catch (IOException e) {
476       throw new SeriesException(e);
477     }
478   }
479 
480   @Override
481   public boolean updateSeriesElement(String seriesId, String type, byte[] data) throws SeriesException {
482     try {
483       boolean elementExisted = persistence.existsSeriesElement(seriesId, type);
484       boolean elementChanged = persistence.storeSeriesElement(seriesId, type, data);
485       if (elementExisted && elementChanged) {
486         triggerEventHandlers(SeriesItem.updateElement(seriesId, type, new String(data, StandardCharsets.UTF_8)));
487       }
488       return elementChanged;
489     } catch (SeriesServiceDatabaseException e) {
490       throw new SeriesException(e);
491     }
492   }
493 
494   @Override
495   public boolean deleteSeriesElement(String seriesId, String type) throws SeriesException {
496     try {
497       if (persistence.existsSeriesElement(seriesId, type)) {
498         boolean successful = persistence.deleteSeriesElement(seriesId, type);
499         if (successful) {
500           removeSeriesExtendedMetadataFromIndex(seriesId, type);
501         }
502         return  successful;
503       } else {
504         return false;
505       }
506     } catch (SeriesServiceDatabaseException e) {
507       throw new SeriesException(e);
508     }
509   }
510 
511   @Override
512   public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
513     try {
514       List<SeriesEntity> databaseSeries = persistence.getAllSeries();
515       final int total = databaseSeries.size();
516       logIndexRebuildBegin(logger, total, "series");
517       int current = 0;
518       int n = 20;
519       var updatedSeriesRange = new ArrayList<Series>();
520 
521       for (SeriesEntity series: databaseSeries) {
522         String seriesId = series.getSeriesId();
523         logger.trace("Adding series {} for organization {} to the {} index.", seriesId,
524                 series.getOrganization(), index.getIndexName());
525         Organization organization = orgDirectory.getOrganization(series.getOrganization());
526         User systemUser = SecurityUtil.createSystemUser(systemUserName, organization);
527         current++;
528 
529         SecurityUtil.runAs(securityService, organization, systemUser,
530               () -> {
531                 var updatedSeriesData = Optional.of(new Series(seriesId, organization.getId()));
532                 try {
533                   DublinCoreCatalog catalog = DublinCoreXmlFormat.read(series.getDublinCoreXML());
534                   updatedSeriesData = getMetadataUpdateFunction(seriesId, catalog, organization.getId())
535                       .apply(updatedSeriesData);
536                 } catch (IOException | ParserConfigurationException | SAXException e) {
537                   logger.error("Could not read dublincore XML of series {}.", seriesId, e);
538                   return;
539                 }
540 
541                 // remove all extended metadata catalogs first so we get rid of old data
542                 updatedSeriesData = getResetExtendedMetadataFunction().apply(updatedSeriesData);
543                 for (Map.Entry<String, byte[]> entry: series.getElements().entrySet()) {
544                   try {
545                     DublinCoreCatalog dc = DublinCoreByteFormat.read(entry.getValue());
546 
547                     updatedSeriesData = getExtendedMetadataUpdateFunction(seriesId, dc, entry.getKey(),
548                             organization.getId()).apply(updatedSeriesData);
549 
550                   } catch (IOException | ParseException | ParserConfigurationException | SAXException e) {
551                     logger.error("Could not parse series element {} of series {} as a dublin core catalog, skipping.",
552                             entry.getKey(), seriesId, e);
553                   }
554                 }
555 
556                 String aclStr = series.getAccessControl();
557                 if (StringUtils.isNotBlank(aclStr)) {
558                   try {
559                     AccessControlList acl = AccessControlParser.parseAcl(aclStr);
560                     updatedSeriesData = getAclUpdateFunction(seriesId, acl, organization.getId())
561                         .apply(updatedSeriesData);
562                   } catch (Exception ex) {
563                     logger.error("Unable to parse ACL of series {}.", seriesId, ex);
564                   }
565                 }
566 
567                 try {
568                   Map<String, String> properties = persistence.getSeriesProperties(seriesId);
569                   updatedSeriesData = getThemePropertyUpdateFunction(seriesId,
570                           Optional.ofNullable(properties.get(THEME_PROPERTY_NAME)), organization.getId())
571                       .apply(updatedSeriesData);
572                 } catch (NotFoundException | SeriesServiceDatabaseException e) {
573                   logger.error("Error reading properties of series {}", seriesId, e);
574                 }
575                 updatedSeriesRange.add(updatedSeriesData.get());
576 
577               });
578 
579         if (updatedSeriesRange.size() >= n || current >= databaseSeries.size()) {
580           // do the actual index update
581           index.bulkSeriesUpdate(updatedSeriesRange);
582           logIndexRebuildProgress(logger, total, current, n);
583           updatedSeriesRange.clear();
584         }
585       }
586     } catch (Exception e) {
587       logIndexRebuildError(logger, e);
588       throw new IndexRebuildException(getService(), e);
589     }
590   }
591 
592   private void triggerEventHandlers(SeriesItem item) {
593     while (updateHandlers.size() != 1) {
594       logger.warn("Expecting 1 handler, but {} are registered.  Waiting 10s then retrying...", updateHandlers.size());
595       try {
596         Thread.sleep(10000L);
597       } catch (InterruptedException e) { /* swallow this, nothing to do */ }
598     }
599     for (SeriesUpdateHandler handler : updateHandlers) {
600       handler.execute(item);
601     }
602   }
603 
604   @Override
605   public IndexRebuildService.Service getService() {
606     return IndexRebuildService.Service.Series;
607   }
608 
609   /**
610    * Remove series from Elasticsearch index.
611    *
612    * @param seriesId
613    *          The series id
614    */
615   private void removeSeriesFromIndex(String seriesId) {
616     String orgId = securityService.getOrganization().getId();
617     logger.debug("Removing series {} from the {} index.", seriesId, index.getIndexName());
618 
619     try {
620       index.deleteSeries(seriesId, orgId);
621       logger.debug("Series {} removed from the {} index.", seriesId, index.getIndexName());
622     } catch (SearchIndexException e) {
623       logger.error("Series {} couldn't be removed from the {} index.", seriesId, index.getIndexName(), e);
624     }
625   }
626 
627   /**
628    * Remove series extended metadata from Elasticsearch index.
629    *
630    * @param seriesId
631    *          The series id
632    * @param type
633    *          The type of extended metadata to remove
634    */
635   private void removeSeriesExtendedMetadataFromIndex(String seriesId, String type) {
636     String orgId = securityService.getOrganization().getId();
637     logger.debug("Removing extended metadata of series {} from the {} index.", seriesId, index.getIndexName());
638 
639     // update series
640     Function<Optional<Series>, Optional<Series>> updateFunction = (Optional<Series> seriesOpt) -> {
641       if (seriesOpt.isPresent()) {
642         Series series = seriesOpt.get();
643         series.removeExtendedMetadata(type);
644         return Optional.of(series);
645       }
646       return Optional.empty();
647     };
648     updateSeriesInIndex(seriesId, orgId, updateFunction);
649   }
650 
651   /**
652    * Update series extended metadata in Elasticsearch index.
653    *
654    * @param seriesId
655    *          The series id
656    * @param dc
657    *          The dublin core catalog
658    * @param type
659    *          The type of dublin core catalog
660    */
661   private void updateSeriesExtendedMetadataInIndex(String seriesId, DublinCoreCatalog dc,
662           String type) {
663     String orgId = securityService.getOrganization().getId();
664     logger.debug("Updating extended metadata of series {} in the {} index.", seriesId, index.getIndexName());
665 
666     // update series
667     Function<Optional<Series>, Optional<Series>> updateFunction =
668             getExtendedMetadataUpdateFunction(seriesId, dc, type, orgId);
669     updateSeriesInIndex(seriesId, orgId, updateFunction);
670   }
671 
672   /**
673    * Get the function to reset the extended metadata for a series in an Elasticsearch index.
674    *
675    * @return the function to do the update
676    */
677   private Function<Optional<Series>, Optional<Series>> getResetExtendedMetadataFunction() {
678     return (Optional<Series> seriesOpt) -> {
679       if (seriesOpt.isPresent()) {
680         Series series = seriesOpt.get();
681         series.resetExtendedMetadata();
682         return Optional.of(series);
683       }
684       return Optional.empty();
685     };
686   }
687 
688   /**
689    * Get the function to update the extended metadata for a series in an Elasticsearch index.
690    *
691    * @param seriesId
692    *          The series id
693    * @param dc
694    *          The dublin core catalog
695    * @param type
696    *          The type of dublin core catalog
697    * @param orgId
698    *          The id of the current organization
699    * @return the function to do the update
700    */
701   private Function<Optional<Series>, Optional<Series>> getExtendedMetadataUpdateFunction(String seriesId,
702           DublinCoreCatalog dc, String type, String orgId) {
703     return (Optional<Series> seriesOpt) -> {
704       Series series = seriesOpt.orElse(new Series(seriesId, orgId));
705 
706       Map<String, List<String>> map = new HashMap();
707       Set<EName> eNames = dc.getProperties();
708       for (EName eName: eNames) {
709         String name = eName.getLocalName();
710         List<String> values = dc.get(eName, DublinCore.LANGUAGE_ANY);
711         map.put(name, values);
712       }
713       series.setExtendedMetadata(type, map);
714       return Optional.of(series);
715     };
716   }
717 
718   /**
719    * Update series metadata in Elasticsearch index.
720    *
721    * @param seriesId
722    *          The series id
723    * @param dc
724    *          The dublin core catalog
725    */
726   private void updateSeriesMetadataInIndex(String seriesId, DublinCoreCatalog dc) {
727     String orgId = securityService.getOrganization().getId();
728     logger.debug("Updating metadata of series {} in the {} index.", seriesId, index.getIndexName());
729 
730     // update series
731     Function<Optional<Series>, Optional<Series>> updateFunction = getMetadataUpdateFunction(seriesId, dc, orgId);
732     updateSeriesInIndex(seriesId, orgId, updateFunction);
733   }
734 
735   /**
736    * Get the function to update the metadata for a series in an Elasticsearch index.
737    *
738    * @param seriesId
739    *          The series id
740    * @param dc
741    *          The dublin core catalog
742    * @param orgId
743    *          The id of the current organization
744    * @return the function to do the update
745    */
746   private Function<Optional<Series>, Optional<Series>> getMetadataUpdateFunction(String seriesId, DublinCoreCatalog dc,
747           String orgId) {
748     return (Optional<Series> seriesOpt) -> {
749       Series series = seriesOpt.orElse(new Series(seriesId, orgId));
750 
751       // only for new series
752       if (!seriesOpt.isPresent()) {
753         series.setCreator(securityService.getUser().getName());
754       }
755 
756       series.setTitle(dc.getFirst(DublinCoreCatalog.PROPERTY_TITLE));
757       series.setDescription(dc.getFirst(DublinCore.PROPERTY_DESCRIPTION));
758       series.setSubject(dc.getFirst(DublinCore.PROPERTY_SUBJECT));
759       series.setLanguage(dc.getFirst(DublinCoreCatalog.PROPERTY_LANGUAGE));
760       series.setLicense(dc.getFirst(DublinCoreCatalog.PROPERTY_LICENSE));
761       series.setRightsHolder(dc.getFirst(DublinCore.PROPERTY_RIGHTS_HOLDER));
762       String createdDateStr = dc.getFirst(DublinCoreCatalog.PROPERTY_CREATED);
763       if (createdDateStr != null) {
764         series.setCreatedDateTime(EncodingSchemeUtils.decodeDate(createdDateStr));
765       }
766       series.setPublishers(dc.get(DublinCore.PROPERTY_PUBLISHER, DublinCore.LANGUAGE_ANY));
767       series.setContributors(dc.get(DublinCore.PROPERTY_CONTRIBUTOR, DublinCore.LANGUAGE_ANY));
768       series.setOrganizers(dc.get(DublinCoreCatalog.PROPERTY_CREATOR, DublinCore.LANGUAGE_ANY));
769       return Optional.of(series);
770     };
771   }
772 
773   /**
774    * Update series acl in Elasticsearch index.
775    *
776    * @param seriesId
777    *          The series id
778    * @param acl
779    *          The acl to update
780    */
781   private void updateSeriesAclInIndex(String seriesId, AccessControlList acl) {
782     String orgId = securityService.getOrganization().getId();
783     logger.debug("Updating ACL of series {} in the {} index.", seriesId, index.getIndexName());
784     Function<Optional<Series>, Optional<Series>> updateFunction = getAclUpdateFunction(seriesId, acl, orgId);
785     updateSeriesInIndex(seriesId, orgId, updateFunction);
786   }
787 
788   /**
789    * Get the function to update the acl for a series in an Elasticsearch index.
790    *
791    * @param seriesId
792    *          The series id
793    * @param acl
794    *          The acl to update
795    * @param orgId
796    *          The id of the current organization
797    * @return the function to do the update
798    */
799   private Function<Optional<Series>, Optional<Series>> getAclUpdateFunction(String seriesId, AccessControlList acl,
800           String orgId) {
801     return (Optional<Series> seriesOpt) -> {
802       Series series = seriesOpt.orElse(new Series(seriesId, orgId));
803 
804       List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
805       Option<ManagedAcl> managedAcl = AccessInformationUtil.matchAcls(acls, acl);
806       if (managedAcl.isSome()) {
807         series.setManagedAcl(managedAcl.get().getName());
808       }
809 
810       series.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
811       return Optional.of(series);
812     };
813   }
814 
815   /**
816    * Update series theme property in an Elasticsearch index.
817    *
818    * @param seriesId
819    *          The series id
820    * @param propertyValueOpt
821    *          The value of the property (optional)
822    */
823   private void updateThemePropertyInIndex(String seriesId, Optional<String> propertyValueOpt) {
824     String orgId = securityService.getOrganization().getId();
825     logger.debug("Updating theme property of series {} in the {} index.", seriesId, index.getIndexName());
826     Function<Optional<Series>, Optional<Series>> updateFunction =
827             getThemePropertyUpdateFunction(seriesId, propertyValueOpt, orgId);
828     updateSeriesInIndex(seriesId, orgId, updateFunction);
829   }
830 
831   /**
832    * Get the function to update the theme property for a series in an Elasticsearch index.
833    *
834    * @param seriesId
835    *          The series id
836    * @param propertyValueOpt
837    *          The value of the property (optional)
838    * @param orgId
839    *          The id of the current organization
840    * @return the function to do the update
841    */
842   private Function<Optional<Series>, Optional<Series>> getThemePropertyUpdateFunction(String seriesId,
843           Optional<String> propertyValueOpt, String orgId) {
844     return (Optional<Series> seriesOpt) -> {
845       Series series = seriesOpt.orElse(new Series(seriesId, orgId));
846       if (propertyValueOpt.isPresent()) {
847         series.setTheme(Long.valueOf(propertyValueOpt.get()));
848       } else {
849         series.setTheme(null);
850       }
851       return Optional.of(series);
852     };
853   }
854 
855   /**
856    * Update a series in an Elasticsearch index.
857    *
858    * @param seriesId
859    *          The series id
860    * @param updateFunctions
861    *          The function(s) to do the actual updating
862    * @param orgId
863    *          The id of the current organization
864    * @return the updated series (optional)
865    */
866   @SafeVarargs
867   private  Optional<Series> updateSeriesInIndex(String seriesId, String orgId,
868           Function<Optional<Series>, Optional<Series>>... updateFunctions) {
869     User user = securityService.getUser();
870     Function<Optional<Series>, Optional<Series>> updateFunction = Arrays.stream(updateFunctions)
871             .reduce(Function.identity(), Function::andThen);
872 
873     try {
874       Optional<Series> seriesOpt = index.addOrUpdateSeries(seriesId, updateFunction, orgId, user);
875       logger.debug("Series {} updated in the {} index", seriesId, index.getIndexName());
876       return seriesOpt;
877     } catch (SearchIndexException e) {
878       logger.error("Series {} couldn't be updated in the {} index.", seriesId, index.getIndexName(), e);
879       return Optional.empty();
880     }
881   }
882 }