View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.publication.oaipmh;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.util.JobUtil.waitForJobs;
25  
26  import org.opencastproject.distribution.api.DistributionException;
27  import org.opencastproject.distribution.api.DownloadDistributionService;
28  import org.opencastproject.distribution.api.StreamingDistributionService;
29  import org.opencastproject.job.api.AbstractJobProducer;
30  import org.opencastproject.job.api.Job;
31  import org.opencastproject.mediapackage.MediaPackage;
32  import org.opencastproject.mediapackage.MediaPackageElement;
33  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34  import org.opencastproject.mediapackage.MediaPackageElementParser;
35  import org.opencastproject.mediapackage.MediaPackageException;
36  import org.opencastproject.mediapackage.MediaPackageParser;
37  import org.opencastproject.mediapackage.MediaPackageReference;
38  import org.opencastproject.mediapackage.MediaPackageRuntimeException;
39  import org.opencastproject.mediapackage.MediaPackageSupport;
40  import org.opencastproject.mediapackage.Publication;
41  import org.opencastproject.mediapackage.PublicationImpl;
42  import org.opencastproject.mediapackage.selector.SimpleElementSelector;
43  import org.opencastproject.oaipmh.persistence.OaiPmhDatabase;
44  import org.opencastproject.oaipmh.persistence.OaiPmhDatabaseException;
45  import org.opencastproject.oaipmh.persistence.QueryBuilder;
46  import org.opencastproject.oaipmh.persistence.SearchResult;
47  import org.opencastproject.oaipmh.persistence.SearchResultItem;
48  import org.opencastproject.oaipmh.server.OaiPmhServerInfo;
49  import org.opencastproject.oaipmh.server.OaiPmhServerInfoUtil;
50  import org.opencastproject.publication.api.OaiPmhPublicationService;
51  import org.opencastproject.publication.api.PublicationException;
52  import org.opencastproject.security.api.OrganizationDirectoryService;
53  import org.opencastproject.security.api.SecurityService;
54  import org.opencastproject.security.api.UserDirectoryService;
55  import org.opencastproject.serviceregistry.api.ServiceRegistry;
56  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
57  import org.opencastproject.util.MimeTypes;
58  import org.opencastproject.util.NotFoundException;
59  import org.opencastproject.util.UrlSupport;
60  import org.opencastproject.util.data.Collections;
61  
62  import com.entwinemedia.fn.data.Opt;
63  
64  import org.apache.commons.lang3.BooleanUtils;
65  import org.apache.commons.lang3.StringUtils;
66  import org.apache.http.client.utils.URIUtils;
67  import org.osgi.service.component.annotations.Component;
68  import org.osgi.service.component.annotations.Reference;
69  import org.slf4j.Logger;
70  import org.slf4j.LoggerFactory;
71  
72  import java.net.URI;
73  import java.util.ArrayList;
74  import java.util.Arrays;
75  import java.util.HashSet;
76  import java.util.Hashtable;
77  import java.util.List;
78  import java.util.Map;
79  import java.util.Optional;
80  import java.util.Set;
81  import java.util.UUID;
82  import java.util.stream.Collectors;
83  import java.util.stream.Stream;
84  
85  /**
86   * Publishes a recording to an OAI-PMH publication repository.
87   */
88  @Component(
89      immediate = true,
90      service = OaiPmhPublicationService.class,
91      property = {
92          "service.description=Publication Service (OAI-PMH)"
93      }
94  )
95  public class OaiPmhPublicationServiceImpl extends AbstractJobProducer implements OaiPmhPublicationService {
96  
/** Logging facility */
private static final Logger logger = LoggerFactory.getLogger(OaiPmhPublicationServiceImpl.class);

/**
 * Operations handled by this job producer. The constant name is used as the job operation string
 * (see {@code Operation.valueOf(job.getOperation())} in {@code process}).
 */
public enum Operation {
  Publish,
  Retract,
  UpdateMetadata,
  Replace
}

// Distribution channels the elements of a media package are distributed to / retracted from
private DownloadDistributionService downloadDistributionService;
private StreamingDistributionService streamingDistributionService;

// OAI-PMH repository access
private OaiPmhDatabase oaiPmhDatabase;
private OaiPmhServerInfo oaiPmhServerInfo;

// Job and security infrastructure
private ServiceRegistry serviceRegistry;
private SecurityService securityService;
private UserDirectoryService userDirectoryService;
private OrganizationDirectoryService organizationDirectoryService;

/** Creates the publication service for the {@code JOB_TYPE} job type. */
public OaiPmhPublicationServiceImpl() {
  super(JOB_TYPE);
}
116 
117   @Override
118   protected String process(Job job) throws Exception {
119     if (!StringUtils.equalsIgnoreCase(JOB_TYPE, job.getJobType())) {
120       throw new IllegalArgumentException("Can not handle job type " + job.getJobType());
121     }
122 
123     Publication publication = null;
124     MediaPackage mediaPackage = MediaPackageParser.getFromXml(job.getArguments().get(0));
125     String repository = job.getArguments().get(1);
126     boolean checkAvailability = false;
127     switch (Operation.valueOf(job.getOperation())) {
128       case Publish:
129         String[] downloadElementIds = StringUtils.split(job.getArguments().get(2), SEPARATOR);
130         String[] streamingElementIds = StringUtils.split(job.getArguments().get(3), SEPARATOR);
131         checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
132         publication = publish(job, mediaPackage, repository,
133                 Collections.set(downloadElementIds), Collections.set(streamingElementIds), checkAvailability);
134         break;
135       case Replace:
136         final Set<? extends MediaPackageElement> downloadElements =
137             Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(2)));
138         final Set<? extends MediaPackageElement> streamingElements =
139             Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(3)));
140         final Set<MediaPackageElementFlavor> retractDownloadFlavors = Arrays.stream(
141             StringUtils.split(job.getArguments().get(4), SEPARATOR))
142             .filter(s -> !s.isEmpty())
143             .map(MediaPackageElementFlavor::parseFlavor)
144             .collect(Collectors.toSet());
145         final Set<MediaPackageElementFlavor> retractStreamingFlavors = Arrays.stream(
146             StringUtils.split(job.getArguments().get(5), SEPARATOR))
147             .filter(s -> !s.isEmpty())
148             .map(MediaPackageElementFlavor::parseFlavor)
149             .collect(Collectors.toSet());
150         final Set<? extends MediaPackageElement> publications =
151             Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(6)));
152         checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(7));
153         publication = replace(job, mediaPackage, repository, downloadElements, streamingElements,
154             retractDownloadFlavors, retractStreamingFlavors, publications, checkAvailability);
155         break;
156       case Retract:
157         publication = retract(job, mediaPackage, repository);
158         break;
159       case UpdateMetadata:
160         checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
161         String[] flavors = StringUtils.split(job.getArguments().get(2), SEPARATOR);
162         String[] tags = StringUtils.split(job.getArguments().get(3), SEPARATOR);
163         publication = updateMetadata(job, mediaPackage, repository,
164                 Collections.set(flavors), Collections.set(tags), checkAvailability);
165         break;
166       default:
167         throw new IllegalArgumentException("Can not handle this type of operation: " + job.getOperation());
168     }
169     return publication != null ? MediaPackageElementParser.getAsXml(publication) : null;
170   }
171 
172   @Override
173   public Job replace(
174       MediaPackage mediaPackage,
175       String repository,
176       Set<? extends MediaPackageElement> downloadElements,
177       Set<? extends MediaPackageElement> streamingElements,
178       Set<MediaPackageElementFlavor> retractDownloadFlavors,
179       Set<MediaPackageElementFlavor> retractStreamingFlavors,
180       Set<? extends Publication> publications,
181       boolean checkAvailability
182   ) throws PublicationException {
183     checkInputArguments(mediaPackage, repository);
184     try {
185       return serviceRegistry.createJob(JOB_TYPE, Operation.Replace.name(),
186           Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
187               repository, // 1
188               MediaPackageElementParser.getArrayAsXml(Collections.toList(downloadElements)), // 2
189               MediaPackageElementParser.getArrayAsXml(Collections.toList(streamingElements)), // 3
190               StringUtils.join(retractDownloadFlavors, SEPARATOR), // 4
191               StringUtils.join(retractStreamingFlavors, SEPARATOR), // 5
192               MediaPackageElementParser.getArrayAsXml(Collections.toList(publications)), // 6
193               Boolean.toString(checkAvailability))); // 7
194     } catch (ServiceRegistryException e) {
195       throw new PublicationException("Unable to create job", e);
196     } catch (MediaPackageException e) {
197       throw new PublicationException("Unable to serialize media package elements", e);
198     }
199   }
200 
201   @Override
202   public Publication replaceSync(
203       MediaPackage mediaPackage, String repository, Set<? extends MediaPackageElement> downloadElements,
204       Set<? extends MediaPackageElement> streamingElements, Set<MediaPackageElementFlavor> retractDownloadFlavors,
205       Set<MediaPackageElementFlavor> retractStreamingFlavors, Set<? extends Publication> publications,
206       boolean checkAvailability) throws PublicationException, MediaPackageException {
207     return replace(null, mediaPackage, repository, downloadElements, streamingElements, retractDownloadFlavors,
208         retractStreamingFlavors, publications, checkAvailability);
209   }
210 
211   @Override
212   public Job publish(MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
213           Set<String> streamingElementIds, boolean checkAvailability)
214           throws PublicationException, MediaPackageException {
215     checkInputArguments(mediaPackage, repository);
216 
217     try {
218       return serviceRegistry.createJob(JOB_TYPE, Operation.Publish.toString(),
219               Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
220                       repository, // 1
221                       StringUtils.join(downloadElementIds, SEPARATOR), // 2
222                       StringUtils.join(streamingElementIds, SEPARATOR), // 3
223                       Boolean.toString(checkAvailability))); // 4
224     } catch (ServiceRegistryException e) {
225       throw new PublicationException("Unable to create job", e);
226     }
227   }
228 
229   @Override
230   public Job retract(MediaPackage mediaPackage, String repository) throws PublicationException, NotFoundException {
231     checkInputArguments(mediaPackage, repository);
232 
233     try {
234       return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
235               Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), repository));
236     } catch (ServiceRegistryException e) {
237       throw new PublicationException("Unable to create job", e);
238     }
239   }
240 
241   @Override
242   public Job updateMetadata(MediaPackage mediaPackage, String repository, Set<String> flavors, Set<String> tags,
243           boolean checkAvailability) throws PublicationException, MediaPackageException {
244     checkInputArguments(mediaPackage, repository);
245     if ((flavors == null || flavors.isEmpty()) && (tags == null || tags.isEmpty())) {
246       throw new IllegalArgumentException("Flavors or tags must be set");
247     }
248 
249     try {
250       return serviceRegistry.createJob(JOB_TYPE, Operation.UpdateMetadata.toString(),
251               Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
252                       repository, // 1
253                       StringUtils.join(flavors, SEPARATOR), // 2
254                       StringUtils.join(tags, SEPARATOR), // 3
255                       Boolean.toString(checkAvailability))); // 4
256     } catch (ServiceRegistryException e) {
257       throw new PublicationException("Unable to create job", e);
258     }
259   }
260 
261   protected Publication publish(Job job, MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
262           Set<String> streamingElementIds, boolean checkAvailability)
263           throws PublicationException, MediaPackageException {
264     String mpId = mediaPackage.getIdentifier().toString();
265     SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
266             .isDeleted(false).build());
267     // retract oai-pmh if published
268     if (searchResult.size() > 0) {
269       try {
270         Publication p = retract(job, mediaPackage, repository);
271         if (p != null && mediaPackage.contains(p)) {
272           mediaPackage.remove(p);
273         }
274       } catch (NotFoundException e) {
275         logger.debug("No OAI-PMH publication found for media package {}.", mpId, e);
276         // this is ok
277       }
278     }
279     List<Job> distributionJobs = new ArrayList<>(2);
280     if (downloadElementIds != null && !downloadElementIds.isEmpty()) {
281       // select elements for download distribution
282       MediaPackage mpDownloadDist = (MediaPackage) mediaPackage.clone();
283       for (MediaPackageElement mpe : mpDownloadDist.getElements()) {
284         if (downloadElementIds.contains(mpe.getIdentifier())) {
285           continue;
286         }
287         mpDownloadDist.remove(mpe);
288       }
289       // publish to download
290       if (mpDownloadDist.getElements().length > 0) {
291         try {
292           Job downloadDistributionJob = downloadDistributionService
293                   .distribute(getPublicationChannelName(repository), mpDownloadDist, downloadElementIds,
294                           checkAvailability);
295           if (downloadDistributionJob != null) {
296             distributionJobs.add(downloadDistributionJob);
297           }
298         } catch (DistributionException e) {
299           throw new PublicationException(
300               format("Unable to distribute media package %s to download distribution.", mpId),
301               e);
302         }
303       }
304     }
305     if (streamingElementIds != null && !streamingElementIds.isEmpty()) {
306       // select elements for streaming distribution
307       MediaPackage mpStreamingDist = (MediaPackage) mediaPackage.clone();
308       for (MediaPackageElement mpe : mpStreamingDist.getElements()) {
309         if (streamingElementIds.contains(mpe.getIdentifier())) {
310           continue;
311         }
312         mpStreamingDist.remove(mpe);
313       }
314       // publish to streaming
315       if (mpStreamingDist.getElements().length > 0) {
316         try {
317           Job streamingDistributionJob = streamingDistributionService
318                   .distribute(getPublicationChannelName(repository), mpStreamingDist, streamingElementIds);
319           if (streamingDistributionJob != null) {
320             distributionJobs.add(streamingDistributionJob);
321           }
322         } catch (DistributionException e) {
323           throw new PublicationException(
324               format("Unable to distribute media package %s to streaming distribution.", mpId),
325               e);
326         }
327       }
328     }
329     if (distributionJobs.isEmpty()) {
330       throw new IllegalStateException(format(
331               "The media package %s does not contain any elements for publishing to OAI-PMH", mpId));
332     }
333     // wait for distribution jobs
334     if (!waitForJobs(job, serviceRegistry, distributionJobs).isSuccess()) {
335       throw new PublicationException(format(
336               "Unable to distribute elements of media package %s to distribution channels.", mpId));
337     }
338 
339     List<MediaPackageElement> distributedElements = new ArrayList<>();
340     for (Job distributionJob : distributionJobs) {
341       String distributedElementsXml = distributionJob.getPayload();
342       if (StringUtils.isNotBlank(distributedElementsXml)) {
343         for (MediaPackageElement distributedElement
344             : MediaPackageElementParser.getArrayFromXml(distributedElementsXml)) {
345           distributedElements.add(distributedElement);
346         }
347       }
348     }
349     MediaPackage oaiPmhDistMp = (MediaPackage) mediaPackage.clone();
350     // cleanup media package elements
351     for (MediaPackageElement mpe : oaiPmhDistMp.getElements()) {
352       // keep publications
353       if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
354         continue;
355       }
356       oaiPmhDistMp.remove(mpe);
357     }
358     // ...add the distributed elements
359     for (MediaPackageElement mpe : distributedElements) {
360       oaiPmhDistMp.add(mpe);
361     }
362 
363     // publish to oai-pmh
364     try {
365       oaiPmhDatabase.store(oaiPmhDistMp, repository);
366     } catch (OaiPmhDatabaseException e) {
367       // todo: should we retract the elements from download and streaming here?
368       throw new PublicationException(
369           format("Unable to distribute media package %s to OAI-PMH repository %s", mpId, repository),
370           e);
371     }
372     return createPublicationElement(mpId, repository);
373   }
374 
375   private Publication replace(
376       Job job,
377       MediaPackage mediaPackage,
378       String repository,
379       Set<? extends MediaPackageElement> downloadElements,
380       Set<? extends MediaPackageElement> streamingElements,
381       Set<MediaPackageElementFlavor> retractDownloadFlavors,
382       Set<MediaPackageElementFlavor> retractStreamingFlavors,
383       Set<? extends MediaPackageElement> publications,
384       boolean checkAvailable
385   ) throws MediaPackageException, PublicationException {
386     final String mpId = mediaPackage.getIdentifier().toString();
387     final String channel = getPublicationChannelName(repository);
388 
389     try {
390       final SearchResult search = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
391           .isDeleted(false).build());
392       if (search.size() > 1) {
393         throw new PublicationException("Found multiple OAI-PMH records for id " + mpId);
394       }
395       final Optional<MediaPackage> existingMp = search.getItems().stream().findFirst().map(
396               SearchResultItem::getMediaPackage);
397 
398       // Collect Ids of elements to distribute
399       final Set<String> addDownloadElementIds = downloadElements.stream()
400           .map(MediaPackageElement::getIdentifier)
401           .collect(Collectors.toSet());
402       final Set<String> addStreamingElementIds = streamingElements.stream()
403           .map(MediaPackageElement::getIdentifier)
404           .collect(Collectors.toSet());
405 
406       // Use retractFlavors to search for existing elements to retract
407       final Set<MediaPackageElement> removeDownloadElements = existingMp.map(mp ->
408           Arrays.stream(mp.getElements())
409           .filter(e -> retractDownloadFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
410           .collect(Collectors.toSet())
411       ).orElse(java.util.Collections.emptySet());
412       final Set<MediaPackageElement> removeStreamingElements = existingMp.map(mp ->
413           Arrays.stream(mp.getElements())
414               .filter(e -> retractStreamingFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
415               .collect(Collectors.toSet())
416       ).orElse(java.util.Collections.emptySet());
417 
418       // Element IDs to retract. Elements identified by flavor and elements to re-distribute
419       final Set<String> removeDownloadElementIds = Stream
420           .concat(removeDownloadElements.stream(), downloadElements.stream())
421           .map(MediaPackageElement::getIdentifier)
422           .collect(Collectors.toSet());
423       final Set<String> removeStreamingElementIds = Stream
424           .concat(removeStreamingElements.stream(), streamingElements.stream())
425           .map(MediaPackageElement::getIdentifier)
426           .collect(Collectors.toSet());
427 
428       if (removeDownloadElementIds.isEmpty() && removeStreamingElementIds.isEmpty()
429           && addDownloadElementIds.isEmpty() && addStreamingElementIds.isEmpty()) {
430         // Nothing to do
431         return Arrays.stream(mediaPackage.getPublications())
432             .filter(p -> channel.equals(p.getChannel()))
433             .findFirst()
434             .orElse(null);
435       }
436 
437       final MediaPackage temporaryMediaPackage = (MediaPackage) mediaPackage.clone();
438       downloadElements.forEach(temporaryMediaPackage::add);
439       streamingElements.forEach(temporaryMediaPackage::add);
440       removeDownloadElements.forEach(temporaryMediaPackage::add);
441       removeStreamingElements.forEach(temporaryMediaPackage::add);
442 
443       final List<MediaPackageElement> retractedElements = new ArrayList<>();
444       final List<MediaPackageElement> distributedElements = new ArrayList<>();
445       if (job != null) {
446         retractedElements
447             .addAll(retract(job, channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
448         distributedElements
449             .addAll(distribute(job, channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
450                 checkAvailable));
451       } else {
452         retractedElements
453             .addAll(retractSync(channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
454         distributedElements
455             .addAll(distributeSync(channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
456                 checkAvailable));
457       }
458 
459       final MediaPackage oaiPmhDistMp = (MediaPackage) existingMp.orElse(mediaPackage).clone();
460 
461       // Remove OAI-PMH publication
462       Arrays.stream(oaiPmhDistMp.getPublications())
463           .filter(p -> channel.equals(p.getChannel()))
464           .forEach(oaiPmhDistMp::remove);
465 
466       // Remove retracted elements
467       retractedElements.stream()
468           .map(MediaPackageElement::getIdentifier)
469           .forEach(oaiPmhDistMp::removeElementById);
470       // Add new distributed elements
471       distributedElements.forEach(oaiPmhDistMp::add);
472 
473       // Remove old publications
474       publications.stream()
475           .map(p -> ((Publication) p).getChannel())
476           .forEach(c -> Arrays.stream(oaiPmhDistMp.getPublications())
477               .filter(p -> c.equals(p.getChannel()))
478               .forEach(oaiPmhDistMp::remove));
479 
480       // Add updated publications
481       publications.forEach(oaiPmhDistMp::add);
482 
483       // publish to oai-pmh
484       oaiPmhDatabase.store(oaiPmhDistMp, repository);
485 
486       return Arrays.stream(mediaPackage.getPublications())
487           .filter(p -> channel.equals(p.getChannel()))
488           .findFirst()
489           .orElse(createPublicationElement(mpId, repository));
490     } catch (OaiPmhDatabaseException e) {
491       throw new PublicationException(format("Unable to update media package %s in OAI-PMH repository %s", mpId,
492           repository), e);
493     } catch (DistributionException e) {
494       throw new PublicationException(format("Unable to update OAI-PMH distributions of media package %s.", mpId), e);
495     } catch (MediaPackageRuntimeException e) {
496       throw e.getWrappedException();
497     }
498   }
499 
500   private List<MediaPackageElement> retract(
501           Job job, String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
502           Set<String> removeStreamingElementIds) throws PublicationException, DistributionException {
503     final List<Job> retractJobs = new ArrayList<>(2);
504     if (!removeDownloadElementIds.isEmpty()) {
505       retractJobs.add(downloadDistributionService.retract(channel, mp, removeDownloadElementIds));
506     }
507     if (!removeStreamingElementIds.isEmpty()) {
508       retractJobs.add(streamingDistributionService.retract(channel, mp, removeStreamingElementIds));
509     }
510 
511     // wait for retract jobs
512     if (!waitForJobs(job, serviceRegistry, retractJobs).isSuccess()) {
513       throw new PublicationException(format("Unable to retract OAI-PMH distributions of media package %s",
514           mp.getIdentifier().toString()));
515     }
516 
517     return retractJobs.stream()
518         .filter(j -> StringUtils.isNotBlank(j.getPayload()))
519         .map(Job::getPayload)
520         .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
521         .collect(Collectors.toList());
522   }
523 
524   private List<MediaPackageElement> retractSync(String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
525       Set<String> removeStreamingElementIds) throws DistributionException {
526     final List<MediaPackageElement> retracted = new ArrayList<>();
527     if (!removeDownloadElementIds.isEmpty()) {
528       retracted.addAll(downloadDistributionService.retractSync(channel, mp, removeDownloadElementIds));
529     }
530     if (!removeStreamingElementIds.isEmpty()) {
531       retracted.addAll(streamingDistributionService.retractSync(channel, mp, removeStreamingElementIds));
532     }
533     return retracted;
534   }
535 
536   private List<MediaPackageElement> distribute(
537       Job job,
538       String channel,
539       MediaPackage mp,
540       Set<String> addDownloadElementIds,
541       Set<String> addStreamingElementIds,
542       boolean checkAvailable
543   ) throws PublicationException, MediaPackageException, DistributionException {
544     final List<Job> distributeJobs = new ArrayList<>(2);
545     if (!addDownloadElementIds.isEmpty()) {
546       distributeJobs.add(downloadDistributionService.distribute(channel, mp, addDownloadElementIds,
547           checkAvailable));
548     }
549     if (!addStreamingElementIds.isEmpty()) {
550       distributeJobs.add(streamingDistributionService.distribute(channel, mp, addStreamingElementIds));
551     }
552 
553     // wait for distribute jobs
554     if (!waitForJobs(job, serviceRegistry, distributeJobs).isSuccess()) {
555       throw new PublicationException(format("Unable to distribute OAI-PMH distributions of media package %s",
556           mp.getIdentifier().toString()));
557     }
558 
559     return distributeJobs.stream()
560         .filter(j -> StringUtils.isNotBlank(j.getPayload()))
561         .map(Job::getPayload)
562         .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
563         .collect(Collectors.toList());
564   }
565 
566   private List<MediaPackageElement> distributeSync(
567       String channel, MediaPackage mp, Set<String> addDownloadElementIds, Set<String> addStreamingElementIds,
568       boolean checkAvailable) throws DistributionException {
569     final List<MediaPackageElement> distributed = new ArrayList<>();
570     if (!addDownloadElementIds.isEmpty()) {
571       distributed.addAll(downloadDistributionService.distributeSync(channel, mp, addDownloadElementIds,
572           checkAvailable));
573     }
574     if (!addStreamingElementIds.isEmpty()) {
575       distributed.addAll(streamingDistributionService.distributeSync(channel, mp, addStreamingElementIds));
576     }
577     return distributed;
578   }
579 
580 
581   protected Publication retract(Job job, MediaPackage mediaPackage, String repository)
582           throws PublicationException, NotFoundException {
583     String mpId = mediaPackage.getIdentifier().toString();
584 
585     // track elements for retraction
586     MediaPackage oaiPmhMp = null;
587     SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
588             .isDeleted(false).build());
589     for (SearchResultItem searchResultItem : searchResult.getItems()) {
590       if (oaiPmhMp == null) {
591         oaiPmhMp = searchResultItem.getMediaPackage();
592       } else {
593         for (MediaPackageElement mpe : searchResultItem.getMediaPackage().getElements()) {
594           oaiPmhMp.add(mpe);
595         }
596       }
597     }
598 
599     // retract oai-pmh
600     try {
601       oaiPmhDatabase.delete(mpId, repository);
602     } catch (OaiPmhDatabaseException e) {
603       throw new PublicationException(format("Unable to retract media package %s from OAI-PMH repository %s",
604               mpId, repository), e);
605     } catch (NotFoundException e) {
606       logger.debug("Skip retracting media package {} from OIA-PMH repository {} as it isn't published.",
607               mpId, repository, e);
608     }
609 
610     if (oaiPmhMp != null && oaiPmhMp.getElements().length > 0) {
611       // retract files from distribution channels
612       Set<String> mpeIds = new HashSet<>();
613       for (MediaPackageElement mpe : oaiPmhMp.elements()) {
614         if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
615           continue;
616         }
617 
618         mpeIds.add(mpe.getIdentifier());
619       }
620       if (!mpeIds.isEmpty()) {
621         List<Job> retractionJobs = new ArrayList<>();
622         // retract download
623         try {
624           Job retractDownloadJob = downloadDistributionService
625                   .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
626           if (retractDownloadJob != null) {
627             retractionJobs.add(retractDownloadJob);
628           }
629         } catch (DistributionException e) {
630           throw new PublicationException(
631               format(
632                   "Unable to create retraction job from distribution channel download for the media package %s ",
633                   mpId),
634               e);
635         }
636 
637         // retract streaming
638         try {
639           Job retractDownloadJob = streamingDistributionService
640                   .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
641           if (retractDownloadJob != null) {
642             retractionJobs.add(retractDownloadJob);
643           }
644         } catch (DistributionException e) {
645           throw new PublicationException(
646               format("Unable to create retraction job from distribution channel streaming for the media package %s ",
647                   mpId),
648               e);
649         }
650         if (retractionJobs.size() > 0) {
651           // wait for distribution jobs
652           if (!waitForJobs(job, serviceRegistry, retractionJobs).isSuccess()) {
653             throw new PublicationException(
654                     format("Unable to retract elements of media package %s from distribution channels.", mpId));
655           }
656         }
657       }
658     }
659 
660     String publicationChannel = getPublicationChannelName(repository);
661     for (Publication p : mediaPackage.getPublications()) {
662       if (StringUtils.equals(publicationChannel, p.getChannel())) {
663         return p;
664       }
665     }
666     return null;
667   }
668 
669   protected Publication updateMetadata(
670       Job job,
671       MediaPackage mediaPackage,
672       String repository,
673       Set<String> flavors,
674       Set<String> tags,
675       boolean checkAvailability
676   ) throws PublicationException {
677     final Set<MediaPackageElementFlavor> parsedFlavors = new HashSet<>();
678     for (String flavor : flavors) {
679       parsedFlavors.add(MediaPackageElementFlavor.parseFlavor(flavor));
680     }
681 
682     final MediaPackage filteredMp;
683     final SearchResult result = oaiPmhDatabase.search(
684         QueryBuilder.queryRepo(repository)
685             .mediaPackageId(mediaPackage)
686             .isDeleted(false)
687             .build()
688     );
689     if (result.size() == 1) {
690       // apply tags and flavors to the current media package
691       try {
692         logger.debug("filter elements with flavors {} and tags {} on media package {}",
693                 StringUtils.join(flavors, ", "), StringUtils.join(tags, ", "),
694                 MediaPackageParser.getAsXml(mediaPackage));
695 
696         filteredMp = filterMediaPackage(mediaPackage, parsedFlavors, tags);
697       } catch (MediaPackageException e) {
698         throw new PublicationException("Error filtering media package", e);
699       }
700     } else if (result.size() == 0) {
701       logger.info("Skipping update of media package {} since it is not currently published to {}",
702               mediaPackage, repository);
703       return null;
704     } else {
705       final String msg = format(
706           "More than one media package with id %s found",
707           mediaPackage.getIdentifier().toString()
708       );
709       logger.warn(msg);
710       throw new PublicationException(msg);
711     }
712     // re-distribute elements to download
713     Set<String> elementIdsToDistribute = new HashSet<>();
714     for (MediaPackageElement mpe : filteredMp.getElements()) {
715       // do not distribute publications
716       if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
717         continue;
718       }
719       elementIdsToDistribute.add(mpe.getIdentifier());
720     }
721     if (elementIdsToDistribute.isEmpty()) {
722       logger.debug("The media package {} does not contain any elements to update. "
723                       + "Skip OAI-PMH metadata update operation for repository {}",
724               mediaPackage.getIdentifier().toString(), repository);
725       return null;
726     }
727     logger.debug("distribute elements {}", StringUtils.join(elementIdsToDistribute, ", "));
728     final List<MediaPackageElement> distributedElements = new ArrayList<>();
729     try {
730       Job distJob = downloadDistributionService
731           .distribute(getPublicationChannelName(repository), filteredMp, elementIdsToDistribute, checkAvailability);
732       if (job == null) {
733         throw new PublicationException("The distribution service can not handle this type of media package elements.");
734       }
735       if (!waitForJobs(job, serviceRegistry, distJob).isSuccess()) {
736         throw new PublicationException(format(
737             "Unable to distribute updated elements from media package %s to the download distribution service",
738             mediaPackage.getIdentifier().toString()));
739       }
740       if (distJob.getPayload() != null) {
741         for (MediaPackageElement mpe : MediaPackageElementParser.getArrayFromXml(distJob.getPayload())) {
742           distributedElements.add(mpe);
743         }
744       }
745     } catch (DistributionException | MediaPackageException e) {
746       throw new PublicationException(format(
747           "Unable to distribute updated elements from media package %s to the download distribution service",
748           mediaPackage.getIdentifier().toString()), e);
749     }
750 
751     // update elements (URLs)
752     for (MediaPackageElement e : filteredMp.getElements()) {
753       if (MediaPackageElement.Type.Publication.equals(e.getElementType())) {
754         continue;
755       }
756       filteredMp.remove(e);
757     }
758     for (MediaPackageElement e : distributedElements) {
759       filteredMp.add(e);
760     }
761     MediaPackage publishedMp = merge(filteredMp, removeMatchingNonExistantElements(filteredMp,
762             (MediaPackage) result.getItems().get(0).getMediaPackage().clone(), parsedFlavors, tags));
763     // Publish the media package to OAI-PMH
764     try {
765       logger.debug("Updating metadata of media package {} in {}",
766               publishedMp.getIdentifier().toString(), repository);
767       oaiPmhDatabase.store(publishedMp, repository);
768     } catch (OaiPmhDatabaseException e) {
769       throw new PublicationException(format("Media package %s could not be updated",
770               publishedMp.getIdentifier().toString()));
771     }
772     // retract orphaned elements from download distribution
773     // orphaned elements are all those elements to which the updated media package no longer refers
774     // (in terms of element uri)
775     Map<URI, MediaPackageElement> elementUriMap = new Hashtable<>();
776     for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
777       for (MediaPackageElement mpe : oaiPmhSearchResultItem.getMediaPackage().getElements()) {
778         if (MediaPackageElement.Type.Publication == mpe.getElementType() || null == mpe.getURI()) {
779           continue;
780         }
781         elementUriMap.put(mpe.getURI(), mpe);
782       }
783     }
784     for (MediaPackageElement publishedMpe : publishedMp.getElements()) {
785       if (MediaPackageElement.Type.Publication == publishedMpe.getElementType()) {
786         continue;
787       }
788       if (elementUriMap.containsKey(publishedMpe.getURI())) {
789         elementUriMap.remove(publishedMpe.getURI());
790       }
791     }
792     Set<String> orphanedElementIds = new HashSet<>();
793     for (MediaPackageElement orphanedMpe : elementUriMap.values()) {
794       orphanedElementIds.add(orphanedMpe.getIdentifier());
795     }
796     if (!orphanedElementIds.isEmpty()) {
797       for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
798         try {
799           Job retractJob = downloadDistributionService.retract(getPublicationChannelName(repository),
800                   oaiPmhSearchResultItem.getMediaPackage(), orphanedElementIds);
801           if (retractJob != null) {
802             if (!waitForJobs(job, serviceRegistry, retractJob).isSuccess()) {
803               logger.warn(
804                   "The download distribution retract job for the orphaned elements from media package {} "
805                       + "does not end successfully",
806                   oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString());
807             }
808           }
809         } catch (DistributionException e) {
810           logger.warn(
811               "Unable to retract orphaned elements from download distribution service for the "
812                   + "media package {} channel {}",
813               oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString(),
814               getPublicationChannelName(repository),
815               e);
816         }
817       }
818     }
819 
820     // return the publication
821     String publicationChannel = getPublicationChannelName(repository);
822     for (Publication p : mediaPackage.getPublications()) {
823       if (StringUtils.equals(publicationChannel, p.getChannel())) {
824         return p;
825       }
826     }
827     return null;
828   }
829 
830   protected void checkInputArguments(MediaPackage mediaPackage, String repository) {
831     if (mediaPackage == null) {
832       throw new IllegalArgumentException("Media package must be specified");
833     }
834     if (StringUtils.isEmpty(repository)) {
835       throw new IllegalArgumentException("Repository must be specified");
836     }
837     if (!oaiPmhServerInfo.hasRepo(repository)) {
838       throw new IllegalArgumentException("OAI-PMH repository '" + repository + "' does not exist");
839     }
840   }
841 
842   protected String getPublicationChannelName(String repository) {
843     return PUBLICATION_CHANNEL_PREFIX.concat(repository);
844   }
845 
846   /** Create a new publication element. */
847   protected Publication createPublicationElement(String mpId, String repository) throws PublicationException {
848     for (String hostUrl : OaiPmhServerInfoUtil.oaiPmhServerUrlOfCurrentOrganization(securityService)) {
849       final URI engageUri = URIUtils.resolve(
850           URI.create(UrlSupport.concat(hostUrl, oaiPmhServerInfo.getMountPoint(), repository)),
851           "?verb=ListMetadataFormats&identifier=" + mpId);
852       return PublicationImpl.publication(UUID.randomUUID().toString(), getPublicationChannelName(repository), engageUri,
853               MimeTypes.parseMimeType(MimeTypes.XML.toString()));
854     }
855     // no host URL
856     final String msg = format("No host url for oai-pmh server configured for organization %s " + "("
857             + OaiPmhServerInfoUtil.ORG_CFG_OAIPMH_SERVER_HOSTURL + ")", securityService.getOrganization().getId());
858     throw new PublicationException(msg);
859   }
860 
861   private static MediaPackage updateMediaPackageFields(MediaPackage oaiPmhMp, MediaPackage mediaPackage) {
862     oaiPmhMp.setTitle(mediaPackage.getTitle());
863     oaiPmhMp.setDate(mediaPackage.getDate());
864     oaiPmhMp.setLanguage(mediaPackage.getLanguage());
865     oaiPmhMp.setLicense(mediaPackage.getLicense());
866     oaiPmhMp.setSeries(mediaPackage.getSeries());
867     oaiPmhMp.setSeriesTitle(mediaPackage.getSeriesTitle());
868     for (String contributor : oaiPmhMp.getContributors()) {
869       oaiPmhMp.removeContributor(contributor);
870     }
871     for (String contributor : mediaPackage.getContributors()) {
872       oaiPmhMp.addContributor(contributor);
873     }
874     for (String creator : oaiPmhMp.getCreators()) {
875       oaiPmhMp.removeCreator(creator);
876     }
877     for (String creator : mediaPackage.getCreators()) {
878       oaiPmhMp.addCreator(creator);
879     }
880     for (String subject : oaiPmhMp.getSubjects()) {
881       oaiPmhMp.removeSubject(subject);
882     }
883     for (String subject : mediaPackage.getSubjects()) {
884       oaiPmhMp.addSubject(subject);
885     }
886     return oaiPmhMp;
887   }
888 
889   /**
890    * Creates a clone of the media package and removes those elements that do not match the flavor and tags filter
891    * criteria.
892    *
893    * @param mediaPackage
894    *          the media package
895    * @param flavors
896    *          the flavors
897    * @param tags
898    *          the tags
899    * @return the filtered media package
900    */
901   private MediaPackage filterMediaPackage(
902       MediaPackage mediaPackage,
903       Set<MediaPackageElementFlavor> flavors,
904       Set<String> tags
905   ) throws MediaPackageException {
906     if (flavors.isEmpty() && tags.isEmpty()) {
907       throw new IllegalArgumentException("Flavors or tags parameter must be set");
908     }
909 
910     MediaPackage filteredMediaPackage = (MediaPackage) mediaPackage.clone();
911 
912     // The list of elements to keep
913     List<MediaPackageElement> keep = new ArrayList<>();
914 
915     SimpleElementSelector selector = new SimpleElementSelector();
916     // Filter elements
917     for (MediaPackageElementFlavor flavor : flavors) {
918       selector.addFlavor(flavor);
919     }
920     for (String tag : tags) {
921       selector.addTag(tag);
922     }
923     keep.addAll(selector.select(mediaPackage, true));
924 
925     // Keep publications
926     for (Publication p : filteredMediaPackage.getPublications()) {
927       keep.add(p);
928     }
929 
930     // Fix references and flavors
931     for (MediaPackageElement element : filteredMediaPackage.getElements()) {
932 
933       if (!keep.contains(element)) {
934         logger.debug("Removing {} '{}' from media package '{}'", element.getElementType().toString().toLowerCase(),
935             element.getIdentifier(), filteredMediaPackage.getIdentifier().toString());
936         filteredMediaPackage.remove(element);
937         continue;
938       }
939 
940       // Is the element referencing anything?
941       MediaPackageReference reference = element.getReference();
942       if (reference != null) {
943         MediaPackageElement referencedElement = mediaPackage.getElementByReference(reference);
944 
945         // if we are distributing the referenced element, everything is fine. Otherwise...
946         if (referencedElement != null && !keep.contains(referencedElement)) {
947 
948           // Follow the references until we find a flavor
949           MediaPackageElement parent = null;
950           while ((parent = mediaPackage.getElementByReference(reference)) != null) {
951             if (parent.getFlavor() != null && element.getFlavor() == null) {
952               element.setFlavor(parent.getFlavor());
953             }
954             if (parent.getReference() == null) {
955               break;
956             }
957             reference = parent.getReference();
958           }
959 
960           // Done. Let's cut the path but keep references to the mediapackage itself
961           if (reference != null && reference.getType().equals(MediaPackageReference.TYPE_MEDIAPACKAGE)) {
962             element.setReference(reference);
963           } else {
964             element.clearReference();
965           }
966         }
967       }
968     }
969 
970     return filteredMediaPackage;
971   }
972 
973   /**
974    * Remove all these elements from {@code publishedMp}, that matches the given flavors and tags
975    * but are not in the {@code updatedMp}.
976    *
977    * @param updatedMp the updated media package
978    * @param publishedMp the media package that is currently published
979    * @param flavors flavors of elements to update
980    * @param tags tags of elements to update
981    * @return published media package without elements, that matches the flavors and tags
982    *     but are not in the updated media package
983    */
984   public static MediaPackage removeMatchingNonExistantElements(MediaPackage updatedMp, MediaPackage publishedMp,
985           Set<MediaPackageElementFlavor> flavors, Set<String> tags) {
986     SimpleElementSelector selector = new SimpleElementSelector();
987     // Filter elements
988     for (MediaPackageElementFlavor flavor : flavors) {
989       selector.addFlavor(flavor);
990     }
991     for (String tag : tags) {
992       selector.addTag(tag);
993     }
994     for (MediaPackageElement publishedMpe : selector.select(publishedMp, true)) {
995       boolean foundInUpdatedMp = false;
996       for (MediaPackageElement updatedMpe : updatedMp.getElementsByFlavor(publishedMpe.getFlavor())) {
997         if (!updatedMpe.containsTag(tags)) {
998           // todo: this case shouldn't happen!
999         }
1000         foundInUpdatedMp = true;
1001         break;
1002       }
1003 
1004       if (!foundInUpdatedMp) {
1005         publishedMp.remove(publishedMpe);
1006       }
1007     }
1008     return publishedMp;
1009   }
1010 
1011   /**
1012    * Merges the updated media package with the one that is currently published in a way where the updated elements
1013    * replace existing ones in the published media package based on their flavor.
1014    * <p>
1015    * If <code>publishedMp</code> is <code>null</code>, this method returns the updated media package without any
1016    * modifications.
1017    *
1018    * @param updatedMp
1019    *          the updated media package
1020    * @param publishedMp
1021    *          the media package that is currently published
1022    * @return the merged media package
1023    */
1024   public static MediaPackage merge(MediaPackage updatedMp, MediaPackage publishedMp) {
1025     if (publishedMp == null) {
1026       return updatedMp;
1027     }
1028 
1029     final MediaPackage mergedMp = MediaPackageSupport.copy(publishedMp);
1030 
1031     // Merge the elements
1032     for (final MediaPackageElement updatedElement : updatedMp.elements()) {
1033       for (final MediaPackageElementFlavor flavor : Opt.nul(updatedElement.getFlavor())) {
1034         for (final MediaPackageElement outdated : mergedMp.getElementsByFlavor(flavor)) {
1035           mergedMp.remove(outdated);
1036         }
1037         logger.debug("Update {} {} of type {}", updatedElement.getElementType().toString().toLowerCase(),
1038                 updatedElement.getIdentifier(), updatedElement.getElementType());
1039         mergedMp.add(updatedElement);
1040       }
1041     }
1042 
1043     // Remove publications
1044     for (final Publication p : mergedMp.getPublications()) {
1045       mergedMp.remove(p);
1046     }
1047 
1048     // Add updated publications
1049     for (final Publication updatedPublication : updatedMp.getPublications()) {
1050       mergedMp.add(updatedPublication);
1051     }
1052 
1053     // Merge media package fields
1054     updateMediaPackageFields(mergedMp, updatedMp);
1055     return mergedMp;
1056   }
1057 
1058   @Override
1059   protected ServiceRegistry getServiceRegistry() {
1060     return serviceRegistry;
1061   }
1062 
1063   /**
1064    * {@inheritDoc}
1065    *
1066    * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
1067    */
1068   @Override
1069   protected SecurityService getSecurityService() {
1070     return securityService;
1071   }
1072 
1073   /**
1074    * {@inheritDoc}
1075    *
1076    * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
1077    */
1078   @Override
1079   protected UserDirectoryService getUserDirectoryService() {
1080     return userDirectoryService;
1081   }
1082 
1083   /**
1084    * {@inheritDoc}
1085    *
1086    * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
1087    */
1088   @Override
1089   protected OrganizationDirectoryService getOrganizationDirectoryService() {
1090     return organizationDirectoryService;
1091   }
1092 
1093   /** OSGI DI */
1094   @Reference
1095   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1096     this.serviceRegistry = serviceRegistry;
1097   }
1098 
1099   /** OSGI DI */
1100   @Reference
1101   public void setSecurityService(SecurityService securityService) {
1102     this.securityService = securityService;
1103   }
1104 
1105   /** OSGI DI */
1106   @Reference
1107   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1108     this.userDirectoryService = userDirectoryService;
1109   }
1110 
1111   /** OSGI DI */
1112   @Reference
1113   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1114     this.organizationDirectoryService = organizationDirectoryService;
1115   }
1116 
1117   /** OSGI DI */
1118   @Reference(target = "(distribution.channel=download)")
1119   public void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
1120     this.downloadDistributionService = downloadDistributionService;
1121   }
1122 
1123   /** OSGI DI */
1124   @Reference
1125   public void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
1126     this.streamingDistributionService = streamingDistributionService;
1127   }
1128 
1129   /** OSGI DI */
1130   @Reference
1131   public void setOaiPmhServerInfo(OaiPmhServerInfo oaiPmhServerInfo) {
1132     this.oaiPmhServerInfo = oaiPmhServerInfo;
1133   }
1134 
1135   /** OSGI DI */
1136   @Reference
1137   public void setOaiPmhDatabase(OaiPmhDatabase oaiPmhDatabase) {
1138     this.oaiPmhDatabase = oaiPmhDatabase;
1139   }
1140 }