View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.assetmanager.impl;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.hasNoChecksum;
25  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.isNotPublication;
26  import static org.opencastproject.mediapackage.MediaPackageSupport.getFileName;
27  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
28  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
29  import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
30  import static org.opencastproject.util.data.functions.Misc.chuck;
31  
32  import org.opencastproject.assetmanager.api.Asset;
33  import org.opencastproject.assetmanager.api.AssetId;
34  import org.opencastproject.assetmanager.api.AssetManager;
35  import org.opencastproject.assetmanager.api.AssetManagerException;
36  import org.opencastproject.assetmanager.api.Availability;
37  import org.opencastproject.assetmanager.api.Property;
38  import org.opencastproject.assetmanager.api.PropertyId;
39  import org.opencastproject.assetmanager.api.Snapshot;
40  import org.opencastproject.assetmanager.api.Value;
41  import org.opencastproject.assetmanager.api.Version;
42  import org.opencastproject.assetmanager.api.storage.AssetStore;
43  import org.opencastproject.assetmanager.api.storage.DeletionSelector;
44  import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
45  import org.opencastproject.assetmanager.api.storage.Source;
46  import org.opencastproject.assetmanager.api.storage.StoragePath;
47  import org.opencastproject.assetmanager.impl.persistence.Database;
48  import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
49  import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
50  import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
51  import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
52  import org.opencastproject.db.DBSessionFactory;
53  import org.opencastproject.elasticsearch.api.SearchIndexException;
54  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
55  import org.opencastproject.elasticsearch.index.objects.event.Event;
56  import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
57  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
58  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
59  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
60  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
61  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService.DataType;
62  import org.opencastproject.mediapackage.Catalog;
63  import org.opencastproject.mediapackage.MediaPackage;
64  import org.opencastproject.mediapackage.MediaPackageElement;
65  import org.opencastproject.mediapackage.MediaPackageElements;
66  import org.opencastproject.mediapackage.MediaPackageParser;
67  import org.opencastproject.mediapackage.MediaPackageSupport;
68  import org.opencastproject.message.broker.api.assetmanager.AssetManagerItem;
69  import org.opencastproject.metadata.dublincore.DublinCores;
70  import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
71  import org.opencastproject.security.api.AccessControlEntry;
72  import org.opencastproject.security.api.AccessControlList;
73  import org.opencastproject.security.api.AccessControlParser;
74  import org.opencastproject.security.api.AuthorizationService;
75  import org.opencastproject.security.api.DefaultOrganization;
76  import org.opencastproject.security.api.Organization;
77  import org.opencastproject.security.api.OrganizationDirectoryService;
78  import org.opencastproject.security.api.Role;
79  import org.opencastproject.security.api.SecurityService;
80  import org.opencastproject.security.api.UnauthorizedException;
81  import org.opencastproject.security.api.User;
82  import org.opencastproject.security.util.SecurityUtil;
83  import org.opencastproject.util.Checksum;
84  import org.opencastproject.util.ChecksumType;
85  import org.opencastproject.util.MimeTypes;
86  import org.opencastproject.util.NotFoundException;
87  import org.opencastproject.util.RequireUtil;
88  import org.opencastproject.workspace.api.Workspace;
89  
90  import com.google.common.collect.Sets;
91  
92  import org.apache.commons.io.FileUtils;
93  import org.apache.commons.io.IOUtils;
94  import org.apache.commons.lang3.BooleanUtils;
95  import org.apache.commons.lang3.StringUtils;
96  import org.osgi.service.component.ComponentContext;
97  import org.osgi.service.component.annotations.Activate;
98  import org.osgi.service.component.annotations.Component;
99  import org.osgi.service.component.annotations.Reference;
100 import org.osgi.service.component.annotations.ReferenceCardinality;
101 import org.osgi.service.component.annotations.ReferencePolicy;
102 import org.slf4j.Logger;
103 import org.slf4j.LoggerFactory;
104 
105 import java.io.File;
106 import java.io.IOException;
107 import java.io.InputStream;
108 import java.net.URI;
109 import java.security.NoSuchAlgorithmException;
110 import java.util.ArrayList;
111 import java.util.Arrays;
112 import java.util.Collection;
113 import java.util.Collections;
114 import java.util.Date;
115 import java.util.HashMap;
116 import java.util.LinkedHashMap;
117 import java.util.List;
118 import java.util.Map;
119 import java.util.Objects;
120 import java.util.Optional;
121 import java.util.Set;
122 import java.util.UUID;
123 import java.util.function.Function;
124 import java.util.function.Predicate;
125 import java.util.stream.Collectors;
126 
127 import javax.persistence.EntityManagerFactory;
128 
129 /**
130  * The Asset Manager implementation.
131  */
132 @Component(
133     property = {
134         "service.description=Opencast Asset Manager"
135     },
136     immediate = true,
137     service = { AssetManager.class, IndexProducer.class }
138 )
139 public class AssetManagerImpl extends AbstractIndexProducer implements AssetManager {
140 
  /** Logger of this class. */
  private static final Logger logger = LoggerFactory.getLogger(AssetManagerImpl.class);

  // NOTE(review): the consumer of this page size is not in this part of the file.
  private static final int PAGE_SIZE = 1000;

  /**
   * Administrative level of the current user; the access methods of this class switch on it.
   * For {@code GLOBAL}, database queries are issued without an organization restriction.
   * For {@code ORGANIZATION}, list queries are restricted to the current organization without a per-item check.
   */
  enum AdminRole {
    GLOBAL, ORGANIZATION, NONE
  }

  /** Action name checked before snapshots are written or deleted. */
  public static final String WRITE_ACTION = "write";
  /** Action name checked before snapshots or assets are read. */
  public static final String READ_ACTION = "read";
  /** Property namespace under which the ACL entries of a media package are stored when a snapshot is taken. */
  public static final String SECURITY_NAMESPACE = "org.opencastproject.assetmanager.security";

  // NOTE(review): not referenced in this part of the file.
  private static final String MANIFEST_DEFAULT_NAME = "manifest";

  // Collaborators injected through the OSGi setters below; systemUserName and db are set in activate().
  private SecurityService securityService;
  private AuthorizationService authorizationService;
  private OrganizationDirectoryService orgDir;
  private Workspace workspace;
  private AssetStore assetStore;
  private HttpAssetProvider httpAssetProvider;
  private String systemUserName;
  private Database db;
  private DBSessionFactory dbSessionFactory;
  private EntityManagerFactory emf;
  private AclServiceFactory aclServiceFactory;
  private ElasticsearchIndex index;
  // Extended (non-common) catalog UI adapters, grouped by the organization they report.
  private Map<String, List<EventCatalogUIAdapter>> extendedEventCatalogUIAdapters = new HashMap<>();

  // Settings for role filter (read from the component properties in activate())
  private boolean includeAPIRoles;
  private boolean includeCARoles;
  private boolean includeUIRoles;


  // NOTE(review): presumably the element types that may be moved between asset stores; the consumer is not visible.
  public static final Set<MediaPackageElement.Type> MOVABLE_TYPES = Sets.newHashSet(
          MediaPackageElement.Type.Attachment,
          MediaPackageElement.Type.Catalog,
          MediaPackageElement.Type.Track
  );

  // Dynamically bound remote asset stores, keyed by their store type.
  private final HashMap<String, RemoteAssetStore> remoteStores = new LinkedHashMap<>();
182 
183   /**
184    * OSGi callback.
185    */
186   @Activate
187   public synchronized void activate(ComponentContext cc) {
188     logger.info("Activating AssetManager.");
189     db = new Database(dbSessionFactory.createSession(emf));
190     db.setHttpAssetProvider(getHttpAssetProvider());
191     systemUserName = SecurityUtil.getSystemUserName(cc);
192 
193     includeAPIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeAPIRoles"), null));
194     includeCARoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeCARoles"), null));
195     includeUIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeUIRoles"), null));
196   }
197 
198   /**
199    * OSGi dependencies
200    */
201 
202   @Reference(target = "(osgi.unit.name=org.opencastproject.assetmanager.impl)")
203   public void setEntityManagerFactory(EntityManagerFactory emf) {
204     this.emf = emf;
205   }
206 
207   @Reference
208   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
209     this.dbSessionFactory = dbSessionFactory;
210   }
211 
212   @Reference
213   public void setSecurityService(SecurityService securityService) {
214     this.securityService = securityService;
215   }
216 
217   @Reference
218   public void setAuthorizationService(AuthorizationService authorizationService) {
219     this.authorizationService = authorizationService;
220   }
221 
222   @Reference
223   public void setOrgDir(OrganizationDirectoryService orgDir) {
224     this.orgDir = orgDir;
225   }
226 
227   @Reference
228   public void setWorkspace(Workspace workspace) {
229     this.workspace = workspace;
230   }
231 
232   @Reference
233   public void setAssetStore(AssetStore assetStore) {
234     this.assetStore = assetStore;
235   }
236 
237   @Reference(
238       cardinality = ReferenceCardinality.MULTIPLE,
239       policy = ReferencePolicy.DYNAMIC,
240       unbind = "removeRemoteAssetStore"
241   )
242   public synchronized void addRemoteAssetStore(RemoteAssetStore assetStore) {
243     remoteStores.put(assetStore.getStoreType(), assetStore);
244   }
245 
246   public void removeRemoteAssetStore(RemoteAssetStore store) {
247     remoteStores.remove(store.getStoreType());
248   }
249 
250   @Reference
251   public void setHttpAssetProvider(HttpAssetProvider httpAssetProvider) {
252     this.httpAssetProvider = httpAssetProvider;
253   }
254 
255   @Reference
256   public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
257     this.aclServiceFactory = aclServiceFactory;
258   }
259 
260   @Reference
261   public void setIndex(ElasticsearchIndex index) {
262     this.index = index;
263   }
264 
265   @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC,
266           target = "(common-metadata=false)")
267   public synchronized void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
268     List<EventCatalogUIAdapter> list = extendedEventCatalogUIAdapters.computeIfAbsent(
269             catalogUIAdapter.getOrganization(), k -> new ArrayList());
270     list.add(catalogUIAdapter);
271   }
272 
273   public synchronized void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
274     if (extendedEventCatalogUIAdapters.containsKey(catalogUIAdapter.getOrganization())) {
275       extendedEventCatalogUIAdapters.get(catalogUIAdapter.getOrganization()).remove(catalogUIAdapter);
276     }
277   }
278 
279   /**
280    * AssetManager implementation
281    */
282 
283   @Override
284   public Optional<MediaPackage> getMediaPackage(String mediaPackageId) {
285     String orgId = securityService.getOrganization().getId();
286     switch (isAdmin()) {
287       case GLOBAL:
288         return getDatabase().getMediaPackage(mediaPackageId);
289       default:
290         if (isAuthorized(mediaPackageId, READ_ACTION)) {
291           return getDatabase().getMediaPackage(mediaPackageId, orgId);
292         }
293         return Optional.empty();
294     }
295   }
296 
297   @Override
298   public List<Snapshot> getLatestSnapshots(Collection mediaPackageIds) {
299     String orgId = securityService.getOrganization().getId();
300     switch (isAdmin()) {
301       case GLOBAL:
302         return getDatabase().getLatestSnapshotsByMediaPackageIds(mediaPackageIds, null);
303       default:
304         mediaPackageIds = isAuthorized(mediaPackageIds.stream().toList(), READ_ACTION);
305         return getDatabase().getLatestSnapshotsByMediaPackageIds(mediaPackageIds, orgId);
306     }
307   }
308 
309   @Override
310   public Optional<Snapshot> getLatestSnapshot(String mediaPackageId) {
311     String orgId = securityService.getOrganization().getId();
312     switch (isAdmin()) {
313       case GLOBAL:
314         return getDatabase().getLatestSnapshot(mediaPackageId);
315       default:
316         if (isAuthorized(mediaPackageId, READ_ACTION)) {
317           return getDatabase().getLatestSnapshot(mediaPackageId, orgId);
318         }
319         return Optional.empty();
320     }
321   }
322 
323   @Override
324   public Optional<Asset> getAsset(Version version, String mpId, String mpElementId) {
325     if (isAuthorized(mpId, READ_ACTION)) {
326       // try to fetch the asset
327       var asset = getDatabase().getAsset(RuntimeTypes.convert(version), mpId, mpElementId);
328       if (asset.isPresent()) {
329         var storageId = getSnapshotStorageLocation(version, mpId);
330         if (storageId.isPresent()) {
331           var store = getAssetStore(storageId.get());
332           if (store.isPresent()) {
333             var assetStream = store.get().get(StoragePath.mk(
334                 asset.get().getOrganizationId(),
335                 mpId,
336                 version,
337                 mpElementId
338             ));
339             if (assetStream.isPresent()) {
340 
341               Checksum checksum = null;
342               try {
343                 checksum = Checksum.fromString(asset.get().getAssetDto().getChecksum());
344               } catch (NoSuchAlgorithmException e) {
345                 logger.warn("Invalid checksum for asset {} of media package {}", mpElementId, mpId, e);
346               }
347 
348               final Asset a = new AssetImpl(
349                       AssetId.mk(version, mpId, mpElementId),
350                       assetStream.get(),
351                       asset.get().getAssetDto().getMimeType(),
352                       asset.get().getAssetDto().getSize(),
353                       asset.get().getStorageId(),
354                       asset.get().getAvailability(),
355                       checksum);
356               return Optional.of(a);
357             }
358           }
359         }
360       }
361       return Optional.empty();
362     }
363     throw new RuntimeException(new UnauthorizedException(
364             format("Not allowed to read assets of snapshot %s, version=%s", mpId, version)
365     ));
366   }
367 
368   @Override
369   public Optional<AssetStore> getAssetStore(String storeId) {
370     if (assetStore.getStoreType().equals(storeId)) {
371       return Optional.of(assetStore);
372     } else {
373       if (remoteStores.containsKey(storeId)) {
374         return Optional.of(remoteStores.get(storeId));
375       } else {
376         return Optional.empty();
377       }
378     }
379   }
380 
381   @Override
382   public AssetStore getLocalAssetStore() {
383     return assetStore;
384   }
385 
386   @Override
387   public List<AssetStore> getRemoteAssetStores() {
388     return new ArrayList<>(remoteStores.values());
389   }
390 
391   /** Snapshots */
392 
393   @Override
394   public boolean snapshotExists(final String mediaPackageId) {
395     return getDatabase().snapshotExists(mediaPackageId);
396   }
397 
398   @Override
399   public boolean snapshotExists(final String mediaPackageId, final String organization) {
400     return getDatabase().snapshotExists(mediaPackageId, organization);
401   }
402 
403   @Override
404   public Snapshot takeSnapshot(MediaPackage mp) {
405     return takeSnapshot(null, mp);
406   }
407 
408   @Override
409   public Snapshot takeSnapshot(String owner, MediaPackage mp) {
410 
411     final String mediaPackageId = mp.getIdentifier().toString();
412     final boolean firstSnapshot = !snapshotExists(mediaPackageId);
413 
414     // Allow this if:
415     //  - no previous snapshot exists
416     //  - the user has write access to the previous snapshot
417     if (firstSnapshot) {
418       // if it's the first snapshot, ensure that old, leftover properties are removed
419       deleteProperties(mediaPackageId);
420     }
421     if (firstSnapshot || isAuthorized(mediaPackageId, WRITE_ACTION)) {
422       final Snapshot snapshot;
423       if (owner == null) {
424         snapshot = takeSnapshotInternal(mp);
425       } else {
426         snapshot = takeSnapshotInternal(owner, mp);
427       }
428 
429       final AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
430       // store acl as properties
431       // Drop old ACL rules
432       deleteProperties(mediaPackageId, SECURITY_NAMESPACE);
433       // Set new ACL rules
434       for (final AccessControlEntry ace : acl.getEntries()) {
435         getDatabase().saveProperty(Property.mk(PropertyId.mk(mediaPackageId, SECURITY_NAMESPACE,
436                 mkPropertyName(ace.getRole(), ace.getAction())), Value.mk(ace.isAllow())));
437       }
438 
439       updateEventInIndex(snapshot);
440 
441       return snapshot;
442     }
443     throw new RuntimeException(new UnauthorizedException(
444         "Not allowed to take snapshot of media package " + mediaPackageId));
445   }
446 
447   private Snapshot takeSnapshotInternal(MediaPackage mediaPackage) {
448     final String mediaPackageId = mediaPackage.getIdentifier().toString();
449     String orgId = securityService.getOrganization().getId();
450     Optional<Snapshot> snapshot;
451     switch (isAdmin()) {
452       case GLOBAL:
453         snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
454         break;
455       default:
456         if (isAuthorized(mediaPackageId, WRITE_ACTION)) {
457           snapshot = getDatabase().getLatestSnapshot(mediaPackageId, orgId);
458         } else {
459           snapshot = Optional.empty();
460         }
461         break;
462     }
463     if (snapshot.isPresent()) {
464       return takeSnapshotInternal(snapshot.get().getOwner(), mediaPackage);
465     }
466     return takeSnapshotInternal(DEFAULT_OWNER, mediaPackage);
467   }
468 
469   private Snapshot takeSnapshotInternal(final String owner, final MediaPackage mp) {
470     try {
471       Snapshot archived = addInternal(owner, MediaPackageSupport.copy(mp)).toSnapshot();
472       return getHttpAssetProvider().prepareForDelivery(archived);
473     } catch (Exception e) {
474       logger.error("An error occurred", e);
475       throw unwrapExceptionUntil(AssetManagerException.class, e).orElse(new AssetManagerException(e));
476     }
477   }
478 
479   /**
480    * Create a {@link AssetManagerItem.TakeSnapshot} message.
481    * <p>
482    * Do not call outside of a security context.
483    */
484   private AssetManagerItem.TakeSnapshot mkTakeSnapshotMessage(Snapshot snapshot) {
485     final MediaPackage mp = snapshot.getMediaPackage();
486 
487     long version;
488     try {
489       version = Long.parseLong(snapshot.getVersion().toString());
490     } catch (NumberFormatException e) {
491       // The index requires a version to be a long value.
492       // Since the asset manager default implementation uses long values that should be not a problem.
493       // However, a decent exception message is helpful if a different implementation of the asset manager
494       // is used.
495       throw new RuntimeException("The current implementation of the index requires versions being of type 'long'.");
496     }
497 
498     return AssetManagerItem.add(workspace, mp, authorizationService.getActiveAcl(mp).getA(),
499             version, snapshot.getArchivalDate());
500   }
501 
502   @Override
503   public void triggerIndexUpdate(String mediaPackageId) throws NotFoundException, UnauthorizedException {
504 
505     if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
506       throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
507     }
508     Optional<Snapshot> snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
509 
510     if (snapshot.isEmpty()) {
511       throw new NotFoundException("No event with ID `" + mediaPackageId + "`");
512     }
513 
514     // Update event index with latest snapshot
515     updateEventInIndex(snapshot.get());
516   }
517 
518   /**
519    * Update the event in the Elasticsearch index.
520    *
521    * @param snapshot
522    *         The newest snapshot of the event to update
523    */
524   private void updateEventInIndex(Snapshot snapshot) {
525     final String eventId = snapshot.getMediaPackage().getIdentifier().toString();
526     final String orgId = securityService.getOrganization().getId();
527     final User user = securityService.getUser();
528 
529     logger.debug("Updating event {} in the {} index.", eventId, index.getIndexName());
530     Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(snapshot, orgId, user);
531 
532     try {
533       index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
534       logger.debug("Event {} updated in the {} index.", eventId, index.getIndexName());
535     } catch (SearchIndexException e) {
536       logger.error("Error updating the event {} in the {} index.", eventId, index.getIndexName(), e);
537     }
538   }
539 
540   /**
541    * Remove the event from the Elasticsearch index
542    *
543    * @param eventId
544    *         The id of the event to remove
545    */
546   private void removeArchivedVersionFromIndex(String eventId) {
547     final String orgId = securityService.getOrganization().getId();
548     final User user = securityService.getUser();
549     logger.debug("Received AssetManager delete episode message {}", eventId);
550 
551     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
552       if (eventOpt.isEmpty()) {
553         logger.warn("Event {} not found for deletion", eventId);
554         return Optional.empty();
555       }
556       Event event = eventOpt.get();
557       event.setArchiveVersion(null);
558       return Optional.of(event);
559     };
560 
561     try {
562       index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
563       logger.debug("Event {} removed from the {} index", eventId, index.getIndexName());
564     } catch (SearchIndexException e) {
565       logger.error("Error deleting the event {} from the {} index.", eventId, index.getIndexName(), e);
566     }
567   }
568 
569   @Override
570   public List<Snapshot> getSnapshotsById(final String mpId) {
571     RequireUtil.requireNotBlank(mpId, "mpId");
572 
573     String orgId = securityService.getOrganization().getId();
574 
575     switch (isAdmin()) {
576       case GLOBAL:
577         return getDatabase().getSnapshots(mpId);
578       default:
579         if (isAuthorized(mpId, READ_ACTION)) {
580           return getDatabase().getSnapshots(mpId, orgId);
581         }
582         return new ArrayList<>();
583     }
584   }
585 
586   @Override
587   public List<Snapshot> getSnapshotsByIdOrderedByVersion(String mpId, boolean asc) {
588     RequireUtil.requireNotBlank(mpId, "mpId");
589 
590     String order;
591     if (asc) {
592       order = "ASC";
593     } else {
594       order = "DESC";
595     }
596 
597     String orgId = securityService.getOrganization().getId();
598     switch (isAdmin()) {
599       case GLOBAL:
600         return getDatabase().getSnapshots(mpId, null, order);
601       default:
602         if (isAuthorized(mpId, READ_ACTION)) {
603           return getDatabase().getSnapshots(mpId, orgId);
604         }
605         return new ArrayList<>();
606     }
607   }
608 
609   @Override
610   public List<Snapshot> getSnapshotsByIdAndVersion(final String mpId, final Version version) {
611     RequireUtil.requireNotBlank(mpId, "mpId");
612     RequireUtil.notNull(version, "version");
613 
614     String orgId = securityService.getOrganization().getId();
615     // TODO: Simplify the version class?
616     Long v = Long.parseLong(version.toString());
617     switch (isAdmin()) {
618       case GLOBAL:
619         return getDatabase().getSnapshotsByMpIdAndVersion(mpId, v, null);
620       default:
621         if (isAuthorized(mpId, READ_ACTION)) {
622           return getDatabase().getSnapshotsByMpIdAndVersion(mpId, v, orgId);
623         }
624         return new ArrayList<>();
625     }
626   }
627 
628   @Override
629   public List<Snapshot> getSnapshotsByDateOrderedById(Date start, Date end) {
630     RequireUtil.notNull(start, "start");
631     RequireUtil.notNull(end, "end");
632 
633     String orgId = securityService.getOrganization().getId();
634     switch (isAdmin()) {
635       case GLOBAL:
636         return getDatabase().getSnapshotsByDateOrderByMpId(start, end, null);
637       case ORGANIZATION:
638         return getDatabase().getSnapshotsByDateOrderByMpId(start, end, orgId);
639       default:
640         List<Snapshot> snapshots = new ArrayList<>();
641         List<Snapshot> snaps = getDatabase().getSnapshotsByDateOrderByMpId(start, end, orgId);
642         for (int i = 0; i < snaps.size(); i++) {
643           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
644             snapshots.add(snaps.get(i));
645           }
646         }
647         return snapshots;
648     }
649   }
650 
651   @Override
652   public List<Snapshot> getSnapshotsByIdAndDate(final String mpId, final Date start, final Date end) {
653     RequireUtil.requireNotBlank(mpId, "mpId");
654     RequireUtil.notNull(start, "start");
655     RequireUtil.notNull(end, "end");
656 
657     String orgId = securityService.getOrganization().getId();
658     switch (isAdmin()) {
659       case GLOBAL:
660         return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, null);
661       default:
662         if (isAuthorized(mpId, READ_ACTION)) {
663           return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, orgId);
664         }
665         return new ArrayList<>();
666     }
667   }
668 
669   @Override
670   public List<Snapshot> getSnapshotsByIdAndDateOrderedByVersion(String mpId, Date start, Date end, boolean asc) {
671     RequireUtil.requireNotBlank(mpId, "mpId");
672     RequireUtil.notNull(start, "start");
673     RequireUtil.notNull(end, "end");
674 
675     String order;
676     if (asc) {
677       order = "ASC";
678     } else {
679       order = "DESC";
680     }
681 
682     String orgId = securityService.getOrganization().getId();
683     switch (isAdmin()) {
684       case GLOBAL:
685         return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, null, order);
686       default:
687         if (isAuthorized(mpId, READ_ACTION)) {
688           return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, orgId, order);
689         }
690         return new ArrayList<>();
691     }
692   }
693 
694   @Override
695   public List<Snapshot> getLatestSnapshotsBySeriesId(final String seriesId) {
696     RequireUtil.requireNotBlank(seriesId, "seriesId");
697 
698     String orgId = securityService.getOrganization().getId();
699 
700     switch (isAdmin()) {
701       case GLOBAL:
702         return getDatabase().getSnapshotsBySeries(seriesId, null);
703       case ORGANIZATION:
704         return getDatabase().getSnapshotsBySeries(seriesId, orgId);
705       default:
706         List<Snapshot> snapshots = new ArrayList<>();
707         List<Snapshot> snaps = getDatabase().getSnapshotsBySeries(seriesId, orgId);
708         for (int i = 0; i < snaps.size(); i++) {
709           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
710             snapshots.add(snaps.get(i));
711           }
712         }
713         return snapshots;
714     }
715   }
716 
717   @Override
718   public Optional<Snapshot> getSnapshotByMpIdOrgIdAndVersion(String mpId, String orgId, Version version) {
719     return getDatabase().getSnapshot(mpId, orgId, Long.parseLong(version.toString()));
720   }
721 
722   @Override
723   public int deleteSnapshots(String mpId) {
724     String orgId = securityService.getOrganization().getId();
725     int numberOfDeletedSnapshots = 0;
726     switch (isAdmin()) {
727       case GLOBAL:
728         numberOfDeletedSnapshots = getDatabase().deleteSnapshots(mpId, null);
729         break;
730       default:
731         if (isAuthorized(mpId, WRITE_ACTION)) {
732           numberOfDeletedSnapshots = getDatabase().deleteSnapshots(mpId, orgId);
733         }
734         break;
735     }
736 
737     // delete from store
738     if (numberOfDeletedSnapshots > 0) {
739       final DeletionSelector deletionSelector = DeletionSelector.deleteAll(orgId, mpId);
740       getLocalAssetStore().delete(deletionSelector);
741       for (AssetStore as : getRemoteAssetStores()) {
742         as.delete(deletionSelector);
743       }
744     }
745 
746     return numberOfDeletedSnapshots;
747   }
748 
749   @Override
750   public int deleteAllButLatestSnapshot(String mpId) {
751     String orgId = securityService.getOrganization().getId();
752     int numberOfDeletedSnapshots = 0;
753     List<Long> versions = getDatabase().getVersionsByMediaPackage(mpId, null);
754 
755     switch (isAdmin()) {
756       case GLOBAL:
757         numberOfDeletedSnapshots = getDatabase().deleteAllButLatestSnapshot(mpId, null);
758         break;
759       default:
760         if (isAuthorized(mpId, WRITE_ACTION)) {
761           numberOfDeletedSnapshots = getDatabase().deleteAllButLatestSnapshot(mpId, orgId);
762         }
763         break;
764     }
765 
766     // delete from store
767     if (numberOfDeletedSnapshots > 0) {
768       // Skip last version
769       for (int i = 0; i < versions.size() - 1; i++) {
770         final DeletionSelector deletionSelector = DeletionSelector.delete(orgId, mpId,
771             new VersionImpl(versions.get(i)));
772         getLocalAssetStore().delete(deletionSelector);
773         for (AssetStore as : getRemoteAssetStores()) {
774           as.delete(deletionSelector);
775         }
776       }
777     }
778 
779     return numberOfDeletedSnapshots;
780   }
781 
782   @Override
783   public void moveSnapshotsById(final String mpId, final String targetStore) throws NotFoundException {
784     List<Snapshot> snapshots = getSnapshotsById(mpId);
785 
786     if (snapshots.isEmpty()) {
787       throw new NotFoundException("Mediapackage " + mpId + " not found!");
788     }
789 
790     processOperations(snapshots, targetStore);
791   }
792 
793   @Override
794   public void moveSnapshotsByIdAndVersion(final String mpId, final Version version, final String targetStore)
795           throws NotFoundException {
796     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
797 
798     if (snapshots.isEmpty()) {
799       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
800     }
801 
802     processOperations(snapshots, targetStore);
803   }
804 
805   @Override
806   public void moveSnapshotsByDate(final Date start, final Date end, final String targetStore)
807           throws NotFoundException {
808     String orgId = securityService.getOrganization().getId();
809     List<Snapshot> snapshots = new ArrayList<>();
810     switch (isAdmin()) {
811       case GLOBAL:
812         snapshots = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, null);
813         break;
814       case ORGANIZATION:
815         snapshots = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, orgId);
816         break;
817       default:
818         List<Snapshot> snaps = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, orgId);
819         for (int i = 0; i < snaps.size(); i++) {
820           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
821             snapshots.add(snaps.get(i));
822           }
823         }
824         break;
825     }
826 
827     if (snapshots.isEmpty()) {
828       throw new NotFoundException("No media packages found between " + start + " and " + end);
829     }
830 
831     processOperations(snapshots, targetStore);
832   }
833 
834   @Override
835   public void moveSnapshotsByIdAndDate(final String mpId, final Date start, final Date end, final String targetStore)
836           throws NotFoundException {
837     List<Snapshot> snapshots = getSnapshotsByIdAndDate(mpId, start, end);
838 
839     if (snapshots.isEmpty()) {
840       throw new NotFoundException("No media package with id " + mpId + " found between " + start + " and " + end);
841     }
842 
843     processOperations(snapshots, targetStore);
844   }
845 
846   @Override
847   public void moveSnapshotToStore(final Version version, final String mpId, final String storeId)
848           throws NotFoundException {
849 
850     //Find the snapshot
851     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
852 
853     if (snapshots.isEmpty()) {
854       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
855     }
856     processOperations(snapshots, storeId);
857   }
858 
859   //Do the actual moving
860   //TODO: Compare this to AssetManagerJobProducer.moveSnapshots. Check if they can be combined.
861   private void processOperations(List<Snapshot> snapshots, final String targetStoreId) {
862     snapshots.forEach(s -> {
863 //      Snapshot s = record.getSnapshot().get();
864       Optional<String> currentStoreId = getSnapshotStorageLocation(s);
865 
866       if (currentStoreId.isEmpty()) {
867         logger.warn("IsNone store ID");
868         return;
869       }
870 
871       //If this snapshot is already stored in the desired store
872       if (currentStoreId.get().equals(targetStoreId)) {
873         //return, since we don't need to move anything
874         return;
875       }
876 
877       AssetStore currentStore;
878       AssetStore targetStore;
879 
880       Optional<AssetStore> optCurrentStore = getAssetStore(currentStoreId.get());
881       Optional<AssetStore> optTargetStore = getAssetStore(targetStoreId);
882 
883       if (!optCurrentStore.isEmpty()) {
884         currentStore = optCurrentStore.get();
885       } else {
886         logger.error("Unknown current store: " + currentStoreId.get());
887         return;
888       }
889       if (!optTargetStore.isEmpty()) {
890         targetStore = optTargetStore.get();
891       } else {
892         logger.error("Unknown target store: " + targetStoreId);
893         return;
894       }
895 
896       //If the content is already local, or is moving from a remote to the local
897       // Returns true if the store id is equal to the local asset store's id
898       String localAssetStoreType = getLocalAssetStore().getStoreType();
899       if (localAssetStoreType.equals(currentStoreId.get()) || localAssetStoreType.equals(targetStoreId)) {
900         logger.debug("Moving {} from {} to {}", s, currentStoreId, targetStoreId);
901 
902         try {
903           copyAssetsToStore(s, targetStore);
904           copyManifest(s, targetStore);
905         } catch (Exception e) {
906           chuck(e);
907         }
908         getDatabase().setStorageLocation(s, targetStoreId);
909         currentStore.delete(DeletionSelector.delete(s.getOrganizationId(),
910                 s.getMediaPackage().getIdentifier().toString(), s.getVersion()
911         ));
912       } else {
913         //Else, the content is *not* local and is going to a *different* remote
914         String intermediateStore = getLocalAssetStore().getStoreType();
915         logger.debug("Moving {} from {} to {}, then to {}",
916                 s, currentStoreId, intermediateStore, targetStoreId);
917         Version version = s.getVersion();
918         String mpId = s.getMediaPackage().getIdentifier().toString();
919         try {
920           moveSnapshotToStore(version, mpId, intermediateStore);
921           moveSnapshotToStore(version, mpId, targetStoreId);
922         } catch (NotFoundException e) {
923           chuck(e);
924         }
925       }
926     });
927   }
928 
929   // Return the asset store ID that is currently storing the snapshot
930   public Optional<String> getSnapshotStorageLocation(final Version version, final String mpId) {
931     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
932 
933     for (Snapshot snapshot : snapshots) {
934       return Optional.of(snapshot.getStorageId());
935     }
936 
937     logger.error("Mediapackage " + mpId + "@" + version + " not found!");
938     return Optional.empty();
939   }
940 
941   public Optional<String> getSnapshotStorageLocation(final Snapshot snap) {
942     return getSnapshotStorageLocation(snap.getVersion(), snap.getMediaPackage().getIdentifier().toString());
943   }
944 
945   /** Properties */
946 
947   @Override
948   public boolean setProperty(Property property) {
949     final String mpId = property.getId().getMediaPackageId();
950     if (isAuthorized(mpId, WRITE_ACTION)) {
951       return getDatabase().saveProperty(property);
952     }
953     throw new RuntimeException(new UnauthorizedException("Not allowed to set property on episode " + mpId));
954   }
955 
956   @Override
957   public List<Property> selectProperties(final String mediaPackageId, String namespace) {
958     if (isAuthorized(mediaPackageId, READ_ACTION)) {
959       return getDatabase().selectProperties(mediaPackageId, namespace);
960     }
961     throw new RuntimeException(new UnauthorizedException(format(
962         "Not allowed to read properties of event %s", mediaPackageId)));
963   }
964 
965   @Override
966   public int deleteProperties(final String mediaPackageId) {
967     return getDatabase().deleteProperties(mediaPackageId);
968   }
969 
970   @Override
971   public int deleteProperties(final String mediaPackageId, final String namespace) {
972     return getDatabase().deleteProperties(mediaPackageId, namespace);
973   }
974 
975   @Override
976   public int deletePropertiesWithCurrentUser(final String mediaPackageId, final String namespace) {
977     User user = securityService.getUser();
978     switch (isAdmin()) {
979       case GLOBAL:
980         return getDatabase().deleteProperties(mediaPackageId, namespace);
981       case ORGANIZATION:
982         Optional<Snapshot> snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
983         if (snapshot.isPresent() && snapshot.get().getOrganizationId().equals(user.getOrganization().getId())) {
984           return getDatabase().deleteProperties(mediaPackageId, namespace);
985         }
986         return 0;
987       default:
988         Optional<MediaPackage> mediaPackage = getMediaPackage(mediaPackageId);
989         if (mediaPackage.isPresent() && isAuthorized(mediaPackage.get().getIdentifier().toString(), WRITE_ACTION)) {
990           return getDatabase().deleteProperties(mediaPackageId, namespace);
991         }
992         return 0;
993     }
994   }
995 
996   /** Misc. */
997 
998   @Override
999   public Optional<Version> toVersion(String version) {
1000     try {
1001       return Optional.of(VersionImpl.mk(Long.parseLong(version)));
1002     } catch (NumberFormatException e) {
1003       return Optional.empty();
1004     }
1005   }
1006 
1007   @Override
1008   public long countEvents(final String organization) {
1009     return getDatabase().countEvents(organization);
1010   }
1011 
1012   @Override
1013   public long countSnapshots(final String organization) {
1014     return getDatabase().countSnapshots(organization);
1015   }
1016 
1017   @Override
1018   public long countAssets() {
1019     return getDatabase().countAssets();
1020   }
1021 
1022   @Override
1023   public long countProperties() {
1024     return getDatabase().countProperties();
1025   }
1026 
1027   /**
1028    * AbstractIndexProducer Implementation
1029    */
1030 
1031   @Override
1032   public IndexRebuildService.Service getService() {
1033     return IndexRebuildService.Service.AssetManager;
1034   }
1035 
1036   @Override
1037   public DataType[] getSupportedDataTypes() {
1038     return new DataType[]{ DataType.ALL, DataType.ACL };
1039   }
1040 
1041   @Override
1042   public void repopulate(DataType dataType) throws IndexRebuildException {
1043     final Organization originalOrg = securityService.getOrganization();
1044     final User originalUser = (originalOrg != null ? securityService.getUser() : null);
1045     try {
1046       final Organization defaultOrg = new DefaultOrganization();
1047       final User defaultSystemUser = SecurityUtil.createSystemUser(systemUserName, defaultOrg);
1048       securityService.setOrganization(defaultOrg);
1049       securityService.setUser(defaultSystemUser);
1050 
1051       int offset = 0;
1052       int total = (int) countEvents(null);
1053       int current = 0;
1054       logIndexRebuildBegin(logger, total, "snapshot(s)");
1055       var updatedEventRange = new ArrayList<Event>();
1056       do {
1057         List<Snapshot> snapshots = getDatabase().getSnapshotsForIndexRebuild(offset, PAGE_SIZE);
1058         offset += PAGE_SIZE;
1059         int n = 20;
1060 
1061         final Map<String, List<Snapshot>> byOrg = snapshots.stream()
1062             .collect(Collectors.groupingBy(Snapshot::getOrganizationId));
1063         for (String orgId : byOrg.keySet()) {
1064           final Organization snapshotOrg;
1065           try {
1066             snapshotOrg = orgDir.getOrganization(orgId);
1067             User snapshotSystemUser = SecurityUtil.createSystemUser(systemUserName, snapshotOrg);
1068             securityService.setOrganization(snapshotOrg);
1069             securityService.setUser(snapshotSystemUser);
1070             for (Snapshot snapshot : byOrg.get(orgId)) {
1071               try {
1072                 current++;
1073 
1074                 var updatedEventData = index.getEvent(snapshot.getMediaPackage().getIdentifier().toString(), orgId,
1075                     snapshotSystemUser);
1076                 if (dataType == DataType.ALL) {
1077                   // Reindex everything (default)
1078                   updatedEventData = getEventUpdateFunction(snapshot, orgId, snapshotSystemUser)
1079                       .apply(updatedEventData);
1080                 } else if (dataType == DataType.ACL) {
1081                   // Only reindex ACLs
1082                   updatedEventData = getEventUpdateFunctionOnlyAcl(snapshot, orgId)
1083                       .apply(updatedEventData);
1084                 } else {
1085                   throw new IndexRebuildException(dataType + " is not a supported data type. "
1086                       + "Accepted values are " + Arrays.toString(getSupportedDataTypes()) + ".");
1087                 }
1088                 updatedEventRange.add(updatedEventData.get());
1089 
1090                 if (updatedEventRange.size() >= n || current >= total) {
1091                   index.bulkEventUpdate(updatedEventRange);
1092                   logIndexRebuildProgress(logger, total, current, n);
1093                   updatedEventRange.clear();
1094                 }
1095               } catch (Throwable t) {
1096                 logSkippingElement(logger, "event", snapshot.getMediaPackage().getIdentifier().toString(),
1097                     snapshotOrg, t);
1098               }
1099             }
1100           } catch (Throwable t) {
1101             logIndexRebuildError(logger, t, originalOrg);
1102             throw new IndexRebuildException(getService(), originalOrg, t);
1103           } finally {
1104             securityService.setOrganization(defaultOrg);
1105             securityService.setUser(defaultSystemUser);
1106           }
1107         }
1108       } while (offset < total);
1109     } finally {
1110       securityService.setOrganization(originalOrg);
1111       securityService.setUser(originalUser);
1112     }
1113   }
1114 
1115   /**
1116    * Used for testing
1117    */
1118   public void setAvailability(Version version, String mpId, Availability availability) {
1119     if (isAuthorized(mpId, WRITE_ACTION)) {
1120       getDatabase().setAvailability(RuntimeTypes.convert(version), mpId, availability);
1121     } else {
1122       throw new RuntimeException(new UnauthorizedException("Not allowed to set availability of episode " + mpId));
1123     }
1124   }
1125 
1126   public void setDatabase(Database database) {
1127     this.db = database;
1128   }
1129 
1130   public Database getDatabase() {
1131     return db;
1132   }
1133 
1134   public HttpAssetProvider getHttpAssetProvider() {
1135     return httpAssetProvider;
1136   }
1137 
1138   /*
1139    * Security handling
1140    */
1141   /** Check authorization based on the given predicate. */
1142   private boolean isAuthorized(final String mediaPackageId, final String action) {
1143     switch (isAdmin()) {
1144       case GLOBAL:
1145         // grant general access
1146         logger.debug("Access granted since user is global admin");
1147         return true;
1148       case ORGANIZATION:
1149         // ensure that the requested assets belong to this organization
1150         logger.debug("User is organization admin. Checking organization. Checking organization ID of asset.");
1151         return snapshotExists(mediaPackageId, securityService.getOrganization().getId());
1152       default:
1153         // check organization
1154         logger.debug("Non admin user. Checking organization.");
1155         final String org = securityService.getOrganization().getId();
1156         if (!snapshotExists(mediaPackageId, org)) {
1157           return false;
1158         }
1159         // check episode role id
1160         User user = securityService.getUser();
1161         if (user.hasRole(getEpisodeRoleId(mediaPackageId, action))) {
1162           return true;
1163         }
1164         // check acl rules
1165         logger.debug("Non admin user. Checking ACL rules.");
1166         // TODO: Replace this custom ACL check with the general check from the auth service
1167         //   Warning: For now this will cause many difficult to track down bugs and is thus hardly possible
1168         // return authorizationService.hasPermission(getDatabase().getMediaPackage(mediaPackageId).get(), action);
1169         final List<String> roles = user.getRoles().parallelStream()
1170                 .filter(roleFilter)
1171                 .map((role) -> mkPropertyName(role.getName(), action))
1172                 .collect(Collectors.toList());
1173         return getDatabase().selectProperties(mediaPackageId, SECURITY_NAMESPACE).parallelStream()
1174                 .map(p -> p.getId().getName())
1175                 .filter(p -> p.endsWith(action))
1176                 .anyMatch(p -> roles.stream().anyMatch(r -> r.equals(p)));
1177     }
1178   }
1179 
1180   private List<String> isAuthorized(final List<String> mediaPackageIds, final String action) {
1181     return mediaPackageIds.stream()
1182         .filter(id -> isAuthorized(id, action))
1183         .collect(Collectors.toList());
1184   }
1185 
1186   private AdminRole isAdmin() {
1187     final User user = securityService.getUser();
1188     if (user.hasRole(GLOBAL_ADMIN_ROLE)) {
1189       return AdminRole.GLOBAL;
1190     } else if (user.hasRole(securityService.getOrganization().getAdminRole())
1191             || user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
1192       // In this context, we treat capture agents the same way as organization admins, allowing them access so that
1193       // they can ingest new media without requiring them to be explicitly specified in the ACLs.
1194       return AdminRole.ORGANIZATION;
1195     } else {
1196       return AdminRole.NONE;
1197     }
1198   }
1199 
1200   private String mkPropertyName(String role, String action) {
1201     return role + " | " + action;
1202   }
1203 
1204   /**
1205    * Configurable filter for roles
1206    */
1207   private final java.util.function.Predicate<Role> roleFilter = (role) -> {
1208     final String name = role.getName();
1209     return (includeAPIRoles || !name.startsWith("ROLE_API_"))
1210             && (includeCARoles  || !name.startsWith("ROLE_CAPTURE_AGENT_"))
1211             && (includeUIRoles  || !name.startsWith("ROLE_UI_"));
1212   };
1213 
1214   /*
1215    * Utility
1216    */
1217 
1218   /** Move the assets for a snapshot to the target store */
1219   private void copyAssetsToStore(Snapshot snap, AssetStore store) {
1220     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1221     final String orgId = snap.getOrganizationId();
1222     final Version version = snap.getVersion();
1223     final String prettyMpId = mpId + "@v" + version;
1224     logger.debug("Moving assets for snapshot {} to store {}", prettyMpId, store.getStoreType());
1225     for (final MediaPackageElement e : snap.getMediaPackage().getElements()) {
1226       if (!MOVABLE_TYPES.contains(e.getElementType())) {
1227         logger.debug("Skipping {} because type is {}", e.getIdentifier(), e.getElementType());
1228         continue;
1229       }
1230       logger.debug("Moving {} to store {}", e.getIdentifier(), store.getStoreType());
1231       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1232       if (store.contains(storagePath)) {
1233         logger.debug("Element {} (version {}) is already in store {} so skipping it", e.getIdentifier(),
1234                 version, store.getStoreType());
1235         continue;
1236       }
1237 
1238       // find asset in versions & stores
1239       final Optional<StoragePath> existingAssetOpt =
1240           getDatabase()
1241           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), store.getStoreType(), orgId)
1242           .map(dto -> StoragePath.mk(
1243               dto.getOrganizationId(),
1244               dto.getMediaPackageId(),
1245               dto.getVersion(),
1246               dto.getAssetDto().getMediaPackageElementId()
1247           ));
1248 
1249       if (existingAssetOpt.isPresent()) {
1250         final StoragePath existingAsset = existingAssetOpt.get();
1251         logger.debug("Content of asset {} with checksum {} already exists in {}",
1252                 existingAsset.getMediaPackageElementId(), e.getChecksum(), store.getStoreType());
1253         if (!store.copy(existingAsset, storagePath)) {
1254           throw new AssetManagerException(format(
1255                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1256                           + "failed",
1257                   e.getChecksum(),
1258                   existingAsset
1259           ));
1260         }
1261       } else {
1262         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1263         store.put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1264       }
1265       getDatabase().setAssetStorageLocation(VersionImpl.mk(version), mpId, e.getIdentifier(), store.getStoreType());
1266     }
1267   }
1268 
1269   private void copyManifest(Snapshot snap, AssetStore targetStore) throws IOException, NotFoundException {
1270     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1271     final String orgId = snap.getOrganizationId();
1272     final Version version = snap.getVersion();
1273 
1274     AssetStore currentStore = getAssetStore(snap.getStorageId()).get();
1275     Optional<String> manifestOpt = findManifestBaseName(snap, MANIFEST_DEFAULT_NAME, currentStore);
1276     if (manifestOpt.isEmpty()) {
1277       return; // Nothing to do, already moved to long-term storage
1278     }
1279 
1280     // Copy the manifest file
1281     String manifestBaseName = manifestOpt.get();
1282     StoragePath pathToManifest = new StoragePath(orgId, mpId, version, manifestBaseName);
1283 
1284     // Already copied?
1285     if (!targetStore.contains(pathToManifest)) {
1286       Optional<InputStream> inputStreamOpt;
1287       InputStream inputStream = null;
1288       String manifestFileName = null;
1289       try {
1290         inputStreamOpt = currentStore.get(pathToManifest);
1291         if (inputStreamOpt.isEmpty()) { // This should never happen because it has been tested before
1292           throw new NotFoundException(
1293                   String.format("Unexpected error. Manifest %s not found in current asset store", manifestBaseName));
1294         }
1295 
1296         inputStream = inputStreamOpt.get();
1297         manifestFileName = UUID.randomUUID() + ".xml";
1298         URI manifestTmpUri = workspace.putInCollection("archive", manifestFileName, inputStream);
1299         targetStore.put(pathToManifest, Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1300       } finally {
1301         IOUtils.closeQuietly(inputStream);
1302         try {
1303           // Make sure to clean up the temporary file
1304           workspace.deleteFromCollection("archive", manifestFileName);
1305         } catch (NotFoundException e) {
1306           // This is OK, we are deleting it anyway
1307         } catch (IOException e) {
1308           // This usually happens when the collection directory cannot be deleted
1309           // because another process is running at the same time and wrote a file there
1310           // after it was tested but before it was actually deleted. We will consider this ok.
1311           // Does the error message mention the manifest file name?
1312           if (e.getMessage().contains(manifestFileName)) {
1313             logger.warn("The manifest file {} didn't get deleted from the archive collection",
1314                     manifestBaseName, e);
1315           }
1316           // Else the error is related to the file-archive collection, which is fine
1317         }
1318       }
1319     }
1320   }
1321 
1322   Optional<String> findManifestBaseName(Snapshot snap, String manifestName, AssetStore store) {
1323     StoragePath path = new StoragePath(snap.getOrganizationId(), snap.getMediaPackage().getIdentifier().toString(),
1324             snap.getVersion(), manifestName);
1325     // If manifest_.xml, etc not found, return previous name (copied from the EpsiodeServiceImpl logic)
1326     if (!store.contains(path)) {
1327       // If first call, manifest is not found, which probably means it has already been moved
1328       if (MANIFEST_DEFAULT_NAME.equals(manifestName)) {
1329         return Optional.empty(); // No manifest found in current store
1330       } else {
1331         return Optional.of(manifestName.substring(0, manifestName.length() - 1));
1332       }
1333     }
1334     // This is the same logic as when building the manifest name: manifest, manifest_, manifest__, etc
1335     return findManifestBaseName(snap, manifestName + "_", store);
1336   }
1337 
1338   /* -------------------------------------------------------------------------------------------------------------- */
1339 
1340   /**
1341    * Make sure each of the elements has a checksum.
1342    */
1343   void calcChecksumsForMediaPackageElements(PartialMediaPackage pmp) {
1344     pmp.getElements().stream()
1345         .filter(hasNoChecksum)
1346         .forEach(mpe -> {
1347           File file = null;
1348           try {
1349             logger.trace("Calculate checksum for {}", mpe.getURI());
1350             file = workspace.get(mpe.getURI(), true);
1351             mpe.setChecksum(Checksum.create(ChecksumType.DEFAULT_TYPE, file));
1352           } catch (IOException | NotFoundException e) {
1353             throw new AssetManagerException(String.format(
1354                 "Cannot calculate checksum for media package element %s",
1355                 mpe.getURI()
1356             ), e);
1357           } finally {
1358             if (file != null) {
1359               FileUtils.deleteQuietly(file);
1360             }
1361           }
1362         });
1363   }
1364 
1365   /** Mutates mp and its elements, so make sure to work on a copy. */
1366   private SnapshotDto addInternal(String owner, final MediaPackage mp) throws Exception {
1367     final Date now = new Date();
1368     // claim a new version for the media package
1369     final String mpId = mp.getIdentifier().toString();
1370     final VersionImpl version = getDatabase().claimVersion(mpId);
1371     logger.info("Creating new version {} of media package {}", version, mp);
1372     final PartialMediaPackage pmp = assetsOnly(mp);
1373     // make sure they have a checksum
1374     calcChecksumsForMediaPackageElements(pmp);
1375     // download and archive elements
1376     storeAssets(pmp, version);
1377     // store mediapackage in db
1378     final SnapshotDto snapshotDto;
1379     try {
1380       // rewrite URIs for archival
1381       for (MediaPackageElement mpe : pmp.getElements()) {
1382         String fileName = getFileName(mpe).orElse("unknown");
1383         URI archiveUri = new URI(
1384             "urn",
1385             "matterhorn:" + mpId + ":" + version + ":" + mpe.getIdentifier() + ":" + fileName,
1386             null
1387         );
1388         mpe.setURI(archiveUri);
1389       }
1390 
1391       String currentOrgId = securityService.getOrganization().getId();
1392       snapshotDto = getDatabase().saveSnapshot(
1393               currentOrgId, pmp, now, version,
1394               Availability.ONLINE, getLocalAssetStore().getStoreType(), owner
1395       );
1396     } catch (AssetManagerException e) {
1397       logger.error("Could not take snapshot {}", mpId, e);
1398       throw new AssetManagerException(e);
1399     }
1400     // save manifest to element store
1401     // this is done at the end after the media package element ids have been rewritten to neutral URNs
1402     storeManifest(pmp, version);
1403     return snapshotDto;
1404   }
1405 
1406   /**
1407    * Store all elements of <code>pmp</code> under the given version.
1408    */
1409   private void storeAssets(final PartialMediaPackage pmp, final Version version) {
1410     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1411     final String orgId = securityService.getOrganization().getId();
1412     for (final MediaPackageElement e : pmp.getElements()) {
1413       logger.debug("Archiving {} {} {}", e.getFlavor(), e.getMimeType(), e.getURI());
1414       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1415       // find asset in versions
1416       final Optional<StoragePath> existingAssetOpt = getDatabase()
1417           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), getLocalAssetStore().getStoreType(), orgId)
1418           .map(dto -> StoragePath.mk(
1419                   dto.getOrganizationId(),
1420                   dto.getMediaPackageId(),
1421                   dto.getVersion(),
1422                   dto.getAssetDto().getMediaPackageElementId()));
1423 
1424       if (existingAssetOpt.isPresent()) {
1425         final StoragePath existingAsset = existingAssetOpt.get();
1426         logger.debug("Content of asset {} with checksum {} has been archived before",
1427                 existingAsset.getMediaPackageElementId(), e.getChecksum());
1428         if (!getLocalAssetStore().copy(existingAsset, storagePath)) {
1429           throw new AssetManagerException(format(
1430                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1431                           + "failed",
1432                   e.getChecksum(),
1433                   existingAsset
1434           ));
1435         }
1436       } else {
1437         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1438         getLocalAssetStore().put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1439       }
1440     }
1441   }
1442 
1443   private void storeManifest(final PartialMediaPackage pmp, final Version version) throws Exception {
1444     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1445     final String orgId = securityService.getOrganization().getId();
1446     // store the manifest.xml
1447     // TODO make use of checksums
1448     logger.debug("Archiving manifest of media package {} version {}", mpId, version);
1449     // temporarily save the manifest XML into the workspace to
1450     // Fix file not found exception when several snapshots are taken at the same time
1451     final String manifestFileName = format("manifest_%s_%s.xml", pmp.getMediaPackage().getIdentifier(), version);
1452     final URI manifestTmpUri = workspace.putInCollection(
1453             "archive",
1454             manifestFileName,
1455             IOUtils.toInputStream(MediaPackageParser.getAsXml(pmp.getMediaPackage()), "UTF-8"));
1456     try {
1457       getLocalAssetStore().put(
1458               StoragePath.mk(orgId, mpId, version, manifestAssetId(pmp, "manifest")),
1459               Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1460     } finally {
1461       // make sure to clean up the temporary file
1462       workspace.deleteFromCollection("archive", manifestFileName);
1463     }
1464   }
1465 
1466   /**
1467    * Create a unique id for the manifest xml. This is to avoid an id collision
1468    * in the rare case that the media package contains an XML element with the id
1469    * used for the manifest. A UUID could also be used but this is far less
1470    * readable.
1471    *
1472    * @param seedId
1473    *          the id to start with
1474    */
1475   private String manifestAssetId(PartialMediaPackage pmp, String seedId) {
1476     for (MediaPackageElement element : pmp.getElements()) {
1477       if (seedId.equals(element.getIdentifier())) {
1478         return manifestAssetId(pmp, seedId + "_");
1479       }
1480     }
1481     return seedId;
1482   }
1483 
1484   /* --------------------------------------------------------------------------------------------------------------- */
1485 
1486   /**
1487    * Walk up the stacktrace to find a cause of type <code>type</code>. Return none if no such
1488    * type can be found.
1489    */
1490   static <A extends Throwable> Optional<A> unwrapExceptionUntil(Class<A> type, Throwable e) {
1491     if (e == null) {
1492       return Optional.empty();
1493     } else if (type.isAssignableFrom(e.getClass())) {
1494       return Optional.of((A) e);
1495     } else {
1496       return unwrapExceptionUntil(type, e.getCause());
1497     }
1498   }
1499 
1500   /**
1501    * Return a partial media package filtering assets. Assets are elements the archive is going to manager, i.e. all
1502    * non-publication elements.
1503    */
1504   static PartialMediaPackage assetsOnly(MediaPackage mp) {
1505     Predicate<MediaPackageElement> isAsset = isNotPublication;
1506     return PartialMediaPackage.mk(mp, isAsset);
1507   }
1508 
1509   /**
1510    * Extract the file name from a media package elements URN.
1511    *
1512    * @return the file name or none if it could not be determined
1513    */
1514   public static Optional<String> getFileNameFromUrn(MediaPackageElement mpe) {
1515     Optional<URI> uri = Optional.ofNullable(mpe.getURI());
1516     if (uri.isPresent() && "urn".equals(uri.get().getScheme())) {
1517       String[] tmp = uri.get().toString().split(":");
1518       if (tmp.length < 1) {
1519         return Optional.empty();
1520       }
1521       return Optional.of(tmp[tmp.length - 1]);
1522     }
1523     return Optional.empty();
1524   }
1525 
1526   /**
1527    * Rewrite URIs of all asset elements of a snapshot's media package.
1528    * This method does not mutate anything.
1529    */
1530   public static Snapshot rewriteUris(Snapshot snapshot, Function<MediaPackageElement, URI> uriCreator) {
1531     final MediaPackage mpCopy = MediaPackageSupport.copy(snapshot.getMediaPackage());
1532     for (final MediaPackageElement mpe : assetsOnly(mpCopy).getElements()) {
1533       mpe.setURI(uriCreator.apply(mpe));
1534     }
1535     return new SnapshotImpl(
1536             snapshot.getVersion(),
1537             snapshot.getOrganizationId(),
1538             snapshot.getArchivalDate(),
1539             snapshot.getAvailability(),
1540             snapshot.getStorageId(),
1541             snapshot.getOwner(),
1542             mpCopy);
1543   }
1544 
1545   /**
1546    * Get the function to update a commented event in the Elasticsearch index.
1547    *
1548    * @return the function to do the update
1549    */
1550   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(Snapshot snapshot,
1551           String orgId, User user) {
1552     return (Optional<Event> eventOpt) -> {
1553       MediaPackage mp = snapshot.getMediaPackage();
1554       String eventId = mp.getIdentifier().toString();
1555       Event event = eventOpt.orElse(new Event(eventId, orgId));
1556 
1557       event = updateAclInEvent(event, mp);
1558 
1559       event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
1560       if (StringUtils.isBlank(event.getCreator())) {
1561         event.setCreator(securityService.getUser().getName());
1562       }
1563       EventIndexUtils.updateEvent(event, mp);
1564 
1565       for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
1566         try (InputStream in = workspace.read(catalog.getURI())) {
1567           EventIndexUtils.updateEvent(event, DublinCores.read(in));
1568         } catch (IOException | NotFoundException e) {
1569           throw new IllegalStateException(String.format("Unable to load dublin core catalog for event '%s'",
1570                   mp.getIdentifier()), e);
1571         }
1572       }
1573 
1574       // extended metadata
1575       event.resetExtendedMetadata();  // getting rid of old data
1576       for (EventCatalogUIAdapter extendedCatalogUIAdapter : extendedEventCatalogUIAdapters.getOrDefault(orgId,
1577               Collections.emptyList())) {
1578         for (Catalog catalog: mp.getCatalogs(extendedCatalogUIAdapter.getFlavor())) {
1579           try (InputStream in = workspace.read(catalog.getURI())) {
1580             EventIndexUtils.updateEventExtendedMetadata(event, DublinCores.read(in),
1581                     extendedCatalogUIAdapter.getFlavor());
1582           } catch (IOException | NotFoundException e) {
1583             throw new IllegalStateException(String.format("Unable to load extended dublin core catalog '%s' for event "
1584                     + "'%s'", catalog.getFlavor(), mp.getIdentifier()), e);
1585           }
1586         }
1587       }
1588 
1589       // Update series name if not already done
1590       try {
1591         EventIndexUtils.updateSeriesName(event, orgId, user, index);
1592       } catch (SearchIndexException e) {
1593         logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
1594                 e);
1595       }
1596       return Optional.of(event);
1597     };
1598   }
1599 
1600   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunctionOnlyAcl(Snapshot snapshot,
1601       String orgId) {
1602     return (Optional<Event> eventOpt) -> {
1603       MediaPackage mp = snapshot.getMediaPackage();
1604       String eventId = mp.getIdentifier().toString();
1605       Event event = eventOpt.orElse(new Event(eventId, orgId));
1606 
1607       event = updateAclInEvent(event, mp);
1608 
1609       return Optional.of(event);
1610     };
1611   }
1612 
1613   private Event updateAclInEvent(Event event, MediaPackage mp) {
1614     AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
1615     List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
1616 
1617     Optional<ManagedAcl> managedAcl = AccessInformationUtil.matchAcls(acls, acl);
1618     if (managedAcl.isPresent()) {
1619       event.setManagedAcl(managedAcl.get().getName());
1620     }
1621     event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
1622 
1623     return event;
1624   }
1625 }