View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.assetmanager.impl;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.hasNoChecksum;
25  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.isNotPublication;
26  import static org.opencastproject.mediapackage.MediaPackageSupport.getFileName;
27  import static org.opencastproject.metadata.dublincore.CatalogUIAdapter.ORGANIZATION_WILDCARD;
28  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
29  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
30  import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
31  import static org.opencastproject.util.data.functions.Misc.chuck;
32  
33  import org.opencastproject.assetmanager.api.Asset;
34  import org.opencastproject.assetmanager.api.AssetId;
35  import org.opencastproject.assetmanager.api.AssetManager;
36  import org.opencastproject.assetmanager.api.AssetManagerException;
37  import org.opencastproject.assetmanager.api.Availability;
38  import org.opencastproject.assetmanager.api.Property;
39  import org.opencastproject.assetmanager.api.PropertyId;
40  import org.opencastproject.assetmanager.api.Snapshot;
41  import org.opencastproject.assetmanager.api.Value;
42  import org.opencastproject.assetmanager.api.Version;
43  import org.opencastproject.assetmanager.api.storage.AssetStore;
44  import org.opencastproject.assetmanager.api.storage.DeletionSelector;
45  import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
46  import org.opencastproject.assetmanager.api.storage.Source;
47  import org.opencastproject.assetmanager.api.storage.StoragePath;
48  import org.opencastproject.assetmanager.impl.persistence.Database;
49  import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
50  import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
51  import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
52  import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
53  import org.opencastproject.db.DBSessionFactory;
54  import org.opencastproject.elasticsearch.api.SearchIndexException;
55  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
56  import org.opencastproject.elasticsearch.index.objects.event.Event;
57  import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
58  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
59  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
60  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
61  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
62  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService.DataType;
63  import org.opencastproject.mediapackage.Catalog;
64  import org.opencastproject.mediapackage.MediaPackage;
65  import org.opencastproject.mediapackage.MediaPackageElement;
66  import org.opencastproject.mediapackage.MediaPackageElements;
67  import org.opencastproject.mediapackage.MediaPackageParser;
68  import org.opencastproject.mediapackage.MediaPackageSupport;
69  import org.opencastproject.message.broker.api.assetmanager.AssetManagerItem;
70  import org.opencastproject.message.broker.api.update.AssetManagerUpdateHandler;
71  import org.opencastproject.metadata.dublincore.DublinCores;
72  import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
73  import org.opencastproject.security.api.AccessControlEntry;
74  import org.opencastproject.security.api.AccessControlList;
75  import org.opencastproject.security.api.AccessControlParser;
76  import org.opencastproject.security.api.AuthorizationService;
77  import org.opencastproject.security.api.DefaultOrganization;
78  import org.opencastproject.security.api.Organization;
79  import org.opencastproject.security.api.OrganizationDirectoryService;
80  import org.opencastproject.security.api.Role;
81  import org.opencastproject.security.api.SecurityService;
82  import org.opencastproject.security.api.UnauthorizedException;
83  import org.opencastproject.security.api.User;
84  import org.opencastproject.security.util.SecurityUtil;
85  import org.opencastproject.util.Checksum;
86  import org.opencastproject.util.ChecksumType;
87  import org.opencastproject.util.MimeTypes;
88  import org.opencastproject.util.NotFoundException;
89  import org.opencastproject.util.RequireUtil;
90  import org.opencastproject.workspace.api.Workspace;
91  
92  import com.google.common.collect.Sets;
93  
94  import org.apache.commons.io.FileUtils;
95  import org.apache.commons.io.IOUtils;
96  import org.apache.commons.lang3.BooleanUtils;
97  import org.apache.commons.lang3.StringUtils;
98  import org.osgi.service.component.ComponentContext;
99  import org.osgi.service.component.annotations.Activate;
100 import org.osgi.service.component.annotations.Component;
101 import org.osgi.service.component.annotations.Reference;
102 import org.osgi.service.component.annotations.ReferenceCardinality;
103 import org.osgi.service.component.annotations.ReferencePolicy;
104 import org.slf4j.Logger;
105 import org.slf4j.LoggerFactory;
106 
107 import java.io.File;
108 import java.io.IOException;
109 import java.io.InputStream;
110 import java.net.URI;
111 import java.security.NoSuchAlgorithmException;
112 import java.util.ArrayList;
113 import java.util.Arrays;
114 import java.util.Collection;
115 import java.util.Collections;
116 import java.util.Date;
117 import java.util.HashMap;
118 import java.util.LinkedHashMap;
119 import java.util.List;
120 import java.util.Map;
121 import java.util.Objects;
122 import java.util.Optional;
123 import java.util.Set;
124 import java.util.UUID;
125 import java.util.concurrent.CopyOnWriteArrayList;
126 import java.util.function.Function;
127 import java.util.function.Predicate;
128 import java.util.stream.Collectors;
129 
130 import javax.persistence.EntityManagerFactory;
131 
132 /**
133  * The Asset Manager implementation.
134  */
135 @Component(
136     property = {
137         "service.description=Opencast Asset Manager"
138     },
139     immediate = true,
140     service = { AssetManager.class, IndexProducer.class }
141 )
142 public class AssetManagerImpl extends AbstractIndexProducer implements AssetManager {
143 
  /** Class-wide SLF4J logger. */
  private static final Logger logger = LoggerFactory.getLogger(AssetManagerImpl.class);

  // NOTE(review): presumably a paging/batch size; its usage is outside this excerpt -- confirm
  private static final int PAGE_SIZE = 1000;

  /** Administrative level of the current user; the query methods of this class switch on it. */
  enum AdminRole {
    GLOBAL, ORGANIZATION, NONE
  }

  /** Action names used for the authorization checks in this class. */
  public static final String WRITE_ACTION = "write";
  public static final String READ_ACTION = "read";
  /** Property namespace under which the ACL entries of a media package are stored (see takeSnapshot). */
  public static final String SECURITY_NAMESPACE = "org.opencastproject.assetmanager.security";

  // NOTE(review): the name contains a typo ("EXPEXTED"); renaming it requires touching usages outside this excerpt
  private static final int EXPEXTED_HANDLERS_COUNT = 2;

  private static final String MANIFEST_DEFAULT_NAME = "manifest";

  /** Update handlers registered and unregistered through addEventHandler/removeEventHandler. */
  private CopyOnWriteArrayList<AssetManagerUpdateHandler> handlers = new CopyOnWriteArrayList<>();

  // Collaborators injected through the @Reference setters below; db and systemUserName are set in activate()
  private SecurityService securityService;
  private AuthorizationService authorizationService;
  private OrganizationDirectoryService orgDir;
  private Workspace workspace;
  private AssetStore assetStore;
  private HttpAssetProvider httpAssetProvider;
  private String systemUserName;
  private Database db;
  private DBSessionFactory dbSessionFactory;
  private EntityManagerFactory emf;
  private AclServiceFactory aclServiceFactory;
  private ElasticsearchIndex index;

  // careful: org key can be wildcard!
  // Catalog UI adapters grouped by the organization they report (see addCatalogUIAdapter).
  private Map<String, List<EventCatalogUIAdapter>> extendedEventCatalogUIAdapters = new HashMap<>();

  // Settings for role filter (read from the component properties in activate())
  private boolean includeAPIRoles;
  private boolean includeCARoles;
  private boolean includeUIRoles;


  // NOTE(review): presumably the element types eligible for moving between asset stores -- confirm against usage
  public static final Set<MediaPackageElement.Type> MOVABLE_TYPES = Sets.newHashSet(
          MediaPackageElement.Type.Attachment,
          MediaPackageElement.Type.Catalog,
          MediaPackageElement.Type.Track
  );

  /** Remote asset stores keyed by their store type (see addRemoteAssetStore). */
  private final HashMap<String, RemoteAssetStore> remoteStores = new LinkedHashMap<>();
191 
192   /**
193    * OSGi callback.
194    */
195   @Activate
196   public synchronized void activate(ComponentContext cc) {
197     logger.info("Activating AssetManager.");
198     db = new Database(dbSessionFactory.createSession(emf));
199     db.setHttpAssetProvider(getHttpAssetProvider());
200     systemUserName = SecurityUtil.getSystemUserName(cc);
201 
202     includeAPIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeAPIRoles"), null));
203     includeCARoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeCARoles"), null));
204     includeUIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeUIRoles"), null));
205   }
206 
207   /**
208    * OSGi dependencies
209    */
210 
211   @Reference(target = "(osgi.unit.name=org.opencastproject.assetmanager.impl)")
212   public void setEntityManagerFactory(EntityManagerFactory emf) {
213     this.emf = emf;
214   }
215 
216   @Reference
217   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
218     this.dbSessionFactory = dbSessionFactory;
219   }
220 
221   @Reference
222   public void setSecurityService(SecurityService securityService) {
223     this.securityService = securityService;
224   }
225 
226   @Reference
227   public void setAuthorizationService(AuthorizationService authorizationService) {
228     this.authorizationService = authorizationService;
229   }
230 
231   @Reference
232   public void setOrgDir(OrganizationDirectoryService orgDir) {
233     this.orgDir = orgDir;
234   }
235 
236   @Reference
237   public void setWorkspace(Workspace workspace) {
238     this.workspace = workspace;
239   }
240 
241   @Reference
242   public void setAssetStore(AssetStore assetStore) {
243     this.assetStore = assetStore;
244   }
245 
246   @Reference(
247       cardinality = ReferenceCardinality.MULTIPLE,
248       policy = ReferencePolicy.DYNAMIC,
249       unbind = "removeRemoteAssetStore"
250   )
251   public synchronized void addRemoteAssetStore(RemoteAssetStore assetStore) {
252     remoteStores.put(assetStore.getStoreType(), assetStore);
253   }
254 
255   public void removeRemoteAssetStore(RemoteAssetStore store) {
256     remoteStores.remove(store.getStoreType());
257   }
258 
259   @Reference
260   public void setHttpAssetProvider(HttpAssetProvider httpAssetProvider) {
261     this.httpAssetProvider = httpAssetProvider;
262   }
263 
264   @Reference
265   public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
266     this.aclServiceFactory = aclServiceFactory;
267   }
268 
269   @Reference
270   public void setIndex(ElasticsearchIndex index) {
271     this.index = index;
272   }
273 
274   @Reference(
275       cardinality = ReferenceCardinality.MULTIPLE,
276       policy = ReferencePolicy.DYNAMIC,
277       unbind = "removeEventHandler"
278   )
279 
280   public void addEventHandler(AssetManagerUpdateHandler handler) {
281     this.handlers.add(handler);
282   }
283 
284   public void removeEventHandler(AssetManagerUpdateHandler handler) {
285     this.handlers.remove(handler);
286   }
287 
288   @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC,
289       target = "(common-metadata=false)")
290   public synchronized void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
291     List<EventCatalogUIAdapter> list = extendedEventCatalogUIAdapters.computeIfAbsent(
292             catalogUIAdapter.getOrganization(), k -> new ArrayList<>());
293     list.add(catalogUIAdapter);
294   }
295 
296   public synchronized void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
297     if (extendedEventCatalogUIAdapters.containsKey(catalogUIAdapter.getOrganization())) {
298       extendedEventCatalogUIAdapters.get(catalogUIAdapter.getOrganization()).remove(catalogUIAdapter);
299     }
300   }
301 
302   /**
303    * AssetManager implementation
304    */
305 
306   @Override
307   public Optional<MediaPackage> getMediaPackage(String mediaPackageId) {
308     String orgId = securityService.getOrganization().getId();
309     switch (isAdmin()) {
310       case GLOBAL:
311         return getDatabase().getMediaPackage(mediaPackageId);
312       default:
313         if (isAuthorized(mediaPackageId, READ_ACTION)) {
314           return getDatabase().getMediaPackage(mediaPackageId, orgId);
315         }
316         return Optional.empty();
317     }
318   }
319 
320   @Override
321   public List<Snapshot> getLatestSnapshots(Collection mediaPackageIds) {
322     String orgId = securityService.getOrganization().getId();
323     switch (isAdmin()) {
324       case GLOBAL:
325         return getDatabase().getLatestSnapshotsByMediaPackageIds(mediaPackageIds, null);
326       default:
327         mediaPackageIds = isAuthorized(mediaPackageIds.stream().toList(), READ_ACTION);
328         return getDatabase().getLatestSnapshotsByMediaPackageIds(mediaPackageIds, orgId);
329     }
330   }
331 
332   @Override
333   public Optional<Snapshot> getLatestSnapshot(String mediaPackageId) {
334     String orgId = securityService.getOrganization().getId();
335     switch (isAdmin()) {
336       case GLOBAL:
337         return getDatabase().getLatestSnapshot(mediaPackageId);
338       default:
339         if (isAuthorized(mediaPackageId, READ_ACTION)) {
340           return getDatabase().getLatestSnapshot(mediaPackageId, orgId);
341         }
342         return Optional.empty();
343     }
344   }
345 
346   @Override
347   public Optional<Asset> getAsset(Version version, String mpId, String mpElementId) {
348     if (isAuthorized(mpId, READ_ACTION)) {
349       // try to fetch the asset
350       var asset = getDatabase().getAsset(RuntimeTypes.convert(version), mpId, mpElementId);
351       if (asset.isPresent()) {
352         var storageId = getSnapshotStorageLocation(version, mpId);
353         if (storageId.isPresent()) {
354           var store = getAssetStore(storageId.get());
355           if (store.isPresent()) {
356             var assetStream = store.get().get(StoragePath.mk(
357                 asset.get().getOrganizationId(),
358                 mpId,
359                 version,
360                 mpElementId
361             ));
362             if (assetStream.isPresent()) {
363 
364               Checksum checksum = null;
365               try {
366                 checksum = Checksum.fromString(asset.get().getAssetDto().getChecksum());
367               } catch (NoSuchAlgorithmException e) {
368                 logger.warn("Invalid checksum for asset {} of media package {}", mpElementId, mpId, e);
369               }
370 
371               final Asset a = new AssetImpl(
372                       AssetId.mk(version, mpId, mpElementId),
373                       assetStream.get(),
374                       asset.get().getAssetDto().getMimeType(),
375                       asset.get().getAssetDto().getSize(),
376                       asset.get().getStorageId(),
377                       asset.get().getAvailability(),
378                       checksum);
379               return Optional.of(a);
380             }
381           }
382         }
383       }
384       return Optional.empty();
385     }
386     throw new RuntimeException(new UnauthorizedException(
387             format("Not allowed to read assets of snapshot %s, version=%s", mpId, version)
388     ));
389   }
390 
391   @Override
392   public Optional<AssetStore> getAssetStore(String storeId) {
393     if (assetStore.getStoreType().equals(storeId)) {
394       return Optional.of(assetStore);
395     } else {
396       if (remoteStores.containsKey(storeId)) {
397         return Optional.of(remoteStores.get(storeId));
398       } else {
399         return Optional.empty();
400       }
401     }
402   }
403 
404   @Override
405   public AssetStore getLocalAssetStore() {
406     return assetStore;
407   }
408 
409   @Override
410   public List<AssetStore> getRemoteAssetStores() {
411     return new ArrayList<>(remoteStores.values());
412   }
413 
414   /** Snapshots */
415 
416   @Override
417   public boolean snapshotExists(final String mediaPackageId) {
418     return getDatabase().snapshotExists(mediaPackageId);
419   }
420 
421   @Override
422   public boolean snapshotExists(final String mediaPackageId, final String organization) {
423     return getDatabase().snapshotExists(mediaPackageId, organization);
424   }
425 
426   @Override
427   public Snapshot takeSnapshot(MediaPackage mp) {
428     return takeSnapshot(null, mp);
429   }
430 
431   @Override
432   public Snapshot takeSnapshot(String owner, MediaPackage mp) {
433 
434     final String mediaPackageId = mp.getIdentifier().toString();
435     final boolean firstSnapshot = !snapshotExists(mediaPackageId);
436 
437     // Allow this if:
438     //  - no previous snapshot exists
439     //  - the user has write access to the previous snapshot
440     if (firstSnapshot) {
441       // if it's the first snapshot, ensure that old, leftover properties are removed
442       deleteProperties(mediaPackageId);
443     }
444     if (firstSnapshot || isAuthorized(mediaPackageId, WRITE_ACTION)) {
445       final Snapshot snapshot;
446       if (owner == null) {
447         snapshot = takeSnapshotInternal(mp);
448       } else {
449         snapshot = takeSnapshotInternal(owner, mp);
450       }
451 
452       final AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
453       // store acl as properties
454       // Drop old ACL rules
455       deleteProperties(mediaPackageId, SECURITY_NAMESPACE);
456       // Set new ACL rules
457       for (final AccessControlEntry ace : acl.getEntries()) {
458         getDatabase().saveProperty(Property.mk(PropertyId.mk(mediaPackageId, SECURITY_NAMESPACE,
459                 mkPropertyName(ace.getRole(), ace.getAction())), Value.mk(ace.isAllow())));
460       }
461 
462       updateEventInIndex(snapshot);
463 
464       logger.info("Trigger update handlers for snapshot {}, version {}",
465               snapshot.getMediaPackage().getIdentifier(), snapshot.getVersion());
466       fireEventHandlers(mkTakeSnapshotMessage(snapshot));
467 
468       return snapshot;
469     }
470     throw new RuntimeException(new UnauthorizedException(
471         "Not allowed to take snapshot of media package " + mediaPackageId));
472   }
473 
474   private Snapshot takeSnapshotInternal(MediaPackage mediaPackage) {
475     final String mediaPackageId = mediaPackage.getIdentifier().toString();
476     String orgId = securityService.getOrganization().getId();
477     Optional<Snapshot> snapshot;
478     switch (isAdmin()) {
479       case GLOBAL:
480         snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
481         break;
482       default:
483         if (isAuthorized(mediaPackageId, WRITE_ACTION)) {
484           snapshot = getDatabase().getLatestSnapshot(mediaPackageId, orgId);
485         } else {
486           snapshot = Optional.empty();
487         }
488         break;
489     }
490     if (snapshot.isPresent()) {
491       return takeSnapshotInternal(snapshot.get().getOwner(), mediaPackage);
492     }
493     return takeSnapshotInternal(DEFAULT_OWNER, mediaPackage);
494   }
495 
496   private Snapshot takeSnapshotInternal(final String owner, final MediaPackage mp) {
497     try {
498       Snapshot archived = addInternal(owner, MediaPackageSupport.copy(mp)).toSnapshot();
499       return getHttpAssetProvider().prepareForDelivery(archived);
500     } catch (Exception e) {
501       logger.error("An error occurred", e);
502       throw unwrapExceptionUntil(AssetManagerException.class, e).orElse(new AssetManagerException(e));
503     }
504   }
505 
506   /**
507    * Create a {@link AssetManagerItem.TakeSnapshot} message.
508    * <p>
509    * Do not call outside of a security context.
510    */
511   private AssetManagerItem.TakeSnapshot mkTakeSnapshotMessage(Snapshot snapshot) {
512     final MediaPackage mp = snapshot.getMediaPackage();
513 
514     long version;
515     try {
516       version = Long.parseLong(snapshot.getVersion().toString());
517     } catch (NumberFormatException e) {
518       // The index requires a version to be a long value.
519       // Since the asset manager default implementation uses long values that should be not a problem.
520       // However, a decent exception message is helpful if a different implementation of the asset manager
521       // is used.
522       throw new RuntimeException("The current implementation of the index requires versions being of type 'long'.");
523     }
524 
525     return AssetManagerItem.add(workspace, mp, authorizationService.getActiveAcl(mp).getA(),
526             version, snapshot.getArchivalDate());
527   }
528 
529   @Override
530   public void triggerIndexUpdate(String mediaPackageId) throws NotFoundException, UnauthorizedException {
531 
532     if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
533       throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
534     }
535     Optional<Snapshot> snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
536 
537     if (snapshot.isEmpty()) {
538       throw new NotFoundException("No event with ID `" + mediaPackageId + "`");
539     }
540 
541     // Update event index with latest snapshot
542     updateEventInIndex(snapshot.get());
543   }
544 
545   /**
546    * Update the event in the Elasticsearch index.
547    *
548    * @param snapshot
549    *         The newest snapshot of the event to update
550    */
551   private void updateEventInIndex(Snapshot snapshot) {
552     final String eventId = snapshot.getMediaPackage().getIdentifier().toString();
553     final String orgId = securityService.getOrganization().getId();
554     final User user = securityService.getUser();
555 
556     logger.debug("Updating event {} in the {} index.", eventId, index.getIndexName());
557     Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(snapshot, orgId, user);
558 
559     try {
560       index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
561       logger.debug("Event {} updated in the {} index.", eventId, index.getIndexName());
562     } catch (SearchIndexException e) {
563       logger.error("Error updating the event {} in the {} index.", eventId, index.getIndexName(), e);
564     }
565   }
566 
567   /**
568    * Remove the event from the Elasticsearch index
569    *
570    * @param eventId
571    *         The id of the event to remove
572    */
573   private void removeArchivedVersionFromIndex(String eventId) {
574     final String orgId = securityService.getOrganization().getId();
575     final User user = securityService.getUser();
576     logger.debug("Received AssetManager delete episode message {}", eventId);
577 
578     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
579       if (eventOpt.isEmpty()) {
580         logger.warn("Event {} not found for deletion", eventId);
581         return Optional.empty();
582       }
583       Event event = eventOpt.get();
584       event.setArchiveVersion(null);
585       return Optional.of(event);
586     };
587 
588     try {
589       index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
590       logger.debug("Event {} removed from the {} index", eventId, index.getIndexName());
591     } catch (SearchIndexException e) {
592       logger.error("Error deleting the event {} from the {} index.", eventId, index.getIndexName(), e);
593     }
594   }
595 
596   @Override
597   public List<Snapshot> getSnapshotsById(final String mpId) {
598     RequireUtil.requireNotBlank(mpId, "mpId");
599 
600     String orgId = securityService.getOrganization().getId();
601 
602     switch (isAdmin()) {
603       case GLOBAL:
604         return getDatabase().getSnapshots(mpId);
605       default:
606         if (isAuthorized(mpId, READ_ACTION)) {
607           return getDatabase().getSnapshots(mpId, orgId);
608         }
609         return new ArrayList<>();
610     }
611   }
612 
613   @Override
614   public List<Snapshot> getSnapshotsByIdOrderedByVersion(String mpId, boolean asc) {
615     RequireUtil.requireNotBlank(mpId, "mpId");
616 
617     String order;
618     if (asc) {
619       order = "ASC";
620     } else {
621       order = "DESC";
622     }
623 
624     String orgId = securityService.getOrganization().getId();
625     switch (isAdmin()) {
626       case GLOBAL:
627         return getDatabase().getSnapshots(mpId, null, order);
628       default:
629         if (isAuthorized(mpId, READ_ACTION)) {
630           return getDatabase().getSnapshots(mpId, orgId);
631         }
632         return new ArrayList<>();
633     }
634   }
635 
636   @Override
637   public List<Snapshot> getSnapshotsByIdAndVersion(final String mpId, final Version version) {
638     RequireUtil.requireNotBlank(mpId, "mpId");
639     RequireUtil.notNull(version, "version");
640 
641     String orgId = securityService.getOrganization().getId();
642     // TODO: Simplify the version class?
643     Long v = Long.parseLong(version.toString());
644     switch (isAdmin()) {
645       case GLOBAL:
646         return getDatabase().getSnapshotsByMpIdAndVersion(mpId, v, null);
647       default:
648         if (isAuthorized(mpId, READ_ACTION)) {
649           return getDatabase().getSnapshotsByMpIdAndVersion(mpId, v, orgId);
650         }
651         return new ArrayList<>();
652     }
653   }
654 
655   @Override
656   public List<Snapshot> getSnapshotsByDateOrderedById(Date start, Date end) {
657     RequireUtil.notNull(start, "start");
658     RequireUtil.notNull(end, "end");
659 
660     String orgId = securityService.getOrganization().getId();
661     switch (isAdmin()) {
662       case GLOBAL:
663         return getDatabase().getSnapshotsByDateOrderByMpId(start, end, null);
664       case ORGANIZATION:
665         return getDatabase().getSnapshotsByDateOrderByMpId(start, end, orgId);
666       default:
667         List<Snapshot> snapshots = new ArrayList<>();
668         List<Snapshot> snaps = getDatabase().getSnapshotsByDateOrderByMpId(start, end, orgId);
669         for (int i = 0; i < snaps.size(); i++) {
670           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
671             snapshots.add(snaps.get(i));
672           }
673         }
674         return snapshots;
675     }
676   }
677 
678   @Override
679   public List<Snapshot> getSnapshotsByIdAndDate(final String mpId, final Date start, final Date end) {
680     RequireUtil.requireNotBlank(mpId, "mpId");
681     RequireUtil.notNull(start, "start");
682     RequireUtil.notNull(end, "end");
683 
684     String orgId = securityService.getOrganization().getId();
685     switch (isAdmin()) {
686       case GLOBAL:
687         return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, null);
688       default:
689         if (isAuthorized(mpId, READ_ACTION)) {
690           return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, orgId);
691         }
692         return new ArrayList<>();
693     }
694   }
695 
696   @Override
697   public List<Snapshot> getSnapshotsByIdAndDateOrderedByVersion(String mpId, Date start, Date end, boolean asc) {
698     RequireUtil.requireNotBlank(mpId, "mpId");
699     RequireUtil.notNull(start, "start");
700     RequireUtil.notNull(end, "end");
701 
702     String order;
703     if (asc) {
704       order = "ASC";
705     } else {
706       order = "DESC";
707     }
708 
709     String orgId = securityService.getOrganization().getId();
710     switch (isAdmin()) {
711       case GLOBAL:
712         return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, null, order);
713       default:
714         if (isAuthorized(mpId, READ_ACTION)) {
715           return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, orgId, order);
716         }
717         return new ArrayList<>();
718     }
719   }
720 
721   @Override
722   public List<Snapshot> getLatestSnapshotsBySeriesId(final String seriesId) {
723     RequireUtil.requireNotBlank(seriesId, "seriesId");
724 
725     String orgId = securityService.getOrganization().getId();
726 
727     switch (isAdmin()) {
728       case GLOBAL:
729         return getDatabase().getSnapshotsBySeries(seriesId, null);
730       case ORGANIZATION:
731         return getDatabase().getSnapshotsBySeries(seriesId, orgId);
732       default:
733         List<Snapshot> snapshots = new ArrayList<>();
734         List<Snapshot> snaps = getDatabase().getSnapshotsBySeries(seriesId, orgId);
735         for (int i = 0; i < snaps.size(); i++) {
736           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
737             snapshots.add(snaps.get(i));
738           }
739         }
740         return snapshots;
741     }
742   }
743 
744   @Override
745   public Optional<Snapshot> getSnapshotByMpIdOrgIdAndVersion(String mpId, String orgId, Version version) {
746     return getDatabase().getSnapshot(mpId, orgId, Long.parseLong(version.toString()));
747   }
748 
749   @Override
750   public int deleteSnapshots(String mpId) {
751     String orgId = securityService.getOrganization().getId();
752     int numberOfDeletedSnapshots = 0;
753     switch (isAdmin()) {
754       case GLOBAL:
755         numberOfDeletedSnapshots = getDatabase().deleteSnapshots(mpId, null);
756         break;
757       default:
758         if (isAuthorized(mpId, WRITE_ACTION)) {
759           numberOfDeletedSnapshots = getDatabase().deleteSnapshots(mpId, orgId);
760         }
761         break;
762     }
763 
764     // delete from store
765     if (numberOfDeletedSnapshots > 0) {
766       final DeletionSelector deletionSelector = DeletionSelector.deleteAll(orgId, mpId);
767       getLocalAssetStore().delete(deletionSelector);
768       for (AssetStore as : getRemoteAssetStores()) {
769         as.delete(deletionSelector);
770       }
771     }
772 
773     logger.info("Firing event handlers for deleting event {}", mpId);
774     fireEventHandlers(AssetManagerItem.deleteEpisode(mpId, new Date()));
775     removeArchivedVersionFromIndex(mpId);
776 
777     return numberOfDeletedSnapshots;
778   }
779 
780   @Override
781   public int deleteAllButLatestSnapshot(String mpId) {
782     String orgId = securityService.getOrganization().getId();
783     int numberOfDeletedSnapshots = 0;
784     List<Long> versions = getDatabase().getVersionsByMediaPackage(mpId, null);
785 
786     switch (isAdmin()) {
787       case GLOBAL:
788         numberOfDeletedSnapshots = getDatabase().deleteAllButLatestSnapshot(mpId, null);
789         break;
790       default:
791         if (isAuthorized(mpId, WRITE_ACTION)) {
792           numberOfDeletedSnapshots = getDatabase().deleteAllButLatestSnapshot(mpId, orgId);
793         }
794         break;
795     }
796 
797     // delete from store
798     if (numberOfDeletedSnapshots > 0) {
799       // Skip last version
800       for (int i = 0; i < versions.size() - 1; i++) {
801         final DeletionSelector deletionSelector = DeletionSelector.delete(orgId, mpId,
802             new VersionImpl(versions.get(i)));
803         getLocalAssetStore().delete(deletionSelector);
804         for (AssetStore as : getRemoteAssetStores()) {
805           as.delete(deletionSelector);
806         }
807       }
808     }
809 
810     return numberOfDeletedSnapshots;
811   }
812 
813   @Override
814   public void moveSnapshotsById(final String mpId, final String targetStore) throws NotFoundException {
815     List<Snapshot> snapshots = getSnapshotsById(mpId);
816 
817     if (snapshots.isEmpty()) {
818       throw new NotFoundException("Mediapackage " + mpId + " not found!");
819     }
820 
821     processOperations(snapshots, targetStore);
822   }
823 
824   @Override
825   public void moveSnapshotsByIdAndVersion(final String mpId, final Version version, final String targetStore)
826           throws NotFoundException {
827     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
828 
829     if (snapshots.isEmpty()) {
830       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
831     }
832 
833     processOperations(snapshots, targetStore);
834   }
835 
836   @Override
837   public void moveSnapshotsByDate(final Date start, final Date end, final String targetStore)
838           throws NotFoundException {
839     String orgId = securityService.getOrganization().getId();
840     List<Snapshot> snapshots = new ArrayList<>();
841     switch (isAdmin()) {
842       case GLOBAL:
843         snapshots = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, null);
844         break;
845       case ORGANIZATION:
846         snapshots = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, orgId);
847         break;
848       default:
849         List<Snapshot> snaps = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, orgId);
850         for (int i = 0; i < snaps.size(); i++) {
851           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
852             snapshots.add(snaps.get(i));
853           }
854         }
855         break;
856     }
857 
858     if (snapshots.isEmpty()) {
859       throw new NotFoundException("No media packages found between " + start + " and " + end);
860     }
861 
862     processOperations(snapshots, targetStore);
863   }
864 
865   @Override
866   public void moveSnapshotsByIdAndDate(final String mpId, final Date start, final Date end, final String targetStore)
867           throws NotFoundException {
868     List<Snapshot> snapshots = getSnapshotsByIdAndDate(mpId, start, end);
869 
870     if (snapshots.isEmpty()) {
871       throw new NotFoundException("No media package with id " + mpId + " found between " + start + " and " + end);
872     }
873 
874     processOperations(snapshots, targetStore);
875   }
876 
877   @Override
878   public void moveSnapshotToStore(final Version version, final String mpId, final String storeId)
879           throws NotFoundException {
880 
881     //Find the snapshot
882     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
883 
884     if (snapshots.isEmpty()) {
885       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
886     }
887     processOperations(snapshots, storeId);
888   }
889 
890   //Do the actual moving
891   //TODO: Compare this to AssetManagerJobProducer.moveSnapshots. Check if they can be combined.
892   private void processOperations(List<Snapshot> snapshots, final String targetStoreId) {
893     snapshots.forEach(s -> {
894 //      Snapshot s = record.getSnapshot().get();
895       Optional<String> currentStoreId = getSnapshotStorageLocation(s);
896 
897       if (currentStoreId.isEmpty()) {
898         logger.warn("IsNone store ID");
899         return;
900       }
901 
902       //If this snapshot is already stored in the desired store
903       if (currentStoreId.get().equals(targetStoreId)) {
904         //return, since we don't need to move anything
905         return;
906       }
907 
908       AssetStore currentStore;
909       AssetStore targetStore;
910 
911       Optional<AssetStore> optCurrentStore = getAssetStore(currentStoreId.get());
912       Optional<AssetStore> optTargetStore = getAssetStore(targetStoreId);
913 
914       if (!optCurrentStore.isEmpty()) {
915         currentStore = optCurrentStore.get();
916       } else {
917         logger.error("Unknown current store: " + currentStoreId.get());
918         return;
919       }
920       if (!optTargetStore.isEmpty()) {
921         targetStore = optTargetStore.get();
922       } else {
923         logger.error("Unknown target store: " + targetStoreId);
924         return;
925       }
926 
927       //If the content is already local, or is moving from a remote to the local
928       // Returns true if the store id is equal to the local asset store's id
929       String localAssetStoreType = getLocalAssetStore().getStoreType();
930       if (localAssetStoreType.equals(currentStoreId.get()) || localAssetStoreType.equals(targetStoreId)) {
931         logger.debug("Moving {} from {} to {}", s, currentStoreId, targetStoreId);
932 
933         try {
934           copyAssetsToStore(s, targetStore);
935           copyManifest(s, targetStore);
936         } catch (Exception e) {
937           chuck(e);
938         }
939         getDatabase().setStorageLocation(s, targetStoreId);
940         currentStore.delete(DeletionSelector.delete(s.getOrganizationId(),
941                 s.getMediaPackage().getIdentifier().toString(), s.getVersion()
942         ));
943       } else {
944         //Else, the content is *not* local and is going to a *different* remote
945         String intermediateStore = getLocalAssetStore().getStoreType();
946         logger.debug("Moving {} from {} to {}, then to {}",
947                 s, currentStoreId, intermediateStore, targetStoreId);
948         Version version = s.getVersion();
949         String mpId = s.getMediaPackage().getIdentifier().toString();
950         try {
951           moveSnapshotToStore(version, mpId, intermediateStore);
952           moveSnapshotToStore(version, mpId, targetStoreId);
953         } catch (NotFoundException e) {
954           chuck(e);
955         }
956       }
957     });
958   }
959 
960   // Return the asset store ID that is currently storing the snapshot
961   public Optional<String> getSnapshotStorageLocation(final Version version, final String mpId) {
962     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
963 
964     for (Snapshot snapshot : snapshots) {
965       return Optional.of(snapshot.getStorageId());
966     }
967 
968     logger.error("Mediapackage " + mpId + "@" + version + " not found!");
969     return Optional.empty();
970   }
971 
972   public Optional<String> getSnapshotStorageLocation(final Snapshot snap) {
973     return getSnapshotStorageLocation(snap.getVersion(), snap.getMediaPackage().getIdentifier().toString());
974   }
975 
976   /** Properties */
977 
978   @Override
979   public boolean setProperty(Property property) {
980     final String mpId = property.getId().getMediaPackageId();
981     if (isAuthorized(mpId, WRITE_ACTION)) {
982       return getDatabase().saveProperty(property);
983     }
984     throw new RuntimeException(new UnauthorizedException("Not allowed to set property on episode " + mpId));
985   }
986 
987   @Override
988   public List<Property> selectProperties(final String mediaPackageId, String namespace) {
989     if (isAuthorized(mediaPackageId, READ_ACTION)) {
990       return getDatabase().selectProperties(mediaPackageId, namespace);
991     }
992     throw new RuntimeException(new UnauthorizedException(format(
993         "Not allowed to read properties of event %s", mediaPackageId)));
994   }
995 
996   @Override
997   public int deleteProperties(final String mediaPackageId) {
998     return getDatabase().deleteProperties(mediaPackageId);
999   }
1000 
1001   @Override
1002   public int deleteProperties(final String mediaPackageId, final String namespace) {
1003     return getDatabase().deleteProperties(mediaPackageId, namespace);
1004   }
1005 
1006   @Override
1007   public int deletePropertiesWithCurrentUser(final String mediaPackageId, final String namespace) {
1008     User user = securityService.getUser();
1009     switch (isAdmin()) {
1010       case GLOBAL:
1011         return getDatabase().deleteProperties(mediaPackageId, namespace);
1012       case ORGANIZATION:
1013         Optional<Snapshot> snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
1014         if (snapshot.isPresent() && snapshot.get().getOrganizationId().equals(user.getOrganization().getId())) {
1015           return getDatabase().deleteProperties(mediaPackageId, namespace);
1016         }
1017         return 0;
1018       default:
1019         Optional<MediaPackage> mediaPackage = getMediaPackage(mediaPackageId);
1020         if (mediaPackage.isPresent() && isAuthorized(mediaPackage.get().getIdentifier().toString(), WRITE_ACTION)) {
1021           return getDatabase().deleteProperties(mediaPackageId, namespace);
1022         }
1023         return 0;
1024     }
1025   }
1026 
1027   /** Misc. */
1028 
1029   @Override
1030   public Optional<Version> toVersion(String version) {
1031     try {
1032       return Optional.of(VersionImpl.mk(Long.parseLong(version)));
1033     } catch (NumberFormatException e) {
1034       return Optional.empty();
1035     }
1036   }
1037 
1038   @Override
1039   public long countEvents(final String organization) {
1040     return getDatabase().countEvents(organization);
1041   }
1042 
1043   @Override
1044   public long countSnapshots(final String organization) {
1045     return getDatabase().countSnapshots(organization);
1046   }
1047 
1048   @Override
1049   public long countAssets() {
1050     return getDatabase().countAssets();
1051   }
1052 
1053   @Override
1054   public long countProperties() {
1055     return getDatabase().countProperties();
1056   }
1057 
1058   /**
1059    * AbstractIndexProducer Implementation
1060    */
1061 
1062   @Override
1063   public IndexRebuildService.Service getService() {
1064     return IndexRebuildService.Service.AssetManager;
1065   }
1066 
1067   @Override
1068   public DataType[] getSupportedDataTypes() {
1069     return new DataType[]{ DataType.ALL, DataType.ACL };
1070   }
1071 
1072   @Override
1073   public void repopulate(DataType dataType) throws IndexRebuildException {
1074     final Organization originalOrg = securityService.getOrganization();
1075     final User originalUser = (originalOrg != null ? securityService.getUser() : null);
1076     try {
1077       final Organization defaultOrg = new DefaultOrganization();
1078       final User defaultSystemUser = SecurityUtil.createSystemUser(systemUserName, defaultOrg);
1079       securityService.setOrganization(defaultOrg);
1080       securityService.setUser(defaultSystemUser);
1081 
1082       int offset = 0;
1083       int total = (int) countEvents(null);
1084       int current = 0;
1085       logIndexRebuildBegin(logger, total, "snapshot(s)");
1086       var updatedEventRange = new ArrayList<Event>();
1087       do {
1088         List<Snapshot> snapshots = getDatabase().getSnapshotsForIndexRebuild(offset, PAGE_SIZE);
1089         offset += PAGE_SIZE;
1090         int n = 20;
1091 
1092         final Map<String, List<Snapshot>> byOrg = snapshots.stream()
1093             .collect(Collectors.groupingBy(Snapshot::getOrganizationId));
1094         for (String orgId : byOrg.keySet()) {
1095           final Organization snapshotOrg;
1096           try {
1097             snapshotOrg = orgDir.getOrganization(orgId);
1098             User snapshotSystemUser = SecurityUtil.createSystemUser(systemUserName, snapshotOrg);
1099             securityService.setOrganization(snapshotOrg);
1100             securityService.setUser(snapshotSystemUser);
1101             for (Snapshot snapshot : byOrg.get(orgId)) {
1102               try {
1103                 current++;
1104 
1105                 var updatedEventData = index.getEvent(snapshot.getMediaPackage().getIdentifier().toString(), orgId,
1106                     snapshotSystemUser);
1107                 if (dataType == DataType.ALL) {
1108                   // Reindex everything (default)
1109                   updatedEventData = getEventUpdateFunction(snapshot, orgId, snapshotSystemUser)
1110                       .apply(updatedEventData);
1111                 } else if (dataType == DataType.ACL) {
1112                   // Only reindex ACLs
1113                   updatedEventData = getEventUpdateFunctionOnlyAcl(snapshot, orgId)
1114                       .apply(updatedEventData);
1115                 } else {
1116                   throw new IndexRebuildException(dataType + " is not a supported data type. "
1117                       + "Accepted values are " + Arrays.toString(getSupportedDataTypes()) + ".");
1118                 }
1119                 updatedEventRange.add(updatedEventData.get());
1120 
1121                 if (updatedEventRange.size() >= n || current >= total) {
1122                   index.bulkEventUpdate(updatedEventRange);
1123                   logIndexRebuildProgress(logger, total, current, n);
1124                   updatedEventRange.clear();
1125                 }
1126               } catch (Throwable t) {
1127                 logSkippingElement(logger, "event", snapshot.getMediaPackage().getIdentifier().toString(),
1128                     snapshotOrg, t);
1129               }
1130             }
1131           } catch (Throwable t) {
1132             logIndexRebuildError(logger, t, originalOrg);
1133             throw new IndexRebuildException(getService(), originalOrg, t);
1134           } finally {
1135             securityService.setOrganization(defaultOrg);
1136             securityService.setUser(defaultSystemUser);
1137           }
1138         }
1139       } while (offset < total);
1140     } finally {
1141       securityService.setOrganization(originalOrg);
1142       securityService.setUser(originalUser);
1143     }
1144   }
1145 
1146   /**
1147    * Used for testing
1148    */
1149   public void setAvailability(Version version, String mpId, Availability availability) {
1150     if (isAuthorized(mpId, WRITE_ACTION)) {
1151       getDatabase().setAvailability(RuntimeTypes.convert(version), mpId, availability);
1152     } else {
1153       throw new RuntimeException(new UnauthorizedException("Not allowed to set availability of episode " + mpId));
1154     }
1155   }
1156 
1157   public void setDatabase(Database database) {
1158     this.db = database;
1159   }
1160 
1161   public Database getDatabase() {
1162     return db;
1163   }
1164 
1165   public HttpAssetProvider getHttpAssetProvider() {
1166     return httpAssetProvider;
1167   }
1168 
1169   /*
1170    * Security handling
1171    */
1172   /** Check authorization based on the given predicate. */
1173   private boolean isAuthorized(final String mediaPackageId, final String action) {
1174     switch (isAdmin()) {
1175       case GLOBAL:
1176         // grant general access
1177         logger.debug("Access granted since user is global admin");
1178         return true;
1179       case ORGANIZATION:
1180         // ensure that the requested assets belong to this organization
1181         logger.debug("User is organization admin. Checking organization. Checking organization ID of asset.");
1182         return snapshotExists(mediaPackageId, securityService.getOrganization().getId());
1183       default:
1184         // check organization
1185         logger.debug("Non admin user. Checking organization.");
1186         final String org = securityService.getOrganization().getId();
1187         if (!snapshotExists(mediaPackageId, org)) {
1188           return false;
1189         }
1190         // check episode role id
1191         User user = securityService.getUser();
1192         if (user.hasRole(getEpisodeRoleId(mediaPackageId, action))) {
1193           return true;
1194         }
1195         // check acl rules
1196         logger.debug("Non admin user. Checking ACL rules.");
1197         // TODO: Replace this custom ACL check with the general check from the auth service
1198         //   Warning: For now this will cause many difficult to track down bugs and is thus hardly possible
1199         // return authorizationService.hasPermission(getDatabase().getMediaPackage(mediaPackageId).get(), action);
1200         final List<String> roles = user.getRoles().parallelStream()
1201                 .filter(roleFilter)
1202                 .map((role) -> mkPropertyName(role.getName(), action))
1203                 .collect(Collectors.toList());
1204         return getDatabase().selectProperties(mediaPackageId, SECURITY_NAMESPACE).parallelStream()
1205                 .map(p -> p.getId().getName())
1206                 .filter(p -> p.endsWith(action))
1207                 .anyMatch(p -> roles.stream().anyMatch(r -> r.equals(p)));
1208     }
1209   }
1210 
1211   private List<String> isAuthorized(final List<String> mediaPackageIds, final String action) {
1212     return mediaPackageIds.stream()
1213         .filter(id -> isAuthorized(id, action))
1214         .collect(Collectors.toList());
1215   }
1216 
1217   private AdminRole isAdmin() {
1218     final User user = securityService.getUser();
1219     if (user.hasRole(GLOBAL_ADMIN_ROLE)) {
1220       return AdminRole.GLOBAL;
1221     } else if (user.hasRole(securityService.getOrganization().getAdminRole())
1222             || user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
1223       // In this context, we treat capture agents the same way as organization admins, allowing them access so that
1224       // they can ingest new media without requiring them to be explicitly specified in the ACLs.
1225       return AdminRole.ORGANIZATION;
1226     } else {
1227       return AdminRole.NONE;
1228     }
1229   }
1230 
1231   private String mkPropertyName(String role, String action) {
1232     return role + " | " + action;
1233   }
1234 
1235   /**
1236    * Configurable filter for roles
1237    */
1238   private final java.util.function.Predicate<Role> roleFilter = (role) -> {
1239     final String name = role.getName();
1240     return (includeAPIRoles || !name.startsWith("ROLE_API_"))
1241             && (includeCARoles  || !name.startsWith("ROLE_CAPTURE_AGENT_"))
1242             && (includeUIRoles  || !name.startsWith("ROLE_UI_"));
1243   };
1244 
1245   /*
1246    * Utility
1247    */
1248 
1249   /** Move the assets for a snapshot to the target store */
1250   private void copyAssetsToStore(Snapshot snap, AssetStore store) {
1251     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1252     final String orgId = snap.getOrganizationId();
1253     final Version version = snap.getVersion();
1254     final String prettyMpId = mpId + "@v" + version;
1255     logger.debug("Moving assets for snapshot {} to store {}", prettyMpId, store.getStoreType());
1256     for (final MediaPackageElement e : snap.getMediaPackage().getElements()) {
1257       if (!MOVABLE_TYPES.contains(e.getElementType())) {
1258         logger.debug("Skipping {} because type is {}", e.getIdentifier(), e.getElementType());
1259         continue;
1260       }
1261       logger.debug("Moving {} to store {}", e.getIdentifier(), store.getStoreType());
1262       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1263       if (store.contains(storagePath)) {
1264         logger.debug("Element {} (version {}) is already in store {} so skipping it", e.getIdentifier(),
1265                 version, store.getStoreType());
1266         continue;
1267       }
1268 
1269       // find asset in versions & stores
1270       final Optional<StoragePath> existingAssetOpt =
1271           getDatabase()
1272           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), store.getStoreType(), orgId)
1273           .map(dto -> StoragePath.mk(
1274               dto.getOrganizationId(),
1275               dto.getMediaPackageId(),
1276               dto.getVersion(),
1277               dto.getAssetDto().getMediaPackageElementId()
1278           ));
1279 
1280       if (existingAssetOpt.isPresent()) {
1281         final StoragePath existingAsset = existingAssetOpt.get();
1282         logger.debug("Content of asset {} with checksum {} already exists in {}",
1283                 existingAsset.getMediaPackageElementId(), e.getChecksum(), store.getStoreType());
1284         if (!store.copy(existingAsset, storagePath)) {
1285           throw new AssetManagerException(format(
1286                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1287                           + "failed",
1288                   e.getChecksum(),
1289                   existingAsset
1290           ));
1291         }
1292       } else {
1293         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1294         store.put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1295       }
1296       getDatabase().setAssetStorageLocation(VersionImpl.mk(version), mpId, e.getIdentifier(), store.getStoreType());
1297     }
1298   }
1299 
1300   private void copyManifest(Snapshot snap, AssetStore targetStore) throws IOException, NotFoundException {
1301     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1302     final String orgId = snap.getOrganizationId();
1303     final Version version = snap.getVersion();
1304 
1305     AssetStore currentStore = getAssetStore(snap.getStorageId()).get();
1306     Optional<String> manifestOpt = findManifestBaseName(snap, MANIFEST_DEFAULT_NAME, currentStore);
1307     if (manifestOpt.isEmpty()) {
1308       return; // Nothing to do, already moved to long-term storage
1309     }
1310 
1311     // Copy the manifest file
1312     String manifestBaseName = manifestOpt.get();
1313     StoragePath pathToManifest = new StoragePath(orgId, mpId, version, manifestBaseName);
1314 
1315     // Already copied?
1316     if (!targetStore.contains(pathToManifest)) {
1317       Optional<InputStream> inputStreamOpt;
1318       InputStream inputStream = null;
1319       String manifestFileName = null;
1320       try {
1321         inputStreamOpt = currentStore.get(pathToManifest);
1322         if (inputStreamOpt.isEmpty()) { // This should never happen because it has been tested before
1323           throw new NotFoundException(
1324                   String.format("Unexpected error. Manifest %s not found in current asset store", manifestBaseName));
1325         }
1326 
1327         inputStream = inputStreamOpt.get();
1328         manifestFileName = UUID.randomUUID() + ".xml";
1329         URI manifestTmpUri = workspace.putInCollection("archive", manifestFileName, inputStream);
1330         targetStore.put(pathToManifest, Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1331       } finally {
1332         IOUtils.closeQuietly(inputStream);
1333         try {
1334           // Make sure to clean up the temporary file
1335           workspace.deleteFromCollection("archive", manifestFileName);
1336         } catch (NotFoundException e) {
1337           // This is OK, we are deleting it anyway
1338         } catch (IOException e) {
1339           // This usually happens when the collection directory cannot be deleted
1340           // because another process is running at the same time and wrote a file there
1341           // after it was tested but before it was actually deleted. We will consider this ok.
1342           // Does the error message mention the manifest file name?
1343           if (e.getMessage().contains(manifestFileName)) {
1344             logger.warn("The manifest file {} didn't get deleted from the archive collection",
1345                     manifestBaseName, e);
1346           }
1347           // Else the error is related to the file-archive collection, which is fine
1348         }
1349       }
1350     }
1351   }
1352 
1353   Optional<String> findManifestBaseName(Snapshot snap, String manifestName, AssetStore store) {
1354     StoragePath path = new StoragePath(snap.getOrganizationId(), snap.getMediaPackage().getIdentifier().toString(),
1355             snap.getVersion(), manifestName);
1356     // If manifest_.xml, etc not found, return previous name (copied from the EpsiodeServiceImpl logic)
1357     if (!store.contains(path)) {
1358       // If first call, manifest is not found, which probably means it has already been moved
1359       if (MANIFEST_DEFAULT_NAME.equals(manifestName)) {
1360         return Optional.empty(); // No manifest found in current store
1361       } else {
1362         return Optional.of(manifestName.substring(0, manifestName.length() - 1));
1363       }
1364     }
1365     // This is the same logic as when building the manifest name: manifest, manifest_, manifest__, etc
1366     return findManifestBaseName(snap, manifestName + "_", store);
1367   }
1368 
1369   /* -------------------------------------------------------------------------------------------------------------- */
1370 
1371   /**
1372    * Make sure each of the elements has a checksum.
1373    */
1374   void calcChecksumsForMediaPackageElements(PartialMediaPackage pmp) {
1375     pmp.getElements().stream()
1376         .filter(hasNoChecksum)
1377         .forEach(mpe -> {
1378           File file = null;
1379           try {
1380             logger.trace("Calculate checksum for {}", mpe.getURI());
1381             file = workspace.get(mpe.getURI(), true);
1382             mpe.setChecksum(Checksum.create(ChecksumType.DEFAULT_TYPE, file));
1383           } catch (IOException | NotFoundException e) {
1384             throw new AssetManagerException(String.format(
1385                 "Cannot calculate checksum for media package element %s",
1386                 mpe.getURI()
1387             ), e);
1388           } finally {
1389             if (file != null) {
1390               FileUtils.deleteQuietly(file);
1391             }
1392           }
1393         });
1394   }
1395 
1396   /** Mutates mp and its elements, so make sure to work on a copy. */
1397   private SnapshotDto addInternal(String owner, final MediaPackage mp) throws Exception {
1398     final Date now = new Date();
1399     // claim a new version for the media package
1400     final String mpId = mp.getIdentifier().toString();
1401     final VersionImpl version = getDatabase().claimVersion(mpId);
1402     logger.info("Creating new version {} of media package {}", version, mp);
1403     final PartialMediaPackage pmp = assetsOnly(mp);
1404     // make sure they have a checksum
1405     calcChecksumsForMediaPackageElements(pmp);
1406     // download and archive elements
1407     storeAssets(pmp, version);
1408     // store mediapackage in db
1409     final SnapshotDto snapshotDto;
1410     try {
1411       // rewrite URIs for archival
1412       for (MediaPackageElement mpe : pmp.getElements()) {
1413         String fileName = getFileName(mpe).orElse("unknown");
1414         URI archiveUri = new URI(
1415             "urn",
1416             "matterhorn:" + mpId + ":" + version + ":" + mpe.getIdentifier() + ":" + fileName,
1417             null
1418         );
1419         mpe.setURI(archiveUri);
1420       }
1421 
1422       String currentOrgId = securityService.getOrganization().getId();
1423       snapshotDto = getDatabase().saveSnapshot(
1424               currentOrgId, pmp, now, version,
1425               Availability.ONLINE, getLocalAssetStore().getStoreType(), owner
1426       );
1427     } catch (AssetManagerException e) {
1428       logger.error("Could not take snapshot {}", mpId, e);
1429       throw new AssetManagerException(e);
1430     }
1431     // save manifest to element store
1432     // this is done at the end after the media package element ids have been rewritten to neutral URNs
1433     storeManifest(pmp, version);
1434     return snapshotDto;
1435   }
1436 
1437   /**
1438    * Store all elements of <code>pmp</code> under the given version.
1439    */
1440   private void storeAssets(final PartialMediaPackage pmp, final Version version) {
1441     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1442     final String orgId = securityService.getOrganization().getId();
1443     for (final MediaPackageElement e : pmp.getElements()) {
1444       logger.debug("Archiving {} {} {}", e.getFlavor(), e.getMimeType(), e.getURI());
1445       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1446       // find asset in versions
1447       final Optional<StoragePath> existingAssetOpt = getDatabase()
1448           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), getLocalAssetStore().getStoreType(), orgId)
1449           .map(dto -> StoragePath.mk(
1450                   dto.getOrganizationId(),
1451                   dto.getMediaPackageId(),
1452                   dto.getVersion(),
1453                   dto.getAssetDto().getMediaPackageElementId()));
1454 
1455       if (existingAssetOpt.isPresent()) {
1456         final StoragePath existingAsset = existingAssetOpt.get();
1457         logger.debug("Content of asset {} with checksum {} has been archived before",
1458                 existingAsset.getMediaPackageElementId(), e.getChecksum());
1459         if (!getLocalAssetStore().copy(existingAsset, storagePath)) {
1460           throw new AssetManagerException(format(
1461                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1462                           + "failed",
1463                   e.getChecksum(),
1464                   existingAsset
1465           ));
1466         }
1467       } else {
1468         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1469         getLocalAssetStore().put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1470       }
1471     }
1472   }
1473 
1474   private void storeManifest(final PartialMediaPackage pmp, final Version version) throws Exception {
1475     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1476     final String orgId = securityService.getOrganization().getId();
1477     // store the manifest.xml
1478     // TODO make use of checksums
1479     logger.debug("Archiving manifest of media package {} version {}", mpId, version);
1480     // temporarily save the manifest XML into the workspace to
1481     // Fix file not found exception when several snapshots are taken at the same time
1482     final String manifestFileName = format("manifest_%s_%s.xml", pmp.getMediaPackage().getIdentifier(), version);
1483     final URI manifestTmpUri = workspace.putInCollection(
1484             "archive",
1485             manifestFileName,
1486             IOUtils.toInputStream(MediaPackageParser.getAsXml(pmp.getMediaPackage()), "UTF-8"));
1487     try {
1488       getLocalAssetStore().put(
1489               StoragePath.mk(orgId, mpId, version, manifestAssetId(pmp, "manifest")),
1490               Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1491     } finally {
1492       // make sure to clean up the temporary file
1493       workspace.deleteFromCollection("archive", manifestFileName);
1494     }
1495   }
1496 
1497   /**
1498    * Create a unique id for the manifest xml. This is to avoid an id collision
1499    * in the rare case that the media package contains an XML element with the id
1500    * used for the manifest. A UUID could also be used but this is far less
1501    * readable.
1502    *
1503    * @param seedId
1504    *          the id to start with
1505    */
1506   private String manifestAssetId(PartialMediaPackage pmp, String seedId) {
1507     for (MediaPackageElement element : pmp.getElements()) {
1508       if (seedId.equals(element.getIdentifier())) {
1509         return manifestAssetId(pmp, seedId + "_");
1510       }
1511     }
1512     return seedId;
1513   }
1514 
1515   /* --------------------------------------------------------------------------------------------------------------- */
1516 
1517   /**
1518    * Walk up the stacktrace to find a cause of type <code>type</code>. Return none if no such
1519    * type can be found.
1520    */
1521   static <A extends Throwable> Optional<A> unwrapExceptionUntil(Class<A> type, Throwable e) {
1522     if (e == null) {
1523       return Optional.empty();
1524     } else if (type.isAssignableFrom(e.getClass())) {
1525       return Optional.of((A) e);
1526     } else {
1527       return unwrapExceptionUntil(type, e.getCause());
1528     }
1529   }
1530 
1531   /**
1532    * Return a partial media package filtering assets. Assets are elements the archive is going to manager, i.e. all
1533    * non-publication elements.
1534    */
1535   static PartialMediaPackage assetsOnly(MediaPackage mp) {
1536     Predicate<MediaPackageElement> isAsset = isNotPublication;
1537     return PartialMediaPackage.mk(mp, isAsset);
1538   }
1539 
1540   /**
1541    * Extract the file name from a media package elements URN.
1542    *
1543    * @return the file name or none if it could not be determined
1544    */
1545   public static Optional<String> getFileNameFromUrn(MediaPackageElement mpe) {
1546     Optional<URI> uri = Optional.ofNullable(mpe.getURI());
1547     if (uri.isPresent() && "urn".equals(uri.get().getScheme())) {
1548       String[] tmp = uri.get().toString().split(":");
1549       if (tmp.length < 1) {
1550         return Optional.empty();
1551       }
1552       return Optional.of(tmp[tmp.length - 1]);
1553     }
1554     return Optional.empty();
1555   }
1556 
1557   /**
1558    * Rewrite URIs of all asset elements of a snapshot's media package.
1559    * This method does not mutate anything.
1560    */
1561   public static Snapshot rewriteUris(Snapshot snapshot, Function<MediaPackageElement, URI> uriCreator) {
1562     final MediaPackage mpCopy = MediaPackageSupport.copy(snapshot.getMediaPackage());
1563     for (final MediaPackageElement mpe : assetsOnly(mpCopy).getElements()) {
1564       mpe.setURI(uriCreator.apply(mpe));
1565     }
1566     return new SnapshotImpl(
1567             snapshot.getVersion(),
1568             snapshot.getOrganizationId(),
1569             snapshot.getArchivalDate(),
1570             snapshot.getAvailability(),
1571             snapshot.getStorageId(),
1572             snapshot.getOwner(),
1573             mpCopy);
1574   }
1575 
1576   public void fireEventHandlers(AssetManagerItem item) {
1577     while (handlers.size() != EXPEXTED_HANDLERS_COUNT) {
1578       logger.warn("Expecting {} handlers, but {} are registered.  Waiting 10s then retrying...",
1579           EXPEXTED_HANDLERS_COUNT, handlers.size());
1580       try {
1581         Thread.sleep(10000L); // 10 seconds
1582       } catch (InterruptedException e) { /* swallow this, nothing to do */ }
1583     }
1584     for (AssetManagerUpdateHandler handler : handlers) {
1585       handler.execute(item);
1586     }
1587   }
1588 
1589   /**
1590    * Get the function to update a commented event in the Elasticsearch index.
1591    *
1592    * @return the function to do the update
1593    */
1594   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(Snapshot snapshot,
1595           String orgId, User user) {
1596     return (Optional<Event> eventOpt) -> {
1597       MediaPackage mp = snapshot.getMediaPackage();
1598       String eventId = mp.getIdentifier().toString();
1599       Event event = eventOpt.orElse(new Event(eventId, orgId));
1600 
1601       event = updateAclInEvent(event, mp);
1602 
1603       event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
1604       if (StringUtils.isBlank(event.getCreator())) {
1605         event.setCreator(securityService.getUser().getName());
1606       }
1607       EventIndexUtils.updateEvent(event, mp);
1608 
1609       for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
1610         try (InputStream in = workspace.read(catalog.getURI())) {
1611           EventIndexUtils.updateEvent(event, DublinCores.read(in));
1612         } catch (IOException | NotFoundException e) {
1613           throw new IllegalStateException(String.format("Unable to load dublin core catalog for event '%s'",
1614                   mp.getIdentifier()), e);
1615         }
1616       }
1617 
1618       // extended metadata
1619       event.resetExtendedMetadata();  // getting rid of old data
1620 
1621       List<EventCatalogUIAdapter> orgAdapters = extendedEventCatalogUIAdapters.getOrDefault(orgId, new ArrayList<>());
1622       orgAdapters.addAll(extendedEventCatalogUIAdapters.getOrDefault(ORGANIZATION_WILDCARD, Collections.emptyList()));
1623       for (EventCatalogUIAdapter extendedCatalogUIAdapter : orgAdapters) {
1624         for (Catalog catalog: mp.getCatalogs(extendedCatalogUIAdapter.getFlavor())) {
1625           try (InputStream in = workspace.read(catalog.getURI())) {
1626             EventIndexUtils.updateEventExtendedMetadata(event, DublinCores.read(in),
1627                     extendedCatalogUIAdapter.getFlavor());
1628           } catch (IOException | NotFoundException e) {
1629             throw new IllegalStateException(String.format("Unable to load extended dublin core catalog '%s' for event "
1630                     + "'%s'", catalog.getFlavor(), mp.getIdentifier()), e);
1631           }
1632         }
1633       }
1634 
1635       // Update series name if not already done
1636       try {
1637         EventIndexUtils.updateSeriesName(event, orgId, user, index);
1638       } catch (SearchIndexException e) {
1639         logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
1640                 e);
1641       }
1642       return Optional.of(event);
1643     };
1644   }
1645 
1646   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunctionOnlyAcl(Snapshot snapshot,
1647       String orgId) {
1648     return (Optional<Event> eventOpt) -> {
1649       MediaPackage mp = snapshot.getMediaPackage();
1650       String eventId = mp.getIdentifier().toString();
1651       Event event = eventOpt.orElse(new Event(eventId, orgId));
1652 
1653       event = updateAclInEvent(event, mp);
1654 
1655       return Optional.of(event);
1656     };
1657   }
1658 
1659   private Event updateAclInEvent(Event event, MediaPackage mp) {
1660     AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
1661     List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
1662 
1663     Optional<ManagedAcl> managedAcl = AccessInformationUtil.matchAcls(acls, acl);
1664     if (managedAcl.isPresent()) {
1665       event.setManagedAcl(managedAcl.get().getName());
1666     }
1667     event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
1668 
1669     return event;
1670   }
1671 }