1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.distribution.aws.s3;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.util.RequireUtil.notNull;
25
26 import org.opencastproject.distribution.api.AbstractDistributionService;
27 import org.opencastproject.distribution.api.DistributionException;
28 import org.opencastproject.distribution.api.DistributionService;
29 import org.opencastproject.distribution.api.DownloadDistributionService;
30 import org.opencastproject.distribution.aws.s3.api.AwsS3DistributionService;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.mediapackage.AdaptivePlaylist;
33 import org.opencastproject.mediapackage.MediaPackage;
34 import org.opencastproject.mediapackage.MediaPackageElement;
35 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
36 import org.opencastproject.mediapackage.MediaPackageElementParser;
37 import org.opencastproject.mediapackage.MediaPackageException;
38 import org.opencastproject.mediapackage.MediaPackageParser;
39 import org.opencastproject.mediapackage.Track;
40 import org.opencastproject.security.api.OrganizationDirectoryService;
41 import org.opencastproject.security.api.SecurityService;
42 import org.opencastproject.security.api.UserDirectoryService;
43 import org.opencastproject.serviceregistry.api.ServiceRegistry;
44 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
45 import org.opencastproject.util.ConfigurationException;
46 import org.opencastproject.util.LoadUtil;
47 import org.opencastproject.util.NotFoundException;
48 import org.opencastproject.util.OsgiUtil;
49 import org.opencastproject.util.data.Option;
50 import org.opencastproject.workspace.api.Workspace;
51
52 import com.amazonaws.AmazonClientException;
53 import com.amazonaws.AmazonServiceException;
54 import com.amazonaws.ClientConfiguration;
55 import com.amazonaws.HttpMethod;
56 import com.amazonaws.auth.AWSCredentialsProvider;
57 import com.amazonaws.auth.AWSStaticCredentialsProvider;
58 import com.amazonaws.auth.BasicAWSCredentials;
59 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
60 import com.amazonaws.auth.policy.Policy;
61 import com.amazonaws.auth.policy.Principal;
62 import com.amazonaws.auth.policy.Statement;
63 import com.amazonaws.auth.policy.actions.S3Actions;
64 import com.amazonaws.auth.policy.resources.S3ObjectResource;
65 import com.amazonaws.client.builder.AwsClientBuilder;
66 import com.amazonaws.services.s3.AmazonS3;
67 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
68 import com.amazonaws.services.s3.model.BucketWebsiteConfiguration;
69 import com.amazonaws.services.s3.model.SetBucketWebsiteConfigurationRequest;
70 import com.amazonaws.services.s3.transfer.TransferManager;
71 import com.amazonaws.services.s3.transfer.Upload;
72 import com.google.gson.Gson;
73 import com.google.gson.reflect.TypeToken;
74
75 import org.apache.commons.io.FilenameUtils;
76 import org.apache.commons.lang3.BooleanUtils;
77 import org.apache.commons.lang3.StringUtils;
78 import org.apache.commons.lang3.math.NumberUtils;
79 import org.apache.http.client.methods.CloseableHttpResponse;
80 import org.apache.http.client.methods.HttpHead;
81 import org.apache.http.impl.client.CloseableHttpClient;
82 import org.apache.http.impl.client.HttpClients;
83 import org.osgi.service.component.ComponentContext;
84 import org.osgi.service.component.annotations.Activate;
85 import org.osgi.service.component.annotations.Component;
86 import org.osgi.service.component.annotations.Deactivate;
87 import org.osgi.service.component.annotations.Reference;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
90
91 import java.io.File;
92 import java.io.IOException;
93 import java.net.URI;
94 import java.net.URISyntaxException;
95 import java.nio.file.Files;
96 import java.nio.file.Path;
97 import java.nio.file.Paths;
98 import java.util.ArrayList;
99 import java.util.Arrays;
100 import java.util.Comparator;
101 import java.util.Date;
102 import java.util.HashMap;
103 import java.util.HashSet;
104 import java.util.List;
105 import java.util.Map.Entry;
106 import java.util.Set;
107 import java.util.stream.Collectors;
108 import java.util.stream.Stream;
109
110 import javax.servlet.http.HttpServletResponse;
111
112 @Component(
113 immediate = true,
114 service = { DistributionService.class, DownloadDistributionService.class, AwsS3DistributionService.class },
115 property = {
116 "service.description=Distribution Service (AWS S3)",
117 "distribution.channel=aws.s3"
118 }
119 )
120 public class AwsS3DistributionServiceImpl extends AbstractDistributionService
121 implements AwsS3DistributionService, DistributionService {
122
123
124 private static final Logger logger = LoggerFactory.getLogger(AwsS3DistributionServiceImpl.class);
125
126
127 public static final String JOB_TYPE = "org.opencastproject.distribution.aws.s3";
128
129
130 public enum Operation {
131 Distribute, Retract
132 }
133
134
135 public static final String AWS_S3_DISTRIBUTION_ENABLE = "org.opencastproject.distribution.aws.s3.distribution.enable";
136 public static final String AWS_S3_DISTRIBUTION_BASE_CONFIG
137 = "org.opencastproject.distribution.aws.s3.distribution.base";
138 public static final String AWS_S3_ACCESS_KEY_ID_CONFIG = "org.opencastproject.distribution.aws.s3.access.id";
139 public static final String AWS_S3_SECRET_ACCESS_KEY_CONFIG = "org.opencastproject.distribution.aws.s3.secret.key";
140 public static final String AWS_S3_REGION_CONFIG = "org.opencastproject.distribution.aws.s3.region";
141 public static final String AWS_S3_BUCKET_CONFIG = "org.opencastproject.distribution.aws.s3.bucket";
142 public static final String AWS_S3_ENDPOINT_CONFIG = "org.opencastproject.distribution.aws.s3.endpoint";
143 public static final String AWS_S3_PATH_STYLE_CONFIG = "org.opencastproject.distribution.aws.s3.path.style";
144 public static final String AWS_S3_PRESIGNED_URL_CONFIG = "org.opencastproject.distribution.aws.s3.presigned.url";
145 public static final String AWS_S3_PRESIGNED_URL_VALID_DURATION_CONFIG
146 = "org.opencastproject.distribution.aws.s3.presigned.url.valid.duration";
147
148 public static final String AWS_S3_MAX_CONNECTIONS = "org.opencastproject.distribution.aws.s3.max.connections";
149 public static final String AWS_S3_CONNECTION_TIMEOUT = "org.opencastproject.distribution.aws.s3.connection.timeout";
150 public static final String AWS_S3_MAX_RETRIES = "org.opencastproject.distribution.aws.s3.max.retries";
151
152 public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.aws.s3.distribute";
153 public static final String RETRACT_JOB_LOAD_KEY = "job.load.aws.s3.retract";
154
155
156 public static final String OPENCAST_STORAGE_DIR = "org.opencastproject.storage.dir";
157 public static final String DEFAULT_TEMP_DIR = "tmp/s3dist";
158
159
160
161
162 public static final int DEFAULT_MAX_CONNECTIONS = 50;
163 public static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
164 public static final int DEFAULT_MAX_RETRIES = 100;
165
166
167 public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;
168
169
170 public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;
171
172
173 public static final int DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS = 6 * 60 * 60 * 1000;
174
175
176 private static final int MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS = 7 * 24 * 60 * 60 * 1000;
177
178
179 private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;
180
181
182 private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;
183
184
185 private static final int MAX_TRIES = 10;
186
187
188 private static final long SLEEP_INTERVAL = 30000L;
189
190
191 private AmazonS3 s3 = null;
192 private TransferManager s3TransferManager = null;
193
194
195 private String bucketName = null;
196 private Path tmpPath = null;
197
198
199 private String endpoint = null;
200
201
202 private boolean pathStyle = false;
203
204
205 private boolean presignedUrl = false;
206
207
208 private int presignedUrlValidDuration = DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS;
209
210
211 private String opencastDistributionUrl = null;
212
213 private Gson gson = new Gson();
214
215
216
217
/**
 * Creates the service and hands its job type to the parent class.
 */
public AwsS3DistributionServiceImpl() {
  super(JOB_TYPE);
}
221
222 private String getAWSConfigKey(ComponentContext cc, String key) {
223 try {
224 return OsgiUtil.getComponentContextProperty(cc, key);
225 } catch (RuntimeException e) {
226 throw new ConfigurationException(key + " is missing or invalid", e);
227 }
228 }
229
230 @Override
231 @Activate
232 public void activate(ComponentContext cc) {
233
234
235 if (cc != null) {
236
237 if (!BooleanUtils.toBoolean(getAWSConfigKey(cc, AWS_S3_DISTRIBUTION_ENABLE))) {
238 logger.info("AWS S3 distribution disabled");
239 return;
240 }
241
242 tmpPath = Paths.get(cc.getBundleContext().getProperty(OPENCAST_STORAGE_DIR), DEFAULT_TEMP_DIR);
243
244
245 if (tmpPath.toFile().exists()) {
246 try (Stream<Path> walk = Files.walk(tmpPath)) {
247 walk.map(Path::toFile).sorted(Comparator.reverseOrder()).forEach(File::delete);
248 } catch (IOException e) {
249 logger.warn("Unable to delete {}", tmpPath, e);
250 }
251 }
252 logger.info("AWS S3 Distribution uses temp storage in {}", tmpPath);
253 try {
254 Files.createDirectories(tmpPath);
255 } catch (IOException e) {
256 logger.error("Could not create temporary directory for AWS S3 Distribution : `{}`", tmpPath);
257 throw new IllegalStateException(e);
258 }
259
260
261 bucketName = getAWSConfigKey(cc, AWS_S3_BUCKET_CONFIG);
262 logger.info("AWS S3 bucket name is {}", bucketName);
263
264
265 String regionStr = getAWSConfigKey(cc, AWS_S3_REGION_CONFIG);
266 logger.info("AWS region is {}", regionStr);
267
268
269 endpoint = OsgiUtil.getComponentContextProperty(cc, AWS_S3_ENDPOINT_CONFIG, "s3." + regionStr + ".amazonaws.com");
270 logger.info("AWS S3 endpoint is {}", endpoint);
271
272
273 pathStyle = BooleanUtils.toBoolean(OsgiUtil.getComponentContextProperty(cc, AWS_S3_PATH_STYLE_CONFIG, "false"));
274 logger.info("AWS path style is {}", pathStyle);
275
276
277 String presignedUrlConfigValue = OsgiUtil.getComponentContextProperty(cc, AWS_S3_PRESIGNED_URL_CONFIG, "false");
278 presignedUrl = StringUtils.equalsIgnoreCase("true", presignedUrlConfigValue);
279 logger.info("AWS use presigned URL: {}", presignedUrl);
280
281
282 String presignedUrlExpTimeMillisConfigValue = OsgiUtil.getComponentContextProperty(cc,
283 AWS_S3_PRESIGNED_URL_VALID_DURATION_CONFIG, null);
284 presignedUrlValidDuration = NumberUtils.toInt(presignedUrlExpTimeMillisConfigValue,
285 DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS);
286 if (presignedUrlValidDuration > MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS) {
287 logger.warn(
288 "Valid duration of presigned URL is too large, MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS(7 days) is used");
289 presignedUrlValidDuration = MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS;
290 }
291
292 opencastDistributionUrl = getAWSConfigKey(cc, AWS_S3_DISTRIBUTION_BASE_CONFIG);
293 if (!opencastDistributionUrl.endsWith("/")) {
294 opencastDistributionUrl = opencastDistributionUrl + "/";
295 }
296 logger.info("AWS distribution url is {}", opencastDistributionUrl);
297
298 distributeJobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), DISTRIBUTE_JOB_LOAD_KEY,
299 DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
300 retractJobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), RETRACT_JOB_LOAD_KEY,
301 DEFAULT_RETRACT_JOB_LOAD, serviceRegistry);
302
303
304 AWSCredentialsProvider provider = null;
305 Option<String> accessKeyIdOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_ACCESS_KEY_ID_CONFIG);
306 Option<String> accessKeySecretOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_SECRET_ACCESS_KEY_CONFIG);
307
308
309
310
311 if (accessKeyIdOpt.isNone() && accessKeySecretOpt.isNone()) {
312 provider = new DefaultAWSCredentialsProviderChain();
313 } else {
314 provider = new AWSStaticCredentialsProvider(
315 new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
316 }
317
318
319 ClientConfiguration clientConfiguration = new ClientConfiguration();
320
321 int maxConnections = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_CONNECTIONS)
322 .getOrElse(DEFAULT_MAX_CONNECTIONS);
323 logger.debug("Max Connections: {}", maxConnections);
324 clientConfiguration.setMaxConnections(maxConnections);
325
326 int connectionTimeout = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_CONNECTION_TIMEOUT)
327 .getOrElse(DEFAULT_CONNECTION_TIMEOUT);
328 logger.debug("Connection Output: {}", connectionTimeout);
329 clientConfiguration.setConnectionTimeout(connectionTimeout);
330
331 int maxRetries = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_RETRIES)
332 .getOrElse(DEFAULT_MAX_RETRIES);
333 logger.debug("Max Retry: {}", maxRetries);
334 clientConfiguration.setMaxErrorRetry(maxRetries);
335
336
337 s3 = AmazonS3ClientBuilder.standard()
338 .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regionStr))
339 .withClientConfiguration(clientConfiguration)
340 .withPathStyleAccessEnabled(pathStyle).withCredentials(provider).build();
341
342 s3TransferManager = new TransferManager(s3);
343
344
345 createAWSBucket();
346 distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
347
348 logger.info("AwsS3DistributionService activated!");
349 }
350 }
351
352 @Override
353 public String getDistributionType() {
354 return distributionChannel;
355 }
356
357 @Deactivate
358 public void deactivate() {
359
360 if (s3TransferManager != null) {
361 s3TransferManager.shutdownNow();
362 }
363
364 logger.info("AwsS3DistributionService deactivated!");
365 }
366
367 @Override
368 public Job distribute(String pubChannelId, MediaPackage mediaPackage, Set<String> downloadIds,
369 boolean checkAvailability, boolean preserveReference) throws DistributionException, MediaPackageException {
370 throw new UnsupportedOperationException("Not supported yet.");
371
372 }
373
374
375
376
377
378
379
380 @Override
381 public Job distribute(String channelId, MediaPackage mediaPackage, Set<String> elementIds, boolean checkAvailability)
382 throws DistributionException, MediaPackageException {
383 notNull(mediaPackage, "mediapackage");
384 notNull(elementIds, "elementIds");
385 notNull(channelId, "channelId");
386 try {
387 return serviceRegistry.createJob(JOB_TYPE, Operation.Distribute.toString(), Arrays.asList(channelId,
388 MediaPackageParser.getAsXml(mediaPackage), gson.toJson(elementIds), Boolean.toString(checkAvailability)),
389 distributeJobLoad);
390 } catch (ServiceRegistryException e) {
391 throw new DistributionException("Unable to create a job", e);
392 }
393 }
394
395
396
397
398
399
400
401 @Override
402 public Job distribute(String channelId, MediaPackage mediapackage, String elementId)
403 throws DistributionException, MediaPackageException {
404 return distribute(channelId, mediapackage, elementId, true);
405 }
406
407
408
409
410
411
412
413 @Override
414 public Job distribute(String channelId, MediaPackage mediaPackage, String elementId, boolean checkAvailability)
415 throws DistributionException, MediaPackageException {
416 Set<String> elementIds = new HashSet<>();
417 elementIds.add(elementId);
418 return distribute(channelId, mediaPackage, elementIds, checkAvailability);
419 }
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437 public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
438 boolean checkAvailability) throws DistributionException {
439 notNull(mediapackage, "mediapackage");
440 notNull(elementIds, "elementIds");
441 notNull(channelId, "channelId");
442
443 final Set<MediaPackageElement> elements = getElements(mediapackage, elementIds);
444 List<MediaPackageElement> distributedElements = new ArrayList<>();
445
446 if (AdaptivePlaylist.hasHLSPlaylist(elements)) {
447 return distributeHLSElements(channelId, mediapackage, elements, checkAvailability);
448 }
449
450 for (MediaPackageElement element : elements) {
451 MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability);
452 distributedElements.add(distributedElement);
453 }
454 return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
455 }
456
457 private Set<MediaPackageElement> getElements(MediaPackage mediapackage, Set<String> elementIds)
458 throws IllegalStateException {
459 final Set<MediaPackageElement> elements = new HashSet<>();
460 for (String elementId : elementIds) {
461 MediaPackageElement element = mediapackage.getElementById(elementId);
462 if (element != null) {
463 elements.add(element);
464 } else {
465 throw new IllegalStateException(
466 format("No element %s found in mediapackage %s", elementId, mediapackage.getIdentifier()));
467 }
468 }
469 return elements;
470 }
471
472
473
474
475
476
477
478
479
480
481
482
483
484 public MediaPackageElement distributeElement(String channelId, final MediaPackage mediaPackage,
485 MediaPackageElement element, boolean checkAvailability) throws DistributionException {
486 notNull(channelId, "channelId");
487 notNull(mediaPackage, "mediapackage");
488 notNull(element, "element");
489
490 try {
491 return distributeElement(channelId, mediaPackage, element, checkAvailability, workspace.get(element.getURI()));
492 } catch (NotFoundException e) {
493 throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
494 } catch (IOException e) {
495 throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
496 }
497 }
498
499 private MediaPackageElement distributeElement(String channelId, final MediaPackage mediaPackage,
500 MediaPackageElement element, boolean checkAvailability, File source) throws DistributionException {
501
502
503
504 try {
505 String objectName = buildObjectName(channelId, mediaPackage.getIdentifier().toString(), element);
506 logger.info("Uploading {} to bucket {}...", objectName, bucketName);
507 Upload upload = s3TransferManager.upload(bucketName, objectName, source);
508 long start = System.currentTimeMillis();
509
510 try {
511
512 upload.waitForCompletion();
513 logger.info("Upload of {} to bucket {} completed in {} seconds", objectName, bucketName,
514 (System.currentTimeMillis() - start) / 1000);
515 } catch (AmazonClientException e) {
516 throw new DistributionException("AWS error: " + e.getMessage(), e);
517 }
518
519
520 MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
521 try {
522 distributedElement.setURI(getDistributionUri(objectName));
523 } catch (URISyntaxException e) {
524 throw new DistributionException("Distributed element produces an invalid URI", e);
525 }
526
527 logger.info("Distributed element {}, object {}", element.getIdentifier(), objectName);
528
529 if (checkAvailability) {
530 URI uri = distributedElement.getURI();
531 String distributedElementUriStr = uri.toString();
532 int tries = 0;
533 CloseableHttpResponse response = null;
534 boolean success = false;
535 while (tries < MAX_TRIES) {
536 try {
537 if (presignedUrl) {
538
539 Date fiveMinutesLater = new Date(System.currentTimeMillis() + 5 * 60 * 1000);
540 uri = s3.generatePresignedUrl(bucketName, objectName, fiveMinutesLater, HttpMethod.HEAD).toURI();
541 }
542 CloseableHttpClient httpClient = HttpClients.createDefault();
543 logger.trace("Trying to access {}", uri);
544 response = httpClient.execute(new HttpHead(uri));
545 if (response.getStatusLine().getStatusCode() == HttpServletResponse.SC_OK) {
546 logger.trace("Successfully got {}", uri);
547 success = true;
548 break;
549 } else {
550 logger.debug("Http status code when checking distributed element {} is {}", objectName,
551 response.getStatusLine().getStatusCode());
552 }
553 } catch (Exception e) {
554 logger.info("Checking availability of {} threw exception {}. Trying again.", objectName, e.getMessage());
555
556 } finally {
557 if (null != response) {
558 response.close();
559 }
560 }
561 tries++;
562 logger.trace("Sleeping for {} seconds...", SLEEP_INTERVAL / 1000);
563 Thread.sleep(SLEEP_INTERVAL);
564 }
565 if (!success) {
566 logger.warn("Could not check availability of distributed file {}", uri);
567
568 }
569 }
570
571 return distributedElement;
572 } catch (Exception e) {
573 logger.warn("Error distributing element " + element.getIdentifier() + " of media package " + mediaPackage, e);
574 if (e instanceof DistributionException) {
575 throw (DistributionException) e;
576 } else {
577 throw new DistributionException(e);
578 }
579 }
580 }
581
582 @Override
583 public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
584 Set<String> elementIds = new HashSet<>();
585 elementIds.add(elementId);
586 return retract(channelId, mediapackage, elementIds);
587 }
588
589 @Override
590 public Job retract(String channelId, MediaPackage mediapackage, Set<String> elementIds) throws DistributionException {
591 notNull(mediapackage, "mediapackage");
592 notNull(elementIds, "elementIds");
593 notNull(channelId, "channelId");
594 try {
595 return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
596 Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
597 retractJobLoad);
598 } catch (ServiceRegistryException e) {
599 throw new DistributionException("Unable to create a job", e);
600 }
601 }
602
603 @Override
604 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, String elementId)
605 throws DistributionException, MediaPackageException {
606 Set<String> elementIds = new HashSet<String>();
607 elementIds.add(elementId);
608 return distributeSync(channelId, mediapackage, elementIds, true);
609 }
610
611 @Override
612 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
613 boolean checkAvailability) throws DistributionException {
614 final MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
615 checkAvailability);
616 if (distributedElements == null) {
617 return null;
618 }
619 return Arrays.asList(distributedElements);
620 }
621
622 @Override
623 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, String elementId)
624 throws DistributionException {
625 Set<String> elementIds = new HashSet<String>();
626 elementIds.add(elementId);
627 return retractSync(channelId, mediapackage, elementIds);
628 }
629
630 @Override
631 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
632 throws DistributionException {
633 final MediaPackageElement[] retractedElements = retractElements(channelId, mediaPackage, elementIds);
634 if (retractedElements == null) {
635 return null;
636 }
637 return Arrays.asList(retractedElements);
638 }
639
640
641
642
643
644
645
646
647
648
649
650
651 protected MediaPackageElement retractElement(String channelId, MediaPackage mediaPackage, MediaPackageElement element)
652 throws DistributionException {
653 notNull(mediaPackage, "mediaPackage");
654 notNull(element, "element");
655
656 try {
657 String objectName = getDistributedObjectName(element);
658 if (objectName != null) {
659 s3.deleteObject(bucketName, objectName);
660 logger.info("Retracted element {}, object {}", element.getIdentifier(), objectName);
661 }
662 return element;
663 } catch (AmazonClientException e) {
664 throw new DistributionException("AWS error: " + e.getMessage(), e);
665 }
666 }
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683 protected MediaPackageElement[] retractElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
684 throws DistributionException {
685 notNull(mediapackage, "mediapackage");
686 notNull(elementIds, "elementIds");
687 notNull(channelId, "channelId");
688
689 Set<MediaPackageElement> elements = getElements(mediapackage, elementIds);
690 List<MediaPackageElement> retractedElements = new ArrayList<>();
691
692 for (MediaPackageElement element : elements) {
693 MediaPackageElement retractedElement = retractElement(channelId, mediapackage, element);
694 retractedElements.add(retractedElement);
695 }
696 return retractedElements.toArray(new MediaPackageElement[retractedElements.size()]);
697 }
698
699
700
701
702
703
704
705
706
707 protected String buildObjectName(String channelId, String mpId, MediaPackageElement element) {
708
709 final String orgId = securityService.getOrganization().getId();
710 String uriString = element.getURI().toString();
711 String fileName = FilenameUtils.getName(uriString);
712 return buildObjectName(orgId, channelId, mpId, element.getIdentifier(), fileName);
713 }
714
715
716
717
718
719
720
721
722
723
724
725 protected String buildObjectName(String orgId, String channelId, String mpId, String elementId, String fileName) {
726 return StringUtils.join(new String[] { orgId, channelId, mpId, elementId, fileName }, "/");
727 }
728
729
730
731
732
733
734
735
736 protected URI getDistributionUri(String objectName) throws URISyntaxException {
737
738 return new URI(opencastDistributionUrl + objectName);
739 }
740
741
742
743
744
745
746 protected String getDistributedObjectName(MediaPackageElement element) {
747
748 String uriString = element.getURI().toString();
749
750
751 if (uriString.startsWith(opencastDistributionUrl) && uriString.length() > opencastDistributionUrl.length()) {
752 return uriString.substring(opencastDistributionUrl.length());
753 } else {
754
755 logger.warn(
756 "Cannot retract {}. Uri must be in the format "
757 + "https://host/bucketName/orgId/channelId/mpId/originalElementId/fileName.extension",
758 uriString);
759 return null;
760 }
761 }
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781 private MediaPackageElement[] distributeHLSElements(String channelId, MediaPackage mediapackage,
782 Set<MediaPackageElement> elements, boolean checkAvailability) throws DistributionException {
783
784 List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
785 List<MediaPackageElement> nontrackElements = elements.stream()
786 .filter(e -> e.getElementType() != MediaPackageElement.Type.Track).collect(Collectors.toList());
787
788 for (MediaPackageElement element : nontrackElements) {
789 MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability);
790 distributedElements.add(distributedElement);
791 }
792
793
794 List<Track> trackElements = elements.stream().filter(e -> e.getElementType() == MediaPackageElement.Type.Track)
795 .map(e -> (Track) e).collect(Collectors.toList());
796 HashMap<MediaPackageElementFlavor, List<Track>> trackElementsMap
797 = new HashMap<MediaPackageElementFlavor, List<Track>>();
798 for (Track t : trackElements) {
799 List<Track> l = trackElementsMap.get(t.getFlavor());
800 if (l == null) {
801 l = new ArrayList<Track>();
802 }
803 l.add(t);
804 trackElementsMap.put(t.getFlavor(), l);
805 }
806
807 Path tmpDir = null;
808 try {
809 tmpDir = Files.createTempDirectory(tmpPath, mediapackage.getIdentifier().toString());
810
811 for (Entry<MediaPackageElementFlavor, List<Track>> elementSet : trackElementsMap.entrySet()) {
812 List<Track> tracks = elementSet.getValue();
813 try {
814 List<Track> transformedTracks = new ArrayList<Track>();
815
816 if (tracks.stream().anyMatch(AdaptivePlaylist.isHLSTrackPred)) {
817
818
819 List<Track> tmpTracks = new ArrayList<Track>();
820 for (Track t : tracks) {
821
822 Track tcopy = (Track) t.clone();
823 String newName = "./" + t.getURI().getPath();
824 Path newPath = tmpDir.resolve(newName).normalize();
825 Files.createDirectories(newPath.getParent());
826
827 if (AdaptivePlaylist.isPlaylist(t)) {
828 File f = workspace.get(t.getURI());
829 Path plcopy = Files.copy(f.toPath(), newPath);
830 tcopy.setURI(plcopy.toUri());
831 } else {
832 Path plcopy = Files.createFile(newPath);
833 tcopy.setURI(plcopy.toUri());
834 }
835 tmpTracks.add(tcopy);
836 }
837
838 tmpTracks = AdaptivePlaylist.fixReferences(tmpTracks, tmpDir.toFile());
839
840
841 tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred.negate()).forEach(t -> transformedTracks.add(t));
842 tmpTracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).forEach(t -> transformedTracks.add(t));
843 } else {
844 transformedTracks.addAll(tracks);
845 }
846 for (Track track : transformedTracks) {
847 MediaPackageElement distributedElement;
848 if (AdaptivePlaylist.isPlaylist(track)) {
849 distributedElement = distributeElement(channelId, mediapackage, track, checkAvailability,
850 new File(track.getURI()));
851 } else {
852 distributedElement = distributeElement(channelId, mediapackage, track, checkAvailability);
853 }
854 distributedElements.add(distributedElement);
855 }
856 } catch (MediaPackageException | NotFoundException | IOException e1) {
857 logger.error("HLS Prepare failed for mediapackage {} in {}", elementSet.getKey(), mediapackage, e1);
858 throw new DistributionException("Cannot distribute " + mediapackage);
859 } catch (URISyntaxException e1) {
860 logger.error("HLS Prepare failed - Bad URI syntax {} in {}", elementSet.getKey(), mediapackage, e1);
861 throw new DistributionException("Cannot distribute - BAD URI syntax " + mediapackage);
862 }
863 }
864 } catch (IOException e2) {
865 throw new DistributionException("Cannot create tmp dir to process HLS:" + mediapackage + e2.getMessage());
866 } finally {
867
868 try (Stream<Path> walk = Files.walk(tmpDir)) {
869 walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
870 } catch (IOException e) {
871 logger.warn("Cannot delete tmp dir for processing HLS mp {}, path {}", mediapackage, tmpPath, e);
872 }
873 }
874 return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
875 }
876
877
878
879
880
881
882 @Override
883 protected String process(Job job) throws Exception {
884 Operation op = null;
885 String operation = job.getOperation();
886 List<String> arguments = job.getArguments();
887 try {
888 op = Operation.valueOf(operation);
889 String channelId = arguments.get(0);
890 MediaPackage mediaPackage = MediaPackageParser.getFromXml(arguments.get(1));
891 Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() {
892 }.getType());
893 switch (op) {
894 case Distribute:
895 Boolean checkAvailability = Boolean.parseBoolean(arguments.get(3));
896 MediaPackageElement[] distributedElements = distributeElements(channelId, mediaPackage, elementIds,
897 checkAvailability);
898 return (distributedElements != null)
899 ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(distributedElements))
900 : null;
901 case Retract:
902 MediaPackageElement[] retractedElements = retractElements(channelId, mediaPackage, elementIds);
903 return (retractedElements != null) ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(retractedElements))
904 : null;
905 default:
906 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
907 }
908 } catch (IllegalArgumentException e) {
909 throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
910 } catch (IndexOutOfBoundsException e) {
911 throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
912 } catch (Exception e) {
913 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
914 }
915 }
916
917
918
919
920 protected void createAWSBucket() {
921
922 try {
923 s3.listObjects(bucketName);
924 } catch (AmazonServiceException e) {
925 if (e.getStatusCode() == 404) {
926
927 try {
928 s3.createBucket(bucketName);
929
930 Statement allowPublicReadStatement = new Statement(Statement.Effect.Allow).withPrincipals(Principal.AllUsers)
931 .withActions(S3Actions.GetObject).withResources(new S3ObjectResource(bucketName, "*"));
932 Policy policy = new Policy().withStatements(allowPublicReadStatement);
933 s3.setBucketPolicy(bucketName, policy.toJson());
934
935
936 BucketWebsiteConfiguration defaultWebsite = new BucketWebsiteConfiguration();
937
938
939 defaultWebsite.setIndexDocumentSuffix("index.html");
940 defaultWebsite.setErrorDocument("error.html");
941 s3.setBucketWebsiteConfiguration(new SetBucketWebsiteConfigurationRequest(bucketName, defaultWebsite));
942 logger.info("AWS S3 bucket {} created", bucketName);
943 } catch (Exception e2) {
944 throw new ConfigurationException("Bucket " + bucketName + " cannot be created: " + e2.getMessage(), e2);
945 }
946 } else {
947 throw new ConfigurationException("Bucket " + bucketName + " exists, but we can't access it: " + e.getMessage(),
948 e);
949 }
950 }
951 }
952
953 public URI presignedURI(URI uri) throws URISyntaxException {
954 if (!presignedUrl) {
955 return uri;
956 }
957 String s3UrlPrefix = s3.getUrl(bucketName, "").toString();
958
959
960 if (uri.toString().startsWith(s3UrlPrefix)) {
961 String objectName = uri.toString().substring(s3UrlPrefix.length());
962 Date validUntil = new Date(System.currentTimeMillis() + presignedUrlValidDuration);
963 return s3.generatePresignedUrl(bucketName, objectName, validUntil).toURI();
964 } else {
965 return uri;
966 }
967 }
968
969
970
971 protected void setS3(AmazonS3 s3) {
972 this.s3 = s3;
973 }
974
975 protected void setS3TransferManager(TransferManager s3TransferManager) {
976 this.s3TransferManager = s3TransferManager;
977 }
978
979 protected void setBucketName(String bucketName) {
980 this.bucketName = bucketName;
981 }
982
983 protected void setOpencastDistributionUrl(String distributionUrl) {
984 opencastDistributionUrl = distributionUrl;
985 }
986
987
988 protected void setStorageTmp(String path) {
989 this.tmpPath = Paths.get(path, DEFAULT_TEMP_DIR);
990 try {
991 Files.createDirectories(tmpPath);
992 } catch (IOException e) {
993 logger.info("AWS S3 bucket cannot create {} ", tmpPath);
994 }
995 }
996
997 @Reference
998 @Override
999 public void setWorkspace(Workspace workspace) {
1000 super.setWorkspace(workspace);
1001 }
1002
1003 @Reference
1004 @Override
1005 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1006 super.setServiceRegistry(serviceRegistry);
1007 }
1008
1009 @Reference
1010 @Override
1011 public void setSecurityService(SecurityService securityService) {
1012 super.setSecurityService(securityService);
1013 }
1014
1015 @Reference
1016 @Override
1017 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1018 super.setUserDirectoryService(userDirectoryService);
1019 }
1020
1021 @Reference
1022 @Override
1023 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1024 super.setOrganizationDirectoryService(organizationDirectoryService);
1025 }
1026
1027 }