View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.distribution.aws.s3;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.util.RequireUtil.notNull;
25  
26  import org.opencastproject.distribution.api.AbstractDistributionService;
27  import org.opencastproject.distribution.api.DistributionException;
28  import org.opencastproject.distribution.api.DistributionService;
29  import org.opencastproject.distribution.api.DownloadDistributionService;
30  import org.opencastproject.distribution.aws.s3.api.AwsS3DistributionService;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.mediapackage.AdaptivePlaylist;
33  import org.opencastproject.mediapackage.MediaPackage;
34  import org.opencastproject.mediapackage.MediaPackageElement;
35  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
36  import org.opencastproject.mediapackage.MediaPackageElementParser;
37  import org.opencastproject.mediapackage.MediaPackageException;
38  import org.opencastproject.mediapackage.MediaPackageParser;
39  import org.opencastproject.mediapackage.Track;
40  import org.opencastproject.security.api.OrganizationDirectoryService;
41  import org.opencastproject.security.api.SecurityService;
42  import org.opencastproject.security.api.UserDirectoryService;
43  import org.opencastproject.serviceregistry.api.ServiceRegistry;
44  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
45  import org.opencastproject.util.ConfigurationException;
46  import org.opencastproject.util.LoadUtil;
47  import org.opencastproject.util.NotFoundException;
48  import org.opencastproject.util.OsgiUtil;
49  import org.opencastproject.workspace.api.Workspace;
50  
51  import com.amazonaws.AmazonClientException;
52  import com.amazonaws.AmazonServiceException;
53  import com.amazonaws.ClientConfiguration;
54  import com.amazonaws.HttpMethod;
55  import com.amazonaws.auth.AWSCredentialsProvider;
56  import com.amazonaws.auth.AWSStaticCredentialsProvider;
57  import com.amazonaws.auth.BasicAWSCredentials;
58  import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
59  import com.amazonaws.auth.policy.Policy;
60  import com.amazonaws.auth.policy.Principal;
61  import com.amazonaws.auth.policy.Statement;
62  import com.amazonaws.auth.policy.actions.S3Actions;
63  import com.amazonaws.auth.policy.resources.S3ObjectResource;
64  import com.amazonaws.client.builder.AwsClientBuilder;
65  import com.amazonaws.services.s3.AmazonS3;
66  import com.amazonaws.services.s3.AmazonS3ClientBuilder;
67  import com.amazonaws.services.s3.model.BucketWebsiteConfiguration;
68  import com.amazonaws.services.s3.model.SetBucketWebsiteConfigurationRequest;
69  import com.amazonaws.services.s3.transfer.TransferManager;
70  import com.amazonaws.services.s3.transfer.Upload;
71  import com.google.gson.Gson;
72  import com.google.gson.reflect.TypeToken;
73  
74  import org.apache.commons.io.FilenameUtils;
75  import org.apache.commons.lang3.BooleanUtils;
76  import org.apache.commons.lang3.StringUtils;
77  import org.apache.commons.lang3.math.NumberUtils;
78  import org.apache.http.client.methods.CloseableHttpResponse;
79  import org.apache.http.client.methods.HttpHead;
80  import org.apache.http.impl.client.CloseableHttpClient;
81  import org.apache.http.impl.client.HttpClients;
82  import org.osgi.service.component.ComponentContext;
83  import org.osgi.service.component.annotations.Activate;
84  import org.osgi.service.component.annotations.Component;
85  import org.osgi.service.component.annotations.Deactivate;
86  import org.osgi.service.component.annotations.Reference;
87  import org.slf4j.Logger;
88  import org.slf4j.LoggerFactory;
89  
90  import java.io.File;
91  import java.io.IOException;
92  import java.net.URI;
93  import java.net.URISyntaxException;
94  import java.nio.file.Files;
95  import java.nio.file.Path;
96  import java.nio.file.Paths;
97  import java.util.ArrayList;
98  import java.util.Arrays;
99  import java.util.Collections;
100 import java.util.Comparator;
101 import java.util.Date;
102 import java.util.HashMap;
103 import java.util.HashSet;
104 import java.util.List;
105 import java.util.Map;
106 import java.util.Map.Entry;
107 import java.util.Optional;
108 import java.util.Set;
109 import java.util.stream.Collectors;
110 import java.util.stream.Stream;
111 
112 import javax.servlet.http.HttpServletResponse;
113 
114 @Component(
115     immediate = true,
116     service = { DistributionService.class, DownloadDistributionService.class, AwsS3DistributionService.class },
117     property = {
118         "service.description=Distribution Service (AWS S3)",
119         "distribution.channel=aws.s3"
120     }
121 )
122 public class AwsS3DistributionServiceImpl extends AbstractDistributionService
123         implements AwsS3DistributionService, DistributionService {
124 
  /** Logging facility */
  private static final Logger logger = LoggerFactory.getLogger(AwsS3DistributionServiceImpl.class);

  /** Job type under which the distribute and retract jobs of this service are created */
  public static final String JOB_TYPE = "org.opencastproject.distribution.aws.s3";

  /** List of available operations on jobs (the operation name of a created job is {@code toString()} of these) */
  public enum Operation {
    Distribute, Retract
  }

  // Service configuration keys, read from the component context in activate()

  /** Must evaluate to true (BooleanUtils.toBoolean) for the service to configure itself; otherwise activation stops */
  public static final String AWS_S3_DISTRIBUTION_ENABLE = "org.opencastproject.distribution.aws.s3.distribution.enable";
  /** Mandatory base URL of the distribution; a trailing '/' is appended on activation if missing */
  public static final String AWS_S3_DISTRIBUTION_BASE_CONFIG
          = "org.opencastproject.distribution.aws.s3.distribution.base";
  /** Optional access key id; when neither id nor secret is set, the default AWS credentials provider chain is used */
  public static final String AWS_S3_ACCESS_KEY_ID_CONFIG = "org.opencastproject.distribution.aws.s3.access.id";
  /** Optional secret access key, counterpart of {@link #AWS_S3_ACCESS_KEY_ID_CONFIG} */
  public static final String AWS_S3_SECRET_ACCESS_KEY_CONFIG = "org.opencastproject.distribution.aws.s3.secret.key";
  /** Mandatory AWS region; also used to build the default endpoint */
  public static final String AWS_S3_REGION_CONFIG = "org.opencastproject.distribution.aws.s3.region";
  /** Optional default bucket, stored under {@link #DEFAULT_ORG_KEY} */
  public static final String AWS_S3_BUCKET_CONFIG = "org.opencastproject.distribution.aws.s3.bucket";
  /** Prefix of per-organization bucket keys; the remainder of the key is taken as the organization id */
  public static final String AWS_S3_BUCKET_CONFIG_PREFIX = "org.opencastproject.distribution.aws.s3.bucket.";
  /** Optional S3 endpoint; defaults to "s3.&lt;region&gt;.amazonaws.com" */
  public static final String AWS_S3_ENDPOINT_CONFIG = "org.opencastproject.distribution.aws.s3.endpoint";
  /** Optional path-style access flag; defaults to "false" */
  public static final String AWS_S3_PATH_STYLE_CONFIG = "org.opencastproject.distribution.aws.s3.path.style";
  /** Optional flag; presigned URLs are used only when the value equals "true" (case-insensitive) */
  public static final String AWS_S3_PRESIGNED_URL_CONFIG = "org.opencastproject.distribution.aws.s3.presigned.url";
  /** Optional presigned URL validity in milliseconds; capped at 7 days */
  public static final String AWS_S3_PRESIGNED_URL_VALID_DURATION_CONFIG
      = "org.opencastproject.distribution.aws.s3.presigned.url.valid.duration";
  // S3 client configuration
  public static final String AWS_S3_MAX_CONNECTIONS = "org.opencastproject.distribution.aws.s3.max.connections";
  public static final String AWS_S3_CONNECTION_TIMEOUT = "org.opencastproject.distribution.aws.s3.connection.timeout";
  public static final String AWS_S3_MAX_RETRIES = "org.opencastproject.distribution.aws.s3.max.retries";
  // job loads
  public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.aws.s3.distribute";
  public static final String RETRACT_JOB_LOAD_KEY = "job.load.aws.s3.retract";

  // config.properties
  /** Bundle context property holding the storage directory under which the temporary directory is created */
  public static final String OPENCAST_STORAGE_DIR = "org.opencastproject.storage.dir";
  /** Temporary directory, relative to {@link #OPENCAST_STORAGE_DIR} */
  public static final String DEFAULT_TEMP_DIR = "tmp/s3dist";

  // Defaults

  // S3 client config defaults (values passed to the AWS ClientConfiguration)
  public static final int DEFAULT_MAX_CONNECTIONS = 50;
  /** Connection timeout in milliseconds (passed to ClientConfiguration#setConnectionTimeout) */
  public static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
  /** Passed to ClientConfiguration#setMaxErrorRetry */
  public static final int DEFAULT_MAX_RETRIES = 100;

  /** The load on the system introduced by creating a distribute job */
  public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;

  /** The load on the system introduced by creating a retract job */
  public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;

  /** Default expiration time for presigned URL in millis, 6 hours */
  public static final int DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS = 6 * 60 * 60 * 1000;

  /** Max expiration time for presigned URL in millis, 7 days */
  private static final int MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS = 7 * 24 * 60 * 60 * 1000;

  /** The configured load of a distribute job (starts at the default, overridden on activation) */
  private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;

  /** The configured load of a retract job (starts at the default, overridden on activation) */
  private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;

  /** Maximum number of HTTP HEAD attempts when checking availability of a distributed file */
  private static final int MAX_TRIES = 10;

  /** Interval time in millis to sleep between checks of availability */
  private static final long SLEEP_INTERVAL = 30000L;

  /** Key in {@link #orgBucketNameMap} under which the default bucket is stored */
  public static final String DEFAULT_ORG_KEY = "*";

  /** The AWS client and transfer manager; remain null unless the service was activated with distribution enabled */
  private AmazonS3 s3 = null;
  private TransferManager s3TransferManager = null;

  /** The AWS S3 org to bucket name mapping */
  private Map<String, String> orgBucketNameMap = new HashMap<>();
  /** Temporary working directory; emptied and (re)created on activation */
  private Path tmpPath = null;

  /** The AWS S3 endpoint */
  private String endpoint = null;

  /** path style enabled */
  private boolean pathStyle = false;

  /** whether use presigned URL */
  private boolean presignedUrl = false;

  /** valid duration for presigned URL in milliseconds */
  private int presignedUrlValidDuration = DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS;

  /** The opencast download distribution url; ends with '/' once activated */
  private String opencastDistributionUrl = null;

  /** Serializes element id sets into job arguments */
  private Gson gson = new Gson();
219 
220   /**
221    * Creates a new instance of the AWS S3 distribution service.
222    */
223   public AwsS3DistributionServiceImpl() {
224     super(JOB_TYPE);
225   }
226 
227   private String getAWSConfigKey(ComponentContext cc, String key) {
228     try {
229       return OsgiUtil.getComponentContextProperty(cc, key);
230     } catch (RuntimeException e) {
231       throw new ConfigurationException(key + " is missing or invalid", e);
232     }
233   }
234 
235   @Override
236   @Activate
237   public void activate(ComponentContext cc) {
238 
239     // Get the configuration
240     if (cc != null) {
241 
242       if (!BooleanUtils.toBoolean(getAWSConfigKey(cc, AWS_S3_DISTRIBUTION_ENABLE))) {
243         logger.info("AWS S3 distribution disabled");
244         return;
245       }
246 
247       tmpPath = Paths.get(cc.getBundleContext().getProperty(OPENCAST_STORAGE_DIR), DEFAULT_TEMP_DIR);
248 
249       // clean up old data and delete directory if it exists
250       if (tmpPath.toFile().exists()) {
251         try (Stream<Path> walk = Files.walk(tmpPath)) {
252           walk.map(Path::toFile).sorted(Comparator.reverseOrder()).forEach(File::delete);
253         } catch (IOException e) {
254           logger.warn("Unable to delete {}", tmpPath, e);
255         }
256       }
257       logger.info("AWS S3 Distribution uses temp storage in {}", tmpPath);
258       try { // create a new temp directory
259         Files.createDirectories(tmpPath);
260       } catch (IOException e) {
261         logger.error("Could not create temporary directory for AWS S3 Distribution : `{}`", tmpPath);
262         throw new IllegalStateException(e);
263       }
264 
265       // AWS S3 default bucket name
266       Optional<String> defaultBucketNameOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_BUCKET_CONFIG);
267       if (defaultBucketNameOpt.isPresent()) {
268         orgBucketNameMap.put(DEFAULT_ORG_KEY, defaultBucketNameOpt.get());
269         logger.info("AWS S3 default bucket name is {}", defaultBucketNameOpt.get());
270       }
271 
272       // AWS S3 org bucket name mapping
273       Collections.list(cc.getProperties().keys()).stream()
274           .filter(s -> s.startsWith(AWS_S3_BUCKET_CONFIG_PREFIX))
275           .forEach(s -> {
276             String orgId = s.substring(AWS_S3_BUCKET_CONFIG_PREFIX.length());
277             String bucketName = OsgiUtil.getComponentContextProperty(cc, s);
278             orgBucketNameMap.put(orgId, bucketName);
279           });
280 
281       if (orgBucketNameMap.isEmpty()) {
282         throw new ConfigurationException("AWS S3 distribution is enabled, but no buckets are configured");
283       }
284 
285       // AWS region
286       String regionStr = getAWSConfigKey(cc, AWS_S3_REGION_CONFIG);
287       logger.info("AWS region is {}", regionStr);
288 
289       // AWS endpoint
290       endpoint = OsgiUtil.getComponentContextProperty(cc, AWS_S3_ENDPOINT_CONFIG, "s3." + regionStr + ".amazonaws.com");
291       logger.info("AWS S3 endpoint is {}", endpoint);
292 
293       // AWS path style
294       pathStyle = BooleanUtils.toBoolean(OsgiUtil.getComponentContextProperty(cc, AWS_S3_PATH_STYLE_CONFIG, "false"));
295       logger.info("AWS path style is {}", pathStyle);
296 
297       // AWS presigned URL
298       String presignedUrlConfigValue = OsgiUtil.getComponentContextProperty(cc, AWS_S3_PRESIGNED_URL_CONFIG, "false");
299       presignedUrl = StringUtils.equalsIgnoreCase("true", presignedUrlConfigValue);
300       logger.info("AWS use presigned URL: {}", presignedUrl);
301 
302       // AWS presigned URL expiration time in millis
303       String presignedUrlExpTimeMillisConfigValue = OsgiUtil.getComponentContextProperty(cc,
304               AWS_S3_PRESIGNED_URL_VALID_DURATION_CONFIG, null);
305       presignedUrlValidDuration = NumberUtils.toInt(presignedUrlExpTimeMillisConfigValue,
306               DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS);
307       if (presignedUrlValidDuration > MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS) {
308         logger.warn(
309                 "Valid duration of presigned URL is too large, MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS(7 days) is used");
310         presignedUrlValidDuration = MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS;
311       }
312 
313       opencastDistributionUrl = getAWSConfigKey(cc, AWS_S3_DISTRIBUTION_BASE_CONFIG);
314       if (!opencastDistributionUrl.endsWith("/")) {
315         opencastDistributionUrl = opencastDistributionUrl + "/";
316       }
317       logger.info("AWS distribution url is {}", opencastDistributionUrl);
318 
319       distributeJobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), DISTRIBUTE_JOB_LOAD_KEY,
320               DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
321       retractJobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), RETRACT_JOB_LOAD_KEY,
322               DEFAULT_RETRACT_JOB_LOAD, serviceRegistry);
323 
324       // Explicit credentials are optional.
325       AWSCredentialsProvider provider = null;
326       Optional<String> accessKeyIdOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_ACCESS_KEY_ID_CONFIG);
327       Optional<String> accessKeySecretOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_SECRET_ACCESS_KEY_CONFIG);
328 
329       // Keys not informed so use default credentials provider chain, which
330       // will look at the environment variables, java system props, credential files, and instance
331       // profile credentials
332       if (accessKeyIdOpt.isEmpty() && accessKeySecretOpt.isEmpty()) {
333         provider = new DefaultAWSCredentialsProviderChain();
334       } else {
335         provider = new AWSStaticCredentialsProvider(
336                 new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
337       }
338 
339       // S3 client configuration
340       ClientConfiguration clientConfiguration = new ClientConfiguration();
341 
342       int maxConnections = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_CONNECTIONS)
343               .orElse(DEFAULT_MAX_CONNECTIONS);
344       logger.debug("Max Connections: {}", maxConnections);
345       clientConfiguration.setMaxConnections(maxConnections);
346 
347       int connectionTimeout = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_CONNECTION_TIMEOUT)
348               .orElse(DEFAULT_CONNECTION_TIMEOUT);
349       logger.debug("Connection Output: {}", connectionTimeout);
350       clientConfiguration.setConnectionTimeout(connectionTimeout);
351 
352       int maxRetries = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_RETRIES)
353               .orElse(DEFAULT_MAX_RETRIES);
354       logger.debug("Max Retry: {}", maxRetries);
355       clientConfiguration.setMaxErrorRetry(maxRetries);
356 
357       // Create AWS client
358       s3 = AmazonS3ClientBuilder.standard()
359               .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regionStr))
360               .withClientConfiguration(clientConfiguration)
361               .withPathStyleAccessEnabled(pathStyle).withCredentials(provider).build();
362 
363       s3TransferManager = new TransferManager(s3);
364 
365       // Create AWS S3 bucket if not there yet
366       createAWSBucket();
367       distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
368 
369       logger.info("AwsS3DistributionService activated!");
370     }
371   }
372 
373   @Override
374   public String getDistributionType() {
375     return distributionChannel;
376   }
377 
378   @Deactivate
379   public void deactivate() {
380     // Transfer manager is null if service disabled
381     if (s3TransferManager != null) {
382       s3TransferManager.shutdownNow();
383     }
384 
385     logger.info("AwsS3DistributionService deactivated!");
386   }
387 
388   @Override
389   public Job distribute(String pubChannelId, MediaPackage mediaPackage, Set<String> downloadIds,
390           boolean checkAvailability, boolean preserveReference) throws DistributionException, MediaPackageException {
391     throw new UnsupportedOperationException("Not supported yet.");
392     // stub function
393   }
394 
395   /**
396    * {@inheritDoc}
397    *
398    * @see org.opencastproject.distribution.api.DownloadDistributionService#distribute(String,
399    *      org.opencastproject.mediapackage.MediaPackage, String, boolean)
400    */
401   @Override
402   public Job distribute(String channelId, MediaPackage mediaPackage, Set<String> elementIds, boolean checkAvailability)
403           throws DistributionException, MediaPackageException {
404     notNull(mediaPackage, "mediapackage");
405     notNull(elementIds, "elementIds");
406     notNull(channelId, "channelId");
407     try {
408       return serviceRegistry.createJob(JOB_TYPE, Operation.Distribute.toString(), Arrays.asList(channelId,
409               MediaPackageParser.getAsXml(mediaPackage), gson.toJson(elementIds), Boolean.toString(checkAvailability)),
410               distributeJobLoad);
411     } catch (ServiceRegistryException e) {
412       throw new DistributionException("Unable to create a job", e);
413     }
414   }
415 
416   /**
417    * {@inheritDoc}
418    *
419    * @see org.opencastproject.distribution.api.DistributionService#distribute(String,
420    *      org.opencastproject.mediapackage.MediaPackage, String)
421    */
422   @Override
423   public Job distribute(String channelId, MediaPackage mediapackage, String elementId)
424           throws DistributionException, MediaPackageException {
425     return distribute(channelId, mediapackage, elementId, true);
426   }
427 
428   /**
429    * {@inheritDoc}
430    *
431    * @see org.opencastproject.distribution.api.DownloadDistributionService#distribute(String,
432    *      org.opencastproject.mediapackage.MediaPackage, String, boolean)
433    */
434   @Override
435   public Job distribute(String channelId, MediaPackage mediaPackage, String elementId, boolean checkAvailability)
436           throws DistributionException, MediaPackageException {
437     Set<String> elementIds = new HashSet<>();
438     elementIds.add(elementId);
439     return distribute(channelId, mediaPackage, elementIds, checkAvailability);
440   }
441 
442   /**
443    * Distribute Mediapackage elements to the download distribution service.
444    *
445    * @param channelId
446    *          # The id of the publication channel to be distributed to.
447    * @param mediapackage
448    *          The media package that contains the elements to be distributed.
449    * @param elementIds
450    *          The ids of the elements that should be distributed contained within the media package.
451    * @param checkAvailability
452    *          Check the availability of the distributed element via http.
453    * @return A reference to the MediaPackageElements that have been distributed.
454    * @throws DistributionException
455    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
456    *           cannot be copied or another unexpected exception occurs.
457    */
458   public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
459           boolean checkAvailability) throws DistributionException {
460     notNull(mediapackage, "mediapackage");
461     notNull(elementIds, "elementIds");
462     notNull(channelId, "channelId");
463 
464     final Set<MediaPackageElement> elements = getElements(mediapackage, elementIds);
465     List<MediaPackageElement> distributedElements = new ArrayList<>();
466 
467     if (AdaptivePlaylist.hasHLSPlaylist(elements)) {
468       return distributeHLSElements(channelId, mediapackage, elements, checkAvailability);
469     }
470 
471     for (MediaPackageElement element : elements) {
472       MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability);
473       distributedElements.add(distributedElement);
474     }
475     return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
476   }
477 
478   private Set<MediaPackageElement> getElements(MediaPackage mediapackage, Set<String> elementIds)
479           throws IllegalStateException {
480     final Set<MediaPackageElement> elements = new HashSet<>();
481     for (String elementId : elementIds) {
482       MediaPackageElement element = mediapackage.getElementById(elementId);
483       if (element != null) {
484         elements.add(element);
485       } else {
486         throw new IllegalStateException(
487                 format("No element %s found in mediapackage %s", elementId, mediapackage.getIdentifier()));
488       }
489     }
490     return elements;
491   }
492 
493   /**
494    * Distribute a media package element to AWS S3.
495    *
496    * @param mediaPackage
497    *          The media package that contains the element to distribute.
498    * @param element
499    *          The element that should be distributed contained within the media package.
500    * @param checkAvailability
501    *          Checks if the distributed element is available
502    * @return A reference to the MediaPackageElement that has been distributed.
503    * @throws DistributionException
504    */
505   public MediaPackageElement distributeElement(String channelId, final MediaPackage mediaPackage,
506           MediaPackageElement element, boolean checkAvailability) throws DistributionException {
507     notNull(channelId, "channelId");
508     notNull(mediaPackage, "mediapackage");
509     notNull(element, "element");
510 
511     try {
512       return distributeElement(channelId, mediaPackage, element, checkAvailability, workspace.get(element.getURI()));
513     } catch (NotFoundException e) {
514       throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
515     } catch (IOException e) {
516       throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
517     }
518   }
519 
520   private MediaPackageElement distributeElement(String channelId, final MediaPackage mediaPackage,
521           MediaPackageElement element, boolean checkAvailability, File source) throws DistributionException {
522 
523     // Use TransferManager to take advantage of multipart upload.
524     // TransferManager processes all transfers asynchronously, so this call will return immediately.
525     try {
526       String orgId = securityService.getOrganization().getId();
527       String bucketName = getBucketName(orgId);
528       String objectName = buildObjectName(channelId, mediaPackage.getIdentifier().toString(), element);
529       logger.info("Uploading {} to bucket {}...", objectName, bucketName);
530       Upload upload = s3TransferManager.upload(bucketName, objectName, source);
531       long start = System.currentTimeMillis();
532 
533       try {
534         // Block and wait for the upload to finish
535         upload.waitForCompletion();
536         logger.info("Upload of {} to bucket {} completed in {} seconds", objectName, bucketName,
537                 (System.currentTimeMillis() - start) / 1000);
538       } catch (AmazonClientException e) {
539         throw new DistributionException("AWS error: " + e.getMessage(), e);
540       }
541 
542       // Create a representation of the distributed file in the media package
543       MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
544       try {
545         distributedElement.setURI(getDistributionUri(objectName));
546       } catch (URISyntaxException e) {
547         throw new DistributionException("Distributed element produces an invalid URI", e);
548       }
549 
550       logger.info("Distributed element {}, object {}", element.getIdentifier(), objectName);
551 
552       if (checkAvailability) {
553         URI uri = distributedElement.getURI();
554         String distributedElementUriStr = uri.toString();
555         int tries = 0;
556         CloseableHttpResponse response = null;
557         boolean success = false;
558         while (tries < MAX_TRIES) {
559           try {
560             if (presignedUrl) {
561               // 5 minutes should be enough for check availability for presigned URL.
562               Date fiveMinutesLater = new Date(System.currentTimeMillis() + 5 * 60 * 1000);
563               uri = s3.generatePresignedUrl(bucketName, objectName, fiveMinutesLater, HttpMethod.HEAD).toURI();
564             }
565             CloseableHttpClient httpClient = HttpClients.createDefault();
566             logger.trace("Trying to access {}", uri);
567             response = httpClient.execute(new HttpHead(uri));
568             if (response.getStatusLine().getStatusCode() == HttpServletResponse.SC_OK) {
569               logger.trace("Successfully got {}", uri);
570               success = true;
571               break; // Exit the loop, response is closed
572             } else {
573               logger.debug("Http status code when checking distributed element {} is {}", objectName,
574                       response.getStatusLine().getStatusCode());
575             }
576           } catch (Exception e) {
577             logger.info("Checking availability of {} threw exception {}. Trying again.", objectName, e.getMessage());
578             // Just try again
579           } finally {
580             if (null != response) {
581               response.close();
582             }
583           }
584           tries++;
585           logger.trace("Sleeping for {} seconds...", SLEEP_INTERVAL / 1000);
586           Thread.sleep(SLEEP_INTERVAL);
587         }
588         if (!success) {
589           logger.warn("Could not check availability of distributed file {}", uri);
590           // throw new DistributionException("Unable to load distributed file " + uri.toString());
591         }
592       }
593 
594       return distributedElement;
595     } catch (Exception e) {
596       logger.warn("Error distributing element " + element.getIdentifier() + " of media package " + mediaPackage, e);
597       if (e instanceof DistributionException) {
598         throw (DistributionException) e;
599       } else {
600         throw new DistributionException(e);
601       }
602     }
603   }
604 
605   @Override
606   public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
607     Set<String> elementIds = new HashSet<>();
608     elementIds.add(elementId);
609     return retract(channelId, mediapackage, elementIds);
610   }
611 
612   @Override
613   public Job retract(String channelId, MediaPackage mediapackage, Set<String> elementIds) throws DistributionException {
614     notNull(mediapackage, "mediapackage");
615     notNull(elementIds, "elementIds");
616     notNull(channelId, "channelId");
617     try {
618       return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
619               Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
620               retractJobLoad);
621     } catch (ServiceRegistryException e) {
622       throw new DistributionException("Unable to create a job", e);
623     }
624   }
625 
626   @Override
627   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, String elementId)
628           throws DistributionException, MediaPackageException {
629     Set<String> elementIds = new HashSet<String>();
630     elementIds.add(elementId);
631     return distributeSync(channelId, mediapackage, elementIds, true);
632   }
633 
634   @Override
635   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
636           boolean checkAvailability) throws DistributionException {
637     final MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
638             checkAvailability);
639     if (distributedElements == null) {
640       return null;
641     }
642     return Arrays.asList(distributedElements);
643   }
644 
645   @Override
646   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, String elementId)
647           throws DistributionException {
648     Set<String> elementIds = new HashSet<String>();
649     elementIds.add(elementId);
650     return retractSync(channelId, mediapackage, elementIds);
651   }
652 
653   @Override
654   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
655           throws DistributionException {
656     final MediaPackageElement[] retractedElements = retractElements(channelId, mediaPackage, elementIds);
657     if (retractedElements == null) {
658       return null;
659     }
660     return Arrays.asList(retractedElements);
661   }
662 
663   /**
664    * Retracts the media package element with the given identifier from the distribution channel.
665    *
666    * @param channelId
667    *          the channel id
668    * @param mediaPackage
669    *          the media package
670    * @param element
671    *          the element
672    * @return the retracted element or <code>null</code> if the element was not retracted
673    */
674   protected MediaPackageElement retractElement(String channelId, MediaPackage mediaPackage, MediaPackageElement element)
675           throws DistributionException {
676     notNull(mediaPackage, "mediaPackage");
677     notNull(element, "element");
678 
679     try {
680       String orgId = securityService.getOrganization().getId();
681       String bucketName = getBucketName(orgId);
682       String objectName = getDistributedObjectName(element);
683       if (objectName != null) {
684         s3.deleteObject(bucketName, objectName);
685         logger.info("Retracted element {}, object {}", element.getIdentifier(), objectName);
686       }
687       return element;
688     } catch (AmazonClientException e) {
689       throw new DistributionException("AWS error: " + e.getMessage(), e);
690     }
691   }
692 
693   /**
694    * Retract a media package element from the distribution channel. The retracted element must not necessarily be the
695    * one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated. This way
696    * you are able to retract elements by providing the "original" element here.
697    *
698    * @param channelId
699    *          the channel id
700    * @param mediapackage
701    *          the mediapackage
702    * @param elementIds
703    *          the element identifiers
704    * @return the retracted element or <code>null</code> if the element was not retracted
705    * @throws org.opencastproject.distribution.api.DistributionException
706    *           in case of an error
707    */
708   protected MediaPackageElement[] retractElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
709           throws DistributionException {
710     notNull(mediapackage, "mediapackage");
711     notNull(elementIds, "elementIds");
712     notNull(channelId, "channelId");
713 
714     Set<MediaPackageElement> elements = getElements(mediapackage, elementIds);
715     List<MediaPackageElement> retractedElements = new ArrayList<>();
716 
717     for (MediaPackageElement element : elements) {
718       MediaPackageElement retractedElement = retractElement(channelId, mediapackage, element);
719       retractedElements.add(retractedElement);
720     }
721     return retractedElements.toArray(new MediaPackageElement[retractedElements.size()]);
722   }
723 
724   /**
725    * Builds the aws s3 object name.
726    *
727    * @param channelId
728    * @param mpId
729    * @param element
730    * @return
731    */
732   protected String buildObjectName(String channelId, String mpId, MediaPackageElement element) {
733     // Something like ORG_ID/CHANNEL_ID/MP_ID/ELEMENT_ID/FILE_NAME.EXTENSION
734     final String orgId = securityService.getOrganization().getId();
735     String uriString = element.getURI().toString();
736     String fileName = FilenameUtils.getName(uriString);
737     return buildObjectName(orgId, channelId, mpId, element.getIdentifier(), fileName);
738   }
739 
740   /**
741    * Builds the aws s3 object name using the raw elementID and filename
742    *
743    * @param orgId
744    * @param channelId
745    * @param mpId
746    * @param elementId
747    * @param fileName
748    * @return
749    */
750   protected String buildObjectName(String orgId, String channelId, String mpId, String elementId, String fileName) {
751     return StringUtils.join(new String[] { orgId, channelId, mpId, elementId, fileName }, "/");
752   }
753 
754   /**
755    * Gets the URI for the element to be distributed.
756    *
757    * @return The resulting URI after distribution
758    * @throws URISyntaxException
759    *           if the concrete implementation tries to create a malformed uri
760    */
761   protected URI getDistributionUri(String objectName) throws URISyntaxException {
762     // Something like https://OPENCAST_DOWNLOAD_URL/ORG_ID/CHANNEL_ID/MP_ID/ELEMENT_ID/FILE_NAME.EXTENSION
763     return new URI(opencastDistributionUrl + objectName);
764   }
765 
766   /**
767    * Gets the distributed object's name.
768    *
769    * @return The distributed object name
770    */
771   protected String getDistributedObjectName(MediaPackageElement element) {
772     // Something like https://OPENCAST_DOWNLOAD_URL/ORG_ID/CHANNEL_ID/MP_ID/ORIGINAL_ELEMENT_ID/FILE_NAME.EXTENSION
773     String uriString = element.getURI().toString();
774 
775     // String directoryName = distributionDirectory.getAbsolutePath();
776     if (uriString.startsWith(opencastDistributionUrl) && uriString.length() > opencastDistributionUrl.length()) {
777       return uriString.substring(opencastDistributionUrl.length());
778     } else {
779       // Cannot retract
780       logger.warn(
781           "Cannot retract {}. Uri must be in the format "
782               + "https://host/bucketName/orgId/channelId/mpId/originalElementId/fileName.extension",
783           uriString);
784       return null;
785     }
786   }
787 
788   /**
789    * Distribute static items, create a temp directory for playlists, modify them to fix references, then publish the new
790    * list and then delete the temp files. This is used if there are any HLS playlists in the mediapackage, all the
791    * videos in the publication should be HLS or progressive, but not both. However, If this is called with non HLS
792    * files, it will distribute them anyway.
793    *
794    * @param channelId
795    *          - distribution channel
796    * @param mediapackage
797    *          - that holds all the files
798    * @param elements
799    *          - all the elements for publication
800    * @param checkAvailability
801    *          - check before pub
802    * @return distributed elements
803    * @throws DistributionException
804    * @throws IOException
805    */
806   private MediaPackageElement[] distributeHLSElements(String channelId, MediaPackage mediapackage,
807           Set<MediaPackageElement> elements, boolean checkAvailability) throws DistributionException {
808 
809     List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
810     List<MediaPackageElement> nontrackElements = elements.stream()
811             .filter(e -> e.getElementType() != MediaPackageElement.Type.Track).collect(Collectors.toList());
812     // Distribute non track items
813     for (MediaPackageElement element : nontrackElements) {
814       MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability);
815       distributedElements.add(distributedElement);
816     }
817     // Then get all tracks from mediapackage and sort them by flavor
818     // Each flavor is one video with multiple renditions
819     List<Track> trackElements = elements.stream().filter(e -> e.getElementType() == MediaPackageElement.Type.Track)
820             .map(e -> (Track) e).collect(Collectors.toList());
821     HashMap<MediaPackageElementFlavor, List<Track>> trackElementsMap
822         = new HashMap<MediaPackageElementFlavor, List<Track>>();
823     for (Track t : trackElements) {
824       List<Track> l = trackElementsMap.get(t.getFlavor());
825       if (l == null) {
826         l = new ArrayList<Track>();
827       }
828       l.add(t);
829       trackElementsMap.put(t.getFlavor(), l);
830     }
831 
832     Path tmpDir = null;
833     try {
834       tmpDir = Files.createTempDirectory(tmpPath, mediapackage.getIdentifier().toString());
835       // Run distribution one flavor at a time
836       for (Entry<MediaPackageElementFlavor, List<Track>> elementSet : trackElementsMap.entrySet()) {
837         List<Track> tracks = elementSet.getValue();
838         try {
839           List<Track> transformedTracks = new ArrayList<Track>();
840           // If there are playlists in this flavor
841           if (tracks.stream().anyMatch(AdaptivePlaylist.isHLSTrackPred)) {
842             // For each adaptive playlist, get all the HLS files from the track URI
843             // and put them into a temporary directory
844             List<Track> tmpTracks = new ArrayList<Track>();
845             for (Track t : tracks) {
846 
847               Track tcopy = (Track) t.clone();
848               String newName = "./" + t.getURI().getPath();
849               Path newPath = tmpDir.resolve(newName).normalize();
850               Files.createDirectories(newPath.getParent());
851               // If this flavor is a HLS playlist and therefore has internal references
852               if (AdaptivePlaylist.isPlaylist(t)) {
853                 File f = workspace.get(t.getURI()); // Get actual file
854                 Path plcopy = Files.copy(f.toPath(), newPath);
855                 tcopy.setURI(plcopy.toUri()); // make it into an URI from filesystem
856               } else {
857                 Path plcopy = Files.createFile(newPath); // new Empty File, only care about the URI
858                 tcopy.setURI(plcopy.toUri());
859               }
860               tmpTracks.add(tcopy);
861             }
862             // The playlists' references are then replaced with relative links
863             tmpTracks = AdaptivePlaylist.fixReferences(tmpTracks, tmpDir.toFile()); // replace with fixed elements
864             // after fixing it, we retrieve the new playlist files and discard the old
865             // we collect the mp4 tracks and the playlists and put them into transformedTracks
866             tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred.negate()).forEach(t -> transformedTracks.add(t));
867             tmpTracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).forEach(t -> transformedTracks.add(t));
868           } else {
869             transformedTracks.addAll(tracks); // not playlists, distribute anyway
870           }
871           for (Track track : transformedTracks) {
872             MediaPackageElement distributedElement;
873             if (AdaptivePlaylist.isPlaylist(track)) {
874               distributedElement = distributeElement(channelId, mediapackage, track, checkAvailability,
875                       new File(track.getURI()));
876             } else {
877               distributedElement = distributeElement(channelId, mediapackage, track, checkAvailability);
878             }
879             distributedElements.add(distributedElement);
880           }
881         } catch (MediaPackageException | NotFoundException | IOException e1) {
882           logger.error("HLS Prepare failed for mediapackage {} in {}", elementSet.getKey(), mediapackage, e1);
883           throw new DistributionException("Cannot distribute " + mediapackage);
884         } catch (URISyntaxException e1) {
885           logger.error("HLS Prepare failed - Bad URI syntax {} in {}", elementSet.getKey(), mediapackage, e1);
886           throw new DistributionException("Cannot distribute - BAD URI syntax " + mediapackage);
887         }
888       }
889     } catch (IOException e2) {
890       throw new DistributionException("Cannot create tmp dir to process HLS:" + mediapackage + e2.getMessage());
891     } finally {
892       // Clean up temp dir
893       try (Stream<Path> walk = Files.walk(tmpDir)) {
894         walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
895       } catch (IOException e) {
896         logger.warn("Cannot delete tmp dir for processing HLS mp {}, path {}", mediapackage, tmpPath, e);
897       }
898     }
899     return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
900   }
901 
902   /**
903    * {@inheritDoc}
904    *
905    * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
906    */
907   @Override
908   protected String process(Job job) throws Exception {
909     Operation op = null;
910     String operation = job.getOperation();
911     List<String> arguments = job.getArguments();
912     try {
913       op = Operation.valueOf(operation);
914       String channelId = arguments.get(0);
915       MediaPackage mediaPackage = MediaPackageParser.getFromXml(arguments.get(1));
916       Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() {
917       }.getType());
918       switch (op) {
919         case Distribute:
920           Boolean checkAvailability = Boolean.parseBoolean(arguments.get(3));
921           MediaPackageElement[] distributedElements = distributeElements(channelId, mediaPackage, elementIds,
922                   checkAvailability);
923           return (distributedElements != null)
924                   ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(distributedElements))
925                   : null;
926         case Retract:
927           MediaPackageElement[] retractedElements = retractElements(channelId, mediaPackage, elementIds);
928           return (retractedElements != null) ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(retractedElements))
929                   : null;
930         default:
931           throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
932       }
933     } catch (IllegalArgumentException e) {
934       throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
935     } catch (IndexOutOfBoundsException e) {
936       throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
937     } catch (Exception e) {
938       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
939     }
940   }
941 
942   /**
943    * Creates the AWS S3 bucket if it doesn't exist yet.
944    */
945   protected void createAWSBucket() {
946     orgBucketNameMap.forEach((org, bucketName) -> {
947       // Does bucket exist?
948       try {
949         s3.listObjects(bucketName);
950       } catch (AmazonServiceException e) {
951         if (e.getStatusCode() == 404) {
952           // Create the bucket
953           try {
954             s3.createBucket(bucketName);
955             // Allow public read
956             Statement allowPublicReadStatement = new Statement(Statement.Effect.Allow)
957                 .withPrincipals(Principal.AllUsers)
958                 .withActions(S3Actions.GetObject)
959                 .withResources(new S3ObjectResource(bucketName, "*"));
960             Policy policy = new Policy().withStatements(allowPublicReadStatement);
961             s3.setBucketPolicy(bucketName, policy.toJson());
962 
963             // Set the website configuration. This needs to be static-site-enabled currently.
964             BucketWebsiteConfiguration defaultWebsite = new BucketWebsiteConfiguration();
965             // These files don't actually exist, but that doesn't matter since no one should be looking around in the
966             // bucket anyway.
967             defaultWebsite.setIndexDocumentSuffix("index.html");
968             defaultWebsite.setErrorDocument("error.html");
969             s3.setBucketWebsiteConfiguration(new SetBucketWebsiteConfigurationRequest(bucketName, defaultWebsite));
970             logger.info("AWS S3 bucket {} created", bucketName);
971           } catch (Exception e2) {
972             throw new ConfigurationException("Bucket " + bucketName + " cannot be created: " + e2.getMessage(), e2);
973           }
974         } else {
975           throw new ConfigurationException("Bucket " + bucketName + " exists, but we can't access it: "
976               + e.getMessage(), e);
977         }
978       }
979     });
980   }
981 
982   public URI presignedURI(URI uri) throws URISyntaxException {
983     if (!presignedUrl) {
984       return uri;
985     }
986     String orgId = securityService.getOrganization().getId();
987     String bucketName = getBucketName(orgId);
988     String s3UrlPrefix = s3.getUrl(bucketName, "").toString();
989 
990     // Only handle URIs match s3 domain and bucket
991     if (uri.toString().startsWith(s3UrlPrefix)) {
992       String objectName = uri.toString().substring(s3UrlPrefix.length());
993       Date validUntil = new Date(System.currentTimeMillis() + presignedUrlValidDuration);
994       return s3.generatePresignedUrl(bucketName, objectName, validUntil).toURI();
995     } else {
996       return uri;
997     }
998   }
999 
  // Test hooks: the setters directly below let the unit tests inject the S3 client, transfer manager and configuration.
1001 
1002   protected void setS3(AmazonS3 s3) {
1003     this.s3 = s3;
1004   }
1005 
1006   protected void setS3TransferManager(TransferManager s3TransferManager) {
1007     this.s3TransferManager = s3TransferManager;
1008   }
1009 
1010   protected void setBucketName(String orgId, String bucketName) {
1011     orgBucketNameMap.put(orgId, bucketName);
1012   }
1013 
1014   protected void setOpencastDistributionUrl(String distributionUrl) {
1015     opencastDistributionUrl = distributionUrl;
1016   }
1017 
1018   // Use by unit test
1019   protected void setStorageTmp(String path) {
1020     this.tmpPath = Paths.get(path, DEFAULT_TEMP_DIR);
1021     try {
1022       Files.createDirectories(tmpPath);
1023     } catch (IOException e) {
1024       logger.info("AWS S3 bucket cannot create {} ", tmpPath);
1025     }
1026   }
1027 
1028   private String getBucketName(String orgId) {
1029     String bucketName = orgBucketNameMap.get(orgId);
1030     if (bucketName == null) {
1031       // check if we have a default bucket name
1032       bucketName = orgBucketNameMap.get(DEFAULT_ORG_KEY);
1033       if (bucketName == null) {
1034         throw new ConfigurationException("No bucket configured for organization " + orgId);
1035       }
1036     }
1037     return bucketName;
1038   }
1039 
1040   @Reference
1041   @Override
1042   public void setWorkspace(Workspace workspace) {
1043     super.setWorkspace(workspace);
1044   }
1045 
1046   @Reference
1047   @Override
1048   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1049     super.setServiceRegistry(serviceRegistry);
1050   }
1051 
1052   @Reference
1053   @Override
1054   public void setSecurityService(SecurityService securityService) {
1055     super.setSecurityService(securityService);
1056   }
1057 
1058   @Reference
1059   @Override
1060   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1061     super.setUserDirectoryService(userDirectoryService);
1062   }
1063 
1064   @Reference
1065   @Override
1066   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1067     super.setOrganizationDirectoryService(organizationDirectoryService);
1068   }
1069 }