VideoGridServiceImpl.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.videogrid.impl;

import org.opencastproject.job.api.AbstractJobProducer;
import org.opencastproject.job.api.Job;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.util.ConfigurationException;
import org.opencastproject.util.IoSupport;
import org.opencastproject.util.LoadUtil;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.videogrid.api.VideoGridService;
import org.opencastproject.videogrid.api.VideoGridServiceException;
import org.opencastproject.workspace.api.Workspace;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.cm.ManagedService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.List;

/** Create video grids */
@Component(
    immediate = true,
    service = { VideoGridService.class,ManagedService.class },
    property = {
        "service.description=Video Grid Service",
        "service.pid=org.opencastproject.videogrid.impl.VideoGridServiceImpl"
    }
)
public class VideoGridServiceImpl extends AbstractJobProducer implements VideoGridService, ManagedService {

  /** Configuration key for this operation's job load */
  private static final String JOB_LOAD_CONFIG = "job.load.videogrid";

  /** The load introduced on the system by creating a job */
  private static final float JOB_LOAD_DEFAULT = 1.5f;

  /** The load introduced on the system by creating a job */
  private float jobLoad = JOB_LOAD_DEFAULT;

  private static final Logger logger = LoggerFactory.getLogger(VideoGridServiceImpl.class);

  /** List of available operations on jobs */
  private static final String OPERATION = "createPartialTrack";

  /** Services */
  private Workspace workspace;
  private ServiceRegistry serviceRegistry;
  private SecurityService securityService;
  private UserDirectoryService userDirectoryService;
  private OrganizationDirectoryService organizationDirectoryService;

  /** For JSON serialization */
  private static final Gson gson = new Gson();
  private static final Type stringListOfListType = new TypeToken<List<List<String>>>() { }.getType();

  /** Creates a new videogrid service instance. */
  public VideoGridServiceImpl() {
    super(JOB_TYPE);
  }

  @Override
  @Activate
  public void activate(ComponentContext cc) {
    super.activate(cc);
    logger.debug("Activated videogrid service");
  }

  @Override
  public void updated(Dictionary properties) throws ConfigurationException {
    if (properties == null) {
      return;
    }
    logger.debug("Start updating videogrid service");

    jobLoad = LoadUtil.getConfiguredLoadValue(properties, JOB_LOAD_CONFIG, JOB_LOAD_DEFAULT, serviceRegistry);
    logger.debug("Set videogrid job load to {}", jobLoad);

    logger.debug("Finished updating videogrid service");
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
   */
  @Override
  protected String process(Job job) throws Exception {
    logger.debug("Started processing job {}", job.getId());
    if (!OPERATION.equals(job.getOperation())) {
      throw new ServiceRegistryException(String.format("This service can't handle operations of type '%s'",
              job.getOperation()));
    }

    // Parse arguments
    List<String> arguments = job.getArguments();
    List<String> command = gson.fromJson(arguments.get(0), new TypeToken<List<String>>() { }.getType());
    List<Track> tracks = new ArrayList<>();
    for (int i = 1; i < arguments.size(); i++) {
      tracks.add(i - 1, (Track) MediaPackageElementParser.getFromXml(arguments.get(i)));
    }

    String outputDirPath = String.format("%s/videogrid/%d/", workspace.rootDirectory(), job.getId());
    FileUtils.forceMkdir(new File(outputDirPath));

    // Replace placeholders in command with track paths
    for (int i = 0; i < command.size(); i++) {
      String[] trackIds = StringUtils.substringsBetween(command.get(i), "#{","}");
      if (trackIds != null) {
        for (String trackId: trackIds) {
          Track replaceTrack = tracks.stream()
                  .filter(track -> track.getIdentifier().equals(trackId))
                  .findAny()
                  .orElse(null);
          if (replaceTrack == null) {
            throw new VideoGridServiceException(String.format("Track with id %s could not be found!", trackId));
          }
          command.set(i, command.get(i).replaceAll("#\\{" + trackId + "\\}", getTrackPath(replaceTrack)));
        }
      }
    }

    // Add output path to command
    String outputFile = outputDirPath + "videogrid_for_Job_" + job.getId() + ".mp4";
    command.add(outputFile);

    logger.info("Running command: {}", command);

    // Run ffmpeg
    ProcessBuilder pb = new ProcessBuilder(command);
    pb.redirectErrorStream(true);
    Process ffmpegProcess = null;
    int exitCode = 1;
    BufferedReader errStream = null;
    try {
      ffmpegProcess = pb.start();

      errStream = new BufferedReader(new InputStreamReader(ffmpegProcess.getInputStream()));
      String line = errStream.readLine();
      while (line != null) {
        logger.info(line);
        line = errStream.readLine();
      }

      exitCode = ffmpegProcess.waitFor();
    } catch (IOException ex) {
      throw new VideoGridServiceException("Start ffmpeg process failed", ex);
    } catch (InterruptedException ex) {
      throw new VideoGridServiceException("Waiting for encoder process exited was interrupted unexpectedly", ex);
    } finally {
      IoSupport.closeQuietly(ffmpegProcess);
      IoSupport.closeQuietly(errStream);
      if (exitCode != 0) {
        try {
          logger.warn("FFMPEG process exited with errorcode: " + exitCode);
          FileUtils.forceDelete(new File(outputDirPath));
        } catch (IOException e) {
          // it is ok, no output file was generated by ffmpeg
        }
      }
    }

    if (exitCode != 0) {
      throw new Exception(String.format("The encoder process exited abnormally with exit code %s "
              + "using command\n%s", exitCode, String.join(" ", command)));
    }

    // Put generated video into workspace
    FileInputStream outputFileInputStream = null;
    URI videoFileUri;
    try {
      outputFileInputStream = new FileInputStream(outputFile);
      videoFileUri = workspace.putInCollection("videogrid",
              FilenameUtils.getName(outputFile), outputFileInputStream);
      logger.info("Copied the created video to the workspace {}", videoFileUri);
    } catch (FileNotFoundException ex) {
      throw new VideoGridServiceException(String.format("Video file '%s' not found", outputFile), ex);
    } catch (IOException ex) {
      throw new VideoGridServiceException(String.format(
              "Can't write video file '%s' to workspace", outputFile), ex);
    } catch (IllegalArgumentException ex) {
      throw new VideoGridServiceException(ex);
    } finally {
      IoSupport.closeQuietly(outputFileInputStream);
    }

    FileUtils.deleteQuietly(new File(workspace.rootDirectory(), String.format("videogrid/%d", job.getId())));

    // Return URIs to the videos;
    return gson.toJson(videoFileUri);
  }

  @Override
  public Job createPartialTrack(List<String> command, Track... tracks)
          throws VideoGridServiceException, MediaPackageException {
    List<String> jobArguments = new ArrayList<>(Arrays.asList(gson.toJson(command)));
    for (int i = 0; i < tracks.length; i++) {
      jobArguments.add(i + 1, MediaPackageElementParser.getAsXml(tracks[i]));
    }
    try {
      logger.debug("Create videogrid service job");
      return serviceRegistry.createJob(JOB_TYPE, OPERATION, jobArguments, jobLoad);
    } catch (ServiceRegistryException e) {
      throw new VideoGridServiceException(e);
    }
  }

  /**
   * Returns the absolute path of the track
   *
   * @param track
   *          Track whose path you want
   * @return {@String} containing the absolute path of the given track
   * @throws VideoGridServiceException
   */
  private String getTrackPath(Track track) throws VideoGridServiceException {
    File mediaFile;
    try {
      mediaFile = workspace.get(track.getURI());
    } catch (NotFoundException e) {
      throw new VideoGridServiceException(
              "Error finding the media file in the workspace", e);
    } catch (IOException e) {
      throw new VideoGridServiceException(
              "Error reading the media file in the workspace", e);
    }
    return mediaFile.getAbsolutePath();
  }

  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }

  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }

  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }

  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }

  @Reference
  public void setWorkspace(Workspace workspace) {
    this.workspace = workspace;
  }

  @Reference
  public void setServiceRegistry(ServiceRegistry jobManager) {
    this.serviceRegistry = jobManager;
  }

  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }

  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }

  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
    this.organizationDirectoryService = organizationDirectoryService;
  }
}