1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.distribution.download;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.systems.OpencastConstants.DIGEST_USER_PROPERTY;
25 import static org.opencastproject.util.EqualsUtil.ne;
26 import static org.opencastproject.util.HttpUtil.waitForResource;
27 import static org.opencastproject.util.PathSupport.path;
28 import static org.opencastproject.util.RequireUtil.notNull;
29
30 import org.opencastproject.distribution.api.AbstractDistributionService;
31 import org.opencastproject.distribution.api.DistributionException;
32 import org.opencastproject.distribution.api.DistributionService;
33 import org.opencastproject.distribution.api.DownloadDistributionService;
34 import org.opencastproject.job.api.Job;
35 import org.opencastproject.mediapackage.AdaptivePlaylist;
36 import org.opencastproject.mediapackage.MediaPackage;
37 import org.opencastproject.mediapackage.MediaPackageElement;
38 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
39 import org.opencastproject.mediapackage.MediaPackageElementParser;
40 import org.opencastproject.mediapackage.MediaPackageException;
41 import org.opencastproject.mediapackage.MediaPackageParser;
42 import org.opencastproject.mediapackage.Track;
43 import org.opencastproject.security.api.Organization;
44 import org.opencastproject.security.api.OrganizationDirectoryService;
45 import org.opencastproject.security.api.SecurityService;
46 import org.opencastproject.security.api.TrustedHttpClient;
47 import org.opencastproject.security.api.User;
48 import org.opencastproject.security.api.UserDirectoryService;
49 import org.opencastproject.security.util.SecurityUtil;
50 import org.opencastproject.serviceregistry.api.ServiceRegistry;
51 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
52 import org.opencastproject.util.FileSupport;
53 import org.opencastproject.util.LoadUtil;
54 import org.opencastproject.util.NotFoundException;
55 import org.opencastproject.util.OsgiUtil;
56 import org.opencastproject.util.UrlSupport;
57 import org.opencastproject.util.data.functions.Misc;
58 import org.opencastproject.workspace.api.Workspace;
59
60 import com.google.gson.Gson;
61 import com.google.gson.reflect.TypeToken;
62
63 import org.apache.commons.io.FileUtils;
64 import org.apache.commons.io.FilenameUtils;
65 import org.apache.commons.io.IOUtils;
66 import org.apache.commons.lang3.exception.ExceptionUtils;
67 import org.osgi.service.cm.ConfigurationException;
68 import org.osgi.service.cm.ManagedService;
69 import org.osgi.service.component.ComponentContext;
70 import org.osgi.service.component.annotations.Activate;
71 import org.osgi.service.component.annotations.Component;
72 import org.osgi.service.component.annotations.Reference;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75
76 import java.io.File;
77 import java.io.IOException;
78 import java.io.InputStream;
79 import java.net.URI;
80 import java.net.URISyntaxException;
81 import java.nio.file.DirectoryStream;
82 import java.nio.file.FileVisitResult;
83 import java.nio.file.Files;
84 import java.nio.file.Path;
85 import java.nio.file.Paths;
86 import java.nio.file.SimpleFileVisitor;
87 import java.nio.file.attribute.BasicFileAttributes;
88 import java.util.ArrayList;
89 import java.util.Arrays;
90 import java.util.Dictionary;
91 import java.util.HashMap;
92 import java.util.HashSet;
93 import java.util.List;
94 import java.util.Map.Entry;
95 import java.util.Set;
96 import java.util.stream.Collectors;
97
98 import javax.servlet.http.HttpServletResponse;
99
100
101
102
/**
 * Distribution service implementation that is registered for the <code>download</code> distribution channel. Elements
 * are placed below a configured distribution directory and are addressed through a configured download url.
 */
@Component(
  immediate = true,
  service = { DistributionService.class,DownloadDistributionService.class,ManagedService.class },
  property = {
    "service.description=Distribution Service (Local)",
    "service.pid=org.opencastproject.distribution.download.DownloadDistributionServiceImpl",
    "distribution.channel=download"
  }
)
public class DownloadDistributionServiceImpl extends AbstractDistributionService
  implements DistributionService, DownloadDistributionService, ManagedService {

/** Logging facility */
private static final Logger logger = LoggerFactory.getLogger(DownloadDistributionServiceImpl.class);

/** The operations this service creates jobs for and dispatches on in {@link #process(Job)} */
private enum Operation {
  Distribute, Retract
}

/** Job type used when creating jobs in the service registry; also handed to the super constructor */
public static final String JOB_TYPE = "org.opencastproject.distribution.download";

/** Timeout handed to the availability check (presumably milliseconds, confirm against HttpUtil.waitForResource) */
private static final long TIMEOUT = 60000L;

/** Load of a distribute job unless a different value has been configured */
public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;

/** Load of a retract job unless a different value has been configured */
public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;

/** Configuration key of the distribute job load */
public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.download.distribute";

/** Configuration key of the retract job load */
public static final String RETRACT_JOB_LOAD_KEY = "job.load.download.retract";

/** The load assigned to newly created distribute jobs; replaced in {@link #updated(Dictionary)} */
private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;

/** The load assigned to newly created retract jobs; replaced in {@link #updated(Dictionary)} */
private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;

/** Interval handed to the availability check (presumably milliseconds, confirm against HttpUtil.waitForResource) */
private static final long INTERVAL = 300L;

/** Serializes the sets of element identifiers into job arguments and back */
private Gson gson = new Gson();

/** Name of the system user the availability check runs as; read from the bundle context on activation */
private String systemUserName = null;

/**
 * Creates a new instance of the download distribution service, passing {@link #JOB_TYPE} to the super class.
 */
public DownloadDistributionServiceImpl() {
  super(JOB_TYPE);
}
160
161
162
163
164
165
166
167 @Override
168 @Activate
169 public void activate(ComponentContext cc) {
170 super.activate(cc);
171 serviceUrl = cc.getBundleContext().getProperty("org.opencastproject.download.url");
172 if (serviceUrl == null) {
173 throw new IllegalStateException("Download url must be set (org.opencastproject.download.url)");
174 }
175 logger.info("Download url is {}", serviceUrl);
176
177 String ccDistributionDirectory = cc.getBundleContext().getProperty("org.opencastproject.download.directory");
178 if (ccDistributionDirectory == null) {
179 throw new IllegalStateException("Distribution directory must be set (org.opencastproject.download.directory)");
180 }
181 this.distributionDirectory = new File(ccDistributionDirectory);
182 logger.info("Download distribution directory is {}", distributionDirectory);
183 this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
184 systemUserName = cc.getBundleContext().getProperty(DIGEST_USER_PROPERTY);
185 }
186
/**
 * Returns the distribution channel that was read from the component context during activation.
 *
 * @return the distribution channel of this service
 */
@Override
public String getDistributionType() {
  return this.distributionChannel;
}
191
192 @Override
193 public Job distribute(String channelId, MediaPackage mediapackage, String elementId)
194 throws DistributionException, MediaPackageException {
195 return distribute(channelId, mediapackage, elementId, true);
196 }
197
198 @Override
199 public Job distribute(String channelId, MediaPackage mediapackage, String elementId, boolean checkAvailability)
200 throws DistributionException, MediaPackageException {
201 Set<String> elementIds = new HashSet<String>();
202 elementIds.add(elementId);
203 return distribute(channelId, mediapackage, elementIds, checkAvailability, false);
204 }
205
206 @Override
207 public Job distribute(String channelId, MediaPackage mediapackage, Set<String> elementIds, boolean checkAvailability)
208 throws DistributionException, MediaPackageException {
209 return distribute(channelId, mediapackage, elementIds, checkAvailability, false);
210 }
211
212 @Override
213 public Job distribute(
214 String channelId,
215 MediaPackage mediapackage,
216 Set<String> elementIds,
217 boolean checkAvailability,
218 boolean preserveReference
219 ) throws DistributionException, MediaPackageException {
220 notNull(mediapackage, "mediapackage");
221 notNull(elementIds, "elementIds");
222 notNull(channelId, "channelId");
223 try {
224 return serviceRegistry.createJob(
225 JOB_TYPE,
226 Operation.Distribute.toString(),
227 Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds),
228 Boolean.toString(checkAvailability), Boolean.toString(preserveReference)), distributeJobLoad);
229 } catch (ServiceRegistryException e) {
230 throw new DistributionException("Unable to create a job", e);
231 }
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250 public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
251 boolean checkAvailability) throws DistributionException {
252 return distributeElements(channelId, mediapackage, elementIds, checkAvailability, false);
253 }
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273 public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
274 boolean checkAvailability, boolean preserveReference) throws DistributionException {
275 notNull(mediapackage, "mediapackage");
276 notNull(elementIds, "elementIds");
277 notNull(channelId, "channelId");
278
279 final Set<MediaPackageElement> elements = getElements(channelId, mediapackage, elementIds);
280 List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
281
282 if (AdaptivePlaylist.hasHLSPlaylist(elements)) {
283 return distributeHLSElements(channelId, mediapackage, elements, checkAvailability, preserveReference);
284 } else {
285 for (MediaPackageElement element : elements) {
286 MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability,
287 preserveReference);
288 distributedElements.add(distributedElement);
289 }
290 }
291 return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
292 }
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313 public MediaPackageElement distributeElement(String channelId, MediaPackage mediapackage, MediaPackageElement element,
314 boolean checkAvailability, boolean preserveReference) throws DistributionException {
315
316 final String mediapackageId = mediapackage.getIdentifier().toString();
317 final String elementId = element.getIdentifier();
318
319 try {
320 File source;
321 try {
322 source = workspace.get(element.getURI());
323 } catch (NotFoundException e) {
324 throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
325 } catch (IOException e) {
326 throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
327 }
328
329
330 try {
331 source = findDuplicatedElementSource(source, mediapackageId);
332 } catch (IOException e) {
333 logger.warn("Unable to find duplicated source {}: {}", source, ExceptionUtils.getMessage(e));
334 }
335
336 File destination = getDistributionFile(channelId, mediapackage, element);
337 if (!destination.equals(source)) {
338
339 try {
340 FileUtils.forceMkdir(destination.getParentFile());
341 } catch (IOException e) {
342 throw new DistributionException("Unable to create " + destination.getParentFile(), e);
343 }
344 logger.debug("Distributing element {} of media package {} to publication channel {} ({})", elementId,
345 mediapackageId, channelId, destination);
346
347 try {
348 FileSupport.link(source, destination, true);
349 } catch (IOException e) {
350 throw new DistributionException(format("Unable to copy %s to %s", source, destination), e);
351 }
352 }
353
354 MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
355 try {
356 distributedElement.setURI(getDistributionUri(channelId, mediapackageId, element));
357 if (preserveReference) {
358 distributedElement.setReference(element.getReference());
359 }
360 } catch (URISyntaxException e) {
361 throw new DistributionException("Distributed element produces an invalid URI", e);
362 }
363
364 logger.debug("Finished distributing element {} of media package {} to publication channel {}", elementId,
365 mediapackageId, channelId);
366 final URI uri = distributedElement.getURI();
367 if (checkAvailability) {
368 logger.debug("Checking availability of distributed artifact {} at {}", distributedElement, uri);
369 checkAvailability(uri);
370 }
371 return distributedElement;
372 } catch (Exception e) {
373 logger.warn("Error distributing " + element, e);
374 if (e instanceof DistributionException) {
375 throw (DistributionException) e;
376 } else {
377 throw new DistributionException(e);
378 }
379 }
380 }
381
382 private MediaPackageElement[] distributeHLSElements(String channelId, MediaPackage mediapackage,
383 Set<MediaPackageElement> elements, boolean checkAvailability, boolean preserveReference)
384 throws DistributionException {
385
386 List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
387 File distributionDir = getMediaPackageDirectory(channelId, mediapackage);
388 List<MediaPackageElement> nontrackElements = elements.stream()
389 .filter(e -> e.getElementType() != MediaPackageElement.Type.Track).collect(Collectors.toList());
390
391 for (MediaPackageElement element : nontrackElements) {
392 MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability,
393 preserveReference);
394 distributedElements.add(distributedElement);
395 }
396
397 List<Track> trackElements = elements.stream()
398 .filter(e -> e.getElementType() == MediaPackageElement.Type.Track).map(e -> (Track) e)
399 .collect(Collectors.toList());
400 HashMap<MediaPackageElementFlavor, List<Track>> trackElementsMap
401 = new HashMap<MediaPackageElementFlavor, List<Track>>();
402
403 for (Track element : trackElements) {
404
405 Track t = setUpHLSElementforDistribution(channelId, mediapackage, element, preserveReference);
406 List<Track> l = trackElementsMap.get(t.getFlavor());
407 if (l == null) {
408 l = new ArrayList<Track>();
409 }
410 l.add(t);
411 trackElementsMap.put(t.getFlavor(), l);
412 }
413
414
415 for (Entry<MediaPackageElementFlavor, List<Track>> elementSet : trackElementsMap.entrySet()) {
416 try {
417 List<Track> tracks = elementSet.getValue();
418
419 if (tracks.stream().anyMatch(AdaptivePlaylist.isHLSTrackPred)) {
420 tracks = AdaptivePlaylist.fixReferences(tracks, distributionDir);
421 }
422 for (Track track : tracks) {
423 MediaPackageElement distributedElement = checkDistributeHLSElement(track, checkAvailability);
424 distributedElements.add(distributedElement);
425 }
426 } catch (MediaPackageException | NotFoundException | IOException e1) {
427 logger.error("HLS Prepare failed for mediapackage {} in {}", elementSet.getKey(), mediapackage, e1);
428 throw new DistributionException("Cannot distribute " + mediapackage);
429 } catch (URISyntaxException e1) {
430 logger.error("HLS Prepare failed - Bad URI syntax {} in {}", elementSet.getKey(), mediapackage, e1);
431 throw new DistributionException("Cannot distribute - BAD URI syntax " + mediapackage);
432 }
433 }
434 return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
435 }
436
437 public Track setUpHLSElementforDistribution(String channelId, MediaPackage mediapackage, Track element,
438 boolean preserveReference)
439 throws DistributionException {
440
441 final String mediapackageId = mediapackage.getIdentifier().toString();
442 final String elementId = element.getIdentifier();
443
444 File source;
445 try {
446 source = workspace.get(element.getURI());
447 } catch (NotFoundException e) {
448 throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
449 } catch (IOException e) {
450 throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
451 }
452
453
454 try {
455 source = findDuplicatedElementSource(source, mediapackageId);
456 } catch (IOException e) {
457 logger.warn("Unable to find duplicated source {}: {}", source, ExceptionUtils.getMessage(e));
458 }
459
460 File destination = getDistributionFile(channelId, mediapackage, element);
461 if (!destination.equals(source)) {
462
463 try {
464 FileUtils.forceMkdir(destination.getParentFile());
465 } catch (IOException e) {
466 throw new DistributionException("Unable to create " + destination.getParentFile(), e);
467 }
468 logger.debug("Distributing element {} of media package {} to publication channel {} ({})", elementId,
469 mediapackageId, channelId, destination);
470
471 try {
472 if (AdaptivePlaylist.isPlaylist(source)) {
473 FileSupport.copy(source, destination, true);
474 } else {
475 FileSupport.link(source, destination, true);
476 }
477 } catch (IOException e) {
478 throw new DistributionException(format("Unable to copy %s to %s", source, destination), e);
479 }
480 }
481
482 MediaPackageElement distributeElement = (MediaPackageElement) element.clone();
483
484 try {
485 distributeElement.setURI(getDistributionUri(channelId, mediapackageId, element));
486 if (preserveReference) {
487 distributeElement.setReference(element.getReference());
488 }
489 } catch (URISyntaxException e) {
490 throw new DistributionException("Distributed element produces an invalid URI", e);
491 }
492
493 logger.debug("Setting up element {} of media package {} for publication channel {}", elementId,
494 mediapackageId, channelId);
495 return (Track) distributeElement;
496 }
497
498 public Track checkDistributeHLSElement(Track element, boolean checkAvailability)
499 throws DistributionException {
500
501 final URI uri = element.getURI();
502 try {
503 if (checkAvailability) {
504 logger.debug("Checking availability of distributed artifact {} at {}", element, uri);
505 checkAvailability(uri);
506 }
507 return element;
508 } catch (Exception e) {
509 logger.warn("Error distributing " + element, e);
510 if (e instanceof DistributionException) {
511 throw (DistributionException) e;
512 } else {
513 throw new DistributionException(e);
514 }
515 }
516 }
517
518 @Override
519 public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
520 Set<String> elementIds = new HashSet();
521 elementIds.add(elementId);
522 return retract(channelId, mediapackage, elementIds);
523 }
524
525 @Override
526 public Job retract(String channelId, MediaPackage mediapackage, Set<String> elementIds)
527 throws DistributionException {
528 notNull(mediapackage, "mediapackage");
529 notNull(elementIds, "elementIds");
530 notNull(channelId, "channelId");
531 try {
532 return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
533 Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
534 retractJobLoad);
535 } catch (ServiceRegistryException e) {
536 throw new DistributionException("Unable to create a job", e);
537 }
538 }
539
540 @Override
541 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, String elementId)
542 throws DistributionException, MediaPackageException {
543 Set<String> elementIds = new HashSet<String>();
544 elementIds.add(elementId);
545 return distributeSync(channelId, mediapackage, elementIds, true, false);
546 }
547
548 public List<MediaPackageElement> distributeSync(
549 String channelId,
550 MediaPackage mediapackage,
551 Set<String> elementIds,
552 boolean checkAvailability,
553 boolean preserveReference
554 ) throws DistributionException, MediaPackageException {
555 notNull(mediapackage, "mediapackage");
556 notNull(elementIds, "elementIds");
557 notNull(channelId, "channelId");
558
559 MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
560 checkAvailability, preserveReference);
561 return Arrays.asList(distributedElements);
562 }
563
/**
 * Distributes the given elements in the calling thread while recording the work as a job in the service registry.
 * The element references are not preserved.
 *
 * @param channelId
 *          the publication channel
 * @param mediapackage
 *          the media package containing the elements
 * @param elementIds
 *          the identifiers of the elements to distribute
 * @param checkAvailability
 *          whether the distributed elements should be checked for availability
 * @return the distributed elements
 * @throws DistributionException
 *           if the job cannot be created or updated, or if the distribution fails
 */
@Override
public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
    boolean checkAvailability) throws DistributionException {
  Job job = null;
  try {
    // The job is created without arguments and payload; the work happens right here
    // NOTE(review): the meaning of the "false" argument is defined by ServiceRegistry.createJob - confirm there
    job = serviceRegistry
        .createJob(
            JOB_TYPE, Operation.Distribute.toString(), null, null, false, distributeJobLoad);
    job.setStatus(Job.Status.RUNNING);
    job = serviceRegistry.updateJob(job);
    final MediaPackageElement[] mediaPackageElements
        = this.distributeElements(channelId, mediapackage, elementIds, checkAvailability);
    // Only reached if the distribution did not throw
    job.setStatus(Job.Status.FINISHED);
    return Arrays.asList(mediaPackageElements);
  } catch (ServiceRegistryException e) {
    throw new DistributionException(e);
  } catch (NotFoundException e) {
    throw new DistributionException("Unable to update distribution job", e);
  } finally {
    // Runs on success and on failure; job is still null if createJob failed.
    // finallyUpdateJob is defined outside of this class - presumably it persists the final status, verify there
    finallyUpdateJob(job);
  }
}
586
587 @Override
588 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, String elementId)
589 throws DistributionException, MediaPackageException {
590 Set<String> elementIds = new HashSet();
591 elementIds.add(elementId);
592 return retractSync(channelId, mediapackage, elementIds);
593 }
594
595 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, Set<String> elementIds)
596 throws DistributionException {
597 notNull(mediapackage, "mediapackage");
598 notNull(elementIds, "elementIds");
599 notNull(channelId, "channelId");
600 MediaPackageElement[] retractedElements = retractElements(channelId, mediapackage, elementIds);
601 return Arrays.asList(retractedElements);
602 }
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619 protected MediaPackageElement[] retractElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
620 throws DistributionException {
621 notNull(mediapackage, "mediapackage");
622 notNull(elementIds, "elementIds");
623 notNull(channelId, "channelId");
624
625 Set<MediaPackageElement> elements = getElements(channelId, mediapackage, elementIds);
626 List<MediaPackageElement> retractedElements = new ArrayList<MediaPackageElement>();
627
628 for (MediaPackageElement element : elements) {
629 MediaPackageElement retractedElement = retractElement(channelId, mediapackage, element);
630 retractedElements.add(retractedElement);
631 }
632 return retractedElements.toArray(new MediaPackageElement[retractedElements.size()]);
633 }
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650 protected MediaPackageElement retractElement(String channelId, MediaPackage mediapackage, MediaPackageElement element)
651 throws DistributionException {
652 notNull(mediapackage, "mediapackage");
653 notNull(element, "element");
654 notNull(channelId, "channelId");
655
656 String mediapackageId = mediapackage.getIdentifier().toString();
657 String elementId = element.getIdentifier();
658
659 try {
660 final File elementFile = getDistributionFile(channelId, mediapackage, element);
661 final File mediapackageDir = getMediaPackageDirectory(channelId, mediapackage);
662
663
664 if (!elementFile.exists()) {
665 logger.info("Element {} from media package {} has already been removed or has never been distributed to "
666 + "publication channel {}", elementId, mediapackageId, channelId);
667 return element;
668 }
669
670 logger.debug("Retracting element {} ({})", element, elementFile);
671
672
673 if (!FileUtils.deleteQuietly(elementFile.getParentFile())) {
674
675 logger.debug("Unable to delete folder {}", elementFile.getParentFile().getAbsolutePath());
676 }
677
678 if (mediapackageDir.isDirectory() && mediapackageDir.list().length == 0) {
679 FileSupport.delete(mediapackageDir);
680 }
681
682 logger.debug("Finished retracting element {} of media package {} from publication channel {}", elementId,
683 mediapackageId, channelId);
684 return element;
685 } catch (Exception e) {
686 logger.warn("Error retracting element {} of media package {} from publication channel {}", elementId,
687 mediapackageId, channelId, e);
688 if (e instanceof DistributionException) {
689 throw (DistributionException) e;
690 } else {
691 throw new DistributionException(e);
692 }
693 }
694 }
695
696
697
698
699
700
701 @Override
702 protected String process(Job job) throws Exception {
703 Operation op = null;
704 String operation = job.getOperation();
705 List<String> arguments = job.getArguments();
706 try {
707 op = Operation.valueOf(operation);
708 String channelId = arguments.get(0);
709 MediaPackage mediapackage = MediaPackageParser.getFromXml(arguments.get(1));
710 Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() { }.getType());
711
712 switch (op) {
713 case Distribute:
714 Boolean checkAvailability = Boolean.parseBoolean(arguments.get(3));
715 Boolean preserveReference = Boolean.parseBoolean(arguments.get(4));
716 MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
717 checkAvailability, preserveReference);
718 return (distributedElements != null)
719 ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(distributedElements)) : null;
720 case Retract:
721 MediaPackageElement[] retractedElements = retractElements(channelId, mediapackage, elementIds);
722 return (retractedElements != null) ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(retractedElements))
723 : null;
724 default:
725 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
726 }
727 } catch (IllegalArgumentException e) {
728 throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
729 } catch (IndexOutOfBoundsException e) {
730 throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
731 } catch (Exception e) {
732 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
733 }
734 }
735
736 private Set<MediaPackageElement> getElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
737 throws IllegalStateException {
738 final Set<MediaPackageElement> elements = new HashSet<MediaPackageElement>();
739 for (String elementId : elementIds) {
740 MediaPackageElement element = mediapackage.getElementById(elementId);
741 if (element != null) {
742 elements.add(element);
743 } else {
744 element = Arrays.stream(mediapackage.getPublications())
745 .filter(p -> p.getChannel().equals(channelId))
746 .flatMap(p -> Arrays.stream(p.getAttachments())
747 .filter(a -> a.getIdentifier().equals(elementId)))
748 .findAny()
749 .orElseThrow(() ->
750 new IllegalStateException(format("No element %s found in mediapackage %s", elementId,
751 mediapackage.getIdentifier())));
752 elements.add(element);
753 }
754 }
755 return elements;
756 }
757
758
759
760
761
762
763
764
765
766
767
768
769 private File findDuplicatedElementSource(final File source, final String mpId) throws IOException {
770 String orgId = securityService.getOrganization().getId();
771 final Path rootPath = Paths.get(distributionDirectory.getAbsolutePath(), orgId);
772
773 if (!Files.exists(rootPath)) {
774 return source;
775 }
776
777 List<Path> mediaPackageDirectories = new ArrayList<>();
778 try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(rootPath)) {
779 for (Path path : directoryStream) {
780 Path mpDir = path.resolve(mpId);
781 if (Files.exists(mpDir)) {
782 mediaPackageDirectories.add(mpDir);
783 }
784 }
785 }
786
787 if (mediaPackageDirectories.isEmpty()) {
788 return source;
789 }
790
791 final long size = Files.size(source.toPath());
792
793 final File[] result = new File[1];
794 for (Path p : mediaPackageDirectories) {
795 Files.walkFileTree(p, new SimpleFileVisitor<Path>() {
796 @Override
797 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
798 if (attrs.isDirectory()) {
799 return FileVisitResult.CONTINUE;
800 }
801
802 if (size != attrs.size()) {
803 return FileVisitResult.CONTINUE;
804 }
805
806 try (InputStream is1 = Files.newInputStream(source.toPath()); InputStream is2 = Files.newInputStream(file)) {
807 if (!IOUtils.contentEquals(is1, is2)) {
808 return FileVisitResult.CONTINUE;
809 }
810 }
811 result[0] = file.toFile();
812 return FileVisitResult.TERMINATE;
813 }
814 });
815 if (result[0] != null) {
816 break;
817 }
818 }
819 if (result[0] != null) {
820 return result[0];
821 }
822
823 return source;
824 }
825
826
827
828
829
830
831 protected File getDistributionFile(String channelId, MediaPackage mp, MediaPackageElement element) {
832 final String uriString = element.getURI().toString().split("\\?")[0];
833 final String directoryName = distributionDirectory.getAbsolutePath();
834 final String orgId = securityService.getOrganization().getId();
835 if (uriString.startsWith(serviceUrl)) {
836 String[] splitUrl = uriString.substring(serviceUrl.length() + 1).split("/");
837 if (splitUrl.length < 5) {
838 logger.warn("Malformed URI {}. Format must be .../{orgId}/{channelId}/{mediapackageId}/{elementId}/{fileName}."
839 + " Trying URI without channelId", uriString);
840 return new File(path(directoryName, orgId, splitUrl[1], splitUrl[2], splitUrl[3]));
841 } else {
842 return new File(path(directoryName, orgId, splitUrl[1], splitUrl[2], splitUrl[3], splitUrl[4]));
843 }
844 }
845 return new File(path(directoryName, orgId, channelId, mp.getIdentifier().toString(), element.getIdentifier(),
846 FilenameUtils.getName(uriString)));
847 }
848
849
850
851
852
853
854 protected File getMediaPackageDirectory(String channelId, MediaPackage mp) {
855 final String orgId = securityService.getOrganization().getId();
856 return new File(distributionDirectory, path(orgId, channelId, mp.getIdentifier().toString()));
857 }
858
859
860
861
862
863
864
865
866
867
868
869
870 protected URI getDistributionUri(String channelId, String mediaPackageId, MediaPackageElement element)
871 throws URISyntaxException {
872 String elementId = element.getIdentifier();
873 String fileName = FilenameUtils.getName(element.getURI().toString());
874 String orgId = securityService.getOrganization().getId();
875 String destinationURI = UrlSupport.concat(serviceUrl, orgId, channelId, mediaPackageId, elementId, fileName);
876 return new URI(destinationURI);
877 }
878
879 @Override
880 public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
881 distributeJobLoad = LoadUtil.getConfiguredLoadValue(properties, DISTRIBUTE_JOB_LOAD_KEY,
882 DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
883 retractJobLoad = LoadUtil.getConfiguredLoadValue(properties, RETRACT_JOB_LOAD_KEY, DEFAULT_RETRACT_JOB_LOAD,
884 serviceRegistry);
885 }
886
887
888
889
890
891
/**
 * Checks that a distributed file can be loaded from the given uri. The request is made as a system user of the
 * current organization.
 *
 * @param uri
 *          the uri of the distributed file
 */
private void checkAvailability(URI uri) {
  final Organization organization = getSecurityService().getOrganization();
  // The system user is built from the user name that was read from the bundle context on activation
  final User systemUser = SecurityUtil.createSystemUser(systemUserName, organization);
  SecurityUtil.runAs(getSecurityService(), organization, systemUser, () -> {
    // NOTE(review): presumably waits up to TIMEOUT for the resource to answer with SC_OK, retrying every
    // INTERVAL - confirm against HttpUtil.waitForResource
    waitForResource(trustedHttpClient, uri, HttpServletResponse.SC_OK, TIMEOUT, INTERVAL)
        .fold(
            // NOTE(review): presumably rethrows the failure of the request - confirm against Misc.chuck
            Misc.chuck(),
            status -> {
              // Any status other than SC_OK is reported as a DistributionException via Misc.chuck
              if (ne(status, HttpServletResponse.SC_OK)) {
                logger.warn("Attempt to access distributed file {} returned code {}", uri, status);
                Misc.chuck(new DistributionException("Unable to load distributed file " + uri.toString()));
              }
              return null;
            }
        );
  });
}
909
/**
 * Declarative services reference for the workspace; the service is handed on to the super class.
 *
 * @param workspace
 *          the workspace
 */
@Reference
@Override
public void setWorkspace(Workspace workspace) {
  super.setWorkspace(workspace);
}
915
/**
 * Declarative services reference for the service registry; the service is handed on to the super class.
 *
 * @param serviceRegistry
 *          the service registry
 */
@Reference
@Override
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
  super.setServiceRegistry(serviceRegistry);
}
921
/**
 * Declarative services reference for the security service; the service is handed on to the super class.
 *
 * @param securityService
 *          the security service
 */
@Reference
@Override
public void setSecurityService(SecurityService securityService) {
  super.setSecurityService(securityService);
}
927
/**
 * Declarative services reference for the user directory service; the service is handed on to the super class.
 *
 * @param userDirectoryService
 *          the user directory service
 */
@Reference
@Override
public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
  super.setUserDirectoryService(userDirectoryService);
}
933
/**
 * Declarative services reference for the organization directory service; the service is handed on to the super
 * class.
 *
 * @param organizationDirectoryService
 *          the organization directory service
 */
@Reference
@Override
public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
  super.setOrganizationDirectoryService(organizationDirectoryService);
}
939
/**
 * Declarative services reference for the trusted http client; the service is handed on to the super class.
 *
 * @param trustedHttpClient
 *          the trusted http client
 */
@Reference
@Override
public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
  super.setTrustedHttpClient(trustedHttpClient);
}
945
946 }