1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.assetmanager.impl;
22
23 import static com.entwinemedia.fn.Prelude.chuck;
24 import static com.entwinemedia.fn.Stream.$;
25 import static java.lang.String.format;
26 import static org.opencastproject.assetmanager.api.fn.Enrichments.enrich;
27 import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.hasNoChecksum;
28 import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.isNotPublication;
29 import static org.opencastproject.mediapackage.MediaPackageSupport.getFileName;
30 import static org.opencastproject.mediapackage.MediaPackageSupport.getMediaPackageElementId;
31 import static org.opencastproject.security.api.SecurityConstants.EPISODE_ROLE_ID_PREFIX;
32 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
33 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
34 import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
35
36 import org.opencastproject.assetmanager.api.Asset;
37 import org.opencastproject.assetmanager.api.AssetId;
38 import org.opencastproject.assetmanager.api.AssetManager;
39 import org.opencastproject.assetmanager.api.AssetManagerException;
40 import org.opencastproject.assetmanager.api.Availability;
41 import org.opencastproject.assetmanager.api.Property;
42 import org.opencastproject.assetmanager.api.PropertyId;
43 import org.opencastproject.assetmanager.api.Snapshot;
44 import org.opencastproject.assetmanager.api.Value;
45 import org.opencastproject.assetmanager.api.Version;
46 import org.opencastproject.assetmanager.api.fn.Enrichments;
47 import org.opencastproject.assetmanager.api.query.ADeleteQuery;
48 import org.opencastproject.assetmanager.api.query.AQueryBuilder;
49 import org.opencastproject.assetmanager.api.query.ARecord;
50 import org.opencastproject.assetmanager.api.query.AResult;
51 import org.opencastproject.assetmanager.api.query.ASelectQuery;
52 import org.opencastproject.assetmanager.api.query.Predicate;
53 import org.opencastproject.assetmanager.api.query.RichAResult;
54 import org.opencastproject.assetmanager.api.query.Target;
55 import org.opencastproject.assetmanager.api.storage.AssetStore;
56 import org.opencastproject.assetmanager.api.storage.DeletionSelector;
57 import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
58 import org.opencastproject.assetmanager.api.storage.Source;
59 import org.opencastproject.assetmanager.api.storage.StoragePath;
60 import org.opencastproject.assetmanager.impl.persistence.Database;
61 import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
62 import org.opencastproject.assetmanager.impl.query.AQueryBuilderImpl;
63 import org.opencastproject.assetmanager.impl.query.AbstractADeleteQuery;
64 import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
65 import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
66 import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
67 import org.opencastproject.db.DBSessionFactory;
68 import org.opencastproject.elasticsearch.api.SearchIndexException;
69 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
70 import org.opencastproject.elasticsearch.index.objects.event.Event;
71 import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
72 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
73 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
74 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
75 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
76 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService.DataType;
77 import org.opencastproject.mediapackage.Catalog;
78 import org.opencastproject.mediapackage.MediaPackage;
79 import org.opencastproject.mediapackage.MediaPackageElement;
80 import org.opencastproject.mediapackage.MediaPackageElements;
81 import org.opencastproject.mediapackage.MediaPackageParser;
82 import org.opencastproject.mediapackage.MediaPackageSupport;
83 import org.opencastproject.message.broker.api.assetmanager.AssetManagerItem;
84 import org.opencastproject.message.broker.api.update.AssetManagerUpdateHandler;
85 import org.opencastproject.metadata.dublincore.DublinCores;
86 import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
87 import org.opencastproject.security.api.AccessControlEntry;
88 import org.opencastproject.security.api.AccessControlList;
89 import org.opencastproject.security.api.AccessControlParser;
90 import org.opencastproject.security.api.AuthorizationService;
91 import org.opencastproject.security.api.DefaultOrganization;
92 import org.opencastproject.security.api.Organization;
93 import org.opencastproject.security.api.OrganizationDirectoryService;
94 import org.opencastproject.security.api.Role;
95 import org.opencastproject.security.api.SecurityService;
96 import org.opencastproject.security.api.UnauthorizedException;
97 import org.opencastproject.security.api.User;
98 import org.opencastproject.security.util.SecurityUtil;
99 import org.opencastproject.util.Checksum;
100 import org.opencastproject.util.ChecksumType;
101 import org.opencastproject.util.MimeTypes;
102 import org.opencastproject.util.NotFoundException;
103 import org.opencastproject.util.RequireUtil;
104 import org.opencastproject.util.data.functions.Functions;
105 import org.opencastproject.workspace.api.Workspace;
106
107 import com.entwinemedia.fn.Fn;
108 import com.entwinemedia.fn.Fx;
109 import com.entwinemedia.fn.P1;
110 import com.entwinemedia.fn.P1Lazy;
111 import com.entwinemedia.fn.Pred;
112 import com.entwinemedia.fn.Prelude;
113 import com.entwinemedia.fn.fns.Booleans;
114 import com.google.common.collect.Sets;
115
116 import org.apache.commons.io.FileUtils;
117 import org.apache.commons.io.IOUtils;
118 import org.apache.commons.lang3.BooleanUtils;
119 import org.apache.commons.lang3.StringUtils;
120 import org.osgi.service.component.ComponentContext;
121 import org.osgi.service.component.annotations.Activate;
122 import org.osgi.service.component.annotations.Component;
123 import org.osgi.service.component.annotations.Reference;
124 import org.osgi.service.component.annotations.ReferenceCardinality;
125 import org.osgi.service.component.annotations.ReferencePolicy;
126 import org.slf4j.Logger;
127 import org.slf4j.LoggerFactory;
128
129 import java.io.File;
130 import java.io.IOException;
131 import java.io.InputStream;
132 import java.net.URI;
133 import java.net.URISyntaxException;
134 import java.security.NoSuchAlgorithmException;
135 import java.util.ArrayList;
136 import java.util.Arrays;
137 import java.util.Collections;
138 import java.util.Date;
139 import java.util.HashMap;
140 import java.util.LinkedHashMap;
141 import java.util.List;
142 import java.util.Map;
143 import java.util.Objects;
144 import java.util.Optional;
145 import java.util.Set;
146 import java.util.UUID;
147 import java.util.concurrent.CopyOnWriteArrayList;
148 import java.util.function.Function;
149 import java.util.stream.Collectors;
150
151 import javax.persistence.EntityManagerFactory;
152
153
154
155
156 @Component(
157 property = {
158 "service.description=Opencast Asset Manager"
159 },
160 immediate = true,
161 service = { AssetManager.class, IndexProducer.class }
162 )
163 public class AssetManagerImpl extends AbstractIndexProducer implements AssetManager,
164 AbstractADeleteQuery.DeleteEpisodeHandler {
165
166 private static final Logger logger = LoggerFactory.getLogger(AssetManagerImpl.class);
167
168 private static final int PAGE_SIZE = 1000;
169
170 enum AdminRole {
171 GLOBAL, ORGANIZATION, NONE
172 }
173
174 public static final String WRITE_ACTION = "write";
175 public static final String READ_ACTION = "read";
176 public static final String SECURITY_NAMESPACE = "org.opencastproject.assetmanager.security";
177
178 private static final String MANIFEST_DEFAULT_NAME = "manifest";
179
180 private static final String CONFIG_EPISODE_ID_ROLE = "org.opencastproject.episode.id.role.access";
181 private static boolean episodeIdRole = false;
182
183 private CopyOnWriteArrayList<AssetManagerUpdateHandler> handlers = new CopyOnWriteArrayList<>();
184
185 private SecurityService securityService;
186 private AuthorizationService authorizationService;
187 private OrganizationDirectoryService orgDir;
188 private Workspace workspace;
189 private AssetStore assetStore;
190 private HttpAssetProvider httpAssetProvider;
191 private String systemUserName;
192 private Database db;
193 private DBSessionFactory dbSessionFactory;
194 private EntityManagerFactory emf;
195 private AclServiceFactory aclServiceFactory;
196 private ElasticsearchIndex index;
197 private Map<String, List<EventCatalogUIAdapter>> extendedEventCatalogUIAdapters = new HashMap<>();
198
199
200 private boolean includeAPIRoles;
201 private boolean includeCARoles;
202 private boolean includeUIRoles;
203
204
205 public static final Set<MediaPackageElement.Type> MOVABLE_TYPES = Sets.newHashSet(
206 MediaPackageElement.Type.Attachment,
207 MediaPackageElement.Type.Catalog,
208 MediaPackageElement.Type.Track
209 );
210
211 private final HashMap<String, RemoteAssetStore> remoteStores = new LinkedHashMap<>();
212
213
214
215
216 @Activate
217 public synchronized void activate(ComponentContext cc) {
218 logger.info("Activating AssetManager.");
219 db = new Database(dbSessionFactory.createSession(emf));
220 systemUserName = SecurityUtil.getSystemUserName(cc);
221
222 includeAPIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeAPIRoles"), null));
223 includeCARoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeCARoles"), null));
224 includeUIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeUIRoles"), null));
225
226 episodeIdRole = BooleanUtils.toBoolean(Objects.toString(
227 cc.getBundleContext().getProperty(CONFIG_EPISODE_ID_ROLE), "false"));
228 logger.debug("Usage of episode ID roles is set to {}", episodeIdRole);
229 }
230
231
232
233
234
235 @Reference(target = "(osgi.unit.name=org.opencastproject.assetmanager.impl)")
236 public void setEntityManagerFactory(EntityManagerFactory emf) {
237 this.emf = emf;
238 }
239
240 @Reference
241 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
242 this.dbSessionFactory = dbSessionFactory;
243 }
244
245 @Reference
246 public void setSecurityService(SecurityService securityService) {
247 this.securityService = securityService;
248 }
249
250 @Reference
251 public void setAuthorizationService(AuthorizationService authorizationService) {
252 this.authorizationService = authorizationService;
253 }
254
255 @Reference
256 public void setOrgDir(OrganizationDirectoryService orgDir) {
257 this.orgDir = orgDir;
258 }
259
260 @Reference
261 public void setWorkspace(Workspace workspace) {
262 this.workspace = workspace;
263 }
264
265 @Reference
266 public void setAssetStore(AssetStore assetStore) {
267 this.assetStore = assetStore;
268 }
269
270 @Reference(
271 cardinality = ReferenceCardinality.MULTIPLE,
272 policy = ReferencePolicy.DYNAMIC,
273 unbind = "removeEventHandler"
274 )
275 public void addEventHandler(AssetManagerUpdateHandler handler) {
276 this.handlers.add(handler);
277 }
278
279 public void removeEventHandler(AssetManagerUpdateHandler handler) {
280 this.handlers.remove(handler);
281 }
282
283 @Reference(
284 cardinality = ReferenceCardinality.MULTIPLE,
285 policy = ReferencePolicy.DYNAMIC,
286 unbind = "removeRemoteAssetStore"
287 )
288 public synchronized void addRemoteAssetStore(RemoteAssetStore assetStore) {
289 remoteStores.put(assetStore.getStoreType(), assetStore);
290 }
291
292 public void removeRemoteAssetStore(RemoteAssetStore store) {
293 remoteStores.remove(store.getStoreType());
294 }
295
296 @Reference
297 public void setHttpAssetProvider(HttpAssetProvider httpAssetProvider) {
298 this.httpAssetProvider = httpAssetProvider;
299 }
300
301 @Reference
302 public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
303 this.aclServiceFactory = aclServiceFactory;
304 }
305
306 @Reference
307 public void setIndex(ElasticsearchIndex index) {
308 this.index = index;
309 }
310
311 @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC,
312 target = "(common-metadata=false)")
313 public synchronized void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
314 List<EventCatalogUIAdapter> list = extendedEventCatalogUIAdapters.computeIfAbsent(
315 catalogUIAdapter.getOrganization(), k -> new ArrayList());
316 list.add(catalogUIAdapter);
317 }
318
319 public synchronized void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
320 if (extendedEventCatalogUIAdapters.containsKey(catalogUIAdapter.getOrganization())) {
321 extendedEventCatalogUIAdapters.get(catalogUIAdapter.getOrganization()).remove(catalogUIAdapter);
322 }
323 }
324
325
326
327
328
329 @Override
330 public Optional<MediaPackage> getMediaPackage(String mediaPackageId) {
331 final AQueryBuilder q = createQuery();
332 final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest()))
333 .run();
334
335 if (r.getSize() == 0) {
336 return Optional.empty();
337 }
338 return Optional.of(r.getRecords().stream().findFirst().get().getSnapshot().get().getMediaPackage());
339 }
340
341 @Override
342 public Optional<Asset> getAsset(Version version, String mpId, String mpElementId) {
343 if (isAuthorized(mpId, READ_ACTION)) {
344
345 var asset = getDatabase().getAsset(RuntimeTypes.convert(version), mpId, mpElementId);
346 if (asset.isPresent()) {
347 var storageId = getSnapshotStorageLocation(version, mpId);
348 if (storageId.isPresent()) {
349 var store = getAssetStore(storageId.get());
350 if (store.isPresent()) {
351 var assetStream = store.get().get(StoragePath.mk(
352 asset.get().getOrganizationId(),
353 mpId,
354 version,
355 mpElementId
356 ));
357 if (assetStream.isPresent()) {
358
359 Checksum checksum = null;
360 try {
361 checksum = Checksum.fromString(asset.get().getAssetDto().getChecksum());
362 } catch (NoSuchAlgorithmException e) {
363 logger.warn("Invalid checksum for asset {} of media package {}", mpElementId, mpId, e);
364 }
365
366 final Asset a = new AssetImpl(
367 AssetId.mk(version, mpId, mpElementId),
368 assetStream.get(),
369 asset.get().getAssetDto().getMimeType(),
370 asset.get().getAssetDto().getSize(),
371 asset.get().getStorageId(),
372 asset.get().getAvailability(),
373 checksum);
374 return Optional.of(a);
375 }
376 }
377 }
378 }
379 return Optional.empty();
380 }
381 return chuck(new UnauthorizedException(
382 format("Not allowed to read assets of snapshot %s, version=%s", mpId, version)
383 ));
384 }
385
386 @Override
387 public Optional<AssetStore> getAssetStore(String storeId) {
388 if (assetStore.getStoreType().equals(storeId)) {
389 return Optional.of(assetStore);
390 } else {
391 if (remoteStores.containsKey(storeId)) {
392 return Optional.of(remoteStores.get(storeId));
393 } else {
394 return Optional.empty();
395 }
396 }
397 }
398
399 @Override
400 public AssetStore getLocalAssetStore() {
401 return assetStore;
402 }
403
404 @Override
405 public List<AssetStore> getRemoteAssetStores() {
406 return new ArrayList<>(remoteStores.values());
407 }
408
409
410
411 @Override
412 public boolean snapshotExists(final String mediaPackageId) {
413 return getDatabase().snapshotExists(mediaPackageId);
414 }
415
416 @Override
417 public boolean snapshotExists(final String mediaPackageId, final String organization) {
418 return getDatabase().snapshotExists(mediaPackageId, organization);
419 }
420
421 @Override
422 public Snapshot takeSnapshot(MediaPackage mp) {
423 return takeSnapshot(null, mp);
424 }
425
426 @Override
427 public Snapshot takeSnapshot(String owner, MediaPackage mp) {
428
429 final String mediaPackageId = mp.getIdentifier().toString();
430 final boolean firstSnapshot = !snapshotExists(mediaPackageId);
431
432
433
434
435 if (firstSnapshot) {
436
437 deleteProperties(mediaPackageId);
438 }
439 if (firstSnapshot || isAuthorized(mediaPackageId, WRITE_ACTION)) {
440 final Snapshot snapshot;
441 if (owner == null) {
442 snapshot = takeSnapshotInternal(mp);
443 } else {
444 snapshot = takeSnapshotInternal(owner, mp);
445 }
446
447 final AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
448
449
450 deleteProperties(mediaPackageId, SECURITY_NAMESPACE);
451
452 for (final AccessControlEntry ace : acl.getEntries()) {
453 getDatabase().saveProperty(Property.mk(PropertyId.mk(mediaPackageId, SECURITY_NAMESPACE,
454 mkPropertyName(ace.getRole(), ace.getAction())), Value.mk(ace.isAllow())));
455 }
456
457 updateEventInIndex(snapshot);
458
459 logger.info("Trigger update handlers for snapshot {}, version {}",
460 snapshot.getMediaPackage().getIdentifier(), snapshot.getVersion());
461 fireEventHandlers(mkTakeSnapshotMessage(snapshot));
462
463 return snapshot;
464 }
465 return chuck(new UnauthorizedException("Not allowed to take snapshot of media package " + mediaPackageId));
466 }
467
468 private Snapshot takeSnapshotInternal(MediaPackage mediaPackage) {
469 final String mediaPackageId = mediaPackage.getIdentifier().toString();
470 AQueryBuilder queryBuilder = createQuery();
471 AResult result = queryBuilder.select(queryBuilder.snapshot())
472 .where(queryBuilder.mediaPackageId(mediaPackageId).and(queryBuilder.version().isLatest())).run();
473 Optional<ARecord> record = result.getRecords().stream().findFirst();
474 if (record.isPresent()) {
475 Optional<Snapshot> snapshot = Optional.of(record.get().getSnapshot().get());
476 if (snapshot.isPresent()) {
477 return takeSnapshotInternal(snapshot.get().getOwner(), mediaPackage);
478 }
479 }
480 return takeSnapshotInternal(DEFAULT_OWNER, mediaPackage);
481 }
482
483 private Snapshot takeSnapshotInternal(final String owner, final MediaPackage mp) {
484 return handleException(new P1Lazy<Snapshot>() {
485 @Override public Snapshot get1() {
486 try {
487 final Snapshot archived = addInternal(owner, MediaPackageSupport.copy(mp)).toSnapshot();
488 return getHttpAssetProvider().prepareForDelivery(archived);
489 } catch (Exception e) {
490 return Prelude.chuck(e);
491 }
492 }
493 });
494 }
495
496
497
498
499
500
501 private AssetManagerItem.TakeSnapshot mkTakeSnapshotMessage(Snapshot snapshot) {
502 final MediaPackage mp = snapshot.getMediaPackage();
503
504 long version;
505 try {
506 version = Long.parseLong(snapshot.getVersion().toString());
507 } catch (NumberFormatException e) {
508
509
510
511
512 throw new RuntimeException("The current implementation of the index requires versions being of type 'long'.");
513 }
514
515 return AssetManagerItem.add(workspace, mp, authorizationService.getActiveAcl(mp).getA(),
516 version, snapshot.getArchivalDate());
517 }
518
519 @Override
520 public void triggerIndexUpdate(String mediaPackageId) throws NotFoundException, UnauthorizedException {
521
522 if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
523 throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
524 }
525 final AQueryBuilder q = createQuery();
526 final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest())).run();
527
528 if (r.getSize() == 0) {
529 throw new NotFoundException("No event with ID `" + mediaPackageId + "`");
530 }
531
532
533 var snapshot = r.getRecords().stream().findFirst().get().getSnapshot().get();
534 updateEventInIndex(snapshot);
535 }
536
537
538
539
540
541
542
543 private void updateEventInIndex(Snapshot snapshot) {
544 final MediaPackage mp = snapshot.getMediaPackage();
545 String eventId = mp.getIdentifier().toString();
546 final String organization = securityService.getOrganization().getId();
547 final User user = securityService.getUser();
548 logger.debug("Updating event {} in the {} index.", eventId, index.getIndexName());
549
550 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
551 Event event = eventOpt.orElse(new Event(eventId, organization));
552
553 AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
554 List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
555 for (final ManagedAcl managedAcl : AccessInformationUtil.matchAcls(acls, acl)) {
556 event.setManagedAcl(managedAcl.getName());
557 }
558 event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
559 event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
560 if (StringUtils.isBlank(event.getCreator())) {
561 event.setCreator(securityService.getUser().getName());
562 }
563 EventIndexUtils.updateEvent(event, mp);
564
565
566 for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
567 try (InputStream in = workspace.read(catalog.getURI())) {
568 EventIndexUtils.updateEvent(event, DublinCores.read(in));
569 } catch (IOException | NotFoundException e) {
570 throw new IllegalStateException(String.format("Unable to load common dublin core catalog for event '%s'",
571 mp.getIdentifier()), e);
572 }
573 }
574
575
576 event.resetExtendedMetadata();
577 for (EventCatalogUIAdapter extendedCatalogUIAdapter : extendedEventCatalogUIAdapters.getOrDefault(organization,
578 Collections.emptyList())) {
579 for (Catalog catalog: mp.getCatalogs(extendedCatalogUIAdapter.getFlavor())) {
580 try (InputStream in = workspace.read(catalog.getURI())) {
581 EventIndexUtils.updateEventExtendedMetadata(event, DublinCores.read(in),
582 extendedCatalogUIAdapter.getFlavor());
583 } catch (IOException | NotFoundException e) {
584 throw new IllegalStateException(String.format("Unable to load extended dublin core catalog '%s' for event "
585 + "'%s'", catalog.getFlavor(), mp.getIdentifier()), e);
586 }
587 }
588 }
589
590
591 try {
592 EventIndexUtils.updateSeriesName(event, organization, user, index);
593 } catch (SearchIndexException e) {
594 logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
595 e);
596 }
597 return Optional.of(event);
598 };
599
600
601 try {
602 index.addOrUpdateEvent(eventId, updateFunction, organization, user);
603 logger.debug("Event {} updated in the {} index.", eventId, index.getIndexName());
604 } catch (SearchIndexException e) {
605 logger.error("Error updating the event {} in the {} index.", eventId, index.getIndexName(), e);
606 }
607 }
608
609
610
611
612
613
614
615 private void removeArchivedVersionFromIndex(String eventId) {
616 final String orgId = securityService.getOrganization().getId();
617 final User user = securityService.getUser();
618 logger.debug("Received AssetManager delete episode message {}", eventId);
619
620 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
621 if (eventOpt.isEmpty()) {
622 logger.warn("Event {} not found for deletion", eventId);
623 return Optional.empty();
624 }
625 Event event = eventOpt.get();
626 event.setArchiveVersion(null);
627 return Optional.of(event);
628 };
629
630 try {
631 index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
632 logger.debug("Event {} removed from the {} index", eventId, index.getIndexName());
633 } catch (SearchIndexException e) {
634 logger.error("Error deleting the event {} from the {} index.", eventId, index.getIndexName(), e);
635 }
636 }
637
638 @Override
639 public RichAResult getSnapshotsById(final String mpId) {
640 RequireUtil.requireNotBlank(mpId, "mpId");
641 AQueryBuilder q = createQuery();
642 ASelectQuery query = baseQuery(q, mpId);
643 return Enrichments.enrich(query.run());
644 }
645
646 @Override
647 public RichAResult getSnapshotsByIdOrderedByVersion(String mpId, boolean asc) {
648 RequireUtil.requireNotBlank(mpId, "mpId");
649 AQueryBuilder q = createQuery();
650 ASelectQuery query = baseQuery(q, mpId);
651 if (asc) {
652 query = query.orderBy(q.version().asc());
653 } else {
654 query = query.orderBy(q.version().desc());
655 }
656 return Enrichments.enrich(query.run());
657 }
658
659 @Override
660 public RichAResult getSnapshotsByIdAndVersion(final String mpId, final Version version) {
661 RequireUtil.requireNotBlank(mpId, "mpId");
662 RequireUtil.notNull(version, "version");
663 AQueryBuilder q = createQuery();
664 ASelectQuery query = baseQuery(q, version, mpId);
665 return Enrichments.enrich(query.run());
666 }
667
668 @Override
669 public RichAResult getSnapshotsByDate(final Date start, final Date end) {
670 RequireUtil.notNull(start, "start");
671 RequireUtil.notNull(end, "end");
672 AQueryBuilder q = createQuery();
673 ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
674 return Enrichments.enrich(query.run());
675 }
676
677 @Override
678 public RichAResult getSnapshotsByDateOrderedById(Date start, Date end) {
679 RequireUtil.notNull(start, "start");
680 RequireUtil.notNull(end, "end");
681 AQueryBuilder q = createQuery();
682 ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
683 return Enrichments.enrich(query.orderBy(q.mediapackageId().asc()).run());
684 }
685
686 @Override
687 public RichAResult getSnapshotsByIdAndDate(final String mpId, final Date start, final Date end) {
688 RequireUtil.requireNotBlank(mpId, "mpId");
689 RequireUtil.notNull(start, "start");
690 RequireUtil.notNull(end, "end");
691 AQueryBuilder q = createQuery();
692 ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
693 return Enrichments.enrich(query.run());
694 }
695
696 @Override
697 public RichAResult getSnapshotsByIdAndDateOrderedByVersion(String mpId, Date start, Date end, boolean asc) {
698 RequireUtil.requireNotBlank(mpId, "mpId");
699 RequireUtil.notNull(start, "start");
700 RequireUtil.notNull(end, "end");
701 AQueryBuilder q = createQuery();
702 ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
703 if (asc) {
704 query = query.orderBy(q.version().asc());
705 } else {
706 query = query.orderBy(q.version().desc());
707 }
708 return Enrichments.enrich(query.run());
709 }
710
711 @Override
712 public void moveSnapshotsById(final String mpId, final String targetStore) throws NotFoundException {
713 RichAResult results = getSnapshotsById(mpId);
714
715 if (results.getRecords().isEmpty()) {
716 throw new NotFoundException("Mediapackage " + mpId + " not found!");
717 }
718
719 processOperations(results, targetStore);
720 }
721
722 @Override
723 public void moveSnapshotsByIdAndVersion(final String mpId, final Version version, final String targetStore)
724 throws NotFoundException {
725 RichAResult results = getSnapshotsByIdAndVersion(mpId, version);
726
727 if (results.getRecords().isEmpty()) {
728 throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
729 }
730
731 processOperations(results, targetStore);
732 }
733
734 @Override
735 public void moveSnapshotsByDate(final Date start, final Date end, final String targetStore)
736 throws NotFoundException {
737
738
739 AQueryBuilder q = createQuery();
740 ASelectQuery query = baseQuery(q)
741 .where(q.storage(targetStore).not())
742 .where(q.archived().ge(start))
743 .where(q.archived().le(end));
744 RichAResult results = Enrichments.enrich(query.run());
745
746 if (results.getRecords().isEmpty()) {
747 throw new NotFoundException("No media packages found between " + start + " and " + end);
748 }
749
750 processOperations(results, targetStore);
751 }
752
753 @Override
754 public void moveSnapshotsByIdAndDate(final String mpId, final Date start, final Date end, final String targetStore)
755 throws NotFoundException {
756 RichAResult results = getSnapshotsByIdAndDate(mpId, start, end);
757
758 if (results.getRecords().isEmpty()) {
759 throw new NotFoundException("No media package with id " + mpId + " found between " + start + " and " + end);
760 }
761
762 processOperations(results, targetStore);
763 }
764
765 @Override
766 public void moveSnapshotToStore(final Version version, final String mpId, final String storeId)
767 throws NotFoundException {
768
769
770 AQueryBuilder q = createQuery();
771 RichAResult results = Enrichments.enrich(baseQuery(q, version, mpId).run());
772
773 if (results.getRecords().isEmpty()) {
774 throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
775 }
776 processOperations(results, storeId);
777 }
778
779
780 private void processOperations(final RichAResult results, final String targetStoreId) {
781 results.getRecords().forEach(record -> {
782 Snapshot s = record.getSnapshot().get();
783 Optional<String> currentStoreId = getSnapshotStorageLocation(s);
784
785 if (currentStoreId.isEmpty()) {
786 logger.warn("IsNone store ID");
787 return;
788 }
789
790
791 if (currentStoreId.get().equals(targetStoreId)) {
792
793 return;
794 }
795
796 AssetStore currentStore;
797 AssetStore targetStore;
798
799 Optional<AssetStore> optCurrentStore = getAssetStore(currentStoreId.get());
800 Optional<AssetStore> optTargetStore = getAssetStore(targetStoreId);
801
802 if (!optCurrentStore.isEmpty()) {
803 currentStore = optCurrentStore.get();
804 } else {
805 logger.error("Unknown current store: " + currentStoreId.get());
806 return;
807 }
808 if (!optTargetStore.isEmpty()) {
809 targetStore = optTargetStore.get();
810 } else {
811 logger.error("Unknown target store: " + targetStoreId);
812 return;
813 }
814
815
816
817 String localAssetStoreType = getLocalAssetStore().getStoreType();
818 if (localAssetStoreType.equals(currentStoreId.get()) || localAssetStoreType.equals(targetStoreId)) {
819 logger.debug("Moving {} from {} to {}", s, currentStoreId, targetStoreId);
820
821 try {
822 copyAssetsToStore(s, targetStore);
823 copyManifest(s, targetStore);
824 } catch (Exception e) {
825 Functions.chuck(e);
826 }
827 getDatabase().setStorageLocation(s, targetStoreId);
828 currentStore.delete(DeletionSelector.delete(s.getOrganizationId(),
829 s.getMediaPackage().getIdentifier().toString(), s.getVersion()
830 ));
831 } else {
832
833 String intermediateStore = getLocalAssetStore().getStoreType();
834 logger.debug("Moving {} from {} to {}, then to {}",
835 s, currentStoreId, intermediateStore, targetStoreId);
836 Version version = s.getVersion();
837 String mpId = s.getMediaPackage().getIdentifier().toString();
838 try {
839 moveSnapshotToStore(version, mpId, intermediateStore);
840 moveSnapshotToStore(version, mpId, targetStoreId);
841 } catch (NotFoundException e) {
842 Functions.chuck(e);
843 }
844 }
845 });
846 }
847
848
849 public Optional<String> getSnapshotStorageLocation(final Version version, final String mpId) {
850 RichAResult result = getSnapshotsByIdAndVersion(mpId, version);
851
852 for (Snapshot snapshot : result.getSnapshots()) {
853 return Optional.of(snapshot.getStorageId());
854 }
855
856 logger.error("Mediapackage " + mpId + "@" + version + " not found!");
857 return Optional.empty();
858 }
859
860 public Optional<String> getSnapshotStorageLocation(final Snapshot snap) {
861 return getSnapshotStorageLocation(snap.getVersion(), snap.getMediaPackage().getIdentifier().toString());
862 }
863
864
865
866 @Override
867 public boolean setProperty(Property property) {
868 final String mpId = property.getId().getMediaPackageId();
869 if (isAuthorized(mpId, WRITE_ACTION)) {
870 return getDatabase().saveProperty(property);
871 }
872 return chuck(new UnauthorizedException("Not allowed to set property on episode " + mpId));
873 }
874
875 @Override
876 public List<Property> selectProperties(final String mediaPackageId, String namespace) {
877 if (isAuthorized(mediaPackageId, READ_ACTION)) {
878 return getDatabase().selectProperties(mediaPackageId, namespace);
879 }
880 return chuck(new UnauthorizedException(format("Not allowed to read properties of event %s", mediaPackageId)));
881 }
882
883 @Override
884 public int deleteProperties(final String mediaPackageId) {
885 return getDatabase().deleteProperties(mediaPackageId);
886 }
887
888 @Override
889 public int deleteProperties(final String mediaPackageId, final String namespace) {
890 return getDatabase().deleteProperties(mediaPackageId, namespace);
891 }
892
893
894
895 @Override
896 public AQueryBuilder createQuery() {
897 return new AQueryBuilderDecorator(createQueryWithoutSecurityCheck()) {
898 @Override public ASelectQuery select(Target... target) {
899 switch (isAdmin()) {
900 case GLOBAL:
901 return super.select(target);
902 case ORGANIZATION:
903 return super.select(target).where(restrictToUsersOrganization());
904 default:
905 return super.select(target).where(mkAuthPredicate(READ_ACTION));
906 }
907 }
908
909 @Override public ADeleteQuery delete(String owner, Target target) {
910 switch (isAdmin()) {
911 case GLOBAL:
912 return super.delete(owner, target);
913 case ORGANIZATION:
914 return super.delete(owner, target).where(restrictToUsersOrganization());
915 default:
916 return super.delete(owner, target).where(mkAuthPredicate(WRITE_ACTION));
917 }
918 }
919 };
920 }
921
922 private AQueryBuilder createQueryWithoutSecurityCheck() {
923 return new AQueryBuilderDecorator(new AQueryBuilderImpl(this)) {
924 @Override
925 public ADeleteQuery delete(String owner, Target target) {
926 return new ADeleteQueryWithMessaging(super.delete(owner, target));
927 }
928 };
929 }
930
931 @Override
932 public Optional<Version> toVersion(String version) {
933 try {
934 return Optional.of(VersionImpl.mk(Long.parseLong(version)));
935 } catch (NumberFormatException e) {
936 return Optional.empty();
937 }
938 }
939
940 @Override
941 public long countEvents(final String organization) {
942 return getDatabase().countEvents(organization);
943 }
944
945 @Override
946 public void handleDeletedEpisode(String mpId) {
947 logger.info("Firing event handlers for deleting event {}", mpId);
948 fireEventHandlers(AssetManagerItem.deleteEpisode(mpId, new Date()));
949
950 removeArchivedVersionFromIndex(mpId);
951 }
952
953
954
955
956
957 @Override
958 public IndexRebuildService.Service getService() {
959 return IndexRebuildService.Service.AssetManager;
960 }
961
962 @Override
963 public DataType[] getSupportedDataTypes() {
964 return new DataType[]{ DataType.ALL, DataType.ACL };
965 }
966
967 @Override
968 public void repopulate(DataType dataType) throws IndexRebuildException {
969 final Organization originalOrg = securityService.getOrganization();
970 final User originalUser = (originalOrg != null ? securityService.getUser() : null);
971 try {
972 final Organization defaultOrg = new DefaultOrganization();
973 final User defaultSystemUser = SecurityUtil.createSystemUser(systemUserName, defaultOrg);
974 securityService.setOrganization(defaultOrg);
975 securityService.setUser(defaultSystemUser);
976
977 int offset = 0;
978 int total = (int) countEvents(null);
979 final AQueryBuilder q = createQuery();
980 RichAResult r;
981 int current = 0;
982 logIndexRebuildBegin(logger, total, "snapshot(s)");
983 var updatedEventRange = new ArrayList<Event>();
984 do {
985 r = enrich(q.select(q.snapshot()).where(q.version().isLatest()).orderBy(q.mediapackageId().desc())
986 .page(offset, PAGE_SIZE).run());
987 offset += PAGE_SIZE;
988 int n = 20;
989
990 final Map<String, List<Snapshot>> byOrg = r.getSnapshots().stream()
991 .collect(Collectors.groupingBy(Snapshot::getOrganizationId));
992 for (String orgId : byOrg.keySet()) {
993 final Organization snapshotOrg;
994 try {
995 snapshotOrg = orgDir.getOrganization(orgId);
996 User snapshotSystemUser = SecurityUtil.createSystemUser(systemUserName, snapshotOrg);
997 securityService.setOrganization(snapshotOrg);
998 securityService.setUser(snapshotSystemUser);
999 for (Snapshot snapshot : byOrg.get(orgId)) {
1000 try {
1001 current++;
1002
1003 var updatedEventData = index.getEvent(snapshot.getMediaPackage().getIdentifier().toString(), orgId,
1004 snapshotSystemUser);
1005 if (dataType == DataType.ALL) {
1006
1007 updatedEventData = getEventUpdateFunction(snapshot, orgId, snapshotSystemUser)
1008 .apply(updatedEventData);
1009 } else if (dataType == DataType.ACL) {
1010
1011 updatedEventData = getEventUpdateFunctionOnlyAcl(snapshot, orgId, snapshotSystemUser)
1012 .apply(updatedEventData);
1013 } else {
1014 throw new IndexRebuildException(dataType + " is not a supported data type. "
1015 + "Accepted values are " + Arrays.toString(getSupportedDataTypes()) + ".");
1016 }
1017 updatedEventRange.add(updatedEventData.get());
1018
1019 if (updatedEventRange.size() >= n || current >= total) {
1020 index.bulkEventUpdate(updatedEventRange);
1021 logIndexRebuildProgress(logger, total, current, n);
1022 updatedEventRange.clear();
1023 }
1024 } catch (Throwable t) {
1025 logSkippingElement(logger, "event", snapshot.getMediaPackage().getIdentifier().toString(),
1026 snapshotOrg, t);
1027 }
1028 }
1029 } catch (Throwable t) {
1030 logIndexRebuildError(logger, t, originalOrg);
1031 throw new IndexRebuildException(getService(), originalOrg, t);
1032 } finally {
1033 securityService.setOrganization(defaultOrg);
1034 securityService.setUser(defaultSystemUser);
1035 }
1036 }
1037 } while (offset < total);
1038 } finally {
1039 securityService.setOrganization(originalOrg);
1040 securityService.setUser(originalUser);
1041 }
1042 }
1043
1044
1045
1046
1047 public void setAvailability(Version version, String mpId, Availability availability) {
1048 if (isAuthorized(mpId, WRITE_ACTION)) {
1049 getDatabase().setAvailability(RuntimeTypes.convert(version), mpId, availability);
1050 } else {
1051 chuck(new UnauthorizedException("Not allowed to set availability of episode " + mpId));
1052 }
1053 }
1054
1055 public void setDatabase(Database database) {
1056 this.db = database;
1057 }
1058
1059 public Database getDatabase() {
1060 return db;
1061 }
1062
1063 public HttpAssetProvider getHttpAssetProvider() {
1064 return httpAssetProvider;
1065 }
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078 private Predicate mkAuthPredicate(final String action) {
1079 final AQueryBuilder q = createQueryWithoutSecurityCheck();
1080 return securityService.getUser().getRoles().stream()
1081 .filter(roleFilter)
1082 .map((role) -> {
1083 if (episodeIdRole && role.getName().startsWith(EPISODE_ROLE_ID_PREFIX)) {
1084 return q.mediapackageId().eq(StringUtils.substringBetween(
1085 role.getName(), EPISODE_ROLE_ID_PREFIX + "_", "_"));
1086 } else {
1087 return q.property(Value.BOOLEAN, SECURITY_NAMESPACE, mkPropertyName(role.getName(), action)).eq(true);
1088 }
1089 })
1090 .reduce(Predicate::or)
1091 .orElseGet(() -> q.always().not())
1092 .and(restrictToUsersOrganization());
1093 }
1094
1095
1096 private Predicate restrictToUsersOrganization() {
1097 return createQueryWithoutSecurityCheck().organizationId().eq(securityService.getUser().getOrganization().getId());
1098 }
1099
1100
1101 private boolean isAuthorized(final String mediaPackageId, final String action) {
1102 switch (isAdmin()) {
1103 case GLOBAL:
1104
1105 logger.debug("Access granted since user is global admin");
1106 return true;
1107 case ORGANIZATION:
1108
1109 logger.debug("User is organization admin. Checking organization. Checking organization ID of asset.");
1110 return snapshotExists(mediaPackageId, securityService.getOrganization().getId());
1111 default:
1112
1113 logger.debug("Non admin user. Checking organization.");
1114 final String org = securityService.getOrganization().getId();
1115 if (!snapshotExists(mediaPackageId, org)) {
1116 return false;
1117 }
1118
1119 User user = securityService.getUser();
1120 if (episodeIdRole && user.hasRole(getEpisodeRoleId(mediaPackageId, action))) {
1121 return true;
1122 }
1123
1124 logger.debug("Non admin user. Checking ACL rules.");
1125 final List<String> roles = user.getRoles().parallelStream()
1126 .filter(roleFilter)
1127 .map((role) -> mkPropertyName(role.getName(), action))
1128 .collect(Collectors.toList());
1129 return getDatabase().selectProperties(mediaPackageId, SECURITY_NAMESPACE).parallelStream()
1130 .map(p -> p.getId().getName())
1131 .filter(p -> p.endsWith(action))
1132 .anyMatch(p -> roles.stream().anyMatch(r -> r.equals(p)));
1133 }
1134 }
1135
1136 private AdminRole isAdmin() {
1137 final User user = securityService.getUser();
1138 if (user.hasRole(GLOBAL_ADMIN_ROLE)) {
1139 return AdminRole.GLOBAL;
1140 } else if (user.hasRole(securityService.getOrganization().getAdminRole())
1141 || user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
1142
1143
1144 return AdminRole.ORGANIZATION;
1145 } else {
1146 return AdminRole.NONE;
1147 }
1148 }
1149
1150 private String mkPropertyName(String role, String action) {
1151 return role + " | " + action;
1152 }
1153
1154
1155
1156
1157 private final java.util.function.Predicate<Role> roleFilter = (role) -> {
1158 final String name = role.getName();
1159 return (includeAPIRoles || !name.startsWith("ROLE_API_"))
1160 && (includeCARoles || !name.startsWith("ROLE_CAPTURE_AGENT_"))
1161 && (includeUIRoles || !name.startsWith("ROLE_UI_"));
1162 };
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176 private ASelectQuery baseQuery(final AQueryBuilder q) {
1177 RequireUtil.notNull(q, "q");
1178 return q.select(q.snapshot());
1179 }
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191 private ASelectQuery baseQuery(final AQueryBuilder q, final String mpId) {
1192 RequireUtil.notNull(q, "q");
1193 ASelectQuery query = baseQuery(q);
1194 if (StringUtils.isNotEmpty(mpId)) {
1195 return query.where(q.mediaPackageId(mpId));
1196 } else {
1197 return query;
1198 }
1199 }
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213 private ASelectQuery baseQuery(final AQueryBuilder q, final Version version, final String mpId) {
1214 RequireUtil.notNull(q, "q");
1215 RequireUtil.requireNotBlank(mpId, "mpId");
1216 ASelectQuery query = baseQuery(q, mpId);
1217 if (null != version) {
1218 return query.where(q.version().eq(version));
1219 } else {
1220 return query;
1221 }
1222 }
1223
1224
1225 private void copyAssetsToStore(Snapshot snap, AssetStore store) {
1226 final String mpId = snap.getMediaPackage().getIdentifier().toString();
1227 final String orgId = snap.getOrganizationId();
1228 final Version version = snap.getVersion();
1229 final String prettyMpId = mpId + "@v" + version;
1230 logger.debug("Moving assets for snapshot {} to store {}", prettyMpId, store.getStoreType());
1231 for (final MediaPackageElement e : snap.getMediaPackage().getElements()) {
1232 if (!MOVABLE_TYPES.contains(e.getElementType())) {
1233 logger.debug("Skipping {} because type is {}", e.getIdentifier(), e.getElementType());
1234 continue;
1235 }
1236 logger.debug("Moving {} to store {}", e.getIdentifier(), store.getStoreType());
1237 final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1238 if (store.contains(storagePath)) {
1239 logger.debug("Element {} (version {}) is already in store {} so skipping it", e.getIdentifier(),
1240 version, store.getStoreType());
1241 continue;
1242 }
1243
1244
1245 final Optional<StoragePath> existingAssetOpt =
1246 getDatabase()
1247 .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), store.getStoreType(), orgId)
1248 .map(dto -> StoragePath.mk(
1249 dto.getOrganizationId(),
1250 dto.getMediaPackageId(),
1251 dto.getVersion(),
1252 dto.getAssetDto().getMediaPackageElementId()
1253 ));
1254
1255 if (existingAssetOpt.isPresent()) {
1256 final StoragePath existingAsset = existingAssetOpt.get();
1257 logger.debug("Content of asset {} with checksum {} already exists in {}",
1258 existingAsset.getMediaPackageElementId(), e.getChecksum(), store.getStoreType());
1259 if (!store.copy(existingAsset, storagePath)) {
1260 throw new AssetManagerException(format(
1261 "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1262 + "failed",
1263 e.getChecksum(),
1264 existingAsset
1265 ));
1266 }
1267 } else {
1268 final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1269 store.put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1270 }
1271 getDatabase().setAssetStorageLocation(VersionImpl.mk(version), mpId, e.getIdentifier(), store.getStoreType());
1272 }
1273 }
1274
1275 private void copyManifest(Snapshot snap, AssetStore targetStore) throws IOException, NotFoundException {
1276 final String mpId = snap.getMediaPackage().getIdentifier().toString();
1277 final String orgId = snap.getOrganizationId();
1278 final Version version = snap.getVersion();
1279
1280 AssetStore currentStore = getAssetStore(snap.getStorageId()).get();
1281 Optional<String> manifestOpt = findManifestBaseName(snap, MANIFEST_DEFAULT_NAME, currentStore);
1282 if (manifestOpt.isEmpty()) {
1283 return;
1284 }
1285
1286
1287 String manifestBaseName = manifestOpt.get();
1288 StoragePath pathToManifest = new StoragePath(orgId, mpId, version, manifestBaseName);
1289
1290
1291 if (!targetStore.contains(pathToManifest)) {
1292 Optional<InputStream> inputStreamOpt;
1293 InputStream inputStream = null;
1294 String manifestFileName = null;
1295 try {
1296 inputStreamOpt = currentStore.get(pathToManifest);
1297 if (inputStreamOpt.isEmpty()) {
1298 throw new NotFoundException(
1299 String.format("Unexpected error. Manifest %s not found in current asset store", manifestBaseName));
1300 }
1301
1302 inputStream = inputStreamOpt.get();
1303 manifestFileName = UUID.randomUUID() + ".xml";
1304 URI manifestTmpUri = workspace.putInCollection("archive", manifestFileName, inputStream);
1305 targetStore.put(pathToManifest, Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1306 } finally {
1307 IOUtils.closeQuietly(inputStream);
1308 try {
1309
1310 workspace.deleteFromCollection("archive", manifestFileName);
1311 } catch (NotFoundException e) {
1312
1313 } catch (IOException e) {
1314
1315
1316
1317
1318 if (e.getMessage().contains(manifestFileName)) {
1319 logger.warn("The manifest file {} didn't get deleted from the archive collection",
1320 manifestBaseName, e);
1321 }
1322
1323 }
1324 }
1325 }
1326 }
1327
1328 Optional<String> findManifestBaseName(Snapshot snap, String manifestName, AssetStore store) {
1329 StoragePath path = new StoragePath(snap.getOrganizationId(), snap.getMediaPackage().getIdentifier().toString(),
1330 snap.getVersion(), manifestName);
1331
1332 if (!store.contains(path)) {
1333
1334 if (MANIFEST_DEFAULT_NAME.equals(manifestName)) {
1335 return Optional.empty();
1336 } else {
1337 return Optional.of(manifestName.substring(0, manifestName.length() - 1));
1338 }
1339 }
1340
1341 return findManifestBaseName(snap, manifestName + "_", store);
1342 }
1343
1344
1345
1346
1347
1348
1349 void calcChecksumsForMediaPackageElements(PartialMediaPackage pmp) {
1350 final Fx<MediaPackageElement> addChecksum = new Fx<MediaPackageElement>() {
1351 @Override public void apply(MediaPackageElement mpe) {
1352 File file = null;
1353 try {
1354 logger.trace("Calculate checksum for {}", mpe.getURI());
1355 file = workspace.get(mpe.getURI(), true);
1356 mpe.setChecksum(Checksum.create(ChecksumType.DEFAULT_TYPE, file));
1357 } catch (IOException | NotFoundException e) {
1358 throw new AssetManagerException(format(
1359 "Cannot calculate checksum for media package element %s",
1360 mpe.getURI()
1361 ), e);
1362 } finally {
1363 if (file != null) {
1364 FileUtils.deleteQuietly(file);
1365 }
1366 }
1367 }
1368 };
1369 pmp.getElements().filter(hasNoChecksum.toFn()).each(addChecksum).run();
1370 }
1371
1372
1373 private SnapshotDto addInternal(String owner, final MediaPackage mp) throws Exception {
1374 final Date now = new Date();
1375
1376 final String mpId = mp.getIdentifier().toString();
1377 final VersionImpl version = getDatabase().claimVersion(mpId);
1378 logger.info("Creating new version {} of media package {}", version, mp);
1379 final PartialMediaPackage pmp = assetsOnly(mp);
1380
1381 calcChecksumsForMediaPackageElements(pmp);
1382
1383 storeAssets(pmp, version);
1384
1385 final SnapshotDto snapshotDto;
1386 try {
1387
1388 Fn<MediaPackageElement, URI> uriCreator = new Fn<MediaPackageElement, URI>() {
1389 @Override
1390 public URI apply(MediaPackageElement mpe) {
1391 try {
1392 String fileName = getFileName(mpe).getOr("unknown");
1393 return new URI(
1394 "urn",
1395 "matterhorn:" + mpId + ":" + version + ":" + mpe.getIdentifier() + ":" + fileName,
1396 null
1397 );
1398 } catch (URISyntaxException e) {
1399 throw new AssetManagerException(e);
1400 }
1401 }
1402 };
1403
1404 for (MediaPackageElement mpe : pmp.getElements()) {
1405 mpe.setURI(uriCreator.apply(mpe));
1406 }
1407
1408 String currentOrgId = securityService.getOrganization().getId();
1409 snapshotDto = getDatabase().saveSnapshot(
1410 currentOrgId, pmp, now, version,
1411 Availability.ONLINE, getLocalAssetStore().getStoreType(), owner
1412 );
1413 } catch (AssetManagerException e) {
1414 logger.error("Could not take snapshot {}", mpId, e);
1415 throw new AssetManagerException(e);
1416 }
1417
1418
1419 storeManifest(pmp, version);
1420 return snapshotDto;
1421 }
1422
1423
1424
1425
1426 private void storeAssets(final PartialMediaPackage pmp, final Version version) {
1427 final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1428 final String orgId = securityService.getOrganization().getId();
1429 for (final MediaPackageElement e : pmp.getElements()) {
1430 logger.debug("Archiving {} {} {}", e.getFlavor(), e.getMimeType(), e.getURI());
1431 final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
1432
1433 final Optional<StoragePath> existingAssetOpt = getDatabase()
1434 .findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), getLocalAssetStore().getStoreType(), orgId)
1435 .map(dto -> StoragePath.mk(
1436 dto.getOrganizationId(),
1437 dto.getMediaPackageId(),
1438 dto.getVersion(),
1439 dto.getAssetDto().getMediaPackageElementId()));
1440
1441 if (existingAssetOpt.isPresent()) {
1442 final StoragePath existingAsset = existingAssetOpt.get();
1443 logger.debug("Content of asset {} with checksum {} has been archived before",
1444 existingAsset.getMediaPackageElementId(), e.getChecksum());
1445 if (!getLocalAssetStore().copy(existingAsset, storagePath)) {
1446 throw new AssetManagerException(format(
1447 "An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
1448 + "failed",
1449 e.getChecksum(),
1450 existingAsset
1451 ));
1452 }
1453 } else {
1454 final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
1455 getLocalAssetStore().put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
1456 }
1457 }
1458 }
1459
1460 private void storeManifest(final PartialMediaPackage pmp, final Version version) throws Exception {
1461 final String mpId = pmp.getMediaPackage().getIdentifier().toString();
1462 final String orgId = securityService.getOrganization().getId();
1463
1464
1465 logger.debug("Archiving manifest of media package {} version {}", mpId, version);
1466
1467
1468 final String manifestFileName = format("manifest_%s_%s.xml", pmp.getMediaPackage().getIdentifier(), version);
1469 final URI manifestTmpUri = workspace.putInCollection(
1470 "archive",
1471 manifestFileName,
1472 IOUtils.toInputStream(MediaPackageParser.getAsXml(pmp.getMediaPackage()), "UTF-8"));
1473 try {
1474 getLocalAssetStore().put(
1475 StoragePath.mk(orgId, mpId, version, manifestAssetId(pmp, "manifest")),
1476 Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
1477 } finally {
1478
1479 workspace.deleteFromCollection("archive", manifestFileName);
1480 }
1481 }
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492 private String manifestAssetId(PartialMediaPackage pmp, String seedId) {
1493 if ($(pmp.getElements()).map(getMediaPackageElementId.toFn()).exists(Booleans.eq(seedId))) {
1494 return manifestAssetId(pmp, seedId + "_");
1495 } else {
1496 return seedId;
1497 }
1498 }
1499
1500
1501
1502
1503
1504
1505
1506 static <A> A handleException(final P1<A> p) throws AssetManagerException {
1507 try {
1508 return p.get1();
1509 } catch (Exception e) {
1510 logger.error("An error occurred", e);
1511 throw unwrapExceptionUntil(AssetManagerException.class, e).orElse(new AssetManagerException(e));
1512 }
1513 }
1514
1515
1516
1517
1518
1519 static <A extends Throwable> Optional<A> unwrapExceptionUntil(Class<A> type, Throwable e) {
1520 if (e == null) {
1521 return Optional.empty();
1522 } else if (type.isAssignableFrom(e.getClass())) {
1523 return Optional.of((A) e);
1524 } else {
1525 return unwrapExceptionUntil(type, e.getCause());
1526 }
1527 }
1528
1529
1530
1531
1532
1533 static PartialMediaPackage assetsOnly(MediaPackage mp) {
1534 final Pred<MediaPackageElement> isAsset = Pred.mk(isNotPublication.toFn());
1535 return PartialMediaPackage.mk(mp, isAsset);
1536 }
1537
1538
1539
1540
1541
1542
1543 public static Optional<String> getFileNameFromUrn(MediaPackageElement mpe) {
1544 Fn<URI, String> toString = new Fn<URI, String>() {
1545 @Override
1546 public String apply(URI uri) {
1547 return uri.toString();
1548 }
1549 };
1550
1551 Optional<URI> uri = Optional.ofNullable(mpe.getURI());
1552 if (uri.isPresent() && "urn".equals(uri.get().getScheme())) {
1553 String[] tmp = uri.get().toString().split(":");
1554 if (tmp.length < 1) {
1555 return Optional.empty();
1556 }
1557 return Optional.of(tmp[tmp.length - 1]);
1558 }
1559 return Optional.empty();
1560 }
1561
1562
1563
1564
1565
1566 public static Snapshot rewriteUris(Snapshot snapshot, Fn<MediaPackageElement, URI> uriCreator) {
1567 final MediaPackage mpCopy = MediaPackageSupport.copy(snapshot.getMediaPackage());
1568 for (final MediaPackageElement mpe : assetsOnly(mpCopy).getElements()) {
1569 mpe.setURI(uriCreator.apply(mpe));
1570 }
1571 return new SnapshotImpl(
1572 snapshot.getVersion(),
1573 snapshot.getOrganizationId(),
1574 snapshot.getArchivalDate(),
1575 snapshot.getAvailability(),
1576 snapshot.getStorageId(),
1577 snapshot.getOwner(),
1578 mpCopy);
1579 }
1580
1581 public void fireEventHandlers(AssetManagerItem item) {
1582 while (handlers.size() != 2) {
1583 logger.warn("Expecting 2 handlers, but {} are registered. Waiting 10s then retrying...", handlers.size());
1584 try {
1585 Thread.sleep(10000L);
1586 } catch (InterruptedException e) { }
1587 }
1588 for (AssetManagerUpdateHandler handler : handlers) {
1589 handler.execute(item);
1590 }
1591 }
1592
1593
1594
1595
1596
1597
1598 private final class ADeleteQueryWithMessaging extends ADeleteQueryDecorator {
1599 ADeleteQueryWithMessaging(ADeleteQuery delegate) {
1600 super(delegate);
1601 }
1602
1603 @Override
1604 public long run() {
1605 return RuntimeTypes.convert(delegate).run(AssetManagerImpl.this);
1606 }
1607
1608 @Override
1609 protected ADeleteQueryDecorator mkDecorator(ADeleteQuery delegate) {
1610 return new ADeleteQueryWithMessaging(delegate);
1611 }
1612 }
1613
1614
1615
1616
1617
1618
1619 private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(Snapshot snapshot,
1620 String orgId, User user) {
1621 return (Optional<Event> eventOpt) -> {
1622 MediaPackage mp = snapshot.getMediaPackage();
1623 String eventId = mp.getIdentifier().toString();
1624 Event event = eventOpt.orElse(new Event(eventId, orgId));
1625
1626 event = updateAclInEvent(event, mp, eventId);
1627
1628 event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
1629 if (StringUtils.isBlank(event.getCreator())) {
1630 event.setCreator(securityService.getUser().getName());
1631 }
1632 EventIndexUtils.updateEvent(event, mp);
1633
1634 for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
1635 try (InputStream in = workspace.read(catalog.getURI())) {
1636 EventIndexUtils.updateEvent(event, DublinCores.read(in));
1637 } catch (IOException | NotFoundException e) {
1638 throw new IllegalStateException(String.format("Unable to load dublin core catalog for event '%s'",
1639 mp.getIdentifier()), e);
1640 }
1641 }
1642
1643
1644 try {
1645 EventIndexUtils.updateSeriesName(event, orgId, user, index);
1646 } catch (SearchIndexException e) {
1647 logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
1648 e);
1649 }
1650 return Optional.of(event);
1651 };
1652 }
1653
1654 private Function<Optional<Event>, Optional<Event>> getEventUpdateFunctionOnlyAcl(Snapshot snapshot,
1655 String orgId, User user) {
1656 return (Optional<Event> eventOpt) -> {
1657 MediaPackage mp = snapshot.getMediaPackage();
1658 String eventId = mp.getIdentifier().toString();
1659 Event event = eventOpt.orElse(new Event(eventId, orgId));
1660
1661 event = updateAclInEvent(event, mp, eventId);
1662
1663 return Optional.of(event);
1664 };
1665 }
1666
1667 private Event updateAclInEvent(Event event, MediaPackage mp, String eventId) {
1668 AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
1669 List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
1670
1671 for (final ManagedAcl managedAcl : AccessInformationUtil.matchAcls(acls, acl)) {
1672 event.setManagedAcl(managedAcl.getName());
1673 }
1674 event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
1675
1676 return event;
1677 }
1678 }