View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.distribution.download;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.systems.OpencastConstants.DIGEST_USER_PROPERTY;
25  import static org.opencastproject.util.EqualsUtil.ne;
26  import static org.opencastproject.util.HttpUtil.waitForResource;
27  import static org.opencastproject.util.PathSupport.path;
28  import static org.opencastproject.util.RequireUtil.notNull;
29  
30  import org.opencastproject.distribution.api.AbstractDistributionService;
31  import org.opencastproject.distribution.api.DistributionException;
32  import org.opencastproject.distribution.api.DistributionService;
33  import org.opencastproject.distribution.api.DownloadDistributionService;
34  import org.opencastproject.job.api.Job;
35  import org.opencastproject.mediapackage.AdaptivePlaylist;
36  import org.opencastproject.mediapackage.MediaPackage;
37  import org.opencastproject.mediapackage.MediaPackageElement;
38  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
39  import org.opencastproject.mediapackage.MediaPackageElementParser;
40  import org.opencastproject.mediapackage.MediaPackageException;
41  import org.opencastproject.mediapackage.MediaPackageParser;
42  import org.opencastproject.mediapackage.Track;
43  import org.opencastproject.security.api.Organization;
44  import org.opencastproject.security.api.OrganizationDirectoryService;
45  import org.opencastproject.security.api.SecurityService;
46  import org.opencastproject.security.api.TrustedHttpClient;
47  import org.opencastproject.security.api.User;
48  import org.opencastproject.security.api.UserDirectoryService;
49  import org.opencastproject.security.util.SecurityUtil;
50  import org.opencastproject.serviceregistry.api.ServiceRegistry;
51  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
52  import org.opencastproject.util.FileSupport;
53  import org.opencastproject.util.LoadUtil;
54  import org.opencastproject.util.NotFoundException;
55  import org.opencastproject.util.OsgiUtil;
56  import org.opencastproject.util.UrlSupport;
57  import org.opencastproject.util.data.functions.Misc;
58  import org.opencastproject.workspace.api.Workspace;
59  
60  import com.google.gson.Gson;
61  import com.google.gson.reflect.TypeToken;
62  
63  import org.apache.commons.io.FileUtils;
64  import org.apache.commons.io.FilenameUtils;
65  import org.apache.commons.io.IOUtils;
66  import org.apache.commons.lang3.exception.ExceptionUtils;
67  import org.osgi.service.cm.ConfigurationException;
68  import org.osgi.service.cm.ManagedService;
69  import org.osgi.service.component.ComponentContext;
70  import org.osgi.service.component.annotations.Activate;
71  import org.osgi.service.component.annotations.Component;
72  import org.osgi.service.component.annotations.Reference;
73  import org.slf4j.Logger;
74  import org.slf4j.LoggerFactory;
75  
76  import java.io.File;
77  import java.io.IOException;
78  import java.io.InputStream;
79  import java.net.URI;
80  import java.net.URISyntaxException;
81  import java.nio.file.DirectoryStream;
82  import java.nio.file.FileVisitResult;
83  import java.nio.file.Files;
84  import java.nio.file.Path;
85  import java.nio.file.Paths;
86  import java.nio.file.SimpleFileVisitor;
87  import java.nio.file.attribute.BasicFileAttributes;
88  import java.util.ArrayList;
89  import java.util.Arrays;
90  import java.util.Dictionary;
91  import java.util.HashMap;
92  import java.util.HashSet;
93  import java.util.List;
94  import java.util.Map.Entry;
95  import java.util.Set;
96  import java.util.stream.Collectors;
97  
98  import javax.servlet.http.HttpServletResponse;
99  
100 /**
101  * Distributes media to the local media delivery directory.
102  */
103 @Component(
104     immediate = true,
105     service = { DistributionService.class,DownloadDistributionService.class,ManagedService.class },
106     property = {
107         "service.description=Distribution Service (Local)",
108         "service.pid=org.opencastproject.distribution.download.DownloadDistributionServiceImpl",
109         "distribution.channel=download"
110     }
111 )
112 public class DownloadDistributionServiceImpl extends AbstractDistributionService
113         implements DistributionService, DownloadDistributionService, ManagedService {
114 
115   /** Logging facility */
116   private static final Logger logger = LoggerFactory.getLogger(DownloadDistributionServiceImpl.class);
117 
118   /** List of available operations on jobs */
119   private enum Operation {
120     Distribute, Retract
121   }
122 
123   /** Receipt type */
124   public static final String JOB_TYPE = "org.opencastproject.distribution.download";
125 
126   /** Timeout in millis for checking distributed file request */
127   private static final long TIMEOUT = 60000L;
128 
129   /** The load on the system introduced by creating a distribute job */
130   public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;
131 
132   /** The load on the system introduced by creating a retract job */
133   public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;
134 
135   /** The key to look for in the service configuration file to override the {@link #DEFAULT_DISTRIBUTE_JOB_LOAD} */
136   public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.download.distribute";
137 
138   /** The key to look for in the service configuration file to override the {@link #DEFAULT_RETRACT_JOB_LOAD} */
139   public static final String RETRACT_JOB_LOAD_KEY = "job.load.download.retract";
140 
141   /** The load on the system introduced by creating a distribute job */
142   private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;
143 
144   /** The load on the system introduced by creating a retract job */
145   private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;
146 
147   /** Interval time in millis for checking distributed file request */
148   private static final long INTERVAL = 300L;
149 
150   private Gson gson = new Gson();
151 
152   private String systemUserName = null;
153 
/**
 * Constructs the download distribution service, handing {@link #JOB_TYPE} to the superclass constructor.
 */
public DownloadDistributionServiceImpl() {
  super(JOB_TYPE);
}
160 
161   /**
162    * Activate method for this OSGi service implementation.
163    *
164    * @param cc
165    *          the OSGi component context
166    */
167   @Override
168   @Activate
169   public void activate(ComponentContext cc) {
170     super.activate(cc);
171     serviceUrl = cc.getBundleContext().getProperty("org.opencastproject.download.url");
172     if (serviceUrl == null) {
173       throw new IllegalStateException("Download url must be set (org.opencastproject.download.url)");
174     }
175     logger.info("Download url is {}", serviceUrl);
176 
177     String ccDistributionDirectory = cc.getBundleContext().getProperty("org.opencastproject.download.directory");
178     if (ccDistributionDirectory == null) {
179       throw new IllegalStateException("Distribution directory must be set (org.opencastproject.download.directory)");
180     }
181     this.distributionDirectory = new File(ccDistributionDirectory);
182     logger.info("Download distribution directory is {}", distributionDirectory);
183     this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
184     systemUserName = cc.getBundleContext().getProperty(DIGEST_USER_PROPERTY);
185   }
186 
187   @Override
188   public String getDistributionType() {
189     return this.distributionChannel;
190   }
191 
192   @Override
193   public Job distribute(String channelId, MediaPackage mediapackage, String elementId)
194           throws DistributionException, MediaPackageException {
195     return distribute(channelId, mediapackage, elementId, true);
196   }
197 
198   @Override
199   public Job distribute(String channelId, MediaPackage mediapackage, String elementId, boolean checkAvailability)
200           throws DistributionException, MediaPackageException {
201     Set<String> elementIds = new HashSet<String>();
202     elementIds.add(elementId);
203     return distribute(channelId, mediapackage, elementIds, checkAvailability, false);
204   }
205 
206   @Override
207   public Job distribute(String channelId, MediaPackage mediapackage, Set<String> elementIds, boolean checkAvailability)
208           throws DistributionException, MediaPackageException {
209     return distribute(channelId, mediapackage, elementIds, checkAvailability, false);
210   }
211 
212   @Override
213   public Job distribute(
214       String channelId,
215       MediaPackage mediapackage,
216       Set<String> elementIds,
217       boolean checkAvailability,
218       boolean preserveReference
219   ) throws DistributionException, MediaPackageException {
220     notNull(mediapackage, "mediapackage");
221     notNull(elementIds, "elementIds");
222     notNull(channelId, "channelId");
223     try {
224       return serviceRegistry.createJob(
225               JOB_TYPE,
226               Operation.Distribute.toString(),
227               Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds),
228                       Boolean.toString(checkAvailability), Boolean.toString(preserveReference)), distributeJobLoad);
229     } catch (ServiceRegistryException e) {
230       throw new DistributionException("Unable to create a job", e);
231     }
232   }
233 
234   /**
235    * Distribute Mediapackage elements to the download distribution service.
236    *
237    * @param channelId
238    #          The id of the publication channel to be distributed to.
239    * @param mediapackage
240    *          The media package that contains the elements to be distributed.
241    * @param elementIds
242    *          The ids of the elements that should be distributed contained within the media package.
243    * @param checkAvailability
244    *          Check the availability of the distributed element via http.
245    * @return A reference to the MediaPackageElements that have been distributed.
246    * @throws DistributionException
247    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
248    *           cannot be copied or another unexpected exception occurs.
249    */
250   public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
251           boolean checkAvailability) throws DistributionException {
252     return distributeElements(channelId, mediapackage, elementIds, checkAvailability, false);
253   }
254 
255   /**
256    * Distribute Mediapackage elements to the download distribution service.
257    *
258    * @param channelId
259    #          The id of the publication channel to be distributed to.
260    * @param mediapackage
261    *          The media package that contains the elements to be distributed.
262    * @param elementIds
263    *          The ids of the elements that should be distributed contained within the media package.
264    * @param checkAvailability
265    *          Check the availability of the distributed element via http.
266    * @param preserveReference
267    *          copy actual Reference to the new distributed element
268    * @return A reference to the MediaPackageElements that have been distributed.
269    * @throws DistributionException
270    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
271    *           cannot be copied or another unexpected exception occurs.
272    */
273   public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
274           boolean checkAvailability, boolean preserveReference) throws DistributionException {
275     notNull(mediapackage, "mediapackage");
276     notNull(elementIds, "elementIds");
277     notNull(channelId, "channelId");
278 
279     final Set<MediaPackageElement> elements = getElements(channelId, mediapackage, elementIds);
280     List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
281 
282     if (AdaptivePlaylist.hasHLSPlaylist(elements)) {
283       return distributeHLSElements(channelId, mediapackage, elements, checkAvailability, preserveReference);
284     } else {
285       for (MediaPackageElement element : elements) {
286         MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability,
287                 preserveReference);
288         distributedElements.add(distributedElement);
289       }
290     }
291     return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
292   }
293 
294   /**
295    * Distribute a Mediapackage element to the download distribution service.
296    *
297    * @param channelId
298    #          The id of the publication channel to be distributed to.
299    * @param mediapackage
300    *          The media package that contains the element to be distributed.
301    * @param element
302    *          The the element that should be distributed contained within the media package.
303    * @param checkAvailability
304    *          Check the availability of the distributed element via http.
305    * @param preserveReference
306    *           Copy existing Track-Reference to the new distributed Track
307    * @return A reference to the MediaPackageElement that has been distributed.
308    * @throws DistributionException
309    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
310    *           cannot be copied or another unexpected exception occurs.
311    */
312 
313   public MediaPackageElement distributeElement(String channelId, MediaPackage mediapackage, MediaPackageElement element,
314           boolean checkAvailability, boolean preserveReference) throws DistributionException {
315 
316     final String mediapackageId = mediapackage.getIdentifier().toString();
317     final String elementId = element.getIdentifier();
318 
319     try {
320       File source;
321       try {
322         source = workspace.get(element.getURI());
323       } catch (NotFoundException e) {
324         throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
325       } catch (IOException e) {
326         throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
327       }
328 
329       // Try to find a duplicated element source
330       try {
331         source = findDuplicatedElementSource(source, mediapackageId);
332       } catch (IOException e) {
333         logger.warn("Unable to find duplicated source {}: {}", source, ExceptionUtils.getMessage(e));
334       }
335 
336       File destination = getDistributionFile(channelId, mediapackage, element);
337       if (!destination.equals(source)) {
338         // Put the file in place if sourcesfile differs destinationfile
339         try {
340           FileUtils.forceMkdir(destination.getParentFile());
341         } catch (IOException e) {
342           throw new DistributionException("Unable to create " + destination.getParentFile(), e);
343         }
344         logger.debug("Distributing element {} of media package {} to publication channel {} ({})", elementId,
345             mediapackageId, channelId, destination);
346 
347         try {
348           FileSupport.link(source, destination, true);
349         } catch (IOException e) {
350           throw new DistributionException(format("Unable to copy %s to %s", source, destination), e);
351         }
352       }
353       // Create a media package element representation of the distributed file
354       MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
355       try {
356         distributedElement.setURI(getDistributionUri(channelId, mediapackageId, element));
357         if (preserveReference) {
358           distributedElement.setReference(element.getReference());
359         }
360       } catch (URISyntaxException e) {
361         throw new DistributionException("Distributed element produces an invalid URI", e);
362       }
363 
364       logger.debug("Finished distributing element {} of media package {} to publication channel {}", elementId,
365           mediapackageId, channelId);
366       final URI uri = distributedElement.getURI();
367       if (checkAvailability) {
368         logger.debug("Checking availability of distributed artifact {} at {}", distributedElement, uri);
369         checkAvailability(uri);
370       }
371       return distributedElement;
372     } catch (Exception e) {
373       logger.warn("Error distributing " + element, e);
374       if (e instanceof DistributionException) {
375         throw (DistributionException) e;
376       } else {
377         throw new DistributionException(e);
378       }
379     }
380   }
381 
382   private MediaPackageElement[] distributeHLSElements(String channelId, MediaPackage mediapackage,
383           Set<MediaPackageElement> elements, boolean checkAvailability, boolean preserveReference)
384                   throws DistributionException {
385 
386     List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
387     File distributionDir = getMediaPackageDirectory(channelId, mediapackage);
388     List<MediaPackageElement> nontrackElements = elements.stream()
389             .filter(e -> e.getElementType() != MediaPackageElement.Type.Track).collect(Collectors.toList());
390     // Distribute non track items
391     for (MediaPackageElement element : nontrackElements) {
392       MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability,
393               preserveReference);
394       distributedElements.add(distributedElement);
395     }
396     // Get all tracks and look for adaptive playlists
397     List<Track> trackElements = elements.stream()
398             .filter(e -> e.getElementType() == MediaPackageElement.Type.Track).map(e -> (Track) e)
399             .collect(Collectors.toList());
400     HashMap<MediaPackageElementFlavor, List<Track>> trackElementsMap
401         = new HashMap<MediaPackageElementFlavor, List<Track>>();
402     // sort into one track list for each flavor - one video
403     for (Track element : trackElements) {
404       // clone track to destination mp and put into mediapackage
405       Track t = setUpHLSElementforDistribution(channelId, mediapackage, element, preserveReference);
406       List<Track> l = trackElementsMap.get(t.getFlavor());
407       if (l == null) {
408         l = new ArrayList<Track>();
409       }
410       l.add(t);
411       trackElementsMap.put(t.getFlavor(), l);
412     }
413 
414     // Run distribution flavor by flavor to ensure that there is only one master and its renditions
415     for (Entry<MediaPackageElementFlavor, List<Track>> elementSet : trackElementsMap.entrySet()) {
416       try {
417         List<Track> tracks = elementSet.getValue();
418         // If this flavor is a HLS playlist and therefore has internal references
419         if (tracks.stream().anyMatch(AdaptivePlaylist.isHLSTrackPred)) {
420           tracks = AdaptivePlaylist.fixReferences(tracks, distributionDir); // replace with fixed elements
421         }
422         for (Track track : tracks) {
423           MediaPackageElement distributedElement = checkDistributeHLSElement(track, checkAvailability);
424           distributedElements.add(distributedElement);
425         }
426       } catch (MediaPackageException | NotFoundException | IOException e1) {
427         logger.error("HLS Prepare failed for mediapackage {} in {}", elementSet.getKey(), mediapackage, e1);
428         throw new DistributionException("Cannot distribute " + mediapackage);
429       } catch (URISyntaxException e1) {
430         logger.error("HLS Prepare failed - Bad URI syntax {} in {}", elementSet.getKey(), mediapackage, e1);
431         throw new DistributionException("Cannot distribute - BAD URI syntax " + mediapackage);
432       }
433     }
434     return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
435   }
436 
437   public Track setUpHLSElementforDistribution(String channelId, MediaPackage mediapackage, Track element,
438           boolean preserveReference)
439                   throws DistributionException {
440 
441     final String mediapackageId = mediapackage.getIdentifier().toString();
442     final String elementId = element.getIdentifier();
443 
444     File source;
445     try {
446       source = workspace.get(element.getURI());
447     } catch (NotFoundException e) {
448       throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
449     } catch (IOException e) {
450       throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
451     }
452 
453     // Try to find a duplicated element source
454     try {
455       source = findDuplicatedElementSource(source, mediapackageId);
456     } catch (IOException e) {
457       logger.warn("Unable to find duplicated source {}: {}", source, ExceptionUtils.getMessage(e));
458     }
459 
460     File destination = getDistributionFile(channelId, mediapackage, element);
461     if (!destination.equals(source)) {
462       // Put the file in place if sourcesfile differs destinationfile
463       try {
464         FileUtils.forceMkdir(destination.getParentFile());
465       } catch (IOException e) {
466         throw new DistributionException("Unable to create " + destination.getParentFile(), e);
467       }
468       logger.debug("Distributing element {} of media package {} to publication channel {} ({})", elementId,
469               mediapackageId, channelId, destination);
470 
471       try {
472         if (AdaptivePlaylist.isPlaylist(source)) { // do not link text files
473           FileSupport.copy(source, destination, true);
474         } else {
475           FileSupport.link(source, destination, true);
476         }
477       } catch (IOException e) {
478         throw new DistributionException(format("Unable to copy %s to %s", source, destination), e);
479       }
480     }
481 
482     MediaPackageElement distributeElement = (MediaPackageElement) element.clone();
483     // Create a media package element representation of the distributed file
484     try {
485       distributeElement.setURI(getDistributionUri(channelId, mediapackageId, element));
486       if (preserveReference) {
487         distributeElement.setReference(element.getReference());
488       }
489     } catch (URISyntaxException e) {
490       throw new DistributionException("Distributed element produces an invalid URI", e);
491     }
492 
493     logger.debug("Setting up element {} of media package {} for publication channel {}", elementId,
494             mediapackageId, channelId);
495     return (Track) distributeElement;
496   }
497 
498   public Track checkDistributeHLSElement(Track element, boolean checkAvailability)
499           throws DistributionException {
500 
501     final URI uri = element.getURI();
502     try {
503       if (checkAvailability) {
504         logger.debug("Checking availability of distributed artifact {} at {}", element, uri);
505         checkAvailability(uri);
506       }
507       return element;
508     } catch (Exception e) {
509       logger.warn("Error distributing " + element, e);
510       if (e instanceof DistributionException) {
511         throw (DistributionException) e;
512       } else {
513         throw new DistributionException(e);
514       }
515     }
516   }
517 
518   @Override
519   public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
520     Set<String> elementIds = new HashSet();
521     elementIds.add(elementId);
522     return retract(channelId, mediapackage, elementIds);
523   }
524 
525   @Override
526   public Job retract(String channelId, MediaPackage mediapackage, Set<String> elementIds)
527           throws DistributionException {
528     notNull(mediapackage, "mediapackage");
529     notNull(elementIds, "elementIds");
530     notNull(channelId, "channelId");
531     try {
532       return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
533               Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
534                    retractJobLoad);
535     } catch (ServiceRegistryException e) {
536       throw new DistributionException("Unable to create a job", e);
537     }
538   }
539 
540   @Override
541   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, String elementId)
542           throws DistributionException, MediaPackageException {
543     Set<String> elementIds = new HashSet<String>();
544     elementIds.add(elementId);
545     return distributeSync(channelId, mediapackage, elementIds, true, false);
546   }
547 
548   public List<MediaPackageElement> distributeSync(
549           String channelId,
550           MediaPackage mediapackage,
551           Set<String> elementIds,
552           boolean checkAvailability,
553           boolean preserveReference
554   ) throws DistributionException, MediaPackageException {
555     notNull(mediapackage, "mediapackage");
556     notNull(elementIds, "elementIds");
557     notNull(channelId, "channelId");
558 
559     MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
560             checkAvailability, preserveReference);
561     return Arrays.asList(distributedElements);
562   }
563 
564   @Override
565   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
566                                                   boolean checkAvailability) throws DistributionException {
567     Job job = null;
568     try {
569       job = serviceRegistry
570           .createJob(
571               JOB_TYPE, Operation.Distribute.toString(), null, null, false, distributeJobLoad);
572       job.setStatus(Job.Status.RUNNING);
573       job = serviceRegistry.updateJob(job);
574       final MediaPackageElement[] mediaPackageElements
575           = this.distributeElements(channelId, mediapackage, elementIds, checkAvailability);
576       job.setStatus(Job.Status.FINISHED);
577       return Arrays.asList(mediaPackageElements);
578     } catch (ServiceRegistryException e) {
579       throw new DistributionException(e);
580     } catch (NotFoundException e) {
581       throw new DistributionException("Unable to update distribution job", e);
582     } finally {
583       finallyUpdateJob(job);
584     }
585   }
586 
587   @Override
588   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, String elementId)
589           throws DistributionException, MediaPackageException {
590     Set<String> elementIds = new HashSet();
591     elementIds.add(elementId);
592     return retractSync(channelId, mediapackage, elementIds);
593   }
594 
595   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, Set<String> elementIds)
596           throws DistributionException {
597     notNull(mediapackage, "mediapackage");
598     notNull(elementIds, "elementIds");
599     notNull(channelId, "channelId");
600     MediaPackageElement[] retractedElements = retractElements(channelId, mediapackage, elementIds);
601     return Arrays.asList(retractedElements);
602   }
603 
604   /**
605    * Retract a media package element from the distribution channel. The retracted element must not necessarily be the
606    * one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated. This way
607    * you are able to retract elements by providing the "original" element here.
608    *
609    * @param channelId
610    *          the channel id
611    * @param mediapackage
612    *          the mediapackage
613    * @param elementIds
614    *          the element identifiers
615    * @return the retracted element or <code>null</code> if the element was not retracted
616    * @throws org.opencastproject.distribution.api.DistributionException
617    *           in case of an error
618    */
619   protected MediaPackageElement[] retractElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
620           throws DistributionException {
621     notNull(mediapackage, "mediapackage");
622     notNull(elementIds, "elementIds");
623     notNull(channelId, "channelId");
624 
625     Set<MediaPackageElement> elements = getElements(channelId, mediapackage, elementIds);
626     List<MediaPackageElement> retractedElements = new ArrayList<MediaPackageElement>();
627 
628     for (MediaPackageElement element : elements) {
629       MediaPackageElement retractedElement = retractElement(channelId, mediapackage, element);
630       retractedElements.add(retractedElement);
631     }
632     return retractedElements.toArray(new MediaPackageElement[retractedElements.size()]);
633   }
634 
635   /**
636    * Retract a media package element from the distribution channel. The retracted element must not necessarily be the
637    * one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated. This way
638    * you are able to retract elements by providing the "original" element here.
639    *
640    * @param channelId
641    *          the channel id
642    * @param mediapackage
643    *          the mediapackage
644    * @param element
645    *          the element
646    * @return the retracted element or <code>null</code> if the element was not retracted
647    * @throws org.opencastproject.distribution.api.DistributionException
648    *           in case of an error
649    */
650   protected MediaPackageElement retractElement(String channelId, MediaPackage mediapackage, MediaPackageElement element)
651           throws DistributionException {
652     notNull(mediapackage, "mediapackage");
653     notNull(element, "element");
654     notNull(channelId, "channelId");
655 
656     String mediapackageId = mediapackage.getIdentifier().toString();
657     String elementId = element.getIdentifier();
658 
659     try {
660       final File elementFile = getDistributionFile(channelId, mediapackage, element);
661       final File mediapackageDir = getMediaPackageDirectory(channelId, mediapackage);
662       // Does the file exist? If not, the current element has not been distributed to this channel
663       // or has been removed otherwise
664       if (!elementFile.exists()) {
665         logger.info("Element {} from media package {} has already been removed or has never been distributed to "
666             + "publication channel {}", elementId, mediapackageId, channelId);
667         return element;
668       }
669 
670       logger.debug("Retracting element {} ({})", element, elementFile);
671 
672       // Try to remove the file and its parent folder representing the mediapackage element id
673       if (!FileUtils.deleteQuietly(elementFile.getParentFile())) {
674         // TODO Removing a folder containing deleted files may fail on NFS volumes. This needs a cleanup strategy.
675         logger.debug("Unable to delete folder {}", elementFile.getParentFile().getAbsolutePath());
676       }
677 
678       if (mediapackageDir.isDirectory() && mediapackageDir.list().length == 0) {
679         FileSupport.delete(mediapackageDir);
680       }
681 
682       logger.debug("Finished retracting element {} of media package {} from publication channel {}", elementId,
683           mediapackageId, channelId);
684       return element;
685     } catch (Exception e) {
686       logger.warn("Error retracting element {} of media package {} from publication channel {}", elementId,
687           mediapackageId, channelId, e);
688       if (e instanceof DistributionException) {
689         throw (DistributionException) e;
690       } else {
691         throw new DistributionException(e);
692       }
693     }
694   }
695 
696   /**
697    * {@inheritDoc}
698    *
699    * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
700    */
701   @Override
702   protected String process(Job job) throws Exception {
703     Operation op = null;
704     String operation = job.getOperation();
705     List<String> arguments = job.getArguments();
706     try {
707       op = Operation.valueOf(operation);
708       String channelId = arguments.get(0);
709       MediaPackage mediapackage = MediaPackageParser.getFromXml(arguments.get(1));
710       Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() { }.getType());
711 
712       switch (op) {
713         case Distribute:
714           Boolean checkAvailability = Boolean.parseBoolean(arguments.get(3));
715           Boolean preserveReference = Boolean.parseBoolean(arguments.get(4));
716           MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
717                   checkAvailability, preserveReference);
718           return (distributedElements != null)
719                   ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(distributedElements)) : null;
720         case Retract:
721           MediaPackageElement[] retractedElements = retractElements(channelId, mediapackage, elementIds);
722           return (retractedElements != null) ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(retractedElements))
723                   : null;
724         default:
725           throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
726       }
727     } catch (IllegalArgumentException e) {
728       throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
729     } catch (IndexOutOfBoundsException e) {
730       throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
731     } catch (Exception e) {
732       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
733     }
734   }
735 
736   private Set<MediaPackageElement> getElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
737           throws IllegalStateException {
738     final Set<MediaPackageElement> elements = new HashSet<MediaPackageElement>();
739     for (String elementId : elementIds) {
740       MediaPackageElement element = mediapackage.getElementById(elementId);
741       if (element != null) {
742         elements.add(element);
743       } else {
744         element = Arrays.stream(mediapackage.getPublications())
745                .filter(p -> p.getChannel().equals(channelId))
746                .flatMap(p -> Arrays.stream(p.getAttachments())
747                .filter(a -> a.getIdentifier().equals(elementId)))
748                .findAny()
749                .orElseThrow(() ->
750                        new IllegalStateException(format("No element %s found in mediapackage %s", elementId,
751                            mediapackage.getIdentifier())));
752         elements.add(element);
753       }
754     }
755     return elements;
756   }
757 
758   /**
759    * Try to find the same file being already distributed in one of the other channels
760    *
761    * @param source
762    *          the source file
763    * @param mpId
764    *          the element's mediapackage id
765    * @return the found duplicated file or the given source if nothing has been found
766    * @throws IOException
767    *           if an I/O error occurs
768    */
769   private File findDuplicatedElementSource(final File source, final String mpId) throws IOException {
770     String orgId = securityService.getOrganization().getId();
771     final Path rootPath = Paths.get(distributionDirectory.getAbsolutePath(), orgId);
772 
773     if (!Files.exists(rootPath)) {
774       return source;
775     }
776 
777     List<Path> mediaPackageDirectories = new ArrayList<>();
778     try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(rootPath)) {
779       for (Path path : directoryStream) {
780         Path mpDir = path.resolve(mpId);
781         if (Files.exists(mpDir)) {
782           mediaPackageDirectories.add(mpDir);
783         }
784       }
785     }
786 
787     if (mediaPackageDirectories.isEmpty()) {
788       return source;
789     }
790 
791     final long size = Files.size(source.toPath());
792 
793     final File[] result = new File[1];
794     for (Path p : mediaPackageDirectories) {
795       Files.walkFileTree(p, new SimpleFileVisitor<Path>() {
796         @Override
797         public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
798           if (attrs.isDirectory()) {
799             return FileVisitResult.CONTINUE;
800           }
801 
802           if (size != attrs.size()) {
803             return FileVisitResult.CONTINUE;
804           }
805 
806           try (InputStream is1 = Files.newInputStream(source.toPath()); InputStream is2 = Files.newInputStream(file)) {
807             if (!IOUtils.contentEquals(is1, is2)) {
808               return FileVisitResult.CONTINUE;
809             }
810           }
811           result[0] = file.toFile();
812           return FileVisitResult.TERMINATE;
813         }
814       });
815       if (result[0] != null) {
816         break;
817       }
818     }
819     if (result[0] != null) {
820       return result[0];
821     }
822 
823     return source;
824   }
825 
826   /**
827    * Gets the destination file to copy the contents of a mediapackage element.
828    *
829    * @return The file to copy the content to
830    */
831   protected File getDistributionFile(String channelId, MediaPackage mp, MediaPackageElement element) {
832     final String uriString = element.getURI().toString().split("\\?")[0];
833     final String directoryName = distributionDirectory.getAbsolutePath();
834     final String orgId = securityService.getOrganization().getId();
835     if (uriString.startsWith(serviceUrl)) {
836       String[] splitUrl = uriString.substring(serviceUrl.length() + 1).split("/");
837       if (splitUrl.length < 5) {
838         logger.warn("Malformed URI {}. Format must be .../{orgId}/{channelId}/{mediapackageId}/{elementId}/{fileName}."
839                         + " Trying URI without channelId", uriString);
840         return new File(path(directoryName, orgId, splitUrl[1], splitUrl[2], splitUrl[3]));
841       } else {
842         return new File(path(directoryName, orgId, splitUrl[1], splitUrl[2], splitUrl[3], splitUrl[4]));
843       }
844     }
845     return new File(path(directoryName, orgId, channelId, mp.getIdentifier().toString(), element.getIdentifier(),
846             FilenameUtils.getName(uriString)));
847   }
848 
849   /**
850    * Gets the directory containing the distributed files for this mediapackage.
851    *
852    * @return the filesystem directory
853    */
854   protected File getMediaPackageDirectory(String channelId, MediaPackage mp) {
855     final String orgId = securityService.getOrganization().getId();
856     return new File(distributionDirectory, path(orgId, channelId, mp.getIdentifier().toString()));
857   }
858 
859   /**
860    * Gets the URI for the element to be distributed.
861    *
862    * @param mediaPackageId
863    *          the mediapackage identifier
864    * @param element
865    *          The mediapackage element being distributed
866    * @return The resulting URI after distribution
867    * @throws URISyntaxException
868    *           if the concrete implementation tries to create a malformed uri
869    */
870   protected URI getDistributionUri(String channelId, String mediaPackageId, MediaPackageElement element)
871           throws URISyntaxException {
872     String elementId = element.getIdentifier();
873     String fileName = FilenameUtils.getName(element.getURI().toString());
874     String orgId = securityService.getOrganization().getId();
875     String destinationURI = UrlSupport.concat(serviceUrl, orgId, channelId, mediaPackageId, elementId, fileName);
876     return new URI(destinationURI);
877   }
878 
879   @Override
880   public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
881     distributeJobLoad = LoadUtil.getConfiguredLoadValue(properties, DISTRIBUTE_JOB_LOAD_KEY,
882             DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
883     retractJobLoad = LoadUtil.getConfiguredLoadValue(properties, RETRACT_JOB_LOAD_KEY, DEFAULT_RETRACT_JOB_LOAD,
884             serviceRegistry);
885   }
886 
887   /**
888    * Checks whether requesting the given HTTP URI results in 200 OK. If not, a
889    * `DistributionException` is thrown. The HTTP request is done with the system
890    * user to ensure our request is properly authorized.
891    */
892   private void checkAvailability(URI uri) {
893     final Organization organization = getSecurityService().getOrganization();
894     final User systemUser = SecurityUtil.createSystemUser(systemUserName, organization);
895     SecurityUtil.runAs(getSecurityService(), organization, systemUser, () -> {
896       waitForResource(trustedHttpClient, uri, HttpServletResponse.SC_OK, TIMEOUT, INTERVAL)
897           .fold(
898               Misc.chuck(),
899               status -> {
900                 if (ne(status, HttpServletResponse.SC_OK)) {
901                   logger.warn("Attempt to access distributed file {} returned code {}", uri, status);
902                   Misc.chuck(new DistributionException("Unable to load distributed file " + uri.toString()));
903                 }
904                 return null;
905               }
906           );
907     });
908   }
909 
910   @Reference
911   @Override
912   public void setWorkspace(Workspace workspace) {
913     super.setWorkspace(workspace);
914   }
915 
916   @Reference
917   @Override
918   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
919     super.setServiceRegistry(serviceRegistry);
920   }
921 
922   @Reference
923   @Override
924   public void setSecurityService(SecurityService securityService) {
925     super.setSecurityService(securityService);
926   }
927 
928   @Reference
929   @Override
930   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
931     super.setUserDirectoryService(userDirectoryService);
932   }
933 
934   @Reference
935   @Override
936   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
937     super.setOrganizationDirectoryService(organizationDirectoryService);
938   }
939 
940   @Reference
941   @Override
942   public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
943     super.setTrustedHttpClient(trustedHttpClient);
944   }
945 
946 }