View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.assetmanager.impl;
22  
23  import static com.entwinemedia.fn.Prelude.chuck;
24  import static com.entwinemedia.fn.Stream.$;
25  import static java.lang.String.format;
26  import static org.opencastproject.assetmanager.api.fn.Enrichments.enrich;
27  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.hasNoChecksum;
28  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.isNotPublication;
29  import static org.opencastproject.mediapackage.MediaPackageSupport.getFileName;
30  import static org.opencastproject.mediapackage.MediaPackageSupport.getMediaPackageElementId;
31  import static org.opencastproject.security.api.SecurityConstants.EPISODE_ROLE_ID_PREFIX;
32  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
33  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
34  import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
35  
36  import org.opencastproject.assetmanager.api.Asset;
37  import org.opencastproject.assetmanager.api.AssetId;
38  import org.opencastproject.assetmanager.api.AssetManager;
39  import org.opencastproject.assetmanager.api.AssetManagerException;
40  import org.opencastproject.assetmanager.api.Availability;
41  import org.opencastproject.assetmanager.api.Property;
42  import org.opencastproject.assetmanager.api.PropertyId;
43  import org.opencastproject.assetmanager.api.Snapshot;
44  import org.opencastproject.assetmanager.api.Value;
45  import org.opencastproject.assetmanager.api.Version;
46  import org.opencastproject.assetmanager.api.fn.Enrichments;
47  import org.opencastproject.assetmanager.api.query.ADeleteQuery;
48  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
49  import org.opencastproject.assetmanager.api.query.ARecord;
50  import org.opencastproject.assetmanager.api.query.AResult;
51  import org.opencastproject.assetmanager.api.query.ASelectQuery;
52  import org.opencastproject.assetmanager.api.query.Predicate;
53  import org.opencastproject.assetmanager.api.query.RichAResult;
54  import org.opencastproject.assetmanager.api.query.Target;
55  import org.opencastproject.assetmanager.api.storage.AssetStore;
56  import org.opencastproject.assetmanager.api.storage.DeletionSelector;
57  import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
58  import org.opencastproject.assetmanager.api.storage.Source;
59  import org.opencastproject.assetmanager.api.storage.StoragePath;
60  import org.opencastproject.assetmanager.impl.persistence.Database;
61  import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
62  import org.opencastproject.assetmanager.impl.query.AQueryBuilderImpl;
63  import org.opencastproject.assetmanager.impl.query.AbstractADeleteQuery;
64  import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
65  import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
66  import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
67  import org.opencastproject.db.DBSessionFactory;
68  import org.opencastproject.elasticsearch.api.SearchIndexException;
69  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
70  import org.opencastproject.elasticsearch.index.objects.event.Event;
71  import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
72  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
73  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
74  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
75  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
76  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService.DataType;
77  import org.opencastproject.mediapackage.Catalog;
78  import org.opencastproject.mediapackage.MediaPackage;
79  import org.opencastproject.mediapackage.MediaPackageElement;
80  import org.opencastproject.mediapackage.MediaPackageElements;
81  import org.opencastproject.mediapackage.MediaPackageParser;
82  import org.opencastproject.mediapackage.MediaPackageSupport;
83  import org.opencastproject.message.broker.api.assetmanager.AssetManagerItem;
84  import org.opencastproject.message.broker.api.update.AssetManagerUpdateHandler;
85  import org.opencastproject.metadata.dublincore.DublinCores;
86  import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
87  import org.opencastproject.security.api.AccessControlEntry;
88  import org.opencastproject.security.api.AccessControlList;
89  import org.opencastproject.security.api.AccessControlParser;
90  import org.opencastproject.security.api.AuthorizationService;
91  import org.opencastproject.security.api.DefaultOrganization;
92  import org.opencastproject.security.api.Organization;
93  import org.opencastproject.security.api.OrganizationDirectoryService;
94  import org.opencastproject.security.api.Role;
95  import org.opencastproject.security.api.SecurityService;
96  import org.opencastproject.security.api.UnauthorizedException;
97  import org.opencastproject.security.api.User;
98  import org.opencastproject.security.util.SecurityUtil;
99  import org.opencastproject.util.Checksum;
100 import org.opencastproject.util.ChecksumType;
101 import org.opencastproject.util.MimeTypes;
102 import org.opencastproject.util.NotFoundException;
103 import org.opencastproject.util.RequireUtil;
104 import org.opencastproject.util.data.functions.Functions;
105 import org.opencastproject.workspace.api.Workspace;
106 
107 import com.entwinemedia.fn.Fn;
108 import com.entwinemedia.fn.Fx;
109 import com.entwinemedia.fn.P1;
110 import com.entwinemedia.fn.P1Lazy;
111 import com.entwinemedia.fn.Pred;
112 import com.entwinemedia.fn.Prelude;
113 import com.entwinemedia.fn.fns.Booleans;
114 import com.google.common.collect.Sets;
115 
116 import org.apache.commons.io.FileUtils;
117 import org.apache.commons.io.IOUtils;
118 import org.apache.commons.lang3.BooleanUtils;
119 import org.apache.commons.lang3.StringUtils;
120 import org.osgi.service.component.ComponentContext;
121 import org.osgi.service.component.annotations.Activate;
122 import org.osgi.service.component.annotations.Component;
123 import org.osgi.service.component.annotations.Reference;
124 import org.osgi.service.component.annotations.ReferenceCardinality;
125 import org.osgi.service.component.annotations.ReferencePolicy;
126 import org.slf4j.Logger;
127 import org.slf4j.LoggerFactory;
128 
129 import java.io.File;
130 import java.io.IOException;
131 import java.io.InputStream;
132 import java.net.URI;
133 import java.net.URISyntaxException;
134 import java.security.NoSuchAlgorithmException;
135 import java.util.ArrayList;
136 import java.util.Arrays;
137 import java.util.Collections;
138 import java.util.Date;
139 import java.util.HashMap;
140 import java.util.LinkedHashMap;
141 import java.util.List;
142 import java.util.Map;
143 import java.util.Objects;
144 import java.util.Optional;
145 import java.util.Set;
146 import java.util.UUID;
147 import java.util.concurrent.CopyOnWriteArrayList;
148 import java.util.function.Function;
149 import java.util.stream.Collectors;
150 
151 import javax.persistence.EntityManagerFactory;
152 
153 /**
154  * The Asset Manager implementation.
155  */
156 @Component(
157     property = {
158         "service.description=Opencast Asset Manager"
159     },
160     immediate = true,
161     service = { AssetManager.class, IndexProducer.class }
162 )
163 public class AssetManagerImpl extends AbstractIndexProducer implements AssetManager,
164     AbstractADeleteQuery.DeleteEpisodeHandler {
165 
166   private static final Logger logger = LoggerFactory.getLogger(AssetManagerImpl.class);
167 
168   private static final int PAGE_SIZE = 1000;
169 
170   enum AdminRole {
171     GLOBAL, ORGANIZATION, NONE
172   }
173 
174   public static final String WRITE_ACTION = "write";
175   public static final String READ_ACTION = "read";
176   public static final String SECURITY_NAMESPACE = "org.opencastproject.assetmanager.security";
177 
178   private static final String MANIFEST_DEFAULT_NAME = "manifest";
179 
180   private static final String CONFIG_EPISODE_ID_ROLE = "org.opencastproject.episode.id.role.access";
181   private static boolean episodeIdRole = false;
182 
183   private CopyOnWriteArrayList<AssetManagerUpdateHandler> handlers = new CopyOnWriteArrayList<>();
184 
185   private SecurityService securityService;
186   private AuthorizationService authorizationService;
187   private OrganizationDirectoryService orgDir;
188   private Workspace workspace;
189   private AssetStore assetStore;
190   private HttpAssetProvider httpAssetProvider;
191   private String systemUserName;
192   private Database db;
193   private DBSessionFactory dbSessionFactory;
194   private EntityManagerFactory emf;
195   private AclServiceFactory aclServiceFactory;
196   private ElasticsearchIndex index;
197   private Map<String, List<EventCatalogUIAdapter>> extendedEventCatalogUIAdapters = new HashMap<>();
198 
199   // Settings for role filter
200   private boolean includeAPIRoles;
201   private boolean includeCARoles;
202   private boolean includeUIRoles;
203 
204 
205   public static final Set<MediaPackageElement.Type> MOVABLE_TYPES = Sets.newHashSet(
206           MediaPackageElement.Type.Attachment,
207           MediaPackageElement.Type.Catalog,
208           MediaPackageElement.Type.Track
209   );
210 
211   private final HashMap<String, RemoteAssetStore> remoteStores = new LinkedHashMap<>();
212 
213   /**
214    * OSGi callback.
215    */
216   @Activate
217   public synchronized void activate(ComponentContext cc) {
218     logger.info("Activating AssetManager.");
219     db = new Database(dbSessionFactory.createSession(emf));
220     systemUserName = SecurityUtil.getSystemUserName(cc);
221 
222     includeAPIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeAPIRoles"), null));
223     includeCARoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeCARoles"), null));
224     includeUIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeUIRoles"), null));
225 
226     episodeIdRole = BooleanUtils.toBoolean(Objects.toString(
227         cc.getBundleContext().getProperty(CONFIG_EPISODE_ID_ROLE), "false"));
228     logger.debug("Usage of episode ID roles is set to {}", episodeIdRole);
229   }
230 
231   /**
232    * OSGi dependencies
233    */
234 
235   @Reference(target = "(osgi.unit.name=org.opencastproject.assetmanager.impl)")
236   public void setEntityManagerFactory(EntityManagerFactory emf) {
237     this.emf = emf;
238   }
239 
240   @Reference
241   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
242     this.dbSessionFactory = dbSessionFactory;
243   }
244 
245   @Reference
246   public void setSecurityService(SecurityService securityService) {
247     this.securityService = securityService;
248   }
249 
250   @Reference
251   public void setAuthorizationService(AuthorizationService authorizationService) {
252     this.authorizationService = authorizationService;
253   }
254 
255   @Reference
256   public void setOrgDir(OrganizationDirectoryService orgDir) {
257     this.orgDir = orgDir;
258   }
259 
260   @Reference
261   public void setWorkspace(Workspace workspace) {
262     this.workspace = workspace;
263   }
264 
265   @Reference
266   public void setAssetStore(AssetStore assetStore) {
267     this.assetStore = assetStore;
268   }
269 
270   @Reference(
271       cardinality = ReferenceCardinality.MULTIPLE,
272       policy = ReferencePolicy.DYNAMIC,
273       unbind = "removeEventHandler"
274   )
275   public void addEventHandler(AssetManagerUpdateHandler handler) {
276     this.handlers.add(handler);
277   }
278 
279   public void removeEventHandler(AssetManagerUpdateHandler handler) {
280     this.handlers.remove(handler);
281   }
282 
283   @Reference(
284       cardinality = ReferenceCardinality.MULTIPLE,
285       policy = ReferencePolicy.DYNAMIC,
286       unbind = "removeRemoteAssetStore"
287   )
288   public synchronized void addRemoteAssetStore(RemoteAssetStore assetStore) {
289     remoteStores.put(assetStore.getStoreType(), assetStore);
290   }
291 
292   public void removeRemoteAssetStore(RemoteAssetStore store) {
293     remoteStores.remove(store.getStoreType());
294   }
295 
296   @Reference
297   public void setHttpAssetProvider(HttpAssetProvider httpAssetProvider) {
298     this.httpAssetProvider = httpAssetProvider;
299   }
300 
301   @Reference
302   public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
303     this.aclServiceFactory = aclServiceFactory;
304   }
305 
306   @Reference
307   public void setIndex(ElasticsearchIndex index) {
308     this.index = index;
309   }
310 
311   @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC,
312           target = "(common-metadata=false)")
313   public synchronized void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
314     List<EventCatalogUIAdapter> list = extendedEventCatalogUIAdapters.computeIfAbsent(
315             catalogUIAdapter.getOrganization(), k -> new ArrayList());
316     list.add(catalogUIAdapter);
317   }
318 
319   public synchronized void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
320     if (extendedEventCatalogUIAdapters.containsKey(catalogUIAdapter.getOrganization())) {
321       extendedEventCatalogUIAdapters.get(catalogUIAdapter.getOrganization()).remove(catalogUIAdapter);
322     }
323   }
324 
325   /**
326    * AssetManager implementation
327    */
328 
329   @Override
330   public Optional<MediaPackage> getMediaPackage(String mediaPackageId) {
331     final AQueryBuilder q = createQuery();
332     final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest()))
333             .run();
334 
335     if (r.getSize() == 0) {
336       return Optional.empty();
337     }
338     return Optional.of(r.getRecords().stream().findFirst().get().getSnapshot().get().getMediaPackage());
339   }
340 
341   @Override
342   public Optional<Asset> getAsset(Version version, String mpId, String mpElementId) {
343     if (isAuthorized(mpId, READ_ACTION)) {
344       // try to fetch the asset
345       var asset = getDatabase().getAsset(RuntimeTypes.convert(version), mpId, mpElementId);
346       if (asset.isPresent()) {
347         var storageId = getSnapshotStorageLocation(version, mpId);
348         if (storageId.isPresent()) {
349           var store = getAssetStore(storageId.get());
350           if (store.isPresent()) {
351             var assetStream = store.get().get(StoragePath.mk(
352                 asset.get().getOrganizationId(),
353                 mpId,
354                 version,
355                 mpElementId
356             ));
357             if (assetStream.isPresent()) {
358 
359               Checksum checksum = null;
360               try {
361                 checksum = Checksum.fromString(asset.get().getAssetDto().getChecksum());
362               } catch (NoSuchAlgorithmException e) {
363                 logger.warn("Invalid checksum for asset {} of media package {}", mpElementId, mpId, e);
364               }
365 
366               final Asset a = new AssetImpl(
367                       AssetId.mk(version, mpId, mpElementId),
368                       assetStream.get(),
369                       asset.get().getAssetDto().getMimeType(),
370                       asset.get().getAssetDto().getSize(),
371                       asset.get().getStorageId(),
372                       asset.get().getAvailability(),
373                       checksum);
374               return Optional.of(a);
375             }
376           }
377         }
378       }
379       return Optional.empty();
380     }
381     return chuck(new UnauthorizedException(
382             format("Not allowed to read assets of snapshot %s, version=%s", mpId, version)
383     ));
384   }
385 
386   @Override
387   public Optional<AssetStore> getAssetStore(String storeId) {
388     if (assetStore.getStoreType().equals(storeId)) {
389       return Optional.of(assetStore);
390     } else {
391       if (remoteStores.containsKey(storeId)) {
392         return Optional.of(remoteStores.get(storeId));
393       } else {
394         return Optional.empty();
395       }
396     }
397   }
398 
399   @Override
400   public AssetStore getLocalAssetStore() {
401     return assetStore;
402   }
403 
404   @Override
405   public List<AssetStore> getRemoteAssetStores() {
406     return new ArrayList<>(remoteStores.values());
407   }
408 
409   /** Snapshots */
410 
411   @Override
412   public boolean snapshotExists(final String mediaPackageId) {
413     return getDatabase().snapshotExists(mediaPackageId);
414   }
415 
416   @Override
417   public boolean snapshotExists(final String mediaPackageId, final String organization) {
418     return getDatabase().snapshotExists(mediaPackageId, organization);
419   }
420 
421   @Override
422   public Snapshot takeSnapshot(MediaPackage mp) {
423     return takeSnapshot(null, mp);
424   }
425 
426   @Override
427   public Snapshot takeSnapshot(String owner, MediaPackage mp) {
428 
429     final String mediaPackageId = mp.getIdentifier().toString();
430     final boolean firstSnapshot = !snapshotExists(mediaPackageId);
431 
432     // Allow this if:
433     //  - no previous snapshot exists
434     //  - the user has write access to the previous snapshot
435     if (firstSnapshot) {
436       // if it's the first snapshot, ensure that old, leftover properties are removed
437       deleteProperties(mediaPackageId);
438     }
439     if (firstSnapshot || isAuthorized(mediaPackageId, WRITE_ACTION)) {
440       final Snapshot snapshot;
441       if (owner == null) {
442         snapshot = takeSnapshotInternal(mp);
443       } else {
444         snapshot = takeSnapshotInternal(owner, mp);
445       }
446 
447       final AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
448       // store acl as properties
449       // Drop old ACL rules
450       deleteProperties(mediaPackageId, SECURITY_NAMESPACE);
451       // Set new ACL rules
452       for (final AccessControlEntry ace : acl.getEntries()) {
453         getDatabase().saveProperty(Property.mk(PropertyId.mk(mediaPackageId, SECURITY_NAMESPACE,
454                 mkPropertyName(ace.getRole(), ace.getAction())), Value.mk(ace.isAllow())));
455       }
456 
457       updateEventInIndex(snapshot);
458 
459       logger.info("Trigger update handlers for snapshot {}, version {}",
460           snapshot.getMediaPackage().getIdentifier(), snapshot.getVersion());
461       fireEventHandlers(mkTakeSnapshotMessage(snapshot));
462 
463       return snapshot;
464     }
465     return chuck(new UnauthorizedException("Not allowed to take snapshot of media package " + mediaPackageId));
466   }
467 
468   private Snapshot takeSnapshotInternal(MediaPackage mediaPackage) {
469     final String mediaPackageId = mediaPackage.getIdentifier().toString();
470     AQueryBuilder queryBuilder = createQuery();
471     AResult result = queryBuilder.select(queryBuilder.snapshot())
472             .where(queryBuilder.mediaPackageId(mediaPackageId).and(queryBuilder.version().isLatest())).run();
473     Optional<ARecord> record = result.getRecords().stream().findFirst();
474     if (record.isPresent()) {
475       Optional<Snapshot> snapshot = Optional.of(record.get().getSnapshot().get());
476       if (snapshot.isPresent()) {
477         return takeSnapshotInternal(snapshot.get().getOwner(), mediaPackage);
478       }
479     }
480     return takeSnapshotInternal(DEFAULT_OWNER, mediaPackage);
481   }
482 
483   private Snapshot takeSnapshotInternal(final String owner, final MediaPackage mp) {
484     return handleException(new P1Lazy<Snapshot>() {
485       @Override public Snapshot get1() {
486         try {
487           final Snapshot archived = addInternal(owner, MediaPackageSupport.copy(mp)).toSnapshot();
488           return getHttpAssetProvider().prepareForDelivery(archived);
489         } catch (Exception e) {
490           return Prelude.chuck(e);
491         }
492       }
493     });
494   }
495 
496   /**
497    * Create a {@link AssetManagerItem.TakeSnapshot} message.
498    * <p>
499    * Do not call outside of a security context.
500    */
501   private AssetManagerItem.TakeSnapshot mkTakeSnapshotMessage(Snapshot snapshot) {
502     final MediaPackage mp = snapshot.getMediaPackage();
503 
504     long version;
505     try {
506       version = Long.parseLong(snapshot.getVersion().toString());
507     } catch (NumberFormatException e) {
508       // The index requires a version to be a long value.
509       // Since the asset manager default implementation uses long values that should be not a problem.
510       // However, a decent exception message is helpful if a different implementation of the asset manager
511       // is used.
512       throw new RuntimeException("The current implementation of the index requires versions being of type 'long'.");
513     }
514 
515     return AssetManagerItem.add(workspace, mp, authorizationService.getActiveAcl(mp).getA(),
516             version, snapshot.getArchivalDate());
517   }
518 
519   @Override
520   public void triggerIndexUpdate(String mediaPackageId) throws NotFoundException, UnauthorizedException {
521 
522     if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
523       throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
524     }
525     final AQueryBuilder q = createQuery();
526     final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest())).run();
527 
528     if (r.getSize() == 0) {
529       throw new NotFoundException("No event with ID `" + mediaPackageId + "`");
530     }
531 
532     // Update event index with latest snapshot
533     var snapshot = r.getRecords().stream().findFirst().get().getSnapshot().get();
534     updateEventInIndex(snapshot);
535   }
536 
537   /**
538    * Update the event in the Elasticsearch index.
539    *
540    * @param snapshot
541    *         The newest snapshot of the event to update
542    */
543   private void updateEventInIndex(Snapshot snapshot) {
544     final MediaPackage mp = snapshot.getMediaPackage();
545     String eventId = mp.getIdentifier().toString();
546     final String organization = securityService.getOrganization().getId();
547     final User user = securityService.getUser();
548     logger.debug("Updating event {} in the {} index.", eventId, index.getIndexName());
549 
550     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
551       Event event = eventOpt.orElse(new Event(eventId, organization));
552 
553       AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
554       List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
555       for (final ManagedAcl managedAcl : AccessInformationUtil.matchAcls(acls, acl)) {
556         event.setManagedAcl(managedAcl.getName());
557       }
558       event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
559       event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
560       if (StringUtils.isBlank(event.getCreator())) {
561         event.setCreator(securityService.getUser().getName());
562       }
563       EventIndexUtils.updateEvent(event, mp);
564 
565       // common metadata
566       for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
567         try (InputStream in = workspace.read(catalog.getURI())) {
568           EventIndexUtils.updateEvent(event, DublinCores.read(in));
569         } catch (IOException | NotFoundException e) {
570           throw new IllegalStateException(String.format("Unable to load common dublin core catalog for event '%s'",
571                   mp.getIdentifier()), e);
572         }
573       }
574 
575       // extended metadata
576       event.resetExtendedMetadata();  // getting rid of old data
577       for (EventCatalogUIAdapter extendedCatalogUIAdapter : extendedEventCatalogUIAdapters.getOrDefault(organization,
578               Collections.emptyList())) {
579         for (Catalog catalog: mp.getCatalogs(extendedCatalogUIAdapter.getFlavor())) {
580           try (InputStream in = workspace.read(catalog.getURI())) {
581             EventIndexUtils.updateEventExtendedMetadata(event, DublinCores.read(in),
582                     extendedCatalogUIAdapter.getFlavor());
583           } catch (IOException | NotFoundException e) {
584             throw new IllegalStateException(String.format("Unable to load extended dublin core catalog '%s' for event "
585                             + "'%s'", catalog.getFlavor(), mp.getIdentifier()), e);
586           }
587         }
588       }
589 
590       // Update series name if not already done
591       try {
592         EventIndexUtils.updateSeriesName(event, organization, user, index);
593       } catch (SearchIndexException e) {
594         logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
595                 e);
596       }
597       return Optional.of(event);
598     };
599 
600     // Persist the scheduling event
601     try {
602       index.addOrUpdateEvent(eventId, updateFunction, organization, user);
603       logger.debug("Event {} updated in the {} index.", eventId, index.getIndexName());
604     } catch (SearchIndexException e) {
605       logger.error("Error updating the event {} in the {} index.", eventId, index.getIndexName(), e);
606     }
607   }
608 
609   /**
610    * Remove the event from the Elasticsearch index
611    *
612    * @param eventId
613    *         The id of the event to remove
614    */
615   private void removeArchivedVersionFromIndex(String eventId) {
616     final String orgId = securityService.getOrganization().getId();
617     final User user = securityService.getUser();
618     logger.debug("Received AssetManager delete episode message {}", eventId);
619 
620     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
621       if (eventOpt.isEmpty()) {
622         logger.warn("Event {} not found for deletion", eventId);
623         return Optional.empty();
624       }
625       Event event = eventOpt.get();
626       event.setArchiveVersion(null);
627       return Optional.of(event);
628     };
629 
630     try {
631       index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
632       logger.debug("Event {} removed from the {} index", eventId, index.getIndexName());
633     } catch (SearchIndexException e) {
634       logger.error("Error deleting the event {} from the {} index.", eventId, index.getIndexName(), e);
635     }
636   }
637 
638   @Override
639   public RichAResult getSnapshotsById(final String mpId) {
640     RequireUtil.requireNotBlank(mpId, "mpId");
641     AQueryBuilder q = createQuery();
642     ASelectQuery query = baseQuery(q, mpId);
643     return Enrichments.enrich(query.run());
644   }
645 
646   @Override
647   public RichAResult getSnapshotsByIdOrderedByVersion(String mpId, boolean asc) {
648     RequireUtil.requireNotBlank(mpId, "mpId");
649     AQueryBuilder q = createQuery();
650     ASelectQuery query = baseQuery(q, mpId);
651     if (asc) {
652       query = query.orderBy(q.version().asc());
653     } else {
654       query = query.orderBy(q.version().desc());
655     }
656     return Enrichments.enrich(query.run());
657   }
658 
659   @Override
660   public RichAResult getSnapshotsByIdAndVersion(final String mpId, final Version version) {
661     RequireUtil.requireNotBlank(mpId, "mpId");
662     RequireUtil.notNull(version, "version");
663     AQueryBuilder q = createQuery();
664     ASelectQuery query = baseQuery(q, version, mpId);
665     return Enrichments.enrich(query.run());
666   }
667 
668   @Override
669   public RichAResult getSnapshotsByDate(final Date start, final Date end) {
670     RequireUtil.notNull(start, "start");
671     RequireUtil.notNull(end, "end");
672     AQueryBuilder q = createQuery();
673     ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
674     return Enrichments.enrich(query.run());
675   }
676 
677   @Override
678   public RichAResult getSnapshotsByDateOrderedById(Date start, Date end) {
679     RequireUtil.notNull(start, "start");
680     RequireUtil.notNull(end, "end");
681     AQueryBuilder q = createQuery();
682     ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
683     return Enrichments.enrich(query.orderBy(q.mediapackageId().asc()).run());
684   }
685 
686   @Override
687   public RichAResult getSnapshotsByIdAndDate(final String mpId, final Date start, final Date end) {
688     RequireUtil.requireNotBlank(mpId, "mpId");
689     RequireUtil.notNull(start, "start");
690     RequireUtil.notNull(end, "end");
691     AQueryBuilder q = createQuery();
692     ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
693     return Enrichments.enrich(query.run());
694   }
695 
696   @Override
697   public RichAResult getSnapshotsByIdAndDateOrderedByVersion(String mpId, Date start, Date end, boolean asc) {
698     RequireUtil.requireNotBlank(mpId, "mpId");
699     RequireUtil.notNull(start, "start");
700     RequireUtil.notNull(end, "end");
701     AQueryBuilder q = createQuery();
702     ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
703     if (asc) {
704       query = query.orderBy(q.version().asc());
705     } else {
706       query = query.orderBy(q.version().desc());
707     }
708     return Enrichments.enrich(query.run());
709   }
710 
711   @Override
712   public void moveSnapshotsById(final String mpId, final String targetStore) throws NotFoundException {
713     RichAResult results = getSnapshotsById(mpId);
714 
715     if (results.getRecords().isEmpty()) {
716       throw new NotFoundException("Mediapackage " + mpId + " not found!");
717     }
718 
719     processOperations(results, targetStore);
720   }
721 
722   @Override
723   public void moveSnapshotsByIdAndVersion(final String mpId, final Version version, final String targetStore)
724           throws NotFoundException {
725     RichAResult results = getSnapshotsByIdAndVersion(mpId, version);
726 
727     if (results.getRecords().isEmpty()) {
728       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
729     }
730 
731     processOperations(results, targetStore);
732   }
733 
734   @Override
735   public void moveSnapshotsByDate(final Date start, final Date end, final String targetStore)
736           throws NotFoundException {
737     // We don't use #getSnapshotsByDate() as this includes also all snapshots already in targetStore. On large installs
738     // this could lead to memory overflow.
739     AQueryBuilder q = createQuery();
740     ASelectQuery query = baseQuery(q)
741         .where(q.storage(targetStore).not())
742         .where(q.archived().ge(start))
743         .where(q.archived().le(end));
744     RichAResult results = Enrichments.enrich(query.run());
745 
746     if (results.getRecords().isEmpty()) {
747       throw new NotFoundException("No media packages found between " + start + " and " + end);
748     }
749 
750     processOperations(results, targetStore);
751   }
752 
753   @Override
754   public void moveSnapshotsByIdAndDate(final String mpId, final Date start, final Date end, final String targetStore)
755           throws NotFoundException {
756     RichAResult results = getSnapshotsByIdAndDate(mpId, start, end);
757 
758     if (results.getRecords().isEmpty()) {
759       throw new NotFoundException("No media package with id " + mpId + " found between " + start + " and " + end);
760     }
761 
762     processOperations(results, targetStore);
763   }
764 
765   @Override
766   public void moveSnapshotToStore(final Version version, final String mpId, final String storeId)
767           throws NotFoundException {
768 
769     //Find the snapshot
770     AQueryBuilder q = createQuery();
771     RichAResult results = Enrichments.enrich(baseQuery(q, version, mpId).run());
772 
773     if (results.getRecords().isEmpty()) {
774       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
775     }
776     processOperations(results, storeId);
777   }
778 
779   //Do the actual moving
780   private void processOperations(final RichAResult results, final String targetStoreId) {
781     results.getRecords().forEach(record -> {
782       Snapshot s = record.getSnapshot().get();
783       Optional<String> currentStoreId = getSnapshotStorageLocation(s);
784 
785       if (currentStoreId.isEmpty()) {
786         logger.warn("IsNone store ID");
787         return;
788       }
789 
790       //If this snapshot is already stored in the desired store
791       if (currentStoreId.get().equals(targetStoreId)) {
792         //return, since we don't need to move anything
793         return;
794       }
795 
796       AssetStore currentStore;
797       AssetStore targetStore;
798 
799       Optional<AssetStore> optCurrentStore = getAssetStore(currentStoreId.get());
800       Optional<AssetStore> optTargetStore = getAssetStore(targetStoreId);
801 
802       if (!optCurrentStore.isEmpty()) {
803         currentStore = optCurrentStore.get();
804       } else {
805         logger.error("Unknown current store: " + currentStoreId.get());
806         return;
807       }
808       if (!optTargetStore.isEmpty()) {
809         targetStore = optTargetStore.get();
810       } else {
811         logger.error("Unknown target store: " + targetStoreId);
812         return;
813       }
814 
815       //If the content is already local, or is moving from a remote to the local
816       // Returns true if the store id is equal to the local asset store's id
817       String localAssetStoreType = getLocalAssetStore().getStoreType();
818       if (localAssetStoreType.equals(currentStoreId.get()) || localAssetStoreType.equals(targetStoreId)) {
819         logger.debug("Moving {} from {} to {}", s, currentStoreId, targetStoreId);
820 
821         try {
822           copyAssetsToStore(s, targetStore);
823           copyManifest(s, targetStore);
824         } catch (Exception e) {
825           Functions.chuck(e);
826         }
827         getDatabase().setStorageLocation(s, targetStoreId);
828         currentStore.delete(DeletionSelector.delete(s.getOrganizationId(),
829                 s.getMediaPackage().getIdentifier().toString(), s.getVersion()
830         ));
831       } else {
832         //Else, the content is *not* local and is going to a *different* remote
833         String intermediateStore = getLocalAssetStore().getStoreType();
834         logger.debug("Moving {} from {} to {}, then to {}",
835                 s, currentStoreId, intermediateStore, targetStoreId);
836         Version version = s.getVersion();
837         String mpId = s.getMediaPackage().getIdentifier().toString();
838         try {
839           moveSnapshotToStore(version, mpId, intermediateStore);
840           moveSnapshotToStore(version, mpId, targetStoreId);
841         } catch (NotFoundException e) {
842           Functions.chuck(e);
843         }
844       }
845     });
846   }
847 
848   // Return the asset store ID that is currently storing the snapshot
849   public Optional<String> getSnapshotStorageLocation(final Version version, final String mpId) {
850     RichAResult result = getSnapshotsByIdAndVersion(mpId, version);
851 
852     for (Snapshot snapshot : result.getSnapshots()) {
853       return Optional.of(snapshot.getStorageId());
854     }
855 
856     logger.error("Mediapackage " + mpId + "@" + version + " not found!");
857     return Optional.empty();
858   }
859 
860   public Optional<String> getSnapshotStorageLocation(final Snapshot snap) {
861     return getSnapshotStorageLocation(snap.getVersion(), snap.getMediaPackage().getIdentifier().toString());
862   }
863 
864   /** Properties */
865 
866   @Override
867   public boolean setProperty(Property property) {
868     final String mpId = property.getId().getMediaPackageId();
869     if (isAuthorized(mpId, WRITE_ACTION)) {
870       return getDatabase().saveProperty(property);
871     }
872     return chuck(new UnauthorizedException("Not allowed to set property on episode " + mpId));
873   }
874 
875   @Override
876   public List<Property> selectProperties(final String mediaPackageId, String namespace) {
877     if (isAuthorized(mediaPackageId, READ_ACTION)) {
878       return getDatabase().selectProperties(mediaPackageId, namespace);
879     }
880     return chuck(new UnauthorizedException(format("Not allowed to read properties of event %s", mediaPackageId)));
881   }
882 
883   @Override
884   public int deleteProperties(final String mediaPackageId) {
885     return getDatabase().deleteProperties(mediaPackageId);
886   }
887 
888   @Override
889   public int deleteProperties(final String mediaPackageId, final String namespace) {
890     return getDatabase().deleteProperties(mediaPackageId, namespace);
891   }
892 
893   /** Misc. */
894 
895   @Override
896   public AQueryBuilder createQuery() {
897     return new AQueryBuilderDecorator(createQueryWithoutSecurityCheck()) {
898       @Override public ASelectQuery select(Target... target) {
899         switch (isAdmin()) {
900           case GLOBAL:
901             return super.select(target);
902           case ORGANIZATION:
903             return super.select(target).where(restrictToUsersOrganization());
904           default:
905             return super.select(target).where(mkAuthPredicate(READ_ACTION));
906         }
907       }
908 
909       @Override public ADeleteQuery delete(String owner, Target target) {
910         switch (isAdmin()) {
911           case GLOBAL:
912             return super.delete(owner, target);
913           case ORGANIZATION:
914             return super.delete(owner, target).where(restrictToUsersOrganization());
915           default:
916             return super.delete(owner, target).where(mkAuthPredicate(WRITE_ACTION));
917         }
918       }
919     };
920   }
921 
922   private AQueryBuilder createQueryWithoutSecurityCheck() {
923     return new AQueryBuilderDecorator(new AQueryBuilderImpl(this)) {
924       @Override
925       public ADeleteQuery delete(String owner, Target target) {
926         return new ADeleteQueryWithMessaging(super.delete(owner, target));
927       }
928     };
929   }
930 
931   @Override
932   public Optional<Version> toVersion(String version) {
933     try {
934       return Optional.of(VersionImpl.mk(Long.parseLong(version)));
935     } catch (NumberFormatException e) {
936       return Optional.empty();
937     }
938   }
939 
940   @Override
941   public long countEvents(final String organization) {
942     return getDatabase().countEvents(organization);
943   }
944 
945   @Override
946   public void handleDeletedEpisode(String mpId) {
947     logger.info("Firing event handlers for deleting event {}", mpId);
948     fireEventHandlers(AssetManagerItem.deleteEpisode(mpId, new Date()));
949 
950     removeArchivedVersionFromIndex(mpId);
951   }
952 
953   /**
954    * AbstractIndexProducer Implementation
955    */
956 
957   @Override
958   public IndexRebuildService.Service getService() {
959     return IndexRebuildService.Service.AssetManager;
960   }
961 
962   @Override
963   public DataType[] getSupportedDataTypes() {
964     return new DataType[]{ DataType.ALL, DataType.ACL };
965   }
966 
967   @Override
968   public void repopulate(DataType dataType) throws IndexRebuildException {
969     final Organization originalOrg = securityService.getOrganization();
970     final User originalUser = (originalOrg != null ? securityService.getUser() : null);
971     try {
972       final Organization defaultOrg = new DefaultOrganization();
973       final User defaultSystemUser = SecurityUtil.createSystemUser(systemUserName, defaultOrg);
974       securityService.setOrganization(defaultOrg);
975       securityService.setUser(defaultSystemUser);
976 
977       int offset = 0;
978       int total = (int) countEvents(null);
979       final AQueryBuilder q = createQuery();
980       RichAResult r;
981       int current = 0;
982       logIndexRebuildBegin(logger, total, "snapshot(s)");
983       var updatedEventRange = new ArrayList<Event>();
984       do {
985         r = enrich(q.select(q.snapshot()).where(q.version().isLatest()).orderBy(q.mediapackageId().desc())
986             .page(offset, PAGE_SIZE).run());
987         offset += PAGE_SIZE;
988         int n = 20;
989 
990         final Map<String, List<Snapshot>> byOrg = r.getSnapshots().stream()
991             .collect(Collectors.groupingBy(Snapshot::getOrganizationId));
992         for (String orgId : byOrg.keySet()) {
993           final Organization snapshotOrg;
994           try {
995             snapshotOrg = orgDir.getOrganization(orgId);
996             User snapshotSystemUser = SecurityUtil.createSystemUser(systemUserName, snapshotOrg);
997             securityService.setOrganization(snapshotOrg);
998             securityService.setUser(snapshotSystemUser);
999             for (Snapshot snapshot : byOrg.get(orgId)) {
1000               try {
1001                 current++;
1002 
1003                 var updatedEventData = index.getEvent(snapshot.getMediaPackage().getIdentifier().toString(), orgId,
1004                     snapshotSystemUser);
1005                 if (dataType == DataType.ALL) {
1006                   // Reindex everything (default)
1007                   updatedEventData = getEventUpdateFunction(snapshot, orgId, snapshotSystemUser)
1008                       .apply(updatedEventData);
1009                 } else if (dataType == DataType.ACL) {
1010                   // Only reindex ACLs
1011                   updatedEventData = getEventUpdateFunctionOnlyAcl(snapshot, orgId, snapshotSystemUser)
1012                       .apply(updatedEventData);
1013                 } else {
1014                   throw new IndexRebuildException(dataType + " is not a supported data type. "
1015                       + "Accepted values are " + Arrays.toString(getSupportedDataTypes()) + ".");
1016                 }
1017                 updatedEventRange.add(updatedEventData.get());
1018 
1019                 if (updatedEventRange.size() >= n || current >= total) {
1020                   index.bulkEventUpdate(updatedEventRange);
1021                   logIndexRebuildProgress(logger, total, current, n);
1022                   updatedEventRange.clear();
1023                 }
1024               } catch (Throwable t) {
1025                 logSkippingElement(logger, "event", snapshot.getMediaPackage().getIdentifier().toString(),
1026                     snapshotOrg, t);
1027               }
1028             }
1029           } catch (Throwable t) {
1030             logIndexRebuildError(logger, t, originalOrg);
1031             throw new IndexRebuildException(getService(), originalOrg, t);
1032           } finally {
1033             securityService.setOrganization(defaultOrg);
1034             securityService.setUser(defaultSystemUser);
1035           }
1036         }
1037       } while (offset < total);
1038     } finally {
1039       securityService.setOrganization(originalOrg);
1040       securityService.setUser(originalUser);
1041     }
1042   }
1043 
1044   /**
1045    * Used for testing
1046    */
1047   public void setAvailability(Version version, String mpId, Availability availability) {
1048     if (isAuthorized(mpId, WRITE_ACTION)) {
1049       getDatabase().setAvailability(RuntimeTypes.convert(version), mpId, availability);
1050     } else {
1051       chuck(new UnauthorizedException("Not allowed to set availability of episode " + mpId));
1052     }
1053   }
1054 
1055   public void setDatabase(Database database) {
1056     this.db = database;
1057   }
1058 
1059   public Database getDatabase() {
1060     return db;
1061   }
1062 
1063   public HttpAssetProvider getHttpAssetProvider() {
1064     return httpAssetProvider;
1065   }
1066 
1067   /*
1068    * Security handling
1069    */
1070 
1071   /**
1072    * Create an authorization predicate to be used with {@link #isAuthorized(String, String)},
1073    * restricting access to the user's organization and the given action.
1074    *
1075    * @param action
1076    *     the action to restrict access to
1077    */
1078   private Predicate mkAuthPredicate(final String action) {
1079     final AQueryBuilder q = createQueryWithoutSecurityCheck();
1080     return securityService.getUser().getRoles().stream()
1081             .filter(roleFilter)
1082             .map((role) -> {
1083               if (episodeIdRole && role.getName().startsWith(EPISODE_ROLE_ID_PREFIX)) {
1084                 return q.mediapackageId().eq(StringUtils.substringBetween(
1085                     role.getName(), EPISODE_ROLE_ID_PREFIX + "_", "_"));
1086               } else {
1087                 return q.property(Value.BOOLEAN, SECURITY_NAMESPACE, mkPropertyName(role.getName(), action)).eq(true);
1088               }
1089             })
1090             .reduce(Predicate::or)
1091             .orElseGet(() -> q.always().not())
1092             .and(restrictToUsersOrganization());
1093   }
1094 
1095   /** Create a predicate that restricts access to the user's organization. */
1096   private Predicate restrictToUsersOrganization() {
1097     return createQueryWithoutSecurityCheck().organizationId().eq(securityService.getUser().getOrganization().getId());
1098   }
1099 
1100   /** Check authorization based on the given predicate. */
1101   private boolean isAuthorized(final String mediaPackageId, final String action) {
1102     switch (isAdmin()) {
1103       case GLOBAL:
1104         // grant general access
1105         logger.debug("Access granted since user is global admin");
1106         return true;
1107       case ORGANIZATION:
1108         // ensure that the requested assets belong to this organization
1109         logger.debug("User is organization admin. Checking organization. Checking organization ID of asset.");
1110         return snapshotExists(mediaPackageId, securityService.getOrganization().getId());
1111       default:
1112         // check organization
1113         logger.debug("Non admin user. Checking organization.");
1114         final String org = securityService.getOrganization().getId();
1115         if (!snapshotExists(mediaPackageId, org)) {
1116           return false;
1117         }
1118         // check episode role id
1119         User user = securityService.getUser();
1120         if (episodeIdRole && user.hasRole(getEpisodeRoleId(mediaPackageId, action))) {
1121           return true;
1122         }
1123         // check acl rules
1124         logger.debug("Non admin user. Checking ACL rules.");
1125         final List<String> roles = user.getRoles().parallelStream()
1126                 .filter(roleFilter)
1127                 .map((role) -> mkPropertyName(role.getName(), action))
1128                 .collect(Collectors.toList());
1129         return getDatabase().selectProperties(mediaPackageId, SECURITY_NAMESPACE).parallelStream()
1130                 .map(p -> p.getId().getName())
1131                 .filter(p -> p.endsWith(action))
1132                 .anyMatch(p -> roles.stream().anyMatch(r -> r.equals(p)));
1133     }
1134   }
1135 
1136   private AdminRole isAdmin() {
1137     final User user = securityService.getUser();
1138     if (user.hasRole(GLOBAL_ADMIN_ROLE)) {
1139       return AdminRole.GLOBAL;
1140     } else if (user.hasRole(securityService.getOrganization().getAdminRole())
1141             || user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
1142       // In this context, we treat capture agents the same way as organization admins, allowing them access so that
1143       // they can ingest new media without requiring them to be explicitly specified in the ACLs.
1144       return AdminRole.ORGANIZATION;
1145     } else {
1146       return AdminRole.NONE;
1147     }
1148   }
1149 
1150   private String mkPropertyName(String role, String action) {
1151     return role + " | " + action;
1152   }
1153 
1154   /**
1155    * Configurable filter for roles
1156    */
1157   private final java.util.function.Predicate<Role> roleFilter = (role) -> {
1158     final String name = role.getName();
1159     return (includeAPIRoles || !name.startsWith("ROLE_API_"))
1160             && (includeCARoles  || !name.startsWith("ROLE_CAPTURE_AGENT_"))
1161             && (includeUIRoles  || !name.startsWith("ROLE_UI_"));
1162   };
1163 
1164   /*
1165    * Utility
1166    */
1167 
1168   /**
1169    * Return a basic query which returns the snapshot and its current storage location
1170    *
1171    * @param q
1172    *   The query builder object to configure
1173    * @return
1174    *   The {@link ASelectQuery} configured with as described above
1175    */
1176   private ASelectQuery baseQuery(final AQueryBuilder q) {
1177     RequireUtil.notNull(q, "q");
1178     return q.select(q.snapshot());
1179   }
1180 
1181   /**
1182    * Return a mediapackage filtered query which returns the snapshot and its current storage location
1183    *
1184    * @param q
1185    *   The query builder object to configure
1186    * @param mpId
1187    *   The mediapackage ID to filter results for
1188    * @return
1189    *   The {@link ASelectQuery} configured with as described above
1190    */
1191   private ASelectQuery baseQuery(final AQueryBuilder q, final String mpId) {
1192     RequireUtil.notNull(q, "q");
1193     ASelectQuery query = baseQuery(q);
1194     if (StringUtils.isNotEmpty(mpId)) {
1195       return query.where(q.mediaPackageId(mpId));
1196     } else {
1197       return query;
1198     }
1199   }
1200 
1201   /**
1202    * Return a mediapackage and version filtered query which returns the snapshot and its current storage location
1203    *
1204    * @param q
1205    *   The query builder object to configure
1206    * @param version
1207    *   The version to filter results for
1208    * @param mpId
1209    *   The mediapackage ID to filter results for
1210    * @return
1211    *   The {@link ASelectQuery} configured with as described above
1212    */
1213   private ASelectQuery baseQuery(final AQueryBuilder q, final Version version, final String mpId) {
1214     RequireUtil.notNull(q, "q");
1215     RequireUtil.requireNotBlank(mpId, "mpId");
1216     ASelectQuery query = baseQuery(q, mpId);
1217     if (null != version) {
1218       return query.where(q.version().eq(version));
1219     } else {
1220       return query;
1221     }
1222   }
1223 
1224   /** Move the assets for a snapshot to the target store */
1225   private void copyAssetsToStore(Snapshot snap, AssetStore store) {
1226     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1227     final String orgId = snap.getOrganizationId();
1228     final Version version = snap.getVersion();
1229     final String prettyMpId = mpId + "@v" + version;
1230     logger.debug("Moving assets for snapshot {} to store {}", prettyMpId, store.getStoreType());
1231     for (final MediaPackageElement e : snap.getMediaPackage().getElements()) {
1232       if (!MOVABLE_TYPES.contains(e.getElementType())) {
1233         logger.debug("Skipping {} because type is {}", e.getIdentifier(), e.getElementType());
1234         continue;
1235       }
1236       logger.debug("Moving {} to store {}", e.getIdentifier(), store.getStoreType());
1237       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1238       if (store.contains(storagePath)) {
1239         logger.debug("Element {} (version {}) is already in store {} so skipping it", e.getIdentifier(),
1240                 version, store.getStoreType());
1241         continue;
1242       }
1243 
1244       // find asset in versions & stores
1245       final Optional<StoragePath> existingAssetOpt =
1246           getDatabase()
1247           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), store.getStoreType(), orgId)
1248           .map(dto -> StoragePath.mk(
1249               dto.getOrganizationId(),
1250               dto.getMediaPackageId(),
1251               dto.getVersion(),
1252               dto.getAssetDto().getMediaPackageElementId()
1253           ));
1254 
1255       if (existingAssetOpt.isPresent()) {
1256         final StoragePath existingAsset = existingAssetOpt.get();
1257         logger.debug("Content of asset {} with checksum {} already exists in {}",
1258                 existingAsset.getMediaPackageElementId(), e.getChecksum(), store.getStoreType());
1259         if (!store.copy(existingAsset, storagePath)) {
1260           throw new AssetManagerException(format(
1261                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1262                           + "failed",
1263                   e.getChecksum(),
1264                   existingAsset
1265           ));
1266         }
1267       } else {
1268         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1269         store.put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1270       }
1271       getDatabase().setAssetStorageLocation(VersionImpl.mk(version), mpId, e.getIdentifier(), store.getStoreType());
1272     }
1273   }
1274 
1275   private void copyManifest(Snapshot snap, AssetStore targetStore) throws IOException, NotFoundException {
1276     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1277     final String orgId = snap.getOrganizationId();
1278     final Version version = snap.getVersion();
1279 
1280     AssetStore currentStore = getAssetStore(snap.getStorageId()).get();
1281     Optional<String> manifestOpt = findManifestBaseName(snap, MANIFEST_DEFAULT_NAME, currentStore);
1282     if (manifestOpt.isEmpty()) {
1283       return; // Nothing to do, already moved to long-term storage
1284     }
1285 
1286     // Copy the manifest file
1287     String manifestBaseName = manifestOpt.get();
1288     StoragePath pathToManifest = new StoragePath(orgId, mpId, version, manifestBaseName);
1289 
1290     // Already copied?
1291     if (!targetStore.contains(pathToManifest)) {
1292       Optional<InputStream> inputStreamOpt;
1293       InputStream inputStream = null;
1294       String manifestFileName = null;
1295       try {
1296         inputStreamOpt = currentStore.get(pathToManifest);
1297         if (inputStreamOpt.isEmpty()) { // This should never happen because it has been tested before
1298           throw new NotFoundException(
1299                   String.format("Unexpected error. Manifest %s not found in current asset store", manifestBaseName));
1300         }
1301 
1302         inputStream = inputStreamOpt.get();
1303         manifestFileName = UUID.randomUUID() + ".xml";
1304         URI manifestTmpUri = workspace.putInCollection("archive", manifestFileName, inputStream);
1305         targetStore.put(pathToManifest, Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1306       } finally {
1307         IOUtils.closeQuietly(inputStream);
1308         try {
1309           // Make sure to clean up the temporary file
1310           workspace.deleteFromCollection("archive", manifestFileName);
1311         } catch (NotFoundException e) {
1312           // This is OK, we are deleting it anyway
1313         } catch (IOException e) {
1314           // This usually happens when the collection directory cannot be deleted
1315           // because another process is running at the same time and wrote a file there
1316           // after it was tested but before it was actually deleted. We will consider this ok.
1317           // Does the error message mention the manifest file name?
1318           if (e.getMessage().contains(manifestFileName)) {
1319             logger.warn("The manifest file {} didn't get deleted from the archive collection",
1320                     manifestBaseName, e);
1321           }
1322           // Else the error is related to the file-archive collection, which is fine
1323         }
1324       }
1325     }
1326   }
1327 
1328   Optional<String> findManifestBaseName(Snapshot snap, String manifestName, AssetStore store) {
1329     StoragePath path = new StoragePath(snap.getOrganizationId(), snap.getMediaPackage().getIdentifier().toString(),
1330             snap.getVersion(), manifestName);
1331     // If manifest_.xml, etc not found, return previous name (copied from the EpsiodeServiceImpl logic)
1332     if (!store.contains(path)) {
1333       // If first call, manifest is not found, which probably means it has already been moved
1334       if (MANIFEST_DEFAULT_NAME.equals(manifestName)) {
1335         return Optional.empty(); // No manifest found in current store
1336       } else {
1337         return Optional.of(manifestName.substring(0, manifestName.length() - 1));
1338       }
1339     }
1340     // This is the same logic as when building the manifest name: manifest, manifest_, manifest__, etc
1341     return findManifestBaseName(snap, manifestName + "_", store);
1342   }
1343 
1344   /* -------------------------------------------------------------------------------------------------------------- */
1345 
1346   /**
1347    * Make sure each of the elements has a checksum.
1348    */
1349   void calcChecksumsForMediaPackageElements(PartialMediaPackage pmp) {
1350     final Fx<MediaPackageElement> addChecksum = new Fx<MediaPackageElement>() {
1351       @Override public void apply(MediaPackageElement mpe) {
1352         File file = null;
1353         try {
1354           logger.trace("Calculate checksum for {}", mpe.getURI());
1355           file = workspace.get(mpe.getURI(), true);
1356           mpe.setChecksum(Checksum.create(ChecksumType.DEFAULT_TYPE, file));
1357         } catch (IOException | NotFoundException e) {
1358           throw new AssetManagerException(format(
1359                   "Cannot calculate checksum for media package element %s",
1360                   mpe.getURI()
1361           ), e);
1362         } finally {
1363           if (file != null) {
1364             FileUtils.deleteQuietly(file);
1365           }
1366         }
1367       }
1368     };
1369     pmp.getElements().filter(hasNoChecksum.toFn()).each(addChecksum).run();
1370   }
1371 
1372   /** Mutates mp and its elements, so make sure to work on a copy. */
1373   private SnapshotDto addInternal(String owner, final MediaPackage mp) throws Exception {
1374     final Date now = new Date();
1375     // claim a new version for the media package
1376     final String mpId = mp.getIdentifier().toString();
1377     final VersionImpl version = getDatabase().claimVersion(mpId);
1378     logger.info("Creating new version {} of media package {}", version, mp);
1379     final PartialMediaPackage pmp = assetsOnly(mp);
1380     // make sure they have a checksum
1381     calcChecksumsForMediaPackageElements(pmp);
1382     // download and archive elements
1383     storeAssets(pmp, version);
1384     // store mediapackage in db
1385     final SnapshotDto snapshotDto;
1386     try {
1387       // rewrite URIs for archival
1388       Fn<MediaPackageElement, URI> uriCreator = new Fn<MediaPackageElement, URI>() {
1389         @Override
1390         public URI apply(MediaPackageElement mpe) {
1391           try {
1392             String fileName = getFileName(mpe).getOr("unknown");
1393             return new URI(
1394                     "urn",
1395                     "matterhorn:" + mpId + ":" + version + ":" + mpe.getIdentifier() + ":" + fileName,
1396                     null
1397             );
1398           } catch (URISyntaxException e) {
1399             throw new AssetManagerException(e);
1400           }
1401         }
1402       };
1403 
1404       for (MediaPackageElement mpe : pmp.getElements()) {
1405         mpe.setURI(uriCreator.apply(mpe));
1406       }
1407 
1408       String currentOrgId = securityService.getOrganization().getId();
1409       snapshotDto = getDatabase().saveSnapshot(
1410               currentOrgId, pmp, now, version,
1411               Availability.ONLINE, getLocalAssetStore().getStoreType(), owner
1412       );
1413     } catch (AssetManagerException e) {
1414       logger.error("Could not take snapshot {}", mpId, e);
1415       throw new AssetManagerException(e);
1416     }
1417     // save manifest to element store
1418     // this is done at the end after the media package element ids have been rewritten to neutral URNs
1419     storeManifest(pmp, version);
1420     return snapshotDto;
1421   }
1422 
1423   /**
1424    * Store all elements of <code>pmp</code> under the given version.
1425    */
1426   private void storeAssets(final PartialMediaPackage pmp, final Version version) {
1427     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1428     final String orgId = securityService.getOrganization().getId();
1429     for (final MediaPackageElement e : pmp.getElements()) {
1430       logger.debug("Archiving {} {} {}", e.getFlavor(), e.getMimeType(), e.getURI());
1431       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1432       // find asset in versions
1433       final Optional<StoragePath> existingAssetOpt = getDatabase()
1434           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), getLocalAssetStore().getStoreType(), orgId)
1435           .map(dto -> StoragePath.mk(
1436                   dto.getOrganizationId(),
1437                   dto.getMediaPackageId(),
1438                   dto.getVersion(),
1439                   dto.getAssetDto().getMediaPackageElementId()));
1440 
1441       if (existingAssetOpt.isPresent()) {
1442         final StoragePath existingAsset = existingAssetOpt.get();
1443         logger.debug("Content of asset {} with checksum {} has been archived before",
1444                 existingAsset.getMediaPackageElementId(), e.getChecksum());
1445         if (!getLocalAssetStore().copy(existingAsset, storagePath)) {
1446           throw new AssetManagerException(format(
1447                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1448                           + "failed",
1449                   e.getChecksum(),
1450                   existingAsset
1451           ));
1452         }
1453       } else {
1454         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1455         getLocalAssetStore().put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1456       }
1457     }
1458   }
1459 
1460   private void storeManifest(final PartialMediaPackage pmp, final Version version) throws Exception {
1461     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1462     final String orgId = securityService.getOrganization().getId();
1463     // store the manifest.xml
1464     // TODO make use of checksums
1465     logger.debug("Archiving manifest of media package {} version {}", mpId, version);
1466     // temporarily save the manifest XML into the workspace to
1467     // Fix file not found exception when several snapshots are taken at the same time
1468     final String manifestFileName = format("manifest_%s_%s.xml", pmp.getMediaPackage().getIdentifier(), version);
1469     final URI manifestTmpUri = workspace.putInCollection(
1470             "archive",
1471             manifestFileName,
1472             IOUtils.toInputStream(MediaPackageParser.getAsXml(pmp.getMediaPackage()), "UTF-8"));
1473     try {
1474       getLocalAssetStore().put(
1475               StoragePath.mk(orgId, mpId, version, manifestAssetId(pmp, "manifest")),
1476               Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1477     } finally {
1478       // make sure to clean up the temporary file
1479       workspace.deleteFromCollection("archive", manifestFileName);
1480     }
1481   }
1482 
1483   /**
1484    * Create a unique id for the manifest xml. This is to avoid an id collision
1485    * in the rare case that the media package contains an XML element with the id
1486    * used for the manifest. A UUID could also be used but this is far less
1487    * readable.
1488    *
1489    * @param seedId
1490    *          the id to start with
1491    */
1492   private String manifestAssetId(PartialMediaPackage pmp, String seedId) {
1493     if ($(pmp.getElements()).map(getMediaPackageElementId.toFn()).exists(Booleans.eq(seedId))) {
1494       return manifestAssetId(pmp, seedId + "_");
1495     } else {
1496       return seedId;
1497     }
1498   }
1499 
1500   /* --------------------------------------------------------------------------------------------------------------- */
1501 
1502   /**
1503    * Unify exception handling by wrapping any occurring exception in an
1504    * {@link AssetManagerException}.
1505    */
1506   static <A> A handleException(final P1<A> p) throws AssetManagerException {
1507     try {
1508       return p.get1();
1509     } catch (Exception e) {
1510       logger.error("An error occurred", e);
1511       throw unwrapExceptionUntil(AssetManagerException.class, e).orElse(new AssetManagerException(e));
1512     }
1513   }
1514 
1515   /**
1516    * Walk up the stacktrace to find a cause of type <code>type</code>. Return none if no such
1517    * type can be found.
1518    */
1519   static <A extends Throwable> Optional<A> unwrapExceptionUntil(Class<A> type, Throwable e) {
1520     if (e == null) {
1521       return Optional.empty();
1522     } else if (type.isAssignableFrom(e.getClass())) {
1523       return Optional.of((A) e);
1524     } else {
1525       return unwrapExceptionUntil(type, e.getCause());
1526     }
1527   }
1528 
1529   /**
1530    * Return a partial media package filtering assets. Assets are elements the archive is going to manager, i.e. all
1531    * non-publication elements.
1532    */
1533   static PartialMediaPackage assetsOnly(MediaPackage mp) {
1534     final Pred<MediaPackageElement> isAsset = Pred.mk(isNotPublication.toFn());
1535     return PartialMediaPackage.mk(mp, isAsset);
1536   }
1537 
1538   /**
1539    * Extract the file name from a media package elements URN.
1540    *
1541    * @return the file name or none if it could not be determined
1542    */
1543   public static Optional<String> getFileNameFromUrn(MediaPackageElement mpe) {
1544     Fn<URI, String> toString = new Fn<URI, String>() {
1545       @Override
1546       public String apply(URI uri) {
1547         return uri.toString();
1548       }
1549     };
1550 
1551     Optional<URI> uri = Optional.ofNullable(mpe.getURI());
1552     if (uri.isPresent() && "urn".equals(uri.get().getScheme())) {
1553       String[] tmp = uri.get().toString().split(":");
1554       if (tmp.length < 1) {
1555         return Optional.empty();
1556       }
1557       return Optional.of(tmp[tmp.length - 1]);
1558     }
1559     return Optional.empty();
1560   }
1561 
1562   /**
1563    * Rewrite URIs of all asset elements of a snapshot's media package.
1564    * This method does not mutate anything.
1565    */
1566   public static Snapshot rewriteUris(Snapshot snapshot, Fn<MediaPackageElement, URI> uriCreator) {
1567     final MediaPackage mpCopy = MediaPackageSupport.copy(snapshot.getMediaPackage());
1568     for (final MediaPackageElement mpe : assetsOnly(mpCopy).getElements()) {
1569       mpe.setURI(uriCreator.apply(mpe));
1570     }
1571     return new SnapshotImpl(
1572             snapshot.getVersion(),
1573             snapshot.getOrganizationId(),
1574             snapshot.getArchivalDate(),
1575             snapshot.getAvailability(),
1576             snapshot.getStorageId(),
1577             snapshot.getOwner(),
1578             mpCopy);
1579   }
1580 
1581   public void fireEventHandlers(AssetManagerItem item) {
1582     while (handlers.size() != 2) {
1583       logger.warn("Expecting 2 handlers, but {} are registered.  Waiting 10s then retrying...", handlers.size());
1584       try {
1585         Thread.sleep(10000L);
1586       } catch (InterruptedException e) { /* swallow this, nothing to do */ }
1587     }
1588     for (AssetManagerUpdateHandler handler : handlers) {
1589       handler.execute(item);
1590     }
1591   }
1592 
1593   /**
1594    * Call {@link
1595    * org.opencastproject.assetmanager.impl.query.AbstractADeleteQuery#run(AbstractADeleteQuery.DeleteEpisodeHandler)}
1596    * with a delete handler. Also make sure to propagate the behaviour to subsequent instances.
1597    */
1598   private final class ADeleteQueryWithMessaging extends ADeleteQueryDecorator {
1599     ADeleteQueryWithMessaging(ADeleteQuery delegate) {
1600       super(delegate);
1601     }
1602 
1603     @Override
1604     public long run() {
1605       return RuntimeTypes.convert(delegate).run(AssetManagerImpl.this);
1606     }
1607 
1608     @Override
1609     protected ADeleteQueryDecorator mkDecorator(ADeleteQuery delegate) {
1610       return new ADeleteQueryWithMessaging(delegate);
1611     }
1612   }
1613 
1614   /**
1615    * Get the function to update a commented event in the Elasticsearch index.
1616    *
1617    * @return the function to do the update
1618    */
1619   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(Snapshot snapshot,
1620           String orgId, User user) {
1621     return (Optional<Event> eventOpt) -> {
1622       MediaPackage mp = snapshot.getMediaPackage();
1623       String eventId = mp.getIdentifier().toString();
1624       Event event = eventOpt.orElse(new Event(eventId, orgId));
1625 
1626       event = updateAclInEvent(event, mp, eventId);
1627 
1628       event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
1629       if (StringUtils.isBlank(event.getCreator())) {
1630         event.setCreator(securityService.getUser().getName());
1631       }
1632       EventIndexUtils.updateEvent(event, mp);
1633 
1634       for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
1635         try (InputStream in = workspace.read(catalog.getURI())) {
1636           EventIndexUtils.updateEvent(event, DublinCores.read(in));
1637         } catch (IOException | NotFoundException e) {
1638           throw new IllegalStateException(String.format("Unable to load dublin core catalog for event '%s'",
1639                   mp.getIdentifier()), e);
1640         }
1641       }
1642 
1643       // Update series name if not already done
1644       try {
1645         EventIndexUtils.updateSeriesName(event, orgId, user, index);
1646       } catch (SearchIndexException e) {
1647         logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
1648                 e);
1649       }
1650       return Optional.of(event);
1651     };
1652   }
1653 
1654   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunctionOnlyAcl(Snapshot snapshot,
1655       String orgId, User user) {
1656     return (Optional<Event> eventOpt) -> {
1657       MediaPackage mp = snapshot.getMediaPackage();
1658       String eventId = mp.getIdentifier().toString();
1659       Event event = eventOpt.orElse(new Event(eventId, orgId));
1660 
1661       event = updateAclInEvent(event, mp, eventId);
1662 
1663       return Optional.of(event);
1664     };
1665   }
1666 
1667   private Event updateAclInEvent(Event event, MediaPackage mp, String eventId) {
1668     AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
1669     List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
1670 
1671     for (final ManagedAcl managedAcl : AccessInformationUtil.matchAcls(acls, acl)) {
1672       event.setManagedAcl(managedAcl.getName());
1673     }
1674     event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
1675 
1676     return event;
1677   }
1678 }