1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.distribution.download;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.systems.OpencastConstants.DIGEST_USER_PROPERTY;
25 import static org.opencastproject.util.EqualsUtil.ne;
26 import static org.opencastproject.util.HttpUtil.waitForResource;
27 import static org.opencastproject.util.PathSupport.path;
28 import static org.opencastproject.util.RequireUtil.notNull;
29
30 import org.opencastproject.distribution.api.AbstractDistributionService;
31 import org.opencastproject.distribution.api.DistributionException;
32 import org.opencastproject.distribution.api.DistributionService;
33 import org.opencastproject.distribution.api.DownloadDistributionService;
34 import org.opencastproject.job.api.Job;
35 import org.opencastproject.mediapackage.AdaptivePlaylist;
36 import org.opencastproject.mediapackage.MediaPackage;
37 import org.opencastproject.mediapackage.MediaPackageElement;
38 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
39 import org.opencastproject.mediapackage.MediaPackageElementParser;
40 import org.opencastproject.mediapackage.MediaPackageException;
41 import org.opencastproject.mediapackage.MediaPackageParser;
42 import org.opencastproject.mediapackage.Track;
43 import org.opencastproject.security.api.Organization;
44 import org.opencastproject.security.api.OrganizationDirectoryService;
45 import org.opencastproject.security.api.SecurityService;
46 import org.opencastproject.security.api.TrustedHttpClient;
47 import org.opencastproject.security.api.User;
48 import org.opencastproject.security.api.UserDirectoryService;
49 import org.opencastproject.security.util.SecurityUtil;
50 import org.opencastproject.serviceregistry.api.ServiceRegistry;
51 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
52 import org.opencastproject.util.FileSupport;
53 import org.opencastproject.util.LoadUtil;
54 import org.opencastproject.util.NotFoundException;
55 import org.opencastproject.util.OsgiUtil;
56 import org.opencastproject.util.UrlSupport;
57 import org.opencastproject.util.data.Effect;
58 import org.opencastproject.util.data.functions.Misc;
59 import org.opencastproject.workspace.api.Workspace;
60
61 import com.google.gson.Gson;
62 import com.google.gson.reflect.TypeToken;
63
64 import org.apache.commons.io.FileUtils;
65 import org.apache.commons.io.FilenameUtils;
66 import org.apache.commons.io.IOUtils;
67 import org.apache.commons.lang3.exception.ExceptionUtils;
68 import org.osgi.service.cm.ConfigurationException;
69 import org.osgi.service.cm.ManagedService;
70 import org.osgi.service.component.ComponentContext;
71 import org.osgi.service.component.annotations.Activate;
72 import org.osgi.service.component.annotations.Component;
73 import org.osgi.service.component.annotations.Reference;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 import java.io.File;
78 import java.io.IOException;
79 import java.io.InputStream;
80 import java.net.URI;
81 import java.net.URISyntaxException;
82 import java.nio.file.DirectoryStream;
83 import java.nio.file.FileVisitResult;
84 import java.nio.file.Files;
85 import java.nio.file.Path;
86 import java.nio.file.Paths;
87 import java.nio.file.SimpleFileVisitor;
88 import java.nio.file.attribute.BasicFileAttributes;
89 import java.util.ArrayList;
90 import java.util.Arrays;
91 import java.util.Dictionary;
92 import java.util.HashMap;
93 import java.util.HashSet;
94 import java.util.List;
95 import java.util.Map.Entry;
96 import java.util.Set;
97 import java.util.stream.Collectors;
98
99 import javax.servlet.http.HttpServletResponse;
100
101
102
103
104 @Component(
105 immediate = true,
106 service = { DistributionService.class,DownloadDistributionService.class,ManagedService.class },
107 property = {
108 "service.description=Distribution Service (Local)",
109 "service.pid=org.opencastproject.distribution.download.DownloadDistributionServiceImpl",
110 "distribution.channel=download"
111 }
112 )
113 public class DownloadDistributionServiceImpl extends AbstractDistributionService
114 implements DistributionService, DownloadDistributionService, ManagedService {
115
116
117 private static final Logger logger = LoggerFactory.getLogger(DownloadDistributionServiceImpl.class);
118
119
120 private enum Operation {
121 Distribute, Retract
122 }
123
124
125 public static final String JOB_TYPE = "org.opencastproject.distribution.download";
126
127
128 private static final long TIMEOUT = 60000L;
129
130
131 public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;
132
133
134 public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;
135
136
137 public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.download.distribute";
138
139
140 public static final String RETRACT_JOB_LOAD_KEY = "job.load.download.retract";
141
142
143 private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;
144
145
146 private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;
147
148
149 private static final long INTERVAL = 300L;
150
151 private Gson gson = new Gson();
152
153 private String systemUserName = null;
154
155
156
157
/**
 * Creates the service, handing {@link #JOB_TYPE} to the superclass constructor.
 */
public DownloadDistributionServiceImpl() {
  super(JOB_TYPE);
}
161
162
163
164
165
166
167
168 @Override
169 @Activate
170 public void activate(ComponentContext cc) {
171 super.activate(cc);
172 serviceUrl = cc.getBundleContext().getProperty("org.opencastproject.download.url");
173 if (serviceUrl == null) {
174 throw new IllegalStateException("Download url must be set (org.opencastproject.download.url)");
175 }
176 logger.info("Download url is {}", serviceUrl);
177
178 String ccDistributionDirectory = cc.getBundleContext().getProperty("org.opencastproject.download.directory");
179 if (ccDistributionDirectory == null) {
180 throw new IllegalStateException("Distribution directory must be set (org.opencastproject.download.directory)");
181 }
182 this.distributionDirectory = new File(ccDistributionDirectory);
183 logger.info("Download distribution directory is {}", distributionDirectory);
184 this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
185 systemUserName = cc.getBundleContext().getProperty(DIGEST_USER_PROPERTY);
186 }
187
188 @Override
189 public String getDistributionType() {
190 return this.distributionChannel;
191 }
192
193 @Override
194 public Job distribute(String channelId, MediaPackage mediapackage, String elementId)
195 throws DistributionException, MediaPackageException {
196 return distribute(channelId, mediapackage, elementId, true);
197 }
198
199 @Override
200 public Job distribute(String channelId, MediaPackage mediapackage, String elementId, boolean checkAvailability)
201 throws DistributionException, MediaPackageException {
202 Set<String> elementIds = new HashSet<String>();
203 elementIds.add(elementId);
204 return distribute(channelId, mediapackage, elementIds, checkAvailability, false);
205 }
206
207 @Override
208 public Job distribute(String channelId, MediaPackage mediapackage, Set<String> elementIds, boolean checkAvailability)
209 throws DistributionException, MediaPackageException {
210 return distribute(channelId, mediapackage, elementIds, checkAvailability, false);
211 }
212
213 @Override
214 public Job distribute(
215 String channelId,
216 MediaPackage mediapackage,
217 Set<String> elementIds,
218 boolean checkAvailability,
219 boolean preserveReference
220 ) throws DistributionException, MediaPackageException {
221 notNull(mediapackage, "mediapackage");
222 notNull(elementIds, "elementIds");
223 notNull(channelId, "channelId");
224 try {
225 return serviceRegistry.createJob(
226 JOB_TYPE,
227 Operation.Distribute.toString(),
228 Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds),
229 Boolean.toString(checkAvailability), Boolean.toString(preserveReference)), distributeJobLoad);
230 } catch (ServiceRegistryException e) {
231 throw new DistributionException("Unable to create a job", e);
232 }
233 }
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
252 boolean checkAvailability) throws DistributionException {
253 return distributeElements(channelId, mediapackage, elementIds, checkAvailability, false);
254 }
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274 public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
275 boolean checkAvailability, boolean preserveReference) throws DistributionException {
276 notNull(mediapackage, "mediapackage");
277 notNull(elementIds, "elementIds");
278 notNull(channelId, "channelId");
279
280 final Set<MediaPackageElement> elements = getElements(channelId, mediapackage, elementIds);
281 List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
282
283 if (AdaptivePlaylist.hasHLSPlaylist(elements)) {
284 return distributeHLSElements(channelId, mediapackage, elements, checkAvailability, preserveReference);
285 } else {
286 for (MediaPackageElement element : elements) {
287 MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability,
288 preserveReference);
289 distributedElements.add(distributedElement);
290 }
291 }
292 return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314 public MediaPackageElement distributeElement(String channelId, MediaPackage mediapackage, MediaPackageElement element,
315 boolean checkAvailability, boolean preserveReference) throws DistributionException {
316
317 final String mediapackageId = mediapackage.getIdentifier().toString();
318 final String elementId = element.getIdentifier();
319
320 try {
321 File source;
322 try {
323 source = workspace.get(element.getURI());
324 } catch (NotFoundException e) {
325 throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
326 } catch (IOException e) {
327 throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
328 }
329
330
331 try {
332 source = findDuplicatedElementSource(source, mediapackageId);
333 } catch (IOException e) {
334 logger.warn("Unable to find duplicated source {}: {}", source, ExceptionUtils.getMessage(e));
335 }
336
337 File destination = getDistributionFile(channelId, mediapackage, element);
338 if (!destination.equals(source)) {
339
340 try {
341 FileUtils.forceMkdir(destination.getParentFile());
342 } catch (IOException e) {
343 throw new DistributionException("Unable to create " + destination.getParentFile(), e);
344 }
345 logger.debug("Distributing element {} of media package {} to publication channel {} ({})", elementId,
346 mediapackageId, channelId, destination);
347
348 try {
349 FileSupport.link(source, destination, true);
350 } catch (IOException e) {
351 throw new DistributionException(format("Unable to copy %s to %s", source, destination), e);
352 }
353 }
354
355 MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
356 try {
357 distributedElement.setURI(getDistributionUri(channelId, mediapackageId, element));
358 if (preserveReference) {
359 distributedElement.setReference(element.getReference());
360 }
361 } catch (URISyntaxException e) {
362 throw new DistributionException("Distributed element produces an invalid URI", e);
363 }
364
365 logger.debug("Finished distributing element {} of media package {} to publication channel {}", elementId,
366 mediapackageId, channelId);
367 final URI uri = distributedElement.getURI();
368 if (checkAvailability) {
369 logger.debug("Checking availability of distributed artifact {} at {}", distributedElement, uri);
370 checkAvailability(uri);
371 }
372 return distributedElement;
373 } catch (Exception e) {
374 logger.warn("Error distributing " + element, e);
375 if (e instanceof DistributionException) {
376 throw (DistributionException) e;
377 } else {
378 throw new DistributionException(e);
379 }
380 }
381 }
382
383 private MediaPackageElement[] distributeHLSElements(String channelId, MediaPackage mediapackage,
384 Set<MediaPackageElement> elements, boolean checkAvailability, boolean preserveReference)
385 throws DistributionException {
386
387 List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
388 File distributionDir = getMediaPackageDirectory(channelId, mediapackage);
389 List<MediaPackageElement> nontrackElements = elements.stream()
390 .filter(e -> e.getElementType() != MediaPackageElement.Type.Track).collect(Collectors.toList());
391
392 for (MediaPackageElement element : nontrackElements) {
393 MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability,
394 preserveReference);
395 distributedElements.add(distributedElement);
396 }
397
398 List<Track> trackElements = elements.stream()
399 .filter(e -> e.getElementType() == MediaPackageElement.Type.Track).map(e -> (Track) e)
400 .collect(Collectors.toList());
401 HashMap<MediaPackageElementFlavor, List<Track>> trackElementsMap
402 = new HashMap<MediaPackageElementFlavor, List<Track>>();
403
404 for (Track element : trackElements) {
405
406 Track t = setUpHLSElementforDistribution(channelId, mediapackage, element, preserveReference);
407 List<Track> l = trackElementsMap.get(t.getFlavor());
408 if (l == null) {
409 l = new ArrayList<Track>();
410 }
411 l.add(t);
412 trackElementsMap.put(t.getFlavor(), l);
413 }
414
415
416 for (Entry<MediaPackageElementFlavor, List<Track>> elementSet : trackElementsMap.entrySet()) {
417 try {
418 List<Track> tracks = elementSet.getValue();
419
420 if (tracks.stream().anyMatch(AdaptivePlaylist.isHLSTrackPred)) {
421 tracks = AdaptivePlaylist.fixReferences(tracks, distributionDir);
422 }
423 for (Track track : tracks) {
424 MediaPackageElement distributedElement = checkDistributeHLSElement(track, checkAvailability);
425 distributedElements.add(distributedElement);
426 }
427 } catch (MediaPackageException | NotFoundException | IOException e1) {
428 logger.error("HLS Prepare failed for mediapackage {} in {}", elementSet.getKey(), mediapackage, e1);
429 throw new DistributionException("Cannot distribute " + mediapackage);
430 } catch (URISyntaxException e1) {
431 logger.error("HLS Prepare failed - Bad URI syntax {} in {}", elementSet.getKey(), mediapackage, e1);
432 throw new DistributionException("Cannot distribute - BAD URI syntax " + mediapackage);
433 }
434 }
435 return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
436 }
437
438 public Track setUpHLSElementforDistribution(String channelId, MediaPackage mediapackage, Track element,
439 boolean preserveReference)
440 throws DistributionException {
441
442 final String mediapackageId = mediapackage.getIdentifier().toString();
443 final String elementId = element.getIdentifier();
444
445 File source;
446 try {
447 source = workspace.get(element.getURI());
448 } catch (NotFoundException e) {
449 throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
450 } catch (IOException e) {
451 throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
452 }
453
454
455 try {
456 source = findDuplicatedElementSource(source, mediapackageId);
457 } catch (IOException e) {
458 logger.warn("Unable to find duplicated source {}: {}", source, ExceptionUtils.getMessage(e));
459 }
460
461 File destination = getDistributionFile(channelId, mediapackage, element);
462 if (!destination.equals(source)) {
463
464 try {
465 FileUtils.forceMkdir(destination.getParentFile());
466 } catch (IOException e) {
467 throw new DistributionException("Unable to create " + destination.getParentFile(), e);
468 }
469 logger.debug("Distributing element {} of media package {} to publication channel {} ({})", elementId,
470 mediapackageId, channelId, destination);
471
472 try {
473 if (AdaptivePlaylist.isPlaylist(source)) {
474 FileSupport.copy(source, destination, true);
475 } else {
476 FileSupport.link(source, destination, true);
477 }
478 } catch (IOException e) {
479 throw new DistributionException(format("Unable to copy %s to %s", source, destination), e);
480 }
481 }
482
483 MediaPackageElement distributeElement = (MediaPackageElement) element.clone();
484
485 try {
486 distributeElement.setURI(getDistributionUri(channelId, mediapackageId, element));
487 if (preserveReference) {
488 distributeElement.setReference(element.getReference());
489 }
490 } catch (URISyntaxException e) {
491 throw new DistributionException("Distributed element produces an invalid URI", e);
492 }
493
494 logger.debug("Setting up element {} of media package {} for publication channel {}", elementId,
495 mediapackageId, channelId);
496 return (Track) distributeElement;
497 }
498
499 public Track checkDistributeHLSElement(Track element, boolean checkAvailability)
500 throws DistributionException {
501
502 final URI uri = element.getURI();
503 try {
504 if (checkAvailability) {
505 logger.debug("Checking availability of distributed artifact {} at {}", element, uri);
506 checkAvailability(uri);
507 }
508 return element;
509 } catch (Exception e) {
510 logger.warn("Error distributing " + element, e);
511 if (e instanceof DistributionException) {
512 throw (DistributionException) e;
513 } else {
514 throw new DistributionException(e);
515 }
516 }
517 }
518
519 @Override
520 public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
521 Set<String> elementIds = new HashSet();
522 elementIds.add(elementId);
523 return retract(channelId, mediapackage, elementIds);
524 }
525
526 @Override
527 public Job retract(String channelId, MediaPackage mediapackage, Set<String> elementIds)
528 throws DistributionException {
529 notNull(mediapackage, "mediapackage");
530 notNull(elementIds, "elementIds");
531 notNull(channelId, "channelId");
532 try {
533 return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
534 Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
535 retractJobLoad);
536 } catch (ServiceRegistryException e) {
537 throw new DistributionException("Unable to create a job", e);
538 }
539 }
540
541 @Override
542 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, String elementId)
543 throws DistributionException, MediaPackageException {
544 Set<String> elementIds = new HashSet<String>();
545 elementIds.add(elementId);
546 return distributeSync(channelId, mediapackage, elementIds, true, false);
547 }
548
549 public List<MediaPackageElement> distributeSync(
550 String channelId,
551 MediaPackage mediapackage,
552 Set<String> elementIds,
553 boolean checkAvailability,
554 boolean preserveReference
555 ) throws DistributionException, MediaPackageException {
556 notNull(mediapackage, "mediapackage");
557 notNull(elementIds, "elementIds");
558 notNull(channelId, "channelId");
559
560 MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
561 checkAvailability, preserveReference);
562 return Arrays.asList(distributedElements);
563 }
564
565 @Override
566 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
567 boolean checkAvailability) throws DistributionException {
568 Job job = null;
569 try {
570 job = serviceRegistry
571 .createJob(
572 JOB_TYPE, Operation.Distribute.toString(), null, null, false, distributeJobLoad);
573 job.setStatus(Job.Status.RUNNING);
574 job = serviceRegistry.updateJob(job);
575 final MediaPackageElement[] mediaPackageElements
576 = this.distributeElements(channelId, mediapackage, elementIds, checkAvailability);
577 job.setStatus(Job.Status.FINISHED);
578 return Arrays.asList(mediaPackageElements);
579 } catch (ServiceRegistryException e) {
580 throw new DistributionException(e);
581 } catch (NotFoundException e) {
582 throw new DistributionException("Unable to update distribution job", e);
583 } finally {
584 finallyUpdateJob(job);
585 }
586 }
587
588 @Override
589 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, String elementId)
590 throws DistributionException, MediaPackageException {
591 Set<String> elementIds = new HashSet();
592 elementIds.add(elementId);
593 return retractSync(channelId, mediapackage, elementIds);
594 }
595
596 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, Set<String> elementIds)
597 throws DistributionException {
598 notNull(mediapackage, "mediapackage");
599 notNull(elementIds, "elementIds");
600 notNull(channelId, "channelId");
601 MediaPackageElement[] retractedElements = retractElements(channelId, mediapackage, elementIds);
602 return Arrays.asList(retractedElements);
603 }
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620 protected MediaPackageElement[] retractElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
621 throws DistributionException {
622 notNull(mediapackage, "mediapackage");
623 notNull(elementIds, "elementIds");
624 notNull(channelId, "channelId");
625
626 Set<MediaPackageElement> elements = getElements(channelId, mediapackage, elementIds);
627 List<MediaPackageElement> retractedElements = new ArrayList<MediaPackageElement>();
628
629 for (MediaPackageElement element : elements) {
630 MediaPackageElement retractedElement = retractElement(channelId, mediapackage, element);
631 retractedElements.add(retractedElement);
632 }
633 return retractedElements.toArray(new MediaPackageElement[retractedElements.size()]);
634 }
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651 protected MediaPackageElement retractElement(String channelId, MediaPackage mediapackage, MediaPackageElement element)
652 throws DistributionException {
653 notNull(mediapackage, "mediapackage");
654 notNull(element, "element");
655 notNull(channelId, "channelId");
656
657 String mediapackageId = mediapackage.getIdentifier().toString();
658 String elementId = element.getIdentifier();
659
660 try {
661 final File elementFile = getDistributionFile(channelId, mediapackage, element);
662 final File mediapackageDir = getMediaPackageDirectory(channelId, mediapackage);
663
664
665 if (!elementFile.exists()) {
666 logger.info("Element {} from media package {} has already been removed or has never been distributed to "
667 + "publication channel {}", elementId, mediapackageId, channelId);
668 return element;
669 }
670
671 logger.debug("Retracting element {} ({})", element, elementFile);
672
673
674 if (!FileUtils.deleteQuietly(elementFile.getParentFile())) {
675
676 logger.debug("Unable to delete folder {}", elementFile.getParentFile().getAbsolutePath());
677 }
678
679 if (mediapackageDir.isDirectory() && mediapackageDir.list().length == 0) {
680 FileSupport.delete(mediapackageDir);
681 }
682
683 logger.debug("Finished retracting element {} of media package {} from publication channel {}", elementId,
684 mediapackageId, channelId);
685 return element;
686 } catch (Exception e) {
687 logger.warn("Error retracting element {} of media package {} from publication channel {}", elementId,
688 mediapackageId, channelId, e);
689 if (e instanceof DistributionException) {
690 throw (DistributionException) e;
691 } else {
692 throw new DistributionException(e);
693 }
694 }
695 }
696
697
698
699
700
701
702 @Override
703 protected String process(Job job) throws Exception {
704 Operation op = null;
705 String operation = job.getOperation();
706 List<String> arguments = job.getArguments();
707 try {
708 op = Operation.valueOf(operation);
709 String channelId = arguments.get(0);
710 MediaPackage mediapackage = MediaPackageParser.getFromXml(arguments.get(1));
711 Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() { }.getType());
712
713 switch (op) {
714 case Distribute:
715 Boolean checkAvailability = Boolean.parseBoolean(arguments.get(3));
716 Boolean preserveReference = Boolean.parseBoolean(arguments.get(4));
717 MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
718 checkAvailability, preserveReference);
719 return (distributedElements != null)
720 ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(distributedElements)) : null;
721 case Retract:
722 MediaPackageElement[] retractedElements = retractElements(channelId, mediapackage, elementIds);
723 return (retractedElements != null) ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(retractedElements))
724 : null;
725 default:
726 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
727 }
728 } catch (IllegalArgumentException e) {
729 throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
730 } catch (IndexOutOfBoundsException e) {
731 throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
732 } catch (Exception e) {
733 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
734 }
735 }
736
737 private Set<MediaPackageElement> getElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
738 throws IllegalStateException {
739 final Set<MediaPackageElement> elements = new HashSet<MediaPackageElement>();
740 for (String elementId : elementIds) {
741 MediaPackageElement element = mediapackage.getElementById(elementId);
742 if (element != null) {
743 elements.add(element);
744 } else {
745 element = Arrays.stream(mediapackage.getPublications())
746 .filter(p -> p.getChannel().equals(channelId))
747 .flatMap(p -> Arrays.stream(p.getAttachments())
748 .filter(a -> a.getIdentifier().equals(elementId)))
749 .findAny()
750 .orElseThrow(() ->
751 new IllegalStateException(format("No element %s found in mediapackage %s", elementId,
752 mediapackage.getIdentifier())));
753 elements.add(element);
754 }
755 }
756 return elements;
757 }
758
759
760
761
762
763
764
765
766
767
768
769
770 private File findDuplicatedElementSource(final File source, final String mpId) throws IOException {
771 String orgId = securityService.getOrganization().getId();
772 final Path rootPath = Paths.get(distributionDirectory.getAbsolutePath(), orgId);
773
774 if (!Files.exists(rootPath)) {
775 return source;
776 }
777
778 List<Path> mediaPackageDirectories = new ArrayList<>();
779 try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(rootPath)) {
780 for (Path path : directoryStream) {
781 Path mpDir = path.resolve(mpId);
782 if (Files.exists(mpDir)) {
783 mediaPackageDirectories.add(mpDir);
784 }
785 }
786 }
787
788 if (mediaPackageDirectories.isEmpty()) {
789 return source;
790 }
791
792 final long size = Files.size(source.toPath());
793
794 final File[] result = new File[1];
795 for (Path p : mediaPackageDirectories) {
796 Files.walkFileTree(p, new SimpleFileVisitor<Path>() {
797 @Override
798 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
799 if (attrs.isDirectory()) {
800 return FileVisitResult.CONTINUE;
801 }
802
803 if (size != attrs.size()) {
804 return FileVisitResult.CONTINUE;
805 }
806
807 try (InputStream is1 = Files.newInputStream(source.toPath()); InputStream is2 = Files.newInputStream(file)) {
808 if (!IOUtils.contentEquals(is1, is2)) {
809 return FileVisitResult.CONTINUE;
810 }
811 }
812 result[0] = file.toFile();
813 return FileVisitResult.TERMINATE;
814 }
815 });
816 if (result[0] != null) {
817 break;
818 }
819 }
820 if (result[0] != null) {
821 return result[0];
822 }
823
824 return source;
825 }
826
827
828
829
830
831
832 protected File getDistributionFile(String channelId, MediaPackage mp, MediaPackageElement element) {
833 final String uriString = element.getURI().toString().split("\\?")[0];
834 final String directoryName = distributionDirectory.getAbsolutePath();
835 final String orgId = securityService.getOrganization().getId();
836 if (uriString.startsWith(serviceUrl)) {
837 String[] splitUrl = uriString.substring(serviceUrl.length() + 1).split("/");
838 if (splitUrl.length < 5) {
839 logger.warn("Malformed URI {}. Format must be .../{orgId}/{channelId}/{mediapackageId}/{elementId}/{fileName}."
840 + " Trying URI without channelId", uriString);
841 return new File(path(directoryName, orgId, splitUrl[1], splitUrl[2], splitUrl[3]));
842 } else {
843 return new File(path(directoryName, orgId, splitUrl[1], splitUrl[2], splitUrl[3], splitUrl[4]));
844 }
845 }
846 return new File(path(directoryName, orgId, channelId, mp.getIdentifier().toString(), element.getIdentifier(),
847 FilenameUtils.getName(uriString)));
848 }
849
850
851
852
853
854
855 protected File getMediaPackageDirectory(String channelId, MediaPackage mp) {
856 final String orgId = securityService.getOrganization().getId();
857 return new File(distributionDirectory, path(orgId, channelId, mp.getIdentifier().toString()));
858 }
859
860
861
862
863
864
865
866
867
868
869
870
871 protected URI getDistributionUri(String channelId, String mediaPackageId, MediaPackageElement element)
872 throws URISyntaxException {
873 String elementId = element.getIdentifier();
874 String fileName = FilenameUtils.getName(element.getURI().toString());
875 String orgId = securityService.getOrganization().getId();
876 String destinationURI = UrlSupport.concat(serviceUrl, orgId, channelId, mediaPackageId, elementId, fileName);
877 return new URI(destinationURI);
878 }
879
880 @Override
881 public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
882 distributeJobLoad = LoadUtil.getConfiguredLoadValue(properties, DISTRIBUTE_JOB_LOAD_KEY,
883 DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
884 retractJobLoad = LoadUtil.getConfiguredLoadValue(properties, RETRACT_JOB_LOAD_KEY, DEFAULT_RETRACT_JOB_LOAD,
885 serviceRegistry);
886 }
887
888
889
890
891
892
893 private void checkAvailability(URI uri) {
894 final Organization organization = getSecurityService().getOrganization();
895 final User systemUser = SecurityUtil.createSystemUser(systemUserName, organization);
896 SecurityUtil.runAs(getSecurityService(), organization, systemUser, () -> {
897 waitForResource(trustedHttpClient, uri, HttpServletResponse.SC_OK, TIMEOUT, INTERVAL)
898 .fold(Misc.chuck(), new Effect.X<Integer>() {
899 @Override
900 public void xrun(Integer status) throws Exception {
901 if (ne(status, HttpServletResponse.SC_OK)) {
902 logger.warn("Attempt to access distributed file {} returned code {}", uri, status);
903 throw new DistributionException("Unable to load distributed file " + uri.toString());
904 }
905 }
906 });
907 });
908 }
909
910 @Reference
911 @Override
912 public void setWorkspace(Workspace workspace) {
913 super.setWorkspace(workspace);
914 }
915
916 @Reference
917 @Override
918 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
919 super.setServiceRegistry(serviceRegistry);
920 }
921
922 @Reference
923 @Override
924 public void setSecurityService(SecurityService securityService) {
925 super.setSecurityService(securityService);
926 }
927
928 @Reference
929 @Override
930 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
931 super.setUserDirectoryService(userDirectoryService);
932 }
933
934 @Reference
935 @Override
936 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
937 super.setOrganizationDirectoryService(organizationDirectoryService);
938 }
939
940 @Reference
941 @Override
942 public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
943 super.setTrustedHttpClient(trustedHttpClient);
944 }
945
946 }