View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.assetmanager.impl;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.hasNoChecksum;
25  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.isNotPublication;
26  import static org.opencastproject.mediapackage.MediaPackageSupport.getFileName;
27  import static org.opencastproject.metadata.dublincore.CatalogUIAdapter.ORGANIZATION_WILDCARD;
28  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
29  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
30  import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
31  import static org.opencastproject.util.data.functions.Misc.chuck;
32  
33  import org.opencastproject.assetmanager.api.Asset;
34  import org.opencastproject.assetmanager.api.AssetId;
35  import org.opencastproject.assetmanager.api.AssetManager;
36  import org.opencastproject.assetmanager.api.AssetManagerException;
37  import org.opencastproject.assetmanager.api.Availability;
38  import org.opencastproject.assetmanager.api.Property;
39  import org.opencastproject.assetmanager.api.PropertyId;
40  import org.opencastproject.assetmanager.api.Snapshot;
41  import org.opencastproject.assetmanager.api.Value;
42  import org.opencastproject.assetmanager.api.Version;
43  import org.opencastproject.assetmanager.api.storage.AssetStore;
44  import org.opencastproject.assetmanager.api.storage.DeletionSelector;
45  import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
46  import org.opencastproject.assetmanager.api.storage.Source;
47  import org.opencastproject.assetmanager.api.storage.StoragePath;
48  import org.opencastproject.assetmanager.impl.persistence.Database;
49  import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
50  import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
51  import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
52  import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
53  import org.opencastproject.db.DBSessionFactory;
54  import org.opencastproject.elasticsearch.api.SearchIndexException;
55  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
56  import org.opencastproject.elasticsearch.index.objects.event.Event;
57  import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
58  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
59  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
60  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
61  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
62  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService.DataType;
63  import org.opencastproject.mediapackage.Catalog;
64  import org.opencastproject.mediapackage.MediaPackage;
65  import org.opencastproject.mediapackage.MediaPackageElement;
66  import org.opencastproject.mediapackage.MediaPackageElements;
67  import org.opencastproject.mediapackage.MediaPackageParser;
68  import org.opencastproject.mediapackage.MediaPackageSupport;
69  import org.opencastproject.message.broker.api.assetmanager.AssetManagerItem;
70  import org.opencastproject.message.broker.api.update.AssetManagerUpdateHandler;
71  import org.opencastproject.metadata.dublincore.DublinCores;
72  import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
73  import org.opencastproject.security.api.AccessControlEntry;
74  import org.opencastproject.security.api.AccessControlList;
75  import org.opencastproject.security.api.AccessControlParser;
76  import org.opencastproject.security.api.AuthorizationService;
77  import org.opencastproject.security.api.DefaultOrganization;
78  import org.opencastproject.security.api.Organization;
79  import org.opencastproject.security.api.OrganizationDirectoryService;
80  import org.opencastproject.security.api.Role;
81  import org.opencastproject.security.api.SecurityService;
82  import org.opencastproject.security.api.UnauthorizedException;
83  import org.opencastproject.security.api.User;
84  import org.opencastproject.security.util.SecurityUtil;
85  import org.opencastproject.util.Checksum;
86  import org.opencastproject.util.ChecksumType;
87  import org.opencastproject.util.MimeTypes;
88  import org.opencastproject.util.NotFoundException;
89  import org.opencastproject.util.RequireUtil;
90  import org.opencastproject.workspace.api.Workspace;
91  
92  import com.google.common.collect.Sets;
93  
94  import org.apache.commons.io.FileUtils;
95  import org.apache.commons.io.IOUtils;
96  import org.apache.commons.lang3.BooleanUtils;
97  import org.apache.commons.lang3.StringUtils;
98  import org.osgi.service.component.ComponentContext;
99  import org.osgi.service.component.annotations.Activate;
100 import org.osgi.service.component.annotations.Component;
101 import org.osgi.service.component.annotations.Reference;
102 import org.osgi.service.component.annotations.ReferenceCardinality;
103 import org.osgi.service.component.annotations.ReferencePolicy;
104 import org.slf4j.Logger;
105 import org.slf4j.LoggerFactory;
106 
107 import java.io.File;
108 import java.io.IOException;
109 import java.io.InputStream;
110 import java.net.URI;
111 import java.security.NoSuchAlgorithmException;
112 import java.util.ArrayList;
113 import java.util.Arrays;
114 import java.util.Collection;
115 import java.util.Collections;
116 import java.util.Date;
117 import java.util.HashMap;
118 import java.util.LinkedHashMap;
119 import java.util.List;
120 import java.util.Map;
121 import java.util.Objects;
122 import java.util.Optional;
123 import java.util.Set;
124 import java.util.UUID;
125 import java.util.concurrent.CopyOnWriteArrayList;
126 import java.util.function.Function;
127 import java.util.function.Predicate;
128 import java.util.stream.Collectors;
129 
130 import javax.persistence.EntityManagerFactory;
131 
132 /**
133  * The Asset Manager implementation.
134  */
135 @Component(
136     property = {
137         "service.description=Opencast Asset Manager"
138     },
139     immediate = true,
140     service = { AssetManager.class, IndexProducer.class }
141 )
142 public class AssetManagerImpl extends AbstractIndexProducer implements AssetManager {
143 
144   private static final Logger logger = LoggerFactory.getLogger(AssetManagerImpl.class);
145 
146   private static final int PAGE_SIZE = 1000;
147 
148   enum AdminRole {
149     GLOBAL, ORGANIZATION, NONE
150   }
151 
152   public static final String WRITE_ACTION = "write";
153   public static final String READ_ACTION = "read";
154   public static final String SECURITY_NAMESPACE = "org.opencastproject.assetmanager.security";
155 
156   private static final int EXPEXTED_HANDLERS_COUNT = 2;
157 
158   private static final String MANIFEST_DEFAULT_NAME = "manifest";
159 
160   private CopyOnWriteArrayList<AssetManagerUpdateHandler> handlers = new CopyOnWriteArrayList<>();
161 
162   private SecurityService securityService;
163   private AuthorizationService authorizationService;
164   private OrganizationDirectoryService orgDir;
165   private Workspace workspace;
166   private AssetStore assetStore;
167   private HttpAssetProvider httpAssetProvider;
168   private String systemUserName;
169   private Database db;
170   private DBSessionFactory dbSessionFactory;
171   private EntityManagerFactory emf;
172   private AclServiceFactory aclServiceFactory;
173   private ElasticsearchIndex index;
174 
175   // careful: org key can be wildcard!
176   private Map<String, List<EventCatalogUIAdapter>> extendedEventCatalogUIAdapters = new HashMap<>();
177 
178   // Settings for role filter
179   private boolean includeAPIRoles;
180   private boolean includeCARoles;
181   private boolean includeUIRoles;
182 
183 
184   public static final Set<MediaPackageElement.Type> MOVABLE_TYPES = Sets.newHashSet(
185           MediaPackageElement.Type.Attachment,
186           MediaPackageElement.Type.Catalog,
187           MediaPackageElement.Type.Track
188   );
189 
190   private final HashMap<String, RemoteAssetStore> remoteStores = new LinkedHashMap<>();
191 
192   /**
193    * OSGi callback.
194    */
195   @Activate
196   public synchronized void activate(ComponentContext cc) {
197     logger.info("Activating AssetManager.");
198     db = new Database(dbSessionFactory.createSession(emf));
199     db.setHttpAssetProvider(getHttpAssetProvider());
200     systemUserName = SecurityUtil.getSystemUserName(cc);
201 
202     includeAPIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeAPIRoles"), null));
203     includeCARoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeCARoles"), null));
204     includeUIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeUIRoles"), null));
205   }
206 
207   /**
208    * OSGi dependencies
209    */
210 
211   @Reference(target = "(osgi.unit.name=org.opencastproject.assetmanager.impl)")
212   public void setEntityManagerFactory(EntityManagerFactory emf) {
213     this.emf = emf;
214   }
215 
216   @Reference
217   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
218     this.dbSessionFactory = dbSessionFactory;
219   }
220 
221   @Reference
222   public void setSecurityService(SecurityService securityService) {
223     this.securityService = securityService;
224   }
225 
226   @Reference
227   public void setAuthorizationService(AuthorizationService authorizationService) {
228     this.authorizationService = authorizationService;
229   }
230 
231   @Reference
232   public void setOrgDir(OrganizationDirectoryService orgDir) {
233     this.orgDir = orgDir;
234   }
235 
236   @Reference
237   public void setWorkspace(Workspace workspace) {
238     this.workspace = workspace;
239   }
240 
241   @Reference
242   public void setAssetStore(AssetStore assetStore) {
243     this.assetStore = assetStore;
244   }
245 
246   @Reference(
247       cardinality = ReferenceCardinality.MULTIPLE,
248       policy = ReferencePolicy.DYNAMIC,
249       unbind = "removeRemoteAssetStore"
250   )
251   public synchronized void addRemoteAssetStore(RemoteAssetStore assetStore) {
252     remoteStores.put(assetStore.getStoreType(), assetStore);
253   }
254 
255   public void removeRemoteAssetStore(RemoteAssetStore store) {
256     remoteStores.remove(store.getStoreType());
257   }
258 
259   @Reference
260   public void setHttpAssetProvider(HttpAssetProvider httpAssetProvider) {
261     this.httpAssetProvider = httpAssetProvider;
262   }
263 
264   @Reference
265   public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
266     this.aclServiceFactory = aclServiceFactory;
267   }
268 
269   @Reference
270   public void setIndex(ElasticsearchIndex index) {
271     this.index = index;
272   }
273 
274   @Reference(
275       cardinality = ReferenceCardinality.MULTIPLE,
276       policy = ReferencePolicy.DYNAMIC,
277       unbind = "removeEventHandler"
278   )
279 
280   public void addEventHandler(AssetManagerUpdateHandler handler) {
281     this.handlers.add(handler);
282   }
283 
284   public void removeEventHandler(AssetManagerUpdateHandler handler) {
285     this.handlers.remove(handler);
286   }
287 
288   @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC,
289       target = "(common-metadata=false)")
290   public synchronized void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
291     List<EventCatalogUIAdapter> list = extendedEventCatalogUIAdapters.computeIfAbsent(
292             catalogUIAdapter.getOrganization(), k -> new ArrayList<>());
293     list.add(catalogUIAdapter);
294   }
295 
296   public synchronized void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
297     if (extendedEventCatalogUIAdapters.containsKey(catalogUIAdapter.getOrganization())) {
298       extendedEventCatalogUIAdapters.get(catalogUIAdapter.getOrganization()).remove(catalogUIAdapter);
299     }
300   }
301 
302   /**
303    * AssetManager implementation
304    */
305 
306   @Override
307   public Optional<MediaPackage> getMediaPackage(String mediaPackageId) {
308     String orgId = securityService.getOrganization().getId();
309     switch (isAdmin()) {
310       case GLOBAL:
311         return getDatabase().getMediaPackage(mediaPackageId);
312       default:
313         if (isAuthorized(mediaPackageId, READ_ACTION)) {
314           return getDatabase().getMediaPackage(mediaPackageId, orgId);
315         }
316         return Optional.empty();
317     }
318   }
319 
320   @Override
321   public List<Snapshot> getLatestSnapshots(Collection mediaPackageIds) {
322     String orgId = securityService.getOrganization().getId();
323     switch (isAdmin()) {
324       case GLOBAL:
325         return getDatabase().getLatestSnapshotsByMediaPackageIds(mediaPackageIds, null);
326       default:
327         mediaPackageIds = isAuthorized(mediaPackageIds.stream().toList(), READ_ACTION);
328         return getDatabase().getLatestSnapshotsByMediaPackageIds(mediaPackageIds, orgId);
329     }
330   }
331 
332   @Override
333   public Optional<Snapshot> getLatestSnapshot(String mediaPackageId) {
334     String orgId = securityService.getOrganization().getId();
335     switch (isAdmin()) {
336       case GLOBAL:
337         return getDatabase().getLatestSnapshot(mediaPackageId);
338       default:
339         if (isAuthorized(mediaPackageId, READ_ACTION)) {
340           return getDatabase().getLatestSnapshot(mediaPackageId, orgId);
341         }
342         return Optional.empty();
343     }
344   }
345 
346   @Override
347   public Optional<Asset> getAsset(Version version, String mpId, String mpElementId) {
348     if (isAuthorized(mpId, READ_ACTION)) {
349       // try to fetch the asset
350       var assetDto = getDatabase().getAsset(RuntimeTypes.convert(version), mpId, mpElementId);
351       if (assetDto.isPresent()) {
352         var storageId = getSnapshotStorageLocation(version, mpId);
353         if (storageId.isPresent()) {
354           var store = getAssetStore(storageId.get());
355           if (store.isPresent()) {
356             var assetStream = store.get().get(StoragePath.mk(
357                 assetDto.get().getSnapshot().getOrganizationId(),
358                 mpId,
359                 version,
360                 mpElementId
361             ));
362             if (assetStream.isPresent()) {
363 
364               Checksum checksum = null;
365               try {
366                 checksum = Checksum.fromString(assetDto.get().getChecksum());
367               } catch (NoSuchAlgorithmException e) {
368                 logger.warn("Invalid checksum for asset {} of media package {}", mpElementId, mpId, e);
369               }
370 
371               final Asset a = new AssetImpl(
372                       AssetId.mk(version, mpId, mpElementId),
373                       assetStream.get(),
374                       assetDto.get().getMimeType(),
375                       assetDto.get().getSize(),
376                       assetDto.get().getSnapshot().getStorageId(),
377                       Availability.valueOf(assetDto.get().getSnapshot().getAvailability()),
378                       checksum);
379               return Optional.of(a);
380             }
381           }
382         }
383       }
384       return Optional.empty();
385     }
386     throw new RuntimeException(new UnauthorizedException(
387             format("Not allowed to read assets of snapshot %s, version=%s", mpId, version)
388     ));
389   }
390 
391   @Override
392   public Optional<AssetStore> getAssetStore(String storeId) {
393     if (assetStore.getStoreType().equals(storeId)) {
394       return Optional.of(assetStore);
395     } else {
396       if (remoteStores.containsKey(storeId)) {
397         return Optional.of(remoteStores.get(storeId));
398       } else {
399         return Optional.empty();
400       }
401     }
402   }
403 
404   @Override
405   public AssetStore getLocalAssetStore() {
406     return assetStore;
407   }
408 
409   @Override
410   public List<AssetStore> getRemoteAssetStores() {
411     return new ArrayList<>(remoteStores.values());
412   }
413 
414   /** Snapshots */
415 
416   @Override
417   public boolean snapshotExists(final String mediaPackageId) {
418     return getDatabase().snapshotExists(mediaPackageId);
419   }
420 
421   @Override
422   public boolean snapshotExists(final String mediaPackageId, final String organization) {
423     return getDatabase().snapshotExists(mediaPackageId, organization);
424   }
425 
426   @Override
427   public Snapshot takeSnapshot(MediaPackage mp) {
428     return takeSnapshot(null, mp);
429   }
430 
431   @Override
432   public Snapshot takeSnapshot(String owner, MediaPackage mp) {
433 
434     final String mediaPackageId = mp.getIdentifier().toString();
435     final boolean firstSnapshot = !snapshotExists(mediaPackageId);
436 
437     // Allow this if:
438     //  - no previous snapshot exists
439     //  - the user has write access to the previous snapshot
440     if (firstSnapshot) {
441       // if it's the first snapshot, ensure that old, leftover properties are removed
442       deleteProperties(mediaPackageId);
443     }
444     if (firstSnapshot || isAuthorized(mediaPackageId, WRITE_ACTION)) {
445       final Snapshot snapshot;
446       if (owner == null) {
447         snapshot = takeSnapshotInternal(mp);
448       } else {
449         snapshot = takeSnapshotInternal(owner, mp);
450       }
451 
452       final AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
453       // store acl as properties
454       // Drop old ACL rules
455       deleteProperties(mediaPackageId, SECURITY_NAMESPACE);
456       // Set new ACL rules
457       for (final AccessControlEntry ace : acl.getEntries()) {
458         getDatabase().saveProperty(Property.mk(PropertyId.mk(mediaPackageId, SECURITY_NAMESPACE,
459                 mkPropertyName(ace.getRole(), ace.getAction())), Value.mk(ace.isAllow())));
460       }
461 
462       updateEventInIndex(snapshot);
463 
464       logger.info("Trigger update handlers for snapshot {}, version {}",
465               snapshot.getMediaPackage().getIdentifier(), snapshot.getVersion());
466       fireEventHandlers(mkTakeSnapshotMessage(snapshot));
467 
468       return snapshot;
469     }
470     throw new RuntimeException(new UnauthorizedException(
471         "Not allowed to take snapshot of media package " + mediaPackageId));
472   }
473 
474   private Snapshot takeSnapshotInternal(MediaPackage mediaPackage) {
475     final String mediaPackageId = mediaPackage.getIdentifier().toString();
476     String orgId = securityService.getOrganization().getId();
477     Optional<Snapshot> snapshot;
478     switch (isAdmin()) {
479       case GLOBAL:
480         snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
481         break;
482       default:
483         if (isAuthorized(mediaPackageId, WRITE_ACTION)) {
484           snapshot = getDatabase().getLatestSnapshot(mediaPackageId, orgId);
485         } else {
486           snapshot = Optional.empty();
487         }
488         break;
489     }
490     if (snapshot.isPresent()) {
491       return takeSnapshotInternal(snapshot.get().getOwner(), mediaPackage);
492     }
493     return takeSnapshotInternal(DEFAULT_OWNER, mediaPackage);
494   }
495 
496   private Snapshot takeSnapshotInternal(final String owner, final MediaPackage mp) {
497     try {
498       Snapshot archived = addInternal(owner, MediaPackageSupport.copy(mp)).toSnapshot();
499       return getHttpAssetProvider().prepareForDelivery(archived);
500     } catch (Exception e) {
501       logger.error("An error occurred", e);
502       throw unwrapExceptionUntil(AssetManagerException.class, e).orElse(new AssetManagerException(e));
503     }
504   }
505 
506   /**
507    * Create a {@link AssetManagerItem.TakeSnapshot} message.
508    * <p>
509    * Do not call outside of a security context.
510    */
511   private AssetManagerItem.TakeSnapshot mkTakeSnapshotMessage(Snapshot snapshot) {
512     final MediaPackage mp = snapshot.getMediaPackage();
513 
514     long version;
515     try {
516       version = Long.parseLong(snapshot.getVersion().toString());
517     } catch (NumberFormatException e) {
518       // The index requires a version to be a long value.
519       // Since the asset manager default implementation uses long values that should be not a problem.
520       // However, a decent exception message is helpful if a different implementation of the asset manager
521       // is used.
522       throw new RuntimeException("The current implementation of the index requires versions being of type 'long'.");
523     }
524 
525     return AssetManagerItem.add(workspace, mp, authorizationService.getActiveAcl(mp).getA(),
526             version, snapshot.getArchivalDate());
527   }
528 
529   @Override
530   public void triggerIndexUpdate(String mediaPackageId) throws NotFoundException, UnauthorizedException {
531 
532     if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
533       throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
534     }
535     Optional<Snapshot> snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
536 
537     if (snapshot.isEmpty()) {
538       throw new NotFoundException("No event with ID `" + mediaPackageId + "`");
539     }
540 
541     // Update event index with latest snapshot
542     updateEventInIndex(snapshot.get());
543   }
544 
545   /**
546    * Update the event in the Elasticsearch index.
547    *
548    * @param snapshot
549    *         The newest snapshot of the event to update
550    */
551   private void updateEventInIndex(Snapshot snapshot) {
552     final String eventId = snapshot.getMediaPackage().getIdentifier().toString();
553     final Organization organization = securityService.getOrganization();
554     final User user = securityService.getUser();
555 
556     logger.debug("Updating event {} in the {} index.", eventId, index.getIndexName());
557     Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(
558         snapshot,
559         organization.getId(),
560         user
561     );
562 
563     try {
564       index.addOrUpdateEvent(eventId, updateFunction, organization, user);
565       logger.debug("Event {} updated in the {} index.", eventId, index.getIndexName());
566     } catch (SearchIndexException e) {
567       logger.error("Error updating the event {} in the {} index.", eventId, index.getIndexName(), e);
568     }
569   }
570 
571   /**
572    * Remove the event from the Elasticsearch index
573    *
574    * @param eventId
575    *         The id of the event to remove
576    */
577   private void removeArchivedVersionFromIndex(String eventId) {
578     final Organization organization = securityService.getOrganization();
579     final User user = securityService.getUser();
580     logger.debug("Received AssetManager delete episode message {}", eventId);
581 
582     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
583       if (eventOpt.isEmpty()) {
584         logger.warn("Event {} not found for deletion", eventId);
585         return Optional.empty();
586       }
587       Event event = eventOpt.get();
588       event.setArchiveVersion(null);
589       return Optional.of(event);
590     };
591 
592     try {
593       index.addOrUpdateEvent(eventId, updateFunction, organization, user);
594       logger.debug("Event {} removed from the {} index", eventId, index.getIndexName());
595     } catch (SearchIndexException e) {
596       logger.error("Error deleting the event {} from the {} index.", eventId, index.getIndexName(), e);
597     }
598   }
599 
600   @Override
601   public List<Snapshot> getSnapshotsById(final String mpId) {
602     RequireUtil.requireNotBlank(mpId, "mpId");
603 
604     String orgId = securityService.getOrganization().getId();
605 
606     switch (isAdmin()) {
607       case GLOBAL:
608         return getDatabase().getSnapshots(mpId);
609       default:
610         if (isAuthorized(mpId, READ_ACTION)) {
611           return getDatabase().getSnapshots(mpId, orgId);
612         }
613         return new ArrayList<>();
614     }
615   }
616 
617   @Override
618   public List<Snapshot> getSnapshotsByIdOrderedByVersion(String mpId, boolean asc) {
619     RequireUtil.requireNotBlank(mpId, "mpId");
620 
621     String order;
622     if (asc) {
623       order = "ASC";
624     } else {
625       order = "DESC";
626     }
627 
628     String orgId = securityService.getOrganization().getId();
629     switch (isAdmin()) {
630       case GLOBAL:
631         return getDatabase().getSnapshots(mpId, null, order);
632       default:
633         if (isAuthorized(mpId, READ_ACTION)) {
634           return getDatabase().getSnapshots(mpId, orgId);
635         }
636         return new ArrayList<>();
637     }
638   }
639 
640   @Override
641   public List<Snapshot> getSnapshotsByIdAndVersion(final String mpId, final Version version) {
642     RequireUtil.requireNotBlank(mpId, "mpId");
643     RequireUtil.notNull(version, "version");
644 
645     String orgId = securityService.getOrganization().getId();
646     // TODO: Simplify the version class?
647     Long v = Long.parseLong(version.toString());
648     switch (isAdmin()) {
649       case GLOBAL:
650         return getDatabase().getSnapshotsByMpIdAndVersion(mpId, v, null);
651       default:
652         if (isAuthorized(mpId, READ_ACTION)) {
653           return getDatabase().getSnapshotsByMpIdAndVersion(mpId, v, orgId);
654         }
655         return new ArrayList<>();
656     }
657   }
658 
659   @Override
660   public List<Snapshot> getSnapshotsByDateOrderedById(Date start, Date end) {
661     RequireUtil.notNull(start, "start");
662     RequireUtil.notNull(end, "end");
663 
664     String orgId = securityService.getOrganization().getId();
665     switch (isAdmin()) {
666       case GLOBAL:
667         return getDatabase().getSnapshotsByDateOrderByMpId(start, end, null);
668       case ORGANIZATION:
669         return getDatabase().getSnapshotsByDateOrderByMpId(start, end, orgId);
670       default:
671         List<Snapshot> snapshots = new ArrayList<>();
672         List<Snapshot> snaps = getDatabase().getSnapshotsByDateOrderByMpId(start, end, orgId);
673         for (int i = 0; i < snaps.size(); i++) {
674           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
675             snapshots.add(snaps.get(i));
676           }
677         }
678         return snapshots;
679     }
680   }
681 
682   @Override
683   public List<Snapshot> getSnapshotsByIdAndDate(final String mpId, final Date start, final Date end) {
684     RequireUtil.requireNotBlank(mpId, "mpId");
685     RequireUtil.notNull(start, "start");
686     RequireUtil.notNull(end, "end");
687 
688     String orgId = securityService.getOrganization().getId();
689     switch (isAdmin()) {
690       case GLOBAL:
691         return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, null);
692       default:
693         if (isAuthorized(mpId, READ_ACTION)) {
694           return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, orgId);
695         }
696         return new ArrayList<>();
697     }
698   }
699 
700   @Override
701   public List<Snapshot> getSnapshotsByIdAndDateOrderedByVersion(String mpId, Date start, Date end, boolean asc) {
702     RequireUtil.requireNotBlank(mpId, "mpId");
703     RequireUtil.notNull(start, "start");
704     RequireUtil.notNull(end, "end");
705 
706     String order;
707     if (asc) {
708       order = "ASC";
709     } else {
710       order = "DESC";
711     }
712 
713     String orgId = securityService.getOrganization().getId();
714     switch (isAdmin()) {
715       case GLOBAL:
716         return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, null, order);
717       default:
718         if (isAuthorized(mpId, READ_ACTION)) {
719           return getDatabase().getSnapshotsByMpdIdAndDate(mpId, start, end, orgId, order);
720         }
721         return new ArrayList<>();
722     }
723   }
724 
725   @Override
726   public List<Snapshot> getLatestSnapshotsBySeriesId(final String seriesId) {
727     RequireUtil.requireNotBlank(seriesId, "seriesId");
728 
729     String orgId = securityService.getOrganization().getId();
730 
731     switch (isAdmin()) {
732       case GLOBAL:
733         return getDatabase().getSnapshotsBySeries(seriesId, null);
734       case ORGANIZATION:
735         return getDatabase().getSnapshotsBySeries(seriesId, orgId);
736       default:
737         List<Snapshot> snapshots = new ArrayList<>();
738         List<Snapshot> snaps = getDatabase().getSnapshotsBySeries(seriesId, orgId);
739         for (int i = 0; i < snaps.size(); i++) {
740           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
741             snapshots.add(snaps.get(i));
742           }
743         }
744         return snapshots;
745     }
746   }
747 
748   @Override
749   public Optional<Snapshot> getSnapshotByMpIdOrgIdAndVersion(String mpId, String orgId, Version version) {
750     return getDatabase().getSnapshot(mpId, orgId, Long.parseLong(version.toString()));
751   }
752 
753   @Override
754   public int deleteSnapshots(String mpId) {
755     String orgId = securityService.getOrganization().getId();
756     int numberOfDeletedSnapshots = 0;
757     switch (isAdmin()) {
758       case GLOBAL:
759         numberOfDeletedSnapshots = getDatabase().deleteSnapshots(mpId, null);
760         break;
761       default:
762         if (isAuthorized(mpId, WRITE_ACTION)) {
763           numberOfDeletedSnapshots = getDatabase().deleteSnapshots(mpId, orgId);
764         }
765         break;
766     }
767 
768     // delete from store
769     if (numberOfDeletedSnapshots > 0) {
770       final DeletionSelector deletionSelector = DeletionSelector.deleteAll(orgId, mpId);
771       getLocalAssetStore().delete(deletionSelector);
772       for (AssetStore as : getRemoteAssetStores()) {
773         as.delete(deletionSelector);
774       }
775     }
776 
777     logger.info("Firing event handlers for deleting event {}", mpId);
778     fireEventHandlers(AssetManagerItem.deleteEpisode(mpId, new Date()));
779     removeArchivedVersionFromIndex(mpId);
780 
781     return numberOfDeletedSnapshots;
782   }
783 
784   @Override
785   public int deleteAllButLatestSnapshot(String mpId) {
786     String orgId = securityService.getOrganization().getId();
787     int numberOfDeletedSnapshots = 0;
788     List<Long> versions = getDatabase().getVersionsByMediaPackage(mpId, null);
789 
790     switch (isAdmin()) {
791       case GLOBAL:
792         numberOfDeletedSnapshots = getDatabase().deleteAllButLatestSnapshot(mpId, null);
793         break;
794       default:
795         if (isAuthorized(mpId, WRITE_ACTION)) {
796           numberOfDeletedSnapshots = getDatabase().deleteAllButLatestSnapshot(mpId, orgId);
797         }
798         break;
799     }
800 
801     // delete from store
802     if (numberOfDeletedSnapshots > 0) {
803       // Skip last version
804       for (int i = 0; i < versions.size() - 1; i++) {
805         final DeletionSelector deletionSelector = DeletionSelector.delete(orgId, mpId,
806             new VersionImpl(versions.get(i)));
807         getLocalAssetStore().delete(deletionSelector);
808         for (AssetStore as : getRemoteAssetStores()) {
809           as.delete(deletionSelector);
810         }
811       }
812     }
813 
814     return numberOfDeletedSnapshots;
815   }
816 
817   @Override
818   public void moveSnapshotsById(final String mpId, final String targetStore) throws NotFoundException {
819     List<Snapshot> snapshots = getSnapshotsById(mpId);
820 
821     if (snapshots.isEmpty()) {
822       throw new NotFoundException("Mediapackage " + mpId + " not found!");
823     }
824 
825     processOperations(snapshots, targetStore);
826   }
827 
828   @Override
829   public void moveSnapshotsByIdAndVersion(final String mpId, final Version version, final String targetStore)
830           throws NotFoundException {
831     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
832 
833     if (snapshots.isEmpty()) {
834       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
835     }
836 
837     processOperations(snapshots, targetStore);
838   }
839 
840   @Override
841   public void moveSnapshotsByDate(final Date start, final Date end, final String targetStore)
842           throws NotFoundException {
843     String orgId = securityService.getOrganization().getId();
844     List<Snapshot> snapshots = new ArrayList<>();
845     switch (isAdmin()) {
846       case GLOBAL:
847         snapshots = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, null);
848         break;
849       case ORGANIZATION:
850         snapshots = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, orgId);
851         break;
852       default:
853         List<Snapshot> snaps = getDatabase().getSnapshotsByNotStorageAndDate(targetStore, start, end, orgId);
854         for (int i = 0; i < snaps.size(); i++) {
855           if (isAuthorized(snaps.get(i).getMediaPackage().getIdentifier().toString(), READ_ACTION)) {
856             snapshots.add(snaps.get(i));
857           }
858         }
859         break;
860     }
861 
862     if (snapshots.isEmpty()) {
863       throw new NotFoundException("No media packages found between " + start + " and " + end);
864     }
865 
866     processOperations(snapshots, targetStore);
867   }
868 
869   @Override
870   public void moveSnapshotsByIdAndDate(final String mpId, final Date start, final Date end, final String targetStore)
871           throws NotFoundException {
872     List<Snapshot> snapshots = getSnapshotsByIdAndDate(mpId, start, end);
873 
874     if (snapshots.isEmpty()) {
875       throw new NotFoundException("No media package with id " + mpId + " found between " + start + " and " + end);
876     }
877 
878     processOperations(snapshots, targetStore);
879   }
880 
881   @Override
882   public void moveSnapshotToStore(final Version version, final String mpId, final String storeId)
883           throws NotFoundException {
884 
885     //Find the snapshot
886     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
887 
888     if (snapshots.isEmpty()) {
889       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
890     }
891     processOperations(snapshots, storeId);
892   }
893 
894   //Do the actual moving
895   //TODO: Compare this to AssetManagerJobProducer.moveSnapshots. Check if they can be combined.
896   private void processOperations(List<Snapshot> snapshots, final String targetStoreId) {
897     snapshots.forEach(s -> {
898 //      Snapshot s = record.getSnapshot().get();
899       Optional<String> currentStoreId = getSnapshotStorageLocation(s);
900 
901       if (currentStoreId.isEmpty()) {
902         logger.warn("IsNone store ID");
903         return;
904       }
905 
906       //If this snapshot is already stored in the desired store
907       if (currentStoreId.get().equals(targetStoreId)) {
908         //return, since we don't need to move anything
909         return;
910       }
911 
912       AssetStore currentStore;
913       AssetStore targetStore;
914 
915       Optional<AssetStore> optCurrentStore = getAssetStore(currentStoreId.get());
916       Optional<AssetStore> optTargetStore = getAssetStore(targetStoreId);
917 
918       if (!optCurrentStore.isEmpty()) {
919         currentStore = optCurrentStore.get();
920       } else {
921         logger.error("Unknown current store: " + currentStoreId.get());
922         return;
923       }
924       if (!optTargetStore.isEmpty()) {
925         targetStore = optTargetStore.get();
926       } else {
927         logger.error("Unknown target store: " + targetStoreId);
928         return;
929       }
930 
931       //If the content is already local, or is moving from a remote to the local
932       // Returns true if the store id is equal to the local asset store's id
933       String localAssetStoreType = getLocalAssetStore().getStoreType();
934       if (localAssetStoreType.equals(currentStoreId.get()) || localAssetStoreType.equals(targetStoreId)) {
935         logger.debug("Moving {} from {} to {}", s, currentStoreId, targetStoreId);
936 
937         try {
938           copyAssetsToStore(s, targetStore);
939           copyManifest(s, targetStore);
940         } catch (Exception e) {
941           chuck(e);
942         }
943         getDatabase().setStorageLocation(s, targetStoreId);
944         currentStore.delete(DeletionSelector.delete(s.getOrganizationId(),
945                 s.getMediaPackage().getIdentifier().toString(), s.getVersion()
946         ));
947       } else {
948         //Else, the content is *not* local and is going to a *different* remote
949         String intermediateStore = getLocalAssetStore().getStoreType();
950         logger.debug("Moving {} from {} to {}, then to {}",
951                 s, currentStoreId, intermediateStore, targetStoreId);
952         Version version = s.getVersion();
953         String mpId = s.getMediaPackage().getIdentifier().toString();
954         try {
955           moveSnapshotToStore(version, mpId, intermediateStore);
956           moveSnapshotToStore(version, mpId, targetStoreId);
957         } catch (NotFoundException e) {
958           chuck(e);
959         }
960       }
961     });
962   }
963 
964   // Return the asset store ID that is currently storing the snapshot
965   public Optional<String> getSnapshotStorageLocation(final Version version, final String mpId) {
966     List<Snapshot> snapshots = getSnapshotsByIdAndVersion(mpId, version);
967 
968     for (Snapshot snapshot : snapshots) {
969       return Optional.of(snapshot.getStorageId());
970     }
971 
972     logger.error("Mediapackage " + mpId + "@" + version + " not found!");
973     return Optional.empty();
974   }
975 
976   public Optional<String> getSnapshotStorageLocation(final Snapshot snap) {
977     return getSnapshotStorageLocation(snap.getVersion(), snap.getMediaPackage().getIdentifier().toString());
978   }
979 
980   /** Properties */
981 
982   @Override
983   public boolean setProperty(Property property) {
984     final String mpId = property.getId().getMediaPackageId();
985     if (isAuthorized(mpId, WRITE_ACTION)) {
986       return getDatabase().saveProperty(property);
987     }
988     throw new RuntimeException(new UnauthorizedException("Not allowed to set property on episode " + mpId));
989   }
990 
991   @Override
992   public List<Property> selectProperties(final String mediaPackageId, String namespace) {
993     if (isAuthorized(mediaPackageId, READ_ACTION)) {
994       return getDatabase().selectProperties(mediaPackageId, namespace);
995     }
996     throw new RuntimeException(new UnauthorizedException(format(
997         "Not allowed to read properties of event %s", mediaPackageId)));
998   }
999 
1000   @Override
1001   public int deleteProperties(final String mediaPackageId) {
1002     return getDatabase().deleteProperties(mediaPackageId);
1003   }
1004 
1005   @Override
1006   public int deleteProperties(final String mediaPackageId, final String namespace) {
1007     return getDatabase().deleteProperties(mediaPackageId, namespace);
1008   }
1009 
1010   @Override
1011   public int deletePropertiesWithCurrentUser(final String mediaPackageId, final String namespace) {
1012     User user = securityService.getUser();
1013     switch (isAdmin()) {
1014       case GLOBAL:
1015         return getDatabase().deleteProperties(mediaPackageId, namespace);
1016       case ORGANIZATION:
1017         Optional<Snapshot> snapshot = getDatabase().getLatestSnapshot(mediaPackageId);
1018         if (snapshot.isPresent() && snapshot.get().getOrganizationId().equals(user.getOrganization().getId())) {
1019           return getDatabase().deleteProperties(mediaPackageId, namespace);
1020         }
1021         return 0;
1022       default:
1023         Optional<MediaPackage> mediaPackage = getMediaPackage(mediaPackageId);
1024         if (mediaPackage.isPresent() && isAuthorized(mediaPackage.get().getIdentifier().toString(), WRITE_ACTION)) {
1025           return getDatabase().deleteProperties(mediaPackageId, namespace);
1026         }
1027         return 0;
1028     }
1029   }
1030 
1031   /** Misc. */
1032 
1033   @Override
1034   public Optional<Version> toVersion(String version) {
1035     try {
1036       return Optional.of(VersionImpl.mk(Long.parseLong(version)));
1037     } catch (NumberFormatException e) {
1038       return Optional.empty();
1039     }
1040   }
1041 
1042   @Override
1043   public long countEvents(final String organization) {
1044     return getDatabase().countEvents(organization);
1045   }
1046 
1047   @Override
1048   public long countSnapshots(final String organization) {
1049     return getDatabase().countSnapshots(organization);
1050   }
1051 
1052   @Override
1053   public long countAssets() {
1054     return getDatabase().countAssets();
1055   }
1056 
1057   @Override
1058   public long countProperties() {
1059     return getDatabase().countProperties();
1060   }
1061 
1062   /**
1063    * AbstractIndexProducer Implementation
1064    */
1065 
1066   @Override
1067   public IndexRebuildService.Service getService() {
1068     return IndexRebuildService.Service.AssetManager;
1069   }
1070 
1071   @Override
1072   public DataType[] getSupportedDataTypes() {
1073     return new DataType[]{ DataType.ALL, DataType.ACL };
1074   }
1075 
1076   @Override
1077   public void repopulate(DataType dataType) throws IndexRebuildException {
1078     final Organization originalOrg = securityService.getOrganization();
1079     final User originalUser = (originalOrg != null ? securityService.getUser() : null);
1080     try {
1081       final Organization defaultOrg = new DefaultOrganization();
1082       final User defaultSystemUser = SecurityUtil.createSystemUser(systemUserName, defaultOrg);
1083       securityService.setOrganization(defaultOrg);
1084       securityService.setUser(defaultSystemUser);
1085 
1086       int offset = 0;
1087       int total = (int) countEvents(null);
1088       int current = 0;
1089       logIndexRebuildBegin(logger, total, "snapshot(s)");
1090       var updatedEventRange = new ArrayList<Event>();
1091       do {
1092         List<Snapshot> snapshots = getDatabase().getSnapshotsForIndexRebuild(offset, PAGE_SIZE);
1093         offset += PAGE_SIZE;
1094         int n = 20;
1095 
1096         final Map<String, List<Snapshot>> byOrg = snapshots.stream()
1097             .collect(Collectors.groupingBy(Snapshot::getOrganizationId));
1098         for (String orgId : byOrg.keySet()) {
1099           final Organization snapshotOrg;
1100           try {
1101             snapshotOrg = orgDir.getOrganization(orgId);
1102             User snapshotSystemUser = SecurityUtil.createSystemUser(systemUserName, snapshotOrg);
1103             securityService.setOrganization(snapshotOrg);
1104             securityService.setUser(snapshotSystemUser);
1105             for (Snapshot snapshot : byOrg.get(orgId)) {
1106               try {
1107                 current++;
1108 
1109                 var updatedEventData = index.getEvent(
1110                     snapshot.getMediaPackage().getIdentifier().toString(),
1111                     securityService.getOrganization(),
1112                     snapshotSystemUser
1113                 );
1114                 if (dataType == DataType.ALL) {
1115                   // Reindex everything (default)
1116                   updatedEventData = getEventUpdateFunction(snapshot, orgId, snapshotSystemUser)
1117                       .apply(updatedEventData);
1118                 } else if (dataType == DataType.ACL) {
1119                   // Only reindex ACLs
1120                   updatedEventData = getEventUpdateFunctionOnlyAcl(snapshot, orgId)
1121                       .apply(updatedEventData);
1122                 } else {
1123                   throw new IndexRebuildException(dataType + " is not a supported data type. "
1124                       + "Accepted values are " + Arrays.toString(getSupportedDataTypes()) + ".");
1125                 }
1126                 updatedEventRange.add(updatedEventData.get());
1127 
1128                 if (updatedEventRange.size() >= n || current >= total) {
1129                   index.bulkEventUpdate(updatedEventRange, securityService.getOrganization());
1130                   logIndexRebuildProgress(logger, total, current, n);
1131                   updatedEventRange.clear();
1132                 }
1133               } catch (Throwable t) {
1134                 logSkippingElement(logger, "event", snapshot.getMediaPackage().getIdentifier().toString(),
1135                     snapshotOrg, t);
1136               }
1137             }
1138           } catch (Throwable t) {
1139             logIndexRebuildError(logger, t, originalOrg);
1140             throw new IndexRebuildException(getService(), originalOrg, t);
1141           } finally {
1142             securityService.setOrganization(defaultOrg);
1143             securityService.setUser(defaultSystemUser);
1144           }
1145         }
1146       } while (offset < total);
1147     } finally {
1148       securityService.setOrganization(originalOrg);
1149       securityService.setUser(originalUser);
1150     }
1151   }
1152 
1153   /**
1154    * Used for testing
1155    */
1156   public void setAvailability(Version version, String mpId, Availability availability) {
1157     if (isAuthorized(mpId, WRITE_ACTION)) {
1158       getDatabase().setAvailability(RuntimeTypes.convert(version), mpId, availability);
1159     } else {
1160       throw new RuntimeException(new UnauthorizedException("Not allowed to set availability of episode " + mpId));
1161     }
1162   }
1163 
1164   public void setDatabase(Database database) {
1165     this.db = database;
1166   }
1167 
1168   public Database getDatabase() {
1169     return db;
1170   }
1171 
1172   public HttpAssetProvider getHttpAssetProvider() {
1173     return httpAssetProvider;
1174   }
1175 
1176   /*
1177    * Security handling
1178    */
1179   /** Check authorization based on the given predicate. */
1180   private boolean isAuthorized(final String mediaPackageId, final String action) {
1181     switch (isAdmin()) {
1182       case GLOBAL:
1183         // grant general access
1184         logger.debug("Access granted since user is global admin");
1185         return true;
1186       case ORGANIZATION:
1187         // ensure that the requested assets belong to this organization
1188         logger.debug("User is organization admin. Checking organization. Checking organization ID of asset.");
1189         return snapshotExists(mediaPackageId, securityService.getOrganization().getId());
1190       default:
1191         // check organization
1192         logger.debug("Non admin user. Checking organization.");
1193         final String org = securityService.getOrganization().getId();
1194         if (!snapshotExists(mediaPackageId, org)) {
1195           return false;
1196         }
1197         // check episode role id
1198         User user = securityService.getUser();
1199         if (user.hasRole(getEpisodeRoleId(mediaPackageId, action))) {
1200           return true;
1201         }
1202         // check acl rules
1203         logger.debug("Non admin user. Checking ACL rules.");
1204         // TODO: Replace this custom ACL check with the general check from the auth service
1205         //   Warning: For now this will cause many difficult to track down bugs and is thus hardly possible
1206         // return authorizationService.hasPermission(getDatabase().getMediaPackage(mediaPackageId).get(), action);
1207         final List<String> roles = user.getRoles().parallelStream()
1208                 .filter(roleFilter)
1209                 .map((role) -> mkPropertyName(role.getName(), action))
1210                 .collect(Collectors.toList());
1211         return getDatabase().selectProperties(mediaPackageId, SECURITY_NAMESPACE).parallelStream()
1212                 .map(p -> p.getId().getName())
1213                 .filter(p -> p.endsWith(action))
1214                 .anyMatch(p -> roles.stream().anyMatch(r -> r.equals(p)));
1215     }
1216   }
1217 
1218   private List<String> isAuthorized(final List<String> mediaPackageIds, final String action) {
1219     return mediaPackageIds.stream()
1220         .filter(id -> isAuthorized(id, action))
1221         .collect(Collectors.toList());
1222   }
1223 
1224   private AdminRole isAdmin() {
1225     final User user = securityService.getUser();
1226     if (user.hasRole(GLOBAL_ADMIN_ROLE)) {
1227       return AdminRole.GLOBAL;
1228     } else if (user.hasRole(securityService.getOrganization().getAdminRole())
1229             || user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
1230       // In this context, we treat capture agents the same way as organization admins, allowing them access so that
1231       // they can ingest new media without requiring them to be explicitly specified in the ACLs.
1232       return AdminRole.ORGANIZATION;
1233     } else {
1234       return AdminRole.NONE;
1235     }
1236   }
1237 
1238   private String mkPropertyName(String role, String action) {
1239     return role + " | " + action;
1240   }
1241 
1242   /**
1243    * Configurable filter for roles
1244    */
1245   private final java.util.function.Predicate<Role> roleFilter = (role) -> {
1246     final String name = role.getName();
1247     return (includeAPIRoles || !name.startsWith("ROLE_API_"))
1248             && (includeCARoles  || !name.startsWith("ROLE_CAPTURE_AGENT_"))
1249             && (includeUIRoles  || !name.startsWith("ROLE_UI_"));
1250   };
1251 
1252   /*
1253    * Utility
1254    */
1255 
1256   /** Move the assets for a snapshot to the target store */
1257   private void copyAssetsToStore(Snapshot snap, AssetStore store) {
1258     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1259     final String orgId = snap.getOrganizationId();
1260     final Version version = snap.getVersion();
1261     final String prettyMpId = mpId + "@v" + version;
1262     logger.debug("Moving assets for snapshot {} to store {}", prettyMpId, store.getStoreType());
1263     for (final MediaPackageElement e : snap.getMediaPackage().getElements()) {
1264       if (!MOVABLE_TYPES.contains(e.getElementType())) {
1265         logger.debug("Skipping {} because type is {}", e.getIdentifier(), e.getElementType());
1266         continue;
1267       }
1268       logger.debug("Moving {} to store {}", e.getIdentifier(), store.getStoreType());
1269       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1270       if (store.contains(storagePath)) {
1271         logger.debug("Element {} (version {}) is already in store {} so skipping it", e.getIdentifier(),
1272                 version, store.getStoreType());
1273         continue;
1274       }
1275 
1276       // find asset in versions & stores
1277       final Optional<StoragePath> existingAssetOpt =
1278           getDatabase()
1279           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), store.getStoreType(), orgId)
1280           .map(dto -> StoragePath.mk(
1281               dto.getSnapshot().getOrganizationId(),
1282               dto.getSnapshot().getMediaPackageId(),
1283               dto.getSnapshot().getVersion(),
1284               dto.getMediaPackageElementId()
1285           ));
1286 
1287       if (existingAssetOpt.isPresent()) {
1288         final StoragePath existingAsset = existingAssetOpt.get();
1289         logger.debug("Content of asset {} with checksum {} already exists in {}",
1290                 existingAsset.getMediaPackageElementId(), e.getChecksum(), store.getStoreType());
1291         if (!store.copy(existingAsset, storagePath)) {
1292           throw new AssetManagerException(format(
1293                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1294                           + "failed",
1295                   e.getChecksum(),
1296                   existingAsset
1297           ));
1298         }
1299       } else {
1300         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1301         store.put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1302       }
1303       getDatabase().setAssetStorageLocation(VersionImpl.mk(version), mpId, e.getIdentifier(), store.getStoreType());
1304     }
1305   }
1306 
1307   private void copyManifest(Snapshot snap, AssetStore targetStore) throws IOException, NotFoundException {
1308     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1309     final String orgId = snap.getOrganizationId();
1310     final Version version = snap.getVersion();
1311 
1312     AssetStore currentStore = getAssetStore(snap.getStorageId()).get();
1313     Optional<String> manifestOpt = findManifestBaseName(snap, MANIFEST_DEFAULT_NAME, currentStore);
1314     if (manifestOpt.isEmpty()) {
1315       return; // Nothing to do, already moved to long-term storage
1316     }
1317 
1318     // Copy the manifest file
1319     String manifestBaseName = manifestOpt.get();
1320     StoragePath pathToManifest = new StoragePath(orgId, mpId, version, manifestBaseName);
1321 
1322     // Already copied?
1323     if (!targetStore.contains(pathToManifest)) {
1324       Optional<InputStream> inputStreamOpt;
1325       InputStream inputStream = null;
1326       String manifestFileName = null;
1327       try {
1328         inputStreamOpt = currentStore.get(pathToManifest);
1329         if (inputStreamOpt.isEmpty()) { // This should never happen because it has been tested before
1330           throw new NotFoundException(
1331                   String.format("Unexpected error. Manifest %s not found in current asset store", manifestBaseName));
1332         }
1333 
1334         inputStream = inputStreamOpt.get();
1335         manifestFileName = UUID.randomUUID() + ".xml";
1336         URI manifestTmpUri = workspace.putInCollection("archive", manifestFileName, inputStream);
1337         targetStore.put(pathToManifest, Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1338       } finally {
1339         IOUtils.closeQuietly(inputStream);
1340         try {
1341           // Make sure to clean up the temporary file
1342           workspace.deleteFromCollection("archive", manifestFileName);
1343         } catch (NotFoundException e) {
1344           // This is OK, we are deleting it anyway
1345         } catch (IOException e) {
1346           // This usually happens when the collection directory cannot be deleted
1347           // because another process is running at the same time and wrote a file there
1348           // after it was tested but before it was actually deleted. We will consider this ok.
1349           // Does the error message mention the manifest file name?
1350           if (e.getMessage().contains(manifestFileName)) {
1351             logger.warn("The manifest file {} didn't get deleted from the archive collection",
1352                     manifestBaseName, e);
1353           }
1354           // Else the error is related to the file-archive collection, which is fine
1355         }
1356       }
1357     }
1358   }
1359 
1360   Optional<String> findManifestBaseName(Snapshot snap, String manifestName, AssetStore store) {
1361     StoragePath path = new StoragePath(snap.getOrganizationId(), snap.getMediaPackage().getIdentifier().toString(),
1362             snap.getVersion(), manifestName);
1363     // If manifest_.xml, etc not found, return previous name (copied from the EpsiodeServiceImpl logic)
1364     if (!store.contains(path)) {
1365       // If first call, manifest is not found, which probably means it has already been moved
1366       if (MANIFEST_DEFAULT_NAME.equals(manifestName)) {
1367         return Optional.empty(); // No manifest found in current store
1368       } else {
1369         return Optional.of(manifestName.substring(0, manifestName.length() - 1));
1370       }
1371     }
1372     // This is the same logic as when building the manifest name: manifest, manifest_, manifest__, etc
1373     return findManifestBaseName(snap, manifestName + "_", store);
1374   }
1375 
1376   /* -------------------------------------------------------------------------------------------------------------- */
1377 
1378   /**
1379    * Make sure each of the elements has a checksum.
1380    */
1381   void calcChecksumsForMediaPackageElements(PartialMediaPackage pmp) {
1382     pmp.getElements().stream()
1383         .filter(hasNoChecksum)
1384         .forEach(mpe -> {
1385           File file = null;
1386           try {
1387             logger.trace("Calculate checksum for {}", mpe.getURI());
1388             file = workspace.get(mpe.getURI(), true);
1389             mpe.setChecksum(Checksum.create(ChecksumType.DEFAULT_TYPE, file));
1390           } catch (IOException | NotFoundException e) {
1391             throw new AssetManagerException(String.format(
1392                 "Cannot calculate checksum for media package element %s",
1393                 mpe.getURI()
1394             ), e);
1395           } finally {
1396             if (file != null) {
1397               FileUtils.deleteQuietly(file);
1398             }
1399           }
1400         });
1401   }
1402 
1403   /** Mutates mp and its elements, so make sure to work on a copy. */
1404   private SnapshotDto addInternal(String owner, final MediaPackage mp) throws Exception {
1405     final Date now = new Date();
1406     // claim a new version for the media package
1407     final String mpId = mp.getIdentifier().toString();
1408     final VersionImpl version = getDatabase().claimVersion(mpId);
1409     logger.info("Creating new version {} of media package {}", version, mp);
1410     final PartialMediaPackage pmp = assetsOnly(mp);
1411     // make sure they have a checksum
1412     calcChecksumsForMediaPackageElements(pmp);
1413     // download and archive elements
1414     storeAssets(pmp, version);
1415     // store mediapackage in db
1416     final SnapshotDto snapshotDto;
1417     try {
1418       // rewrite URIs for archival
1419       for (MediaPackageElement mpe : pmp.getElements()) {
1420         String fileName = getFileName(mpe).orElse("unknown");
1421         URI archiveUri = new URI(
1422             "urn",
1423             "matterhorn:" + mpId + ":" + version + ":" + mpe.getIdentifier() + ":" + fileName,
1424             null
1425         );
1426         mpe.setURI(archiveUri);
1427       }
1428 
1429       String currentOrgId = securityService.getOrganization().getId();
1430       snapshotDto = getDatabase().saveSnapshot(
1431               currentOrgId, pmp, now, version,
1432               Availability.ONLINE, getLocalAssetStore().getStoreType(), owner
1433       );
1434     } catch (AssetManagerException e) {
1435       logger.error("Could not take snapshot {}", mpId, e);
1436       throw new AssetManagerException(e);
1437     }
1438     // save manifest to element store
1439     // this is done at the end after the media package element ids have been rewritten to neutral URNs
1440     storeManifest(pmp, version);
1441     return snapshotDto;
1442   }
1443 
1444   /**
1445    * Store all elements of <code>pmp</code> under the given version.
1446    */
1447   private void storeAssets(final PartialMediaPackage pmp, final Version version) {
1448     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1449     final String orgId = securityService.getOrganization().getId();
1450     for (final MediaPackageElement e : pmp.getElements()) {
1451       logger.debug("Archiving {} {} {}", e.getFlavor(), e.getMimeType(), e.getURI());
1452       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1453       // find asset in versions
1454       final Optional<StoragePath> existingAssetOpt = getDatabase()
1455           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), getLocalAssetStore().getStoreType(), orgId)
1456           .map(dto -> StoragePath.mk(
1457                   dto.getSnapshot().getOrganizationId(),
1458                   dto.getSnapshot().getMediaPackageId(),
1459                   dto.getSnapshot().getVersion(),
1460                   dto.getMediaPackageElementId()));
1461 
1462       if (existingAssetOpt.isPresent()) {
1463         final StoragePath existingAsset = existingAssetOpt.get();
1464         logger.debug("Content of asset {} with checksum {} has been archived before",
1465                 existingAsset.getMediaPackageElementId(), e.getChecksum());
1466         if (!getLocalAssetStore().copy(existingAsset, storagePath)) {
1467           throw new AssetManagerException(format(
1468                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1469                           + "failed",
1470                   e.getChecksum(),
1471                   existingAsset
1472           ));
1473         }
1474       } else {
1475         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1476         getLocalAssetStore().put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1477       }
1478     }
1479   }
1480 
1481   private void storeManifest(final PartialMediaPackage pmp, final Version version) throws Exception {
1482     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1483     final String orgId = securityService.getOrganization().getId();
1484     // store the manifest.xml
1485     // TODO make use of checksums
1486     logger.debug("Archiving manifest of media package {} version {}", mpId, version);
1487     // temporarily save the manifest XML into the workspace to
1488     // Fix file not found exception when several snapshots are taken at the same time
1489     final String manifestFileName = format("manifest_%s_%s.xml", pmp.getMediaPackage().getIdentifier(), version);
1490     final URI manifestTmpUri = workspace.putInCollection(
1491             "archive",
1492             manifestFileName,
1493             IOUtils.toInputStream(MediaPackageParser.getAsXml(pmp.getMediaPackage()), "UTF-8"));
1494     try {
1495       getLocalAssetStore().put(
1496               StoragePath.mk(orgId, mpId, version, manifestAssetId(pmp, "manifest")),
1497               Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1498     } finally {
1499       // make sure to clean up the temporary file
1500       workspace.deleteFromCollection("archive", manifestFileName);
1501     }
1502   }
1503 
1504   /**
1505    * Create a unique id for the manifest xml. This is to avoid an id collision
1506    * in the rare case that the media package contains an XML element with the id
1507    * used for the manifest. A UUID could also be used but this is far less
1508    * readable.
1509    *
1510    * @param seedId
1511    *          the id to start with
1512    */
1513   private String manifestAssetId(PartialMediaPackage pmp, String seedId) {
1514     for (MediaPackageElement element : pmp.getElements()) {
1515       if (seedId.equals(element.getIdentifier())) {
1516         return manifestAssetId(pmp, seedId + "_");
1517       }
1518     }
1519     return seedId;
1520   }
1521 
1522   /* --------------------------------------------------------------------------------------------------------------- */
1523 
1524   /**
1525    * Walk up the stacktrace to find a cause of type <code>type</code>. Return none if no such
1526    * type can be found.
1527    */
1528   static <A extends Throwable> Optional<A> unwrapExceptionUntil(Class<A> type, Throwable e) {
1529     if (e == null) {
1530       return Optional.empty();
1531     } else if (type.isAssignableFrom(e.getClass())) {
1532       return Optional.of((A) e);
1533     } else {
1534       return unwrapExceptionUntil(type, e.getCause());
1535     }
1536   }
1537 
1538   /**
1539    * Return a partial media package filtering assets. Assets are elements the archive is going to manager, i.e. all
1540    * non-publication elements.
1541    */
1542   static PartialMediaPackage assetsOnly(MediaPackage mp) {
1543     Predicate<MediaPackageElement> isAsset = isNotPublication;
1544     return PartialMediaPackage.mk(mp, isAsset);
1545   }
1546 
1547   /**
1548    * Extract the file name from a media package elements URN.
1549    *
1550    * @return the file name or none if it could not be determined
1551    */
1552   public static Optional<String> getFileNameFromUrn(MediaPackageElement mpe) {
1553     Optional<URI> uri = Optional.ofNullable(mpe.getURI());
1554     if (uri.isPresent() && "urn".equals(uri.get().getScheme())) {
1555       String[] tmp = uri.get().toString().split(":");
1556       if (tmp.length < 1) {
1557         return Optional.empty();
1558       }
1559       return Optional.of(tmp[tmp.length - 1]);
1560     }
1561     return Optional.empty();
1562   }
1563 
1564   /**
1565    * Rewrite URIs of all asset elements of a snapshot's media package.
1566    * This method does not mutate anything.
1567    */
1568   public static Snapshot rewriteUris(Snapshot snapshot, Function<MediaPackageElement, URI> uriCreator) {
1569     final MediaPackage mpCopy = MediaPackageSupport.copy(snapshot.getMediaPackage());
1570     for (final MediaPackageElement mpe : assetsOnly(mpCopy).getElements()) {
1571       mpe.setURI(uriCreator.apply(mpe));
1572     }
1573     return new SnapshotImpl(
1574             snapshot.getVersion(),
1575             snapshot.getOrganizationId(),
1576             snapshot.getArchivalDate(),
1577             snapshot.getAvailability(),
1578             snapshot.getStorageId(),
1579             snapshot.getOwner(),
1580             mpCopy);
1581   }
1582 
1583   public void fireEventHandlers(AssetManagerItem item) {
1584     while (handlers.size() != EXPEXTED_HANDLERS_COUNT) {
1585       logger.warn("Expecting {} handlers, but {} are registered.  Waiting 10s then retrying...",
1586           EXPEXTED_HANDLERS_COUNT, handlers.size());
1587       try {
1588         Thread.sleep(10000L); // 10 seconds
1589       } catch (InterruptedException e) { /* swallow this, nothing to do */ }
1590     }
1591     for (AssetManagerUpdateHandler handler : handlers) {
1592       handler.execute(item);
1593     }
1594   }
1595 
1596   /**
1597    * Get the function to update a commented event in the Elasticsearch index.
1598    *
1599    * @return the function to do the update
1600    */
1601   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(Snapshot snapshot,
1602           String orgId, User user) {
1603     return (Optional<Event> eventOpt) -> {
1604       MediaPackage mp = snapshot.getMediaPackage();
1605       String eventId = mp.getIdentifier().toString();
1606       Event event = eventOpt.orElse(new Event(eventId, orgId));
1607 
1608       event = updateAclInEvent(event, mp);
1609 
1610       event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
1611       if (StringUtils.isBlank(event.getCreator())) {
1612         event.setCreator(securityService.getUser().getName());
1613       }
1614       EventIndexUtils.updateEvent(event, mp);
1615 
1616       for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
1617         try (InputStream in = workspace.read(catalog.getURI())) {
1618           EventIndexUtils.updateEvent(event, DublinCores.read(in));
1619         } catch (IOException | NotFoundException e) {
1620           throw new IllegalStateException(String.format("Unable to load dublin core catalog for event '%s'",
1621                   mp.getIdentifier()), e);
1622         }
1623       }
1624 
1625       // extended metadata
1626       event.resetExtendedMetadata();  // getting rid of old data
1627 
1628       List<EventCatalogUIAdapter> orgAdapters = extendedEventCatalogUIAdapters.getOrDefault(orgId, new ArrayList<>());
1629       orgAdapters.addAll(extendedEventCatalogUIAdapters.getOrDefault(ORGANIZATION_WILDCARD, Collections.emptyList()));
1630       for (EventCatalogUIAdapter extendedCatalogUIAdapter : orgAdapters) {
1631         for (Catalog catalog: mp.getCatalogs(extendedCatalogUIAdapter.getFlavor())) {
1632           try (InputStream in = workspace.read(catalog.getURI())) {
1633             EventIndexUtils.updateEventExtendedMetadata(event, DublinCores.read(in),
1634                     extendedCatalogUIAdapter.getFlavor());
1635           } catch (IOException | NotFoundException e) {
1636             throw new IllegalStateException(String.format("Unable to load extended dublin core catalog '%s' for event "
1637                     + "'%s'", catalog.getFlavor(), mp.getIdentifier()), e);
1638           }
1639         }
1640       }
1641 
1642       // Update series name if not already done
1643       try {
1644         EventIndexUtils.updateSeriesName(event, orgId, user, index);
1645       } catch (SearchIndexException e) {
1646         logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
1647                 e);
1648       }
1649       return Optional.of(event);
1650     };
1651   }
1652 
1653   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunctionOnlyAcl(Snapshot snapshot,
1654       String orgId) {
1655     return (Optional<Event> eventOpt) -> {
1656       MediaPackage mp = snapshot.getMediaPackage();
1657       String eventId = mp.getIdentifier().toString();
1658       Event event = eventOpt.orElse(new Event(eventId, orgId));
1659 
1660       event = updateAclInEvent(event, mp);
1661 
1662       return Optional.of(event);
1663     };
1664   }
1665 
1666   private Event updateAclInEvent(Event event, MediaPackage mp) {
1667     AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
1668     List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
1669 
1670     Optional<ManagedAcl> managedAcl = AccessInformationUtil.matchAcls(acls, acl);
1671     if (managedAcl.isPresent()) {
1672       event.setManagedAcl(managedAcl.get().getName());
1673     }
1674     event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
1675 
1676     return event;
1677   }
1678 }