View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.assetmanager.impl;
22  
23  import static com.entwinemedia.fn.Prelude.chuck;
24  import static com.entwinemedia.fn.Stream.$;
25  import static java.lang.String.format;
26  import static org.opencastproject.assetmanager.api.fn.Enrichments.enrich;
27  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.hasNoChecksum;
28  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.isNotPublication;
29  import static org.opencastproject.mediapackage.MediaPackageSupport.getFileName;
30  import static org.opencastproject.mediapackage.MediaPackageSupport.getMediaPackageElementId;
31  import static org.opencastproject.security.api.SecurityConstants.EPISODE_ROLE_ID_PREFIX;
32  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
33  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
34  import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
35  
36  import org.opencastproject.assetmanager.api.Asset;
37  import org.opencastproject.assetmanager.api.AssetId;
38  import org.opencastproject.assetmanager.api.AssetManager;
39  import org.opencastproject.assetmanager.api.AssetManagerException;
40  import org.opencastproject.assetmanager.api.Availability;
41  import org.opencastproject.assetmanager.api.Property;
42  import org.opencastproject.assetmanager.api.PropertyId;
43  import org.opencastproject.assetmanager.api.Snapshot;
44  import org.opencastproject.assetmanager.api.Value;
45  import org.opencastproject.assetmanager.api.Version;
46  import org.opencastproject.assetmanager.api.fn.Enrichments;
47  import org.opencastproject.assetmanager.api.query.ADeleteQuery;
48  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
49  import org.opencastproject.assetmanager.api.query.ARecord;
50  import org.opencastproject.assetmanager.api.query.AResult;
51  import org.opencastproject.assetmanager.api.query.ASelectQuery;
52  import org.opencastproject.assetmanager.api.query.Predicate;
53  import org.opencastproject.assetmanager.api.query.RichAResult;
54  import org.opencastproject.assetmanager.api.query.Target;
55  import org.opencastproject.assetmanager.api.storage.AssetStore;
56  import org.opencastproject.assetmanager.api.storage.DeletionSelector;
57  import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
58  import org.opencastproject.assetmanager.api.storage.Source;
59  import org.opencastproject.assetmanager.api.storage.StoragePath;
60  import org.opencastproject.assetmanager.impl.persistence.Database;
61  import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
62  import org.opencastproject.assetmanager.impl.query.AQueryBuilderImpl;
63  import org.opencastproject.assetmanager.impl.query.AbstractADeleteQuery;
64  import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
65  import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
66  import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
67  import org.opencastproject.db.DBSessionFactory;
68  import org.opencastproject.elasticsearch.api.SearchIndexException;
69  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
70  import org.opencastproject.elasticsearch.index.objects.event.Event;
71  import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
72  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
73  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
74  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
75  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
76  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService.DataType;
77  import org.opencastproject.mediapackage.Catalog;
78  import org.opencastproject.mediapackage.MediaPackage;
79  import org.opencastproject.mediapackage.MediaPackageElement;
80  import org.opencastproject.mediapackage.MediaPackageElements;
81  import org.opencastproject.mediapackage.MediaPackageParser;
82  import org.opencastproject.mediapackage.MediaPackageSupport;
83  import org.opencastproject.message.broker.api.assetmanager.AssetManagerItem;
84  import org.opencastproject.message.broker.api.update.AssetManagerUpdateHandler;
85  import org.opencastproject.metadata.dublincore.DublinCores;
86  import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
87  import org.opencastproject.security.api.AccessControlEntry;
88  import org.opencastproject.security.api.AccessControlList;
89  import org.opencastproject.security.api.AccessControlParser;
90  import org.opencastproject.security.api.AuthorizationService;
91  import org.opencastproject.security.api.DefaultOrganization;
92  import org.opencastproject.security.api.Organization;
93  import org.opencastproject.security.api.OrganizationDirectoryService;
94  import org.opencastproject.security.api.Role;
95  import org.opencastproject.security.api.SecurityService;
96  import org.opencastproject.security.api.UnauthorizedException;
97  import org.opencastproject.security.api.User;
98  import org.opencastproject.security.util.SecurityUtil;
99  import org.opencastproject.util.Checksum;
100 import org.opencastproject.util.ChecksumType;
101 import org.opencastproject.util.MimeTypes;
102 import org.opencastproject.util.NotFoundException;
103 import org.opencastproject.util.RequireUtil;
104 import org.opencastproject.util.data.functions.Functions;
105 import org.opencastproject.workspace.api.Workspace;
106 
107 import com.entwinemedia.fn.Fn;
108 import com.entwinemedia.fn.Fx;
109 import com.entwinemedia.fn.P1;
110 import com.entwinemedia.fn.P1Lazy;
111 import com.entwinemedia.fn.Pred;
112 import com.entwinemedia.fn.Prelude;
113 import com.entwinemedia.fn.fns.Booleans;
114 import com.google.common.collect.Sets;
115 
116 import org.apache.commons.io.FileUtils;
117 import org.apache.commons.io.IOUtils;
118 import org.apache.commons.lang3.BooleanUtils;
119 import org.apache.commons.lang3.StringUtils;
120 import org.osgi.service.component.ComponentContext;
121 import org.osgi.service.component.annotations.Activate;
122 import org.osgi.service.component.annotations.Component;
123 import org.osgi.service.component.annotations.Reference;
124 import org.osgi.service.component.annotations.ReferenceCardinality;
125 import org.osgi.service.component.annotations.ReferencePolicy;
126 import org.slf4j.Logger;
127 import org.slf4j.LoggerFactory;
128 
129 import java.io.File;
130 import java.io.IOException;
131 import java.io.InputStream;
132 import java.net.URI;
133 import java.net.URISyntaxException;
134 import java.security.NoSuchAlgorithmException;
135 import java.util.ArrayList;
136 import java.util.Arrays;
137 import java.util.Collections;
138 import java.util.Date;
139 import java.util.HashMap;
140 import java.util.LinkedHashMap;
141 import java.util.List;
142 import java.util.Map;
143 import java.util.Objects;
144 import java.util.Optional;
145 import java.util.Set;
146 import java.util.UUID;
147 import java.util.concurrent.CopyOnWriteArrayList;
148 import java.util.function.Function;
149 import java.util.stream.Collectors;
150 
151 import javax.persistence.EntityManagerFactory;
152 
153 /**
154  * The Asset Manager implementation.
155  */
156 @Component(
157     property = {
158         "service.description=Opencast Asset Manager"
159     },
160     immediate = true,
161     service = { AssetManager.class, IndexProducer.class }
162 )
163 public class AssetManagerImpl extends AbstractIndexProducer implements AssetManager,
164     AbstractADeleteQuery.DeleteEpisodeHandler {
165 
166   private static final Logger logger = LoggerFactory.getLogger(AssetManagerImpl.class);
167 
168   private static final int PAGE_SIZE = 1000;
169 
170   enum AdminRole {
171     GLOBAL, ORGANIZATION, NONE
172   }
173 
174   public static final String WRITE_ACTION = "write";
175   public static final String READ_ACTION = "read";
176   public static final String SECURITY_NAMESPACE = "org.opencastproject.assetmanager.security";
177 
178   private static final String MANIFEST_DEFAULT_NAME = "manifest";
179 
180   private CopyOnWriteArrayList<AssetManagerUpdateHandler> handlers = new CopyOnWriteArrayList<>();
181 
182   private SecurityService securityService;
183   private AuthorizationService authorizationService;
184   private OrganizationDirectoryService orgDir;
185   private Workspace workspace;
186   private AssetStore assetStore;
187   private HttpAssetProvider httpAssetProvider;
188   private String systemUserName;
189   private Database db;
190   private DBSessionFactory dbSessionFactory;
191   private EntityManagerFactory emf;
192   private AclServiceFactory aclServiceFactory;
193   private ElasticsearchIndex index;
194   private Map<String, List<EventCatalogUIAdapter>> extendedEventCatalogUIAdapters = new HashMap<>();
195 
196   // Settings for role filter
197   private boolean includeAPIRoles;
198   private boolean includeCARoles;
199   private boolean includeUIRoles;
200 
201 
202   public static final Set<MediaPackageElement.Type> MOVABLE_TYPES = Sets.newHashSet(
203           MediaPackageElement.Type.Attachment,
204           MediaPackageElement.Type.Catalog,
205           MediaPackageElement.Type.Track
206   );
207 
208   private final HashMap<String, RemoteAssetStore> remoteStores = new LinkedHashMap<>();
209 
210   /**
211    * OSGi callback.
212    */
213   @Activate
214   public synchronized void activate(ComponentContext cc) {
215     logger.info("Activating AssetManager.");
216     db = new Database(dbSessionFactory.createSession(emf));
217     systemUserName = SecurityUtil.getSystemUserName(cc);
218 
219     includeAPIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeAPIRoles"), null));
220     includeCARoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeCARoles"), null));
221     includeUIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeUIRoles"), null));
222   }
223 
224   /**
225    * OSGi dependencies
226    */
227 
228   @Reference(target = "(osgi.unit.name=org.opencastproject.assetmanager.impl)")
229   public void setEntityManagerFactory(EntityManagerFactory emf) {
230     this.emf = emf;
231   }
232 
233   @Reference
234   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
235     this.dbSessionFactory = dbSessionFactory;
236   }
237 
238   @Reference
239   public void setSecurityService(SecurityService securityService) {
240     this.securityService = securityService;
241   }
242 
243   @Reference
244   public void setAuthorizationService(AuthorizationService authorizationService) {
245     this.authorizationService = authorizationService;
246   }
247 
248   @Reference
249   public void setOrgDir(OrganizationDirectoryService orgDir) {
250     this.orgDir = orgDir;
251   }
252 
253   @Reference
254   public void setWorkspace(Workspace workspace) {
255     this.workspace = workspace;
256   }
257 
258   @Reference
259   public void setAssetStore(AssetStore assetStore) {
260     this.assetStore = assetStore;
261   }
262 
263   @Reference(
264       cardinality = ReferenceCardinality.MULTIPLE,
265       policy = ReferencePolicy.DYNAMIC,
266       unbind = "removeEventHandler"
267   )
268   public void addEventHandler(AssetManagerUpdateHandler handler) {
269     this.handlers.add(handler);
270   }
271 
272   public void removeEventHandler(AssetManagerUpdateHandler handler) {
273     this.handlers.remove(handler);
274   }
275 
276   @Reference(
277       cardinality = ReferenceCardinality.MULTIPLE,
278       policy = ReferencePolicy.DYNAMIC,
279       unbind = "removeRemoteAssetStore"
280   )
281   public synchronized void addRemoteAssetStore(RemoteAssetStore assetStore) {
282     remoteStores.put(assetStore.getStoreType(), assetStore);
283   }
284 
285   public void removeRemoteAssetStore(RemoteAssetStore store) {
286     remoteStores.remove(store.getStoreType());
287   }
288 
289   @Reference
290   public void setHttpAssetProvider(HttpAssetProvider httpAssetProvider) {
291     this.httpAssetProvider = httpAssetProvider;
292   }
293 
294   @Reference
295   public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
296     this.aclServiceFactory = aclServiceFactory;
297   }
298 
299   @Reference
300   public void setIndex(ElasticsearchIndex index) {
301     this.index = index;
302   }
303 
304   @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC,
305           target = "(common-metadata=false)")
306   public synchronized void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
307     List<EventCatalogUIAdapter> list = extendedEventCatalogUIAdapters.computeIfAbsent(
308             catalogUIAdapter.getOrganization(), k -> new ArrayList());
309     list.add(catalogUIAdapter);
310   }
311 
312   public synchronized void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
313     if (extendedEventCatalogUIAdapters.containsKey(catalogUIAdapter.getOrganization())) {
314       extendedEventCatalogUIAdapters.get(catalogUIAdapter.getOrganization()).remove(catalogUIAdapter);
315     }
316   }
317 
318   /**
319    * AssetManager implementation
320    */
321 
322   @Override
323   public Optional<MediaPackage> getMediaPackage(String mediaPackageId) {
324     final AQueryBuilder q = createQuery();
325     final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest()))
326             .run();
327 
328     if (r.getSize() == 0) {
329       return Optional.empty();
330     }
331     return Optional.of(r.getRecords().stream().findFirst().get().getSnapshot().get().getMediaPackage());
332   }
333 
334   @Override
335   public Optional<Asset> getAsset(Version version, String mpId, String mpElementId) {
336     if (isAuthorized(mpId, READ_ACTION)) {
337       // try to fetch the asset
338       var asset = getDatabase().getAsset(RuntimeTypes.convert(version), mpId, mpElementId);
339       if (asset.isPresent()) {
340         var storageId = getSnapshotStorageLocation(version, mpId);
341         if (storageId.isPresent()) {
342           var store = getAssetStore(storageId.get());
343           if (store.isPresent()) {
344             var assetStream = store.get().get(StoragePath.mk(
345                 asset.get().getOrganizationId(),
346                 mpId,
347                 version,
348                 mpElementId
349             ));
350             if (assetStream.isPresent()) {
351 
352               Checksum checksum = null;
353               try {
354                 checksum = Checksum.fromString(asset.get().getAssetDto().getChecksum());
355               } catch (NoSuchAlgorithmException e) {
356                 logger.warn("Invalid checksum for asset {} of media package {}", mpElementId, mpId, e);
357               }
358 
359               final Asset a = new AssetImpl(
360                       AssetId.mk(version, mpId, mpElementId),
361                       assetStream.get(),
362                       asset.get().getAssetDto().getMimeType(),
363                       asset.get().getAssetDto().getSize(),
364                       asset.get().getStorageId(),
365                       asset.get().getAvailability(),
366                       checksum);
367               return Optional.of(a);
368             }
369           }
370         }
371       }
372       return Optional.empty();
373     }
374     return chuck(new UnauthorizedException(
375             format("Not allowed to read assets of snapshot %s, version=%s", mpId, version)
376     ));
377   }
378 
379   @Override
380   public Optional<AssetStore> getAssetStore(String storeId) {
381     if (assetStore.getStoreType().equals(storeId)) {
382       return Optional.of(assetStore);
383     } else {
384       if (remoteStores.containsKey(storeId)) {
385         return Optional.of(remoteStores.get(storeId));
386       } else {
387         return Optional.empty();
388       }
389     }
390   }
391 
392   @Override
393   public AssetStore getLocalAssetStore() {
394     return assetStore;
395   }
396 
397   @Override
398   public List<AssetStore> getRemoteAssetStores() {
399     return new ArrayList<>(remoteStores.values());
400   }
401 
402   /** Snapshots */
403 
404   @Override
405   public boolean snapshotExists(final String mediaPackageId) {
406     return getDatabase().snapshotExists(mediaPackageId);
407   }
408 
409   @Override
410   public boolean snapshotExists(final String mediaPackageId, final String organization) {
411     return getDatabase().snapshotExists(mediaPackageId, organization);
412   }
413 
414   @Override
415   public Snapshot takeSnapshot(MediaPackage mp) {
416     return takeSnapshot(null, mp);
417   }
418 
419   @Override
420   public Snapshot takeSnapshot(String owner, MediaPackage mp) {
421 
422     final String mediaPackageId = mp.getIdentifier().toString();
423     final boolean firstSnapshot = !snapshotExists(mediaPackageId);
424 
425     // Allow this if:
426     //  - no previous snapshot exists
427     //  - the user has write access to the previous snapshot
428     if (firstSnapshot) {
429       // if it's the first snapshot, ensure that old, leftover properties are removed
430       deleteProperties(mediaPackageId);
431     }
432     if (firstSnapshot || isAuthorized(mediaPackageId, WRITE_ACTION)) {
433       final Snapshot snapshot;
434       if (owner == null) {
435         snapshot = takeSnapshotInternal(mp);
436       } else {
437         snapshot = takeSnapshotInternal(owner, mp);
438       }
439 
440       final AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
441       // store acl as properties
442       // Drop old ACL rules
443       deleteProperties(mediaPackageId, SECURITY_NAMESPACE);
444       // Set new ACL rules
445       for (final AccessControlEntry ace : acl.getEntries()) {
446         getDatabase().saveProperty(Property.mk(PropertyId.mk(mediaPackageId, SECURITY_NAMESPACE,
447                 mkPropertyName(ace.getRole(), ace.getAction())), Value.mk(ace.isAllow())));
448       }
449 
450       updateEventInIndex(snapshot);
451 
452       logger.info("Trigger update handlers for snapshot {}, version {}",
453           snapshot.getMediaPackage().getIdentifier(), snapshot.getVersion());
454       fireEventHandlers(mkTakeSnapshotMessage(snapshot));
455 
456       return snapshot;
457     }
458     return chuck(new UnauthorizedException("Not allowed to take snapshot of media package " + mediaPackageId));
459   }
460 
461   private Snapshot takeSnapshotInternal(MediaPackage mediaPackage) {
462     final String mediaPackageId = mediaPackage.getIdentifier().toString();
463     AQueryBuilder queryBuilder = createQuery();
464     AResult result = queryBuilder.select(queryBuilder.snapshot())
465             .where(queryBuilder.mediaPackageId(mediaPackageId).and(queryBuilder.version().isLatest())).run();
466     Optional<ARecord> record = result.getRecords().stream().findFirst();
467     if (record.isPresent()) {
468       Optional<Snapshot> snapshot = Optional.of(record.get().getSnapshot().get());
469       if (snapshot.isPresent()) {
470         return takeSnapshotInternal(snapshot.get().getOwner(), mediaPackage);
471       }
472     }
473     return takeSnapshotInternal(DEFAULT_OWNER, mediaPackage);
474   }
475 
476   private Snapshot takeSnapshotInternal(final String owner, final MediaPackage mp) {
477     return handleException(new P1Lazy<Snapshot>() {
478       @Override public Snapshot get1() {
479         try {
480           final Snapshot archived = addInternal(owner, MediaPackageSupport.copy(mp)).toSnapshot();
481           return getHttpAssetProvider().prepareForDelivery(archived);
482         } catch (Exception e) {
483           return Prelude.chuck(e);
484         }
485       }
486     });
487   }
488 
489   /**
490    * Create a {@link AssetManagerItem.TakeSnapshot} message.
491    * <p>
492    * Do not call outside of a security context.
493    */
494   private AssetManagerItem.TakeSnapshot mkTakeSnapshotMessage(Snapshot snapshot) {
495     final MediaPackage mp = snapshot.getMediaPackage();
496 
497     long version;
498     try {
499       version = Long.parseLong(snapshot.getVersion().toString());
500     } catch (NumberFormatException e) {
501       // The index requires a version to be a long value.
502       // Since the asset manager default implementation uses long values that should be not a problem.
503       // However, a decent exception message is helpful if a different implementation of the asset manager
504       // is used.
505       throw new RuntimeException("The current implementation of the index requires versions being of type 'long'.");
506     }
507 
508     return AssetManagerItem.add(workspace, mp, authorizationService.getActiveAcl(mp).getA(),
509             version, snapshot.getArchivalDate());
510   }
511 
512   @Override
513   public void triggerIndexUpdate(String mediaPackageId) throws NotFoundException, UnauthorizedException {
514 
515     if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
516       throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
517     }
518     final AQueryBuilder q = createQuery();
519     final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest())).run();
520 
521     if (r.getSize() == 0) {
522       throw new NotFoundException("No event with ID `" + mediaPackageId + "`");
523     }
524 
525     // Update event index with latest snapshot
526     var snapshot = r.getRecords().stream().findFirst().get().getSnapshot().get();
527     updateEventInIndex(snapshot);
528   }
529 
530   /**
531    * Update the event in the Elasticsearch index.
532    *
533    * @param snapshot
534    *         The newest snapshot of the event to update
535    */
536   private void updateEventInIndex(Snapshot snapshot) {
537     final String eventId = snapshot.getMediaPackage().getIdentifier().toString();
538     final String orgId = securityService.getOrganization().getId();
539     final User user = securityService.getUser();
540 
541     logger.debug("Updating event {} in the {} index.", eventId, index.getIndexName());
542     Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(snapshot, orgId, user);
543 
544     try {
545       index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
546       logger.debug("Event {} updated in the {} index.", eventId, index.getIndexName());
547     } catch (SearchIndexException e) {
548       logger.error("Error updating the event {} in the {} index.", eventId, index.getIndexName(), e);
549     }
550   }
551 
552   /**
553    * Remove the event from the Elasticsearch index
554    *
555    * @param eventId
556    *         The id of the event to remove
557    */
558   private void removeArchivedVersionFromIndex(String eventId) {
559     final String orgId = securityService.getOrganization().getId();
560     final User user = securityService.getUser();
561     logger.debug("Received AssetManager delete episode message {}", eventId);
562 
563     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
564       if (eventOpt.isEmpty()) {
565         logger.warn("Event {} not found for deletion", eventId);
566         return Optional.empty();
567       }
568       Event event = eventOpt.get();
569       event.setArchiveVersion(null);
570       return Optional.of(event);
571     };
572 
573     try {
574       index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
575       logger.debug("Event {} removed from the {} index", eventId, index.getIndexName());
576     } catch (SearchIndexException e) {
577       logger.error("Error deleting the event {} from the {} index.", eventId, index.getIndexName(), e);
578     }
579   }
580 
581   @Override
582   public RichAResult getSnapshotsById(final String mpId) {
583     RequireUtil.requireNotBlank(mpId, "mpId");
584     AQueryBuilder q = createQuery();
585     ASelectQuery query = baseQuery(q, mpId);
586     return Enrichments.enrich(query.run());
587   }
588 
589   @Override
590   public RichAResult getSnapshotsByIdOrderedByVersion(String mpId, boolean asc) {
591     RequireUtil.requireNotBlank(mpId, "mpId");
592     AQueryBuilder q = createQuery();
593     ASelectQuery query = baseQuery(q, mpId);
594     if (asc) {
595       query = query.orderBy(q.version().asc());
596     } else {
597       query = query.orderBy(q.version().desc());
598     }
599     return Enrichments.enrich(query.run());
600   }
601 
602   @Override
603   public RichAResult getSnapshotsByIdAndVersion(final String mpId, final Version version) {
604     RequireUtil.requireNotBlank(mpId, "mpId");
605     RequireUtil.notNull(version, "version");
606     AQueryBuilder q = createQuery();
607     ASelectQuery query = baseQuery(q, version, mpId);
608     return Enrichments.enrich(query.run());
609   }
610 
611   @Override
612   public RichAResult getSnapshotsByDate(final Date start, final Date end) {
613     RequireUtil.notNull(start, "start");
614     RequireUtil.notNull(end, "end");
615     AQueryBuilder q = createQuery();
616     ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
617     return Enrichments.enrich(query.run());
618   }
619 
620   @Override
621   public RichAResult getSnapshotsByDateOrderedById(Date start, Date end) {
622     RequireUtil.notNull(start, "start");
623     RequireUtil.notNull(end, "end");
624     AQueryBuilder q = createQuery();
625     ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
626     return Enrichments.enrich(query.orderBy(q.mediapackageId().asc()).run());
627   }
628 
629   @Override
630   public RichAResult getSnapshotsByIdAndDate(final String mpId, final Date start, final Date end) {
631     RequireUtil.requireNotBlank(mpId, "mpId");
632     RequireUtil.notNull(start, "start");
633     RequireUtil.notNull(end, "end");
634     AQueryBuilder q = createQuery();
635     ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
636     return Enrichments.enrich(query.run());
637   }
638 
639   @Override
640   public RichAResult getSnapshotsByIdAndDateOrderedByVersion(String mpId, Date start, Date end, boolean asc) {
641     RequireUtil.requireNotBlank(mpId, "mpId");
642     RequireUtil.notNull(start, "start");
643     RequireUtil.notNull(end, "end");
644     AQueryBuilder q = createQuery();
645     ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
646     if (asc) {
647       query = query.orderBy(q.version().asc());
648     } else {
649       query = query.orderBy(q.version().desc());
650     }
651     return Enrichments.enrich(query.run());
652   }
653 
654   @Override
655   public void moveSnapshotsById(final String mpId, final String targetStore) throws NotFoundException {
656     RichAResult results = getSnapshotsById(mpId);
657 
658     if (results.getRecords().isEmpty()) {
659       throw new NotFoundException("Mediapackage " + mpId + " not found!");
660     }
661 
662     processOperations(results, targetStore);
663   }
664 
665   @Override
666   public void moveSnapshotsByIdAndVersion(final String mpId, final Version version, final String targetStore)
667           throws NotFoundException {
668     RichAResult results = getSnapshotsByIdAndVersion(mpId, version);
669 
670     if (results.getRecords().isEmpty()) {
671       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
672     }
673 
674     processOperations(results, targetStore);
675   }
676 
677   @Override
678   public void moveSnapshotsByDate(final Date start, final Date end, final String targetStore)
679           throws NotFoundException {
680     // We don't use #getSnapshotsByDate() as this includes also all snapshots already in targetStore. On large installs
681     // this could lead to memory overflow.
682     AQueryBuilder q = createQuery();
683     ASelectQuery query = baseQuery(q)
684         .where(q.storage(targetStore).not())
685         .where(q.archived().ge(start))
686         .where(q.archived().le(end));
687     RichAResult results = Enrichments.enrich(query.run());
688 
689     if (results.getRecords().isEmpty()) {
690       throw new NotFoundException("No media packages found between " + start + " and " + end);
691     }
692 
693     processOperations(results, targetStore);
694   }
695 
696   @Override
697   public void moveSnapshotsByIdAndDate(final String mpId, final Date start, final Date end, final String targetStore)
698           throws NotFoundException {
699     RichAResult results = getSnapshotsByIdAndDate(mpId, start, end);
700 
701     if (results.getRecords().isEmpty()) {
702       throw new NotFoundException("No media package with id " + mpId + " found between " + start + " and " + end);
703     }
704 
705     processOperations(results, targetStore);
706   }
707 
708   @Override
709   public void moveSnapshotToStore(final Version version, final String mpId, final String storeId)
710           throws NotFoundException {
711 
712     //Find the snapshot
713     AQueryBuilder q = createQuery();
714     RichAResult results = Enrichments.enrich(baseQuery(q, version, mpId).run());
715 
716     if (results.getRecords().isEmpty()) {
717       throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
718     }
719     processOperations(results, storeId);
720   }
721 
722   //Do the actual moving
723   private void processOperations(final RichAResult results, final String targetStoreId) {
724     results.getRecords().forEach(record -> {
725       Snapshot s = record.getSnapshot().get();
726       Optional<String> currentStoreId = getSnapshotStorageLocation(s);
727 
728       if (currentStoreId.isEmpty()) {
729         logger.warn("IsNone store ID");
730         return;
731       }
732 
733       //If this snapshot is already stored in the desired store
734       if (currentStoreId.get().equals(targetStoreId)) {
735         //return, since we don't need to move anything
736         return;
737       }
738 
739       AssetStore currentStore;
740       AssetStore targetStore;
741 
742       Optional<AssetStore> optCurrentStore = getAssetStore(currentStoreId.get());
743       Optional<AssetStore> optTargetStore = getAssetStore(targetStoreId);
744 
745       if (!optCurrentStore.isEmpty()) {
746         currentStore = optCurrentStore.get();
747       } else {
748         logger.error("Unknown current store: " + currentStoreId.get());
749         return;
750       }
751       if (!optTargetStore.isEmpty()) {
752         targetStore = optTargetStore.get();
753       } else {
754         logger.error("Unknown target store: " + targetStoreId);
755         return;
756       }
757 
758       //If the content is already local, or is moving from a remote to the local
759       // Returns true if the store id is equal to the local asset store's id
760       String localAssetStoreType = getLocalAssetStore().getStoreType();
761       if (localAssetStoreType.equals(currentStoreId.get()) || localAssetStoreType.equals(targetStoreId)) {
762         logger.debug("Moving {} from {} to {}", s, currentStoreId, targetStoreId);
763 
764         try {
765           copyAssetsToStore(s, targetStore);
766           copyManifest(s, targetStore);
767         } catch (Exception e) {
768           Functions.chuck(e);
769         }
770         getDatabase().setStorageLocation(s, targetStoreId);
771         currentStore.delete(DeletionSelector.delete(s.getOrganizationId(),
772                 s.getMediaPackage().getIdentifier().toString(), s.getVersion()
773         ));
774       } else {
775         //Else, the content is *not* local and is going to a *different* remote
776         String intermediateStore = getLocalAssetStore().getStoreType();
777         logger.debug("Moving {} from {} to {}, then to {}",
778                 s, currentStoreId, intermediateStore, targetStoreId);
779         Version version = s.getVersion();
780         String mpId = s.getMediaPackage().getIdentifier().toString();
781         try {
782           moveSnapshotToStore(version, mpId, intermediateStore);
783           moveSnapshotToStore(version, mpId, targetStoreId);
784         } catch (NotFoundException e) {
785           Functions.chuck(e);
786         }
787       }
788     });
789   }
790 
791   // Return the asset store ID that is currently storing the snapshot
792   public Optional<String> getSnapshotStorageLocation(final Version version, final String mpId) {
793     RichAResult result = getSnapshotsByIdAndVersion(mpId, version);
794 
795     for (Snapshot snapshot : result.getSnapshots()) {
796       return Optional.of(snapshot.getStorageId());
797     }
798 
799     logger.error("Mediapackage " + mpId + "@" + version + " not found!");
800     return Optional.empty();
801   }
802 
803   public Optional<String> getSnapshotStorageLocation(final Snapshot snap) {
804     return getSnapshotStorageLocation(snap.getVersion(), snap.getMediaPackage().getIdentifier().toString());
805   }
806 
807   /** Properties */
808 
809   @Override
810   public boolean setProperty(Property property) {
811     final String mpId = property.getId().getMediaPackageId();
812     if (isAuthorized(mpId, WRITE_ACTION)) {
813       return getDatabase().saveProperty(property);
814     }
815     return chuck(new UnauthorizedException("Not allowed to set property on episode " + mpId));
816   }
817 
818   @Override
819   public List<Property> selectProperties(final String mediaPackageId, String namespace) {
820     if (isAuthorized(mediaPackageId, READ_ACTION)) {
821       return getDatabase().selectProperties(mediaPackageId, namespace);
822     }
823     return chuck(new UnauthorizedException(format("Not allowed to read properties of event %s", mediaPackageId)));
824   }
825 
826   @Override
827   public int deleteProperties(final String mediaPackageId) {
828     return getDatabase().deleteProperties(mediaPackageId);
829   }
830 
831   @Override
832   public int deleteProperties(final String mediaPackageId, final String namespace) {
833     return getDatabase().deleteProperties(mediaPackageId, namespace);
834   }
835 
836   /** Misc. */
837 
838   @Override
839   public AQueryBuilder createQuery() {
840     return new AQueryBuilderDecorator(createQueryWithoutSecurityCheck()) {
841       @Override public ASelectQuery select(Target... target) {
842         switch (isAdmin()) {
843           case GLOBAL:
844             return super.select(target);
845           case ORGANIZATION:
846             return super.select(target).where(restrictToUsersOrganization());
847           default:
848             return super.select(target).where(mkAuthPredicate(READ_ACTION));
849         }
850       }
851 
852       @Override public ADeleteQuery delete(String owner, Target target) {
853         switch (isAdmin()) {
854           case GLOBAL:
855             return super.delete(owner, target);
856           case ORGANIZATION:
857             return super.delete(owner, target).where(restrictToUsersOrganization());
858           default:
859             return super.delete(owner, target).where(mkAuthPredicate(WRITE_ACTION));
860         }
861       }
862     };
863   }
864 
865   private AQueryBuilder createQueryWithoutSecurityCheck() {
866     return new AQueryBuilderDecorator(new AQueryBuilderImpl(this)) {
867       @Override
868       public ADeleteQuery delete(String owner, Target target) {
869         return new ADeleteQueryWithMessaging(super.delete(owner, target));
870       }
871     };
872   }
873 
874   @Override
875   public Optional<Version> toVersion(String version) {
876     try {
877       return Optional.of(VersionImpl.mk(Long.parseLong(version)));
878     } catch (NumberFormatException e) {
879       return Optional.empty();
880     }
881   }
882 
883   @Override
884   public long countEvents(final String organization) {
885     return getDatabase().countEvents(organization);
886   }
887 
888   @Override
889   public void handleDeletedEpisode(String mpId) {
890     logger.info("Firing event handlers for deleting event {}", mpId);
891     fireEventHandlers(AssetManagerItem.deleteEpisode(mpId, new Date()));
892 
893     removeArchivedVersionFromIndex(mpId);
894   }
895 
896   /**
897    * AbstractIndexProducer Implementation
898    */
899 
900   @Override
901   public IndexRebuildService.Service getService() {
902     return IndexRebuildService.Service.AssetManager;
903   }
904 
905   @Override
906   public DataType[] getSupportedDataTypes() {
907     return new DataType[]{ DataType.ALL, DataType.ACL };
908   }
909 
910   @Override
911   public void repopulate(DataType dataType) throws IndexRebuildException {
912     final Organization originalOrg = securityService.getOrganization();
913     final User originalUser = (originalOrg != null ? securityService.getUser() : null);
914     try {
915       final Organization defaultOrg = new DefaultOrganization();
916       final User defaultSystemUser = SecurityUtil.createSystemUser(systemUserName, defaultOrg);
917       securityService.setOrganization(defaultOrg);
918       securityService.setUser(defaultSystemUser);
919 
920       int offset = 0;
921       int total = (int) countEvents(null);
922       final AQueryBuilder q = createQuery();
923       RichAResult r;
924       int current = 0;
925       logIndexRebuildBegin(logger, total, "snapshot(s)");
926       var updatedEventRange = new ArrayList<Event>();
927       do {
928         r = enrich(q.select(q.snapshot()).where(q.version().isLatest()).orderBy(q.mediapackageId().desc())
929             .page(offset, PAGE_SIZE).run());
930         offset += PAGE_SIZE;
931         int n = 20;
932 
933         final Map<String, List<Snapshot>> byOrg = r.getSnapshots().stream()
934             .collect(Collectors.groupingBy(Snapshot::getOrganizationId));
935         for (String orgId : byOrg.keySet()) {
936           final Organization snapshotOrg;
937           try {
938             snapshotOrg = orgDir.getOrganization(orgId);
939             User snapshotSystemUser = SecurityUtil.createSystemUser(systemUserName, snapshotOrg);
940             securityService.setOrganization(snapshotOrg);
941             securityService.setUser(snapshotSystemUser);
942             for (Snapshot snapshot : byOrg.get(orgId)) {
943               try {
944                 current++;
945 
946                 var updatedEventData = index.getEvent(snapshot.getMediaPackage().getIdentifier().toString(), orgId,
947                     snapshotSystemUser);
948                 if (dataType == DataType.ALL) {
949                   // Reindex everything (default)
950                   updatedEventData = getEventUpdateFunction(snapshot, orgId, snapshotSystemUser)
951                       .apply(updatedEventData);
952                 } else if (dataType == DataType.ACL) {
953                   // Only reindex ACLs
954                   updatedEventData = getEventUpdateFunctionOnlyAcl(snapshot, orgId)
955                       .apply(updatedEventData);
956                 } else {
957                   throw new IndexRebuildException(dataType + " is not a supported data type. "
958                       + "Accepted values are " + Arrays.toString(getSupportedDataTypes()) + ".");
959                 }
960                 updatedEventRange.add(updatedEventData.get());
961 
962                 if (updatedEventRange.size() >= n || current >= total) {
963                   index.bulkEventUpdate(updatedEventRange);
964                   logIndexRebuildProgress(logger, total, current, n);
965                   updatedEventRange.clear();
966                 }
967               } catch (Throwable t) {
968                 logSkippingElement(logger, "event", snapshot.getMediaPackage().getIdentifier().toString(),
969                     snapshotOrg, t);
970               }
971             }
972           } catch (Throwable t) {
973             logIndexRebuildError(logger, t, originalOrg);
974             throw new IndexRebuildException(getService(), originalOrg, t);
975           } finally {
976             securityService.setOrganization(defaultOrg);
977             securityService.setUser(defaultSystemUser);
978           }
979         }
980       } while (offset < total);
981     } finally {
982       securityService.setOrganization(originalOrg);
983       securityService.setUser(originalUser);
984     }
985   }
986 
987   /**
988    * Used for testing
989    */
990   public void setAvailability(Version version, String mpId, Availability availability) {
991     if (isAuthorized(mpId, WRITE_ACTION)) {
992       getDatabase().setAvailability(RuntimeTypes.convert(version), mpId, availability);
993     } else {
994       chuck(new UnauthorizedException("Not allowed to set availability of episode " + mpId));
995     }
996   }
997 
998   public void setDatabase(Database database) {
999     this.db = database;
1000   }
1001 
1002   public Database getDatabase() {
1003     return db;
1004   }
1005 
1006   public HttpAssetProvider getHttpAssetProvider() {
1007     return httpAssetProvider;
1008   }
1009 
1010   /*
1011    * Security handling
1012    */
1013 
1014   /**
1015    * Create an authorization predicate to be used with {@link #isAuthorized(String, String)},
1016    * restricting access to the user's organization and the given action.
1017    *
1018    * @param action
1019    *     the action to restrict access to
1020    */
1021   private Predicate mkAuthPredicate(final String action) {
1022     final AQueryBuilder q = createQueryWithoutSecurityCheck();
1023     return securityService.getUser().getRoles().stream()
1024             .filter(roleFilter)
1025             .map((role) -> {
1026               if (role.getName().startsWith(EPISODE_ROLE_ID_PREFIX)) {
1027                 return q.mediapackageId().eq(StringUtils.substringBetween(
1028                     role.getName(), EPISODE_ROLE_ID_PREFIX + "_", "_"));
1029               } else {
1030                 return q.property(Value.BOOLEAN, SECURITY_NAMESPACE, mkPropertyName(role.getName(), action)).eq(true);
1031               }
1032             })
1033             .reduce(Predicate::or)
1034             .orElseGet(() -> q.always().not())
1035             .and(restrictToUsersOrganization());
1036   }
1037 
1038   /** Create a predicate that restricts access to the user's organization. */
1039   private Predicate restrictToUsersOrganization() {
1040     return createQueryWithoutSecurityCheck().organizationId().eq(securityService.getUser().getOrganization().getId());
1041   }
1042 
1043   /** Check authorization based on the given predicate. */
1044   private boolean isAuthorized(final String mediaPackageId, final String action) {
1045     switch (isAdmin()) {
1046       case GLOBAL:
1047         // grant general access
1048         logger.debug("Access granted since user is global admin");
1049         return true;
1050       case ORGANIZATION:
1051         // ensure that the requested assets belong to this organization
1052         logger.debug("User is organization admin. Checking organization. Checking organization ID of asset.");
1053         return snapshotExists(mediaPackageId, securityService.getOrganization().getId());
1054       default:
1055         // check organization
1056         logger.debug("Non admin user. Checking organization.");
1057         final String org = securityService.getOrganization().getId();
1058         if (!snapshotExists(mediaPackageId, org)) {
1059           return false;
1060         }
1061         // check episode role id
1062         User user = securityService.getUser();
1063         if (user.hasRole(getEpisodeRoleId(mediaPackageId, action))) {
1064           return true;
1065         }
1066         // check acl rules
1067         logger.debug("Non admin user. Checking ACL rules.");
1068         final List<String> roles = user.getRoles().parallelStream()
1069                 .filter(roleFilter)
1070                 .map((role) -> mkPropertyName(role.getName(), action))
1071                 .collect(Collectors.toList());
1072         return getDatabase().selectProperties(mediaPackageId, SECURITY_NAMESPACE).parallelStream()
1073                 .map(p -> p.getId().getName())
1074                 .filter(p -> p.endsWith(action))
1075                 .anyMatch(p -> roles.stream().anyMatch(r -> r.equals(p)));
1076     }
1077   }
1078 
1079   private AdminRole isAdmin() {
1080     final User user = securityService.getUser();
1081     if (user.hasRole(GLOBAL_ADMIN_ROLE)) {
1082       return AdminRole.GLOBAL;
1083     } else if (user.hasRole(securityService.getOrganization().getAdminRole())
1084             || user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
1085       // In this context, we treat capture agents the same way as organization admins, allowing them access so that
1086       // they can ingest new media without requiring them to be explicitly specified in the ACLs.
1087       return AdminRole.ORGANIZATION;
1088     } else {
1089       return AdminRole.NONE;
1090     }
1091   }
1092 
1093   private String mkPropertyName(String role, String action) {
1094     return role + " | " + action;
1095   }
1096 
1097   /**
1098    * Configurable filter for roles
1099    */
1100   private final java.util.function.Predicate<Role> roleFilter = (role) -> {
1101     final String name = role.getName();
1102     return (includeAPIRoles || !name.startsWith("ROLE_API_"))
1103             && (includeCARoles  || !name.startsWith("ROLE_CAPTURE_AGENT_"))
1104             && (includeUIRoles  || !name.startsWith("ROLE_UI_"));
1105   };
1106 
1107   /*
1108    * Utility
1109    */
1110 
1111   /**
1112    * Return a basic query which returns the snapshot and its current storage location
1113    *
1114    * @param q
1115    *   The query builder object to configure
1116    * @return
1117    *   The {@link ASelectQuery} configured with as described above
1118    */
1119   private ASelectQuery baseQuery(final AQueryBuilder q) {
1120     RequireUtil.notNull(q, "q");
1121     return q.select(q.snapshot());
1122   }
1123 
1124   /**
1125    * Return a mediapackage filtered query which returns the snapshot and its current storage location
1126    *
1127    * @param q
1128    *   The query builder object to configure
1129    * @param mpId
1130    *   The mediapackage ID to filter results for
1131    * @return
1132    *   The {@link ASelectQuery} configured with as described above
1133    */
1134   private ASelectQuery baseQuery(final AQueryBuilder q, final String mpId) {
1135     RequireUtil.notNull(q, "q");
1136     ASelectQuery query = baseQuery(q);
1137     if (StringUtils.isNotEmpty(mpId)) {
1138       return query.where(q.mediaPackageId(mpId));
1139     } else {
1140       return query;
1141     }
1142   }
1143 
1144   /**
1145    * Return a mediapackage and version filtered query which returns the snapshot and its current storage location
1146    *
1147    * @param q
1148    *   The query builder object to configure
1149    * @param version
1150    *   The version to filter results for
1151    * @param mpId
1152    *   The mediapackage ID to filter results for
1153    * @return
1154    *   The {@link ASelectQuery} configured with as described above
1155    */
1156   private ASelectQuery baseQuery(final AQueryBuilder q, final Version version, final String mpId) {
1157     RequireUtil.notNull(q, "q");
1158     RequireUtil.requireNotBlank(mpId, "mpId");
1159     ASelectQuery query = baseQuery(q, mpId);
1160     if (null != version) {
1161       return query.where(q.version().eq(version));
1162     } else {
1163       return query;
1164     }
1165   }
1166 
1167   /** Move the assets for a snapshot to the target store */
1168   private void copyAssetsToStore(Snapshot snap, AssetStore store) {
1169     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1170     final String orgId = snap.getOrganizationId();
1171     final Version version = snap.getVersion();
1172     final String prettyMpId = mpId + "@v" + version;
1173     logger.debug("Moving assets for snapshot {} to store {}", prettyMpId, store.getStoreType());
1174     for (final MediaPackageElement e : snap.getMediaPackage().getElements()) {
1175       if (!MOVABLE_TYPES.contains(e.getElementType())) {
1176         logger.debug("Skipping {} because type is {}", e.getIdentifier(), e.getElementType());
1177         continue;
1178       }
1179       logger.debug("Moving {} to store {}", e.getIdentifier(), store.getStoreType());
1180       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1181       if (store.contains(storagePath)) {
1182         logger.debug("Element {} (version {}) is already in store {} so skipping it", e.getIdentifier(),
1183                 version, store.getStoreType());
1184         continue;
1185       }
1186 
1187       // find asset in versions & stores
1188       final Optional<StoragePath> existingAssetOpt =
1189           getDatabase()
1190           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), store.getStoreType(), orgId)
1191           .map(dto -> StoragePath.mk(
1192               dto.getOrganizationId(),
1193               dto.getMediaPackageId(),
1194               dto.getVersion(),
1195               dto.getAssetDto().getMediaPackageElementId()
1196           ));
1197 
1198       if (existingAssetOpt.isPresent()) {
1199         final StoragePath existingAsset = existingAssetOpt.get();
1200         logger.debug("Content of asset {} with checksum {} already exists in {}",
1201                 existingAsset.getMediaPackageElementId(), e.getChecksum(), store.getStoreType());
1202         if (!store.copy(existingAsset, storagePath)) {
1203           throw new AssetManagerException(format(
1204                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1205                           + "failed",
1206                   e.getChecksum(),
1207                   existingAsset
1208           ));
1209         }
1210       } else {
1211         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1212         store.put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1213       }
1214       getDatabase().setAssetStorageLocation(VersionImpl.mk(version), mpId, e.getIdentifier(), store.getStoreType());
1215     }
1216   }
1217 
1218   private void copyManifest(Snapshot snap, AssetStore targetStore) throws IOException, NotFoundException {
1219     final String mpId = snap.getMediaPackage().getIdentifier().toString();
1220     final String orgId = snap.getOrganizationId();
1221     final Version version = snap.getVersion();
1222 
1223     AssetStore currentStore = getAssetStore(snap.getStorageId()).get();
1224     Optional<String> manifestOpt = findManifestBaseName(snap, MANIFEST_DEFAULT_NAME, currentStore);
1225     if (manifestOpt.isEmpty()) {
1226       return; // Nothing to do, already moved to long-term storage
1227     }
1228 
1229     // Copy the manifest file
1230     String manifestBaseName = manifestOpt.get();
1231     StoragePath pathToManifest = new StoragePath(orgId, mpId, version, manifestBaseName);
1232 
1233     // Already copied?
1234     if (!targetStore.contains(pathToManifest)) {
1235       Optional<InputStream> inputStreamOpt;
1236       InputStream inputStream = null;
1237       String manifestFileName = null;
1238       try {
1239         inputStreamOpt = currentStore.get(pathToManifest);
1240         if (inputStreamOpt.isEmpty()) { // This should never happen because it has been tested before
1241           throw new NotFoundException(
1242                   String.format("Unexpected error. Manifest %s not found in current asset store", manifestBaseName));
1243         }
1244 
1245         inputStream = inputStreamOpt.get();
1246         manifestFileName = UUID.randomUUID() + ".xml";
1247         URI manifestTmpUri = workspace.putInCollection("archive", manifestFileName, inputStream);
1248         targetStore.put(pathToManifest, Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1249       } finally {
1250         IOUtils.closeQuietly(inputStream);
1251         try {
1252           // Make sure to clean up the temporary file
1253           workspace.deleteFromCollection("archive", manifestFileName);
1254         } catch (NotFoundException e) {
1255           // This is OK, we are deleting it anyway
1256         } catch (IOException e) {
1257           // This usually happens when the collection directory cannot be deleted
1258           // because another process is running at the same time and wrote a file there
1259           // after it was tested but before it was actually deleted. We will consider this ok.
1260           // Does the error message mention the manifest file name?
1261           if (e.getMessage().contains(manifestFileName)) {
1262             logger.warn("The manifest file {} didn't get deleted from the archive collection",
1263                     manifestBaseName, e);
1264           }
1265           // Else the error is related to the file-archive collection, which is fine
1266         }
1267       }
1268     }
1269   }
1270 
1271   Optional<String> findManifestBaseName(Snapshot snap, String manifestName, AssetStore store) {
1272     StoragePath path = new StoragePath(snap.getOrganizationId(), snap.getMediaPackage().getIdentifier().toString(),
1273             snap.getVersion(), manifestName);
1274     // If manifest_.xml, etc not found, return previous name (copied from the EpsiodeServiceImpl logic)
1275     if (!store.contains(path)) {
1276       // If first call, manifest is not found, which probably means it has already been moved
1277       if (MANIFEST_DEFAULT_NAME.equals(manifestName)) {
1278         return Optional.empty(); // No manifest found in current store
1279       } else {
1280         return Optional.of(manifestName.substring(0, manifestName.length() - 1));
1281       }
1282     }
1283     // This is the same logic as when building the manifest name: manifest, manifest_, manifest__, etc
1284     return findManifestBaseName(snap, manifestName + "_", store);
1285   }
1286 
1287   /* -------------------------------------------------------------------------------------------------------------- */
1288 
1289   /**
1290    * Make sure each of the elements has a checksum.
1291    */
1292   void calcChecksumsForMediaPackageElements(PartialMediaPackage pmp) {
1293     final Fx<MediaPackageElement> addChecksum = new Fx<MediaPackageElement>() {
1294       @Override public void apply(MediaPackageElement mpe) {
1295         File file = null;
1296         try {
1297           logger.trace("Calculate checksum for {}", mpe.getURI());
1298           file = workspace.get(mpe.getURI(), true);
1299           mpe.setChecksum(Checksum.create(ChecksumType.DEFAULT_TYPE, file));
1300         } catch (IOException | NotFoundException e) {
1301           throw new AssetManagerException(format(
1302                   "Cannot calculate checksum for media package element %s",
1303                   mpe.getURI()
1304           ), e);
1305         } finally {
1306           if (file != null) {
1307             FileUtils.deleteQuietly(file);
1308           }
1309         }
1310       }
1311     };
1312     pmp.getElements().filter(hasNoChecksum.toFn()).each(addChecksum).run();
1313   }
1314 
1315   /** Mutates mp and its elements, so make sure to work on a copy. */
1316   private SnapshotDto addInternal(String owner, final MediaPackage mp) throws Exception {
1317     final Date now = new Date();
1318     // claim a new version for the media package
1319     final String mpId = mp.getIdentifier().toString();
1320     final VersionImpl version = getDatabase().claimVersion(mpId);
1321     logger.info("Creating new version {} of media package {}", version, mp);
1322     final PartialMediaPackage pmp = assetsOnly(mp);
1323     // make sure they have a checksum
1324     calcChecksumsForMediaPackageElements(pmp);
1325     // download and archive elements
1326     storeAssets(pmp, version);
1327     // store mediapackage in db
1328     final SnapshotDto snapshotDto;
1329     try {
1330       // rewrite URIs for archival
1331       Fn<MediaPackageElement, URI> uriCreator = new Fn<MediaPackageElement, URI>() {
1332         @Override
1333         public URI apply(MediaPackageElement mpe) {
1334           try {
1335             String fileName = getFileName(mpe).getOr("unknown");
1336             return new URI(
1337                     "urn",
1338                     "matterhorn:" + mpId + ":" + version + ":" + mpe.getIdentifier() + ":" + fileName,
1339                     null
1340             );
1341           } catch (URISyntaxException e) {
1342             throw new AssetManagerException(e);
1343           }
1344         }
1345       };
1346 
1347       for (MediaPackageElement mpe : pmp.getElements()) {
1348         mpe.setURI(uriCreator.apply(mpe));
1349       }
1350 
1351       String currentOrgId = securityService.getOrganization().getId();
1352       snapshotDto = getDatabase().saveSnapshot(
1353               currentOrgId, pmp, now, version,
1354               Availability.ONLINE, getLocalAssetStore().getStoreType(), owner
1355       );
1356     } catch (AssetManagerException e) {
1357       logger.error("Could not take snapshot {}", mpId, e);
1358       throw new AssetManagerException(e);
1359     }
1360     // save manifest to element store
1361     // this is done at the end after the media package element ids have been rewritten to neutral URNs
1362     storeManifest(pmp, version);
1363     return snapshotDto;
1364   }
1365 
1366   /**
1367    * Store all elements of <code>pmp</code> under the given version.
1368    */
1369   private void storeAssets(final PartialMediaPackage pmp, final Version version) {
1370     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1371     final String orgId = securityService.getOrganization().getId();
1372     for (final MediaPackageElement e : pmp.getElements()) {
1373       logger.debug("Archiving {} {} {}", e.getFlavor(), e.getMimeType(), e.getURI());
1374       final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1375       // find asset in versions
1376       final Optional<StoragePath> existingAssetOpt = getDatabase()
1377           .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), getLocalAssetStore().getStoreType(), orgId)
1378           .map(dto -> StoragePath.mk(
1379                   dto.getOrganizationId(),
1380                   dto.getMediaPackageId(),
1381                   dto.getVersion(),
1382                   dto.getAssetDto().getMediaPackageElementId()));
1383 
1384       if (existingAssetOpt.isPresent()) {
1385         final StoragePath existingAsset = existingAssetOpt.get();
1386         logger.debug("Content of asset {} with checksum {} has been archived before",
1387                 existingAsset.getMediaPackageElementId(), e.getChecksum());
1388         if (!getLocalAssetStore().copy(existingAsset, storagePath)) {
1389           throw new AssetManagerException(format(
1390                   "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1391                           + "failed",
1392                   e.getChecksum(),
1393                   existingAsset
1394           ));
1395         }
1396       } else {
1397         final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1398         getLocalAssetStore().put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1399       }
1400     }
1401   }
1402 
1403   private void storeManifest(final PartialMediaPackage pmp, final Version version) throws Exception {
1404     final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1405     final String orgId = securityService.getOrganization().getId();
1406     // store the manifest.xml
1407     // TODO make use of checksums
1408     logger.debug("Archiving manifest of media package {} version {}", mpId, version);
1409     // temporarily save the manifest XML into the workspace to
1410     // Fix file not found exception when several snapshots are taken at the same time
1411     final String manifestFileName = format("manifest_%s_%s.xml", pmp.getMediaPackage().getIdentifier(), version);
1412     final URI manifestTmpUri = workspace.putInCollection(
1413             "archive",
1414             manifestFileName,
1415             IOUtils.toInputStream(MediaPackageParser.getAsXml(pmp.getMediaPackage()), "UTF-8"));
1416     try {
1417       getLocalAssetStore().put(
1418               StoragePath.mk(orgId, mpId, version, manifestAssetId(pmp, "manifest")),
1419               Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1420     } finally {
1421       // make sure to clean up the temporary file
1422       workspace.deleteFromCollection("archive", manifestFileName);
1423     }
1424   }
1425 
1426   /**
1427    * Create a unique id for the manifest xml. This is to avoid an id collision
1428    * in the rare case that the media package contains an XML element with the id
1429    * used for the manifest. A UUID could also be used but this is far less
1430    * readable.
1431    *
1432    * @param seedId
1433    *          the id to start with
1434    */
1435   private String manifestAssetId(PartialMediaPackage pmp, String seedId) {
1436     if ($(pmp.getElements()).map(getMediaPackageElementId.toFn()).exists(Booleans.eq(seedId))) {
1437       return manifestAssetId(pmp, seedId + "_");
1438     } else {
1439       return seedId;
1440     }
1441   }
1442 
1443   /* --------------------------------------------------------------------------------------------------------------- */
1444 
1445   /**
1446    * Unify exception handling by wrapping any occurring exception in an
1447    * {@link AssetManagerException}.
1448    */
1449   static <A> A handleException(final P1<A> p) throws AssetManagerException {
1450     try {
1451       return p.get1();
1452     } catch (Exception e) {
1453       logger.error("An error occurred", e);
1454       throw unwrapExceptionUntil(AssetManagerException.class, e).orElse(new AssetManagerException(e));
1455     }
1456   }
1457 
1458   /**
1459    * Walk up the stacktrace to find a cause of type <code>type</code>. Return none if no such
1460    * type can be found.
1461    */
1462   static <A extends Throwable> Optional<A> unwrapExceptionUntil(Class<A> type, Throwable e) {
1463     if (e == null) {
1464       return Optional.empty();
1465     } else if (type.isAssignableFrom(e.getClass())) {
1466       return Optional.of((A) e);
1467     } else {
1468       return unwrapExceptionUntil(type, e.getCause());
1469     }
1470   }
1471 
1472   /**
1473    * Return a partial media package filtering assets. Assets are elements the archive is going to manager, i.e. all
1474    * non-publication elements.
1475    */
1476   static PartialMediaPackage assetsOnly(MediaPackage mp) {
1477     final Pred<MediaPackageElement> isAsset = Pred.mk(isNotPublication.toFn());
1478     return PartialMediaPackage.mk(mp, isAsset);
1479   }
1480 
1481   /**
1482    * Extract the file name from a media package elements URN.
1483    *
1484    * @return the file name or none if it could not be determined
1485    */
1486   public static Optional<String> getFileNameFromUrn(MediaPackageElement mpe) {
1487     Fn<URI, String> toString = new Fn<URI, String>() {
1488       @Override
1489       public String apply(URI uri) {
1490         return uri.toString();
1491       }
1492     };
1493 
1494     Optional<URI> uri = Optional.ofNullable(mpe.getURI());
1495     if (uri.isPresent() && "urn".equals(uri.get().getScheme())) {
1496       String[] tmp = uri.get().toString().split(":");
1497       if (tmp.length < 1) {
1498         return Optional.empty();
1499       }
1500       return Optional.of(tmp[tmp.length - 1]);
1501     }
1502     return Optional.empty();
1503   }
1504 
1505   /**
1506    * Rewrite URIs of all asset elements of a snapshot's media package.
1507    * This method does not mutate anything.
1508    */
1509   public static Snapshot rewriteUris(Snapshot snapshot, Fn<MediaPackageElement, URI> uriCreator) {
1510     final MediaPackage mpCopy = MediaPackageSupport.copy(snapshot.getMediaPackage());
1511     for (final MediaPackageElement mpe : assetsOnly(mpCopy).getElements()) {
1512       mpe.setURI(uriCreator.apply(mpe));
1513     }
1514     return new SnapshotImpl(
1515             snapshot.getVersion(),
1516             snapshot.getOrganizationId(),
1517             snapshot.getArchivalDate(),
1518             snapshot.getAvailability(),
1519             snapshot.getStorageId(),
1520             snapshot.getOwner(),
1521             mpCopy);
1522   }
1523 
1524   public void fireEventHandlers(AssetManagerItem item) {
1525     while (handlers.size() != 2) {
1526       logger.warn("Expecting 2 handlers, but {} are registered.  Waiting 10s then retrying...", handlers.size());
1527       try {
1528         Thread.sleep(10000L);
1529       } catch (InterruptedException e) { /* swallow this, nothing to do */ }
1530     }
1531     for (AssetManagerUpdateHandler handler : handlers) {
1532       handler.execute(item);
1533     }
1534   }
1535 
1536   /**
1537    * Call {@link
1538    * org.opencastproject.assetmanager.impl.query.AbstractADeleteQuery#run(AbstractADeleteQuery.DeleteEpisodeHandler)}
1539    * with a delete handler. Also make sure to propagate the behaviour to subsequent instances.
1540    */
1541   private final class ADeleteQueryWithMessaging extends ADeleteQueryDecorator {
1542     ADeleteQueryWithMessaging(ADeleteQuery delegate) {
1543       super(delegate);
1544     }
1545 
1546     @Override
1547     public long run() {
1548       return RuntimeTypes.convert(delegate).run(AssetManagerImpl.this);
1549     }
1550 
1551     @Override
1552     protected ADeleteQueryDecorator mkDecorator(ADeleteQuery delegate) {
1553       return new ADeleteQueryWithMessaging(delegate);
1554     }
1555   }
1556 
1557   /**
1558    * Get the function to update a commented event in the Elasticsearch index.
1559    *
1560    * @return the function to do the update
1561    */
1562   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(Snapshot snapshot,
1563           String orgId, User user) {
1564     return (Optional<Event> eventOpt) -> {
1565       MediaPackage mp = snapshot.getMediaPackage();
1566       String eventId = mp.getIdentifier().toString();
1567       Event event = eventOpt.orElse(new Event(eventId, orgId));
1568 
1569       event = updateAclInEvent(event, mp);
1570 
1571       event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
1572       if (StringUtils.isBlank(event.getCreator())) {
1573         event.setCreator(securityService.getUser().getName());
1574       }
1575       EventIndexUtils.updateEvent(event, mp);
1576 
1577       for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
1578         try (InputStream in = workspace.read(catalog.getURI())) {
1579           EventIndexUtils.updateEvent(event, DublinCores.read(in));
1580         } catch (IOException | NotFoundException e) {
1581           throw new IllegalStateException(String.format("Unable to load dublin core catalog for event '%s'",
1582                   mp.getIdentifier()), e);
1583         }
1584       }
1585 
1586       // extended metadata
1587       event.resetExtendedMetadata();  // getting rid of old data
1588       for (EventCatalogUIAdapter extendedCatalogUIAdapter : extendedEventCatalogUIAdapters.getOrDefault(orgId,
1589               Collections.emptyList())) {
1590         for (Catalog catalog: mp.getCatalogs(extendedCatalogUIAdapter.getFlavor())) {
1591           try (InputStream in = workspace.read(catalog.getURI())) {
1592             EventIndexUtils.updateEventExtendedMetadata(event, DublinCores.read(in),
1593                     extendedCatalogUIAdapter.getFlavor());
1594           } catch (IOException | NotFoundException e) {
1595             throw new IllegalStateException(String.format("Unable to load extended dublin core catalog '%s' for event "
1596                     + "'%s'", catalog.getFlavor(), mp.getIdentifier()), e);
1597           }
1598         }
1599       }
1600 
1601       // Update series name if not already done
1602       try {
1603         EventIndexUtils.updateSeriesName(event, orgId, user, index);
1604       } catch (SearchIndexException e) {
1605         logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
1606                 e);
1607       }
1608       return Optional.of(event);
1609     };
1610   }
1611 
1612   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunctionOnlyAcl(Snapshot snapshot,
1613       String orgId) {
1614     return (Optional<Event> eventOpt) -> {
1615       MediaPackage mp = snapshot.getMediaPackage();
1616       String eventId = mp.getIdentifier().toString();
1617       Event event = eventOpt.orElse(new Event(eventId, orgId));
1618 
1619       event = updateAclInEvent(event, mp);
1620 
1621       return Optional.of(event);
1622     };
1623   }
1624 
1625   private Event updateAclInEvent(Event event, MediaPackage mp) {
1626     AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
1627     List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
1628 
1629     for (final ManagedAcl managedAcl : AccessInformationUtil.matchAcls(acls, acl)) {
1630       event.setManagedAcl(managedAcl.getName());
1631     }
1632     event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
1633 
1634     return event;
1635   }
1636 }