ExecuteServiceImpl.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.execute.impl;

import org.opencastproject.execute.api.ExecuteException;
import org.opencastproject.execute.api.ExecuteService;
import org.opencastproject.job.api.AbstractJobProducer;
import org.opencastproject.job.api.Job;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElement.Type;
import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.MediaPackageParser;
import org.opencastproject.mediapackage.UnsupportedElementException;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.util.ConfigurationException;
import org.opencastproject.util.IoSupport;
import org.opencastproject.util.LoadUtil;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.workspace.api.Workspace;

import org.apache.commons.lang3.StringUtils;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ManagedService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Implements a service that runs CLI commands with MediaPackage elements as arguments
 */
@Component(
    immediate = true,
    service = { ExecuteService.class,ManagedService.class },
    property = {
        "service.description=Execute Service",
        "service.pid=org.opencastproject.execute.impl.ExecuteServiceImpl"
    }
)
public class ExecuteServiceImpl extends AbstractJobProducer implements ExecuteService, ManagedService {

  /** The job operations handled by this producer: run on a single element, or run on a whole mediapackage */
  public enum Operation {
    Execute_Element, Execute_Mediapackage
  }

  /** The logging facility */
  private static final Logger logger = LoggerFactory.getLogger(ExecuteServiceImpl.class);

  /**
   * Matches substitution placeholders of the form {@code #{name}} or {@code #{name(parameter)}} in the parameters of a
   * mediapackage-level execution. Group 1 holds the name; group 2 holds the optional parameter, or {@code null} when it
   * is absent. Neither name nor parameter may contain '{', '}', '(' or ')'.
   */
  private static final Pattern SUBSTITUTION_PATTERN = Pattern
          .compile("#\\{([^\\{\\}\\(\\)]+)(?:\\(([^\\{\\}\\(\\)]+)\\))?\\}");

  /** Reference to the receipt service */
  private ServiceRegistry serviceRegistry = null;

  /** The security service */
  protected SecurityService securityService = null;

  /** The user directory service */
  protected UserDirectoryService userDirectoryService = null;

  /** The organization directory service */
  protected OrganizationDirectoryService organizationDirectoryService = null;

  /** The workspace service */
  protected Workspace workspace;

  /**
   * List of allowed commands that can be run with an executor. An empty set means that no command may be run; an '*'
   * in the service configuration means any command can be executed.
   */
  protected final Set<String> allowedCommands = new HashSet<String>();

  /** Bundle property specifying which commands can be run with this executor */
  public static final String COMMANDS_ALLOWED_PROPERTY = "commands.allowed";

  /** To allow command-line parameter substitutions configured globally i.e. in config.properties */
  private BundleContext bundleContext;

  /** To allow command-line parameter substitutions configured at the service level; may be {@code null} */
  @SuppressWarnings("rawtypes")
  private Dictionary properties = null;

  /** The approximate load placed on the system by running an execute operation */
  public static final float DEFAULT_EXECUTE_JOB_LOAD = 0.1f;

  /** The key to look for in the service configuration file to override the {@link #DEFAULT_EXECUTE_JOB_LOAD} */
  public static final String EXECUTE_JOB_LOAD_KEY = "job.load.execute";

  /** The configured execute job load; starts at the documented default until {@link #updated} runs */
  private float executeJobLoad = DEFAULT_EXECUTE_JOB_LOAD;

  /**
   * Creates a new instance of the execute service.
   */
  public ExecuteServiceImpl() {
    super(JOB_TYPE);
  }

  /**
   * Activates this component with its properties once all of the collaborating services have been set.
   *
   * Reads the whitespace-separated list of permitted commands from {@link #COMMANDS_ALLOWED_PROPERTY} and keeps the
   * component properties and bundle context around for parameter substitution.
   *
   * @param cc
   *          The component's context, containing the properties used for configuration
   */
  @Override
  @Activate
  public void activate(ComponentContext cc) {
    super.activate(cc);

    properties = cc.getProperties();

    if (properties != null) {
      String commandString = (String) properties.get(COMMANDS_ALLOWED_PROPERTY);
      if (StringUtils.isNotBlank(commandString)) {
        logger.info("Execute Service permitted commands: {}", commandString);
        // StringUtils.split ignores leading/trailing/repeated whitespace, so no empty command is ever registered
        allowedCommands.addAll(Arrays.asList(StringUtils.split(commandString)));
      }
    }

    this.bundleContext = cc.getBundleContext();
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.execute.api.ExecuteService#execute(java.lang.String, java.lang.String,
   *      org.opencastproject.mediapackage.MediaPackageElement, java.lang.String,
   *      org.opencastproject.mediapackage.MediaPackageElement.Type, float)
   * @throws IllegalArgumentException
   *           if the input arguments are incorrect
   * @throws ExecuteException
   *           if an internal error occurs
   */
  @Override
  public Job execute(String exec, String params, MediaPackageElement inElement, String outFileName, Type expectedType,
          float load) throws ExecuteException, IllegalArgumentException {

    logger.debug("Creating Execute Job for command: {}", exec);

    if (StringUtils.isBlank(exec))
      throw new IllegalArgumentException("The command to execute cannot be null");

    if (StringUtils.isBlank(params))
      throw new IllegalArgumentException("The command arguments cannot be null");

    if (inElement == null)
      throw new IllegalArgumentException("The input MediaPackage element cannot be null");

    // The output file name and the expected type must be given together or not at all
    outFileName = StringUtils.trimToNull(outFileName);
    if ((outFileName == null) && (expectedType != null) || (outFileName != null) && (expectedType == null))
      throw new IllegalArgumentException("Expected element type and output filename cannot be null");

    try {
      List<String> paramList = new ArrayList<String>(5);
      paramList.add(exec);
      paramList.add(params);
      paramList.add(MediaPackageElementParser.getAsXml(inElement));
      paramList.add(outFileName);
      paramList.add((expectedType == null) ? null : expectedType.toString());

      return serviceRegistry.createJob(JOB_TYPE, Operation.Execute_Element.toString(), paramList, load);

    } catch (ServiceRegistryException e) {
      throw new ExecuteException(String.format("Unable to create a job of type '%s'", JOB_TYPE), e);
    } catch (MediaPackageException e) {
      throw new ExecuteException("Error serializing an element", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.execute.api.ExecuteService#execute(java.lang.String, java.lang.String,
   *      org.opencastproject.mediapackage.MediaPackage, java.lang.String,
   *      org.opencastproject.mediapackage.MediaPackageElement.Type, float)
   * @throws IllegalArgumentException
   *           if the input arguments are incorrect
   * @throws ExecuteException
   *           if the job cannot be created
   */
  @Override
  public Job execute(String exec, String params, MediaPackage mp, String outFileName, Type expectedType, float load)
          throws ExecuteException {

    logger.debug("Creating Execute Job for command: {}", exec);

    if (StringUtils.isBlank(exec))
      throw new IllegalArgumentException("The command to execute cannot be null");

    if (StringUtils.isBlank(params))
      throw new IllegalArgumentException("The command arguments cannot be null");

    if (mp == null)
      throw new IllegalArgumentException("The input MediaPackage cannot be null");

    // The output file name and the expected type must be given together or not at all
    outFileName = StringUtils.trimToNull(outFileName);
    if ((outFileName == null) && (expectedType != null) || (outFileName != null) && (expectedType == null))
      throw new IllegalArgumentException("Expected element type and output filename cannot be null");

    try {
      List<String> paramList = new ArrayList<String>(5);
      paramList.add(exec);
      paramList.add(params);
      paramList.add(MediaPackageParser.getAsXml(mp));
      paramList.add(outFileName);
      paramList.add((expectedType == null) ? null : expectedType.toString());

      return serviceRegistry.createJob(JOB_TYPE, Operation.Execute_Mediapackage.toString(), paramList, load);
    } catch (ServiceRegistryException e) {
      throw new ExecuteException(String.format("Unable to create a job of type '%s'", JOB_TYPE), e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * The job arguments are, in order: the command, its parameter string, the serialized element or mediapackage and,
   * optionally, the output file name and the expected element type (so either 3 or 5 arguments).
   *
   * @throws ExecuteException
   *           if the command is not allowed, the arguments are malformed, or the execution fails
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
   */
  @Override
  protected String process(Job job) throws ExecuteException {
    List<String> arguments = new ArrayList<String>(job.getArguments());

    if (arguments.isEmpty())
      throw new ExecuteException("The argument list for operation '" + job.getOperation() + "' is empty");

    // Check this operation is allowed
    String command = arguments.get(0);
    if (!allowedCommands.contains("*") && !allowedCommands.contains(command))
      throw new ExecuteException("Command '" + command + "' is not allowed");

    Operation op;
    try {
      op = Operation.valueOf(job.getOperation());
    } catch (IllegalArgumentException e) {
      throw new ExecuteException("This service can't handle operations of type '" + job.getOperation() + "'", e);
    }

    String outFileName = null;
    Type expectedType = null;

    try {
      int nargs = arguments.size();

      if (nargs != 3 && nargs != 5) {
        throw new IndexOutOfBoundsException(
                "Incorrect number of parameters for operation execute_" + op + ": " + nargs);
      }
      if (nargs == 5) {
        String typeName = arguments.remove(4);
        if (typeName != null) {
          try {
            expectedType = Type.valueOf(typeName);
          } catch (IllegalArgumentException e) {
            throw new ExecuteException("Unknown expected element type '" + typeName + "'", e);
          }
        }
        outFileName = StringUtils.trimToNull(arguments.remove(3));
        if ((StringUtils.isNotBlank(outFileName) && (expectedType == null))
                || (StringUtils.isBlank(outFileName) && (expectedType != null))) {
          throw new ExecuteException("The output type and filename must be both specified");
        }
        // Prefix with the job id so concurrent jobs writing the same file name do not collide
        outFileName = (outFileName == null) ? null : job.getId() + "_" + outFileName;
      }

      switch (op) {
        case Execute_Mediapackage:
          MediaPackage mp = MediaPackageParser.getFromXml(arguments.remove(2));
          return doProcess(arguments, mp, outFileName, expectedType);
        case Execute_Element:
          MediaPackageElement element = MediaPackageElementParser.getFromXml(arguments.remove(2));
          return doProcess(arguments, element, outFileName, expectedType);
        default:
          throw new IllegalStateException("Don't know how to handle operation '" + job.getOperation() + "'");
      }

    } catch (MediaPackageException e) {
      throw new ExecuteException("Error unmarshalling the input mediapackage/element", e);
    } catch (IllegalArgumentException e) {
      throw new ExecuteException("This service can't handle operations of type '" + op + "'", e);
    } catch (IndexOutOfBoundsException e) {
      throw new ExecuteException("The argument list for operation '" + op + "' does not meet expectations", e);
    }
  }

  /**
   * Does the actual processing, given a mediapackage (Execute Once WOH).
   *
   * Placeholders in the parameter string are substituted before it is split into arguments:
   * {@code #{id}} (mediapackage id), {@code #{flavor(type/subtype)}} and {@code #{tags(a,b)}} (absolute path of the
   * first matching element), {@code #{out}} (absolute output path), {@code #{org_id}} (current organization id), and
   * any other name that resolves to a service property or, failing that, a bundle context property. Unresolved
   * placeholders are left untouched.
   *
   * @param arguments
   *          The list containing the program and its arguments
   * @param mp
   *          MediaPackage used in the operation
   * @param outFileName
   *          The name of the resulting file, or {@code null}
   * @param expectedType
   *          The expected element type
   * @return A {@code String} containing the command output
   * @throws ExecuteException
   *           if some internal error occurred
   */
  protected String doProcess(List<String> arguments, MediaPackage mp, String outFileName, Type expectedType)
          throws ExecuteException {

    String params = arguments.remove(1);

    File outFile = null;
    // URI of the element most recently requested from the workspace; used for error reporting
    URI currentUri = null;

    try {
      if (outFileName != null) {
        // FIXME : Find a better way to place the output File
        MediaPackageElement[] mpElements = mp.getElements();
        if (mpElements.length == 0)
          throw new ExecuteException("Cannot place output file '" + outFileName
                  + "': the MediaPackage contains no elements");
        currentUri = mpElements[0].getURI();
        File firstElement = workspace.get(currentUri);
        outFile = new File(firstElement.getParentFile(), outFileName);
      }

      Matcher matcher = SUBSTITUTION_PATTERN.matcher(params);
      StringBuffer sb = new StringBuffer();
      while (matcher.find()) {
        String name = matcher.group(1);
        String parameter = matcher.group(2);
        String replacement;

        if ("id".equals(name)) {
          replacement = mp.getIdentifier().toString();
        } else if ("flavor".equals(name)) {
          MediaPackageElement[] elements = mp.getElementsByFlavor(MediaPackageElementFlavor.parseFlavor(parameter));
          if (elements.length == 0)
            throw new ExecuteException("No elements in the MediaPackage match the flavor '" + parameter + "'.");

          if (elements.length > 1)
            logger.warn("Found more than one element with flavor '{}'. Using {} by default...", parameter,
                    elements[0].getIdentifier());

          currentUri = elements[0].getURI();
          replacement = workspace.get(currentUri).getAbsolutePath();
        } else if ("tags".equals(name)) {
          if (parameter == null)
            throw new ExecuteException("Tag 'tags' must specify a comma-separated list of tags.");
          MediaPackageElement[] elements = mp.getElementsByTags(Arrays.asList(StringUtils.split(parameter, ",")));

          if (elements.length == 0)
            throw new ExecuteException("No elements in the MediaPackage match the tags '" + parameter + "'.");

          if (elements.length > 1)
            logger.warn("Found more than one element with matching tags '{}'. Using {} by default...", parameter,
                    elements[0].getIdentifier());

          currentUri = elements[0].getURI();
          replacement = workspace.get(currentUri).getAbsolutePath();
        } else if ("out".equals(name)) {
          if (outFile == null) {
            logger.error("{} pattern found, but no valid output filename was specified", OUTPUT_FILE_PATTERN);
            throw new ExecuteException(
                    OUTPUT_FILE_PATTERN + " pattern found, but no valid output filename was specified");
          }
          replacement = outFile.getAbsolutePath();
        } else if ("org_id".equals(name)) {
          replacement = securityService.getOrganization().getId();
        } else if (properties != null && properties.get(name) != null) {
          replacement = (String) properties.get(name);
        } else {
          // May be null: the placeholder is then kept verbatim in the output
          replacement = bundleContext.getProperty(name);
        }

        if (replacement != null) {
          // quoteReplacement: paths and property values may contain '$' or '\', which appendReplacement would
          // otherwise interpret as group references / escapes
          matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
      }
      matcher.appendTail(sb);
      params = sb.toString();
    } catch (IllegalArgumentException e) {
      throw new ExecuteException("Tag 'flavor' must specify a valid MediaPackage element flavor.", e);
    } catch (NotFoundException e) {
      throw new ExecuteException("The element '" + currentUri + "' does not exist in the workspace.", e);
    } catch (IOException e) {
      throw new ExecuteException("Error retrieving MediaPackage element from workspace: '" + currentUri + "'.", e);
    }

    arguments.addAll(splitParameters(params));

    return runCommand(arguments, outFile, expectedType);
  }

  /**
   * Does the actual processing, given a mediapackage element (Execute Many WOH).
   *
   * Every argument after the command may contain any combination of {@code INPUT_FILE_PATTERN},
   * {@code OUTPUT_FILE_PATTERN}, {@code MP_ID_PATTERN} and {@code ORG_ID_PATTERN}; all of them are substituted.
   *
   * @param arguments
   *          The list containing the program and its arguments
   * @param element
   *          The mediapackage element the command runs on
   * @param outFileName
   *          The name of the resulting file, or {@code null}
   * @param expectedType
   *          The expected element type
   * @return A {@code String} containing the command output
   * @throws ExecuteException
   *           if some internal error occurred
   */
  protected String doProcess(List<String> arguments, MediaPackageElement element, String outFileName, Type expectedType)
          throws ExecuteException {

    // arguments(1) contains a list of space-separated arguments for the command
    String params = arguments.remove(1);
    arguments.addAll(splitParameters(params));

    File outFile = null;

    try {
      // Get the track file from the workspace
      File trackFile = workspace.get(element.getURI());

      // Put the destination file in the same folder as the source file
      if (outFileName != null)
        outFile = new File(trackFile.getParentFile(), outFileName);

      // Substitute the appearances of the patterns with the actual values (index 0 is the command itself)
      for (int i = 1; i < arguments.size(); i++) {
        String arg = arguments.get(i);

        if (arg.contains(INPUT_FILE_PATTERN))
          arg = arg.replace(INPUT_FILE_PATTERN, trackFile.getAbsolutePath());

        if (arg.contains(OUTPUT_FILE_PATTERN)) {
          if (outFile == null) {
            logger.error("{} pattern found, but no valid output filename was specified", OUTPUT_FILE_PATTERN);
            throw new ExecuteException(
                    OUTPUT_FILE_PATTERN + " pattern found, but no valid output filename was specified");
          }
          arg = arg.replace(OUTPUT_FILE_PATTERN, outFile.getAbsolutePath());
        }

        if (arg.contains(MP_ID_PATTERN)) {
          MediaPackage parent = element.getMediaPackage();
          if (parent == null)
            throw new ExecuteException(MP_ID_PATTERN + " pattern found, but the element is not part of a MediaPackage");
          arg = arg.replace(MP_ID_PATTERN, parent.getIdentifier().toString());
        }

        if (arg.contains(ORG_ID_PATTERN))
          arg = arg.replace(ORG_ID_PATTERN, securityService.getOrganization().getId());

        arguments.set(i, arg);
      }

      return runCommand(arguments, outFile, expectedType);
    } catch (IOException e) {
      logger.error("Error retrieving file from workspace: {}", element.getURI());
      throw new ExecuteException("Error retrieving file from workspace: " + element.getURI(), e);
    } catch (NotFoundException e) {
      logger.error("Element '{}' cannot be found in the workspace.", element.getURI());
      throw new ExecuteException("Element " + element.getURI() + " cannot be found in the workspace", e);
    }
  }

  /**
   * Runs the command and, if it succeeds and an output file was requested, stores that file in the workspace.
   *
   * @param command
   *          the executable followed by its arguments
   * @param outFile
   *          the file the command is expected to produce, or {@code null}
   * @param expectedType
   *          the element type of the produced file (only used when {@code outFile} is not {@code null})
   * @return the serialized new element, or an empty string when no output file was requested
   * @throws ExecuteException
   *           if the process cannot be started, is interrupted, exits non-zero, or its output cannot be stored
   */
  private String runCommand(List<String> command, File outFile, Type expectedType) throws ExecuteException {

    Process p = null;
    String executable = command.get(0);

    try {
      logger.info("Running command {}", executable);
      logger.debug("Starting subprocess {} with arguments {}", executable,
              StringUtils.join(command.subList(1, command.size()), ", "));

      ProcessBuilder pb = new ProcessBuilder(command);
      pb.redirectErrorStream(true);

      p = pb.start();
      // stderr is merged into stdout above; read it to the end and log it
      try (BufferedReader stdout = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
        String line;
        while ((line = stdout.readLine()) != null) {
          logger.debug(line);
        }
      }
      int result = p.waitFor();

      logger.debug("Command {} finished with result {}", executable, result);

      if (result != 0)
        throw new ExecuteException(String.format("Process %s returned error code %d", executable, result));
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers up the stack can still observe it
      Thread.currentThread().interrupt();
      throw new ExecuteException("The executor thread has been unexpectedly interrupted", e);
    } catch (IOException e) {
      // Only log the first argument, the executable, as other arguments may contain sensitive values
      // e.g. MySQL password/user, paths, etc. that should not be shown to caller
      logger.error("Could not start subprocess {}", executable);
      throw new ExecuteException("Could not start subprocess: " + executable, e);
    } finally {
      IoSupport.closeQuietly(p);
    }

    return (outFile == null) ? "" : storeOutputFile(outFile, expectedType);
  }

  /**
   * Moves the command's output file into the workspace collection and builds a serialized element for it.
   *
   * @param outFile
   *          the local file produced by the command
   * @param expectedType
   *          the element type to create
   * @return the XML serialization of the new element
   * @throws ExecuteException
   *           if the file is missing, cannot be stored, or the element cannot be created/serialized
   */
  private String storeOutputFile(File outFile, Type expectedType) throws ExecuteException {
    if (!outFile.isFile())
      throw new ExecuteException("Expected output file does not exist: " + outFile.getAbsolutePath());

    URI newURI;
    // The stream is closed before the delete below; an open handle can make the delete fail on some platforms
    try (FileInputStream in = new FileInputStream(outFile)) {
      newURI = workspace.putInCollection(ExecuteService.COLLECTION, outFile.getName(), in);
    } catch (IOException e) {
      throw new ExecuteException("Could not store output file in the workspace: " + outFile.getAbsolutePath(), e);
    }

    if (outFile.delete()) {
      logger.debug("Deleted the local copy of the encoded file at {}", outFile.getAbsolutePath());
    } else {
      logger.warn("Unable to delete the encoding output at {}", outFile.getAbsolutePath());
    }

    try {
      return MediaPackageElementParser.getAsXml(MediaPackageElementBuilderFactory.newInstance()
              .newElementBuilder().elementFromURI(newURI, expectedType, null));
    } catch (UnsupportedElementException e) {
      throw new ExecuteException("Couldn't create a new MediaPackage element of type " + expectedType.toString(), e);
    } catch (ConfigurationException e) {
      throw new ExecuteException("Couldn't instantiate a new MediaPackage element builder", e);
    } catch (MediaPackageException e) {
      throw new ExecuteException("Couldn't serialize a new Mediapackage element of type " + expectedType.toString(), e);
    }
  }

  /**
   * Returns a list of strings broken on whitespace characters except where those whitespace characters are escaped or
   * quoted.
   *
   * Text between non-escaped double quotes becomes a single argument (without the quotes). Escaping backslashes are
   * not removed from the resulting tokens.
   *
   * @param input
   *          the raw parameter string
   * @return list of individual arguments
   */
  private List<String> splitParameters(String input) {

    // This delimiter matches any non-escaped quote
    final String quoteDelim = "(?<!\\\\)\"";

    // This delimiter matches any number of non-escaped spaces
    final String spaceDelim = "((?<!\\\\)\\s)+";

    ArrayList<String> parsedInput = new ArrayList<String>();
    // Segments produced by splitting on quotes alternate: unquoted, quoted, unquoted, ...
    boolean quoted = false;

    for (String token1 : input.split(quoteDelim)) {
      if (quoted) {
        parsedInput.add(token1);
        quoted = false;
      } else {
        for (String token2 : token1.split(spaceDelim)) {
          // This ignores empty tokens if quotes are at the beginning or the end of the string
          if (!token2.isEmpty())
            parsedInput.add(token2);
        }
        quoted = true;
      }
    }

    return parsedInput;
  }

  /**
   * Sets the receipt service
   *
   * @param serviceRegistry
   *          the service registry
   */
  @Reference
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getServiceRegistry()
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }

  /**
   * Callback for setting the security service.
   *
   * @param securityService
   *          the securityService to set
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }

  /**
   * Callback for setting the user directory service.
   *
   * @param userDirectoryService
   *          the userDirectoryService to set
   */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }

  /**
   * Sets a reference to the organization directory service.
   *
   * @param organizationDirectory
   *          the organization directory
   */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
    this.organizationDirectoryService = organizationDirectory;
  }

  /**
   * Sets a reference to the workspace service.
   *
   * @param workspace
   *          the workspace used to fetch input elements and store output files
   */
  @Reference
  public void setWorkspace(Workspace workspace) {
    this.workspace = workspace;
  }

  /**
   * Reads the configured execute job load ({@link #EXECUTE_JOB_LOAD_KEY}), falling back to
   * {@link #DEFAULT_EXECUTE_JOB_LOAD}.
   *
   * @param properties
   *          the updated service configuration
   */
  @Override
  public void updated(@SuppressWarnings("rawtypes") Dictionary properties)
          throws org.osgi.service.cm.ConfigurationException {
    executeJobLoad = LoadUtil.getConfiguredLoadValue(properties, EXECUTE_JOB_LOAD_KEY, DEFAULT_EXECUTE_JOB_LOAD,
            serviceRegistry);
  }

}