View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.publication.oaipmh;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.util.JobUtil.waitForJobs;
25  
26  import org.opencastproject.distribution.api.DistributionException;
27  import org.opencastproject.distribution.api.DownloadDistributionService;
28  import org.opencastproject.distribution.api.StreamingDistributionService;
29  import org.opencastproject.job.api.AbstractJobProducer;
30  import org.opencastproject.job.api.Job;
31  import org.opencastproject.mediapackage.MediaPackage;
32  import org.opencastproject.mediapackage.MediaPackageElement;
33  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34  import org.opencastproject.mediapackage.MediaPackageElementParser;
35  import org.opencastproject.mediapackage.MediaPackageException;
36  import org.opencastproject.mediapackage.MediaPackageParser;
37  import org.opencastproject.mediapackage.MediaPackageReference;
38  import org.opencastproject.mediapackage.MediaPackageRuntimeException;
39  import org.opencastproject.mediapackage.MediaPackageSupport;
40  import org.opencastproject.mediapackage.Publication;
41  import org.opencastproject.mediapackage.PublicationImpl;
42  import org.opencastproject.mediapackage.selector.SimpleElementSelector;
43  import org.opencastproject.oaipmh.persistence.OaiPmhDatabase;
44  import org.opencastproject.oaipmh.persistence.OaiPmhDatabaseException;
45  import org.opencastproject.oaipmh.persistence.QueryBuilder;
46  import org.opencastproject.oaipmh.persistence.SearchResult;
47  import org.opencastproject.oaipmh.persistence.SearchResultItem;
48  import org.opencastproject.oaipmh.server.OaiPmhServerInfo;
49  import org.opencastproject.oaipmh.server.OaiPmhServerInfoUtil;
50  import org.opencastproject.publication.api.OaiPmhPublicationService;
51  import org.opencastproject.publication.api.PublicationException;
52  import org.opencastproject.security.api.OrganizationDirectoryService;
53  import org.opencastproject.security.api.SecurityService;
54  import org.opencastproject.security.api.UserDirectoryService;
55  import org.opencastproject.serviceregistry.api.ServiceRegistry;
56  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
57  import org.opencastproject.util.MimeTypes;
58  import org.opencastproject.util.NotFoundException;
59  import org.opencastproject.util.UrlSupport;
60  import org.opencastproject.util.data.Collections;
61  
62  import org.apache.commons.lang3.BooleanUtils;
63  import org.apache.commons.lang3.StringUtils;
64  import org.apache.http.client.utils.URIUtils;
65  import org.osgi.service.component.annotations.Component;
66  import org.osgi.service.component.annotations.Reference;
67  import org.slf4j.Logger;
68  import org.slf4j.LoggerFactory;
69  
70  import java.net.URI;
71  import java.util.ArrayList;
72  import java.util.Arrays;
73  import java.util.HashSet;
74  import java.util.Hashtable;
75  import java.util.List;
76  import java.util.Map;
77  import java.util.Optional;
78  import java.util.Set;
79  import java.util.UUID;
80  import java.util.stream.Collectors;
81  import java.util.stream.Stream;
82  
83  /**
84   * Publishes a recording to an OAI-PMH publication repository.
85   */
86  @Component(
87      immediate = true,
88      service = OaiPmhPublicationService.class,
89      property = {
90          "service.description=Publication Service (OAI-PMH)"
91      }
92  )
93  public class OaiPmhPublicationServiceImpl extends AbstractJobProducer implements OaiPmhPublicationService {
94  
95    /** Logging facility */
96    private static final Logger logger = LoggerFactory.getLogger(OaiPmhPublicationServiceImpl.class);
97  
98    public enum Operation {
99      Publish, Retract, UpdateMetadata, Replace
100   }
101 
102   private DownloadDistributionService downloadDistributionService;
103   private OaiPmhServerInfo oaiPmhServerInfo;
104   private OaiPmhDatabase oaiPmhDatabase;
105   private OrganizationDirectoryService organizationDirectoryService;
106   private SecurityService securityService;
107   private ServiceRegistry serviceRegistry;
108   private StreamingDistributionService streamingDistributionService;
109   private UserDirectoryService userDirectoryService;
110 
111   public OaiPmhPublicationServiceImpl() {
112     super(JOB_TYPE);
113   }
114 
115   @Override
116   protected String process(Job job) throws Exception {
117     if (!StringUtils.equalsIgnoreCase(JOB_TYPE, job.getJobType())) {
118       throw new IllegalArgumentException("Can not handle job type " + job.getJobType());
119     }
120 
121     Publication publication = null;
122     MediaPackage mediaPackage = MediaPackageParser.getFromXml(job.getArguments().get(0));
123     String repository = job.getArguments().get(1);
124     boolean checkAvailability = false;
125     switch (Operation.valueOf(job.getOperation())) {
126       case Publish:
127         String[] downloadElementIds = StringUtils.split(job.getArguments().get(2), SEPARATOR);
128         String[] streamingElementIds = StringUtils.split(job.getArguments().get(3), SEPARATOR);
129         checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
130         publication = publish(job, mediaPackage, repository,
131                 Collections.set(downloadElementIds), Collections.set(streamingElementIds), checkAvailability);
132         break;
133       case Replace:
134         final Set<? extends MediaPackageElement> downloadElements =
135             Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(2)));
136         final Set<? extends MediaPackageElement> streamingElements =
137             Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(3)));
138         final Set<MediaPackageElementFlavor> retractDownloadFlavors = Arrays.stream(
139             StringUtils.split(job.getArguments().get(4), SEPARATOR))
140             .filter(s -> !s.isEmpty())
141             .map(MediaPackageElementFlavor::parseFlavor)
142             .collect(Collectors.toSet());
143         final Set<MediaPackageElementFlavor> retractStreamingFlavors = Arrays.stream(
144             StringUtils.split(job.getArguments().get(5), SEPARATOR))
145             .filter(s -> !s.isEmpty())
146             .map(MediaPackageElementFlavor::parseFlavor)
147             .collect(Collectors.toSet());
148         final Set<? extends MediaPackageElement> publications =
149             Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(6)));
150         checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(7));
151         publication = replace(job, mediaPackage, repository, downloadElements, streamingElements,
152             retractDownloadFlavors, retractStreamingFlavors, publications, checkAvailability);
153         break;
154       case Retract:
155         publication = retract(job, mediaPackage, repository);
156         break;
157       case UpdateMetadata:
158         checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
159         String[] flavors = StringUtils.split(job.getArguments().get(2), SEPARATOR);
160         String[] tags = StringUtils.split(job.getArguments().get(3), SEPARATOR);
161         publication = updateMetadata(job, mediaPackage, repository,
162                 Collections.set(flavors), Collections.set(tags), checkAvailability);
163         break;
164       default:
165         throw new IllegalArgumentException("Can not handle this type of operation: " + job.getOperation());
166     }
167     return publication != null ? MediaPackageElementParser.getAsXml(publication) : null;
168   }
169 
170   @Override
171   public Job replace(
172       MediaPackage mediaPackage,
173       String repository,
174       Set<? extends MediaPackageElement> downloadElements,
175       Set<? extends MediaPackageElement> streamingElements,
176       Set<MediaPackageElementFlavor> retractDownloadFlavors,
177       Set<MediaPackageElementFlavor> retractStreamingFlavors,
178       Set<? extends Publication> publications,
179       boolean checkAvailability
180   ) throws PublicationException {
181     checkInputArguments(mediaPackage, repository);
182     try {
183       return serviceRegistry.createJob(JOB_TYPE, Operation.Replace.name(),
184           Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
185               repository, // 1
186               MediaPackageElementParser.getArrayAsXml(Collections.toList(downloadElements)), // 2
187               MediaPackageElementParser.getArrayAsXml(Collections.toList(streamingElements)), // 3
188               StringUtils.join(retractDownloadFlavors, SEPARATOR), // 4
189               StringUtils.join(retractStreamingFlavors, SEPARATOR), // 5
190               MediaPackageElementParser.getArrayAsXml(Collections.toList(publications)), // 6
191               Boolean.toString(checkAvailability))); // 7
192     } catch (ServiceRegistryException e) {
193       throw new PublicationException("Unable to create job", e);
194     } catch (MediaPackageException e) {
195       throw new PublicationException("Unable to serialize media package elements", e);
196     }
197   }
198 
199   @Override
200   public Publication replaceSync(
201       MediaPackage mediaPackage, String repository, Set<? extends MediaPackageElement> downloadElements,
202       Set<? extends MediaPackageElement> streamingElements, Set<MediaPackageElementFlavor> retractDownloadFlavors,
203       Set<MediaPackageElementFlavor> retractStreamingFlavors, Set<? extends Publication> publications,
204       boolean checkAvailability) throws PublicationException, MediaPackageException {
205     return replace(null, mediaPackage, repository, downloadElements, streamingElements, retractDownloadFlavors,
206         retractStreamingFlavors, publications, checkAvailability);
207   }
208 
209   @Override
210   public Job publish(MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
211           Set<String> streamingElementIds, boolean checkAvailability)
212           throws PublicationException, MediaPackageException {
213     checkInputArguments(mediaPackage, repository);
214 
215     try {
216       return serviceRegistry.createJob(JOB_TYPE, Operation.Publish.toString(),
217               Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
218                       repository, // 1
219                       StringUtils.join(downloadElementIds, SEPARATOR), // 2
220                       StringUtils.join(streamingElementIds, SEPARATOR), // 3
221                       Boolean.toString(checkAvailability))); // 4
222     } catch (ServiceRegistryException e) {
223       throw new PublicationException("Unable to create job", e);
224     }
225   }
226 
227   @Override
228   public Job retract(MediaPackage mediaPackage, String repository) throws PublicationException, NotFoundException {
229     checkInputArguments(mediaPackage, repository);
230 
231     try {
232       return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
233               Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), repository));
234     } catch (ServiceRegistryException e) {
235       throw new PublicationException("Unable to create job", e);
236     }
237   }
238 
239   @Override
240   public Job updateMetadata(MediaPackage mediaPackage, String repository, Set<String> flavors, Set<String> tags,
241           boolean checkAvailability) throws PublicationException, MediaPackageException {
242     checkInputArguments(mediaPackage, repository);
243     if ((flavors == null || flavors.isEmpty()) && (tags == null || tags.isEmpty())) {
244       throw new IllegalArgumentException("Flavors or tags must be set");
245     }
246 
247     try {
248       return serviceRegistry.createJob(JOB_TYPE, Operation.UpdateMetadata.toString(),
249               Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), // 0
250                       repository, // 1
251                       StringUtils.join(flavors, SEPARATOR), // 2
252                       StringUtils.join(tags, SEPARATOR), // 3
253                       Boolean.toString(checkAvailability))); // 4
254     } catch (ServiceRegistryException e) {
255       throw new PublicationException("Unable to create job", e);
256     }
257   }
258 
259   protected Publication publish(Job job, MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
260           Set<String> streamingElementIds, boolean checkAvailability)
261           throws PublicationException, MediaPackageException {
262     String mpId = mediaPackage.getIdentifier().toString();
263     SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
264             .isDeleted(false).build());
265     // retract oai-pmh if published
266     if (searchResult.size() > 0) {
267       try {
268         Publication p = retract(job, mediaPackage, repository);
269         if (p != null && mediaPackage.contains(p)) {
270           mediaPackage.remove(p);
271         }
272       } catch (NotFoundException e) {
273         logger.debug("No OAI-PMH publication found for media package {}.", mpId, e);
274         // this is ok
275       }
276     }
277     List<Job> distributionJobs = new ArrayList<>(2);
278     if (downloadElementIds != null && !downloadElementIds.isEmpty()) {
279       // select elements for download distribution
280       MediaPackage mpDownloadDist = (MediaPackage) mediaPackage.clone();
281       for (MediaPackageElement mpe : mpDownloadDist.getElements()) {
282         if (downloadElementIds.contains(mpe.getIdentifier())) {
283           continue;
284         }
285         mpDownloadDist.remove(mpe);
286       }
287       // publish to download
288       if (mpDownloadDist.getElements().length > 0) {
289         try {
290           Job downloadDistributionJob = downloadDistributionService
291                   .distribute(getPublicationChannelName(repository), mpDownloadDist, downloadElementIds,
292                           checkAvailability);
293           if (downloadDistributionJob != null) {
294             distributionJobs.add(downloadDistributionJob);
295           }
296         } catch (DistributionException e) {
297           throw new PublicationException(
298               format("Unable to distribute media package %s to download distribution.", mpId),
299               e);
300         }
301       }
302     }
303     if (streamingElementIds != null && !streamingElementIds.isEmpty()) {
304       // select elements for streaming distribution
305       MediaPackage mpStreamingDist = (MediaPackage) mediaPackage.clone();
306       for (MediaPackageElement mpe : mpStreamingDist.getElements()) {
307         if (streamingElementIds.contains(mpe.getIdentifier())) {
308           continue;
309         }
310         mpStreamingDist.remove(mpe);
311       }
312       // publish to streaming
313       if (mpStreamingDist.getElements().length > 0) {
314         try {
315           Job streamingDistributionJob = streamingDistributionService
316                   .distribute(getPublicationChannelName(repository), mpStreamingDist, streamingElementIds);
317           if (streamingDistributionJob != null) {
318             distributionJobs.add(streamingDistributionJob);
319           }
320         } catch (DistributionException e) {
321           throw new PublicationException(
322               format("Unable to distribute media package %s to streaming distribution.", mpId),
323               e);
324         }
325       }
326     }
327     if (distributionJobs.isEmpty()) {
328       throw new IllegalStateException(format(
329               "The media package %s does not contain any elements for publishing to OAI-PMH", mpId));
330     }
331     // wait for distribution jobs
332     if (!waitForJobs(job, serviceRegistry, distributionJobs).isSuccess()) {
333       throw new PublicationException(format(
334               "Unable to distribute elements of media package %s to distribution channels.", mpId));
335     }
336 
337     List<MediaPackageElement> distributedElements = new ArrayList<>();
338     for (Job distributionJob : distributionJobs) {
339       String distributedElementsXml = distributionJob.getPayload();
340       if (StringUtils.isNotBlank(distributedElementsXml)) {
341         for (MediaPackageElement distributedElement
342             : MediaPackageElementParser.getArrayFromXml(distributedElementsXml)) {
343           distributedElements.add(distributedElement);
344         }
345       }
346     }
347     MediaPackage oaiPmhDistMp = (MediaPackage) mediaPackage.clone();
348     // cleanup media package elements
349     for (MediaPackageElement mpe : oaiPmhDistMp.getElements()) {
350       // keep publications
351       if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
352         continue;
353       }
354       oaiPmhDistMp.remove(mpe);
355     }
356     // ...add the distributed elements
357     for (MediaPackageElement mpe : distributedElements) {
358       oaiPmhDistMp.add(mpe);
359     }
360 
361     // publish to oai-pmh
362     try {
363       oaiPmhDatabase.store(oaiPmhDistMp, repository);
364     } catch (OaiPmhDatabaseException e) {
365       // todo: should we retract the elements from download and streaming here?
366       throw new PublicationException(
367           format("Unable to distribute media package %s to OAI-PMH repository %s", mpId, repository),
368           e);
369     }
370     return createPublicationElement(mpId, repository);
371   }
372 
373   private Publication replace(
374       Job job,
375       MediaPackage mediaPackage,
376       String repository,
377       Set<? extends MediaPackageElement> downloadElements,
378       Set<? extends MediaPackageElement> streamingElements,
379       Set<MediaPackageElementFlavor> retractDownloadFlavors,
380       Set<MediaPackageElementFlavor> retractStreamingFlavors,
381       Set<? extends MediaPackageElement> publications,
382       boolean checkAvailable
383   ) throws MediaPackageException, PublicationException {
384     final String mpId = mediaPackage.getIdentifier().toString();
385     final String channel = getPublicationChannelName(repository);
386 
387     try {
388       final SearchResult search = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
389           .isDeleted(false).build());
390       if (search.size() > 1) {
391         throw new PublicationException("Found multiple OAI-PMH records for id " + mpId);
392       }
393       final Optional<MediaPackage> existingMp = search.getItems().stream().findFirst().map(
394               SearchResultItem::getMediaPackage);
395 
396       // Collect Ids of elements to distribute
397       final Set<String> addDownloadElementIds = downloadElements.stream()
398           .map(MediaPackageElement::getIdentifier)
399           .collect(Collectors.toSet());
400       final Set<String> addStreamingElementIds = streamingElements.stream()
401           .map(MediaPackageElement::getIdentifier)
402           .collect(Collectors.toSet());
403 
404       // Use retractFlavors to search for existing elements to retract
405       final Set<MediaPackageElement> removeDownloadElements = existingMp.map(mp ->
406           Arrays.stream(mp.getElements())
407           .filter(e -> retractDownloadFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
408           .collect(Collectors.toSet())
409       ).orElse(java.util.Collections.emptySet());
410       final Set<MediaPackageElement> removeStreamingElements = existingMp.map(mp ->
411           Arrays.stream(mp.getElements())
412               .filter(e -> retractStreamingFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
413               .collect(Collectors.toSet())
414       ).orElse(java.util.Collections.emptySet());
415 
416       // Element IDs to retract. Elements identified by flavor and elements to re-distribute
417       final Set<String> removeDownloadElementIds = Stream
418           .concat(removeDownloadElements.stream(), downloadElements.stream())
419           .map(MediaPackageElement::getIdentifier)
420           .collect(Collectors.toSet());
421       final Set<String> removeStreamingElementIds = Stream
422           .concat(removeStreamingElements.stream(), streamingElements.stream())
423           .map(MediaPackageElement::getIdentifier)
424           .collect(Collectors.toSet());
425 
426       if (removeDownloadElementIds.isEmpty() && removeStreamingElementIds.isEmpty()
427           && addDownloadElementIds.isEmpty() && addStreamingElementIds.isEmpty()) {
428         // Nothing to do
429         return Arrays.stream(mediaPackage.getPublications())
430             .filter(p -> channel.equals(p.getChannel()))
431             .findFirst()
432             .orElse(null);
433       }
434 
435       final MediaPackage temporaryMediaPackage = (MediaPackage) mediaPackage.clone();
436       downloadElements.forEach(temporaryMediaPackage::add);
437       streamingElements.forEach(temporaryMediaPackage::add);
438       removeDownloadElements.forEach(temporaryMediaPackage::add);
439       removeStreamingElements.forEach(temporaryMediaPackage::add);
440 
441       final List<MediaPackageElement> retractedElements = new ArrayList<>();
442       final List<MediaPackageElement> distributedElements = new ArrayList<>();
443       if (job != null) {
444         retractedElements
445             .addAll(retract(job, channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
446         distributedElements
447             .addAll(distribute(job, channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
448                 checkAvailable));
449       } else {
450         retractedElements
451             .addAll(retractSync(channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
452         distributedElements
453             .addAll(distributeSync(channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
454                 checkAvailable));
455       }
456 
457       final MediaPackage oaiPmhDistMp = (MediaPackage) existingMp.orElse(mediaPackage).clone();
458 
459       // Remove OAI-PMH publication
460       Arrays.stream(oaiPmhDistMp.getPublications())
461           .filter(p -> channel.equals(p.getChannel()))
462           .forEach(oaiPmhDistMp::remove);
463 
464       // Remove retracted elements
465       retractedElements.stream()
466           .map(MediaPackageElement::getIdentifier)
467           .forEach(oaiPmhDistMp::removeElementById);
468       // Add new distributed elements
469       distributedElements.forEach(oaiPmhDistMp::add);
470 
471       // Remove old publications
472       publications.stream()
473           .map(p -> ((Publication) p).getChannel())
474           .forEach(c -> Arrays.stream(oaiPmhDistMp.getPublications())
475               .filter(p -> c.equals(p.getChannel()))
476               .forEach(oaiPmhDistMp::remove));
477 
478       // Add updated publications
479       publications.forEach(oaiPmhDistMp::add);
480 
481       // publish to oai-pmh
482       oaiPmhDatabase.store(oaiPmhDistMp, repository);
483 
484       return Arrays.stream(mediaPackage.getPublications())
485           .filter(p -> channel.equals(p.getChannel()))
486           .findFirst()
487           .orElse(createPublicationElement(mpId, repository));
488     } catch (OaiPmhDatabaseException e) {
489       throw new PublicationException(format("Unable to update media package %s in OAI-PMH repository %s", mpId,
490           repository), e);
491     } catch (DistributionException e) {
492       throw new PublicationException(format("Unable to update OAI-PMH distributions of media package %s.", mpId), e);
493     } catch (MediaPackageRuntimeException e) {
494       throw e.getWrappedException();
495     }
496   }
497 
498   private List<MediaPackageElement> retract(
499           Job job, String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
500           Set<String> removeStreamingElementIds) throws PublicationException, DistributionException {
501     final List<Job> retractJobs = new ArrayList<>(2);
502     if (!removeDownloadElementIds.isEmpty()) {
503       retractJobs.add(downloadDistributionService.retract(channel, mp, removeDownloadElementIds));
504     }
505     if (!removeStreamingElementIds.isEmpty()) {
506       retractJobs.add(streamingDistributionService.retract(channel, mp, removeStreamingElementIds));
507     }
508 
509     // wait for retract jobs
510     if (!waitForJobs(job, serviceRegistry, retractJobs).isSuccess()) {
511       throw new PublicationException(format("Unable to retract OAI-PMH distributions of media package %s",
512           mp.getIdentifier().toString()));
513     }
514 
515     return retractJobs.stream()
516         .filter(j -> StringUtils.isNotBlank(j.getPayload()))
517         .map(Job::getPayload)
518         .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
519         .collect(Collectors.toList());
520   }
521 
522   private List<MediaPackageElement> retractSync(String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
523       Set<String> removeStreamingElementIds) throws DistributionException {
524     final List<MediaPackageElement> retracted = new ArrayList<>();
525     if (!removeDownloadElementIds.isEmpty()) {
526       retracted.addAll(downloadDistributionService.retractSync(channel, mp, removeDownloadElementIds));
527     }
528     if (!removeStreamingElementIds.isEmpty()) {
529       retracted.addAll(streamingDistributionService.retractSync(channel, mp, removeStreamingElementIds));
530     }
531     return retracted;
532   }
533 
534   private List<MediaPackageElement> distribute(
535       Job job,
536       String channel,
537       MediaPackage mp,
538       Set<String> addDownloadElementIds,
539       Set<String> addStreamingElementIds,
540       boolean checkAvailable
541   ) throws PublicationException, MediaPackageException, DistributionException {
542     final List<Job> distributeJobs = new ArrayList<>(2);
543     if (!addDownloadElementIds.isEmpty()) {
544       distributeJobs.add(downloadDistributionService.distribute(channel, mp, addDownloadElementIds,
545           checkAvailable));
546     }
547     if (!addStreamingElementIds.isEmpty()) {
548       distributeJobs.add(streamingDistributionService.distribute(channel, mp, addStreamingElementIds));
549     }
550 
551     // wait for distribute jobs
552     if (!waitForJobs(job, serviceRegistry, distributeJobs).isSuccess()) {
553       throw new PublicationException(format("Unable to distribute OAI-PMH distributions of media package %s",
554           mp.getIdentifier().toString()));
555     }
556 
557     return distributeJobs.stream()
558         .filter(j -> StringUtils.isNotBlank(j.getPayload()))
559         .map(Job::getPayload)
560         .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
561         .collect(Collectors.toList());
562   }
563 
564   private List<MediaPackageElement> distributeSync(
565       String channel, MediaPackage mp, Set<String> addDownloadElementIds, Set<String> addStreamingElementIds,
566       boolean checkAvailable) throws DistributionException {
567     final List<MediaPackageElement> distributed = new ArrayList<>();
568     if (!addDownloadElementIds.isEmpty()) {
569       distributed.addAll(downloadDistributionService.distributeSync(channel, mp, addDownloadElementIds,
570           checkAvailable));
571     }
572     if (!addStreamingElementIds.isEmpty()) {
573       distributed.addAll(streamingDistributionService.distributeSync(channel, mp, addStreamingElementIds));
574     }
575     return distributed;
576   }
577 
578 
579   protected Publication retract(Job job, MediaPackage mediaPackage, String repository)
580           throws PublicationException, NotFoundException {
581     String mpId = mediaPackage.getIdentifier().toString();
582 
583     // track elements for retraction
584     MediaPackage oaiPmhMp = null;
585     SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
586             .isDeleted(false).build());
587     for (SearchResultItem searchResultItem : searchResult.getItems()) {
588       if (oaiPmhMp == null) {
589         oaiPmhMp = searchResultItem.getMediaPackage();
590       } else {
591         for (MediaPackageElement mpe : searchResultItem.getMediaPackage().getElements()) {
592           oaiPmhMp.add(mpe);
593         }
594       }
595     }
596 
597     // retract oai-pmh
598     try {
599       oaiPmhDatabase.delete(mpId, repository);
600     } catch (OaiPmhDatabaseException e) {
601       throw new PublicationException(format("Unable to retract media package %s from OAI-PMH repository %s",
602               mpId, repository), e);
603     } catch (NotFoundException e) {
604       logger.debug("Skip retracting media package {} from OIA-PMH repository {} as it isn't published.",
605               mpId, repository, e);
606     }
607 
608     if (oaiPmhMp != null && oaiPmhMp.getElements().length > 0) {
609       // retract files from distribution channels
610       Set<String> mpeIds = new HashSet<>();
611       for (MediaPackageElement mpe : oaiPmhMp.elements()) {
612         if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
613           continue;
614         }
615 
616         mpeIds.add(mpe.getIdentifier());
617       }
618       if (!mpeIds.isEmpty()) {
619         List<Job> retractionJobs = new ArrayList<>();
620         // retract download
621         try {
622           Job retractDownloadJob = downloadDistributionService
623                   .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
624           if (retractDownloadJob != null) {
625             retractionJobs.add(retractDownloadJob);
626           }
627         } catch (DistributionException e) {
628           throw new PublicationException(
629               format(
630                   "Unable to create retraction job from distribution channel download for the media package %s ",
631                   mpId),
632               e);
633         }
634 
635         // retract streaming
636         try {
637           Job retractDownloadJob = streamingDistributionService
638                   .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
639           if (retractDownloadJob != null) {
640             retractionJobs.add(retractDownloadJob);
641           }
642         } catch (DistributionException e) {
643           throw new PublicationException(
644               format("Unable to create retraction job from distribution channel streaming for the media package %s ",
645                   mpId),
646               e);
647         }
648         if (retractionJobs.size() > 0) {
649           // wait for distribution jobs
650           if (!waitForJobs(job, serviceRegistry, retractionJobs).isSuccess()) {
651             throw new PublicationException(
652                     format("Unable to retract elements of media package %s from distribution channels.", mpId));
653           }
654         }
655       }
656     }
657 
658     String publicationChannel = getPublicationChannelName(repository);
659     for (Publication p : mediaPackage.getPublications()) {
660       if (StringUtils.equals(publicationChannel, p.getChannel())) {
661         return p;
662       }
663     }
664     return null;
665   }
666 
667   protected Publication updateMetadata(
668       Job job,
669       MediaPackage mediaPackage,
670       String repository,
671       Set<String> flavors,
672       Set<String> tags,
673       boolean checkAvailability
674   ) throws PublicationException {
675     final Set<MediaPackageElementFlavor> parsedFlavors = new HashSet<>();
676     for (String flavor : flavors) {
677       parsedFlavors.add(MediaPackageElementFlavor.parseFlavor(flavor));
678     }
679 
680     final MediaPackage filteredMp;
681     final SearchResult result = oaiPmhDatabase.search(
682         QueryBuilder.queryRepo(repository)
683             .mediaPackageId(mediaPackage)
684             .isDeleted(false)
685             .build()
686     );
687     if (result.size() == 1) {
688       // apply tags and flavors to the current media package
689       try {
690         logger.debug("filter elements with flavors {} and tags {} on media package {}",
691                 StringUtils.join(flavors, ", "), StringUtils.join(tags, ", "),
692                 MediaPackageParser.getAsXml(mediaPackage));
693 
694         filteredMp = filterMediaPackage(mediaPackage, parsedFlavors, tags);
695       } catch (MediaPackageException e) {
696         throw new PublicationException("Error filtering media package", e);
697       }
698     } else if (result.size() == 0) {
699       logger.info("Skipping update of media package {} since it is not currently published to {}",
700               mediaPackage, repository);
701       return null;
702     } else {
703       final String msg = format(
704           "More than one media package with id %s found",
705           mediaPackage.getIdentifier().toString()
706       );
707       logger.warn(msg);
708       throw new PublicationException(msg);
709     }
710     // re-distribute elements to download
711     Set<String> elementIdsToDistribute = new HashSet<>();
712     for (MediaPackageElement mpe : filteredMp.getElements()) {
713       // do not distribute publications
714       if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
715         continue;
716       }
717       elementIdsToDistribute.add(mpe.getIdentifier());
718     }
719     if (elementIdsToDistribute.isEmpty()) {
720       logger.debug("The media package {} does not contain any elements to update. "
721                       + "Skip OAI-PMH metadata update operation for repository {}",
722               mediaPackage.getIdentifier().toString(), repository);
723       return null;
724     }
725     logger.debug("distribute elements {}", StringUtils.join(elementIdsToDistribute, ", "));
726     final List<MediaPackageElement> distributedElements = new ArrayList<>();
727     try {
728       Job distJob = downloadDistributionService
729           .distribute(getPublicationChannelName(repository), filteredMp, elementIdsToDistribute, checkAvailability);
730       if (job == null) {
731         throw new PublicationException("The distribution service can not handle this type of media package elements.");
732       }
733       if (!waitForJobs(job, serviceRegistry, distJob).isSuccess()) {
734         throw new PublicationException(format(
735             "Unable to distribute updated elements from media package %s to the download distribution service",
736             mediaPackage.getIdentifier().toString()));
737       }
738       if (distJob.getPayload() != null) {
739         for (MediaPackageElement mpe : MediaPackageElementParser.getArrayFromXml(distJob.getPayload())) {
740           distributedElements.add(mpe);
741         }
742       }
743     } catch (DistributionException | MediaPackageException e) {
744       throw new PublicationException(format(
745           "Unable to distribute updated elements from media package %s to the download distribution service",
746           mediaPackage.getIdentifier().toString()), e);
747     }
748 
749     // update elements (URLs)
750     for (MediaPackageElement e : filteredMp.getElements()) {
751       if (MediaPackageElement.Type.Publication.equals(e.getElementType())) {
752         continue;
753       }
754       filteredMp.remove(e);
755     }
756     for (MediaPackageElement e : distributedElements) {
757       filteredMp.add(e);
758     }
759     MediaPackage publishedMp = merge(filteredMp, removeMatchingNonExistantElements(filteredMp,
760             (MediaPackage) result.getItems().get(0).getMediaPackage().clone(), parsedFlavors, tags));
761     // Publish the media package to OAI-PMH
762     try {
763       logger.debug("Updating metadata of media package {} in {}",
764               publishedMp.getIdentifier().toString(), repository);
765       oaiPmhDatabase.store(publishedMp, repository);
766     } catch (OaiPmhDatabaseException e) {
767       throw new PublicationException(format("Media package %s could not be updated",
768               publishedMp.getIdentifier().toString()));
769     }
770     // retract orphaned elements from download distribution
771     // orphaned elements are all those elements to which the updated media package no longer refers
772     // (in terms of element uri)
773     Map<URI, MediaPackageElement> elementUriMap = new Hashtable<>();
774     for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
775       for (MediaPackageElement mpe : oaiPmhSearchResultItem.getMediaPackage().getElements()) {
776         if (MediaPackageElement.Type.Publication == mpe.getElementType() || null == mpe.getURI()) {
777           continue;
778         }
779         elementUriMap.put(mpe.getURI(), mpe);
780       }
781     }
782     for (MediaPackageElement publishedMpe : publishedMp.getElements()) {
783       if (MediaPackageElement.Type.Publication == publishedMpe.getElementType()) {
784         continue;
785       }
786       if (elementUriMap.containsKey(publishedMpe.getURI())) {
787         elementUriMap.remove(publishedMpe.getURI());
788       }
789     }
790     Set<String> orphanedElementIds = new HashSet<>();
791     for (MediaPackageElement orphanedMpe : elementUriMap.values()) {
792       orphanedElementIds.add(orphanedMpe.getIdentifier());
793     }
794     if (!orphanedElementIds.isEmpty()) {
795       for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
796         try {
797           Job retractJob = downloadDistributionService.retract(getPublicationChannelName(repository),
798                   oaiPmhSearchResultItem.getMediaPackage(), orphanedElementIds);
799           if (retractJob != null) {
800             if (!waitForJobs(job, serviceRegistry, retractJob).isSuccess()) {
801               logger.warn(
802                   "The download distribution retract job for the orphaned elements from media package {} "
803                       + "does not end successfully",
804                   oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString());
805             }
806           }
807         } catch (DistributionException e) {
808           logger.warn(
809               "Unable to retract orphaned elements from download distribution service for the "
810                   + "media package {} channel {}",
811               oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString(),
812               getPublicationChannelName(repository),
813               e);
814         }
815       }
816     }
817 
818     // return the publication
819     String publicationChannel = getPublicationChannelName(repository);
820     for (Publication p : mediaPackage.getPublications()) {
821       if (StringUtils.equals(publicationChannel, p.getChannel())) {
822         return p;
823       }
824     }
825     return null;
826   }
827 
828   protected void checkInputArguments(MediaPackage mediaPackage, String repository) {
829     if (mediaPackage == null) {
830       throw new IllegalArgumentException("Media package must be specified");
831     }
832     if (StringUtils.isEmpty(repository)) {
833       throw new IllegalArgumentException("Repository must be specified");
834     }
835     if (!oaiPmhServerInfo.hasRepo(repository)) {
836       throw new IllegalArgumentException("OAI-PMH repository '" + repository + "' does not exist");
837     }
838   }
839 
840   protected String getPublicationChannelName(String repository) {
841     return PUBLICATION_CHANNEL_PREFIX.concat(repository);
842   }
843 
844   /** Create a new publication element. */
845   protected Publication createPublicationElement(String mpId, String repository) throws PublicationException {
846     for (String hostUrl : OaiPmhServerInfoUtil.oaiPmhServerUrlOfCurrentOrganization(securityService)) {
847       final URI engageUri = URIUtils.resolve(
848           URI.create(UrlSupport.concat(hostUrl, oaiPmhServerInfo.getMountPoint(), repository)),
849           "?verb=ListMetadataFormats&identifier=" + mpId);
850       return PublicationImpl.publication(UUID.randomUUID().toString(), getPublicationChannelName(repository), engageUri,
851               MimeTypes.parseMimeType(MimeTypes.XML.toString()));
852     }
853     // no host URL
854     final String msg = format("No host url for oai-pmh server configured for organization %s " + "("
855             + OaiPmhServerInfoUtil.ORG_CFG_OAIPMH_SERVER_HOSTURL + ")", securityService.getOrganization().getId());
856     throw new PublicationException(msg);
857   }
858 
859   private static MediaPackage updateMediaPackageFields(MediaPackage oaiPmhMp, MediaPackage mediaPackage) {
860     oaiPmhMp.setTitle(mediaPackage.getTitle());
861     oaiPmhMp.setDate(mediaPackage.getDate());
862     oaiPmhMp.setLanguage(mediaPackage.getLanguage());
863     oaiPmhMp.setLicense(mediaPackage.getLicense());
864     oaiPmhMp.setSeries(mediaPackage.getSeries());
865     oaiPmhMp.setSeriesTitle(mediaPackage.getSeriesTitle());
866     for (String contributor : oaiPmhMp.getContributors()) {
867       oaiPmhMp.removeContributor(contributor);
868     }
869     for (String contributor : mediaPackage.getContributors()) {
870       oaiPmhMp.addContributor(contributor);
871     }
872     for (String creator : oaiPmhMp.getCreators()) {
873       oaiPmhMp.removeCreator(creator);
874     }
875     for (String creator : mediaPackage.getCreators()) {
876       oaiPmhMp.addCreator(creator);
877     }
878     for (String subject : oaiPmhMp.getSubjects()) {
879       oaiPmhMp.removeSubject(subject);
880     }
881     for (String subject : mediaPackage.getSubjects()) {
882       oaiPmhMp.addSubject(subject);
883     }
884     return oaiPmhMp;
885   }
886 
  /**
   * Creates a clone of the media package and removes those elements that do not match the flavor and tags filter
   * criteria.
   *
   * <p>Publications of the clone are always kept. For every kept element whose reference points to an element of
   * {@code mediaPackage} that is not kept, the reference chain is followed through {@code mediaPackage}. If the element
   * has no flavor, it inherits the first flavor found along the chain. Afterwards the element's reference is set to
   * the last reference visited if that is a media package reference; otherwise it is cleared.
   *
   * @param mediaPackage
   *          the media package; elements are removed from a clone of it. NOTE(review): whether the
   *          setFlavor/setReference calls below also affect {@code mediaPackage} depends on how deep
   *          {@code MediaPackage#clone} copies elements — confirm
   * @param flavors
   *          the flavors to select
   * @param tags
   *          the tags to select
   * @return the filtered media package
   * @throws IllegalArgumentException
   *           if both {@code flavors} and {@code tags} are empty
   * @throws MediaPackageException
   *           as declared by this method
   */
  private MediaPackage filterMediaPackage(
      MediaPackage mediaPackage,
      Set<MediaPackageElementFlavor> flavors,
      Set<String> tags
  ) throws MediaPackageException {
    if (flavors.isEmpty() && tags.isEmpty()) {
      throw new IllegalArgumentException("Flavors or tags parameter must be set");
    }

    MediaPackage filteredMediaPackage = (MediaPackage) mediaPackage.clone();

    // The list of elements to keep
    List<MediaPackageElement> keep = new ArrayList<>();

    SimpleElementSelector selector = new SimpleElementSelector();
    // Filter elements
    for (MediaPackageElementFlavor flavor : flavors) {
      selector.addFlavor(flavor);
    }
    for (String tag : tags) {
      selector.addTag(tag);
    }
    // NOTE(review): selection runs on the ORIGINAL media package, while the loop below iterates over the clone's
    // elements, so keep.contains(...) relies on element equality across the clone — confirm. The meaning of the
    // boolean argument is defined by SimpleElementSelector (presumably "match tags and flavors") — confirm.
    keep.addAll(selector.select(mediaPackage, true));

    // Keep publications
    for (Publication p : filteredMediaPackage.getPublications()) {
      keep.add(p);
    }

    // Remove unselected elements and fix references and flavors of the kept ones
    for (MediaPackageElement element : filteredMediaPackage.getElements()) {

      if (!keep.contains(element)) {
        logger.debug("Removing {} '{}' from media package '{}'", element.getElementType().toString().toLowerCase(),
            element.getIdentifier(), filteredMediaPackage.getIdentifier().toString());
        filteredMediaPackage.remove(element);
        continue;
      }

      // Is the element referencing anything?
      MediaPackageReference reference = element.getReference();
      if (reference != null) {
        MediaPackageElement referencedElement = mediaPackage.getElementByReference(reference);

        // if we are distributing the referenced element, everything is fine. Otherwise...
        if (referencedElement != null && !keep.contains(referencedElement)) {

          // Follow the references to the end of the chain. The element inherits the first flavor found on the way,
          // but only if it has none of its own; the walk continues after that.
          // NOTE(review): a cyclic reference chain would make this loop run forever.
          MediaPackageElement parent = null;
          while ((parent = mediaPackage.getElementByReference(reference)) != null) {
            if (parent.getFlavor() != null && element.getFlavor() == null) {
              element.setFlavor(parent.getFlavor());
            }
            if (parent.getReference() == null) {
              break;
            }
            reference = parent.getReference();
          }

          // Done. Let's cut the path but keep references to the mediapackage itself
          if (reference != null && reference.getType().equals(MediaPackageReference.TYPE_MEDIAPACKAGE)) {
            element.setReference(reference);
          } else {
            element.clearReference();
          }
        }
      }
    }

    return filteredMediaPackage;
  }
970 
971   /**
972    * Remove all these elements from {@code publishedMp}, that matches the given flavors and tags
973    * but are not in the {@code updatedMp}.
974    *
975    * @param updatedMp the updated media package
976    * @param publishedMp the media package that is currently published
977    * @param flavors flavors of elements to update
978    * @param tags tags of elements to update
979    * @return published media package without elements, that matches the flavors and tags
980    *     but are not in the updated media package
981    */
982   public static MediaPackage removeMatchingNonExistantElements(MediaPackage updatedMp, MediaPackage publishedMp,
983           Set<MediaPackageElementFlavor> flavors, Set<String> tags) {
984     SimpleElementSelector selector = new SimpleElementSelector();
985     // Filter elements
986     for (MediaPackageElementFlavor flavor : flavors) {
987       selector.addFlavor(flavor);
988     }
989     for (String tag : tags) {
990       selector.addTag(tag);
991     }
992     for (MediaPackageElement publishedMpe : selector.select(publishedMp, true)) {
993       boolean foundInUpdatedMp = false;
994       for (MediaPackageElement updatedMpe : updatedMp.getElementsByFlavor(publishedMpe.getFlavor())) {
995         if (!updatedMpe.containsTag(tags)) {
996           // todo: this case shouldn't happen!
997         }
998         foundInUpdatedMp = true;
999         break;
1000       }
1001 
1002       if (!foundInUpdatedMp) {
1003         publishedMp.remove(publishedMpe);
1004       }
1005     }
1006     return publishedMp;
1007   }
1008 
1009   /**
1010    * Merges the updated media package with the one that is currently published in a way where the updated elements
1011    * replace existing ones in the published media package based on their flavor.
1012    * <p>
1013    * If <code>publishedMp</code> is <code>null</code>, this method returns the updated media package without any
1014    * modifications.
1015    *
1016    * @param updatedMp
1017    *          the updated media package
1018    * @param publishedMp
1019    *          the media package that is currently published
1020    * @return the merged media package
1021    */
1022   public static MediaPackage merge(MediaPackage updatedMp, MediaPackage publishedMp) {
1023     if (publishedMp == null) {
1024       return updatedMp;
1025     }
1026 
1027     final MediaPackage mergedMp = MediaPackageSupport.copy(publishedMp);
1028 
1029     // Merge the elements
1030     for (final MediaPackageElement updatedElement : updatedMp.elements()) {
1031       MediaPackageElementFlavor flavor = updatedElement.getFlavor();
1032       if (flavor != null) {
1033         for (final MediaPackageElement outdated : mergedMp.getElementsByFlavor(flavor)) {
1034           mergedMp.remove(outdated);
1035         }
1036         logger.debug("Update {} {} of type {}", updatedElement.getElementType().toString().toLowerCase(),
1037                 updatedElement.getIdentifier(), updatedElement.getElementType());
1038         mergedMp.add(updatedElement);
1039       }
1040     }
1041 
1042     // Remove publications
1043     for (final Publication p : mergedMp.getPublications()) {
1044       mergedMp.remove(p);
1045     }
1046 
1047     // Add updated publications
1048     for (final Publication updatedPublication : updatedMp.getPublications()) {
1049       mergedMp.add(updatedPublication);
1050     }
1051 
1052     // Merge media package fields
1053     updateMediaPackageFields(mergedMp, updatedMp);
1054     return mergedMp;
1055   }
1056 
1057   @Override
1058   protected ServiceRegistry getServiceRegistry() {
1059     return serviceRegistry;
1060   }
1061 
1062   /**
1063    * {@inheritDoc}
1064    *
1065    * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
1066    */
1067   @Override
1068   protected SecurityService getSecurityService() {
1069     return securityService;
1070   }
1071 
1072   /**
1073    * {@inheritDoc}
1074    *
1075    * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
1076    */
1077   @Override
1078   protected UserDirectoryService getUserDirectoryService() {
1079     return userDirectoryService;
1080   }
1081 
1082   /**
1083    * {@inheritDoc}
1084    *
1085    * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
1086    */
1087   @Override
1088   protected OrganizationDirectoryService getOrganizationDirectoryService() {
1089     return organizationDirectoryService;
1090   }
1091 
1092   /** OSGI DI */
1093   @Reference
1094   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1095     this.serviceRegistry = serviceRegistry;
1096   }
1097 
1098   /** OSGI DI */
1099   @Reference
1100   public void setSecurityService(SecurityService securityService) {
1101     this.securityService = securityService;
1102   }
1103 
1104   /** OSGI DI */
1105   @Reference
1106   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1107     this.userDirectoryService = userDirectoryService;
1108   }
1109 
1110   /** OSGI DI */
1111   @Reference
1112   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1113     this.organizationDirectoryService = organizationDirectoryService;
1114   }
1115 
1116   /** OSGI DI */
1117   @Reference(target = "(distribution.channel=download)")
1118   public void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
1119     this.downloadDistributionService = downloadDistributionService;
1120   }
1121 
1122   /** OSGI DI */
1123   @Reference
1124   public void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
1125     this.streamingDistributionService = streamingDistributionService;
1126   }
1127 
1128   /** OSGI DI */
1129   @Reference
1130   public void setOaiPmhServerInfo(OaiPmhServerInfo oaiPmhServerInfo) {
1131     this.oaiPmhServerInfo = oaiPmhServerInfo;
1132   }
1133 
1134   /** OSGI DI */
1135   @Reference
1136   public void setOaiPmhDatabase(OaiPmhDatabase oaiPmhDatabase) {
1137     this.oaiPmhDatabase = oaiPmhDatabase;
1138   }
1139 }