LiveScheduleServiceImpl.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.liveschedule.impl;
import org.opencastproject.assetmanager.api.AssetManager;
import org.opencastproject.assetmanager.api.Snapshot;
import org.opencastproject.assetmanager.api.Version;
import org.opencastproject.assetmanager.api.query.AQueryBuilder;
import org.opencastproject.assetmanager.api.query.ARecord;
import org.opencastproject.assetmanager.api.query.AResult;
import org.opencastproject.capture.admin.api.CaptureAgentStateService;
import org.opencastproject.distribution.api.DistributionException;
import org.opencastproject.distribution.api.DownloadDistributionService;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobBarrier;
import org.opencastproject.liveschedule.api.LiveScheduleException;
import org.opencastproject.liveschedule.api.LiveScheduleService;
import org.opencastproject.mediapackage.Attachment;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementBuilder;
import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageElements;
import org.opencastproject.mediapackage.Publication;
import org.opencastproject.mediapackage.PublicationImpl;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.VideoStream;
import org.opencastproject.mediapackage.selector.SimpleElementSelector;
import org.opencastproject.mediapackage.track.TrackImpl;
import org.opencastproject.mediapackage.track.VideoStreamImpl;
import org.opencastproject.metadata.dublincore.DCMIPeriod;
import org.opencastproject.metadata.dublincore.DublinCore;
import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.security.api.AccessControlList;
import org.opencastproject.security.api.AclScope;
import org.opencastproject.security.api.AuthorizationService;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UnauthorizedException;
import org.opencastproject.security.api.User;
import org.opencastproject.security.util.SecurityUtil;
import org.opencastproject.series.api.SeriesService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.MimeTypes;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.workspace.api.Workspace;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.Equator;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIUtils;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Component(
immediate = true,
service = LiveScheduleService.class,
property = {
"service.description=Live Schedule Service"
}
)
public class LiveScheduleServiceImpl implements LiveScheduleService {
/** The server url property **/
static final String SERVER_URL_PROPERTY = "org.opencastproject.server.url";
/** The engage base url property **/
static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";
/** The default path to the player **/
static final String PLAYER_PATH = "/play/";
/** Default values for configuration options */
private static final String DEFAULT_STREAM_MIME_TYPE = "video/mp4";
private static final String DEFAULT_STREAM_RESOLUTION = "1920x1080";
private static final String DEFAULT_STREAM_NAME = "live-stream";
private static final String DEFAULT_LIVE_TARGET_FLAVORS = "presenter/delivery";
static final String DEFAULT_LIVE_DISTRIBUTION_SERVICE = "download";
// Deactivating checkstyle to preserve the long URL
// CHECKSTYLE:OFF
// If the capture agent registered this property, we expect to get a resolution and
// a url in the following format:
// capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL e.g.
// capture.device.live.resolution.960x270=rtmp://cp398121.live.edgefcs.net/live/dev-epiphan005-2-presenter-delivery.stream-960x270_1_200@355694
public static final String CA_PROPERTY_RESOLUTION_URL_PREFIX = "capture.device.live.resolution.";
// CHECKSTYLE:ON
/** Variables that can be replaced in stream name */
public static final String REPLACE_ID = "id";
public static final String REPLACE_FLAVOR = "flavor";
public static final String REPLACE_CA_NAME = "caName";
public static final String REPLACE_RESOLUTION = "resolution";
public static final String LIVE_STREAMING_URL = "live.streamingUrl";
public static final String LIVE_STREAM_NAME = "live.streamName";
public static final String LIVE_STREAM_MIME_TYPE = "live.mimeType";
public static final String LIVE_STREAM_RESOLUTION = "live.resolution";
public static final String LIVE_TARGET_FLAVORS = "live.targetFlavors";
public static final String LIVE_DISTRIBUTION_SERVICE = "live.distributionService";
public static final String LIVE_PUBLISH_STREAMING = "live.publishStreaming";
private static final MediaPackageElementFlavor[] publishFlavors = { MediaPackageElements.EPISODE,
MediaPackageElements.SERIES, MediaPackageElements.XACML_POLICY_EPISODE,
MediaPackageElements.XACML_POLICY_SERIES }; // make configurable later
/** The logger */
private static final Logger logger = LoggerFactory.getLogger(LiveScheduleServiceImpl.class);
private String liveStreamingUrl;
private String streamName;
private String streamMimeType;
private String[] streamResolution;
private MediaPackageElementFlavor[] liveFlavors;
private String serverUrl;
private Cache<String, Version> snapshotVersionCache
= CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
/** Which streaming formats should be published automatically */
private List<String> publishedStreamingFormats = null;
private String systemUserName;
/** Services */
private DownloadDistributionService downloadDistributionService; // to distribute episode and series catalogs
private SearchService searchService; // to publish/retract live media package
private SeriesService seriesService; // to get series metadata
private DublinCoreCatalogService dublinCoreService; // to setialize dc catalogs
private CaptureAgentStateService captureAgentService; // to get agent capabilities
private ServiceRegistry serviceRegistry; // to create publish/retract jobs
private Workspace workspace; // to save dc catalogs before distributing
private AssetManager assetManager; // to get current media package
private AuthorizationService authService;
private OrganizationDirectoryService organizationService;
private SecurityService securityService;
private long jobPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
private SimpleElementSelector publishElementSelector;
/**
* OSGi callback on component activation.
*
* @param context
* the component context
*/
@Activate
protected void activate(ComponentContext context) {
BundleContext bundleContext = context.getBundleContext();
serverUrl = StringUtils.trimToNull(bundleContext.getProperty(SERVER_URL_PROPERTY));
if (serverUrl == null) {
logger.warn("Server url was not set in '{}'", SERVER_URL_PROPERTY);
} else {
logger.info("Server url is {}", serverUrl);
}
systemUserName = bundleContext.getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
@SuppressWarnings("rawtypes")
Dictionary properties = context.getProperties();
if (!StringUtils.isBlank((String) properties.get(LIVE_STREAMING_URL))) {
liveStreamingUrl = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAMING_URL));
logger.info("Live streaming server url is {}", liveStreamingUrl);
} else {
logger.info("Live streaming url not set in '{}'. Streaming urls must be provided by capture agent properties.",
LIVE_STREAMING_URL);
}
if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_NAME))) {
streamName = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_NAME));
} else {
streamName = DEFAULT_STREAM_NAME;
}
if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_MIME_TYPE))) {
streamMimeType = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_MIME_TYPE));
} else {
streamMimeType = DEFAULT_STREAM_MIME_TYPE;
}
String resolution = null;
if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_RESOLUTION))) {
resolution = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_RESOLUTION));
} else {
resolution = DEFAULT_STREAM_RESOLUTION;
}
streamResolution = resolution.split(",");
String flavors = null;
if (!StringUtils.isBlank((String) properties.get(LIVE_TARGET_FLAVORS))) {
flavors = StringUtils.trimToEmpty((String) properties.get(LIVE_TARGET_FLAVORS));
} else {
flavors = DEFAULT_LIVE_TARGET_FLAVORS;
}
String[] flavorArray = StringUtils.split(flavors, ",");
liveFlavors = new MediaPackageElementFlavor[flavorArray.length];
int i = 0;
for (String f : flavorArray) {
liveFlavors[i++] = MediaPackageElementFlavor.parseFlavor(f);
}
publishedStreamingFormats = Arrays.asList(Optional.ofNullable(StringUtils.split(
(String)properties.get(LIVE_PUBLISH_STREAMING), ",")).orElse(new String[0]));
publishElementSelector = new SimpleElementSelector();
for (MediaPackageElementFlavor flavor : publishFlavors) {
publishElementSelector.addFlavor(flavor);
}
logger.info(
"Configured live stream name: {}, mime type: {}, resolution: {}, target flavors: {}",
streamName, streamMimeType, resolution, flavors);
}
@Override
public boolean createOrUpdateLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
MediaPackage mp = getMediaPackageFromSearch(mpId);
if (mp == null) {
// Check if capture not over. We have to check because we may get a notification for past events if
// the admin ui index is rebuilt
DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(episodeDC.getFirst(DublinCore.PROPERTY_TEMPORAL));
if (period.getEnd().getTime() <= System.currentTimeMillis()) {
logger.info("Live media package {} not created in search index because event is already past (end date: {})",
mpId, period.getEnd());
return false;
}
return createLiveEvent(mpId, episodeDC);
} else {
// Check if the media package found in the search index is live. We have to check because we may get a
// notification for past events if the admin ui index is rebuilt
if (!mp.isLive()) {
logger.info("Media package {} is in search index but not live so not updating it.", mpId);
return false;
}
return updateLiveEvent(mp, episodeDC);
}
}
@Override
public boolean deleteLiveEvent(String mpId) throws LiveScheduleException {
MediaPackage mp = getMediaPackageFromSearch(mpId);
if (mp == null) {
logger.debug("Live media package {} not found in search index", mpId);
return false;
} else {
if (!mp.isLive()) {
logger.info("Media package {} is not live. Not retracting.", mpId);
return false;
}
return retractLiveEvent(mp);
}
}
@Override
public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws LiveScheduleException {
MediaPackage previousMp = getMediaPackageFromSearch(mpId);
if (previousMp != null) {
if (!previousMp.isLive()) {
logger.info("Media package {} is not live. Not updating acl.", mpId);
return false;
}
// Replace and distribute acl, this creates new mp
MediaPackage newMp = replaceAndDistributeAcl(previousMp, acl);
// Publish mp to engage search index
publishToSearch(newMp);
// Don't leave garbage there!
retractPreviousElements(previousMp, newMp);
logger.info("Updated live acl for media package {}", newMp);
return true;
}
return false;
}
boolean createLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
try {
logger.info("Creating live media package {}", mpId);
Snapshot snapshot = getSnapshotFromArchive(mpId);
// generate live tracks
MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
// publish to search
MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
for (Track t : tmpMp.getTracks()) {
mpForSearch.add(t);
}
publishToSearch(mpForSearch);
// add live publication to archive
MediaPackage updatedArchivedMp = addLivePublicationToMediaPackage(snapshot, liveTracks);
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
return true;
} catch (Exception e) {
throw new LiveScheduleException(e);
}
}
boolean updateLiveEvent(MediaPackage mpFromSearch, DublinCoreCatalog episodeDC) throws LiveScheduleException {
String mpId = mpFromSearch.getIdentifier().toString();
Snapshot snapshot = getSnapshotFromArchive(mpId);
// If the snapshot version is in our local cache, it means that this snapshot was created by us so
// nothing to do. Note that this is just to save time; if the entry has already been deleted, the mp
// will be compared below.
if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(mpId))) {
logger.debug("Snapshot version {} was created by us so this change is ignored.", snapshot.getVersion());
return false;
}
// create temp mp for comparison
MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
// remove all elements that would not be published
Collection<MediaPackageElement> elements = publishElementSelector.select(tmpMp, false);
Arrays.stream(tmpMp.getElements()).filter(Predicate.not(elements::contains)).collect(Collectors.toList())
.forEach(tmpMp::remove);
// generate new live tracks
setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
// if nothing changed, no need to do anything
if (isSameMediaPackage(mpFromSearch, tmpMp)) {
logger.debug("Live media package {} seems to be the same. Not updating.", mpFromSearch);
return false;
}
logger.info("Updating live media package {}", mpFromSearch);
// update mp in search
MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
for (Track t : tmpMp.getTracks()) {
mpForSearch.add(t);
}
removeLivePublicationChannel(mpForSearch); // we don't need the live publication in search
publishToSearch(mpForSearch);
retractPreviousElements(mpFromSearch, mpForSearch); // cleanup
// update live publication in archive
MediaPackage updatedArchivedMp = updateLivePublication(snapshot.getMediaPackage(), liveTracks);
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
return true;
}
private void createOrUpdatePublicationTracks(Publication publication, Map<String, Track> generatedTracks) {
if (publication.getTracks().length > 0) {
publication.clearTracks();
}
for (String publishedStreamingFormat : publishedStreamingFormats) {
Track track = generatedTracks.get(publishedStreamingFormat);
if (track != null) {
publication.addTrack(track);
}
}
}
private MediaPackage updateLivePublication(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
Publication[] publications = mediaPackage.getPublications();
for (Publication publication : publications) {
if (publication.getChannel().equals(CHANNEL_ID)) {
createOrUpdatePublicationTracks(publication, generatedTracks);
}
}
return mediaPackage;
}
boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
retract(mp);
// Get latest mp from the asset manager if there to remove the publication
try {
String mpId = mp.getIdentifier().toString();
Snapshot snapshot = getSnapshotFromArchive(mpId);
MediaPackage archivedMp = snapshot.getMediaPackage();
removeLivePublicationChannel(archivedMp);
logger.debug("Removed live pub channel from archived media package {}", mp);
// Take a snapshot with the publication removed and put its version in our local cache
// so that we ignore notifications for this snapshot version.
snapshotVersionCache.put(mpId, assetManager.takeSnapshot(archivedMp).getVersion());
} catch (LiveScheduleException e) {
// It was not found in asset manager. This is ok.
}
return true;
}
void publishToSearch(MediaPackage mp) throws LiveScheduleException {
try {
// Add media package to the search index
logger.info("Publishing LIVE media package {} to search index", mp);
Job publishJob = searchService.add(mp);
if (!waitForStatus(publishJob).isSuccess()) {
throw new LiveScheduleException("Live media package " + mp.getIdentifier() + " could not be published");
}
} catch (LiveScheduleException e) {
throw e;
} catch (Exception e) {
throw new LiveScheduleException(e);
}
}
void retract(MediaPackage mp) throws LiveScheduleException {
Organization org = securityService.getOrganization();
User prevUser = org != null ? securityService.getUser() : null;
try {
securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
Set<String> elementIds = new HashSet<String>();
String mpId = mp.getIdentifier().toString();
logger.info("Removing LIVE media package {} from the search index", mpId);
for (MediaPackageElement mpe : mp.getElements()) {
if (!MediaPackageElement.Type.Publication.equals(mpe.getElementType())) {
elementIds.add(mpe.getIdentifier());
}
}
List<String> failedJobs = new ArrayList<>();
// Remove media package from the search index
Job searchDeleteJob = searchService.delete(mpId);
if (!waitForStatus(searchDeleteJob).isSuccess()) {
failedJobs.add("Search Index");
}
// Removing media from the download distribution service
Job distributionRetractJob = downloadDistributionService.retract(CHANNEL_ID, mp, elementIds);
if (!waitForStatus(distributionRetractJob).isSuccess()) {
failedJobs.add("Distribution");
}
if (!failedJobs.isEmpty()) {
throw new LiveScheduleException(
String.format("Removing live media package %s from %s failed", mpId, String.join(" and ", failedJobs)));
}
} catch (LiveScheduleException e) {
throw e;
} catch (Exception e) {
throw new LiveScheduleException(e);
} finally {
securityService.setUser(prevUser);
}
}
/**
* Retrieves the media package from the search index.
*
* @param mediaPackageId
* the media package id
* @return the media package in the search index or null if not there
* @throws LiveScheduleException
* if found many media packages with the same id
*/
MediaPackage getMediaPackageFromSearch(String mediaPackageId) throws LiveScheduleException {
// Issue #2504: make sure the search index is read by admin so that the media package is always found.
Organization org = securityService.getOrganization();
User prevUser = org != null ? securityService.getUser() : null;
securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
try {
// Look for the media package in the search index
return searchService.get(mediaPackageId);
} catch (UnauthorizedException e) {
logger.warn("Unexpected unauthorized exception when querying the search index for mp {}", mediaPackageId, e);
return null;
} catch (NotFoundException e) {
return null;
} finally {
securityService.setUser(prevUser);
}
}
void setDurationForMediaPackage(MediaPackage mp, DublinCoreCatalog dc) {
DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dc.getFirst(DublinCore.PROPERTY_TEMPORAL));
long duration = period.getEnd().getTime() - period.getStart().getTime();
mp.setDuration(duration);
logger.debug("Live media package {} has start {} and duration {}", mp.getIdentifier(), mp.getDate(),
mp.getDuration());
}
Map<String, Track> addLiveTracksToMediaPackage(MediaPackage mp, DublinCoreCatalog episodeDC)
throws LiveScheduleException {
String caName = episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL);
HashMap<String, Track> generatedTracks = new HashMap<>();
String mpId = mp.getIdentifier().toString();
try {
// If capture agent registered the properties:
// capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL, use them!
try {
Properties caProps = captureAgentService.getAgentCapabilities(caName);
if (caProps != null) {
Enumeration<Object> en = caProps.keys();
while (en.hasMoreElements()) {
String key = (String) en.nextElement();
if (key.startsWith(CA_PROPERTY_RESOLUTION_URL_PREFIX)) {
String resolution = key.substring(CA_PROPERTY_RESOLUTION_URL_PREFIX.length());
String url = caProps.getProperty(key);
// Note: only one flavor is supported in this format (the default: presenter/delivery)
MediaPackageElementFlavor flavor = MediaPackageElementFlavor.parseFlavor(DEFAULT_LIVE_TARGET_FLAVORS);
String replacedUrl = replaceVariables(mpId, caName, url, flavor, resolution);
mp.add(buildStreamingTrack(replacedUrl, flavor, streamMimeType, resolution, mp.getDuration()));
}
}
}
} catch (NotFoundException e) {
// Capture agent not found so we can't get its properties. Assume the service configuration should
// be used instead. Note that we can't schedule anything on a CA that has not registered so this is
// unlikely to happen.
}
// Capture agent did not pass any CA_PROPERTY_RESOLUTION_URL_PREFIX property when registering
// so use the service configuration
if (mp.getTracks().length == 0) {
if (liveStreamingUrl == null) {
throw new LiveScheduleException(
"Cannot build live tracks because '" + LIVE_STREAMING_URL + "' configuration was not set.");
}
for (MediaPackageElementFlavor flavor : liveFlavors) {
for (int i = 0; i < streamResolution.length; i++) {
String uri = replaceVariables(mpId, caName, UrlSupport.concat(liveStreamingUrl.toString(), streamName),
flavor, streamResolution[i]);
Track track = buildStreamingTrack(uri, flavor, streamMimeType, streamResolution[i], mp.getDuration());
mp.add(track);
generatedTracks.put(flavor + ":" + streamResolution[i], track);
}
}
}
} catch (URISyntaxException e) {
throw new LiveScheduleException(e);
}
return generatedTracks;
}
Track buildStreamingTrack(String uriString, MediaPackageElementFlavor flavor, String mimeType, String resolution,
long duration) throws URISyntaxException {
URI uri = new URI(uriString);
MediaPackageElementBuilder elementBuilder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
MediaPackageElement element = elementBuilder.elementFromURI(uri, MediaPackageElement.Type.Track, flavor);
TrackImpl track = (TrackImpl) element;
// Set duration and mime type
track.setDuration(duration);
track.setLive(true);
track.setMimeType(MimeTypes.parseMimeType(mimeType));
VideoStreamImpl video = new VideoStreamImpl("video-" + flavor.getType() + "-" + flavor.getSubtype());
// Set video resolution
String[] dimensions = resolution.split("x");
video.setFrameWidth(Integer.parseInt(dimensions[0]));
video.setFrameHeight(Integer.parseInt(dimensions[1]));
track.addStream(video);
logger.debug("Creating live track element of flavor {}, resolution {}, and url {}",
new Object[] { flavor, resolution, uriString });
return track;
}
/**
* Replaces variables in the live stream name. Currently, this is only prepared to handle the following: #{id} = media
* package id, #{flavor} = type-subtype of flavor, #{caName} = capture agent name, #{resolution} = stream resolution
*/
String replaceVariables(String mpId, String caName, String toBeReplaced, MediaPackageElementFlavor flavor,
String resolution) {
// Substitution pattern: any string in the form #{name}, where 'name' has only word characters: [a-zA-Z_0-9].
final Pattern pat = Pattern.compile("#\\{(\\w+)\\}");
Matcher matcher = pat.matcher(toBeReplaced);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
if (matcher.group(1).equals(REPLACE_ID)) {
matcher.appendReplacement(sb, mpId);
} else if (matcher.group(1).equals(REPLACE_FLAVOR)) {
matcher.appendReplacement(sb, flavor.getType() + "-" + flavor.getSubtype());
} else if (matcher.group(1).equals(REPLACE_CA_NAME)) {
// Taking the easy route to find the capture agent name...
matcher.appendReplacement(sb, caName);
} else if (matcher.group(1).equals(REPLACE_RESOLUTION)) {
// Taking the easy route to find the capture agent name...
matcher.appendReplacement(sb, resolution);
} // else will not replace
}
matcher.appendTail(sb);
return sb.toString();
}
private JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
if (serviceRegistry == null) {
throw new IllegalStateException("Can't wait for job status without providing a service registry first");
}
JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobPollingInterval, jobs);
return barrier.waitForJobs();
}
Snapshot getSnapshotFromArchive(String mpId) throws LiveScheduleException {
AQueryBuilder query = assetManager.createQuery();
AResult result = query.select(query.snapshot()).where(query.mediaPackageId(mpId).and(query.version().isLatest()))
.run();
if (result.getSize() == 0) {
// Media package not archived?.
throw new LiveScheduleException(String.format("Unexpected error: media package %s has not been archived.", mpId));
}
Optional<ARecord> record = result.getRecords().stream().findFirst();
if (record.isEmpty()) {
// No snapshot?
throw new LiveScheduleException(String.format("Unexpected error: media package %s has not been archived.", mpId));
}
return record.get().getSnapshot().get();
}
MediaPackage distributeAclsAndCatalogs(Snapshot snapshot) throws LiveScheduleException {
try {
MediaPackage mp = (MediaPackage) snapshot.getMediaPackage().clone();
// Select elements
Collection<MediaPackageElement> elements = publishElementSelector.select(mp, false);
Set<String> elementIds = elements.stream().map(MediaPackageElement::getIdentifier).collect(Collectors.toSet());
// Distribute elements
Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, elementIds, false);
if (!waitForStatus(distributionJob).isSuccess()) {
throw new LiveScheduleException(
"Element(s) for live media package " + mp.getIdentifier() + " could not be distributed");
}
// Remove all elements from mp
for (MediaPackageElement e: mp.getElements()) {
mp.remove(e);
}
// Re-add distributed elements
List<MediaPackageElement> distributedElements = (List<MediaPackageElement>) MediaPackageElementParser
.getArrayFromXml(distributionJob.getPayload());
for (MediaPackageElement distributedElement : distributedElements) {
mp.add(distributedElement);
}
// Clean up
for (String id : elementIds) {
MediaPackageElement e = mp.getElementById(id);
workspace.delete(e.getURI());
}
return mp;
} catch (LiveScheduleException e) {
throw e;
} catch (Exception e) {
throw new LiveScheduleException(e);
}
}
MediaPackage replaceAndDistributeAcl(MediaPackage previousMp, AccessControlList acl) throws LiveScheduleException {
try {
// This is the mp from the search index
MediaPackage mp = (MediaPackage) previousMp.clone();
// Remove previous Acl from the mp
Attachment[] atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
if (atts.length > 0) {
mp.remove(atts[0]);
}
// Attach current ACL to mp, acl will be created in the ws/wfr
authService.setAcl(mp, AclScope.Episode, acl);
atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
if (atts.length > 0) {
String aclId = atts[0].getIdentifier();
// Distribute new acl
Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, aclId, false);
if (!waitForStatus(distributionJob).isSuccess()) {
throw new LiveScheduleException(
"Acl for live media package " + mp.getIdentifier() + " could not be distributed");
}
MediaPackageElement e = mp.getElementById(aclId);
// Cleanup workspace/wfr
mp.remove(e);
workspace.delete(e.getURI());
// Add distributed acl to mp
mp.add(MediaPackageElementParser.getFromXml(distributionJob.getPayload()));
}
return mp;
} catch (LiveScheduleException e) {
throw e;
} catch (Exception e) {
throw new LiveScheduleException(e);
}
}
MediaPackage addLivePublicationToMediaPackage(Snapshot snapshot, Map<String, Track> generatedTracks)
throws LiveScheduleException {
MediaPackage mp = snapshot.getMediaPackage();
Organization currentOrg = null;
try {
currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
} catch (NotFoundException e) {
logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
}
logger.debug("Adding live channel publication element to media package {}", mp);
String engageUrlString = null;
if (currentOrg != null) {
engageUrlString = StringUtils.trimToNull(currentOrg.getProperties().get(ENGAGE_URL_PROPERTY));
}
if (engageUrlString == null) {
engageUrlString = serverUrl;
logger.info(
"Using 'server.url' as a fallback for the non-existing organization level key '{}' for the publication url",
ENGAGE_URL_PROPERTY);
}
try {
// Create new distribution element
URI engageUri = URIUtils.resolve(new URI(engageUrlString), PLAYER_PATH + mp.getIdentifier().toString());
Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID, engageUri,
MimeTypes.parseMimeType("text/html"));
mp.add(publicationElement);
createOrUpdatePublicationTracks(publicationElement, generatedTracks);
return mp;
} catch (URISyntaxException e) {
throw new LiveScheduleException(e);
}
}
void removeLivePublicationChannel(MediaPackage mp) {
// Remove publication element
Publication[] publications = mp.getPublications();
if (publications != null) {
for (Publication publication : publications) {
if (CHANNEL_ID.equals(publication.getChannel())) {
mp.remove(publication);
}
}
}
}
boolean isSameMediaPackage(MediaPackage previous, MediaPackage current) {
Equator<Track> liveTrackEquator = new Equator<>() {
@Override
public boolean equate(Track track1, Track track2) {
// we can safely assume that each live track has exactly one video stream since we generated that ourselves
VideoStream videostream1 = (VideoStream) track1.getStreams()[0];
VideoStream videostream2 = (VideoStream) track2.getStreams()[0];
return Objects.equals(track1.getURI(), track2.getURI())
&& Objects.equals(track1.getFlavor(), track2.getFlavor())
&& Objects.equals(track1.getMimeType(), track2.getMimeType())
&& Objects.equals(track1.getDuration(), track2.getDuration())
&& Objects.equals(videostream1.getFrameWidth(), videostream2.getFrameWidth())
&& Objects.equals(videostream1.getFrameHeight(), videostream2.getFrameHeight());
}
@Override
public int hash(Track track) {
VideoStream videostream = (VideoStream) track.getStreams()[0];
return Objects.hash(track.getURI(), track.getFlavor(), track.getMimeType(), track.getDuration(),
videostream.getFrameWidth(), videostream.getFrameHeight());
}
};
Equator<MediaPackageElement> catalogAndAttachmentEquator = new Equator<>() {
@Override
public boolean equate(MediaPackageElement mpe1, MediaPackageElement mpe2) {
return Objects.equals(mpe1.getIdentifier(), mpe2.getIdentifier())
&& Objects.equals(mpe1.getElementType(), mpe2.getElementType())
&& Objects.equals(mpe1.getChecksum(), mpe2.getChecksum())
&& Objects.equals(mpe1.getFlavor(), mpe2.getFlavor());
}
@Override
public int hash(MediaPackageElement mpe) {
return Objects.hash(mpe.getIdentifier(), mpe.getElementType(), mpe.getChecksum(), mpe.getFlavor());
}
};
if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getTracks()), Arrays.asList(current.getTracks()),
liveTrackEquator)) {
return false;
} else if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getCatalogs()),
Arrays.asList(current.getCatalogs()), catalogAndAttachmentEquator)) {
return false;
} else {
return CollectionUtils.isEqualCollection(Arrays.asList(previous.getAttachments()),
Arrays.asList(current.getAttachments()), catalogAndAttachmentEquator);
}
}
void retractPreviousElements(MediaPackage previousMp, MediaPackage newMp) throws LiveScheduleException {
try {
// Now can retract elements from previous publish. Before creating a retraction
// job, check if the element url is still used by the new media package.
Set<String> elementIds = new HashSet<String>();
for (MediaPackageElement element : previousMp.getElements()) {
// We don't retract tracks because they are just live links
if (!Track.TYPE.equals(element.getElementType())) {
boolean canBeDeleted = true;
for (MediaPackageElement newElement : newMp.getElements()) {
if (element.getURI().equals(newElement.getURI())) {
logger.debug(
"Not retracting element {} with URI {} from download distribution because it is "
+ "still used by updated live media package",
element.getIdentifier(), element.getURI());
canBeDeleted = false;
break;
}
}
if (canBeDeleted) {
elementIds.add(element.getIdentifier());
}
}
}
if (elementIds.size() > 0) {
Job job = downloadDistributionService.retract(CHANNEL_ID, previousMp, elementIds);
// Wait for retraction to finish
if (!waitForStatus(job).isSuccess()) {
logger.warn("One of the download retract jobs did not complete successfully");
} else {
logger.debug("Retraction of previously published elements complete");
}
}
} catch (DistributionException e) {
throw new LiveScheduleException(e);
}
}
@Reference
public void setDublinCoreService(DublinCoreCatalogService service) {
this.dublinCoreService = service;
}
@Reference
public void setSearchService(SearchService service) {
this.searchService = service;
}
@Reference
public void setSeriesService(SeriesService service) {
this.seriesService = service;
}
@Reference
public void setServiceRegistry(ServiceRegistry service) {
this.serviceRegistry = service;
}
@Reference
public void setCaptureAgentService(CaptureAgentStateService service) {
this.captureAgentService = service;
}
@Reference(
name = "DownloadDistributionService",
target = "(distribution.channel=download)"
)
public void setDownloadDistributionService(DownloadDistributionService service) {
this.downloadDistributionService = service;
logger.info("Distribution service with type '{}' set.", downloadDistributionService.getDistributionType());
}
@Reference
public void setWorkspace(Workspace ws) {
this.workspace = ws;
}
@Reference
public void setAssetManager(AssetManager assetManager) {
this.assetManager = assetManager;
}
@Reference
public void setAuthorizationService(AuthorizationService service) {
this.authService = service;
}
@Reference
public void setOrganizationService(OrganizationDirectoryService service) {
this.organizationService = service;
}
@Reference
public void setSecurityService(SecurityService service) {
this.securityService = service;
}
// === Set by OSGI - end
// === Used by unit tests - begin
void setJobPollingInterval(long jobPollingInterval) {
this.jobPollingInterval = jobPollingInterval;
}
Cache<String, Version> getSnapshotVersionCache() {
return this.snapshotVersionCache;
}
// === Used by unit tests - end
}