View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.distribution.download;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.systems.OpencastConstants.DIGEST_USER_PROPERTY;
25  import static org.opencastproject.util.EqualsUtil.ne;
26  import static org.opencastproject.util.HttpUtil.waitForResource;
27  import static org.opencastproject.util.PathSupport.path;
28  import static org.opencastproject.util.RequireUtil.notNull;
29  
30  import org.opencastproject.distribution.api.AbstractDistributionService;
31  import org.opencastproject.distribution.api.DistributionException;
32  import org.opencastproject.distribution.api.DistributionService;
33  import org.opencastproject.distribution.api.DownloadDistributionService;
34  import org.opencastproject.job.api.Job;
35  import org.opencastproject.mediapackage.AdaptivePlaylist;
36  import org.opencastproject.mediapackage.MediaPackage;
37  import org.opencastproject.mediapackage.MediaPackageElement;
38  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
39  import org.opencastproject.mediapackage.MediaPackageElementParser;
40  import org.opencastproject.mediapackage.MediaPackageException;
41  import org.opencastproject.mediapackage.MediaPackageParser;
42  import org.opencastproject.mediapackage.Track;
43  import org.opencastproject.security.api.Organization;
44  import org.opencastproject.security.api.OrganizationDirectoryService;
45  import org.opencastproject.security.api.SecurityService;
46  import org.opencastproject.security.api.TrustedHttpClient;
47  import org.opencastproject.security.api.User;
48  import org.opencastproject.security.api.UserDirectoryService;
49  import org.opencastproject.security.util.SecurityUtil;
50  import org.opencastproject.serviceregistry.api.ServiceRegistry;
51  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
52  import org.opencastproject.util.FileSupport;
53  import org.opencastproject.util.LoadUtil;
54  import org.opencastproject.util.NotFoundException;
55  import org.opencastproject.util.OsgiUtil;
56  import org.opencastproject.util.UrlSupport;
57  import org.opencastproject.util.data.Effect;
58  import org.opencastproject.util.data.functions.Misc;
59  import org.opencastproject.workspace.api.Workspace;
60  
61  import com.google.gson.Gson;
62  import com.google.gson.reflect.TypeToken;
63  
64  import org.apache.commons.io.FileUtils;
65  import org.apache.commons.io.FilenameUtils;
66  import org.apache.commons.io.IOUtils;
67  import org.apache.commons.lang3.exception.ExceptionUtils;
68  import org.osgi.service.cm.ConfigurationException;
69  import org.osgi.service.cm.ManagedService;
70  import org.osgi.service.component.ComponentContext;
71  import org.osgi.service.component.annotations.Activate;
72  import org.osgi.service.component.annotations.Component;
73  import org.osgi.service.component.annotations.Reference;
74  import org.slf4j.Logger;
75  import org.slf4j.LoggerFactory;
76  
77  import java.io.File;
78  import java.io.IOException;
79  import java.io.InputStream;
80  import java.net.URI;
81  import java.net.URISyntaxException;
82  import java.nio.file.DirectoryStream;
83  import java.nio.file.FileVisitResult;
84  import java.nio.file.Files;
85  import java.nio.file.Path;
86  import java.nio.file.Paths;
87  import java.nio.file.SimpleFileVisitor;
88  import java.nio.file.attribute.BasicFileAttributes;
89  import java.util.ArrayList;
90  import java.util.Arrays;
91  import java.util.Dictionary;
92  import java.util.HashMap;
93  import java.util.HashSet;
94  import java.util.List;
95  import java.util.Map.Entry;
96  import java.util.Set;
97  import java.util.stream.Collectors;
98  
99  import javax.servlet.http.HttpServletResponse;
100 
101 /**
102  * Distributes media to the local media delivery directory.
103  */
104 @Component(
105     immediate = true,
106     service = { DistributionService.class,DownloadDistributionService.class,ManagedService.class },
107     property = {
108         "service.description=Distribution Service (Local)",
109         "service.pid=org.opencastproject.distribution.download.DownloadDistributionServiceImpl",
110         "distribution.channel=download"
111     }
112 )
113 public class DownloadDistributionServiceImpl extends AbstractDistributionService
114         implements DistributionService, DownloadDistributionService, ManagedService {
115 
116   /** Logging facility */
117   private static final Logger logger = LoggerFactory.getLogger(DownloadDistributionServiceImpl.class);
118 
119   /** List of available operations on jobs */
120   private enum Operation {
121     Distribute, Retract
122   }
123 
124   /** Receipt type */
125   public static final String JOB_TYPE = "org.opencastproject.distribution.download";
126 
127   /** Timeout in millis for checking distributed file request */
128   private static final long TIMEOUT = 60000L;
129 
130   /** The load on the system introduced by creating a distribute job */
131   public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;
132 
133   /** The load on the system introduced by creating a retract job */
134   public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;
135 
136   /** The key to look for in the service configuration file to override the {@link #DEFAULT_DISTRIBUTE_JOB_LOAD} */
137   public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.download.distribute";
138 
139   /** The key to look for in the service configuration file to override the {@link #DEFAULT_RETRACT_JOB_LOAD} */
140   public static final String RETRACT_JOB_LOAD_KEY = "job.load.download.retract";
141 
142   /** The load on the system introduced by creating a distribute job */
143   private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;
144 
145   /** The load on the system introduced by creating a retract job */
146   private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;
147 
148   /** Interval time in millis for checking distributed file request */
149   private static final long INTERVAL = 300L;
150 
151   private Gson gson = new Gson();
152 
153   private String systemUserName = null;
154 
155   /**
156    * Creates a new instance of the download distribution service.
157    */
158   public DownloadDistributionServiceImpl() {
159     super(JOB_TYPE);
160   }
161 
162   /**
163    * Activate method for this OSGi service implementation.
164    *
165    * @param cc
166    *          the OSGi component context
167    */
168   @Override
169   @Activate
170   public void activate(ComponentContext cc) {
171     super.activate(cc);
172     serviceUrl = cc.getBundleContext().getProperty("org.opencastproject.download.url");
173     if (serviceUrl == null) {
174       throw new IllegalStateException("Download url must be set (org.opencastproject.download.url)");
175     }
176     logger.info("Download url is {}", serviceUrl);
177 
178     String ccDistributionDirectory = cc.getBundleContext().getProperty("org.opencastproject.download.directory");
179     if (ccDistributionDirectory == null) {
180       throw new IllegalStateException("Distribution directory must be set (org.opencastproject.download.directory)");
181     }
182     this.distributionDirectory = new File(ccDistributionDirectory);
183     logger.info("Download distribution directory is {}", distributionDirectory);
184     this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
185     systemUserName = cc.getBundleContext().getProperty(DIGEST_USER_PROPERTY);
186   }
187 
188   @Override
189   public String getDistributionType() {
190     return this.distributionChannel;
191   }
192 
193   @Override
194   public Job distribute(String channelId, MediaPackage mediapackage, String elementId)
195           throws DistributionException, MediaPackageException {
196     return distribute(channelId, mediapackage, elementId, true);
197   }
198 
199   @Override
200   public Job distribute(String channelId, MediaPackage mediapackage, String elementId, boolean checkAvailability)
201           throws DistributionException, MediaPackageException {
202     Set<String> elementIds = new HashSet<String>();
203     elementIds.add(elementId);
204     return distribute(channelId, mediapackage, elementIds, checkAvailability, false);
205   }
206 
207   @Override
208   public Job distribute(String channelId, MediaPackage mediapackage, Set<String> elementIds, boolean checkAvailability)
209           throws DistributionException, MediaPackageException {
210     return distribute(channelId, mediapackage, elementIds, checkAvailability, false);
211   }
212 
213   @Override
214   public Job distribute(
215       String channelId,
216       MediaPackage mediapackage,
217       Set<String> elementIds,
218       boolean checkAvailability,
219       boolean preserveReference
220   ) throws DistributionException, MediaPackageException {
221     notNull(mediapackage, "mediapackage");
222     notNull(elementIds, "elementIds");
223     notNull(channelId, "channelId");
224     try {
225       return serviceRegistry.createJob(
226               JOB_TYPE,
227               Operation.Distribute.toString(),
228               Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds),
229                       Boolean.toString(checkAvailability), Boolean.toString(preserveReference)), distributeJobLoad);
230     } catch (ServiceRegistryException e) {
231       throw new DistributionException("Unable to create a job", e);
232     }
233   }
234 
235   /**
236    * Distribute Mediapackage elements to the download distribution service.
237    *
238    * @param channelId
239    #          The id of the publication channel to be distributed to.
240    * @param mediapackage
241    *          The media package that contains the elements to be distributed.
242    * @param elementIds
243    *          The ids of the elements that should be distributed contained within the media package.
244    * @param checkAvailability
245    *          Check the availability of the distributed element via http.
246    * @return A reference to the MediaPackageElements that have been distributed.
247    * @throws DistributionException
248    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
249    *           cannot be copied or another unexpected exception occurs.
250    */
251   public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
252           boolean checkAvailability) throws DistributionException {
253     return distributeElements(channelId, mediapackage, elementIds, checkAvailability, false);
254   }
255 
256   /**
257    * Distribute Mediapackage elements to the download distribution service.
258    *
259    * @param channelId
260    #          The id of the publication channel to be distributed to.
261    * @param mediapackage
262    *          The media package that contains the elements to be distributed.
263    * @param elementIds
264    *          The ids of the elements that should be distributed contained within the media package.
265    * @param checkAvailability
266    *          Check the availability of the distributed element via http.
267    * @param preserveReference
268    *          copy actual Reference to the new distributed element
269    * @return A reference to the MediaPackageElements that have been distributed.
270    * @throws DistributionException
271    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
272    *           cannot be copied or another unexpected exception occurs.
273    */
274   public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
275           boolean checkAvailability, boolean preserveReference) throws DistributionException {
276     notNull(mediapackage, "mediapackage");
277     notNull(elementIds, "elementIds");
278     notNull(channelId, "channelId");
279 
280     final Set<MediaPackageElement> elements = getElements(channelId, mediapackage, elementIds);
281     List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
282 
283     if (AdaptivePlaylist.hasHLSPlaylist(elements)) {
284       return distributeHLSElements(channelId, mediapackage, elements, checkAvailability, preserveReference);
285     } else {
286       for (MediaPackageElement element : elements) {
287         MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability,
288                 preserveReference);
289         distributedElements.add(distributedElement);
290       }
291     }
292     return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
293   }
294 
295   /**
296    * Distribute a Mediapackage element to the download distribution service.
297    *
298    * @param channelId
299    #          The id of the publication channel to be distributed to.
300    * @param mediapackage
301    *          The media package that contains the element to be distributed.
302    * @param element
303    *          The the element that should be distributed contained within the media package.
304    * @param checkAvailability
305    *          Check the availability of the distributed element via http.
306    * @param preserveReference
307    *           Copy existing Track-Reference to the new distributed Track
308    * @return A reference to the MediaPackageElement that has been distributed.
309    * @throws DistributionException
310    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
311    *           cannot be copied or another unexpected exception occurs.
312    */
313 
314   public MediaPackageElement distributeElement(String channelId, MediaPackage mediapackage, MediaPackageElement element,
315           boolean checkAvailability, boolean preserveReference) throws DistributionException {
316 
317     final String mediapackageId = mediapackage.getIdentifier().toString();
318     final String elementId = element.getIdentifier();
319 
320     try {
321       File source;
322       try {
323         source = workspace.get(element.getURI());
324       } catch (NotFoundException e) {
325         throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
326       } catch (IOException e) {
327         throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
328       }
329 
330       // Try to find a duplicated element source
331       try {
332         source = findDuplicatedElementSource(source, mediapackageId);
333       } catch (IOException e) {
334         logger.warn("Unable to find duplicated source {}: {}", source, ExceptionUtils.getMessage(e));
335       }
336 
337       File destination = getDistributionFile(channelId, mediapackage, element);
338       if (!destination.equals(source)) {
339         // Put the file in place if sourcesfile differs destinationfile
340         try {
341           FileUtils.forceMkdir(destination.getParentFile());
342         } catch (IOException e) {
343           throw new DistributionException("Unable to create " + destination.getParentFile(), e);
344         }
345         logger.debug("Distributing element {} of media package {} to publication channel {} ({})", elementId,
346             mediapackageId, channelId, destination);
347 
348         try {
349           FileSupport.link(source, destination, true);
350         } catch (IOException e) {
351           throw new DistributionException(format("Unable to copy %s to %s", source, destination), e);
352         }
353       }
354       // Create a media package element representation of the distributed file
355       MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
356       try {
357         distributedElement.setURI(getDistributionUri(channelId, mediapackageId, element));
358         if (preserveReference) {
359           distributedElement.setReference(element.getReference());
360         }
361       } catch (URISyntaxException e) {
362         throw new DistributionException("Distributed element produces an invalid URI", e);
363       }
364 
365       logger.debug("Finished distributing element {} of media package {} to publication channel {}", elementId,
366           mediapackageId, channelId);
367       final URI uri = distributedElement.getURI();
368       if (checkAvailability) {
369         logger.debug("Checking availability of distributed artifact {} at {}", distributedElement, uri);
370         checkAvailability(uri);
371       }
372       return distributedElement;
373     } catch (Exception e) {
374       logger.warn("Error distributing " + element, e);
375       if (e instanceof DistributionException) {
376         throw (DistributionException) e;
377       } else {
378         throw new DistributionException(e);
379       }
380     }
381   }
382 
383   private MediaPackageElement[] distributeHLSElements(String channelId, MediaPackage mediapackage,
384           Set<MediaPackageElement> elements, boolean checkAvailability, boolean preserveReference)
385                   throws DistributionException {
386 
387     List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
388     File distributionDir = getMediaPackageDirectory(channelId, mediapackage);
389     List<MediaPackageElement> nontrackElements = elements.stream()
390             .filter(e -> e.getElementType() != MediaPackageElement.Type.Track).collect(Collectors.toList());
391     // Distribute non track items
392     for (MediaPackageElement element : nontrackElements) {
393       MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability,
394               preserveReference);
395       distributedElements.add(distributedElement);
396     }
397     // Get all tracks and look for adaptive playlists
398     List<Track> trackElements = elements.stream()
399             .filter(e -> e.getElementType() == MediaPackageElement.Type.Track).map(e -> (Track) e)
400             .collect(Collectors.toList());
401     HashMap<MediaPackageElementFlavor, List<Track>> trackElementsMap
402         = new HashMap<MediaPackageElementFlavor, List<Track>>();
403     // sort into one track list for each flavor - one video
404     for (Track element : trackElements) {
405       // clone track to destination mp and put into mediapackage
406       Track t = setUpHLSElementforDistribution(channelId, mediapackage, element, preserveReference);
407       List<Track> l = trackElementsMap.get(t.getFlavor());
408       if (l == null) {
409         l = new ArrayList<Track>();
410       }
411       l.add(t);
412       trackElementsMap.put(t.getFlavor(), l);
413     }
414 
415     // Run distribution flavor by flavor to ensure that there is only one master and its renditions
416     for (Entry<MediaPackageElementFlavor, List<Track>> elementSet : trackElementsMap.entrySet()) {
417       try {
418         List<Track> tracks = elementSet.getValue();
419         // If this flavor is a HLS playlist and therefore has internal references
420         if (tracks.stream().anyMatch(AdaptivePlaylist.isHLSTrackPred)) {
421           tracks = AdaptivePlaylist.fixReferences(tracks, distributionDir); // replace with fixed elements
422         }
423         for (Track track : tracks) {
424           MediaPackageElement distributedElement = checkDistributeHLSElement(track, checkAvailability);
425           distributedElements.add(distributedElement);
426         }
427       } catch (MediaPackageException | NotFoundException | IOException e1) {
428         logger.error("HLS Prepare failed for mediapackage {} in {}", elementSet.getKey(), mediapackage, e1);
429         throw new DistributionException("Cannot distribute " + mediapackage);
430       } catch (URISyntaxException e1) {
431         logger.error("HLS Prepare failed - Bad URI syntax {} in {}", elementSet.getKey(), mediapackage, e1);
432         throw new DistributionException("Cannot distribute - BAD URI syntax " + mediapackage);
433       }
434     }
435     return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
436   }
437 
438   public Track setUpHLSElementforDistribution(String channelId, MediaPackage mediapackage, Track element,
439           boolean preserveReference)
440                   throws DistributionException {
441 
442     final String mediapackageId = mediapackage.getIdentifier().toString();
443     final String elementId = element.getIdentifier();
444 
445     File source;
446     try {
447       source = workspace.get(element.getURI());
448     } catch (NotFoundException e) {
449       throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
450     } catch (IOException e) {
451       throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
452     }
453 
454     // Try to find a duplicated element source
455     try {
456       source = findDuplicatedElementSource(source, mediapackageId);
457     } catch (IOException e) {
458       logger.warn("Unable to find duplicated source {}: {}", source, ExceptionUtils.getMessage(e));
459     }
460 
461     File destination = getDistributionFile(channelId, mediapackage, element);
462     if (!destination.equals(source)) {
463       // Put the file in place if sourcesfile differs destinationfile
464       try {
465         FileUtils.forceMkdir(destination.getParentFile());
466       } catch (IOException e) {
467         throw new DistributionException("Unable to create " + destination.getParentFile(), e);
468       }
469       logger.debug("Distributing element {} of media package {} to publication channel {} ({})", elementId,
470               mediapackageId, channelId, destination);
471 
472       try {
473         if (AdaptivePlaylist.isPlaylist(source)) { // do not link text files
474           FileSupport.copy(source, destination, true);
475         } else {
476           FileSupport.link(source, destination, true);
477         }
478       } catch (IOException e) {
479         throw new DistributionException(format("Unable to copy %s to %s", source, destination), e);
480       }
481     }
482 
483     MediaPackageElement distributeElement = (MediaPackageElement) element.clone();
484     // Create a media package element representation of the distributed file
485     try {
486       distributeElement.setURI(getDistributionUri(channelId, mediapackageId, element));
487       if (preserveReference) {
488         distributeElement.setReference(element.getReference());
489       }
490     } catch (URISyntaxException e) {
491       throw new DistributionException("Distributed element produces an invalid URI", e);
492     }
493 
494     logger.debug("Setting up element {} of media package {} for publication channel {}", elementId,
495             mediapackageId, channelId);
496     return (Track) distributeElement;
497   }
498 
499   public Track checkDistributeHLSElement(Track element, boolean checkAvailability)
500           throws DistributionException {
501 
502     final URI uri = element.getURI();
503     try {
504       if (checkAvailability) {
505         logger.debug("Checking availability of distributed artifact {} at {}", element, uri);
506         checkAvailability(uri);
507       }
508       return element;
509     } catch (Exception e) {
510       logger.warn("Error distributing " + element, e);
511       if (e instanceof DistributionException) {
512         throw (DistributionException) e;
513       } else {
514         throw new DistributionException(e);
515       }
516     }
517   }
518 
519   @Override
520   public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
521     Set<String> elementIds = new HashSet();
522     elementIds.add(elementId);
523     return retract(channelId, mediapackage, elementIds);
524   }
525 
526   @Override
527   public Job retract(String channelId, MediaPackage mediapackage, Set<String> elementIds)
528           throws DistributionException {
529     notNull(mediapackage, "mediapackage");
530     notNull(elementIds, "elementIds");
531     notNull(channelId, "channelId");
532     try {
533       return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
534               Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
535                    retractJobLoad);
536     } catch (ServiceRegistryException e) {
537       throw new DistributionException("Unable to create a job", e);
538     }
539   }
540 
541   @Override
542   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, String elementId)
543           throws DistributionException, MediaPackageException {
544     Set<String> elementIds = new HashSet<String>();
545     elementIds.add(elementId);
546     return distributeSync(channelId, mediapackage, elementIds, true, false);
547   }
548 
549   public List<MediaPackageElement> distributeSync(
550           String channelId,
551           MediaPackage mediapackage,
552           Set<String> elementIds,
553           boolean checkAvailability,
554           boolean preserveReference
555   ) throws DistributionException, MediaPackageException {
556     notNull(mediapackage, "mediapackage");
557     notNull(elementIds, "elementIds");
558     notNull(channelId, "channelId");
559 
560     MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
561             checkAvailability, preserveReference);
562     return Arrays.asList(distributedElements);
563   }
564 
565   @Override
566   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
567                                                   boolean checkAvailability) throws DistributionException {
568     Job job = null;
569     try {
570       job = serviceRegistry
571           .createJob(
572               JOB_TYPE, Operation.Distribute.toString(), null, null, false, distributeJobLoad);
573       job.setStatus(Job.Status.RUNNING);
574       job = serviceRegistry.updateJob(job);
575       final MediaPackageElement[] mediaPackageElements
576           = this.distributeElements(channelId, mediapackage, elementIds, checkAvailability);
577       job.setStatus(Job.Status.FINISHED);
578       return Arrays.asList(mediaPackageElements);
579     } catch (ServiceRegistryException e) {
580       throw new DistributionException(e);
581     } catch (NotFoundException e) {
582       throw new DistributionException("Unable to update distribution job", e);
583     } finally {
584       finallyUpdateJob(job);
585     }
586   }
587 
588   @Override
589   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, String elementId)
590           throws DistributionException, MediaPackageException {
591     Set<String> elementIds = new HashSet();
592     elementIds.add(elementId);
593     return retractSync(channelId, mediapackage, elementIds);
594   }
595 
596   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, Set<String> elementIds)
597           throws DistributionException {
598     notNull(mediapackage, "mediapackage");
599     notNull(elementIds, "elementIds");
600     notNull(channelId, "channelId");
601     MediaPackageElement[] retractedElements = retractElements(channelId, mediapackage, elementIds);
602     return Arrays.asList(retractedElements);
603   }
604 
605   /**
606    * Retract a media package element from the distribution channel. The retracted element must not necessarily be the
607    * one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated. This way
608    * you are able to retract elements by providing the "original" element here.
609    *
610    * @param channelId
611    *          the channel id
612    * @param mediapackage
613    *          the mediapackage
614    * @param elementIds
615    *          the element identifiers
616    * @return the retracted element or <code>null</code> if the element was not retracted
617    * @throws org.opencastproject.distribution.api.DistributionException
618    *           in case of an error
619    */
620   protected MediaPackageElement[] retractElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
621           throws DistributionException {
622     notNull(mediapackage, "mediapackage");
623     notNull(elementIds, "elementIds");
624     notNull(channelId, "channelId");
625 
626     Set<MediaPackageElement> elements = getElements(channelId, mediapackage, elementIds);
627     List<MediaPackageElement> retractedElements = new ArrayList<MediaPackageElement>();
628 
629     for (MediaPackageElement element : elements) {
630       MediaPackageElement retractedElement = retractElement(channelId, mediapackage, element);
631       retractedElements.add(retractedElement);
632     }
633     return retractedElements.toArray(new MediaPackageElement[retractedElements.size()]);
634   }
635 
636   /**
637    * Retract a media package element from the distribution channel. The retracted element must not necessarily be the
638    * one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated. This way
639    * you are able to retract elements by providing the "original" element here.
640    *
641    * @param channelId
642    *          the channel id
643    * @param mediapackage
644    *          the mediapackage
645    * @param element
646    *          the element
647    * @return the retracted element or <code>null</code> if the element was not retracted
648    * @throws org.opencastproject.distribution.api.DistributionException
649    *           in case of an error
650    */
651   protected MediaPackageElement retractElement(String channelId, MediaPackage mediapackage, MediaPackageElement element)
652           throws DistributionException {
653     notNull(mediapackage, "mediapackage");
654     notNull(element, "element");
655     notNull(channelId, "channelId");
656 
657     String mediapackageId = mediapackage.getIdentifier().toString();
658     String elementId = element.getIdentifier();
659 
660     try {
661       final File elementFile = getDistributionFile(channelId, mediapackage, element);
662       final File mediapackageDir = getMediaPackageDirectory(channelId, mediapackage);
663       // Does the file exist? If not, the current element has not been distributed to this channel
664       // or has been removed otherwise
665       if (!elementFile.exists()) {
666         logger.info("Element {} from media package {} has already been removed or has never been distributed to "
667             + "publication channel {}", elementId, mediapackageId, channelId);
668         return element;
669       }
670 
671       logger.debug("Retracting element {} ({})", element, elementFile);
672 
673       // Try to remove the file and its parent folder representing the mediapackage element id
674       if (!FileUtils.deleteQuietly(elementFile.getParentFile())) {
675         // TODO Removing a folder containing deleted files may fail on NFS volumes. This needs a cleanup strategy.
676         logger.debug("Unable to delete folder {}", elementFile.getParentFile().getAbsolutePath());
677       }
678 
679       if (mediapackageDir.isDirectory() && mediapackageDir.list().length == 0) {
680         FileSupport.delete(mediapackageDir);
681       }
682 
683       logger.debug("Finished retracting element {} of media package {} from publication channel {}", elementId,
684           mediapackageId, channelId);
685       return element;
686     } catch (Exception e) {
687       logger.warn("Error retracting element {} of media package {} from publication channel {}", elementId,
688           mediapackageId, channelId, e);
689       if (e instanceof DistributionException) {
690         throw (DistributionException) e;
691       } else {
692         throw new DistributionException(e);
693       }
694     }
695   }
696 
697   /**
698    * {@inheritDoc}
699    *
700    * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
701    */
702   @Override
703   protected String process(Job job) throws Exception {
704     Operation op = null;
705     String operation = job.getOperation();
706     List<String> arguments = job.getArguments();
707     try {
708       op = Operation.valueOf(operation);
709       String channelId = arguments.get(0);
710       MediaPackage mediapackage = MediaPackageParser.getFromXml(arguments.get(1));
711       Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() { }.getType());
712 
713       switch (op) {
714         case Distribute:
715           Boolean checkAvailability = Boolean.parseBoolean(arguments.get(3));
716           Boolean preserveReference = Boolean.parseBoolean(arguments.get(4));
717           MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
718                   checkAvailability, preserveReference);
719           return (distributedElements != null)
720                   ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(distributedElements)) : null;
721         case Retract:
722           MediaPackageElement[] retractedElements = retractElements(channelId, mediapackage, elementIds);
723           return (retractedElements != null) ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(retractedElements))
724                   : null;
725         default:
726           throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
727       }
728     } catch (IllegalArgumentException e) {
729       throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
730     } catch (IndexOutOfBoundsException e) {
731       throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
732     } catch (Exception e) {
733       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
734     }
735   }
736 
737   private Set<MediaPackageElement> getElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
738           throws IllegalStateException {
739     final Set<MediaPackageElement> elements = new HashSet<MediaPackageElement>();
740     for (String elementId : elementIds) {
741       MediaPackageElement element = mediapackage.getElementById(elementId);
742       if (element != null) {
743         elements.add(element);
744       } else {
745         element = Arrays.stream(mediapackage.getPublications())
746                .filter(p -> p.getChannel().equals(channelId))
747                .flatMap(p -> Arrays.stream(p.getAttachments())
748                .filter(a -> a.getIdentifier().equals(elementId)))
749                .findAny()
750                .orElseThrow(() ->
751                        new IllegalStateException(format("No element %s found in mediapackage %s", elementId,
752                            mediapackage.getIdentifier())));
753         elements.add(element);
754       }
755     }
756     return elements;
757   }
758 
759   /**
760    * Try to find the same file being already distributed in one of the other channels
761    *
762    * @param source
763    *          the source file
764    * @param mpId
765    *          the element's mediapackage id
766    * @return the found duplicated file or the given source if nothing has been found
767    * @throws IOException
768    *           if an I/O error occurs
769    */
770   private File findDuplicatedElementSource(final File source, final String mpId) throws IOException {
771     String orgId = securityService.getOrganization().getId();
772     final Path rootPath = Paths.get(distributionDirectory.getAbsolutePath(), orgId);
773 
774     if (!Files.exists(rootPath)) {
775       return source;
776     }
777 
778     List<Path> mediaPackageDirectories = new ArrayList<>();
779     try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(rootPath)) {
780       for (Path path : directoryStream) {
781         Path mpDir = path.resolve(mpId);
782         if (Files.exists(mpDir)) {
783           mediaPackageDirectories.add(mpDir);
784         }
785       }
786     }
787 
788     if (mediaPackageDirectories.isEmpty()) {
789       return source;
790     }
791 
792     final long size = Files.size(source.toPath());
793 
794     final File[] result = new File[1];
795     for (Path p : mediaPackageDirectories) {
796       Files.walkFileTree(p, new SimpleFileVisitor<Path>() {
797         @Override
798         public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
799           if (attrs.isDirectory()) {
800             return FileVisitResult.CONTINUE;
801           }
802 
803           if (size != attrs.size()) {
804             return FileVisitResult.CONTINUE;
805           }
806 
807           try (InputStream is1 = Files.newInputStream(source.toPath()); InputStream is2 = Files.newInputStream(file)) {
808             if (!IOUtils.contentEquals(is1, is2)) {
809               return FileVisitResult.CONTINUE;
810             }
811           }
812           result[0] = file.toFile();
813           return FileVisitResult.TERMINATE;
814         }
815       });
816       if (result[0] != null) {
817         break;
818       }
819     }
820     if (result[0] != null) {
821       return result[0];
822     }
823 
824     return source;
825   }
826 
827   /**
828    * Gets the destination file to copy the contents of a mediapackage element.
829    *
830    * @return The file to copy the content to
831    */
832   protected File getDistributionFile(String channelId, MediaPackage mp, MediaPackageElement element) {
833     final String uriString = element.getURI().toString().split("\\?")[0];
834     final String directoryName = distributionDirectory.getAbsolutePath();
835     final String orgId = securityService.getOrganization().getId();
836     if (uriString.startsWith(serviceUrl)) {
837       String[] splitUrl = uriString.substring(serviceUrl.length() + 1).split("/");
838       if (splitUrl.length < 5) {
839         logger.warn("Malformed URI {}. Format must be .../{orgId}/{channelId}/{mediapackageId}/{elementId}/{fileName}."
840                         + " Trying URI without channelId", uriString);
841         return new File(path(directoryName, orgId, splitUrl[1], splitUrl[2], splitUrl[3]));
842       } else {
843         return new File(path(directoryName, orgId, splitUrl[1], splitUrl[2], splitUrl[3], splitUrl[4]));
844       }
845     }
846     return new File(path(directoryName, orgId, channelId, mp.getIdentifier().toString(), element.getIdentifier(),
847             FilenameUtils.getName(uriString)));
848   }
849 
850   /**
851    * Gets the directory containing the distributed files for this mediapackage.
852    *
853    * @return the filesystem directory
854    */
855   protected File getMediaPackageDirectory(String channelId, MediaPackage mp) {
856     final String orgId = securityService.getOrganization().getId();
857     return new File(distributionDirectory, path(orgId, channelId, mp.getIdentifier().toString()));
858   }
859 
860   /**
861    * Gets the URI for the element to be distributed.
862    *
863    * @param mediaPackageId
864    *          the mediapackage identifier
865    * @param element
866    *          The mediapackage element being distributed
867    * @return The resulting URI after distribution
868    * @throws URISyntaxException
869    *           if the concrete implementation tries to create a malformed uri
870    */
871   protected URI getDistributionUri(String channelId, String mediaPackageId, MediaPackageElement element)
872           throws URISyntaxException {
873     String elementId = element.getIdentifier();
874     String fileName = FilenameUtils.getName(element.getURI().toString());
875     String orgId = securityService.getOrganization().getId();
876     String destinationURI = UrlSupport.concat(serviceUrl, orgId, channelId, mediaPackageId, elementId, fileName);
877     return new URI(destinationURI);
878   }
879 
880   @Override
881   public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
882     distributeJobLoad = LoadUtil.getConfiguredLoadValue(properties, DISTRIBUTE_JOB_LOAD_KEY,
883             DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
884     retractJobLoad = LoadUtil.getConfiguredLoadValue(properties, RETRACT_JOB_LOAD_KEY, DEFAULT_RETRACT_JOB_LOAD,
885             serviceRegistry);
886   }
887 
888   /**
889    * Checks whether requesting the given HTTP URI results in 200 OK. If not, a
890    * `DistributionException` is thrown. The HTTP request is done with the system
891    * user to ensure our request is properly authorized.
892    */
893   private void checkAvailability(URI uri) {
894     final Organization organization = getSecurityService().getOrganization();
895     final User systemUser = SecurityUtil.createSystemUser(systemUserName, organization);
896     SecurityUtil.runAs(getSecurityService(), organization, systemUser, () -> {
897       waitForResource(trustedHttpClient, uri, HttpServletResponse.SC_OK, TIMEOUT, INTERVAL)
898           .fold(Misc.chuck(), new Effect.X<Integer>() {
899             @Override
900             public void xrun(Integer status) throws Exception {
901               if (ne(status, HttpServletResponse.SC_OK)) {
902                 logger.warn("Attempt to access distributed file {} returned code {}", uri, status);
903                 throw new DistributionException("Unable to load distributed file " + uri.toString());
904               }
905             }
906           });
907     });
908   }
909 
910   @Reference
911   @Override
912   public void setWorkspace(Workspace workspace) {
913     super.setWorkspace(workspace);
914   }
915 
916   @Reference
917   @Override
918   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
919     super.setServiceRegistry(serviceRegistry);
920   }
921 
922   @Reference
923   @Override
924   public void setSecurityService(SecurityService securityService) {
925     super.setSecurityService(securityService);
926   }
927 
928   @Reference
929   @Override
930   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
931     super.setUserDirectoryService(userDirectoryService);
932   }
933 
934   @Reference
935   @Override
936   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
937     super.setOrganizationDirectoryService(organizationDirectoryService);
938   }
939 
940   @Reference
941   @Override
942   public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
943     super.setTrustedHttpClient(trustedHttpClient);
944   }
945 
946 }