1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.distribution.aws.s3;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.util.RequireUtil.notNull;
25
26 import org.opencastproject.distribution.api.AbstractDistributionService;
27 import org.opencastproject.distribution.api.DistributionException;
28 import org.opencastproject.distribution.api.DistributionService;
29 import org.opencastproject.distribution.api.DownloadDistributionService;
30 import org.opencastproject.distribution.aws.s3.api.AwsS3DistributionService;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.mediapackage.AdaptivePlaylist;
33 import org.opencastproject.mediapackage.MediaPackage;
34 import org.opencastproject.mediapackage.MediaPackageElement;
35 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
36 import org.opencastproject.mediapackage.MediaPackageElementParser;
37 import org.opencastproject.mediapackage.MediaPackageException;
38 import org.opencastproject.mediapackage.MediaPackageParser;
39 import org.opencastproject.mediapackage.Track;
40 import org.opencastproject.security.api.OrganizationDirectoryService;
41 import org.opencastproject.security.api.SecurityService;
42 import org.opencastproject.security.api.UserDirectoryService;
43 import org.opencastproject.serviceregistry.api.ServiceRegistry;
44 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
45 import org.opencastproject.util.ConfigurationException;
46 import org.opencastproject.util.LoadUtil;
47 import org.opencastproject.util.NotFoundException;
48 import org.opencastproject.util.OsgiUtil;
49 import org.opencastproject.util.data.Option;
50 import org.opencastproject.workspace.api.Workspace;
51
52 import com.amazonaws.AmazonClientException;
53 import com.amazonaws.AmazonServiceException;
54 import com.amazonaws.ClientConfiguration;
55 import com.amazonaws.HttpMethod;
56 import com.amazonaws.auth.AWSCredentialsProvider;
57 import com.amazonaws.auth.AWSStaticCredentialsProvider;
58 import com.amazonaws.auth.BasicAWSCredentials;
59 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
60 import com.amazonaws.auth.policy.Policy;
61 import com.amazonaws.auth.policy.Principal;
62 import com.amazonaws.auth.policy.Statement;
63 import com.amazonaws.auth.policy.actions.S3Actions;
64 import com.amazonaws.auth.policy.resources.S3ObjectResource;
65 import com.amazonaws.client.builder.AwsClientBuilder;
66 import com.amazonaws.services.s3.AmazonS3;
67 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
68 import com.amazonaws.services.s3.model.BucketWebsiteConfiguration;
69 import com.amazonaws.services.s3.model.SetBucketWebsiteConfigurationRequest;
70 import com.amazonaws.services.s3.transfer.TransferManager;
71 import com.amazonaws.services.s3.transfer.Upload;
72 import com.google.gson.Gson;
73 import com.google.gson.reflect.TypeToken;
74
75 import org.apache.commons.io.FilenameUtils;
76 import org.apache.commons.lang3.BooleanUtils;
77 import org.apache.commons.lang3.StringUtils;
78 import org.apache.commons.lang3.math.NumberUtils;
79 import org.apache.http.client.methods.CloseableHttpResponse;
80 import org.apache.http.client.methods.HttpHead;
81 import org.apache.http.impl.client.CloseableHttpClient;
82 import org.apache.http.impl.client.HttpClients;
83 import org.osgi.service.component.ComponentContext;
84 import org.osgi.service.component.annotations.Activate;
85 import org.osgi.service.component.annotations.Component;
86 import org.osgi.service.component.annotations.Deactivate;
87 import org.osgi.service.component.annotations.Reference;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
90
91 import java.io.File;
92 import java.io.IOException;
93 import java.net.URI;
94 import java.net.URISyntaxException;
95 import java.nio.file.Files;
96 import java.nio.file.Path;
97 import java.nio.file.Paths;
98 import java.util.ArrayList;
99 import java.util.Arrays;
100 import java.util.Collections;
101 import java.util.Comparator;
102 import java.util.Date;
103 import java.util.HashMap;
104 import java.util.HashSet;
105 import java.util.List;
106 import java.util.Map;
107 import java.util.Map.Entry;
108 import java.util.Set;
109 import java.util.stream.Collectors;
110 import java.util.stream.Stream;
111
112 import javax.servlet.http.HttpServletResponse;
113
114 @Component(
115 immediate = true,
116 service = { DistributionService.class, DownloadDistributionService.class, AwsS3DistributionService.class },
117 property = {
118 "service.description=Distribution Service (AWS S3)",
119 "distribution.channel=aws.s3"
120 }
121 )
122 public class AwsS3DistributionServiceImpl extends AbstractDistributionService
123 implements AwsS3DistributionService, DistributionService {
124
125
126 private static final Logger logger = LoggerFactory.getLogger(AwsS3DistributionServiceImpl.class);
127
128
129 public static final String JOB_TYPE = "org.opencastproject.distribution.aws.s3";
130
131
132 public enum Operation {
133 Distribute, Retract
134 }
135
136
137 public static final String AWS_S3_DISTRIBUTION_ENABLE = "org.opencastproject.distribution.aws.s3.distribution.enable";
138 public static final String AWS_S3_DISTRIBUTION_BASE_CONFIG
139 = "org.opencastproject.distribution.aws.s3.distribution.base";
140 public static final String AWS_S3_ACCESS_KEY_ID_CONFIG = "org.opencastproject.distribution.aws.s3.access.id";
141 public static final String AWS_S3_SECRET_ACCESS_KEY_CONFIG = "org.opencastproject.distribution.aws.s3.secret.key";
142 public static final String AWS_S3_REGION_CONFIG = "org.opencastproject.distribution.aws.s3.region";
143 public static final String AWS_S3_BUCKET_CONFIG = "org.opencastproject.distribution.aws.s3.bucket";
144 public static final String AWS_S3_BUCKET_CONFIG_PREFIX = "org.opencastproject.distribution.aws.s3.bucket.";
145 public static final String AWS_S3_ENDPOINT_CONFIG = "org.opencastproject.distribution.aws.s3.endpoint";
146 public static final String AWS_S3_PATH_STYLE_CONFIG = "org.opencastproject.distribution.aws.s3.path.style";
147 public static final String AWS_S3_PRESIGNED_URL_CONFIG = "org.opencastproject.distribution.aws.s3.presigned.url";
148 public static final String AWS_S3_PRESIGNED_URL_VALID_DURATION_CONFIG
149 = "org.opencastproject.distribution.aws.s3.presigned.url.valid.duration";
150
151 public static final String AWS_S3_MAX_CONNECTIONS = "org.opencastproject.distribution.aws.s3.max.connections";
152 public static final String AWS_S3_CONNECTION_TIMEOUT = "org.opencastproject.distribution.aws.s3.connection.timeout";
153 public static final String AWS_S3_MAX_RETRIES = "org.opencastproject.distribution.aws.s3.max.retries";
154
155 public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.aws.s3.distribute";
156 public static final String RETRACT_JOB_LOAD_KEY = "job.load.aws.s3.retract";
157
158
159 public static final String OPENCAST_STORAGE_DIR = "org.opencastproject.storage.dir";
160 public static final String DEFAULT_TEMP_DIR = "tmp/s3dist";
161
162
163
164
165 public static final int DEFAULT_MAX_CONNECTIONS = 50;
166 public static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
167 public static final int DEFAULT_MAX_RETRIES = 100;
168
169
170 public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;
171
172
173 public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;
174
175
176 public static final int DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS = 6 * 60 * 60 * 1000;
177
178
179 private static final int MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS = 7 * 24 * 60 * 60 * 1000;
180
181
182 private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;
183
184
185 private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;
186
187
188 private static final int MAX_TRIES = 10;
189
190
191 private static final long SLEEP_INTERVAL = 30000L;
192
193 public static final String DEFAULT_ORG_KEY = "*";
194
195
196 private AmazonS3 s3 = null;
197 private TransferManager s3TransferManager = null;
198
199
200 private Map<String, String> orgBucketNameMap = new HashMap<>();
201 private Path tmpPath = null;
202
203
204 private String endpoint = null;
205
206
207 private boolean pathStyle = false;
208
209
210 private boolean presignedUrl = false;
211
212
213 private int presignedUrlValidDuration = DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS;
214
215
216 private String opencastDistributionUrl = null;
217
218 private Gson gson = new Gson();
219
220
221
222
223 public AwsS3DistributionServiceImpl() {
224 super(JOB_TYPE);
225 }
226
227 private String getAWSConfigKey(ComponentContext cc, String key) {
228 try {
229 return OsgiUtil.getComponentContextProperty(cc, key);
230 } catch (RuntimeException e) {
231 throw new ConfigurationException(key + " is missing or invalid", e);
232 }
233 }
234
235 @Override
236 @Activate
237 public void activate(ComponentContext cc) {
238
239
240 if (cc != null) {
241
242 if (!BooleanUtils.toBoolean(getAWSConfigKey(cc, AWS_S3_DISTRIBUTION_ENABLE))) {
243 logger.info("AWS S3 distribution disabled");
244 return;
245 }
246
247 tmpPath = Paths.get(cc.getBundleContext().getProperty(OPENCAST_STORAGE_DIR), DEFAULT_TEMP_DIR);
248
249
250 if (tmpPath.toFile().exists()) {
251 try (Stream<Path> walk = Files.walk(tmpPath)) {
252 walk.map(Path::toFile).sorted(Comparator.reverseOrder()).forEach(File::delete);
253 } catch (IOException e) {
254 logger.warn("Unable to delete {}", tmpPath, e);
255 }
256 }
257 logger.info("AWS S3 Distribution uses temp storage in {}", tmpPath);
258 try {
259 Files.createDirectories(tmpPath);
260 } catch (IOException e) {
261 logger.error("Could not create temporary directory for AWS S3 Distribution : `{}`", tmpPath);
262 throw new IllegalStateException(e);
263 }
264
265
266 Option<String> defaultBucketNameOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_BUCKET_CONFIG);
267 if (defaultBucketNameOpt.isSome()) {
268 orgBucketNameMap.put(DEFAULT_ORG_KEY, defaultBucketNameOpt.get());
269 logger.info("AWS S3 default bucket name is {}", defaultBucketNameOpt.get());
270 }
271
272
273 Collections.list(cc.getProperties().keys()).stream()
274 .filter(s -> s.startsWith(AWS_S3_BUCKET_CONFIG_PREFIX))
275 .forEach(s -> {
276 String orgId = s.substring(AWS_S3_BUCKET_CONFIG_PREFIX.length());
277 String bucketName = OsgiUtil.getComponentContextProperty(cc, s);
278 orgBucketNameMap.put(orgId, bucketName);
279 });
280
281 if (orgBucketNameMap.isEmpty()) {
282 throw new ConfigurationException("AWS S3 distribution is enabled, but no buckets are configured");
283 }
284
285
286 String regionStr = getAWSConfigKey(cc, AWS_S3_REGION_CONFIG);
287 logger.info("AWS region is {}", regionStr);
288
289
290 endpoint = OsgiUtil.getComponentContextProperty(cc, AWS_S3_ENDPOINT_CONFIG, "s3." + regionStr + ".amazonaws.com");
291 logger.info("AWS S3 endpoint is {}", endpoint);
292
293
294 pathStyle = BooleanUtils.toBoolean(OsgiUtil.getComponentContextProperty(cc, AWS_S3_PATH_STYLE_CONFIG, "false"));
295 logger.info("AWS path style is {}", pathStyle);
296
297
298 String presignedUrlConfigValue = OsgiUtil.getComponentContextProperty(cc, AWS_S3_PRESIGNED_URL_CONFIG, "false");
299 presignedUrl = StringUtils.equalsIgnoreCase("true", presignedUrlConfigValue);
300 logger.info("AWS use presigned URL: {}", presignedUrl);
301
302
303 String presignedUrlExpTimeMillisConfigValue = OsgiUtil.getComponentContextProperty(cc,
304 AWS_S3_PRESIGNED_URL_VALID_DURATION_CONFIG, null);
305 presignedUrlValidDuration = NumberUtils.toInt(presignedUrlExpTimeMillisConfigValue,
306 DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS);
307 if (presignedUrlValidDuration > MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS) {
308 logger.warn(
309 "Valid duration of presigned URL is too large, MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS(7 days) is used");
310 presignedUrlValidDuration = MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS;
311 }
312
313 opencastDistributionUrl = getAWSConfigKey(cc, AWS_S3_DISTRIBUTION_BASE_CONFIG);
314 if (!opencastDistributionUrl.endsWith("/")) {
315 opencastDistributionUrl = opencastDistributionUrl + "/";
316 }
317 logger.info("AWS distribution url is {}", opencastDistributionUrl);
318
319 distributeJobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), DISTRIBUTE_JOB_LOAD_KEY,
320 DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
321 retractJobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), RETRACT_JOB_LOAD_KEY,
322 DEFAULT_RETRACT_JOB_LOAD, serviceRegistry);
323
324
325 AWSCredentialsProvider provider = null;
326 Option<String> accessKeyIdOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_ACCESS_KEY_ID_CONFIG);
327 Option<String> accessKeySecretOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_SECRET_ACCESS_KEY_CONFIG);
328
329
330
331
332 if (accessKeyIdOpt.isNone() && accessKeySecretOpt.isNone()) {
333 provider = new DefaultAWSCredentialsProviderChain();
334 } else {
335 provider = new AWSStaticCredentialsProvider(
336 new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
337 }
338
339
340 ClientConfiguration clientConfiguration = new ClientConfiguration();
341
342 int maxConnections = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_CONNECTIONS)
343 .getOrElse(DEFAULT_MAX_CONNECTIONS);
344 logger.debug("Max Connections: {}", maxConnections);
345 clientConfiguration.setMaxConnections(maxConnections);
346
347 int connectionTimeout = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_CONNECTION_TIMEOUT)
348 .getOrElse(DEFAULT_CONNECTION_TIMEOUT);
349 logger.debug("Connection Output: {}", connectionTimeout);
350 clientConfiguration.setConnectionTimeout(connectionTimeout);
351
352 int maxRetries = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_RETRIES)
353 .getOrElse(DEFAULT_MAX_RETRIES);
354 logger.debug("Max Retry: {}", maxRetries);
355 clientConfiguration.setMaxErrorRetry(maxRetries);
356
357
358 s3 = AmazonS3ClientBuilder.standard()
359 .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regionStr))
360 .withClientConfiguration(clientConfiguration)
361 .withPathStyleAccessEnabled(pathStyle).withCredentials(provider).build();
362
363 s3TransferManager = new TransferManager(s3);
364
365
366 createAWSBucket();
367 distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
368
369 logger.info("AwsS3DistributionService activated!");
370 }
371 }
372
373 @Override
374 public String getDistributionType() {
375 return distributionChannel;
376 }
377
378 @Deactivate
379 public void deactivate() {
380
381 if (s3TransferManager != null) {
382 s3TransferManager.shutdownNow();
383 }
384
385 logger.info("AwsS3DistributionService deactivated!");
386 }
387
388 @Override
389 public Job distribute(String pubChannelId, MediaPackage mediaPackage, Set<String> downloadIds,
390 boolean checkAvailability, boolean preserveReference) throws DistributionException, MediaPackageException {
391 throw new UnsupportedOperationException("Not supported yet.");
392
393 }
394
395
396
397
398
399
400
401 @Override
402 public Job distribute(String channelId, MediaPackage mediaPackage, Set<String> elementIds, boolean checkAvailability)
403 throws DistributionException, MediaPackageException {
404 notNull(mediaPackage, "mediapackage");
405 notNull(elementIds, "elementIds");
406 notNull(channelId, "channelId");
407 try {
408 return serviceRegistry.createJob(JOB_TYPE, Operation.Distribute.toString(), Arrays.asList(channelId,
409 MediaPackageParser.getAsXml(mediaPackage), gson.toJson(elementIds), Boolean.toString(checkAvailability)),
410 distributeJobLoad);
411 } catch (ServiceRegistryException e) {
412 throw new DistributionException("Unable to create a job", e);
413 }
414 }
415
416
417
418
419
420
421
422 @Override
423 public Job distribute(String channelId, MediaPackage mediapackage, String elementId)
424 throws DistributionException, MediaPackageException {
425 return distribute(channelId, mediapackage, elementId, true);
426 }
427
428
429
430
431
432
433
434 @Override
435 public Job distribute(String channelId, MediaPackage mediaPackage, String elementId, boolean checkAvailability)
436 throws DistributionException, MediaPackageException {
437 Set<String> elementIds = new HashSet<>();
438 elementIds.add(elementId);
439 return distribute(channelId, mediaPackage, elementIds, checkAvailability);
440 }
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458 public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
459 boolean checkAvailability) throws DistributionException {
460 notNull(mediapackage, "mediapackage");
461 notNull(elementIds, "elementIds");
462 notNull(channelId, "channelId");
463
464 final Set<MediaPackageElement> elements = getElements(mediapackage, elementIds);
465 List<MediaPackageElement> distributedElements = new ArrayList<>();
466
467 if (AdaptivePlaylist.hasHLSPlaylist(elements)) {
468 return distributeHLSElements(channelId, mediapackage, elements, checkAvailability);
469 }
470
471 for (MediaPackageElement element : elements) {
472 MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability);
473 distributedElements.add(distributedElement);
474 }
475 return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
476 }
477
478 private Set<MediaPackageElement> getElements(MediaPackage mediapackage, Set<String> elementIds)
479 throws IllegalStateException {
480 final Set<MediaPackageElement> elements = new HashSet<>();
481 for (String elementId : elementIds) {
482 MediaPackageElement element = mediapackage.getElementById(elementId);
483 if (element != null) {
484 elements.add(element);
485 } else {
486 throw new IllegalStateException(
487 format("No element %s found in mediapackage %s", elementId, mediapackage.getIdentifier()));
488 }
489 }
490 return elements;
491 }
492
493
494
495
496
497
498
499
500
501
502
503
504
505 public MediaPackageElement distributeElement(String channelId, final MediaPackage mediaPackage,
506 MediaPackageElement element, boolean checkAvailability) throws DistributionException {
507 notNull(channelId, "channelId");
508 notNull(mediaPackage, "mediapackage");
509 notNull(element, "element");
510
511 try {
512 return distributeElement(channelId, mediaPackage, element, checkAvailability, workspace.get(element.getURI()));
513 } catch (NotFoundException e) {
514 throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
515 } catch (IOException e) {
516 throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
517 }
518 }
519
520 private MediaPackageElement distributeElement(String channelId, final MediaPackage mediaPackage,
521 MediaPackageElement element, boolean checkAvailability, File source) throws DistributionException {
522
523
524
525 try {
526 String orgId = securityService.getOrganization().getId();
527 String bucketName = getBucketName(orgId);
528 String objectName = buildObjectName(channelId, mediaPackage.getIdentifier().toString(), element);
529 logger.info("Uploading {} to bucket {}...", objectName, bucketName);
530 Upload upload = s3TransferManager.upload(bucketName, objectName, source);
531 long start = System.currentTimeMillis();
532
533 try {
534
535 upload.waitForCompletion();
536 logger.info("Upload of {} to bucket {} completed in {} seconds", objectName, bucketName,
537 (System.currentTimeMillis() - start) / 1000);
538 } catch (AmazonClientException e) {
539 throw new DistributionException("AWS error: " + e.getMessage(), e);
540 }
541
542
543 MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
544 try {
545 distributedElement.setURI(getDistributionUri(objectName));
546 } catch (URISyntaxException e) {
547 throw new DistributionException("Distributed element produces an invalid URI", e);
548 }
549
550 logger.info("Distributed element {}, object {}", element.getIdentifier(), objectName);
551
552 if (checkAvailability) {
553 URI uri = distributedElement.getURI();
554 String distributedElementUriStr = uri.toString();
555 int tries = 0;
556 CloseableHttpResponse response = null;
557 boolean success = false;
558 while (tries < MAX_TRIES) {
559 try {
560 if (presignedUrl) {
561
562 Date fiveMinutesLater = new Date(System.currentTimeMillis() + 5 * 60 * 1000);
563 uri = s3.generatePresignedUrl(bucketName, objectName, fiveMinutesLater, HttpMethod.HEAD).toURI();
564 }
565 CloseableHttpClient httpClient = HttpClients.createDefault();
566 logger.trace("Trying to access {}", uri);
567 response = httpClient.execute(new HttpHead(uri));
568 if (response.getStatusLine().getStatusCode() == HttpServletResponse.SC_OK) {
569 logger.trace("Successfully got {}", uri);
570 success = true;
571 break;
572 } else {
573 logger.debug("Http status code when checking distributed element {} is {}", objectName,
574 response.getStatusLine().getStatusCode());
575 }
576 } catch (Exception e) {
577 logger.info("Checking availability of {} threw exception {}. Trying again.", objectName, e.getMessage());
578
579 } finally {
580 if (null != response) {
581 response.close();
582 }
583 }
584 tries++;
585 logger.trace("Sleeping for {} seconds...", SLEEP_INTERVAL / 1000);
586 Thread.sleep(SLEEP_INTERVAL);
587 }
588 if (!success) {
589 logger.warn("Could not check availability of distributed file {}", uri);
590
591 }
592 }
593
594 return distributedElement;
595 } catch (Exception e) {
596 logger.warn("Error distributing element " + element.getIdentifier() + " of media package " + mediaPackage, e);
597 if (e instanceof DistributionException) {
598 throw (DistributionException) e;
599 } else {
600 throw new DistributionException(e);
601 }
602 }
603 }
604
605 @Override
606 public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
607 Set<String> elementIds = new HashSet<>();
608 elementIds.add(elementId);
609 return retract(channelId, mediapackage, elementIds);
610 }
611
612 @Override
613 public Job retract(String channelId, MediaPackage mediapackage, Set<String> elementIds) throws DistributionException {
614 notNull(mediapackage, "mediapackage");
615 notNull(elementIds, "elementIds");
616 notNull(channelId, "channelId");
617 try {
618 return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
619 Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
620 retractJobLoad);
621 } catch (ServiceRegistryException e) {
622 throw new DistributionException("Unable to create a job", e);
623 }
624 }
625
626 @Override
627 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, String elementId)
628 throws DistributionException, MediaPackageException {
629 Set<String> elementIds = new HashSet<String>();
630 elementIds.add(elementId);
631 return distributeSync(channelId, mediapackage, elementIds, true);
632 }
633
634 @Override
635 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
636 boolean checkAvailability) throws DistributionException {
637 final MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
638 checkAvailability);
639 if (distributedElements == null) {
640 return null;
641 }
642 return Arrays.asList(distributedElements);
643 }
644
645 @Override
646 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, String elementId)
647 throws DistributionException {
648 Set<String> elementIds = new HashSet<String>();
649 elementIds.add(elementId);
650 return retractSync(channelId, mediapackage, elementIds);
651 }
652
653 @Override
654 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
655 throws DistributionException {
656 final MediaPackageElement[] retractedElements = retractElements(channelId, mediaPackage, elementIds);
657 if (retractedElements == null) {
658 return null;
659 }
660 return Arrays.asList(retractedElements);
661 }
662
663
664
665
666
667
668
669
670
671
672
673
674 protected MediaPackageElement retractElement(String channelId, MediaPackage mediaPackage, MediaPackageElement element)
675 throws DistributionException {
676 notNull(mediaPackage, "mediaPackage");
677 notNull(element, "element");
678
679 try {
680 String orgId = securityService.getOrganization().getId();
681 String bucketName = getBucketName(orgId);
682 String objectName = getDistributedObjectName(element);
683 if (objectName != null) {
684 s3.deleteObject(bucketName, objectName);
685 logger.info("Retracted element {}, object {}", element.getIdentifier(), objectName);
686 }
687 return element;
688 } catch (AmazonClientException e) {
689 throw new DistributionException("AWS error: " + e.getMessage(), e);
690 }
691 }
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708 protected MediaPackageElement[] retractElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
709 throws DistributionException {
710 notNull(mediapackage, "mediapackage");
711 notNull(elementIds, "elementIds");
712 notNull(channelId, "channelId");
713
714 Set<MediaPackageElement> elements = getElements(mediapackage, elementIds);
715 List<MediaPackageElement> retractedElements = new ArrayList<>();
716
717 for (MediaPackageElement element : elements) {
718 MediaPackageElement retractedElement = retractElement(channelId, mediapackage, element);
719 retractedElements.add(retractedElement);
720 }
721 return retractedElements.toArray(new MediaPackageElement[retractedElements.size()]);
722 }
723
724
725
726
727
728
729
730
731
732 protected String buildObjectName(String channelId, String mpId, MediaPackageElement element) {
733
734 final String orgId = securityService.getOrganization().getId();
735 String uriString = element.getURI().toString();
736 String fileName = FilenameUtils.getName(uriString);
737 return buildObjectName(orgId, channelId, mpId, element.getIdentifier(), fileName);
738 }
739
740
741
742
743
744
745
746
747
748
749
750 protected String buildObjectName(String orgId, String channelId, String mpId, String elementId, String fileName) {
751 return StringUtils.join(new String[] { orgId, channelId, mpId, elementId, fileName }, "/");
752 }
753
754
755
756
757
758
759
760
761 protected URI getDistributionUri(String objectName) throws URISyntaxException {
762
763 return new URI(opencastDistributionUrl + objectName);
764 }
765
766
767
768
769
770
771 protected String getDistributedObjectName(MediaPackageElement element) {
772
773 String uriString = element.getURI().toString();
774
775
776 if (uriString.startsWith(opencastDistributionUrl) && uriString.length() > opencastDistributionUrl.length()) {
777 return uriString.substring(opencastDistributionUrl.length());
778 } else {
779
780 logger.warn(
781 "Cannot retract {}. Uri must be in the format "
782 + "https://host/bucketName/orgId/channelId/mpId/originalElementId/fileName.extension",
783 uriString);
784 return null;
785 }
786 }
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806 private MediaPackageElement[] distributeHLSElements(String channelId, MediaPackage mediapackage,
807 Set<MediaPackageElement> elements, boolean checkAvailability) throws DistributionException {
808
809 List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
810 List<MediaPackageElement> nontrackElements = elements.stream()
811 .filter(e -> e.getElementType() != MediaPackageElement.Type.Track).collect(Collectors.toList());
812
813 for (MediaPackageElement element : nontrackElements) {
814 MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability);
815 distributedElements.add(distributedElement);
816 }
817
818
819 List<Track> trackElements = elements.stream().filter(e -> e.getElementType() == MediaPackageElement.Type.Track)
820 .map(e -> (Track) e).collect(Collectors.toList());
821 HashMap<MediaPackageElementFlavor, List<Track>> trackElementsMap
822 = new HashMap<MediaPackageElementFlavor, List<Track>>();
823 for (Track t : trackElements) {
824 List<Track> l = trackElementsMap.get(t.getFlavor());
825 if (l == null) {
826 l = new ArrayList<Track>();
827 }
828 l.add(t);
829 trackElementsMap.put(t.getFlavor(), l);
830 }
831
832 Path tmpDir = null;
833 try {
834 tmpDir = Files.createTempDirectory(tmpPath, mediapackage.getIdentifier().toString());
835
836 for (Entry<MediaPackageElementFlavor, List<Track>> elementSet : trackElementsMap.entrySet()) {
837 List<Track> tracks = elementSet.getValue();
838 try {
839 List<Track> transformedTracks = new ArrayList<Track>();
840
841 if (tracks.stream().anyMatch(AdaptivePlaylist.isHLSTrackPred)) {
842
843
844 List<Track> tmpTracks = new ArrayList<Track>();
845 for (Track t : tracks) {
846
847 Track tcopy = (Track) t.clone();
848 String newName = "./" + t.getURI().getPath();
849 Path newPath = tmpDir.resolve(newName).normalize();
850 Files.createDirectories(newPath.getParent());
851
852 if (AdaptivePlaylist.isPlaylist(t)) {
853 File f = workspace.get(t.getURI());
854 Path plcopy = Files.copy(f.toPath(), newPath);
855 tcopy.setURI(plcopy.toUri());
856 } else {
857 Path plcopy = Files.createFile(newPath);
858 tcopy.setURI(plcopy.toUri());
859 }
860 tmpTracks.add(tcopy);
861 }
862
863 tmpTracks = AdaptivePlaylist.fixReferences(tmpTracks, tmpDir.toFile());
864
865
866 tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred.negate()).forEach(t -> transformedTracks.add(t));
867 tmpTracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).forEach(t -> transformedTracks.add(t));
868 } else {
869 transformedTracks.addAll(tracks);
870 }
871 for (Track track : transformedTracks) {
872 MediaPackageElement distributedElement;
873 if (AdaptivePlaylist.isPlaylist(track)) {
874 distributedElement = distributeElement(channelId, mediapackage, track, checkAvailability,
875 new File(track.getURI()));
876 } else {
877 distributedElement = distributeElement(channelId, mediapackage, track, checkAvailability);
878 }
879 distributedElements.add(distributedElement);
880 }
881 } catch (MediaPackageException | NotFoundException | IOException e1) {
882 logger.error("HLS Prepare failed for mediapackage {} in {}", elementSet.getKey(), mediapackage, e1);
883 throw new DistributionException("Cannot distribute " + mediapackage);
884 } catch (URISyntaxException e1) {
885 logger.error("HLS Prepare failed - Bad URI syntax {} in {}", elementSet.getKey(), mediapackage, e1);
886 throw new DistributionException("Cannot distribute - BAD URI syntax " + mediapackage);
887 }
888 }
889 } catch (IOException e2) {
890 throw new DistributionException("Cannot create tmp dir to process HLS:" + mediapackage + e2.getMessage());
891 } finally {
892
893 try (Stream<Path> walk = Files.walk(tmpDir)) {
894 walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
895 } catch (IOException e) {
896 logger.warn("Cannot delete tmp dir for processing HLS mp {}, path {}", mediapackage, tmpPath, e);
897 }
898 }
899 return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
900 }
901
902
903
904
905
906
907 @Override
908 protected String process(Job job) throws Exception {
909 Operation op = null;
910 String operation = job.getOperation();
911 List<String> arguments = job.getArguments();
912 try {
913 op = Operation.valueOf(operation);
914 String channelId = arguments.get(0);
915 MediaPackage mediaPackage = MediaPackageParser.getFromXml(arguments.get(1));
916 Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() {
917 }.getType());
918 switch (op) {
919 case Distribute:
920 Boolean checkAvailability = Boolean.parseBoolean(arguments.get(3));
921 MediaPackageElement[] distributedElements = distributeElements(channelId, mediaPackage, elementIds,
922 checkAvailability);
923 return (distributedElements != null)
924 ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(distributedElements))
925 : null;
926 case Retract:
927 MediaPackageElement[] retractedElements = retractElements(channelId, mediaPackage, elementIds);
928 return (retractedElements != null) ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(retractedElements))
929 : null;
930 default:
931 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
932 }
933 } catch (IllegalArgumentException e) {
934 throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
935 } catch (IndexOutOfBoundsException e) {
936 throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
937 } catch (Exception e) {
938 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
939 }
940 }
941
942
943
944
945 protected void createAWSBucket() {
946 orgBucketNameMap.forEach((org, bucketName) -> {
947
948 try {
949 s3.listObjects(bucketName);
950 } catch (AmazonServiceException e) {
951 if (e.getStatusCode() == 404) {
952
953 try {
954 s3.createBucket(bucketName);
955
956 Statement allowPublicReadStatement = new Statement(Statement.Effect.Allow)
957 .withPrincipals(Principal.AllUsers)
958 .withActions(S3Actions.GetObject)
959 .withResources(new S3ObjectResource(bucketName, "*"));
960 Policy policy = new Policy().withStatements(allowPublicReadStatement);
961 s3.setBucketPolicy(bucketName, policy.toJson());
962
963
964 BucketWebsiteConfiguration defaultWebsite = new BucketWebsiteConfiguration();
965
966
967 defaultWebsite.setIndexDocumentSuffix("index.html");
968 defaultWebsite.setErrorDocument("error.html");
969 s3.setBucketWebsiteConfiguration(new SetBucketWebsiteConfigurationRequest(bucketName, defaultWebsite));
970 logger.info("AWS S3 bucket {} created", bucketName);
971 } catch (Exception e2) {
972 throw new ConfigurationException("Bucket " + bucketName + " cannot be created: " + e2.getMessage(), e2);
973 }
974 } else {
975 throw new ConfigurationException("Bucket " + bucketName + " exists, but we can't access it: "
976 + e.getMessage(), e);
977 }
978 }
979 });
980 }
981
982 public URI presignedURI(URI uri) throws URISyntaxException {
983 if (!presignedUrl) {
984 return uri;
985 }
986 String orgId = securityService.getOrganization().getId();
987 String bucketName = getBucketName(orgId);
988 String s3UrlPrefix = s3.getUrl(bucketName, "").toString();
989
990
991 if (uri.toString().startsWith(s3UrlPrefix)) {
992 String objectName = uri.toString().substring(s3UrlPrefix.length());
993 Date validUntil = new Date(System.currentTimeMillis() + presignedUrlValidDuration);
994 return s3.generatePresignedUrl(bucketName, objectName, validUntil).toURI();
995 } else {
996 return uri;
997 }
998 }
999
1000
1001
1002 protected void setS3(AmazonS3 s3) {
1003 this.s3 = s3;
1004 }
1005
1006 protected void setS3TransferManager(TransferManager s3TransferManager) {
1007 this.s3TransferManager = s3TransferManager;
1008 }
1009
1010 protected void setBucketName(String orgId, String bucketName) {
1011 orgBucketNameMap.put(orgId, bucketName);
1012 }
1013
1014 protected void setOpencastDistributionUrl(String distributionUrl) {
1015 opencastDistributionUrl = distributionUrl;
1016 }
1017
1018
1019 protected void setStorageTmp(String path) {
1020 this.tmpPath = Paths.get(path, DEFAULT_TEMP_DIR);
1021 try {
1022 Files.createDirectories(tmpPath);
1023 } catch (IOException e) {
1024 logger.info("AWS S3 bucket cannot create {} ", tmpPath);
1025 }
1026 }
1027
1028 private String getBucketName(String orgId) {
1029 String bucketName = orgBucketNameMap.get(orgId);
1030 if (bucketName == null) {
1031
1032 bucketName = orgBucketNameMap.get(DEFAULT_ORG_KEY);
1033 if (bucketName == null) {
1034 throw new ConfigurationException("No bucket configured for organization " + orgId);
1035 }
1036 }
1037 return bucketName;
1038 }
1039
1040 @Reference
1041 @Override
1042 public void setWorkspace(Workspace workspace) {
1043 super.setWorkspace(workspace);
1044 }
1045
1046 @Reference
1047 @Override
1048 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1049 super.setServiceRegistry(serviceRegistry);
1050 }
1051
1052 @Reference
1053 @Override
1054 public void setSecurityService(SecurityService securityService) {
1055 super.setSecurityService(securityService);
1056 }
1057
1058 @Reference
1059 @Override
1060 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1061 super.setUserDirectoryService(userDirectoryService);
1062 }
1063
1064 @Reference
1065 @Override
1066 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1067 super.setOrganizationDirectoryService(organizationDirectoryService);
1068 }
1069 }