View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.series.impl;
23  
24  import static org.opencastproject.util.EqualsUtil.bothNotNull;
25  import static org.opencastproject.util.EqualsUtil.eqListSorted;
26  import static org.opencastproject.util.EqualsUtil.eqListUnsorted;
27  import static org.opencastproject.util.RequireUtil.notNull;
28  import static org.opencastproject.util.data.Option.some;
29  
30  import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
31  import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
32  import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
33  import org.opencastproject.elasticsearch.api.SearchIndexException;
34  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
35  import org.opencastproject.elasticsearch.index.objects.series.Series;
36  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
37  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
38  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
39  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
40  import org.opencastproject.mediapackage.EName;
41  import org.opencastproject.message.broker.api.series.SeriesItem;
42  import org.opencastproject.message.broker.api.update.SeriesUpdateHandler;
43  import org.opencastproject.metadata.dublincore.DublinCore;
44  import org.opencastproject.metadata.dublincore.DublinCoreByteFormat;
45  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
46  import org.opencastproject.metadata.dublincore.DublinCoreValue;
47  import org.opencastproject.metadata.dublincore.DublinCoreXmlFormat;
48  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
49  import org.opencastproject.metadata.dublincore.Precision;
50  import org.opencastproject.security.api.AccessControlList;
51  import org.opencastproject.security.api.AccessControlParser;
52  import org.opencastproject.security.api.Organization;
53  import org.opencastproject.security.api.OrganizationDirectoryService;
54  import org.opencastproject.security.api.SecurityService;
55  import org.opencastproject.security.api.UnauthorizedException;
56  import org.opencastproject.security.api.User;
57  import org.opencastproject.security.util.SecurityUtil;
58  import org.opencastproject.series.api.SeriesException;
59  import org.opencastproject.series.api.SeriesService;
60  import org.opencastproject.series.impl.persistence.SeriesEntity;
61  import org.opencastproject.util.NotFoundException;
62  import org.opencastproject.util.data.Option;
63  
64  import org.apache.commons.lang3.StringUtils;
65  import org.json.simple.parser.ParseException;
66  import org.osgi.service.component.ComponentContext;
67  import org.osgi.service.component.annotations.Activate;
68  import org.osgi.service.component.annotations.Component;
69  import org.osgi.service.component.annotations.Reference;
70  import org.osgi.service.component.annotations.ReferenceCardinality;
71  import org.osgi.service.component.annotations.ReferencePolicy;
72  import org.slf4j.Logger;
73  import org.slf4j.LoggerFactory;
74  import org.xml.sax.SAXException;
75  
76  import java.io.IOException;
77  import java.nio.charset.StandardCharsets;
78  import java.util.ArrayList;
79  import java.util.Arrays;
80  import java.util.Date;
81  import java.util.HashMap;
82  import java.util.List;
83  import java.util.Map;
84  import java.util.Optional;
85  import java.util.Set;
86  import java.util.UUID;
87  import java.util.function.Function;
88  
89  import javax.xml.parsers.ParserConfigurationException;
90  
91  /**
92   * Implements {@link SeriesService}. Uses {@link SeriesServiceDatabase} for permanent storage and
93   * {@link ElasticsearchIndex} for searching.
94   */
95  @Component(
96      property = {
97          "service.description=Series Service"
98      },
99      immediate = true,
100     service = { SeriesService.class, IndexProducer.class }
101 )
102 public class SeriesServiceImpl extends AbstractIndexProducer implements SeriesService {
103 
104   /** Logging utility */
105   private static final Logger logger = LoggerFactory.getLogger(SeriesServiceImpl.class);
106 
107   private static final String THEME_PROPERTY_NAME = "theme";
108 
109   /** Persistent storage */
110   protected SeriesServiceDatabase persistence;
111 
112   /** The security service */
113   protected SecurityService securityService;
114 
115   /** The organization directory */
116   protected OrganizationDirectoryService orgDirectory;
117 
118   /** The system user name */
119   private String systemUserName;
120 
121   /** The Elasticsearch index */
122   private ElasticsearchIndex index;
123 
124   private AclServiceFactory aclServiceFactory;
125 
126   private ArrayList<SeriesUpdateHandler> updateHandlers = new ArrayList<>();
127 
  /**
   * OSGi callback for setting the persistence layer.
   *
   * @param persistence
   *          the series database used for permanent storage
   */
  @Reference
  public void setPersistence(SeriesServiceDatabase persistence) {
    this.persistence = persistence;
  }
133 
  /**
   * OSGi callback for setting the security service.
   *
   * @param securityService
   *          the security service used to look up the current user and organization
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }
139 
  /**
   * OSGi callback for setting the organization directory service.
   *
   * @param orgDirectory
   *          the directory used to resolve organizations by id
   */
  @Reference
  public void setOrgDirectory(OrganizationDirectoryService orgDirectory) {
    this.orgDirectory = orgDirectory;
  }
145 
  /**
   * OSGi callback for registering a series update handler. Handlers are bound dynamically and any number of them may
   * be registered; {@link #removeMessageHandler(SeriesUpdateHandler)} is the matching unbind callback.
   *
   * @param handler
   *          the handler to add
   */
  @Reference(
      policy = ReferencePolicy.DYNAMIC,
      cardinality = ReferenceCardinality.MULTIPLE,
      unbind = "removeMessageHandler"
  )
  public void addMessageHandler(SeriesUpdateHandler handler) {
    this.updateHandlers.add(handler);
  }
155 
  /**
   * OSGi callback for unregistering a series update handler, referenced as the unbind method of
   * {@link #addMessageHandler(SeriesUpdateHandler)}.
   *
   * @param handler
   *          the handler to remove
   */
  public void removeMessageHandler(SeriesUpdateHandler handler) {
    this.updateHandlers.remove(handler);
  }
159 
  /**
   * OSGi callback for setting the Elasticsearch index.
   *
   * @param index
   *          the index that mirrors series data for searching
   */
  @Reference
  public void setElasticsearchIndex(ElasticsearchIndex index) {
    this.index = index;
  }
165 
  /**
   * OSGi callback for setting the ACL service factory.
   *
   * @param aclServiceFactory
   *          the ACL service factory
   */
  @Reference
  public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
    this.aclServiceFactory = aclServiceFactory;
  }
170 
171   /**
172    * Activates Series Service. Checks whether we are using synchronous or asynchronous indexing. If
173    * asynchronous is used, Executor service is set. If index is empty, persistent storage is queried
174    * if it contains any series. If that is the case, series are retrieved and indexed.
175    */
176   @Activate
177   public void activate(ComponentContext cc) throws Exception {
178     logger.info("Activating Series Service");
179     systemUserName = cc.getBundleContext().getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
180   }
181 
182   @Override
183   public DublinCoreCatalog updateSeries(DublinCoreCatalog dc) throws SeriesException, UnauthorizedException {
184     try {
185       for (DublinCoreCatalog dublinCore : isNew(notNull(dc, "dc"))) {
186         final String id = dublinCore.getFirst(DublinCore.PROPERTY_IDENTIFIER);
187 
188         if (!dublinCore.hasValue(DublinCore.PROPERTY_CREATED)) {
189           DublinCoreValue date = EncodingSchemeUtils.encodeDate(new Date(), Precision.Minute);
190           dublinCore.set(DublinCore.PROPERTY_CREATED, date);
191           logger.debug("Setting series creation date to '{}'", date.getValue());
192         }
193 
194         if (dublinCore.hasValue(DublinCore.PROPERTY_TITLE)) {
195           if (dublinCore.getFirst(DublinCore.PROPERTY_TITLE).length() > 255) {
196             dublinCore.set(DublinCore.PROPERTY_TITLE, dublinCore.getFirst(DublinCore.PROPERTY_TITLE).substring(0, 255));
197             logger.warn("Title was longer than 255 characters. Cutting excess off.");
198           }
199         }
200 
201         logger.debug("Updating series {}", id);
202         // update API index
203         updateSeriesMetadataInIndex(id, dublinCore);
204 
205         // Make sure store to persistence comes after index, return value can be null
206         DublinCoreCatalog updated = persistence.storeSeries(dublinCore);
207 
208         // still sent for other asynchronous updates
209         triggerEventHandlers(SeriesItem.updateCatalog(dublinCore));
210         return (updated == null) ? null : dublinCore;
211       }
212       return dc;
213     } catch (Exception e) {
214       throw new SeriesException(e);
215     }
216   }
217 
218   /** Check if <code>dc</code> is new and, if so, return an updated version ready to store. */
219   private Option<DublinCoreCatalog> isNew(DublinCoreCatalog dc) throws SeriesServiceDatabaseException {
220     final String id = dc.getFirst(DublinCore.PROPERTY_IDENTIFIER);
221     if (id != null) {
222       try {
223         return equals(persistence.getSeries(id), dc) ? Option.none() : some(dc);
224       } catch (NotFoundException e) {
225         return some(dc);
226       }
227     } else {
228       logger.info("Series Dublin Core does not contain identifier, generating one");
229       dc.set(DublinCore.PROPERTY_IDENTIFIER, UUID.randomUUID().toString());
230       return some(dc);
231     }
232   }
233 
234   @Override
235   public boolean updateAccessControl(final String seriesId, final AccessControlList accessControl)
236           throws NotFoundException, SeriesException {
237     return updateAccessControl(seriesId, accessControl, false);
238   }
239 
240   // todo method signature does not fit the three different possible return values
241   @Override
242   public boolean updateAccessControl(final String seriesId, final AccessControlList accessControl,
243           boolean overrideEpisodeAcl)
244           throws NotFoundException, SeriesException {
245     if (StringUtils.isEmpty(seriesId)) {
246       throw new IllegalArgumentException("Series ID parameter must not be null or empty.");
247     }
248     if (accessControl == null) {
249       throw new IllegalArgumentException("ACL parameter must not be null");
250     }
251     if (needsUpdate(seriesId, accessControl) || overrideEpisodeAcl) {
252       logger.debug("Updating ACL of series {}", seriesId);
253       boolean updated;
254 
255       try {
256         updated = persistence.storeSeriesAccessControl(seriesId, accessControl);
257         //update Elasticsearch index
258         updateSeriesAclInIndex(seriesId, accessControl);
259         // still sent for other asynchronous updates
260         triggerEventHandlers(SeriesItem.updateAcl(seriesId, accessControl, overrideEpisodeAcl));
261       } catch (SeriesServiceDatabaseException e) {
262         logger.error("Could not update series {} with access control rules", seriesId, e);
263         throw new SeriesException(e);
264       }
265       return updated;
266     } else {
267       // todo not the right return code
268       return true;
269     }
270   }
271 
272   /** Check if <code>acl</code> needs to be updated for the given series. */
273   private boolean needsUpdate(String seriesId, AccessControlList acl) throws SeriesException {
274     try {
275       return !equals(persistence.getAccessControlList(seriesId), acl);
276     } catch (NotFoundException e) {
277       return true;
278     } catch (SeriesServiceDatabaseException e) {
279       throw new SeriesException(e);
280     }
281   }
282 
283   /*
284    * (non-Javadoc)
285    *
286    * @see org.opencastproject.series.api.SeriesService#deleteSeries(java.lang.String)
287    */
288   @Override
289   public void deleteSeries(final String seriesID) throws SeriesException, NotFoundException {
290     try {
291       persistence.deleteSeries(seriesID);
292       // remove from Elasticsearch index
293       removeSeriesFromIndex(seriesID);
294       // still sent for other asynchronous updates
295       triggerEventHandlers(SeriesItem.delete(seriesID));
296     } catch (SeriesServiceDatabaseException e1) {
297       logger.error("Could not delete series with id {} from persistence storage", seriesID);
298       throw new SeriesException(e1);
299     }
300   }
301 
302   @Override
303   public DublinCoreCatalog getSeries(String seriesID) throws SeriesException, NotFoundException {
304     try {
305       return persistence.getSeries(seriesID);
306     } catch (SeriesServiceDatabaseException e) {
307       logger.error("Failed to execute search query: {}", e.getMessage());
308       throw new SeriesException(e);
309     }
310   }
311 
312   @Override
313   public List<org.opencastproject.series.api.Series> getAllForAdministrativeRead(
314       Date from,
315       Optional<Date> to,
316       int limit
317   ) throws SeriesException, UnauthorizedException {
318     try {
319       return persistence.getAllForAdministrativeRead(from, to, limit);
320     } catch (SeriesServiceDatabaseException e) {
321       String msg = String.format(
322           "Exception while reading all series in range %s to %s from persistence storage",
323           from,
324           to
325       );
326       throw new SeriesException(msg, e);
327     }
328   }
329 
330   public AccessControlList getSeriesAccessControl(String seriesID) throws NotFoundException, SeriesException {
331     try {
332       return persistence.getAccessControlList(seriesID);
333     } catch (SeriesServiceDatabaseException e) {
334       throw new SeriesException("Failed to execute search query", e);
335     }
336   }
337 
338   @Override
339   public int getSeriesCount() throws SeriesException {
340     try {
341       return persistence.countSeries();
342     } catch (SeriesServiceDatabaseException e) {
343       throw new SeriesException("Failed to execute search query", e);
344     }
345   }
346 
347   @Override
348   public Map<String, String> getSeriesProperties(String seriesID)
349           throws SeriesException, NotFoundException, UnauthorizedException {
350     try {
351       return persistence.getSeriesProperties(seriesID);
352     } catch (SeriesServiceDatabaseException e) {
353       throw new SeriesException(String.format("Failed to get series properties for series with id '%s'", seriesID), e);
354     }
355   }
356 
357   @Override
358   public String getSeriesProperty(String seriesID, String propertyName)
359           throws SeriesException, NotFoundException, UnauthorizedException {
360     try {
361       return persistence.getSeriesProperty(seriesID, propertyName);
362     } catch (SeriesServiceDatabaseException e) {
363       String msg = String.format(
364               "Failed to get series property for series with series id '%s' and property name '%s'",
365               seriesID,
366               propertyName
367       );
368       throw new SeriesException(msg, e);
369     }
370   }
371 
372   @Override
373   public void updateSeriesProperty(String seriesID, String propertyName, String propertyValue)
374           throws SeriesException, NotFoundException, UnauthorizedException {
375     try {
376       persistence.updateSeriesProperty(seriesID, propertyName, propertyValue);
377 
378       // update Elasticsearch index
379       if (propertyName.equals(THEME_PROPERTY_NAME)) {
380         updateThemePropertyInIndex(seriesID, Optional.ofNullable(propertyValue));
381       }
382     } catch (SeriesServiceDatabaseException e) {
383       String msg = String.format(
384               "Failed to get series property for series with series id '%s' and property name '%s' and value '%s'",
385               seriesID,
386               propertyName,
387               propertyValue
388       );
389       throw new SeriesException(msg, e);
390     }
391   }
392 
393   @Override
394   public void deleteSeriesProperty(String seriesID, String propertyName)
395           throws SeriesException, NotFoundException, UnauthorizedException {
396     try {
397       persistence.deleteSeriesProperty(seriesID, propertyName);
398 
399       // update Elasticsearch index
400       if (propertyName.equals(THEME_PROPERTY_NAME)) {
401         updateThemePropertyInIndex(seriesID, Optional.empty());
402       }
403     } catch (SeriesServiceDatabaseException e) {
404       String msg = String.format(
405               "Failed to delete series property for series with series id '%s' and property name '%s'",
406               seriesID,
407               propertyName
408       );
409       throw new SeriesException(msg, e);
410     }
411   }
412 
413   /**
414    * Define equality on DublinCoreCatalogs. Two DublinCores are considered equal if they have the same properties and if
415    * each property has the same values in the same order.
416    * <p>
417    * Note: As long as http://opencast.jira.com/browse/MH-8759 is not fixed, the encoding scheme of values is not
418    * considered.
419    * <p>
420    * Implementation Note: DublinCores should not be compared by their string serialization since the ordering of
421    * properties is not defined and cannot be guaranteed between serializations.
422    */
423   public static boolean equals(DublinCoreCatalog a, DublinCoreCatalog b) {
424     final Map<EName, List<DublinCoreValue>> av = a.getValues();
425     final Map<EName, List<DublinCoreValue>> bv = b.getValues();
426     if (av.size() == bv.size()) {
427       for (Map.Entry<EName, List<DublinCoreValue>> ave : av.entrySet()) {
428         if (!eqListSorted(ave.getValue(), bv.get(ave.getKey()))) {
429           return false;
430         }
431       }
432       return true;
433     } else {
434       return false;
435     }
436   }
437 
438   /**
439    * Define equality on AccessControlLists. Two AccessControlLists are considered equal if they contain the exact same
440    * entries no matter in which order.
441    */
442   public static boolean equals(AccessControlList a, AccessControlList b) {
443     return bothNotNull(a, b) && eqListUnsorted(a.getEntries(), b.getEntries());
444   }
445 
446   @Override
447   public Optional<Map<String, byte[]>> getSeriesElements(String seriesId) throws SeriesException {
448     try {
449       return persistence.getSeriesElements(seriesId);
450     } catch (SeriesServiceDatabaseException e) {
451       throw new SeriesException(e);
452     }
453   }
454 
455   @Override
456   public Optional<byte[]> getSeriesElementData(String seriesId, String type) throws SeriesException {
457     try {
458       return persistence.getSeriesElement(seriesId, type);
459     } catch (SeriesServiceDatabaseException e) {
460       throw new SeriesException(e);
461     }
462   }
463 
464   @Override
465   public boolean updateExtendedMetadata(String seriesId, String type, DublinCoreCatalog dc) throws SeriesException {
466     try {
467       final byte[] data = dc.toXmlString().getBytes("UTF-8");
468       boolean successful = updateSeriesElement(seriesId, type, data);
469       if (successful) {
470         updateSeriesExtendedMetadataInIndex(seriesId, dc, type);
471       }
472       return successful;
473     } catch (IOException e) {
474       throw new SeriesException(e);
475     }
476   }
477 
478   @Override
479   public boolean updateSeriesElement(String seriesId, String type, byte[] data) throws SeriesException {
480     try {
481       boolean elementExisted = persistence.existsSeriesElement(seriesId, type);
482       boolean elementChanged = persistence.storeSeriesElement(seriesId, type, data);
483       if (elementExisted && elementChanged) {
484         triggerEventHandlers(SeriesItem.updateElement(seriesId, type, new String(data, StandardCharsets.UTF_8)));
485       }
486       return elementChanged;
487     } catch (SeriesServiceDatabaseException e) {
488       throw new SeriesException(e);
489     }
490   }
491 
492   @Override
493   public boolean deleteSeriesElement(String seriesId, String type) throws SeriesException {
494     try {
495       if (persistence.existsSeriesElement(seriesId, type)) {
496         boolean successful = persistence.deleteSeriesElement(seriesId, type);
497         if (successful) {
498           removeSeriesExtendedMetadataFromIndex(seriesId, type);
499         }
500         return  successful;
501       } else {
502         return false;
503       }
504     } catch (SeriesServiceDatabaseException e) {
505       throw new SeriesException(e);
506     }
507   }
508 
  /**
   * Rebuilds the series part of the Elasticsearch index from persistence.
   * <p>
   * Every persisted series is converted into an index object as the system user of its organization: Dublin Core
   * metadata, extended metadata catalogs, ACL and theme property are applied in that order. The resulting objects
   * are collected and written to the index in bulks.
   *
   * @param type
   *          the data type to rebuild; not evaluated by this implementation
   * @throws IndexRebuildException
   *           if anything goes wrong during the rebuild
   */
  @Override
  public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
    try {
      List<SeriesEntity> databaseSeries = persistence.getAllSeries();
      final int total = databaseSeries.size();
      logIndexRebuildBegin(logger, total, "series");
      int current = 0;
      // bulk size: the index is updated once this many series have been collected
      int n = 20;
      var updatedSeriesRange = new ArrayList<Series>();

      for (SeriesEntity series: databaseSeries) {
        String seriesId = series.getSeriesId();
        logger.trace("Adding series {} for organization {} to the {} index.", seriesId,
                series.getOrganization(), index.getIndexName());
        Organization organization = orgDirectory.getOrganization(series.getOrganization());
        User systemUser = SecurityUtil.createSystemUser(systemUserName, organization);
        current++;

        // build the index object in the security context of the series' organization
        SecurityUtil.runAs(securityService, organization, systemUser,
              () -> {
                var updatedSeriesData = Optional.of(
                    new Series(seriesId, organization.getId(), series.getCreatorName())
                );
                try {
                  DublinCoreCatalog catalog = DublinCoreXmlFormat.read(series.getDublinCoreXML());
                  updatedSeriesData = getMetadataUpdateFunction(seriesId, catalog, organization.getId())
                      .apply(updatedSeriesData);
                } catch (IOException | ParserConfigurationException | SAXException e) {
                  // without readable main metadata the series is skipped and not added to the bulk
                  logger.error("Could not read dublincore XML of series {}.", seriesId, e);
                  return;
                }

                // remove all extended metadata catalogs first so we get rid of old data
                updatedSeriesData = getResetExtendedMetadataFunction().apply(updatedSeriesData);
                for (Map.Entry<String, byte[]> entry: series.getElements().entrySet()) {
                  try {
                    DublinCoreCatalog dc = DublinCoreByteFormat.read(entry.getValue());

                    updatedSeriesData = getExtendedMetadataUpdateFunction(seriesId, dc, entry.getKey(),
                            organization.getId()).apply(updatedSeriesData);

                  } catch (IOException | ParseException | ParserConfigurationException | SAXException e) {
                    logger.error("Could not parse series element {} of series {} as a dublin core catalog, skipping.",
                            entry.getKey(), seriesId, e);
                  }
                }

                // an unparsable ACL is logged but does not prevent the series from being indexed
                String aclStr = series.getAccessControl();
                if (StringUtils.isNotBlank(aclStr)) {
                  try {
                    AccessControlList acl = AccessControlParser.parseAcl(aclStr);
                    updatedSeriesData = getAclUpdateFunction(seriesId, acl, organization.getId())
                        .apply(updatedSeriesData);
                  } catch (Exception ex) {
                    logger.error("Unable to parse ACL of series {}.", seriesId, ex);
                  }
                }

                // same for the properties: on failure the series is indexed without theme information
                try {
                  Map<String, String> properties = persistence.getSeriesProperties(seriesId);
                  updatedSeriesData = getThemePropertyUpdateFunction(seriesId,
                          Optional.ofNullable(properties.get(THEME_PROPERTY_NAME)), organization.getId())
                      .apply(updatedSeriesData);
                } catch (NotFoundException | SeriesServiceDatabaseException e) {
                  logger.error("Error reading properties of series {}", seriesId, e);
                }
                updatedSeriesRange.add(updatedSeriesData.get());

              });

        // flush when the bulk is full or the last series has been processed
        if (updatedSeriesRange.size() >= n || current >= databaseSeries.size()) {
          // do the actual index update
          index.bulkSeriesUpdate(updatedSeriesRange);
          logIndexRebuildProgress(logger, total, current, n);
          updatedSeriesRange.clear();
        }
      }
    } catch (Exception e) {
      logIndexRebuildError(logger, e);
      throw new IndexRebuildException(getService(), e);
    }
  }
591 
592   private void triggerEventHandlers(SeriesItem item) {
593     while (updateHandlers.size() != 1) {
594       logger.warn("Expecting 1 handler, but {} are registered.  Waiting 10s then retrying...", updateHandlers.size());
595       try {
596         Thread.sleep(10000L);
597       } catch (InterruptedException e) { /* swallow this, nothing to do */ }
598     }
599     for (SeriesUpdateHandler handler : updateHandlers) {
600       handler.execute(item);
601     }
602   }
603 
  /**
   * Identifies this index producer as the one responsible for series.
   *
   * @return {@link IndexRebuildService.Service#Series}
   */
  @Override
  public IndexRebuildService.Service getService() {
    return IndexRebuildService.Service.Series;
  }
608 
609   /**
610    * Remove series from Elasticsearch index.
611    *
612    * @param seriesId
613    *          The series id
614    */
615   private void removeSeriesFromIndex(String seriesId) {
616     String orgId = securityService.getOrganization().getId();
617     logger.debug("Removing series {} from the {} index.", seriesId, index.getIndexName());
618 
619     try {
620       index.deleteSeries(seriesId, orgId);
621       logger.debug("Series {} removed from the {} index.", seriesId, index.getIndexName());
622     } catch (SearchIndexException e) {
623       logger.error("Series {} couldn't be removed from the {} index.", seriesId, index.getIndexName(), e);
624     }
625   }
626 
627   /**
628    * Remove series extended metadata from Elasticsearch index.
629    *
630    * @param seriesId
631    *          The series id
632    * @param type
633    *          The type of extended metadata to remove
634    */
635   private void removeSeriesExtendedMetadataFromIndex(String seriesId, String type) {
636     String orgId = securityService.getOrganization().getId();
637     logger.debug("Removing extended metadata of series {} from the {} index.", seriesId, index.getIndexName());
638 
639     // update series
640     Function<Optional<Series>, Optional<Series>> updateFunction = (Optional<Series> seriesOpt) -> {
641       if (seriesOpt.isPresent()) {
642         Series series = seriesOpt.get();
643         series.removeExtendedMetadata(type);
644         return Optional.of(series);
645       }
646       return Optional.empty();
647     };
648     updateSeriesInIndex(seriesId, orgId, updateFunction);
649   }
650 
651   /**
652    * Update series extended metadata in Elasticsearch index.
653    *
654    * @param seriesId
655    *          The series id
656    * @param dc
657    *          The dublin core catalog
658    * @param type
659    *          The type of dublin core catalog
660    */
661   private void updateSeriesExtendedMetadataInIndex(String seriesId, DublinCoreCatalog dc,
662           String type) {
663     String orgId = securityService.getOrganization().getId();
664     logger.debug("Updating extended metadata of series {} in the {} index.", seriesId, index.getIndexName());
665 
666     // update series
667     Function<Optional<Series>, Optional<Series>> updateFunction =
668             getExtendedMetadataUpdateFunction(seriesId, dc, type, orgId);
669     updateSeriesInIndex(seriesId, orgId, updateFunction);
670   }
671 
672   /**
673    * Get the function to reset the extended metadata for a series in an Elasticsearch index.
674    *
675    * @return the function to do the update
676    */
677   private Function<Optional<Series>, Optional<Series>> getResetExtendedMetadataFunction() {
678     return (Optional<Series> seriesOpt) -> {
679       if (seriesOpt.isPresent()) {
680         Series series = seriesOpt.get();
681         series.resetExtendedMetadata();
682         return Optional.of(series);
683       }
684       return Optional.empty();
685     };
686   }
687 
688   /**
689    * Get the function to update the extended metadata for a series in an Elasticsearch index.
690    *
691    * @param seriesId
692    *          The series id
693    * @param dc
694    *          The dublin core catalog
695    * @param type
696    *          The type of dublin core catalog
697    * @param orgId
698    *          The id of the current organization
699    * @return the function to do the update
700    */
701   private Function<Optional<Series>, Optional<Series>> getExtendedMetadataUpdateFunction(String seriesId,
702           DublinCoreCatalog dc, String type, String orgId) {
703     return (Optional<Series> seriesOpt) -> {
704       Series series = seriesOpt.orElse(new Series(seriesId, orgId));
705 
706       Map<String, List<String>> map = new HashMap();
707       Set<EName> eNames = dc.getProperties();
708       for (EName eName: eNames) {
709         String name = eName.getLocalName();
710         List<String> values = dc.get(eName, DublinCore.LANGUAGE_ANY);
711         map.put(name, values);
712       }
713       series.setExtendedMetadata(type, map);
714       return Optional.of(series);
715     };
716   }
717 
718   /**
719    * Update series metadata in Elasticsearch index.
720    *
721    * @param seriesId
722    *          The series id
723    * @param dc
724    *          The dublin core catalog
725    */
726   private void updateSeriesMetadataInIndex(String seriesId, DublinCoreCatalog dc) {
727     String orgId = securityService.getOrganization().getId();
728     logger.debug("Updating metadata of series {} in the {} index.", seriesId, index.getIndexName());
729 
730     // update series
731     Function<Optional<Series>, Optional<Series>> updateFunction = getMetadataUpdateFunction(seriesId, dc, orgId);
732     updateSeriesInIndex(seriesId, orgId, updateFunction);
733   }
734 
735   /**
736    * Get the function to update the metadata for a series in an Elasticsearch index.
737    *
738    * @param seriesId
739    *          The series id
740    * @param dc
741    *          The dublin core catalog
742    * @param orgId
743    *          The id of the current organization
744    * @return the function to do the update
745    */
746   private Function<Optional<Series>, Optional<Series>> getMetadataUpdateFunction(String seriesId, DublinCoreCatalog dc,
747           String orgId) {
748     return (Optional<Series> seriesOpt) -> {
749       Series series = seriesOpt.orElse(new Series(seriesId, orgId, securityService.getUser().getName()));
750       series.setTitle(dc.getFirst(DublinCoreCatalog.PROPERTY_TITLE));
751       series.setDescription(dc.getFirst(DublinCore.PROPERTY_DESCRIPTION));
752       series.setSubject(dc.getFirst(DublinCore.PROPERTY_SUBJECT));
753       series.setLanguage(dc.getFirst(DublinCoreCatalog.PROPERTY_LANGUAGE));
754       series.setLicense(dc.getFirst(DublinCoreCatalog.PROPERTY_LICENSE));
755       series.setRightsHolder(dc.getFirst(DublinCore.PROPERTY_RIGHTS_HOLDER));
756       String createdDateStr = dc.getFirst(DublinCoreCatalog.PROPERTY_CREATED);
757       if (createdDateStr != null) {
758         series.setCreatedDateTime(EncodingSchemeUtils.decodeDate(createdDateStr));
759       }
760       series.setPublishers(dc.get(DublinCore.PROPERTY_PUBLISHER, DublinCore.LANGUAGE_ANY));
761       series.setContributors(dc.get(DublinCore.PROPERTY_CONTRIBUTOR, DublinCore.LANGUAGE_ANY));
762       series.setOrganizers(dc.get(DublinCoreCatalog.PROPERTY_CREATOR, DublinCore.LANGUAGE_ANY));
763       return Optional.of(series);
764     };
765   }
766 
767   /**
768    * Update series acl in Elasticsearch index.
769    *
770    * @param seriesId
771    *          The series id
772    * @param acl
773    *          The acl to update
774    */
775   private void updateSeriesAclInIndex(String seriesId, AccessControlList acl) {
776     String orgId = securityService.getOrganization().getId();
777     logger.debug("Updating ACL of series {} in the {} index.", seriesId, index.getIndexName());
778     Function<Optional<Series>, Optional<Series>> updateFunction = getAclUpdateFunction(seriesId, acl, orgId);
779     updateSeriesInIndex(seriesId, orgId, updateFunction);
780   }
781 
782   /**
783    * Get the function to update the acl for a series in an Elasticsearch index.
784    *
785    * @param seriesId
786    *          The series id
787    * @param acl
788    *          The acl to update
789    * @param orgId
790    *          The id of the current organization
791    * @return the function to do the update
792    */
793   private Function<Optional<Series>, Optional<Series>> getAclUpdateFunction(String seriesId, AccessControlList acl,
794           String orgId) {
795     return (Optional<Series> seriesOpt) -> {
796       Series series = seriesOpt.orElse(new Series(seriesId, orgId));
797 
798       List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
799       Option<ManagedAcl> managedAcl = AccessInformationUtil.matchAcls(acls, acl);
800       if (managedAcl.isSome()) {
801         series.setManagedAcl(managedAcl.get().getName());
802       }
803 
804       series.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
805       return Optional.of(series);
806     };
807   }
808 
809   /**
810    * Update series theme property in an Elasticsearch index.
811    *
812    * @param seriesId
813    *          The series id
814    * @param propertyValueOpt
815    *          The value of the property (optional)
816    */
817   private void updateThemePropertyInIndex(String seriesId, Optional<String> propertyValueOpt) {
818     String orgId = securityService.getOrganization().getId();
819     logger.debug("Updating theme property of series {} in the {} index.", seriesId, index.getIndexName());
820     Function<Optional<Series>, Optional<Series>> updateFunction =
821             getThemePropertyUpdateFunction(seriesId, propertyValueOpt, orgId);
822     updateSeriesInIndex(seriesId, orgId, updateFunction);
823   }
824 
825   /**
826    * Get the function to update the theme property for a series in an Elasticsearch index.
827    *
828    * @param seriesId
829    *          The series id
830    * @param propertyValueOpt
831    *          The value of the property (optional)
832    * @param orgId
833    *          The id of the current organization
834    * @return the function to do the update
835    */
836   private Function<Optional<Series>, Optional<Series>> getThemePropertyUpdateFunction(String seriesId,
837           Optional<String> propertyValueOpt, String orgId) {
838     return (Optional<Series> seriesOpt) -> {
839       Series series = seriesOpt.orElse(new Series(seriesId, orgId));
840       if (propertyValueOpt.isPresent()) {
841         series.setTheme(Long.valueOf(propertyValueOpt.get()));
842       } else {
843         series.setTheme(null);
844       }
845       return Optional.of(series);
846     };
847   }
848 
849   /**
850    * Update a series in an Elasticsearch index.
851    *
852    * @param seriesId
853    *          The series id
854    * @param updateFunctions
855    *          The function(s) to do the actual updating
856    * @param orgId
857    *          The id of the current organization
858    * @return the updated series (optional)
859    */
860   @SafeVarargs
861   private  Optional<Series> updateSeriesInIndex(String seriesId, String orgId,
862           Function<Optional<Series>, Optional<Series>>... updateFunctions) {
863     User user = securityService.getUser();
864     Function<Optional<Series>, Optional<Series>> updateFunction = Arrays.stream(updateFunctions)
865             .reduce(Function.identity(), Function::andThen);
866 
867     try {
868       Optional<Series> seriesOpt = index.addOrUpdateSeries(seriesId, updateFunction, orgId, user);
869       logger.debug("Series {} updated in the {} index", seriesId, index.getIndexName());
870       return seriesOpt;
871     } catch (SearchIndexException e) {
872       logger.error("Series {} couldn't be updated in the {} index.", seriesId, index.getIndexName(), e);
873       return Optional.empty();
874     }
875   }
876 }