AssetManagerImpl.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.assetmanager.impl;
import static com.entwinemedia.fn.Prelude.chuck;
import static com.entwinemedia.fn.Stream.$;
import static java.lang.String.format;
import static org.opencastproject.assetmanager.api.fn.Enrichments.enrich;
import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.hasNoChecksum;
import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.isNotPublication;
import static org.opencastproject.mediapackage.MediaPackageSupport.getFileName;
import static org.opencastproject.mediapackage.MediaPackageSupport.getMediaPackageElementId;
import static org.opencastproject.security.api.SecurityConstants.EPISODE_ROLE_ID_PREFIX;
import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
import org.opencastproject.assetmanager.api.Asset;
import org.opencastproject.assetmanager.api.AssetId;
import org.opencastproject.assetmanager.api.AssetManager;
import org.opencastproject.assetmanager.api.AssetManagerException;
import org.opencastproject.assetmanager.api.Availability;
import org.opencastproject.assetmanager.api.Property;
import org.opencastproject.assetmanager.api.PropertyId;
import org.opencastproject.assetmanager.api.Snapshot;
import org.opencastproject.assetmanager.api.Value;
import org.opencastproject.assetmanager.api.Version;
import org.opencastproject.assetmanager.api.fn.Enrichments;
import org.opencastproject.assetmanager.api.query.ADeleteQuery;
import org.opencastproject.assetmanager.api.query.AQueryBuilder;
import org.opencastproject.assetmanager.api.query.ARecord;
import org.opencastproject.assetmanager.api.query.AResult;
import org.opencastproject.assetmanager.api.query.ASelectQuery;
import org.opencastproject.assetmanager.api.query.Predicate;
import org.opencastproject.assetmanager.api.query.RichAResult;
import org.opencastproject.assetmanager.api.query.Target;
import org.opencastproject.assetmanager.api.storage.AssetStore;
import org.opencastproject.assetmanager.api.storage.DeletionSelector;
import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
import org.opencastproject.assetmanager.api.storage.Source;
import org.opencastproject.assetmanager.api.storage.StoragePath;
import org.opencastproject.assetmanager.impl.persistence.Database;
import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
import org.opencastproject.assetmanager.impl.query.AQueryBuilderImpl;
import org.opencastproject.assetmanager.impl.query.AbstractADeleteQuery;
import org.opencastproject.authorization.xacml.manager.api.AclServiceFactory;
import org.opencastproject.authorization.xacml.manager.api.ManagedAcl;
import org.opencastproject.authorization.xacml.manager.util.AccessInformationUtil;
import org.opencastproject.db.DBSessionFactory;
import org.opencastproject.elasticsearch.api.SearchIndexException;
import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
import org.opencastproject.elasticsearch.index.objects.event.Event;
import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService.DataType;
import org.opencastproject.mediapackage.Catalog;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElements;
import org.opencastproject.mediapackage.MediaPackageParser;
import org.opencastproject.mediapackage.MediaPackageSupport;
import org.opencastproject.message.broker.api.assetmanager.AssetManagerItem;
import org.opencastproject.message.broker.api.update.AssetManagerUpdateHandler;
import org.opencastproject.metadata.dublincore.DublinCores;
import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
import org.opencastproject.security.api.AccessControlEntry;
import org.opencastproject.security.api.AccessControlList;
import org.opencastproject.security.api.AccessControlParser;
import org.opencastproject.security.api.AuthorizationService;
import org.opencastproject.security.api.DefaultOrganization;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.Role;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UnauthorizedException;
import org.opencastproject.security.api.User;
import org.opencastproject.security.util.SecurityUtil;
import org.opencastproject.util.Checksum;
import org.opencastproject.util.ChecksumType;
import org.opencastproject.util.MimeTypes;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.RequireUtil;
import org.opencastproject.util.data.functions.Functions;
import org.opencastproject.workspace.api.Workspace;
import com.entwinemedia.fn.Fn;
import com.entwinemedia.fn.Fx;
import com.entwinemedia.fn.P1;
import com.entwinemedia.fn.P1Lazy;
import com.entwinemedia.fn.Pred;
import com.entwinemedia.fn.Prelude;
import com.entwinemedia.fn.fns.Booleans;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.persistence.EntityManagerFactory;
/**
* The Asset Manager implementation.
*/
@Component(
property = {
"service.description=Opencast Asset Manager"
},
immediate = true,
service = { AssetManager.class, IndexProducer.class }
)
public class AssetManagerImpl extends AbstractIndexProducer implements AssetManager,
AbstractADeleteQuery.DeleteEpisodeHandler {
private static final Logger logger = LoggerFactory.getLogger(AssetManagerImpl.class);
private static final int PAGE_SIZE = 1000;
enum AdminRole {
GLOBAL, ORGANIZATION, NONE
}
public static final String WRITE_ACTION = "write";
public static final String READ_ACTION = "read";
public static final String SECURITY_NAMESPACE = "org.opencastproject.assetmanager.security";
private static final String MANIFEST_DEFAULT_NAME = "manifest";
private static final String CONFIG_EPISODE_ID_ROLE = "org.opencastproject.episode.id.role.access";
private static boolean episodeIdRole = false;
private CopyOnWriteArrayList<AssetManagerUpdateHandler> handlers = new CopyOnWriteArrayList<>();
private SecurityService securityService;
private AuthorizationService authorizationService;
private OrganizationDirectoryService orgDir;
private Workspace workspace;
private AssetStore assetStore;
private HttpAssetProvider httpAssetProvider;
private String systemUserName;
private Database db;
private DBSessionFactory dbSessionFactory;
private EntityManagerFactory emf;
private AclServiceFactory aclServiceFactory;
private ElasticsearchIndex index;
private Map<String, List<EventCatalogUIAdapter>> extendedEventCatalogUIAdapters = new HashMap<>();
// Settings for role filter
private boolean includeAPIRoles;
private boolean includeCARoles;
private boolean includeUIRoles;
public static final Set<MediaPackageElement.Type> MOVABLE_TYPES = Sets.newHashSet(
MediaPackageElement.Type.Attachment,
MediaPackageElement.Type.Catalog,
MediaPackageElement.Type.Track
);
private final HashMap<String, RemoteAssetStore> remoteStores = new LinkedHashMap<>();
/**
* OSGi callback.
*/
@Activate
public synchronized void activate(ComponentContext cc) {
logger.info("Activating AssetManager.");
db = new Database(dbSessionFactory.createSession(emf));
systemUserName = SecurityUtil.getSystemUserName(cc);
includeAPIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeAPIRoles"), null));
includeCARoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeCARoles"), null));
includeUIRoles = BooleanUtils.toBoolean(Objects.toString(cc.getProperties().get("includeUIRoles"), null));
episodeIdRole = BooleanUtils.toBoolean(Objects.toString(
cc.getBundleContext().getProperty(CONFIG_EPISODE_ID_ROLE), "false"));
logger.debug("Usage of episode ID roles is set to {}", episodeIdRole);
}
/**
* OSGi dependencies
*/
@Reference(target = "(osgi.unit.name=org.opencastproject.assetmanager.impl)")
public void setEntityManagerFactory(EntityManagerFactory emf) {
this.emf = emf;
}
@Reference
public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
this.dbSessionFactory = dbSessionFactory;
}
@Reference
public void setSecurityService(SecurityService securityService) {
this.securityService = securityService;
}
@Reference
public void setAuthorizationService(AuthorizationService authorizationService) {
this.authorizationService = authorizationService;
}
@Reference
public void setOrgDir(OrganizationDirectoryService orgDir) {
this.orgDir = orgDir;
}
@Reference
public void setWorkspace(Workspace workspace) {
this.workspace = workspace;
}
@Reference
public void setAssetStore(AssetStore assetStore) {
this.assetStore = assetStore;
}
@Reference(
cardinality = ReferenceCardinality.MULTIPLE,
policy = ReferencePolicy.DYNAMIC,
unbind = "removeEventHandler"
)
public void addEventHandler(AssetManagerUpdateHandler handler) {
this.handlers.add(handler);
}
public void removeEventHandler(AssetManagerUpdateHandler handler) {
this.handlers.remove(handler);
}
@Reference(
cardinality = ReferenceCardinality.MULTIPLE,
policy = ReferencePolicy.DYNAMIC,
unbind = "removeRemoteAssetStore"
)
public synchronized void addRemoteAssetStore(RemoteAssetStore assetStore) {
remoteStores.put(assetStore.getStoreType(), assetStore);
}
public void removeRemoteAssetStore(RemoteAssetStore store) {
remoteStores.remove(store.getStoreType());
}
@Reference
public void setHttpAssetProvider(HttpAssetProvider httpAssetProvider) {
this.httpAssetProvider = httpAssetProvider;
}
@Reference
public void setAclServiceFactory(AclServiceFactory aclServiceFactory) {
this.aclServiceFactory = aclServiceFactory;
}
@Reference
public void setIndex(ElasticsearchIndex index) {
this.index = index;
}
@Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC,
target = "(common-metadata=false)")
public synchronized void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
List<EventCatalogUIAdapter> list = extendedEventCatalogUIAdapters.computeIfAbsent(
catalogUIAdapter.getOrganization(), k -> new ArrayList());
list.add(catalogUIAdapter);
}
public synchronized void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
if (extendedEventCatalogUIAdapters.containsKey(catalogUIAdapter.getOrganization())) {
extendedEventCatalogUIAdapters.get(catalogUIAdapter.getOrganization()).remove(catalogUIAdapter);
}
}
/**
* AssetManager implementation
*/
@Override
public Optional<MediaPackage> getMediaPackage(String mediaPackageId) {
final AQueryBuilder q = createQuery();
final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest()))
.run();
if (r.getSize() == 0) {
return Optional.empty();
}
return Optional.of(r.getRecords().stream().findFirst().get().getSnapshot().get().getMediaPackage());
}
@Override
public Optional<Asset> getAsset(Version version, String mpId, String mpElementId) {
if (isAuthorized(mpId, READ_ACTION)) {
// try to fetch the asset
var asset = getDatabase().getAsset(RuntimeTypes.convert(version), mpId, mpElementId);
if (asset.isPresent()) {
var storageId = getSnapshotStorageLocation(version, mpId);
if (storageId.isPresent()) {
var store = getAssetStore(storageId.get());
if (store.isPresent()) {
var assetStream = store.get().get(StoragePath.mk(
asset.get().getOrganizationId(),
mpId,
version,
mpElementId
));
if (assetStream.isPresent()) {
Checksum checksum = null;
try {
checksum = Checksum.fromString(asset.get().getAssetDto().getChecksum());
} catch (NoSuchAlgorithmException e) {
logger.warn("Invalid checksum for asset {} of media package {}", mpElementId, mpId, e);
}
final Asset a = new AssetImpl(
AssetId.mk(version, mpId, mpElementId),
assetStream.get(),
asset.get().getAssetDto().getMimeType(),
asset.get().getAssetDto().getSize(),
asset.get().getStorageId(),
asset.get().getAvailability(),
checksum);
return Optional.of(a);
}
}
}
}
return Optional.empty();
}
return chuck(new UnauthorizedException(
format("Not allowed to read assets of snapshot %s, version=%s", mpId, version)
));
}
@Override
public Optional<AssetStore> getAssetStore(String storeId) {
if (assetStore.getStoreType().equals(storeId)) {
return Optional.of(assetStore);
} else {
if (remoteStores.containsKey(storeId)) {
return Optional.of(remoteStores.get(storeId));
} else {
return Optional.empty();
}
}
}
@Override
public AssetStore getLocalAssetStore() {
return assetStore;
}
@Override
public List<AssetStore> getRemoteAssetStores() {
return new ArrayList<>(remoteStores.values());
}
/** Snapshots */
@Override
public boolean snapshotExists(final String mediaPackageId) {
return getDatabase().snapshotExists(mediaPackageId);
}
@Override
public boolean snapshotExists(final String mediaPackageId, final String organization) {
return getDatabase().snapshotExists(mediaPackageId, organization);
}
@Override
public Snapshot takeSnapshot(MediaPackage mp) {
return takeSnapshot(null, mp);
}
@Override
public Snapshot takeSnapshot(String owner, MediaPackage mp) {
final String mediaPackageId = mp.getIdentifier().toString();
final boolean firstSnapshot = !snapshotExists(mediaPackageId);
// Allow this if:
// - no previous snapshot exists
// - the user has write access to the previous snapshot
if (firstSnapshot) {
// if it's the first snapshot, ensure that old, leftover properties are removed
deleteProperties(mediaPackageId);
}
if (firstSnapshot || isAuthorized(mediaPackageId, WRITE_ACTION)) {
final Snapshot snapshot;
if (owner == null) {
snapshot = takeSnapshotInternal(mp);
} else {
snapshot = takeSnapshotInternal(owner, mp);
}
final AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
// store acl as properties
// Drop old ACL rules
deleteProperties(mediaPackageId, SECURITY_NAMESPACE);
// Set new ACL rules
for (final AccessControlEntry ace : acl.getEntries()) {
getDatabase().saveProperty(Property.mk(PropertyId.mk(mediaPackageId, SECURITY_NAMESPACE,
mkPropertyName(ace.getRole(), ace.getAction())), Value.mk(ace.isAllow())));
}
updateEventInIndex(snapshot);
logger.info("Trigger update handlers for snapshot {}, version {}",
snapshot.getMediaPackage().getIdentifier(), snapshot.getVersion());
fireEventHandlers(mkTakeSnapshotMessage(snapshot));
return snapshot;
}
return chuck(new UnauthorizedException("Not allowed to take snapshot of media package " + mediaPackageId));
}
private Snapshot takeSnapshotInternal(MediaPackage mediaPackage) {
final String mediaPackageId = mediaPackage.getIdentifier().toString();
AQueryBuilder queryBuilder = createQuery();
AResult result = queryBuilder.select(queryBuilder.snapshot())
.where(queryBuilder.mediaPackageId(mediaPackageId).and(queryBuilder.version().isLatest())).run();
Optional<ARecord> record = result.getRecords().stream().findFirst();
if (record.isPresent()) {
Optional<Snapshot> snapshot = Optional.of(record.get().getSnapshot().get());
if (snapshot.isPresent()) {
return takeSnapshotInternal(snapshot.get().getOwner(), mediaPackage);
}
}
return takeSnapshotInternal(DEFAULT_OWNER, mediaPackage);
}
private Snapshot takeSnapshotInternal(final String owner, final MediaPackage mp) {
return handleException(new P1Lazy<Snapshot>() {
@Override public Snapshot get1() {
try {
final Snapshot archived = addInternal(owner, MediaPackageSupport.copy(mp)).toSnapshot();
return getHttpAssetProvider().prepareForDelivery(archived);
} catch (Exception e) {
return Prelude.chuck(e);
}
}
});
}
/**
* Create a {@link AssetManagerItem.TakeSnapshot} message.
* <p>
* Do not call outside of a security context.
*/
private AssetManagerItem.TakeSnapshot mkTakeSnapshotMessage(Snapshot snapshot) {
final MediaPackage mp = snapshot.getMediaPackage();
long version;
try {
version = Long.parseLong(snapshot.getVersion().toString());
} catch (NumberFormatException e) {
// The index requires a version to be a long value.
// Since the asset manager default implementation uses long values that should be not a problem.
// However, a decent exception message is helpful if a different implementation of the asset manager
// is used.
throw new RuntimeException("The current implementation of the index requires versions being of type 'long'.");
}
return AssetManagerItem.add(workspace, mp, authorizationService.getActiveAcl(mp).getA(),
version, snapshot.getArchivalDate());
}
@Override
public void triggerIndexUpdate(String mediaPackageId) throws NotFoundException, UnauthorizedException {
if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
}
final AQueryBuilder q = createQuery();
final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mediaPackageId).and(q.version().isLatest())).run();
if (r.getSize() == 0) {
throw new NotFoundException("No event with ID `" + mediaPackageId + "`");
}
// Update event index with latest snapshot
var snapshot = r.getRecords().stream().findFirst().get().getSnapshot().get();
updateEventInIndex(snapshot);
}
/**
* Update the event in the Elasticsearch index.
*
* @param snapshot
* The newest snapshot of the event to update
*/
private void updateEventInIndex(Snapshot snapshot) {
final MediaPackage mp = snapshot.getMediaPackage();
String eventId = mp.getIdentifier().toString();
final String organization = securityService.getOrganization().getId();
final User user = securityService.getUser();
logger.debug("Updating event {} in the {} index.", eventId, index.getIndexName());
Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
Event event = eventOpt.orElse(new Event(eventId, organization));
AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
for (final ManagedAcl managedAcl : AccessInformationUtil.matchAcls(acls, acl)) {
event.setManagedAcl(managedAcl.getName());
}
event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
if (StringUtils.isBlank(event.getCreator())) {
event.setCreator(securityService.getUser().getName());
}
EventIndexUtils.updateEvent(event, mp);
// common metadata
for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
try (InputStream in = workspace.read(catalog.getURI())) {
EventIndexUtils.updateEvent(event, DublinCores.read(in));
} catch (IOException | NotFoundException e) {
throw new IllegalStateException(String.format("Unable to load common dublin core catalog for event '%s'",
mp.getIdentifier()), e);
}
}
// extended metadata
event.resetExtendedMetadata(); // getting rid of old data
for (EventCatalogUIAdapter extendedCatalogUIAdapter : extendedEventCatalogUIAdapters.getOrDefault(organization,
Collections.emptyList())) {
for (Catalog catalog: mp.getCatalogs(extendedCatalogUIAdapter.getFlavor())) {
try (InputStream in = workspace.read(catalog.getURI())) {
EventIndexUtils.updateEventExtendedMetadata(event, DublinCores.read(in),
extendedCatalogUIAdapter.getFlavor());
} catch (IOException | NotFoundException e) {
throw new IllegalStateException(String.format("Unable to load extended dublin core catalog '%s' for event "
+ "'%s'", catalog.getFlavor(), mp.getIdentifier()), e);
}
}
}
// Update series name if not already done
try {
EventIndexUtils.updateSeriesName(event, organization, user, index);
} catch (SearchIndexException e) {
logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
e);
}
return Optional.of(event);
};
// Persist the scheduling event
try {
index.addOrUpdateEvent(eventId, updateFunction, organization, user);
logger.debug("Event {} updated in the {} index.", eventId, index.getIndexName());
} catch (SearchIndexException e) {
logger.error("Error updating the event {} in the {} index.", eventId, index.getIndexName(), e);
}
}
/**
* Remove the event from the Elasticsearch index
*
* @param eventId
* The id of the event to remove
*/
private void removeArchivedVersionFromIndex(String eventId) {
final String orgId = securityService.getOrganization().getId();
final User user = securityService.getUser();
logger.debug("Received AssetManager delete episode message {}", eventId);
Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
if (eventOpt.isEmpty()) {
logger.warn("Event {} not found for deletion", eventId);
return Optional.empty();
}
Event event = eventOpt.get();
event.setArchiveVersion(null);
return Optional.of(event);
};
try {
index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
logger.debug("Event {} removed from the {} index", eventId, index.getIndexName());
} catch (SearchIndexException e) {
logger.error("Error deleting the event {} from the {} index.", eventId, index.getIndexName(), e);
}
}
@Override
public RichAResult getSnapshotsById(final String mpId) {
RequireUtil.requireNotBlank(mpId, "mpId");
AQueryBuilder q = createQuery();
ASelectQuery query = baseQuery(q, mpId);
return Enrichments.enrich(query.run());
}
@Override
public RichAResult getSnapshotsByIdOrderedByVersion(String mpId, boolean asc) {
RequireUtil.requireNotBlank(mpId, "mpId");
AQueryBuilder q = createQuery();
ASelectQuery query = baseQuery(q, mpId);
if (asc) {
query = query.orderBy(q.version().asc());
} else {
query = query.orderBy(q.version().desc());
}
return Enrichments.enrich(query.run());
}
@Override
public RichAResult getSnapshotsByIdAndVersion(final String mpId, final Version version) {
RequireUtil.requireNotBlank(mpId, "mpId");
RequireUtil.notNull(version, "version");
AQueryBuilder q = createQuery();
ASelectQuery query = baseQuery(q, version, mpId);
return Enrichments.enrich(query.run());
}
@Override
public RichAResult getSnapshotsByDate(final Date start, final Date end) {
RequireUtil.notNull(start, "start");
RequireUtil.notNull(end, "end");
AQueryBuilder q = createQuery();
ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
return Enrichments.enrich(query.run());
}
@Override
public RichAResult getSnapshotsByDateOrderedById(Date start, Date end) {
RequireUtil.notNull(start, "start");
RequireUtil.notNull(end, "end");
AQueryBuilder q = createQuery();
ASelectQuery query = baseQuery(q).where(q.archived().ge(start)).where(q.archived().le(end));
return Enrichments.enrich(query.orderBy(q.mediapackageId().asc()).run());
}
@Override
public RichAResult getSnapshotsByIdAndDate(final String mpId, final Date start, final Date end) {
RequireUtil.requireNotBlank(mpId, "mpId");
RequireUtil.notNull(start, "start");
RequireUtil.notNull(end, "end");
AQueryBuilder q = createQuery();
ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
return Enrichments.enrich(query.run());
}
@Override
public RichAResult getSnapshotsByIdAndDateOrderedByVersion(String mpId, Date start, Date end, boolean asc) {
RequireUtil.requireNotBlank(mpId, "mpId");
RequireUtil.notNull(start, "start");
RequireUtil.notNull(end, "end");
AQueryBuilder q = createQuery();
ASelectQuery query = baseQuery(q, mpId).where(q.archived().ge(start)).where(q.archived().le(end));
if (asc) {
query = query.orderBy(q.version().asc());
} else {
query = query.orderBy(q.version().desc());
}
return Enrichments.enrich(query.run());
}
@Override
public void moveSnapshotsById(final String mpId, final String targetStore) throws NotFoundException {
RichAResult results = getSnapshotsById(mpId);
if (results.getRecords().isEmpty()) {
throw new NotFoundException("Mediapackage " + mpId + " not found!");
}
processOperations(results, targetStore);
}
@Override
public void moveSnapshotsByIdAndVersion(final String mpId, final Version version, final String targetStore)
throws NotFoundException {
RichAResult results = getSnapshotsByIdAndVersion(mpId, version);
if (results.getRecords().isEmpty()) {
throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
}
processOperations(results, targetStore);
}
@Override
public void moveSnapshotsByDate(final Date start, final Date end, final String targetStore)
throws NotFoundException {
// We don't use #getSnapshotsByDate() as this includes also all snapshots already in targetStore. On large installs
// this could lead to memory overflow.
AQueryBuilder q = createQuery();
ASelectQuery query = baseQuery(q)
.where(q.storage(targetStore).not())
.where(q.archived().ge(start))
.where(q.archived().le(end));
RichAResult results = Enrichments.enrich(query.run());
if (results.getRecords().isEmpty()) {
throw new NotFoundException("No media packages found between " + start + " and " + end);
}
processOperations(results, targetStore);
}
@Override
public void moveSnapshotsByIdAndDate(final String mpId, final Date start, final Date end, final String targetStore)
throws NotFoundException {
RichAResult results = getSnapshotsByIdAndDate(mpId, start, end);
if (results.getRecords().isEmpty()) {
throw new NotFoundException("No media package with id " + mpId + " found between " + start + " and " + end);
}
processOperations(results, targetStore);
}
@Override
public void moveSnapshotToStore(final Version version, final String mpId, final String storeId)
throws NotFoundException {
//Find the snapshot
AQueryBuilder q = createQuery();
RichAResult results = Enrichments.enrich(baseQuery(q, version, mpId).run());
if (results.getRecords().isEmpty()) {
throw new NotFoundException("Mediapackage " + mpId + "@" + version.toString() + " not found!");
}
processOperations(results, storeId);
}
//Do the actual moving
private void processOperations(final RichAResult results, final String targetStoreId) {
results.getRecords().forEach(record -> {
Snapshot s = record.getSnapshot().get();
Optional<String> currentStoreId = getSnapshotStorageLocation(s);
if (currentStoreId.isEmpty()) {
logger.warn("IsNone store ID");
return;
}
//If this snapshot is already stored in the desired store
if (currentStoreId.get().equals(targetStoreId)) {
//return, since we don't need to move anything
return;
}
AssetStore currentStore;
AssetStore targetStore;
Optional<AssetStore> optCurrentStore = getAssetStore(currentStoreId.get());
Optional<AssetStore> optTargetStore = getAssetStore(targetStoreId);
if (!optCurrentStore.isEmpty()) {
currentStore = optCurrentStore.get();
} else {
logger.error("Unknown current store: " + currentStoreId.get());
return;
}
if (!optTargetStore.isEmpty()) {
targetStore = optTargetStore.get();
} else {
logger.error("Unknown target store: " + targetStoreId);
return;
}
//If the content is already local, or is moving from a remote to the local
// Returns true if the store id is equal to the local asset store's id
String localAssetStoreType = getLocalAssetStore().getStoreType();
if (localAssetStoreType.equals(currentStoreId.get()) || localAssetStoreType.equals(targetStoreId)) {
logger.debug("Moving {} from {} to {}", s, currentStoreId, targetStoreId);
try {
copyAssetsToStore(s, targetStore);
copyManifest(s, targetStore);
} catch (Exception e) {
Functions.chuck(e);
}
getDatabase().setStorageLocation(s, targetStoreId);
currentStore.delete(DeletionSelector.delete(s.getOrganizationId(),
s.getMediaPackage().getIdentifier().toString(), s.getVersion()
));
} else {
//Else, the content is *not* local and is going to a *different* remote
String intermediateStore = getLocalAssetStore().getStoreType();
logger.debug("Moving {} from {} to {}, then to {}",
s, currentStoreId, intermediateStore, targetStoreId);
Version version = s.getVersion();
String mpId = s.getMediaPackage().getIdentifier().toString();
try {
moveSnapshotToStore(version, mpId, intermediateStore);
moveSnapshotToStore(version, mpId, targetStoreId);
} catch (NotFoundException e) {
Functions.chuck(e);
}
}
});
}
// Return the asset store ID that is currently storing the snapshot
public Optional<String> getSnapshotStorageLocation(final Version version, final String mpId) {
RichAResult result = getSnapshotsByIdAndVersion(mpId, version);
for (Snapshot snapshot : result.getSnapshots()) {
return Optional.of(snapshot.getStorageId());
}
logger.error("Mediapackage " + mpId + "@" + version + " not found!");
return Optional.empty();
}
public Optional<String> getSnapshotStorageLocation(final Snapshot snap) {
return getSnapshotStorageLocation(snap.getVersion(), snap.getMediaPackage().getIdentifier().toString());
}
/** Properties */
@Override
public boolean setProperty(Property property) {
final String mpId = property.getId().getMediaPackageId();
if (isAuthorized(mpId, WRITE_ACTION)) {
return getDatabase().saveProperty(property);
}
return chuck(new UnauthorizedException("Not allowed to set property on episode " + mpId));
}
@Override
public List<Property> selectProperties(final String mediaPackageId, String namespace) {
if (isAuthorized(mediaPackageId, READ_ACTION)) {
return getDatabase().selectProperties(mediaPackageId, namespace);
}
return chuck(new UnauthorizedException(format("Not allowed to read properties of event %s", mediaPackageId)));
}
@Override
public int deleteProperties(final String mediaPackageId) {
return getDatabase().deleteProperties(mediaPackageId);
}
@Override
public int deleteProperties(final String mediaPackageId, final String namespace) {
return getDatabase().deleteProperties(mediaPackageId, namespace);
}
/** Misc. */
@Override
public AQueryBuilder createQuery() {
return new AQueryBuilderDecorator(createQueryWithoutSecurityCheck()) {
@Override public ASelectQuery select(Target... target) {
switch (isAdmin()) {
case GLOBAL:
return super.select(target);
case ORGANIZATION:
return super.select(target).where(restrictToUsersOrganization());
default:
return super.select(target).where(mkAuthPredicate(READ_ACTION));
}
}
@Override public ADeleteQuery delete(String owner, Target target) {
switch (isAdmin()) {
case GLOBAL:
return super.delete(owner, target);
case ORGANIZATION:
return super.delete(owner, target).where(restrictToUsersOrganization());
default:
return super.delete(owner, target).where(mkAuthPredicate(WRITE_ACTION));
}
}
};
}
private AQueryBuilder createQueryWithoutSecurityCheck() {
return new AQueryBuilderDecorator(new AQueryBuilderImpl(this)) {
@Override
public ADeleteQuery delete(String owner, Target target) {
return new ADeleteQueryWithMessaging(super.delete(owner, target));
}
};
}
@Override
public Optional<Version> toVersion(String version) {
try {
return Optional.of(VersionImpl.mk(Long.parseLong(version)));
} catch (NumberFormatException e) {
return Optional.empty();
}
}
@Override
public long countEvents(final String organization) {
return getDatabase().countEvents(organization);
}
@Override
public void handleDeletedEpisode(String mpId) {
logger.info("Firing event handlers for deleting event {}", mpId);
fireEventHandlers(AssetManagerItem.deleteEpisode(mpId, new Date()));
removeArchivedVersionFromIndex(mpId);
}
/**
* AbstractIndexProducer Implementation
*/
@Override
public IndexRebuildService.Service getService() {
return IndexRebuildService.Service.AssetManager;
}
@Override
public DataType[] getSupportedDataTypes() {
return new DataType[]{ DataType.ALL, DataType.ACL };
}
@Override
public void repopulate(DataType dataType) throws IndexRebuildException {
final Organization originalOrg = securityService.getOrganization();
final User originalUser = (originalOrg != null ? securityService.getUser() : null);
try {
final Organization defaultOrg = new DefaultOrganization();
final User defaultSystemUser = SecurityUtil.createSystemUser(systemUserName, defaultOrg);
securityService.setOrganization(defaultOrg);
securityService.setUser(defaultSystemUser);
int offset = 0;
int total = (int) countEvents(null);
final AQueryBuilder q = createQuery();
RichAResult r;
int current = 0;
logIndexRebuildBegin(logger, total, "snapshot(s)");
var updatedEventRange = new ArrayList<Event>();
do {
r = enrich(q.select(q.snapshot()).where(q.version().isLatest()).orderBy(q.mediapackageId().desc())
.page(offset, PAGE_SIZE).run());
offset += PAGE_SIZE;
int n = 20;
final Map<String, List<Snapshot>> byOrg = r.getSnapshots().stream()
.collect(Collectors.groupingBy(Snapshot::getOrganizationId));
for (String orgId : byOrg.keySet()) {
final Organization snapshotOrg;
try {
snapshotOrg = orgDir.getOrganization(orgId);
User snapshotSystemUser = SecurityUtil.createSystemUser(systemUserName, snapshotOrg);
securityService.setOrganization(snapshotOrg);
securityService.setUser(snapshotSystemUser);
for (Snapshot snapshot : byOrg.get(orgId)) {
try {
current++;
var updatedEventData = index.getEvent(snapshot.getMediaPackage().getIdentifier().toString(), orgId,
snapshotSystemUser);
if (dataType == DataType.ALL) {
// Reindex everything (default)
updatedEventData = getEventUpdateFunction(snapshot, orgId, snapshotSystemUser)
.apply(updatedEventData);
} else if (dataType == DataType.ACL) {
// Only reindex ACLs
updatedEventData = getEventUpdateFunctionOnlyAcl(snapshot, orgId, snapshotSystemUser)
.apply(updatedEventData);
} else {
throw new IndexRebuildException(dataType + " is not a supported data type. "
+ "Accepted values are " + Arrays.toString(getSupportedDataTypes()) + ".");
}
updatedEventRange.add(updatedEventData.get());
if (updatedEventRange.size() >= n || current >= total) {
index.bulkEventUpdate(updatedEventRange);
logIndexRebuildProgress(logger, total, current, n);
updatedEventRange.clear();
}
} catch (Throwable t) {
logSkippingElement(logger, "event", snapshot.getMediaPackage().getIdentifier().toString(),
snapshotOrg, t);
}
}
} catch (Throwable t) {
logIndexRebuildError(logger, t, originalOrg);
throw new IndexRebuildException(getService(), originalOrg, t);
} finally {
securityService.setOrganization(defaultOrg);
securityService.setUser(defaultSystemUser);
}
}
} while (offset < total);
} finally {
securityService.setOrganization(originalOrg);
securityService.setUser(originalUser);
}
}
/**
* Used for testing
*/
public void setAvailability(Version version, String mpId, Availability availability) {
if (isAuthorized(mpId, WRITE_ACTION)) {
getDatabase().setAvailability(RuntimeTypes.convert(version), mpId, availability);
} else {
chuck(new UnauthorizedException("Not allowed to set availability of episode " + mpId));
}
}
public void setDatabase(Database database) {
this.db = database;
}
public Database getDatabase() {
return db;
}
public HttpAssetProvider getHttpAssetProvider() {
return httpAssetProvider;
}
/*
* Security handling
*/
/**
* Create an authorization predicate to be used with {@link #isAuthorized(String, String)},
* restricting access to the user's organization and the given action.
*
* @param action
* the action to restrict access to
*/
private Predicate mkAuthPredicate(final String action) {
final AQueryBuilder q = createQueryWithoutSecurityCheck();
return securityService.getUser().getRoles().stream()
.filter(roleFilter)
.map((role) -> {
if (episodeIdRole && role.getName().startsWith(EPISODE_ROLE_ID_PREFIX)) {
return q.mediapackageId().eq(StringUtils.substringBetween(
role.getName(), EPISODE_ROLE_ID_PREFIX + "_", "_"));
} else {
return q.property(Value.BOOLEAN, SECURITY_NAMESPACE, mkPropertyName(role.getName(), action)).eq(true);
}
})
.reduce(Predicate::or)
.orElseGet(() -> q.always().not())
.and(restrictToUsersOrganization());
}
/** Create a predicate that restricts access to the user's organization. */
private Predicate restrictToUsersOrganization() {
return createQueryWithoutSecurityCheck().organizationId().eq(securityService.getUser().getOrganization().getId());
}
/** Check authorization based on the given predicate. */
private boolean isAuthorized(final String mediaPackageId, final String action) {
switch (isAdmin()) {
case GLOBAL:
// grant general access
logger.debug("Access granted since user is global admin");
return true;
case ORGANIZATION:
// ensure that the requested assets belong to this organization
logger.debug("User is organization admin. Checking organization. Checking organization ID of asset.");
return snapshotExists(mediaPackageId, securityService.getOrganization().getId());
default:
// check organization
logger.debug("Non admin user. Checking organization.");
final String org = securityService.getOrganization().getId();
if (!snapshotExists(mediaPackageId, org)) {
return false;
}
// check episode role id
User user = securityService.getUser();
if (episodeIdRole && user.hasRole(getEpisodeRoleId(mediaPackageId, action))) {
return true;
}
// check acl rules
logger.debug("Non admin user. Checking ACL rules.");
final List<String> roles = user.getRoles().parallelStream()
.filter(roleFilter)
.map((role) -> mkPropertyName(role.getName(), action))
.collect(Collectors.toList());
return getDatabase().selectProperties(mediaPackageId, SECURITY_NAMESPACE).parallelStream()
.map(p -> p.getId().getName())
.filter(p -> p.endsWith(action))
.anyMatch(p -> roles.stream().anyMatch(r -> r.equals(p)));
}
}
private AdminRole isAdmin() {
final User user = securityService.getUser();
if (user.hasRole(GLOBAL_ADMIN_ROLE)) {
return AdminRole.GLOBAL;
} else if (user.hasRole(securityService.getOrganization().getAdminRole())
|| user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
// In this context, we treat capture agents the same way as organization admins, allowing them access so that
// they can ingest new media without requiring them to be explicitly specified in the ACLs.
return AdminRole.ORGANIZATION;
} else {
return AdminRole.NONE;
}
}
private String mkPropertyName(String role, String action) {
return role + " | " + action;
}
/**
* Configurable filter for roles
*/
private final java.util.function.Predicate<Role> roleFilter = (role) -> {
final String name = role.getName();
return (includeAPIRoles || !name.startsWith("ROLE_API_"))
&& (includeCARoles || !name.startsWith("ROLE_CAPTURE_AGENT_"))
&& (includeUIRoles || !name.startsWith("ROLE_UI_"));
};
/*
* Utility
*/
/**
* Return a basic query which returns the snapshot and its current storage location
*
* @param q
* The query builder object to configure
* @return
* The {@link ASelectQuery} configured with as described above
*/
private ASelectQuery baseQuery(final AQueryBuilder q) {
RequireUtil.notNull(q, "q");
return q.select(q.snapshot());
}
/**
* Return a mediapackage filtered query which returns the snapshot and its current storage location
*
* @param q
* The query builder object to configure
* @param mpId
* The mediapackage ID to filter results for
* @return
* The {@link ASelectQuery} configured with as described above
*/
private ASelectQuery baseQuery(final AQueryBuilder q, final String mpId) {
RequireUtil.notNull(q, "q");
ASelectQuery query = baseQuery(q);
if (StringUtils.isNotEmpty(mpId)) {
return query.where(q.mediaPackageId(mpId));
} else {
return query;
}
}
/**
* Return a mediapackage and version filtered query which returns the snapshot and its current storage location
*
* @param q
* The query builder object to configure
* @param version
* The version to filter results for
* @param mpId
* The mediapackage ID to filter results for
* @return
* The {@link ASelectQuery} configured with as described above
*/
private ASelectQuery baseQuery(final AQueryBuilder q, final Version version, final String mpId) {
RequireUtil.notNull(q, "q");
RequireUtil.requireNotBlank(mpId, "mpId");
ASelectQuery query = baseQuery(q, mpId);
if (null != version) {
return query.where(q.version().eq(version));
} else {
return query;
}
}
/** Move the assets for a snapshot to the target store */
private void copyAssetsToStore(Snapshot snap, AssetStore store) {
final String mpId = snap.getMediaPackage().getIdentifier().toString();
final String orgId = snap.getOrganizationId();
final Version version = snap.getVersion();
final String prettyMpId = mpId + "@v" + version;
logger.debug("Moving assets for snapshot {} to store {}", prettyMpId, store.getStoreType());
for (final MediaPackageElement e : snap.getMediaPackage().getElements()) {
if (!MOVABLE_TYPES.contains(e.getElementType())) {
logger.debug("Skipping {} because type is {}", e.getIdentifier(), e.getElementType());
continue;
}
logger.debug("Moving {} to store {}", e.getIdentifier(), store.getStoreType());
final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
if (store.contains(storagePath)) {
logger.debug("Element {} (version {}) is already in store {} so skipping it", e.getIdentifier(),
version, store.getStoreType());
continue;
}
// find asset in versions & stores
final Optional<StoragePath> existingAssetOpt =
getDatabase()
.findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), store.getStoreType(), orgId)
.map(dto -> StoragePath.mk(
dto.getOrganizationId(),
dto.getMediaPackageId(),
dto.getVersion(),
dto.getAssetDto().getMediaPackageElementId()
));
if (existingAssetOpt.isPresent()) {
final StoragePath existingAsset = existingAssetOpt.get();
logger.debug("Content of asset {} with checksum {} already exists in {}",
existingAsset.getMediaPackageElementId(), e.getChecksum(), store.getStoreType());
if (!store.copy(existingAsset, storagePath)) {
throw new AssetManagerException(format(
"An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
+ "failed",
e.getChecksum(),
existingAsset
));
}
} else {
final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
store.put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
}
getDatabase().setAssetStorageLocation(VersionImpl.mk(version), mpId, e.getIdentifier(), store.getStoreType());
}
}
private void copyManifest(Snapshot snap, AssetStore targetStore) throws IOException, NotFoundException {
final String mpId = snap.getMediaPackage().getIdentifier().toString();
final String orgId = snap.getOrganizationId();
final Version version = snap.getVersion();
AssetStore currentStore = getAssetStore(snap.getStorageId()).get();
Optional<String> manifestOpt = findManifestBaseName(snap, MANIFEST_DEFAULT_NAME, currentStore);
if (manifestOpt.isEmpty()) {
return; // Nothing to do, already moved to long-term storage
}
// Copy the manifest file
String manifestBaseName = manifestOpt.get();
StoragePath pathToManifest = new StoragePath(orgId, mpId, version, manifestBaseName);
// Already copied?
if (!targetStore.contains(pathToManifest)) {
Optional<InputStream> inputStreamOpt;
InputStream inputStream = null;
String manifestFileName = null;
try {
inputStreamOpt = currentStore.get(pathToManifest);
if (inputStreamOpt.isEmpty()) { // This should never happen because it has been tested before
throw new NotFoundException(
String.format("Unexpected error. Manifest %s not found in current asset store", manifestBaseName));
}
inputStream = inputStreamOpt.get();
manifestFileName = UUID.randomUUID() + ".xml";
URI manifestTmpUri = workspace.putInCollection("archive", manifestFileName, inputStream);
targetStore.put(pathToManifest, Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
} finally {
IOUtils.closeQuietly(inputStream);
try {
// Make sure to clean up the temporary file
workspace.deleteFromCollection("archive", manifestFileName);
} catch (NotFoundException e) {
// This is OK, we are deleting it anyway
} catch (IOException e) {
// This usually happens when the collection directory cannot be deleted
// because another process is running at the same time and wrote a file there
// after it was tested but before it was actually deleted. We will consider this ok.
// Does the error message mention the manifest file name?
if (e.getMessage().contains(manifestFileName)) {
logger.warn("The manifest file {} didn't get deleted from the archive collection",
manifestBaseName, e);
}
// Else the error is related to the file-archive collection, which is fine
}
}
}
}
Optional<String> findManifestBaseName(Snapshot snap, String manifestName, AssetStore store) {
StoragePath path = new StoragePath(snap.getOrganizationId(), snap.getMediaPackage().getIdentifier().toString(),
snap.getVersion(), manifestName);
// If manifest_.xml, etc not found, return previous name (copied from the EpsiodeServiceImpl logic)
if (!store.contains(path)) {
// If first call, manifest is not found, which probably means it has already been moved
if (MANIFEST_DEFAULT_NAME.equals(manifestName)) {
return Optional.empty(); // No manifest found in current store
} else {
return Optional.of(manifestName.substring(0, manifestName.length() - 1));
}
}
// This is the same logic as when building the manifest name: manifest, manifest_, manifest__, etc
return findManifestBaseName(snap, manifestName + "_", store);
}
/* -------------------------------------------------------------------------------------------------------------- */
/**
* Make sure each of the elements has a checksum.
*/
void calcChecksumsForMediaPackageElements(PartialMediaPackage pmp) {
final Fx<MediaPackageElement> addChecksum = new Fx<MediaPackageElement>() {
@Override public void apply(MediaPackageElement mpe) {
File file = null;
try {
logger.trace("Calculate checksum for {}", mpe.getURI());
file = workspace.get(mpe.getURI(), true);
mpe.setChecksum(Checksum.create(ChecksumType.DEFAULT_TYPE, file));
} catch (IOException | NotFoundException e) {
throw new AssetManagerException(format(
"Cannot calculate checksum for media package element %s",
mpe.getURI()
), e);
} finally {
if (file != null) {
FileUtils.deleteQuietly(file);
}
}
}
};
pmp.getElements().filter(hasNoChecksum.toFn()).each(addChecksum).run();
}
/** Mutates mp and its elements, so make sure to work on a copy. */
private SnapshotDto addInternal(String owner, final MediaPackage mp) throws Exception {
final Date now = new Date();
// claim a new version for the media package
final String mpId = mp.getIdentifier().toString();
final VersionImpl version = getDatabase().claimVersion(mpId);
logger.info("Creating new version {} of media package {}", version, mp);
final PartialMediaPackage pmp = assetsOnly(mp);
// make sure they have a checksum
calcChecksumsForMediaPackageElements(pmp);
// download and archive elements
storeAssets(pmp, version);
// store mediapackage in db
final SnapshotDto snapshotDto;
try {
// rewrite URIs for archival
Fn<MediaPackageElement, URI> uriCreator = new Fn<MediaPackageElement, URI>() {
@Override
public URI apply(MediaPackageElement mpe) {
try {
String fileName = getFileName(mpe).getOr("unknown");
return new URI(
"urn",
"matterhorn:" + mpId + ":" + version + ":" + mpe.getIdentifier() + ":" + fileName,
null
);
} catch (URISyntaxException e) {
throw new AssetManagerException(e);
}
}
};
for (MediaPackageElement mpe : pmp.getElements()) {
mpe.setURI(uriCreator.apply(mpe));
}
String currentOrgId = securityService.getOrganization().getId();
snapshotDto = getDatabase().saveSnapshot(
currentOrgId, pmp, now, version,
Availability.ONLINE, getLocalAssetStore().getStoreType(), owner
);
} catch (AssetManagerException e) {
logger.error("Could not take snapshot {}", mpId, e);
throw new AssetManagerException(e);
}
// save manifest to element store
// this is done at the end after the media package element ids have been rewritten to neutral URNs
storeManifest(pmp, version);
return snapshotDto;
}
/**
* Store all elements of <code>pmp</code> under the given version.
*/
private void storeAssets(final PartialMediaPackage pmp, final Version version) {
final String mpId = pmp.getMediaPackage().getIdentifier().toString();
final String orgId = securityService.getOrganization().getId();
for (final MediaPackageElement e : pmp.getElements()) {
logger.debug("Archiving {} {} {}", e.getFlavor(), e.getMimeType(), e.getURI());
final StoragePath storagePath = StoragePath.mk(orgId, mpId, version, e.getIdentifier());
// find asset in versions
final Optional<StoragePath> existingAssetOpt = getDatabase()
.findAssetByChecksumAndStoreAndOrg(e.getChecksum().toString(), getLocalAssetStore().getStoreType(), orgId)
.map(dto -> StoragePath.mk(
dto.getOrganizationId(),
dto.getMediaPackageId(),
dto.getVersion(),
dto.getAssetDto().getMediaPackageElementId()));
if (existingAssetOpt.isPresent()) {
final StoragePath existingAsset = existingAssetOpt.get();
logger.debug("Content of asset {} with checksum {} has been archived before",
existingAsset.getMediaPackageElementId(), e.getChecksum());
if (!getLocalAssetStore().copy(existingAsset, storagePath)) {
throw new AssetManagerException(format(
"An asset with checksum %s has already been archived but trying to copy or link asset %s to it "
+ "failed",
e.getChecksum(),
existingAsset
));
}
} else {
final Optional<Long> size = e.getSize() > 0 ? Optional.of(e.getSize()) : Optional.empty();
getLocalAssetStore().put(storagePath, Source.mk(e.getURI(), size, Optional.ofNullable(e.getMimeType())));
}
}
}
private void storeManifest(final PartialMediaPackage pmp, final Version version) throws Exception {
final String mpId = pmp.getMediaPackage().getIdentifier().toString();
final String orgId = securityService.getOrganization().getId();
// store the manifest.xml
// TODO make use of checksums
logger.debug("Archiving manifest of media package {} version {}", mpId, version);
// temporarily save the manifest XML into the workspace to
// Fix file not found exception when several snapshots are taken at the same time
final String manifestFileName = format("manifest_%s_%s.xml", pmp.getMediaPackage().getIdentifier(), version);
final URI manifestTmpUri = workspace.putInCollection(
"archive",
manifestFileName,
IOUtils.toInputStream(MediaPackageParser.getAsXml(pmp.getMediaPackage()), "UTF-8"));
try {
getLocalAssetStore().put(
StoragePath.mk(orgId, mpId, version, manifestAssetId(pmp, "manifest")),
Source.mk(manifestTmpUri, Optional.empty(), Optional.of(MimeTypes.XML)));
} finally {
// make sure to clean up the temporary file
workspace.deleteFromCollection("archive", manifestFileName);
}
}
/**
* Create a unique id for the manifest xml. This is to avoid an id collision
* in the rare case that the media package contains an XML element with the id
* used for the manifest. A UUID could also be used but this is far less
* readable.
*
* @param seedId
* the id to start with
*/
private String manifestAssetId(PartialMediaPackage pmp, String seedId) {
if ($(pmp.getElements()).map(getMediaPackageElementId.toFn()).exists(Booleans.eq(seedId))) {
return manifestAssetId(pmp, seedId + "_");
} else {
return seedId;
}
}
/* --------------------------------------------------------------------------------------------------------------- */
/**
* Unify exception handling by wrapping any occurring exception in an
* {@link AssetManagerException}.
*/
static <A> A handleException(final P1<A> p) throws AssetManagerException {
try {
return p.get1();
} catch (Exception e) {
logger.error("An error occurred", e);
throw unwrapExceptionUntil(AssetManagerException.class, e).orElse(new AssetManagerException(e));
}
}
/**
* Walk up the stacktrace to find a cause of type <code>type</code>. Return none if no such
* type can be found.
*/
static <A extends Throwable> Optional<A> unwrapExceptionUntil(Class<A> type, Throwable e) {
if (e == null) {
return Optional.empty();
} else if (type.isAssignableFrom(e.getClass())) {
return Optional.of((A) e);
} else {
return unwrapExceptionUntil(type, e.getCause());
}
}
/**
* Return a partial media package filtering assets. Assets are elements the archive is going to manager, i.e. all
* non-publication elements.
*/
static PartialMediaPackage assetsOnly(MediaPackage mp) {
final Pred<MediaPackageElement> isAsset = Pred.mk(isNotPublication.toFn());
return PartialMediaPackage.mk(mp, isAsset);
}
/**
* Extract the file name from a media package elements URN.
*
* @return the file name or none if it could not be determined
*/
public static Optional<String> getFileNameFromUrn(MediaPackageElement mpe) {
Fn<URI, String> toString = new Fn<URI, String>() {
@Override
public String apply(URI uri) {
return uri.toString();
}
};
Optional<URI> uri = Optional.ofNullable(mpe.getURI());
if (uri.isPresent() && "urn".equals(uri.get().getScheme())) {
String[] tmp = uri.get().toString().split(":");
if (tmp.length < 1) {
return Optional.empty();
}
return Optional.of(tmp[tmp.length - 1]);
}
return Optional.empty();
}
/**
* Rewrite URIs of all asset elements of a snapshot's media package.
* This method does not mutate anything.
*/
public static Snapshot rewriteUris(Snapshot snapshot, Fn<MediaPackageElement, URI> uriCreator) {
final MediaPackage mpCopy = MediaPackageSupport.copy(snapshot.getMediaPackage());
for (final MediaPackageElement mpe : assetsOnly(mpCopy).getElements()) {
mpe.setURI(uriCreator.apply(mpe));
}
return new SnapshotImpl(
snapshot.getVersion(),
snapshot.getOrganizationId(),
snapshot.getArchivalDate(),
snapshot.getAvailability(),
snapshot.getStorageId(),
snapshot.getOwner(),
mpCopy);
}
public void fireEventHandlers(AssetManagerItem item) {
while (handlers.size() != 2) {
logger.warn("Expecting 2 handlers, but {} are registered. Waiting 10s then retrying...", handlers.size());
try {
Thread.sleep(10000L);
} catch (InterruptedException e) { /* swallow this, nothing to do */ }
}
for (AssetManagerUpdateHandler handler : handlers) {
handler.execute(item);
}
}
/**
* Call {@link
* org.opencastproject.assetmanager.impl.query.AbstractADeleteQuery#run(AbstractADeleteQuery.DeleteEpisodeHandler)}
* with a delete handler. Also make sure to propagate the behaviour to subsequent instances.
*/
private final class ADeleteQueryWithMessaging extends ADeleteQueryDecorator {
ADeleteQueryWithMessaging(ADeleteQuery delegate) {
super(delegate);
}
@Override
public long run() {
return RuntimeTypes.convert(delegate).run(AssetManagerImpl.this);
}
@Override
protected ADeleteQueryDecorator mkDecorator(ADeleteQuery delegate) {
return new ADeleteQueryWithMessaging(delegate);
}
}
/**
* Get the function to update a commented event in the Elasticsearch index.
*
* @return the function to do the update
*/
private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(Snapshot snapshot,
String orgId, User user) {
return (Optional<Event> eventOpt) -> {
MediaPackage mp = snapshot.getMediaPackage();
String eventId = mp.getIdentifier().toString();
Event event = eventOpt.orElse(new Event(eventId, orgId));
event = updateAclInEvent(event, mp, eventId);
event.setArchiveVersion(Long.parseLong(snapshot.getVersion().toString()));
if (StringUtils.isBlank(event.getCreator())) {
event.setCreator(securityService.getUser().getName());
}
EventIndexUtils.updateEvent(event, mp);
for (Catalog catalog: mp.getCatalogs(MediaPackageElements.EPISODE)) {
try (InputStream in = workspace.read(catalog.getURI())) {
EventIndexUtils.updateEvent(event, DublinCores.read(in));
} catch (IOException | NotFoundException e) {
throw new IllegalStateException(String.format("Unable to load dublin core catalog for event '%s'",
mp.getIdentifier()), e);
}
}
// Update series name if not already done
try {
EventIndexUtils.updateSeriesName(event, orgId, user, index);
} catch (SearchIndexException e) {
logger.error("Error updating the series name of the event {} in the {} index.", eventId, index.getIndexName(),
e);
}
return Optional.of(event);
};
}
private Function<Optional<Event>, Optional<Event>> getEventUpdateFunctionOnlyAcl(Snapshot snapshot,
String orgId, User user) {
return (Optional<Event> eventOpt) -> {
MediaPackage mp = snapshot.getMediaPackage();
String eventId = mp.getIdentifier().toString();
Event event = eventOpt.orElse(new Event(eventId, orgId));
event = updateAclInEvent(event, mp, eventId);
return Optional.of(event);
};
}
private Event updateAclInEvent(Event event, MediaPackage mp, String eventId) {
AccessControlList acl = authorizationService.getActiveAcl(mp).getA();
List<ManagedAcl> acls = aclServiceFactory.serviceFor(securityService.getOrganization()).getAcls();
for (final ManagedAcl managedAcl : AccessInformationUtil.matchAcls(acls, acl)) {
event.setManagedAcl(managedAcl.getName());
}
event.setAccessPolicy(AccessControlParser.toJsonSilent(acl));
return event;
}
}