OaiPmhPublicationServiceImpl.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
package org.opencastproject.publication.oaipmh;

import static java.lang.String.format;
import static org.opencastproject.util.JobUtil.waitForJobs;

import org.opencastproject.distribution.api.DistributionException;
import org.opencastproject.distribution.api.DownloadDistributionService;
import org.opencastproject.distribution.api.StreamingDistributionService;
import org.opencastproject.job.api.AbstractJobProducer;
import org.opencastproject.job.api.Job;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.MediaPackageParser;
import org.opencastproject.mediapackage.MediaPackageReference;
import org.opencastproject.mediapackage.MediaPackageRuntimeException;
import org.opencastproject.mediapackage.MediaPackageSupport;
import org.opencastproject.mediapackage.Publication;
import org.opencastproject.mediapackage.PublicationImpl;
import org.opencastproject.mediapackage.selector.SimpleElementSelector;
import org.opencastproject.oaipmh.persistence.OaiPmhDatabase;
import org.opencastproject.oaipmh.persistence.OaiPmhDatabaseException;
import org.opencastproject.oaipmh.persistence.QueryBuilder;
import org.opencastproject.oaipmh.persistence.SearchResult;
import org.opencastproject.oaipmh.persistence.SearchResultItem;
import org.opencastproject.oaipmh.server.OaiPmhServerInfo;
import org.opencastproject.oaipmh.server.OaiPmhServerInfoUtil;
import org.opencastproject.publication.api.OaiPmhPublicationService;
import org.opencastproject.publication.api.PublicationException;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.util.MimeTypes;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.util.data.Collections;

import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIUtils;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Publishes a recording to an OAI-PMH publication repository.
 */
@Component(
    immediate = true,
    service = OaiPmhPublicationService.class,
    property = {
        "service.description=Publication Service (OAI-PMH)"
    }
)
public class OaiPmhPublicationServiceImpl extends AbstractJobProducer implements OaiPmhPublicationService {

  /** Logging facility */
  private static final Logger logger = LoggerFactory.getLogger(OaiPmhPublicationServiceImpl.class);

  public enum Operation {
    Publish, Retract, UpdateMetadata, Replace
  }

  private DownloadDistributionService downloadDistributionService;
  private OaiPmhServerInfo oaiPmhServerInfo;
  private OaiPmhDatabase oaiPmhDatabase;
  private OrganizationDirectoryService organizationDirectoryService;
  private SecurityService securityService;
  private ServiceRegistry serviceRegistry;
  private StreamingDistributionService streamingDistributionService;
  private UserDirectoryService userDirectoryService;

  public OaiPmhPublicationServiceImpl() {
    super(JOB_TYPE);
  }

  @Override
  protected String process(Job job) throws Exception {
    if (!StringUtils.equalsIgnoreCase(JOB_TYPE, job.getJobType())) {
      throw new IllegalArgumentException("Can not handle job type " + job.getJobType());
    }

    Publication publication = null;
    MediaPackage mediaPackage = MediaPackageParser.getFromXml(job.getArguments().get(0));
    String repository = job.getArguments().get(1);
    boolean checkAvailability = false;
    switch (Operation.valueOf(job.getOperation())) {
      case Publish:
        String[] downloadElementIds = StringUtils.split(job.getArguments().get(2), SEPARATOR);
        String[] streamingElementIds = StringUtils.split(job.getArguments().get(3), SEPARATOR);
        checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
        publication = publish(job, mediaPackage, repository,
                Collections.set(downloadElementIds), Collections.set(streamingElementIds), checkAvailability);
        break;
      case Replace:
        final Set<? extends MediaPackageElement> downloadElements =
            Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(2)));
        final Set<? extends MediaPackageElement> streamingElements =
            Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(3)));
        final Set<MediaPackageElementFlavor> retractDownloadFlavors = Arrays.stream(
            StringUtils.split(job.getArguments().get(4), SEPARATOR))
            .filter(s -> !s.isEmpty())
            .map(MediaPackageElementFlavor::parseFlavor)
            .collect(Collectors.toSet());
        final Set<MediaPackageElementFlavor> retractStreamingFlavors = Arrays.stream(
            StringUtils.split(job.getArguments().get(5), SEPARATOR))
            .filter(s -> !s.isEmpty())
            .map(MediaPackageElementFlavor::parseFlavor)
            .collect(Collectors.toSet());
        final Set<? extends MediaPackageElement> publications =
            Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(6)));
        checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(7));
        publication = replace(job, mediaPackage, repository, downloadElements, streamingElements,
            retractDownloadFlavors, retractStreamingFlavors, publications, checkAvailability);
        break;
      case Retract:
        publication = retract(job, mediaPackage, repository);
        break;
      case UpdateMetadata:
        checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
        String[] flavors = StringUtils.split(job.getArguments().get(2), SEPARATOR);
        String[] tags = StringUtils.split(job.getArguments().get(3), SEPARATOR);
        publication = updateMetadata(job, mediaPackage, repository,
                Collections.set(flavors), Collections.set(tags), checkAvailability);
        break;
      default:
        throw new IllegalArgumentException("Can not handle this type of operation: " + job.getOperation());
    }
    return publication != null ? MediaPackageElementParser.getAsXml(publication) : null;
  }

  @Override
  public Job replace(
      MediaPackage mediaPackage,
      String repository,
      Set<? extends MediaPackageElement> downloadElements,
      Set<? extends MediaPackageElement> streamingElements,
      Set<MediaPackageElementFlavor> retractDownloadFlavors,
      Set<MediaPackageElementFlavor> retractStreamingFlavors,
      Set<? extends Publication> publications,
      boolean checkAvailability
  ) throws PublicationException {
    checkInputArguments(mediaPackage, repository);
    try {
      return serviceRegistry.createJob(JOB_TYPE, Operation.Replace.name(),
          Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
              repository, // 1
              MediaPackageElementParser.getArrayAsXml(Collections.toList(downloadElements)), // 2
              MediaPackageElementParser.getArrayAsXml(Collections.toList(streamingElements)), // 3
              StringUtils.join(retractDownloadFlavors, SEPARATOR), // 4
              StringUtils.join(retractStreamingFlavors, SEPARATOR), // 5
              MediaPackageElementParser.getArrayAsXml(Collections.toList(publications)), // 6
              Boolean.toString(checkAvailability))); // 7
    } catch (ServiceRegistryException e) {
      throw new PublicationException("Unable to create job", e);
    } catch (MediaPackageException e) {
      throw new PublicationException("Unable to serialize media package elements", e);
    }
  }

  @Override
  public Publication replaceSync(
      MediaPackage mediaPackage, String repository, Set<? extends MediaPackageElement> downloadElements,
      Set<? extends MediaPackageElement> streamingElements, Set<MediaPackageElementFlavor> retractDownloadFlavors,
      Set<MediaPackageElementFlavor> retractStreamingFlavors, Set<? extends Publication> publications,
      boolean checkAvailability) throws PublicationException, MediaPackageException {
    return replace(null, mediaPackage, repository, downloadElements, streamingElements, retractDownloadFlavors,
        retractStreamingFlavors, publications, checkAvailability);
  }

  @Override
  public Job publish(MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
          Set<String> streamingElementIds, boolean checkAvailability)
          throws PublicationException, MediaPackageException {
    checkInputArguments(mediaPackage, repository);

    try {
      return serviceRegistry.createJob(JOB_TYPE, Operation.Publish.toString(),
              Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
                      repository, // 1
                      StringUtils.join(downloadElementIds, SEPARATOR), // 2
                      StringUtils.join(streamingElementIds, SEPARATOR), // 3
                      Boolean.toString(checkAvailability))); // 4
    } catch (ServiceRegistryException e) {
      throw new PublicationException("Unable to create job", e);
    }
  }

  @Override
  public Job retract(MediaPackage mediaPackage, String repository) throws PublicationException, NotFoundException {
    checkInputArguments(mediaPackage, repository);

    try {
      return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
              Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), repository));
    } catch (ServiceRegistryException e) {
      throw new PublicationException("Unable to create job", e);
    }
  }

  @Override
  public Job updateMetadata(MediaPackage mediaPackage, String repository, Set<String> flavors, Set<String> tags,
          boolean checkAvailability) throws PublicationException, MediaPackageException {
    checkInputArguments(mediaPackage, repository);
    if ((flavors == null || flavors.isEmpty()) && (tags == null || tags.isEmpty())) {
      throw new IllegalArgumentException("Flavors or tags must be set");
    }

    try {
      return serviceRegistry.createJob(JOB_TYPE, Operation.UpdateMetadata.toString(),
              Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
                      repository, // 1
                      StringUtils.join(flavors, SEPARATOR), // 2
                      StringUtils.join(tags, SEPARATOR), // 3
                      Boolean.toString(checkAvailability))); // 4
    } catch (ServiceRegistryException e) {
      throw new PublicationException("Unable to create job", e);
    }
  }

  protected Publication publish(Job job, MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
          Set<String> streamingElementIds, boolean checkAvailability)
          throws PublicationException, MediaPackageException {
    String mpId = mediaPackage.getIdentifier().toString();
    SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
            .isDeleted(false).build());
    // retract oai-pmh if published
    if (searchResult.size() > 0) {
      try {
        Publication p = retract(job, mediaPackage, repository);
        if (p != null && mediaPackage.contains(p)) {
          mediaPackage.remove(p);
        }
      } catch (NotFoundException e) {
        logger.debug("No OAI-PMH publication found for media package {}.", mpId, e);
        // this is ok
      }
    }
    List<Job> distributionJobs = new ArrayList<>(2);
    if (downloadElementIds != null && !downloadElementIds.isEmpty()) {
      // select elements for download distribution
      MediaPackage mpDownloadDist = (MediaPackage) mediaPackage.clone();
      for (MediaPackageElement mpe : mpDownloadDist.getElements()) {
        if (downloadElementIds.contains(mpe.getIdentifier())) {
          continue;
        }
        mpDownloadDist.remove(mpe);
      }
      // publish to download
      if (mpDownloadDist.getElements().length > 0) {
        try {
          Job downloadDistributionJob = downloadDistributionService
                  .distribute(getPublicationChannelName(repository), mpDownloadDist, downloadElementIds,
                          checkAvailability);
          if (downloadDistributionJob != null) {
            distributionJobs.add(downloadDistributionJob);
          }
        } catch (DistributionException e) {
          throw new PublicationException(
              format("Unable to distribute media package %s to download distribution.", mpId),
              e);
        }
      }
    }
    if (streamingElementIds != null && !streamingElementIds.isEmpty()) {
      // select elements for streaming distribution
      MediaPackage mpStreamingDist = (MediaPackage) mediaPackage.clone();
      for (MediaPackageElement mpe : mpStreamingDist.getElements()) {
        if (streamingElementIds.contains(mpe.getIdentifier())) {
          continue;
        }
        mpStreamingDist.remove(mpe);
      }
      // publish to streaming
      if (mpStreamingDist.getElements().length > 0) {
        try {
          Job streamingDistributionJob = streamingDistributionService
                  .distribute(getPublicationChannelName(repository), mpStreamingDist, streamingElementIds);
          if (streamingDistributionJob != null) {
            distributionJobs.add(streamingDistributionJob);
          }
        } catch (DistributionException e) {
          throw new PublicationException(
              format("Unable to distribute media package %s to streaming distribution.", mpId),
              e);
        }
      }
    }
    if (distributionJobs.isEmpty()) {
      throw new IllegalStateException(format(
              "The media package %s does not contain any elements for publishing to OAI-PMH", mpId));
    }
    // wait for distribution jobs
    if (!waitForJobs(job, serviceRegistry, distributionJobs).isSuccess()) {
      throw new PublicationException(format(
              "Unable to distribute elements of media package %s to distribution channels.", mpId));
    }

    List<MediaPackageElement> distributedElements = new ArrayList<>();
    for (Job distributionJob : distributionJobs) {
      String distributedElementsXml = distributionJob.getPayload();
      if (StringUtils.isNotBlank(distributedElementsXml)) {
        for (MediaPackageElement distributedElement
            : MediaPackageElementParser.getArrayFromXml(distributedElementsXml)) {
          distributedElements.add(distributedElement);
        }
      }
    }
    MediaPackage oaiPmhDistMp = (MediaPackage) mediaPackage.clone();
    // cleanup media package elements
    for (MediaPackageElement mpe : oaiPmhDistMp.getElements()) {
      // keep publications
      if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
        continue;
      }
      oaiPmhDistMp.remove(mpe);
    }
    // ...add the distributed elements
    for (MediaPackageElement mpe : distributedElements) {
      oaiPmhDistMp.add(mpe);
    }

    // publish to oai-pmh
    try {
      oaiPmhDatabase.store(oaiPmhDistMp, repository);
    } catch (OaiPmhDatabaseException e) {
      // todo: should we retract the elements from download and streaming here?
      throw new PublicationException(
          format("Unable to distribute media package %s to OAI-PMH repository %s", mpId, repository),
          e);
    }
    return createPublicationElement(mpId, repository);
  }

  private Publication replace(
      Job job,
      MediaPackage mediaPackage,
      String repository,
      Set<? extends MediaPackageElement> downloadElements,
      Set<? extends MediaPackageElement> streamingElements,
      Set<MediaPackageElementFlavor> retractDownloadFlavors,
      Set<MediaPackageElementFlavor> retractStreamingFlavors,
      Set<? extends MediaPackageElement> publications,
      boolean checkAvailable
  ) throws MediaPackageException, PublicationException {
    final String mpId = mediaPackage.getIdentifier().toString();
    final String channel = getPublicationChannelName(repository);

    try {
      final SearchResult search = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
          .isDeleted(false).build());
      if (search.size() > 1) {
        throw new PublicationException("Found multiple OAI-PMH records for id " + mpId);
      }
      final Optional<MediaPackage> existingMp = search.getItems().stream().findFirst().map(
              SearchResultItem::getMediaPackage);

      // Collect Ids of elements to distribute
      final Set<String> addDownloadElementIds = downloadElements.stream()
          .map(MediaPackageElement::getIdentifier)
          .collect(Collectors.toSet());
      final Set<String> addStreamingElementIds = streamingElements.stream()
          .map(MediaPackageElement::getIdentifier)
          .collect(Collectors.toSet());

      // Use retractFlavors to search for existing elements to retract
      final Set<MediaPackageElement> removeDownloadElements = existingMp.map(mp ->
          Arrays.stream(mp.getElements())
          .filter(e -> retractDownloadFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
          .collect(Collectors.toSet())
      ).orElse(java.util.Collections.emptySet());
      final Set<MediaPackageElement> removeStreamingElements = existingMp.map(mp ->
          Arrays.stream(mp.getElements())
              .filter(e -> retractStreamingFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
              .collect(Collectors.toSet())
      ).orElse(java.util.Collections.emptySet());

      // Element IDs to retract. Elements identified by flavor and elements to re-distribute
      final Set<String> removeDownloadElementIds = Stream
          .concat(removeDownloadElements.stream(), downloadElements.stream())
          .map(MediaPackageElement::getIdentifier)
          .collect(Collectors.toSet());
      final Set<String> removeStreamingElementIds = Stream
          .concat(removeStreamingElements.stream(), streamingElements.stream())
          .map(MediaPackageElement::getIdentifier)
          .collect(Collectors.toSet());

      if (removeDownloadElementIds.isEmpty() && removeStreamingElementIds.isEmpty()
          && addDownloadElementIds.isEmpty() && addStreamingElementIds.isEmpty()) {
        // Nothing to do
        return Arrays.stream(mediaPackage.getPublications())
            .filter(p -> channel.equals(p.getChannel()))
            .findFirst()
            .orElse(null);
      }

      final MediaPackage temporaryMediaPackage = (MediaPackage) mediaPackage.clone();
      downloadElements.forEach(temporaryMediaPackage::add);
      streamingElements.forEach(temporaryMediaPackage::add);
      removeDownloadElements.forEach(temporaryMediaPackage::add);
      removeStreamingElements.forEach(temporaryMediaPackage::add);

      final List<MediaPackageElement> retractedElements = new ArrayList<>();
      final List<MediaPackageElement> distributedElements = new ArrayList<>();
      if (job != null) {
        retractedElements
            .addAll(retract(job, channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
        distributedElements
            .addAll(distribute(job, channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
                checkAvailable));
      } else {
        retractedElements
            .addAll(retractSync(channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
        distributedElements
            .addAll(distributeSync(channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
                checkAvailable));
      }

      final MediaPackage oaiPmhDistMp = (MediaPackage) existingMp.orElse(mediaPackage).clone();

      // Remove OAI-PMH publication
      Arrays.stream(oaiPmhDistMp.getPublications())
          .filter(p -> channel.equals(p.getChannel()))
          .forEach(oaiPmhDistMp::remove);

      // Remove retracted elements
      retractedElements.stream()
          .map(MediaPackageElement::getIdentifier)
          .forEach(oaiPmhDistMp::removeElementById);
      // Add new distributed elements
      distributedElements.forEach(oaiPmhDistMp::add);

      // Remove old publications
      publications.stream()
          .map(p -> ((Publication) p).getChannel())
          .forEach(c -> Arrays.stream(oaiPmhDistMp.getPublications())
              .filter(p -> c.equals(p.getChannel()))
              .forEach(oaiPmhDistMp::remove));

      // Add updated publications
      publications.forEach(oaiPmhDistMp::add);

      // publish to oai-pmh
      oaiPmhDatabase.store(oaiPmhDistMp, repository);

      return Arrays.stream(mediaPackage.getPublications())
          .filter(p -> channel.equals(p.getChannel()))
          .findFirst()
          .orElse(createPublicationElement(mpId, repository));
    } catch (OaiPmhDatabaseException e) {
      throw new PublicationException(format("Unable to update media package %s in OAI-PMH repository %s", mpId,
          repository), e);
    } catch (DistributionException e) {
      throw new PublicationException(format("Unable to update OAI-PMH distributions of media package %s.", mpId), e);
    } catch (MediaPackageRuntimeException e) {
      throw e.getWrappedException();
    }
  }

  private List<MediaPackageElement> retract(
          Job job, String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
          Set<String> removeStreamingElementIds) throws PublicationException, DistributionException {
    final List<Job> retractJobs = new ArrayList<>(2);
    if (!removeDownloadElementIds.isEmpty()) {
      retractJobs.add(downloadDistributionService.retract(channel, mp, removeDownloadElementIds));
    }
    if (!removeStreamingElementIds.isEmpty()) {
      retractJobs.add(streamingDistributionService.retract(channel, mp, removeStreamingElementIds));
    }

    // wait for retract jobs
    if (!waitForJobs(job, serviceRegistry, retractJobs).isSuccess()) {
      throw new PublicationException(format("Unable to retract OAI-PMH distributions of media package %s",
          mp.getIdentifier().toString()));
    }

    return retractJobs.stream()
        .filter(j -> StringUtils.isNotBlank(j.getPayload()))
        .map(Job::getPayload)
        .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
        .collect(Collectors.toList());
  }

  private List<MediaPackageElement> retractSync(String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
      Set<String> removeStreamingElementIds) throws DistributionException {
    final List<MediaPackageElement> retracted = new ArrayList<>();
    if (!removeDownloadElementIds.isEmpty()) {
      retracted.addAll(downloadDistributionService.retractSync(channel, mp, removeDownloadElementIds));
    }
    if (!removeStreamingElementIds.isEmpty()) {
      retracted.addAll(streamingDistributionService.retractSync(channel, mp, removeStreamingElementIds));
    }
    return retracted;
  }

  private List<MediaPackageElement> distribute(
      Job job,
      String channel,
      MediaPackage mp,
      Set<String> addDownloadElementIds,
      Set<String> addStreamingElementIds,
      boolean checkAvailable
  ) throws PublicationException, MediaPackageException, DistributionException {
    final List<Job> distributeJobs = new ArrayList<>(2);
    if (!addDownloadElementIds.isEmpty()) {
      distributeJobs.add(downloadDistributionService.distribute(channel, mp, addDownloadElementIds,
          checkAvailable));
    }
    if (!addStreamingElementIds.isEmpty()) {
      distributeJobs.add(streamingDistributionService.distribute(channel, mp, addStreamingElementIds));
    }

    // wait for distribute jobs
    if (!waitForJobs(job, serviceRegistry, distributeJobs).isSuccess()) {
      throw new PublicationException(format("Unable to distribute OAI-PMH distributions of media package %s",
          mp.getIdentifier().toString()));
    }

    return distributeJobs.stream()
        .filter(j -> StringUtils.isNotBlank(j.getPayload()))
        .map(Job::getPayload)
        .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
        .collect(Collectors.toList());
  }

  private List<MediaPackageElement> distributeSync(
      String channel, MediaPackage mp, Set<String> addDownloadElementIds, Set<String> addStreamingElementIds,
      boolean checkAvailable) throws DistributionException {
    final List<MediaPackageElement> distributed = new ArrayList<>();
    if (!addDownloadElementIds.isEmpty()) {
      distributed.addAll(downloadDistributionService.distributeSync(channel, mp, addDownloadElementIds,
          checkAvailable));
    }
    if (!addStreamingElementIds.isEmpty()) {
      distributed.addAll(streamingDistributionService.distributeSync(channel, mp, addStreamingElementIds));
    }
    return distributed;
  }


  protected Publication retract(Job job, MediaPackage mediaPackage, String repository)
          throws PublicationException, NotFoundException {
    String mpId = mediaPackage.getIdentifier().toString();

    // track elements for retraction
    MediaPackage oaiPmhMp = null;
    SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
            .isDeleted(false).build());
    for (SearchResultItem searchResultItem : searchResult.getItems()) {
      if (oaiPmhMp == null) {
        oaiPmhMp = searchResultItem.getMediaPackage();
      } else {
        for (MediaPackageElement mpe : searchResultItem.getMediaPackage().getElements()) {
          oaiPmhMp.add(mpe);
        }
      }
    }

    // retract oai-pmh
    try {
      oaiPmhDatabase.delete(mpId, repository);
    } catch (OaiPmhDatabaseException e) {
      throw new PublicationException(format("Unable to retract media package %s from OAI-PMH repository %s",
              mpId, repository), e);
    } catch (NotFoundException e) {
      logger.debug("Skip retracting media package {} from OIA-PMH repository {} as it isn't published.",
              mpId, repository, e);
    }

    if (oaiPmhMp != null && oaiPmhMp.getElements().length > 0) {
      // retract files from distribution channels
      Set<String> mpeIds = new HashSet<>();
      for (MediaPackageElement mpe : oaiPmhMp.elements()) {
        if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
          continue;
        }

        mpeIds.add(mpe.getIdentifier());
      }
      if (!mpeIds.isEmpty()) {
        List<Job> retractionJobs = new ArrayList<>();
        // retract download
        try {
          Job retractDownloadJob = downloadDistributionService
                  .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
          if (retractDownloadJob != null) {
            retractionJobs.add(retractDownloadJob);
          }
        } catch (DistributionException e) {
          throw new PublicationException(
              format(
                  "Unable to create retraction job from distribution channel download for the media package %s ",
                  mpId),
              e);
        }

        // retract streaming
        try {
          Job retractDownloadJob = streamingDistributionService
                  .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
          if (retractDownloadJob != null) {
            retractionJobs.add(retractDownloadJob);
          }
        } catch (DistributionException e) {
          throw new PublicationException(
              format("Unable to create retraction job from distribution channel streaming for the media package %s ",
                  mpId),
              e);
        }
        if (retractionJobs.size() > 0) {
          // wait for distribution jobs
          if (!waitForJobs(job, serviceRegistry, retractionJobs).isSuccess()) {
            throw new PublicationException(
                    format("Unable to retract elements of media package %s from distribution channels.", mpId));
          }
        }
      }
    }

    String publicationChannel = getPublicationChannelName(repository);
    for (Publication p : mediaPackage.getPublications()) {
      if (StringUtils.equals(publicationChannel, p.getChannel())) {
        return p;
      }
    }
    return null;
  }

  protected Publication updateMetadata(
      Job job,
      MediaPackage mediaPackage,
      String repository,
      Set<String> flavors,
      Set<String> tags,
      boolean checkAvailability
  ) throws PublicationException {
    final Set<MediaPackageElementFlavor> parsedFlavors = new HashSet<>();
    for (String flavor : flavors) {
      parsedFlavors.add(MediaPackageElementFlavor.parseFlavor(flavor));
    }

    final MediaPackage filteredMp;
    final SearchResult result = oaiPmhDatabase.search(
        QueryBuilder.queryRepo(repository)
            .mediaPackageId(mediaPackage)
            .isDeleted(false)
            .build()
    );
    if (result.size() == 1) {
      // apply tags and flavors to the current media package
      try {
        logger.debug("filter elements with flavors {} and tags {} on media package {}",
                StringUtils.join(flavors, ", "), StringUtils.join(tags, ", "),
                MediaPackageParser.getAsXml(mediaPackage));

        filteredMp = filterMediaPackage(mediaPackage, parsedFlavors, tags);
      } catch (MediaPackageException e) {
        throw new PublicationException("Error filtering media package", e);
      }
    } else if (result.size() == 0) {
      logger.info("Skipping update of media package {} since it is not currently published to {}",
              mediaPackage, repository);
      return null;
    } else {
      final String msg = format(
          "More than one media package with id %s found",
          mediaPackage.getIdentifier().toString()
      );
      logger.warn(msg);
      throw new PublicationException(msg);
    }
    // re-distribute elements to download
    Set<String> elementIdsToDistribute = new HashSet<>();
    for (MediaPackageElement mpe : filteredMp.getElements()) {
      // do not distribute publications
      if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
        continue;
      }
      elementIdsToDistribute.add(mpe.getIdentifier());
    }
    if (elementIdsToDistribute.isEmpty()) {
      logger.debug("The media package {} does not contain any elements to update. "
                      + "Skip OAI-PMH metadata update operation for repository {}",
              mediaPackage.getIdentifier().toString(), repository);
      return null;
    }
    logger.debug("distribute elements {}", StringUtils.join(elementIdsToDistribute, ", "));
    final List<MediaPackageElement> distributedElements = new ArrayList<>();
    try {
      Job distJob = downloadDistributionService
          .distribute(getPublicationChannelName(repository), filteredMp, elementIdsToDistribute, checkAvailability);
      if (job == null) {
        throw new PublicationException("The distribution service can not handle this type of media package elements.");
      }
      if (!waitForJobs(job, serviceRegistry, distJob).isSuccess()) {
        throw new PublicationException(format(
            "Unable to distribute updated elements from media package %s to the download distribution service",
            mediaPackage.getIdentifier().toString()));
      }
      if (distJob.getPayload() != null) {
        for (MediaPackageElement mpe : MediaPackageElementParser.getArrayFromXml(distJob.getPayload())) {
          distributedElements.add(mpe);
        }
      }
    } catch (DistributionException | MediaPackageException e) {
      throw new PublicationException(format(
          "Unable to distribute updated elements from media package %s to the download distribution service",
          mediaPackage.getIdentifier().toString()), e);
    }

    // update elements (URLs)
    for (MediaPackageElement e : filteredMp.getElements()) {
      if (MediaPackageElement.Type.Publication.equals(e.getElementType())) {
        continue;
      }
      filteredMp.remove(e);
    }
    for (MediaPackageElement e : distributedElements) {
      filteredMp.add(e);
    }
    MediaPackage publishedMp = merge(filteredMp, removeMatchingNonExistantElements(filteredMp,
            (MediaPackage) result.getItems().get(0).getMediaPackage().clone(), parsedFlavors, tags));
    // Publish the media package to OAI-PMH
    try {
      logger.debug("Updating metadata of media package {} in {}",
              publishedMp.getIdentifier().toString(), repository);
      oaiPmhDatabase.store(publishedMp, repository);
    } catch (OaiPmhDatabaseException e) {
      throw new PublicationException(format("Media package %s could not be updated",
              publishedMp.getIdentifier().toString()));
    }
    // retract orphaned elements from download distribution
    // orphaned elements are all those elements to which the updated media package no longer refers
    // (in terms of element uri)
    Map<URI, MediaPackageElement> elementUriMap = new Hashtable<>();
    for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
      for (MediaPackageElement mpe : oaiPmhSearchResultItem.getMediaPackage().getElements()) {
        if (MediaPackageElement.Type.Publication == mpe.getElementType() || null == mpe.getURI()) {
          continue;
        }
        elementUriMap.put(mpe.getURI(), mpe);
      }
    }
    for (MediaPackageElement publishedMpe : publishedMp.getElements()) {
      if (MediaPackageElement.Type.Publication == publishedMpe.getElementType()) {
        continue;
      }
      if (elementUriMap.containsKey(publishedMpe.getURI())) {
        elementUriMap.remove(publishedMpe.getURI());
      }
    }
    Set<String> orphanedElementIds = new HashSet<>();
    for (MediaPackageElement orphanedMpe : elementUriMap.values()) {
      orphanedElementIds.add(orphanedMpe.getIdentifier());
    }
    if (!orphanedElementIds.isEmpty()) {
      for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
        try {
          Job retractJob = downloadDistributionService.retract(getPublicationChannelName(repository),
                  oaiPmhSearchResultItem.getMediaPackage(), orphanedElementIds);
          if (retractJob != null) {
            if (!waitForJobs(job, serviceRegistry, retractJob).isSuccess()) {
              logger.warn(
                  "The download distribution retract job for the orphaned elements from media package {} "
                      + "does not end successfully",
                  oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString());
            }
          }
        } catch (DistributionException e) {
          logger.warn(
              "Unable to retract orphaned elements from download distribution service for the "
                  + "media package {} channel {}",
              oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString(),
              getPublicationChannelName(repository),
              e);
        }
      }
    }

    // return the publication
    String publicationChannel = getPublicationChannelName(repository);
    for (Publication p : mediaPackage.getPublications()) {
      if (StringUtils.equals(publicationChannel, p.getChannel())) {
        return p;
      }
    }
    return null;
  }

  protected void checkInputArguments(MediaPackage mediaPackage, String repository) {
    if (mediaPackage == null) {
      throw new IllegalArgumentException("Media package must be specified");
    }
    if (StringUtils.isEmpty(repository)) {
      throw new IllegalArgumentException("Repository must be specified");
    }
    if (!oaiPmhServerInfo.hasRepo(repository)) {
      throw new IllegalArgumentException("OAI-PMH repository '" + repository + "' does not exist");
    }
  }

  protected String getPublicationChannelName(String repository) {
    return PUBLICATION_CHANNEL_PREFIX.concat(repository);
  }

  /** Create a new publication element. */
  protected Publication createPublicationElement(String mpId, String repository) throws PublicationException {
    for (String hostUrl : OaiPmhServerInfoUtil.oaiPmhServerUrlOfCurrentOrganization(securityService)) {
      final URI engageUri = URIUtils.resolve(
          URI.create(UrlSupport.concat(hostUrl, oaiPmhServerInfo.getMountPoint(), repository)),
          "?verb=ListMetadataFormats&identifier=" + mpId);
      return PublicationImpl.publication(UUID.randomUUID().toString(), getPublicationChannelName(repository), engageUri,
              MimeTypes.parseMimeType(MimeTypes.XML.toString()));
    }
    // no host URL
    final String msg = format("No host url for oai-pmh server configured for organization %s " + "("
            + OaiPmhServerInfoUtil.ORG_CFG_OAIPMH_SERVER_HOSTURL + ")", securityService.getOrganization().getId());
    throw new PublicationException(msg);
  }

  private static MediaPackage updateMediaPackageFields(MediaPackage oaiPmhMp, MediaPackage mediaPackage) {
    oaiPmhMp.setTitle(mediaPackage.getTitle());
    oaiPmhMp.setDate(mediaPackage.getDate());
    oaiPmhMp.setLanguage(mediaPackage.getLanguage());
    oaiPmhMp.setLicense(mediaPackage.getLicense());
    oaiPmhMp.setSeries(mediaPackage.getSeries());
    oaiPmhMp.setSeriesTitle(mediaPackage.getSeriesTitle());
    for (String contributor : oaiPmhMp.getContributors()) {
      oaiPmhMp.removeContributor(contributor);
    }
    for (String contributor : mediaPackage.getContributors()) {
      oaiPmhMp.addContributor(contributor);
    }
    for (String creator : oaiPmhMp.getCreators()) {
      oaiPmhMp.removeCreator(creator);
    }
    for (String creator : mediaPackage.getCreators()) {
      oaiPmhMp.addCreator(creator);
    }
    for (String subject : oaiPmhMp.getSubjects()) {
      oaiPmhMp.removeSubject(subject);
    }
    for (String subject : mediaPackage.getSubjects()) {
      oaiPmhMp.addSubject(subject);
    }
    return oaiPmhMp;
  }

  /**
   * Creates a clone of the media package and removes those elements that do not match the flavor and tags filter
   * criteria.
   *
   * @param mediaPackage
   *          the media package
   * @param flavors
   *          the flavors
   * @param tags
   *          the tags
   * @return the filtered media package
   */
  private MediaPackage filterMediaPackage(
      MediaPackage mediaPackage,
      Set<MediaPackageElementFlavor> flavors,
      Set<String> tags
  ) throws MediaPackageException {
    if (flavors.isEmpty() && tags.isEmpty()) {
      throw new IllegalArgumentException("Flavors or tags parameter must be set");
    }

    MediaPackage filteredMediaPackage = (MediaPackage) mediaPackage.clone();

    // The list of elements to keep
    List<MediaPackageElement> keep = new ArrayList<>();

    SimpleElementSelector selector = new SimpleElementSelector();
    // Filter elements
    for (MediaPackageElementFlavor flavor : flavors) {
      selector.addFlavor(flavor);
    }
    for (String tag : tags) {
      selector.addTag(tag);
    }
    keep.addAll(selector.select(mediaPackage, true));

    // Keep publications
    for (Publication p : filteredMediaPackage.getPublications()) {
      keep.add(p);
    }

    // Fix references and flavors
    for (MediaPackageElement element : filteredMediaPackage.getElements()) {

      if (!keep.contains(element)) {
        logger.debug("Removing {} '{}' from media package '{}'", element.getElementType().toString().toLowerCase(),
            element.getIdentifier(), filteredMediaPackage.getIdentifier().toString());
        filteredMediaPackage.remove(element);
        continue;
      }

      // Is the element referencing anything?
      MediaPackageReference reference = element.getReference();
      if (reference != null) {
        MediaPackageElement referencedElement = mediaPackage.getElementByReference(reference);

        // if we are distributing the referenced element, everything is fine. Otherwise...
        if (referencedElement != null && !keep.contains(referencedElement)) {

          // Follow the references until we find a flavor
          MediaPackageElement parent = null;
          while ((parent = mediaPackage.getElementByReference(reference)) != null) {
            if (parent.getFlavor() != null && element.getFlavor() == null) {
              element.setFlavor(parent.getFlavor());
            }
            if (parent.getReference() == null) {
              break;
            }
            reference = parent.getReference();
          }

          // Done. Let's cut the path but keep references to the mediapackage itself
          if (reference != null && reference.getType().equals(MediaPackageReference.TYPE_MEDIAPACKAGE)) {
            element.setReference(reference);
          } else {
            element.clearReference();
          }
        }
      }
    }

    return filteredMediaPackage;
  }

  /**
   * Remove all these elements from {@code publishedMp}, that matches the given flavors and tags
   * but are not in the {@code updatedMp}.
   *
   * @param updatedMp the updated media package
   * @param publishedMp the media package that is currently published
   * @param flavors flavors of elements to update
   * @param tags tags of elements to update
   * @return published media package without elements, that matches the flavors and tags
   *     but are not in the updated media package
   */
  public static MediaPackage removeMatchingNonExistantElements(MediaPackage updatedMp, MediaPackage publishedMp,
          Set<MediaPackageElementFlavor> flavors, Set<String> tags) {
    SimpleElementSelector selector = new SimpleElementSelector();
    // Filter elements
    for (MediaPackageElementFlavor flavor : flavors) {
      selector.addFlavor(flavor);
    }
    for (String tag : tags) {
      selector.addTag(tag);
    }
    for (MediaPackageElement publishedMpe : selector.select(publishedMp, true)) {
      boolean foundInUpdatedMp = false;
      for (MediaPackageElement updatedMpe : updatedMp.getElementsByFlavor(publishedMpe.getFlavor())) {
        if (!updatedMpe.containsTag(tags)) {
          // todo: this case shouldn't happen!
        }
        foundInUpdatedMp = true;
        break;
      }

      if (!foundInUpdatedMp) {
        publishedMp.remove(publishedMpe);
      }
    }
    return publishedMp;
  }

  /**
   * Merges the updated media package with the one that is currently published in a way where the updated elements
   * replace existing ones in the published media package based on their flavor.
   * <p>
   * If <code>publishedMp</code> is <code>null</code>, this method returns the updated media package without any
   * modifications.
   *
   * @param updatedMp
   *          the updated media package
   * @param publishedMp
   *          the media package that is currently published
   * @return the merged media package
   */
  public static MediaPackage merge(MediaPackage updatedMp, MediaPackage publishedMp) {
    if (publishedMp == null) {
      return updatedMp;
    }

    final MediaPackage mergedMp = MediaPackageSupport.copy(publishedMp);

    // Merge the elements
    for (final MediaPackageElement updatedElement : updatedMp.elements()) {
      MediaPackageElementFlavor flavor = updatedElement.getFlavor();
      if (flavor != null) {
        for (final MediaPackageElement outdated : mergedMp.getElementsByFlavor(flavor)) {
          mergedMp.remove(outdated);
        }
        logger.debug("Update {} {} of type {}", updatedElement.getElementType().toString().toLowerCase(),
                updatedElement.getIdentifier(), updatedElement.getElementType());
        mergedMp.add(updatedElement);
      }
    }

    // Remove publications
    for (final Publication p : mergedMp.getPublications()) {
      mergedMp.remove(p);
    }

    // Add updated publications
    for (final Publication updatedPublication : updatedMp.getPublications()) {
      mergedMp.add(updatedPublication);
    }

    // Merge media package fields
    updateMediaPackageFields(mergedMp, updatedMp);
    return mergedMp;
  }

  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }

  /** OSGI DI */
  @Reference
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }

  /** OSGI DI */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }

  /** OSGI DI */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }

  /** OSGI DI */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
    this.organizationDirectoryService = organizationDirectoryService;
  }

  /** OSGI DI */
  @Reference(target = "(distribution.channel=download)")
  public void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
    this.downloadDistributionService = downloadDistributionService;
  }

  /** OSGI DI */
  @Reference
  public void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
    this.streamingDistributionService = streamingDistributionService;
  }

  /** OSGI DI */
  @Reference
  public void setOaiPmhServerInfo(OaiPmhServerInfo oaiPmhServerInfo) {
    this.oaiPmhServerInfo = oaiPmhServerInfo;
  }

  /** OSGI DI */
  @Reference
  public void setOaiPmhDatabase(OaiPmhDatabase oaiPmhDatabase) {
    this.oaiPmhDatabase = oaiPmhDatabase;
  }
}