1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.assetmanager.impl;
22
23 import static com.entwinemedia.fn.Prelude.chuck;
24 import static com.entwinemedia.fn.Stream.$;
25 import static java.lang.String.format;
26 import static org.opencastproject.assetmanager.api.fn.Enrichments.enrich;
27 import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.hasNoChecksum;
28 import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.isNotPublication;
29 import static org.opencastproject.mediapackage.MediaPackageSupport.getFileName;
30 import static org.opencastproject.mediapackage.MediaPackageSupport.getMediaPackageElementId;
31 import static org.opencastproject.security.api.SecurityConstants.EPISODE_ROLE_ID_PREFIX;
32 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
33 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
34 import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
35
36 import org.opencastproject.assetmanager.api.Asset;
37 import org.opencastproject.assetmanager.api.AssetId;
38 import org.opencastproject.assetmanager.api.AssetManager;
39 import org.opencastproject.assetmanager.api.AssetManagerException;
40 import org.opencastproject.assetmanager.api.Availability;
41 import org.opencastproject.assetmanager.api.Property;
42 import org.opencastproject.assetmanager.api.PropertyId;
43 import org.opencastproject.assetmanager.api.Snapshot;
44 import org.opencastproject.assetmanager.api.Value;
45 import org.opencastproject.assetmanager.api.Version;
46 import org.opencastproject.assetmanager.api.fn.Enrichments;
47 import org.opencastproject.assetmanager.api.query.ADeleteQuery;
48 import org.opencastproject.assetmanager.api.query.AQueryBuilder;
49 import org.opencastproject.assetmanager.api.query.ARecord;
50 import org.opencastproject.assetmanager.api.query.AResult;
51 import org.opencastproject.assetmanager.api.query.ASelectQuery;
52 import org.opencastproject.assetmanager.api.query.Predicate;
53 import org.opencastproject.assetmanager.api.query.RichAResult;
54 import org.opencastproject.assetmanager.api.query.Target;
55 import org.opencastproject.assetmanager.api.storage.AssetStore;
56 import org.opencastproject.assetmanager.api.storage.DeletionSelector;
57 import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
58 import org.opencastproject.assetmanager.api.storage.Source;
59 import org.opencastproject.assetmanager.api.storage.StoragePath;
60 import org.opencastproject.assetmanager.impl.persistence.Database;
61 import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
62 import org.opencastproject.assetmanager.impl.query.AQueryBuilderImpl;
63 import org.opencastproject.assetmanager.impl.query.AbstractADeleteQuery;
64 import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
65 import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
66 import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
67 import org.opencastproject.db.DBSessionFactory;
68 import org.opencastproject.elasticsearch.api.SearchIndexException;
69 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
70 import org.opencastproject.elasticsearch.index.objects.event.Event;
71 import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
72 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
73 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
74 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
75 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
76 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService.DataType;
77 import org.opencastproject.mediapackage.Catalog;
78 import org.opencastproject.mediapackage.MediaPackage;
79 import org.opencastproject.mediapackage.MediaPackageElement;
80 import org.opencastproject.mediapackage.MediaPackageElements;
81 import org.opencastproject.mediapackage.MediaPackageParser;
82 import org.opencastproject.mediapackage.MediaPackageSupport;
83 import org.opencastproject.message.broker.api.assetmanager.AssetManagerItem;
84 import org.opencastproject.message.broker.api.update.AssetManagerUpdateHandler;
85 import org.opencastproject.metadata.dublincore.DublinCores;
86 import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
87 import org.opencastproject.security.api.AccessControlEntry;
88 import org.opencastproject.security.api.AccessControlList;
89 import org.opencastproject.security.api.AccessControlParser;
90 import org.opencastproject.security.api.AuthorizationService;
91 import org.opencastproject.security.api.DefaultOrganization;
92 import org.opencastproject.security.api.Organization;
93 import org.opencastproject.security.api.OrganizationDirectoryService;
94 import org.opencastproject.security.api.Role;
95 import org.opencastproject.security.api.SecurityService;
96 import org.opencastproject.security.api.UnauthorizedException;
97 import org.opencastproject.security.api.User;
98 import org.opencastproject.security.util.SecurityUtil;
99 import org.opencastproject.util.Checksum;
100 import org.opencastproject.util.ChecksumType;
101 import org.opencastproject.util.MimeTypes;
102 import org.opencastproject.util.NotFoundException;
103 import org.opencastproject.util.RequireUtil;
104 import org.opencastproject.util.data.functions.Functions;
105 import org.opencastproject.workspace.api.Workspace;
106
107 import com.entwinemedia.fn.Fn;
108 import com.entwinemedia.fn.Fx;
109 import com.entwinemedia.fn.P1;
110 import com.entwinemedia.fn.P1Lazy;
111 import com.entwinemedia.fn.Pred;
112 import com.entwinemedia.fn.Prelude;
113 import com.entwinemedia.fn.fns.Booleans;
114 import com.google.common.collect.Sets;
115
116 import org.apache.commons.io.FileUtils;
117 import org.apache.commons.io.IOUtils;
118 import org.apache.commons.lang3.BooleanUtils;
119 import org.apache.commons.lang3.StringUtils;
120 import org.osgi.service.component.ComponentContext;
121 import org.osgi.service.component.annotations.Activate;
122 import org.osgi.service.component.annotations.Component;
123 import org.osgi.service.component.annotations.Reference;
124 import org.osgi.service.component.annotations.ReferenceCardinality;
125 import org.osgi.service.component.annotations.ReferencePolicy;
126 import org.slf4j.Logger;
127 import org.slf4j.LoggerFactory;
128
129 import java.io.File;
130 import java.io.IOException;
131 import java.io.InputStream;
132 import java.net.URI;
133 import java.net.URISyntaxException;
134 import java.security.NoSuchAlgorithmException;
135 import java.util.ArrayList;
136 import java.util.Arrays;
137 import java.util.Collections;
138 import java.util.Date;
139 import java.util.HashMap;
140 import java.util.LinkedHashMap;
141 import java.util.List;
142 import java.util.Map;
143 import java.util.Objects;
144 import java.util.Optional;
145 import java.util.Set;
146 import java.util.UUID;
147 import java.util.concurrent.CopyOnWriteArrayList;
148 import java.util.function.Function;
149 import java.util.stream.Collectors;
150
151 import javax.persistence.EntityManagerFactory;
152
153
154
155
156 @Component(
157 property = {
158 "service.description=Opencast Asset Manager"
159 },
160 immediate = true,
161 service = { AssetManager.class, IndexProducer.class }
162 )
163 public class AssetManagerImpl extends AbstractIndexProducer implements AssetManager,
164 AbstractADeleteQuery.DeleteEpisodeHandler {
165
166 private static final Logger logger = LoggerFactory.getLogger(AssetManagerImpl.class);
167
168 private static final int PAGE_SIZE = 1000;
169
170 enum AdminRole {
171 GLOBAL, ORGANIZATION, NONE
172 }
173
174 public static final String WRITE_ACTION = "write";
175 public static final String READ_ACTION = "read";
176 public static final String SECURITY_NAMESPACE = "org.opencastproject.assetmanager.security";
177
178 private static final String MANIFEST_DEFAULT_NAME = "manifest";
179
180 private CopyOnWriteArrayList<AssetManagerUpdateHandler> handlers = new CopyOnWriteArrayList<>();
181
182 private SecurityService securityService;
183 private AuthorizationService authorizationService;
184 private OrganizationDirectoryService orgDir;
185 private Workspace workspace;
186 private AssetStore assetStore;
187 private HttpAssetProvider httpAssetProvider;
188 private String systemUserName;
189 private Database db;
190 private DBSessionFactory dbSessionFactory;
191 private EntityManagerFactory emf;
192 private AclServiceFactory aclServiceFactory;
193 private ElasticsearchIndex index;
194 private Map<String, List<EventCatalogUIAdapter>> extendedEventCatalogUIAdapters = new HashMap<>();
195
196
197 private boolean includeAPIRoles;
198 private boolean includeCARoles;
199 private boolean includeUIRoles;
200
201
202 public static final Set<MediaPackageElement.Type> MOVABLE_TYPES = Sets.newHashSet(
203 MediaPackageElement.Type.Attachment,
204 MediaPackageElement.Type.Catalog,
205 MediaPackageElement.Type.Track
206 );
207
208 private final HashMap<String, RemoteAssetStore> remoteStores = new LinkedHashMap<>();
209
210
211
212
213 @Activate
214 public synchronized void activate(ComponentContext cc) {
215 logger.info("Activating AssetManager.");
216 db = new Database(dbSessionFactory.createSession(emf));
217 systemUserName = SecurityUtil.getSystemUserName(cc);
218
219 includeAPIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeAPIRoles"), null));
220 includeCARoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeCARoles"), null));
221 includeUIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeUIRoles"), null));
222 }
223
224
225
226
227
228 @Reference(target = "(osgi.unit.name=org.opencastproject.assetmanager.impl)")
229 public void setEntityManagerFactory(EntityManagerFactory emf) {
230 this.emf = emf;
231 }
232
233 @Reference
234 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
235 this.dbSessionFactory = dbSessionFactory;
236 }
237
238 @Reference
239 public void setSecurityService(SecurityService securityService) {
240 this.securityService = securityService;
241 }
242
243 @Reference
244 public void setAuthorizationService(AuthorizationService authorizationService) {
245 this.authorizationService = authorizationService;
246 }
247
248 @Reference
249 public void setOrgDir(OrganizationDirectoryService orgDir) {
250 this.orgDir = orgDir;
251 }
252
253 @Reference
254 public void setWorkspace(Workspace workspace) {
255 this.workspace = workspace;
256 }
257
258 @Reference
259 public void setAssetStore(AssetStore assetStore) {
260 this.assetStore = assetStore;
261 }
262
263 @Reference(
264 cardinality = ReferenceCardinality.MULTIPLE,
265 policy = ReferencePolicy.DYNAMIC,
266 unbind = "removeEventHandler"
267 )
268 public void addEventHandler(AssetManagerUpdateHandler handler) {
269 this.handlers.add(handler);
270 }
271
272 public void removeEventHandler(AssetManagerUpdateHandler handler) {
273 this.handlers.remove(handler);
274 }
275
276 @Reference(
277 cardinality = ReferenceCardinality.MULTIPLE,
278 policy = ReferencePolicy.DYNAMIC,
279 unbind = "removeRemoteAssetStore"
280 )
281 public synchronized void addRemoteAssetStore(RemoteAssetStore assetStore) {
282 remoteStores.put(assetStore.getStoreType(), assetStore);
283 }
284
285 public void removeRemoteAssetStore(RemoteAssetStore store) {
286 remoteStores.remove(store.getStoreType());
287 }
288
289 @Reference
290 public void setHttpAssetProvider(HttpAssetProvider httpAssetProvider) {
291 this.httpAssetProvider = httpAssetProvider;
292 }
293
294 @Reference
295 public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
296 this.aclServiceFactory = aclServiceFactory;
297 }
298
299 @Reference
300 public void setIndex(ElasticsearchIndex index) {
301 this.index = index;
302 }
303
304 @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC,
305 target = "(common-metadata=false)")
306 public synchronized void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
307 List<EventCatalogUIAdapter> list = extendedEventCatalogUIAdapters.computeIfAbsent(
308 catalogUIAdapter.getOrganization(), k -> new ArrayList());
309 list.add(catalogUIAdapter);
310 }
311
312 public synchronized void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
313 if (extendedEventCatalogUIAdapters.containsKey(catalogUIAdapter.getOrganization())) {
314 extendedEventCatalogUIAdapters.get(catalogUIAdapter.getOrganization()).remove(catalogUIAdapter);
315 }
316 }
317
318
319
320
321
322 @Override
323 public Optional<MediaPackage> getMediaPackage(String mediaPackageId) {
324 final AQueryBuilder q = createQuery();
325 final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest()))
326 .run();
327
328 if (r.getSize() == 0) {
329 return Optional.empty();
330 }
331 return Optional.of(r.getRecords().stream().findFirst().get().getSnapshot().get().getMediaPackage());
332 }
333
334 @Override
335 public Optional<Asset> getAsset(Version version, String mpId, String mpElementId) {
336 if (isAuthorized(mpId, READ_ACTION)) {
337
338 var asset = getDatabase().getAsset(RuntimeTypes.convert(version), mpId, mpElementId);
339 if (asset.isPresent()) {
340 var storageId = getSnapshotStorageLocation(version, mpId);
341 if (storageId.isPresent()) {
342 var store = getAssetStore(storageId.get());
343 if (store.isPresent()) {
344 var assetStream = store.get().get(StoragePath.mk(
345 asset.get().getOrganizationId(),
346 mpId,
347 version,
348 mpElementId
349 ));
350 if (assetStream.isPresent()) {
351
352 Checksum checksum = null;
353 try {
354 checksum = Checksum.fromString(asset.get().getAssetDto().getChecksum());
355 } catch (NoSuchAlgorithmException e) {
356 logger.warn("Invalid checksum for asset {} of media package {}", mpElementId, mpId, e);
357 }
358
359 final Asset a = new AssetImpl(
360 AssetId.mk(version, mpId, mpElementId),
361 assetStream.get(),
362 asset.get().getAssetDto().getMimeType(),
363 asset.get().getAssetDto().getSize(),
364 asset.get().getStorageId(),
365 asset.get().getAvailability(),
366 checksum);
367 return Optional.of(a);
368 }
369 }
370 }
371 }
372 return Optional.empty();
373 }
374 return chuck(new UnauthorizedException(
375 format("Not allowed to read assets of snapshot %s, version=%s", mpId, version)
376 ));
377 }
378
379 @Override
380 public Optional<AssetStore> getAssetStore(String storeId) {
381 if (assetStore.getStoreType().equals(storeId)) {
382 return Optional.of(assetStore);
383 } else {
384 if (remoteStores.containsKey(storeId)) {
385 return Optional.of(remoteStores.get(storeId));
386 } else {
387 return Optional.empty();
388 }
389 }
390 }
391
392 @Override
393 public AssetStore getLocalAssetStore() {
394 return assetStore;
395 }
396
397 @Override
398 public List<AssetStore> getRemoteAssetStores() {
399 return new ArrayList<>(remoteStores.values());
400 }
401
402
403
404 @Override
405 public boolean snapshotExists(final String mediaPackageId) {
406 return getDatabase().snapshotExists(mediaPackageId);
407 }
408
409 @Override
410 public boolean snapshotExists(final String mediaPackageId, final String organization) {
411 return getDatabase().snapshotExists(mediaPackageId, organization);
412 }
413
414 @Override
415 public Snapshot takeSnapshot(MediaPackage mp) {
416 return takeSnapshot(null, mp);
417 }
418
419 @Override
420 public Snapshot takeSnapshot(String owner, MediaPackage mp) {
421
422 final String mediaPackageId = mp.getIdentifier().toString();
423 final boolean firstSnapshot = !snapshotExists(mediaPackageId);
424
425
426
427
428 if (firstSnapshot) {
429
430 deleteProperties(mediaPackageId);
431 }
432 if (firstSnapshot || isAuthorized(mediaPackageId, WRITE_ACTION)) {
433 final Snapshot snapshot;
434 if (owner == null) {
435 snapshot = takeSnapshotInternal(mp);
436 } else {
437 snapshot = takeSnapshotInternal(owner, mp);
438 }
439
440 final AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
441
442
443 deleteProperties(mediaPackageId, SECURITY_NAMESPACE);
444
445 for (final AccessControlEntry ace : acl.getEntries()) {
446 getDatabase().saveProperty(Property.mk(PropertyId.mk(mediaPackageId, SECURITY_NAMESPACE,
447 mkPropertyName(ace.getRole(), ace.getAction())), Value.mk(ace.isAllow())));
448 }
449
450 updateEventInIndex(snapshot);
451
452 logger.info("Trigger update handlers for snapshot {}, version {}",
453 snapshot.getMediaPackage().getIdentifier(), snapshot.getVersion());
454 fireEventHandlers(mkTakeSnapshotMessage(snapshot));
455
456 return snapshot;
457 }
458 return chuck(new UnauthorizedException("Not allowed to take snapshot of media package " + mediaPackageId));
459 }
460
461 private Snapshot takeSnapshotInternal(MediaPackage mediaPackage) {
462 final String mediaPackageId = mediaPackage.getIdentifier().toString();
463 AQueryBuilder queryBuilder = createQuery();
464 AResult result = queryBuilder.select(queryBuilder.snapshot())
465 .where(queryBuilder.mediaPackageId(mediaPackageId).and(queryBuilder.version().isLatest())).run();
466 Optional<ARecord> record = result.getRecords().stream().findFirst();
467 if (record.isPresent()) {
468 Optional<Snapshot> snapshot = Optional.of(record.get().getSnapshot().get());
469 if (snapshot.isPresent()) {
470 return takeSnapshotInternal(snapshot.get().getOwner(), mediaPackage);
471 }
472 }
473 return takeSnapshotInternal(DEFAULT_OWNER, mediaPackage);
474 }
475
476 private Snapshot takeSnapshotInternal(final String owner, final MediaPackage mp) {
477 return handleException(new P1Lazy<Snapshot>() {
478 @Override public Snapshot get1() {
479 try {
480 final Snapshot archived = addInternal(owner, MediaPackageSupport.copy(mp)).toSnapshot();
481 return getHttpAssetProvider().prepareForDelivery(archived);
482 } catch (Exception e) {
483 return Prelude.chuck(e);
484 }
485 }
486 });
487 }
488
489
490
491
492
493
494 private AssetManagerItem.TakeSnapshot mkTakeSnapshotMessage(Snapshot snapshot) {
495 final MediaPackage mp = snapshot.getMediaPackage();
496
497 long version;
498 try {
499 version = Long.parseLong(snapshot.getVersion().toString());
500 } catch (NumberFormatException e) {
501
502
503
504
505 throw new RuntimeException("The current implementation of the index requires versions being of type 'long'.");
506 }
507
508 return AssetManagerItem.add(workspace, mp, authorizationService.getActiveAcl(mp).getA(),
509 version, snapshot.getArchivalDate());
510 }
511
512 @Override
513 public void triggerIndexUpdate(String mediaPackageId) throws NotFoundException, UnauthorizedException {
514
515 if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
516 throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
517 }
518 final AQueryBuilder q = createQuery();
519 final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest())).run();
520
521 if (r.getSize() == 0) {
522 throw new NotFoundException("No event with ID `" + mediaPackageId + "`");
523 }
524
525
526 var snapshot = r.getRecords().stream().findFirst().get().getSnapshot().get();
527 updateEventInIndex(snapshot);
528 }
529
530
531
532
533
534
535
536 private void updateEventInIndex(Snapshot snapshot) {
537 final String eventId = snapshot.getMediaPackage().getIdentifier().toString();
538 final String orgId = securityService.getOrganization().getId();
539 final User user = securityService.getUser();
540
541 logger.debug("Updating event {} in the {} index.", eventId, index.getIndexName());
542 Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(snapshot, orgId, user);
543
544 try {
545 index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
546 logger.debug("Event {} updated in the {} index.", eventId, index.getIndexName());
547 } catch (SearchIndexException e) {
548 logger.error("Error updating the event {} in the {} index.", eventId, index.getIndexName(), e);
549 }
550 }
551
552
553
554
555
556
557
558 private void removeArchivedVersionFromIndex(String eventId) {
559 final String orgId = securityService.getOrganization().getId();
560 final User user = securityService.getUser();
561 logger.debug("Received AssetManager delete episode message {}", eventId);
562
563 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
564 if (eventOpt.isEmpty()) {
565 logger.warn("Event {} not found for deletion", eventId);
566 return Optional.empty();
567 }
568 Event event = eventOpt.get();
569 event.setArchiveVersion(null);
570 return Optional.of(event);
571 };
572
573 try {
574 index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
575 logger.debug("Event {} removed from the {} index", eventId, index.getIndexName());
576 } catch (SearchIndexException e) {
577 logger.error("Error deleting the event {} from the {} index.", eventId, index.getIndexName(), e);
578 }
579 }
580
581 @Override
582 public RichAResult getSnapshotsById(final String mpId) {
583 RequireUtil.requireNotBlank(mpId, "mpId");
584 AQueryBuilder q = createQuery();
585 ASelectQuery query = baseQuery(q, mpId);
586 return Enrichments.enrich(query.run());
587 }
588
589 @Override
590 public RichAResult getSnapshotsByIdOrderedByVersion(String mpId, boolean asc) {
591 RequireUtil.requireNotBlank(mpId, "mpId");
592 AQueryBuilder q = createQuery();
593 ASelectQuery query = baseQuery(q, mpId);
594 if (asc) {
595 query = query.orderBy(q.version().asc());
596 } else {
597 query = query.orderBy(q.version().desc());
598 }
599 return Enrichments.enrich(query.run());
600 }
601
602 @Override
603 public RichAResult getSnapshotsByIdAndVersion(final String mpId, final Version version) {
604 RequireUtil.requireNotBlank(mpId, "mpId");
605 RequireUtil.notNull(version, "version");
606 AQueryBuilder q = createQuery();
607 ASelectQuery query = baseQuery(q, version, mpId);
608 return Enrichments.enrich(query.run());
609 }
610
611 @Override
612 public RichAResult getSnapshotsByDate(final Date start, final Date end) {
613 RequireUtil.notNull(start, "start");
614 RequireUtil.notNull(end, "end");
615 AQueryBuilder q = createQuery();
616 ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
617 return Enrichments.enrich(query.run());
618 }
619
620 @Override
621 public RichAResult getSnapshotsByDateOrderedById(Date start, Date end) {
622 RequireUtil.notNull(start, "start");
623 RequireUtil.notNull(end, "end");
624 AQueryBuilder q = createQuery();
625 ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
626 return Enrichments.enrich(query.orderBy(q.mediapackageId().asc()).run());
627 }
628
629 @Override
630 public RichAResult getSnapshotsByIdAndDate(final String mpId, final Date start, final Date end) {
631 RequireUtil.requireNotBlank(mpId, "mpId");
632 RequireUtil.notNull(start, "start");
633 RequireUtil.notNull(end, "end");
634 AQueryBuilder q = createQuery();
635 ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
636 return Enrichments.enrich(query.run());
637 }
638
639 @Override
640 public RichAResult getSnapshotsByIdAndDateOrderedByVersion(String mpId, Date start, Date end, boolean asc) {
641 RequireUtil.requireNotBlank(mpId, "mpId");
642 RequireUtil.notNull(start, "start");
643 RequireUtil.notNull(end, "end");
644 AQueryBuilder q = createQuery();
645 ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
646 if (asc) {
647 query = query.orderBy(q.version().asc());
648 } else {
649 query = query.orderBy(q.version().desc());
650 }
651 return Enrichments.enrich(query.run());
652 }
653
654 @Override
655 public void moveSnapshotsById(final String mpId, final String targetStore) throws NotFoundException {
656 RichAResult results = getSnapshotsById(mpId);
657
658 if (results.getRecords().isEmpty()) {
659 throw new NotFoundException("Mediapackage " + mpId + " not found!");
660 }
661
662 processOperations(results, targetStore);
663 }
664
665 @Override
666 public void moveSnapshotsByIdAndVersion(final String mpId, final Version version, final String targetStore)
667 throws NotFoundException {
668 RichAResult results = getSnapshotsByIdAndVersion(mpId, version);
669
670 if (results.getRecords().isEmpty()) {
671 throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
672 }
673
674 processOperations(results, targetStore);
675 }
676
677 @Override
678 public void moveSnapshotsByDate(final Date start, final Date end, final String targetStore)
679 throws NotFoundException {
680
681
682 AQueryBuilder q = createQuery();
683 ASelectQuery query = baseQuery(q)
684 .where(q.storage(targetStore).not())
685 .where(q.archived().ge(start))
686 .where(q.archived().le(end));
687 RichAResult results = Enrichments.enrich(query.run());
688
689 if (results.getRecords().isEmpty()) {
690 throw new NotFoundException("No media packages found between " + start + " and " + end);
691 }
692
693 processOperations(results, targetStore);
694 }
695
696 @Override
697 public void moveSnapshotsByIdAndDate(final String mpId, final Date start, final Date end, final String targetStore)
698 throws NotFoundException {
699 RichAResult results = getSnapshotsByIdAndDate(mpId, start, end);
700
701 if (results.getRecords().isEmpty()) {
702 throw new NotFoundException("No media package with id " + mpId + " found between " + start + " and " + end);
703 }
704
705 processOperations(results, targetStore);
706 }
707
708 @Override
709 public void moveSnapshotToStore(final Version version, final String mpId, final String storeId)
710 throws NotFoundException {
711
712
713 AQueryBuilder q = createQuery();
714 RichAResult results = Enrichments.enrich(baseQuery(q, version, mpId).run());
715
716 if (results.getRecords().isEmpty()) {
717 throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
718 }
719 processOperations(results, storeId);
720 }
721
722
723 private void processOperations(final RichAResult results, final String targetStoreId) {
724 results.getRecords().forEach(record -> {
725 Snapshot s = record.getSnapshot().get();
726 Optional<String> currentStoreId = getSnapshotStorageLocation(s);
727
728 if (currentStoreId.isEmpty()) {
729 logger.warn("IsNone store ID");
730 return;
731 }
732
733
734 if (currentStoreId.get().equals(targetStoreId)) {
735
736 return;
737 }
738
739 AssetStore currentStore;
740 AssetStore targetStore;
741
742 Optional<AssetStore> optCurrentStore = getAssetStore(currentStoreId.get());
743 Optional<AssetStore> optTargetStore = getAssetStore(targetStoreId);
744
745 if (!optCurrentStore.isEmpty()) {
746 currentStore = optCurrentStore.get();
747 } else {
748 logger.error("Unknown current store: " + currentStoreId.get());
749 return;
750 }
751 if (!optTargetStore.isEmpty()) {
752 targetStore = optTargetStore.get();
753 } else {
754 logger.error("Unknown target store: " + targetStoreId);
755 return;
756 }
757
758
759
760 String localAssetStoreType = getLocalAssetStore().getStoreType();
761 if (localAssetStoreType.equals(currentStoreId.get()) || localAssetStoreType.equals(targetStoreId)) {
762 logger.debug("Moving {} from {} to {}", s, currentStoreId, targetStoreId);
763
764 try {
765 copyAssetsToStore(s, targetStore);
766 copyManifest(s, targetStore);
767 } catch (Exception e) {
768 Functions.chuck(e);
769 }
770 getDatabase().setStorageLocation(s, targetStoreId);
771 currentStore.delete(DeletionSelector.delete(s.getOrganizationId(),
772 s.getMediaPackage().getIdentifier().toString(), s.getVersion()
773 ));
774 } else {
775
776 String intermediateStore = getLocalAssetStore().getStoreType();
777 logger.debug("Moving {} from {} to {}, then to {}",
778 s, currentStoreId, intermediateStore, targetStoreId);
779 Version version = s.getVersion();
780 String mpId = s.getMediaPackage().getIdentifier().toString();
781 try {
782 moveSnapshotToStore(version, mpId, intermediateStore);
783 moveSnapshotToStore(version, mpId, targetStoreId);
784 } catch (NotFoundException e) {
785 Functions.chuck(e);
786 }
787 }
788 });
789 }
790
791
792 public Optional<String> getSnapshotStorageLocation(final Version version, final String mpId) {
793 RichAResult result = getSnapshotsByIdAndVersion(mpId, version);
794
795 for (Snapshot snapshot : result.getSnapshots()) {
796 return Optional.of(snapshot.getStorageId());
797 }
798
799 logger.error("Mediapackage " + mpId + "@" + version + " not found!");
800 return Optional.empty();
801 }
802
803 public Optional<String> getSnapshotStorageLocation(final Snapshot snap) {
804 return getSnapshotStorageLocation(snap.getVersion(), snap.getMediaPackage().getIdentifier().toString());
805 }
806
807
808
809 @Override
810 public boolean setProperty(Property property) {
811 final String mpId = property.getId().getMediaPackageId();
812 if (isAuthorized(mpId, WRITE_ACTION)) {
813 return getDatabase().saveProperty(property);
814 }
815 return chuck(new UnauthorizedException("Not allowed to set property on episode " + mpId));
816 }
817
818 @Override
819 public List<Property> selectProperties(final String mediaPackageId, String namespace) {
820 if (isAuthorized(mediaPackageId, READ_ACTION)) {
821 return getDatabase().selectProperties(mediaPackageId, namespace);
822 }
823 return chuck(new UnauthorizedException(format("Not allowed to read properties of event %s", mediaPackageId)));
824 }
825
826 @Override
827 public int deleteProperties(final String mediaPackageId) {
828 return getDatabase().deleteProperties(mediaPackageId);
829 }
830
831 @Override
832 public int deleteProperties(final String mediaPackageId, final String namespace) {
833 return getDatabase().deleteProperties(mediaPackageId, namespace);
834 }
835
836
837
838 @Override
839 public AQueryBuilder createQuery() {
840 return new AQueryBuilderDecorator(createQueryWithoutSecurityCheck()) {
841 @Override public ASelectQuery select(Target... target) {
842 switch (isAdmin()) {
843 case GLOBAL:
844 return super.select(target);
845 case ORGANIZATION:
846 return super.select(target).where(restrictToUsersOrganization());
847 default:
848 return super.select(target).where(mkAuthPredicate(READ_ACTION));
849 }
850 }
851
852 @Override public ADeleteQuery delete(String owner, Target target) {
853 switch (isAdmin()) {
854 case GLOBAL:
855 return super.delete(owner, target);
856 case ORGANIZATION:
857 return super.delete(owner, target).where(restrictToUsersOrganization());
858 default:
859 return super.delete(owner, target).where(mkAuthPredicate(WRITE_ACTION));
860 }
861 }
862 };
863 }
864
865 private AQueryBuilder createQueryWithoutSecurityCheck() {
866 return new AQueryBuilderDecorator(new AQueryBuilderImpl(this)) {
867 @Override
868 public ADeleteQuery delete(String owner, Target target) {
869 return new ADeleteQueryWithMessaging(super.delete(owner, target));
870 }
871 };
872 }
873
874 @Override
875 public Optional<Version> toVersion(String version) {
876 try {
877 return Optional.of(VersionImpl.mk(Long.parseLong(version)));
878 } catch (NumberFormatException e) {
879 return Optional.empty();
880 }
881 }
882
883 @Override
884 public long countEvents(final String organization) {
885 return getDatabase().countEvents(organization);
886 }
887
888 @Override
889 public void handleDeletedEpisode(String mpId) {
890 logger.info("Firing event handlers for deleting event {}", mpId);
891 fireEventHandlers(AssetManagerItem.deleteEpisode(mpId, new Date()));
892
893 removeArchivedVersionFromIndex(mpId);
894 }
895
896
897
898
899
900 @Override
901 public IndexRebuildService.Service getService() {
902 return IndexRebuildService.Service.AssetManager;
903 }
904
905 @Override
906 public DataType[] getSupportedDataTypes() {
907 return new DataType[]{ DataType.ALL, DataType.ACL };
908 }
909
910 @Override
911 public void repopulate(DataType dataType) throws IndexRebuildException {
912 final Organization originalOrg = securityService.getOrganization();
913 final User originalUser = (originalOrg != null ? securityService.getUser() : null);
914 try {
915 final Organization defaultOrg = new DefaultOrganization();
916 final User defaultSystemUser = SecurityUtil.createSystemUser(systemUserName, defaultOrg);
917 securityService.setOrganization(defaultOrg);
918 securityService.setUser(defaultSystemUser);
919
920 int offset = 0;
921 int total = (int) countEvents(null);
922 final AQueryBuilder q = createQuery();
923 RichAResult r;
924 int current = 0;
925 logIndexRebuildBegin(logger, total, "snapshot(s)");
926 var updatedEventRange = new ArrayList<Event>();
927 do {
928 r = enrich(q.select(q.snapshot()).where(q.version().isLatest()).orderBy(q.mediapackageId().desc())
929 .page(offset, PAGE_SIZE).run());
930 offset += PAGE_SIZE;
931 int n = 20;
932
933 final Map<String, List<Snapshot>> byOrg = r.getSnapshots().stream()
934 .collect(Collectors.groupingBy(Snapshot::getOrganizationId));
935 for (String orgId : byOrg.keySet()) {
936 final Organization snapshotOrg;
937 try {
938 snapshotOrg = orgDir.getOrganization(orgId);
939 User snapshotSystemUser = SecurityUtil.createSystemUser(systemUserName, snapshotOrg);
940 securityService.setOrganization(snapshotOrg);
941 securityService.setUser(snapshotSystemUser);
942 for (Snapshot snapshot : byOrg.get(orgId)) {
943 try {
944 current++;
945
946 var updatedEventData = index.getEvent(snapshot.getMediaPackage().getIdentifier().toString(), orgId,
947 snapshotSystemUser);
948 if (dataType == DataType.ALL) {
949
950 updatedEventData = getEventUpdateFunction(snapshot, orgId, snapshotSystemUser)
951 .apply(updatedEventData);
952 } else if (dataType == DataType.ACL) {
953
954 updatedEventData = getEventUpdateFunctionOnlyAcl(snapshot, orgId)
955 .apply(updatedEventData);
956 } else {
957 throw new IndexRebuildException(dataType + " is not a supported data type. "
958 + "Accepted values are " + Arrays.toString(getSupportedDataTypes()) + ".");
959 }
960 updatedEventRange.add(updatedEventData.get());
961
962 if (updatedEventRange.size() >= n || current >= total) {
963 index.bulkEventUpdate(updatedEventRange);
964 logIndexRebuildProgress(logger, total, current, n);
965 updatedEventRange.clear();
966 }
967 } catch (Throwable t) {
968 logSkippingElement(logger, "event", snapshot.getMediaPackage().getIdentifier().toString(),
969 snapshotOrg, t);
970 }
971 }
972 } catch (Throwable t) {
973 logIndexRebuildError(logger, t, originalOrg);
974 throw new IndexRebuildException(getService(), originalOrg, t);
975 } finally {
976 securityService.setOrganization(defaultOrg);
977 securityService.setUser(defaultSystemUser);
978 }
979 }
980 } while (offset < total);
981 } finally {
982 securityService.setOrganization(originalOrg);
983 securityService.setUser(originalUser);
984 }
985 }
986
987
988
989
990 public void setAvailability(Version version, String mpId, Availability availability) {
991 if (isAuthorized(mpId, WRITE_ACTION)) {
992 getDatabase().setAvailability(RuntimeTypes.convert(version), mpId, availability);
993 } else {
994 chuck(new UnauthorizedException("Not allowed to set availability of episode " + mpId));
995 }
996 }
997
998 public void setDatabase(Database database) {
999 this.db = database;
1000 }
1001
1002 public Database getDatabase() {
1003 return db;
1004 }
1005
1006 public HttpAssetProvider getHttpAssetProvider() {
1007 return httpAssetProvider;
1008 }
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021 private Predicate mkAuthPredicate(final String action) {
1022 final AQueryBuilder q = createQueryWithoutSecurityCheck();
1023 return securityService.getUser().getRoles().stream()
1024 .filter(roleFilter)
1025 .map((role) -> {
1026 if (role.getName().startsWith(EPISODE_ROLE_ID_PREFIX)) {
1027 return q.mediapackageId().eq(StringUtils.substringBetween(
1028 role.getName(), EPISODE_ROLE_ID_PREFIX + "_", "_"));
1029 } else {
1030 return q.property(Value.BOOLEAN, SECURITY_NAMESPACE, mkPropertyName(role.getName(), action)).eq(true);
1031 }
1032 })
1033 .reduce(Predicate::or)
1034 .orElseGet(() -> q.always().not())
1035 .and(restrictToUsersOrganization());
1036 }
1037
1038
1039 private Predicate restrictToUsersOrganization() {
1040 return createQueryWithoutSecurityCheck().organizationId().eq(securityService.getUser().getOrganization().getId());
1041 }
1042
1043
1044 private boolean isAuthorized(final String mediaPackageId, final String action) {
1045 switch (isAdmin()) {
1046 case GLOBAL:
1047
1048 logger.debug("Access granted since user is global admin");
1049 return true;
1050 case ORGANIZATION:
1051
1052 logger.debug("User is organization admin. Checking organization. Checking organization ID of asset.");
1053 return snapshotExists(mediaPackageId, securityService.getOrganization().getId());
1054 default:
1055
1056 logger.debug("Non admin user. Checking organization.");
1057 final String org = securityService.getOrganization().getId();
1058 if (!snapshotExists(mediaPackageId, org)) {
1059 return false;
1060 }
1061
1062 User user = securityService.getUser();
1063 if (user.hasRole(getEpisodeRoleId(mediaPackageId, action))) {
1064 return true;
1065 }
1066
1067 logger.debug("Non admin user. Checking ACL rules.");
1068 final List<String> roles = user.getRoles().parallelStream()
1069 .filter(roleFilter)
1070 .map((role) -> mkPropertyName(role.getName(), action))
1071 .collect(Collectors.toList());
1072 return getDatabase().selectProperties(mediaPackageId, SECURITY_NAMESPACE).parallelStream()
1073 .map(p -> p.getId().getName())
1074 .filter(p -> p.endsWith(action))
1075 .anyMatch(p -> roles.stream().anyMatch(r -> r.equals(p)));
1076 }
1077 }
1078
1079 private AdminRole isAdmin() {
1080 final User user = securityService.getUser();
1081 if (user.hasRole(GLOBAL_ADMIN_ROLE)) {
1082 return AdminRole.GLOBAL;
1083 } else if (user.hasRole(securityService.getOrganization().getAdminRole())
1084 || user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
1085
1086
1087 return AdminRole.ORGANIZATION;
1088 } else {
1089 return AdminRole.NONE;
1090 }
1091 }
1092
1093 private String mkPropertyName(String role, String action) {
1094 return role + " | " + action;
1095 }
1096
1097
1098
1099
1100 private final java.util.function.Predicate<Role> roleFilter = (role) -> {
1101 final String name = role.getName();
1102 return (includeAPIRoles || !name.startsWith("ROLE_API_"))
1103 && (includeCARoles || !name.startsWith("ROLE_CAPTURE_AGENT_"))
1104 && (includeUIRoles || !name.startsWith("ROLE_UI_"));
1105 };
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119 private ASelectQuery baseQuery(final AQueryBuilder q) {
1120 RequireUtil.notNull(q, "q");
1121 return q.select(q.snapshot());
1122 }
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134 private ASelectQuery baseQuery(final AQueryBuilder q, final String mpId) {
1135 RequireUtil.notNull(q, "q");
1136 ASelectQuery query = baseQuery(q);
1137 if (StringUtils.isNotEmpty(mpId)) {
1138 return query.where(q.mediaPackageId(mpId));
1139 } else {
1140 return query;
1141 }
1142 }
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156 private ASelectQuery baseQuery(final AQueryBuilder q, final Version version, final String mpId) {
1157 RequireUtil.notNull(q, "q");
1158 RequireUtil.requireNotBlank(mpId, "mpId");
1159 ASelectQuery query = baseQuery(q, mpId);
1160 if (null != version) {
1161 return query.where(q.version().eq(version));
1162 } else {
1163 return query;
1164 }
1165 }
1166
1167
1168 private void copyAssetsToStore(Snapshot snap, AssetStore store) {
1169 final String mpId = snap.getMediaPackage().getIdentifier().toString();
1170 final String orgId = snap.getOrganizationId();
1171 final Version version = snap.getVersion();
1172 final String prettyMpId = mpId + "@v" + version;
1173 logger.debug("Moving assets for snapshot {} to store {}", prettyMpId, store.getStoreType());
1174 for (final MediaPackageElement e : snap.getMediaPackage().getElements()) {
1175 if (!MOVABLE_TYPES.contains(e.getElementType())) {
1176 logger.debug("Skipping {} because type is {}", e.getIdentifier(), e.getElementType());
1177 continue;
1178 }
1179 logger.debug("Moving {} to store {}", e.getIdentifier(), store.getStoreType());
1180 final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1181 if (store.contains(storagePath)) {
1182 logger.debug("Element {} (version {}) is already in store {} so skipping it", e.getIdentifier(),
1183 version, store.getStoreType());
1184 continue;
1185 }
1186
1187
1188 final Optional<StoragePath> existingAssetOpt =
1189 getDatabase()
1190 .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), store.getStoreType(), orgId)
1191 .map(dto -> StoragePath.mk(
1192 dto.getOrganizationId(),
1193 dto.getMediaPackageId(),
1194 dto.getVersion(),
1195 dto.getAssetDto().getMediaPackageElementId()
1196 ));
1197
1198 if (existingAssetOpt.isPresent()) {
1199 final StoragePath existingAsset = existingAssetOpt.get();
1200 logger.debug("Content of asset {} with checksum {} already exists in {}",
1201 existingAsset.getMediaPackageElementId(), e.getChecksum(), store.getStoreType());
1202 if (!store.copy(existingAsset, storagePath)) {
1203 throw new AssetManagerException(format(
1204 "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1205 + "failed",
1206 e.getChecksum(),
1207 existingAsset
1208 ));
1209 }
1210 } else {
1211 final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1212 store.put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1213 }
1214 getDatabase().setAssetStorageLocation(VersionImpl.mk(version), mpId, e.getIdentifier(), store.getStoreType());
1215 }
1216 }
1217
1218 private void copyManifest(Snapshot snap, AssetStore targetStore) throws IOException, NotFoundException {
1219 final String mpId = snap.getMediaPackage().getIdentifier().toString();
1220 final String orgId = snap.getOrganizationId();
1221 final Version version = snap.getVersion();
1222
1223 AssetStore currentStore = getAssetStore(snap.getStorageId()).get();
1224 Optional<String> manifestOpt = findManifestBaseName(snap, MANIFEST_DEFAULT_NAME, currentStore);
1225 if (manifestOpt.isEmpty()) {
1226 return;
1227 }
1228
1229
1230 String manifestBaseName = manifestOpt.get();
1231 StoragePath pathToManifest = new StoragePath(orgId, mpId, version, manifestBaseName);
1232
1233
1234 if (!targetStore.contains(pathToManifest)) {
1235 Optional<InputStream> inputStreamOpt;
1236 InputStream inputStream = null;
1237 String manifestFileName = null;
1238 try {
1239 inputStreamOpt = currentStore.get(pathToManifest);
1240 if (inputStreamOpt.isEmpty()) {
1241 throw new NotFoundException(
1242 String.format("Unexpected error. Manifest %s not found in current asset store", manifestBaseName));
1243 }
1244
1245 inputStream = inputStreamOpt.get();
1246 manifestFileName = UUID.randomUUID() + ".xml";
1247 URI manifestTmpUri = workspace.putInCollection("archive", manifestFileName, inputStream);
1248 targetStore.put(pathToManifest, Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1249 } finally {
1250 IOUtils.closeQuietly(inputStream);
1251 try {
1252
1253 workspace.deleteFromCollection("archive", manifestFileName);
1254 } catch (NotFoundException e) {
1255
1256 } catch (IOException e) {
1257
1258
1259
1260
1261 if (e.getMessage().contains(manifestFileName)) {
1262 logger.warn("The manifest file {} didn't get deleted from the archive collection",
1263 manifestBaseName, e);
1264 }
1265
1266 }
1267 }
1268 }
1269 }
1270
1271 Optional<String> findManifestBaseName(Snapshot snap, String manifestName, AssetStore store) {
1272 StoragePath path = new StoragePath(snap.getOrganizationId(), snap.getMediaPackage().getIdentifier().toString(),
1273 snap.getVersion(), manifestName);
1274
1275 if (!store.contains(path)) {
1276
1277 if (MANIFEST_DEFAULT_NAME.equals(manifestName)) {
1278 return Optional.empty();
1279 } else {
1280 return Optional.of(manifestName.substring(0, manifestName.length() - 1));
1281 }
1282 }
1283
1284 return findManifestBaseName(snap, manifestName + "_", store);
1285 }
1286
1287
1288
1289
1290
1291
1292 void calcChecksumsForMediaPackageElements(PartialMediaPackage pmp) {
1293 final Fx<MediaPackageElement> addChecksum = new Fx<MediaPackageElement>() {
1294 @Override public void apply(MediaPackageElement mpe) {
1295 File file = null;
1296 try {
1297 logger.trace("Calculate checksum for {}", mpe.getURI());
1298 file = workspace.get(mpe.getURI(), true);
1299 mpe.setChecksum(Checksum.create(ChecksumType.DEFAULT_TYPE, file));
1300 } catch (IOException | NotFoundException e) {
1301 throw new AssetManagerException(format(
1302 "Cannot calculate checksum for media package element %s",
1303 mpe.getURI()
1304 ), e);
1305 } finally {
1306 if (file != null) {
1307 FileUtils.deleteQuietly(file);
1308 }
1309 }
1310 }
1311 };
1312 pmp.getElements().filter(hasNoChecksum.toFn()).each(addChecksum).run();
1313 }
1314
1315
1316 private SnapshotDto addInternal(String owner, final MediaPackage mp) throws Exception {
1317 final Date now = new Date();
1318
1319 final String mpId = mp.getIdentifier().toString();
1320 final VersionImpl version = getDatabase().claimVersion(mpId);
1321 logger.info("Creating new version {} of media package {}", version, mp);
1322 final PartialMediaPackage pmp = assetsOnly(mp);
1323
1324 calcChecksumsForMediaPackageElements(pmp);
1325
1326 storeAssets(pmp, version);
1327
1328 final SnapshotDto snapshotDto;
1329 try {
1330
1331 Fn<MediaPackageElement, URI> uriCreator = new Fn<MediaPackageElement, URI>() {
1332 @Override
1333 public URI apply(MediaPackageElement mpe) {
1334 try {
1335 String fileName = getFileName(mpe).getOr("unknown");
1336 return new URI(
1337 "urn",
1338 "matterhorn:" + mpId + ":" + version + ":" + mpe.getIdentifier() + ":" + fileName,
1339 null
1340 );
1341 } catch (URISyntaxException e) {
1342 throw new AssetManagerException(e);
1343 }
1344 }
1345 };
1346
1347 for (MediaPackageElement mpe : pmp.getElements()) {
1348 mpe.setURI(uriCreator.apply(mpe));
1349 }
1350
1351 String currentOrgId = securityService.getOrganization().getId();
1352 snapshotDto = getDatabase().saveSnapshot(
1353 currentOrgId, pmp, now, version,
1354 Availability.ONLINE, getLocalAssetStore().getStoreType(), owner
1355 );
1356 } catch (AssetManagerException e) {
1357 logger.error("Could not take snapshot {}", mpId, e);
1358 throw new AssetManagerException(e);
1359 }
1360
1361
1362 storeManifest(pmp, version);
1363 return snapshotDto;
1364 }
1365
1366
1367
1368
1369 private void storeAssets(final PartialMediaPackage pmp, final Version version) {
1370 final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1371 final String orgId = securityService.getOrganization().getId();
1372 for (final MediaPackageElement e : pmp.getElements()) {
1373 logger.debug("Archiving {} {} {}", e.getFlavor(), e.getMimeType(), e.getURI());
1374 final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1375
1376 final Optional<StoragePath> existingAssetOpt = getDatabase()
1377 .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), getLocalAssetStore().getStoreType(), orgId)
1378 .map(dto -> StoragePath.mk(
1379 dto.getOrganizationId(),
1380 dto.getMediaPackageId(),
1381 dto.getVersion(),
1382 dto.getAssetDto().getMediaPackageElementId()));
1383
1384 if (existingAssetOpt.isPresent()) {
1385 final StoragePath existingAsset = existingAssetOpt.get();
1386 logger.debug("Content of asset {} with checksum {} has been archived before",
1387 existingAsset.getMediaPackageElementId(), e.getChecksum());
1388 if (!getLocalAssetStore().copy(existingAsset, storagePath)) {
1389 throw new AssetManagerException(format(
1390 "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1391 + "failed",
1392 e.getChecksum(),
1393 existingAsset
1394 ));
1395 }
1396 } else {
1397 final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1398 getLocalAssetStore().put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1399 }
1400 }
1401 }
1402
1403 private void storeManifest(final PartialMediaPackage pmp, final Version version) throws Exception {
1404 final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1405 final String orgId = securityService.getOrganization().getId();
1406
1407
1408 logger.debug("Archiving manifest of media package {} version {}", mpId, version);
1409
1410
1411 final String manifestFileName = format("manifest_%s_%s.xml", pmp.getMediaPackage().getIdentifier(), version);
1412 final URI manifestTmpUri = workspace.putInCollection(
1413 "archive",
1414 manifestFileName,
1415 IOUtils.toInputStream(MediaPackageParser.getAsXml(pmp.getMediaPackage()), "UTF-8"));
1416 try {
1417 getLocalAssetStore().put(
1418 StoragePath.mk(orgId, mpId, version, manifestAssetId(pmp, "manifest")),
1419 Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1420 } finally {
1421
1422 workspace.deleteFromCollection("archive", manifestFileName);
1423 }
1424 }
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435 private String manifestAssetId(PartialMediaPackage pmp, String seedId) {
1436 if ($(pmp.getElements()).map(getMediaPackageElementId.toFn()).exists(Booleans.eq(seedId))) {
1437 return manifestAssetId(pmp, seedId + "_");
1438 } else {
1439 return seedId;
1440 }
1441 }
1442
1443
1444
1445
1446
1447
1448
1449 static <A> A handleException(final P1<A> p) throws AssetManagerException {
1450 try {
1451 return p.get1();
1452 } catch (Exception e) {
1453 logger.error("An error occurred", e);
1454 throw unwrapExceptionUntil(AssetManagerException.class, e).orElse(new AssetManagerException(e));
1455 }
1456 }
1457
1458
1459
1460
1461
1462 static <A extends Throwable> Optional<A> unwrapExceptionUntil(Class<A> type, Throwable e) {
1463 if (e == null) {
1464 return Optional.empty();
1465 } else if (type.isAssignableFrom(e.getClass())) {
1466 return Optional.of((A) e);
1467 } else {
1468 return unwrapExceptionUntil(type, e.getCause());
1469 }
1470 }
1471
1472
1473
1474
1475
1476 static PartialMediaPackage assetsOnly(MediaPackage mp) {
1477 final Pred<MediaPackageElement> isAsset = Pred.mk(isNotPublication.toFn());
1478 return PartialMediaPackage.mk(mp, isAsset);
1479 }
1480
1481
1482
1483
1484
1485
1486 public static Optional<String> getFileNameFromUrn(MediaPackageElement mpe) {
1487 Fn<URI, String> toString = new Fn<URI, String>() {
1488 @Override
1489 public String apply(URI uri) {
1490 return uri.toString();
1491 }
1492 };
1493
1494 Optional<URI> uri = Optional.ofNullable(mpe.getURI());
1495 if (uri.isPresent() && "urn".equals(uri.get().getScheme())) {
1496 String[] tmp = uri.get().toString().split(":");
1497 if (tmp.length < 1) {
1498 return Optional.empty();
1499 }
1500 return Optional.of(tmp[tmp.length - 1]);
1501 }
1502 return Optional.empty();
1503 }
1504
1505
1506
1507
1508
1509 public static Snapshot rewriteUris(Snapshot snapshot, Fn<MediaPackageElement, URI> uriCreator) {
1510 final MediaPackage mpCopy = MediaPackageSupport.copy(snapshot.getMediaPackage());
1511 for (final MediaPackageElement mpe : assetsOnly(mpCopy).getElements()) {
1512 mpe.setURI(uriCreator.apply(mpe));
1513 }
1514 return new SnapshotImpl(
1515 snapshot.getVersion(),
1516 snapshot.getOrganizationId(),
1517 snapshot.getArchivalDate(),
1518 snapshot.getAvailability(),
1519 snapshot.getStorageId(),
1520 snapshot.getOwner(),
1521 mpCopy);
1522 }
1523
1524 public void fireEventHandlers(AssetManagerItem item) {
1525 while (handlers.size() != 2) {
1526 logger.warn("Expecting 2 handlers, but {} are registered. Waiting 10s then retrying...", handlers.size());
1527 try {
1528 Thread.sleep(10000L);
1529 } catch (InterruptedException e) { }
1530 }
1531 for (AssetManagerUpdateHandler handler : handlers) {
1532 handler.execute(item);
1533 }
1534 }
1535
1536
1537
1538
1539
1540
1541 private final class ADeleteQueryWithMessaging extends ADeleteQueryDecorator {
1542 ADeleteQueryWithMessaging(ADeleteQuery delegate) {
1543 super(delegate);
1544 }
1545
1546 @Override
1547 public long run() {
1548 return RuntimeTypes.convert(delegate).run(AssetManagerImpl.this);
1549 }
1550
1551 @Override
1552 protected ADeleteQueryDecorator mkDecorator(ADeleteQuery delegate) {
1553 return new ADeleteQueryWithMessaging(delegate);
1554 }
1555 }
1556
1557
1558
1559
1560
1561
1562 private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(Snapshot snapshot,
1563 String orgId, User user) {
1564 return (Optional<Event> eventOpt) -> {
1565 MediaPackage mp = snapshot.getMediaPackage();
1566 String eventId = mp.getIdentifier().toString();
1567 Event event = eventOpt.orElse(new Event(eventId, orgId));
1568
1569 event = updateAclInEvent(event, mp);
1570
1571 event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
1572 if (StringUtils.isBlank(event.getCreator())) {
1573 event.setCreator(securityService.getUser().getName());
1574 }
1575 EventIndexUtils.updateEvent(event, mp);
1576
1577 for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
1578 try (InputStream in = workspace.read(catalog.getURI())) {
1579 EventIndexUtils.updateEvent(event, DublinCores.read(in));
1580 } catch (IOException | NotFoundException e) {
1581 throw new IllegalStateException(String.format("Unable to load dublin core catalog for event '%s'",
1582 mp.getIdentifier()), e);
1583 }
1584 }
1585
1586
1587 event.resetExtendedMetadata();
1588 for (EventCatalogUIAdapter extendedCatalogUIAdapter : extendedEventCatalogUIAdapters.getOrDefault(orgId,
1589 Collections.emptyList())) {
1590 for (Catalog catalog: mp.getCatalogs(extendedCatalogUIAdapter.getFlavor())) {
1591 try (InputStream in = workspace.read(catalog.getURI())) {
1592 EventIndexUtils.updateEventExtendedMetadata(event, DublinCores.read(in),
1593 extendedCatalogUIAdapter.getFlavor());
1594 } catch (IOException | NotFoundException e) {
1595 throw new IllegalStateException(String.format("Unable to load extended dublin core catalog '%s' for event "
1596 + "'%s'", catalog.getFlavor(), mp.getIdentifier()), e);
1597 }
1598 }
1599 }
1600
1601
1602 try {
1603 EventIndexUtils.updateSeriesName(event, orgId, user, index);
1604 } catch (SearchIndexException e) {
1605 logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
1606 e);
1607 }
1608 return Optional.of(event);
1609 };
1610 }
1611
1612 private Function<Optional<Event>, Optional<Event>> getEventUpdateFunctionOnlyAcl(Snapshot snapshot,
1613 String orgId) {
1614 return (Optional<Event> eventOpt) -> {
1615 MediaPackage mp = snapshot.getMediaPackage();
1616 String eventId = mp.getIdentifier().toString();
1617 Event event = eventOpt.orElse(new Event(eventId, orgId));
1618
1619 event = updateAclInEvent(event, mp);
1620
1621 return Optional.of(event);
1622 };
1623 }
1624
1625 private Event updateAclInEvent(Event event, MediaPackage mp) {
1626 AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
1627 List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
1628
1629 for (final ManagedAcl managedAcl : AccessInformationUtil.matchAcls(acls, acl)) {
1630 event.setManagedAcl(managedAcl.getName());
1631 }
1632 event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
1633
1634 return event;
1635 }
1636 }