IngestDownloadServiceImpl.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.ingestdownloadservice.impl;

import org.opencastproject.ingestdownloadservice.api.IngestDownloadService;
import org.opencastproject.job.api.AbstractJobProducer;
import org.opencastproject.job.api.Job;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.MediaPackageParser;
import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
import org.opencastproject.mediapackage.selector.SimpleElementSelector;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.TrustedHttpClient;
import org.opencastproject.security.api.TrustedHttpClientException;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.serviceregistry.api.ServiceRegistration;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
import org.opencastproject.workspace.api.Workspace;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpDelete;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Job producer that copies external media package elements (selected by flavor and/or tag) into the working file
 * repository of this node, rewrites the element URIs to point at the local copies and, optionally, deletes the
 * originals from external working file repositories.
 */
@Component(
    immediate = true,
    service = IngestDownloadService.class,
    property = {
        "service.description=Ingest download service",
        "service.pid=org.opencastproject.ingestdownloadservice.impl.IngestDownloadServiceImpl"
    }
)
public class IngestDownloadServiceImpl extends AbstractJobProducer implements IngestDownloadService {

  /** Operations supported by this job producer. */
  public enum Operation {
    Download
  }

  /** Separator characters accepted in the flavor and tag lists (any mix of commas and spaces). */
  private static final String LIST_SEPARATORS = ", ";

  /**
   * The module specific logger
   */
  private static final Logger logger = LoggerFactory.getLogger(IngestDownloadServiceImpl.class);

  /**
   * The service registry
   */
  private ServiceRegistry serviceRegistry = null;

  /**
   * The security service
   */
  private SecurityService securityService = null;

  /**
   * The user directory service
   */
  private UserDirectoryService userDirectoryService = null;

  /**
   * The organization directory service
   */
  private OrganizationDirectoryService organizationDirectoryService = null;

  /**
   * The workspace service
   */
  private Workspace workspace;

  /**
   * The http client used to send DELETE requests to external working file repositories
   */
  private TrustedHttpClient client = null;

  /**
   * Creates a new ingest download service producing jobs of type {@code JOB_TYPE}.
   */
  public IngestDownloadServiceImpl() {
    super(JOB_TYPE);
  }

  /**
   * Sets the workspace to use.
   *
   * @param workspace the workspace
   */
  @Reference
  public void setWorkspace(Workspace workspace) {
    this.workspace = workspace;
  }

  /**
   * Sets the service registry.
   *
   * @param serviceRegistry the service registry
   */
  @Reference
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }

  /**
   * Sets the trusted http client used to delete files from external working file repositories. Without this
   * reference the client would stay {@code null} and every external delete would fail.
   *
   * @param client the trusted http client
   */
  @Reference
  public void setTrustedHttpClient(TrustedHttpClient client) {
    this.client = client;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getServiceRegistry()
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }

  /**
   * Callback for setting the security service.
   *
   * @param securityService the securityService to set
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }

  /**
   * Callback for setting the user directory service.
   *
   * @param userDirectoryService the userDirectoryService to set
   */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }

  /**
   * Sets a reference to the organization directory service.
   *
   * @param organizationDirectory the organization directory
   */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
    this.organizationDirectoryService = organizationDirectory;
  }

  /**
   * {@inheritDoc}
   *
   * <p>The job arguments are, in order: the media package XML, the source flavors, the source tags,
   * {@code deleteExternal} and {@code tagsAndFlavor}; {@link #process(Job)} reads them in the same order.
   */
  @Override
  public Job ingestDownload(MediaPackage mediaPackage, String sourceFlavors, String sourceTags, boolean deleteExternal,
          boolean tagsAndFlavor) throws ServiceRegistryException {

    final List<String> paramList = new ArrayList<>(5);
    paramList.add(MediaPackageParser.getAsXml(mediaPackage));
    paramList.add(sourceFlavors);
    paramList.add(sourceTags);
    paramList.add(Boolean.toString(deleteExternal));
    paramList.add(Boolean.toString(tagsAndFlavor));

    return serviceRegistry.createJob(JOB_TYPE, Operation.Download.toString(), paramList);
  }

  /**
   * Downloads all selected external elements of the job's media package into the local working file repository.
   *
   * <p>Elements without a URI, publications and elements already served from this node's workspace are skipped.
   * When {@code deleteExternal} is set, successfully downloaded originals are afterwards deleted from the external
   * working file repository that served them (best effort; failures are only logged).
   *
   * @param job the job whose arguments were created by {@link #ingestDownload}
   * @return the updated media package as XML
   * @throws MediaPackageException if the media package argument cannot be parsed
   * @throws IOException if a downloaded element cannot be stored in the working file repository
   */
  @Override
  protected String process(Job job) throws MediaPackageException, IOException {
    final List<String> arguments = new ArrayList<>(job.getArguments());

    final MediaPackage mediaPackage = MediaPackageParser.getFromXml(arguments.get(0));
    final String sourceFlavors = arguments.get(1);
    final String sourceTags = arguments.get(2);
    final boolean deleteExternal = Boolean.parseBoolean(arguments.get(3));
    final boolean tagsAndFlavor = Boolean.parseBoolean(arguments.get(4));

    // building elementSelector with tags and flavors
    final AbstractMediaPackageElementSelector<MediaPackageElement> elementSelector = new SimpleElementSelector();
    for (String tag : splitList(sourceTags)) {
      elementSelector.addTag(tag);
    }
    for (String flavor : splitList(sourceFlavors)) {
      elementSelector.addFlavor(flavor);
    }

    final String baseUrl = workspace.getBaseUri().toString();

    final List<URI> externalUris = new ArrayList<>();
    for (MediaPackageElement element : elementSelector.select(mediaPackage, tagsAndFlavor)) {
      if (element.getURI() == null) {
        continue;
      }

      if (element.getElementType() == MediaPackageElement.Type.Publication) {
        logger.debug("Skipping publication {} from media package {}", element.getIdentifier(),
                     mediaPackage.getIdentifier());
        continue;
      }

      if (element.getURI().toString().startsWith(baseUrl)) {
        logger.info("Skipping already existing element {}", element.getURI());
        continue;
      }

      // Remember the original URI so it can be deleted from the external repository later
      downloadElement(mediaPackage, element).ifPresent(externalUris::add);
    }

    if (deleteExternal && !externalUris.isEmpty()) {
      final List<String> externalWfrBaseUrls = findExternalWorkingFileRepositories(baseUrl);
      for (URI uri : externalUris) {
        deleteFromExternalWorkingFileRepository(uri, externalWfrBaseUrls);
      }
    }

    return MediaPackageParser.getAsXml(mediaPackage);
  }

  /**
   * Splits a list of comma- and/or space-separated entries.
   *
   * @param value the list to split, may be {@code null}
   * @return the entries of the list; an empty array for {@code null} or blank input, never {@code null}
   */
  private static String[] splitList(String value) {
    // StringUtils.split returns null for null input, which would break an enhanced for loop
    final String[] entries = StringUtils.split(value, LIST_SEPARATORS);
    return entries == null ? new String[0] : entries;
  }

  /**
   * Downloads an external element through the workspace, stores it in the working file repository under the
   * media package and element identifiers and rewrites the element's URI to the stored copy.
   *
   * @param mediaPackage the media package the element belongs to
   * @param element the element to download; its URI must not be {@code null}
   * @return the element's original URI if it was downloaded, or empty if the workspace could not find it
   * @throws IOException if the downloaded file cannot be read or stored
   */
  private Optional<URI> downloadElement(MediaPackage mediaPackage, MediaPackageElement element) throws IOException {
    final URI originalUri = element.getURI();

    final File file;
    try {
      file = workspace.get(originalUri);
    } catch (NotFoundException e) {
      logger.warn("Unable to download the external element {}", originalUri);
      return Optional.empty();
    }

    // Put to working file repository and rewrite URI on element
    try (InputStream in = new FileInputStream(file)) {
      final String filename = FilenameUtils.getName(originalUri.getPath());
      final URI uri = workspace.put(mediaPackage.getIdentifier().toString(), element.getIdentifier(), filename, in);
      element.setURI(uri);
    } finally {
      // The workspace copy of the external file is only needed as the source of the put above
      try {
        workspace.delete(originalUri);
      } catch (Exception e) {
        logger.warn("Unable to delete ingest-downloaded element {}", originalUri, e);
      }
    }

    logger.info("Downloaded the external element {}", originalUri);
    return Optional.of(originalUri);
  }

  /**
   * Collects the base URLs of all registered working file repositories except the one on this node.
   *
   * @param baseUrl the base URL of the local workspace; registrations whose host is a prefix of it are skipped
   * @return the external working file repository base URLs; empty if the service registry cannot be queried
   */
  private List<String> findExternalWorkingFileRepositories(String baseUrl) {
    logger.debug("Assembling list of external working file repositories");
    final List<String> externalWfrBaseUrls = new ArrayList<>();
    try {
      final String wfrServiceType = WorkingFileRepository.SERVICE_TYPE;
      for (ServiceRegistration reg : serviceRegistry.getServiceRegistrationsByType(wfrServiceType)) {
        if (baseUrl.startsWith(reg.getHost())) {
          logger.trace("Skipping local working file repository");
          continue;
        }
        externalWfrBaseUrls.add(UrlSupport.concat(reg.getHost(), reg.getPath()));
      }
      logger.debug("{} external working file repositories found", externalWfrBaseUrls.size());
    } catch (ServiceRegistryException e) {
      logger.error("Unable to load WFR services from service registry", e);
    }
    return externalWfrBaseUrls;
  }

  /**
   * Sends a DELETE request for an external element URI to the external working file repository serving it.
   * This is best effort: URIs that belong to no known repository, or requests that fail, are only logged.
   *
   * @param uri the original (external) element URI
   * @param externalWfrBaseUrls base URLs of the external working file repositories
   * @throws IOException if closing the http response fails
   */
  private void deleteFromExternalWorkingFileRepository(URI uri, List<String> externalWfrBaseUrls)
          throws IOException {
    final String elementUri = uri.toString();

    final Optional<String> wfrBaseUrl = externalWfrBaseUrls.stream().filter(elementUri::startsWith).findFirst();
    if (!wfrBaseUrl.isPresent()) {
      logger.debug("Unable to delete {}, no working file repository found for this URI", elementUri);
      return;
    }

    // The prefixes are relative to the repository's base URL (host plus mount path), not to the URI root
    final String wfrUrl = wfrBaseUrl.get();
    final String mediaPackagePrefix = UrlSupport.concat(wfrUrl, WorkingFileRepository.MEDIAPACKAGE_PATH_PREFIX);
    final String collectionPrefix = UrlSupport.concat(wfrUrl, WorkingFileRepository.COLLECTION_PATH_PREFIX);

    final String deleteUrl;
    if (elementUri.startsWith(mediaPackagePrefix)) {
      // Strip the trailing file name segment so the request targets the element path
      deleteUrl = elementUri.substring(0, elementUri.lastIndexOf('/'));
    } else if (elementUri.startsWith(collectionPrefix)) {
      deleteUrl = elementUri;
    } else {
      logger.info("Unable to handle working file repository URI {}", elementUri);
      return;
    }

    final HttpDelete delete = new HttpDelete(deleteUrl);
    HttpResponse response = null;
    try {
      response = client.execute(delete);
      final int statusCode = response.getStatusLine().getStatusCode();
      if (statusCode == HttpStatus.SC_NO_CONTENT || statusCode == HttpStatus.SC_OK) {
        logger.info("Successfully deleted external URI {}", delete.getURI());
      } else if (statusCode == HttpStatus.SC_NOT_FOUND) {
        logger.debug("External URI {} has already been deleted", delete.getURI());
      } else {
        logger.warn("Unable to delete external URI {}, status code '{}' returned", delete.getURI(), statusCode);
      }
    } catch (TrustedHttpClientException e) {
      logger.warn("Unable to execute DELETE request on external URI {}", delete.getURI(), e);
    } finally {
      if (response != null) {
        client.close(response);
      }
    }
  }
}