View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.distribution.aws.s3;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.util.RequireUtil.notNull;
25  
26  import org.opencastproject.distribution.api.AbstractDistributionService;
27  import org.opencastproject.distribution.api.DistributionException;
28  import org.opencastproject.distribution.api.DistributionService;
29  import org.opencastproject.distribution.api.DownloadDistributionService;
30  import org.opencastproject.distribution.aws.s3.api.AwsS3DistributionService;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.mediapackage.AdaptivePlaylist;
33  import org.opencastproject.mediapackage.MediaPackage;
34  import org.opencastproject.mediapackage.MediaPackageElement;
35  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
36  import org.opencastproject.mediapackage.MediaPackageElementParser;
37  import org.opencastproject.mediapackage.MediaPackageException;
38  import org.opencastproject.mediapackage.MediaPackageParser;
39  import org.opencastproject.mediapackage.Track;
40  import org.opencastproject.security.api.OrganizationDirectoryService;
41  import org.opencastproject.security.api.SecurityService;
42  import org.opencastproject.security.api.UserDirectoryService;
43  import org.opencastproject.serviceregistry.api.ServiceRegistry;
44  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
45  import org.opencastproject.util.ConfigurationException;
46  import org.opencastproject.util.LoadUtil;
47  import org.opencastproject.util.NotFoundException;
48  import org.opencastproject.util.OsgiUtil;
49  import org.opencastproject.util.data.Option;
50  import org.opencastproject.workspace.api.Workspace;
51  
52  import com.amazonaws.AmazonClientException;
53  import com.amazonaws.AmazonServiceException;
54  import com.amazonaws.ClientConfiguration;
55  import com.amazonaws.HttpMethod;
56  import com.amazonaws.auth.AWSCredentialsProvider;
57  import com.amazonaws.auth.AWSStaticCredentialsProvider;
58  import com.amazonaws.auth.BasicAWSCredentials;
59  import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
60  import com.amazonaws.auth.policy.Policy;
61  import com.amazonaws.auth.policy.Principal;
62  import com.amazonaws.auth.policy.Statement;
63  import com.amazonaws.auth.policy.actions.S3Actions;
64  import com.amazonaws.auth.policy.resources.S3ObjectResource;
65  import com.amazonaws.client.builder.AwsClientBuilder;
66  import com.amazonaws.services.s3.AmazonS3;
67  import com.amazonaws.services.s3.AmazonS3ClientBuilder;
68  import com.amazonaws.services.s3.model.BucketWebsiteConfiguration;
69  import com.amazonaws.services.s3.model.SetBucketWebsiteConfigurationRequest;
70  import com.amazonaws.services.s3.transfer.TransferManager;
71  import com.amazonaws.services.s3.transfer.Upload;
72  import com.google.gson.Gson;
73  import com.google.gson.reflect.TypeToken;
74  
75  import org.apache.commons.io.FilenameUtils;
76  import org.apache.commons.lang3.BooleanUtils;
77  import org.apache.commons.lang3.StringUtils;
78  import org.apache.commons.lang3.math.NumberUtils;
79  import org.apache.http.client.methods.CloseableHttpResponse;
80  import org.apache.http.client.methods.HttpHead;
81  import org.apache.http.impl.client.CloseableHttpClient;
82  import org.apache.http.impl.client.HttpClients;
83  import org.osgi.service.component.ComponentContext;
84  import org.osgi.service.component.annotations.Activate;
85  import org.osgi.service.component.annotations.Component;
86  import org.osgi.service.component.annotations.Deactivate;
87  import org.osgi.service.component.annotations.Reference;
88  import org.slf4j.Logger;
89  import org.slf4j.LoggerFactory;
90  
91  import java.io.File;
92  import java.io.IOException;
93  import java.net.URI;
94  import java.net.URISyntaxException;
95  import java.nio.file.Files;
96  import java.nio.file.Path;
97  import java.nio.file.Paths;
98  import java.util.ArrayList;
99  import java.util.Arrays;
100 import java.util.Comparator;
101 import java.util.Date;
102 import java.util.HashMap;
103 import java.util.HashSet;
104 import java.util.List;
105 import java.util.Map.Entry;
106 import java.util.Set;
107 import java.util.stream.Collectors;
108 import java.util.stream.Stream;
109 
110 import javax.servlet.http.HttpServletResponse;
111 
112 @Component(
113     immediate = true,
114     service = { DistributionService.class, DownloadDistributionService.class, AwsS3DistributionService.class },
115     property = {
116         "service.description=Distribution Service (AWS S3)",
117         "distribution.channel=aws.s3"
118     }
119 )
120 public class AwsS3DistributionServiceImpl extends AbstractDistributionService
121         implements AwsS3DistributionService, DistributionService {
122 
  /** Logging facility shared by all instances of this service */
  private static final Logger logger = LoggerFactory.getLogger(AwsS3DistributionServiceImpl.class);

  /** Job type under which distribute and retract jobs are created in the service registry */
  public static final String JOB_TYPE = "org.opencastproject.distribution.aws.s3";
128 
129   /** List of available operations on jobs */
130   public enum Operation {
131     Distribute, Retract
132   }
133 
  // Service configuration keys, read from the component context in activate()

  /** Key of the flag enabling this service; activate() returns early unless it evaluates to true */
  public static final String AWS_S3_DISTRIBUTION_ENABLE = "org.opencastproject.distribution.aws.s3.distribution.enable";
  /** Key of the base url of distributed elements; a trailing slash is appended in activate() if missing */
  public static final String AWS_S3_DISTRIBUTION_BASE_CONFIG
          = "org.opencastproject.distribution.aws.s3.distribution.base";
  /** Key of the optional AWS access key id */
  public static final String AWS_S3_ACCESS_KEY_ID_CONFIG = "org.opencastproject.distribution.aws.s3.access.id";
  /** Key of the optional AWS secret access key */
  public static final String AWS_S3_SECRET_ACCESS_KEY_CONFIG = "org.opencastproject.distribution.aws.s3.secret.key";
  /** Key of the AWS region (mandatory when the service is enabled) */
  public static final String AWS_S3_REGION_CONFIG = "org.opencastproject.distribution.aws.s3.region";
  /** Key of the S3 bucket name (mandatory when the service is enabled) */
  public static final String AWS_S3_BUCKET_CONFIG = "org.opencastproject.distribution.aws.s3.bucket";
  /** Key of the optional S3 endpoint; defaults to "s3.&lt;region&gt;.amazonaws.com" */
  public static final String AWS_S3_ENDPOINT_CONFIG = "org.opencastproject.distribution.aws.s3.endpoint";
  /** Key of the optional flag enabling path style bucket access; defaults to "false" */
  public static final String AWS_S3_PATH_STYLE_CONFIG = "org.opencastproject.distribution.aws.s3.path.style";
  /** Key of the optional flag enabling presigned URLs; defaults to "false" */
  public static final String AWS_S3_PRESIGNED_URL_CONFIG = "org.opencastproject.distribution.aws.s3.presigned.url";
  /** Key of the optional validity of presigned URLs in milliseconds */
  public static final String AWS_S3_PRESIGNED_URL_VALID_DURATION_CONFIG
      = "org.opencastproject.distribution.aws.s3.presigned.url.valid.duration";
  // S3 client configuration keys (all optional, see the DEFAULT_* values below)
  public static final String AWS_S3_MAX_CONNECTIONS = "org.opencastproject.distribution.aws.s3.max.connections";
  public static final String AWS_S3_CONNECTION_TIMEOUT = "org.opencastproject.distribution.aws.s3.connection.timeout";
  public static final String AWS_S3_MAX_RETRIES = "org.opencastproject.distribution.aws.s3.max.retries";
  // Keys of the configurable job loads
  public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.aws.s3.distribute";
  public static final String RETRACT_JOB_LOAD_KEY = "job.load.aws.s3.retract";

  // config.properties

  /** Bundle context property naming the storage directory that the temp directory is resolved against */
  public static final String OPENCAST_STORAGE_DIR = "org.opencastproject.storage.dir";
  /** Temp directory of this service, relative to the storage directory; it is wiped and recreated on activation */
  public static final String DEFAULT_TEMP_DIR = "tmp/s3dist";

  // Defaults

  // S3 client config defaults, applied in activate() when the matching key is not configured
  public static final int DEFAULT_MAX_CONNECTIONS = 50;
  // Passed to ClientConfiguration#setConnectionTimeout; presumably milliseconds -- confirm against the AWS SDK docs
  public static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
  public static final int DEFAULT_MAX_RETRIES = 100;

  /** The load on the system introduced by creating a distribute job */
  public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;

  /** The load on the system introduced by creating a retract job */
  public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;

  /** Default expiration time for presigned URL in millis, 6 hours */
  public static final int DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS = 6 * 60 * 60 * 1000;

  /** Max expiration time for presigned URL in millis, 7 days; larger configured values are capped to this */
  private static final int MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS = 7 * 24 * 60 * 60 * 1000;

  /** The load on the system introduced by creating a distribute job */
  private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;

  /** The load on the system introduced by creating a retract job */
  private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;

  /** Maximum number of tries for checking availability of distributed file */
  private static final int MAX_TRIES = 10;

  /** Interval time in millis to sleep between checks of availability */
  private static final long SLEEP_INTERVAL = 30000L;

  /** The AWS client and transfer manager; both stay null while the service is disabled */
  private AmazonS3 s3 = null;
  private TransferManager s3TransferManager = null;

  /** The AWS S3 bucket name */
  private String bucketName = null;
  /** Temp directory of this service (storage directory + DEFAULT_TEMP_DIR) */
  private Path tmpPath = null;

  /** The AWS S3 endpoint */
  private String endpoint = null;

  /** path style enabled */
  private boolean pathStyle = false;

  /** whether to use presigned URLs */
  private boolean presignedUrl = false;

  /** valid duration for presigned URL in milliseconds */
  private int presignedUrlValidDuration = DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS;

  /** The opencast download distribution url, always ending with a slash once configured */
  private String opencastDistributionUrl = null;

  /** JSON serializer used to pass the element id sets as job arguments */
  private Gson gson = new Gson();
214 
215   /**
216    * Creates a new instance of the AWS S3 distribution service.
217    */
218   public AwsS3DistributionServiceImpl() {
219     super(JOB_TYPE);
220   }
221 
222   private String getAWSConfigKey(ComponentContext cc, String key) {
223     try {
224       return OsgiUtil.getComponentContextProperty(cc, key);
225     } catch (RuntimeException e) {
226       throw new ConfigurationException(key + " is missing or invalid", e);
227     }
228   }
229 
230   @Override
231   @Activate
232   public void activate(ComponentContext cc) {
233 
234     // Get the configuration
235     if (cc != null) {
236 
237       if (!BooleanUtils.toBoolean(getAWSConfigKey(cc, AWS_S3_DISTRIBUTION_ENABLE))) {
238         logger.info("AWS S3 distribution disabled");
239         return;
240       }
241 
242       tmpPath = Paths.get(cc.getBundleContext().getProperty(OPENCAST_STORAGE_DIR), DEFAULT_TEMP_DIR);
243 
244       // clean up old data and delete directory if it exists
245       if (tmpPath.toFile().exists()) {
246         try (Stream<Path> walk = Files.walk(tmpPath)) {
247           walk.map(Path::toFile).sorted(Comparator.reverseOrder()).forEach(File::delete);
248         } catch (IOException e) {
249           logger.warn("Unable to delete {}", tmpPath, e);
250         }
251       }
252       logger.info("AWS S3 Distribution uses temp storage in {}", tmpPath);
253       try { // create a new temp directory
254         Files.createDirectories(tmpPath);
255       } catch (IOException e) {
256         logger.error("Could not create temporary directory for AWS S3 Distribution : `{}`", tmpPath);
257         throw new IllegalStateException(e);
258       }
259 
260       // AWS S3 bucket name
261       bucketName = getAWSConfigKey(cc, AWS_S3_BUCKET_CONFIG);
262       logger.info("AWS S3 bucket name is {}", bucketName);
263 
264       // AWS region
265       String regionStr = getAWSConfigKey(cc, AWS_S3_REGION_CONFIG);
266       logger.info("AWS region is {}", regionStr);
267 
268       // AWS endpoint
269       endpoint = OsgiUtil.getComponentContextProperty(cc, AWS_S3_ENDPOINT_CONFIG, "s3." + regionStr + ".amazonaws.com");
270       logger.info("AWS S3 endpoint is {}", endpoint);
271 
272       // AWS path style
273       pathStyle = BooleanUtils.toBoolean(OsgiUtil.getComponentContextProperty(cc, AWS_S3_PATH_STYLE_CONFIG, "false"));
274       logger.info("AWS path style is {}", pathStyle);
275 
276       // AWS presigned URL
277       String presignedUrlConfigValue = OsgiUtil.getComponentContextProperty(cc, AWS_S3_PRESIGNED_URL_CONFIG, "false");
278       presignedUrl = StringUtils.equalsIgnoreCase("true", presignedUrlConfigValue);
279       logger.info("AWS use presigned URL: {}", presignedUrl);
280 
281       // AWS presigned URL expiration time in millis
282       String presignedUrlExpTimeMillisConfigValue = OsgiUtil.getComponentContextProperty(cc,
283               AWS_S3_PRESIGNED_URL_VALID_DURATION_CONFIG, null);
284       presignedUrlValidDuration = NumberUtils.toInt(presignedUrlExpTimeMillisConfigValue,
285               DEFAULT_PRESIGNED_URL_EXPIRE_MILLIS);
286       if (presignedUrlValidDuration > MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS) {
287         logger.warn(
288                 "Valid duration of presigned URL is too large, MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS(7 days) is used");
289         presignedUrlValidDuration = MAXIMUM_PRESIGNED_URL_EXPIRE_MILLIS;
290       }
291 
292       opencastDistributionUrl = getAWSConfigKey(cc, AWS_S3_DISTRIBUTION_BASE_CONFIG);
293       if (!opencastDistributionUrl.endsWith("/")) {
294         opencastDistributionUrl = opencastDistributionUrl + "/";
295       }
296       logger.info("AWS distribution url is {}", opencastDistributionUrl);
297 
298       distributeJobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), DISTRIBUTE_JOB_LOAD_KEY,
299               DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
300       retractJobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), RETRACT_JOB_LOAD_KEY,
301               DEFAULT_RETRACT_JOB_LOAD, serviceRegistry);
302 
303       // Explicit credentials are optional.
304       AWSCredentialsProvider provider = null;
305       Option<String> accessKeyIdOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_ACCESS_KEY_ID_CONFIG);
306       Option<String> accessKeySecretOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_SECRET_ACCESS_KEY_CONFIG);
307 
308       // Keys not informed so use default credentials provider chain, which
309       // will look at the environment variables, java system props, credential files, and instance
310       // profile credentials
311       if (accessKeyIdOpt.isNone() && accessKeySecretOpt.isNone()) {
312         provider = new DefaultAWSCredentialsProviderChain();
313       } else {
314         provider = new AWSStaticCredentialsProvider(
315                 new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
316       }
317 
318       // S3 client configuration
319       ClientConfiguration clientConfiguration = new ClientConfiguration();
320 
321       int maxConnections = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_CONNECTIONS)
322               .getOrElse(DEFAULT_MAX_CONNECTIONS);
323       logger.debug("Max Connections: {}", maxConnections);
324       clientConfiguration.setMaxConnections(maxConnections);
325 
326       int connectionTimeout = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_CONNECTION_TIMEOUT)
327               .getOrElse(DEFAULT_CONNECTION_TIMEOUT);
328       logger.debug("Connection Output: {}", connectionTimeout);
329       clientConfiguration.setConnectionTimeout(connectionTimeout);
330 
331       int maxRetries = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_RETRIES)
332               .getOrElse(DEFAULT_MAX_RETRIES);
333       logger.debug("Max Retry: {}", maxRetries);
334       clientConfiguration.setMaxErrorRetry(maxRetries);
335 
336       // Create AWS client
337       s3 = AmazonS3ClientBuilder.standard()
338               .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regionStr))
339               .withClientConfiguration(clientConfiguration)
340               .withPathStyleAccessEnabled(pathStyle).withCredentials(provider).build();
341 
342       s3TransferManager = new TransferManager(s3);
343 
344       // Create AWS S3 bucket if not there yet
345       createAWSBucket();
346       distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
347 
348       logger.info("AwsS3DistributionService activated!");
349     }
350   }
351 
352   @Override
353   public String getDistributionType() {
354     return distributionChannel;
355   }
356 
357   @Deactivate
358   public void deactivate() {
359     // Transfer manager is null if service disabled
360     if (s3TransferManager != null) {
361       s3TransferManager.shutdownNow();
362     }
363 
364     logger.info("AwsS3DistributionService deactivated!");
365   }
366 
367   @Override
368   public Job distribute(String pubChannelId, MediaPackage mediaPackage, Set<String> downloadIds,
369           boolean checkAvailability, boolean preserveReference) throws DistributionException, MediaPackageException {
370     throw new UnsupportedOperationException("Not supported yet.");
371     // stub function
372   }
373 
374   /**
375    * {@inheritDoc}
376    *
377    * @see org.opencastproject.distribution.api.DownloadDistributionService#distribute(String,
378    *      org.opencastproject.mediapackage.MediaPackage, String, boolean)
379    */
380   @Override
381   public Job distribute(String channelId, MediaPackage mediaPackage, Set<String> elementIds, boolean checkAvailability)
382           throws DistributionException, MediaPackageException {
383     notNull(mediaPackage, "mediapackage");
384     notNull(elementIds, "elementIds");
385     notNull(channelId, "channelId");
386     try {
387       return serviceRegistry.createJob(JOB_TYPE, Operation.Distribute.toString(), Arrays.asList(channelId,
388               MediaPackageParser.getAsXml(mediaPackage), gson.toJson(elementIds), Boolean.toString(checkAvailability)),
389               distributeJobLoad);
390     } catch (ServiceRegistryException e) {
391       throw new DistributionException("Unable to create a job", e);
392     }
393   }
394 
395   /**
396    * {@inheritDoc}
397    *
398    * @see org.opencastproject.distribution.api.DistributionService#distribute(String,
399    *      org.opencastproject.mediapackage.MediaPackage, String)
400    */
401   @Override
402   public Job distribute(String channelId, MediaPackage mediapackage, String elementId)
403           throws DistributionException, MediaPackageException {
404     return distribute(channelId, mediapackage, elementId, true);
405   }
406 
407   /**
408    * {@inheritDoc}
409    *
410    * @see org.opencastproject.distribution.api.DownloadDistributionService#distribute(String,
411    *      org.opencastproject.mediapackage.MediaPackage, String, boolean)
412    */
413   @Override
414   public Job distribute(String channelId, MediaPackage mediaPackage, String elementId, boolean checkAvailability)
415           throws DistributionException, MediaPackageException {
416     Set<String> elementIds = new HashSet<>();
417     elementIds.add(elementId);
418     return distribute(channelId, mediaPackage, elementIds, checkAvailability);
419   }
420 
421   /**
422    * Distribute Mediapackage elements to the download distribution service.
423    *
424    * @param channelId
425    *          # The id of the publication channel to be distributed to.
426    * @param mediapackage
427    *          The media package that contains the elements to be distributed.
428    * @param elementIds
429    *          The ids of the elements that should be distributed contained within the media package.
430    * @param checkAvailability
431    *          Check the availability of the distributed element via http.
432    * @return A reference to the MediaPackageElements that have been distributed.
433    * @throws DistributionException
434    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
435    *           cannot be copied or another unexpected exception occurs.
436    */
437   public MediaPackageElement[] distributeElements(String channelId, MediaPackage mediapackage, Set<String> elementIds,
438           boolean checkAvailability) throws DistributionException {
439     notNull(mediapackage, "mediapackage");
440     notNull(elementIds, "elementIds");
441     notNull(channelId, "channelId");
442 
443     final Set<MediaPackageElement> elements = getElements(mediapackage, elementIds);
444     List<MediaPackageElement> distributedElements = new ArrayList<>();
445 
446     if (AdaptivePlaylist.hasHLSPlaylist(elements)) {
447       return distributeHLSElements(channelId, mediapackage, elements, checkAvailability);
448     }
449 
450     for (MediaPackageElement element : elements) {
451       MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability);
452       distributedElements.add(distributedElement);
453     }
454     return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
455   }
456 
457   private Set<MediaPackageElement> getElements(MediaPackage mediapackage, Set<String> elementIds)
458           throws IllegalStateException {
459     final Set<MediaPackageElement> elements = new HashSet<>();
460     for (String elementId : elementIds) {
461       MediaPackageElement element = mediapackage.getElementById(elementId);
462       if (element != null) {
463         elements.add(element);
464       } else {
465         throw new IllegalStateException(
466                 format("No element %s found in mediapackage %s", elementId, mediapackage.getIdentifier()));
467       }
468     }
469     return elements;
470   }
471 
472   /**
473    * Distribute a media package element to AWS S3.
474    *
475    * @param mediaPackage
476    *          The media package that contains the element to distribute.
477    * @param element
478    *          The element that should be distributed contained within the media package.
479    * @param checkAvailability
480    *          Checks if the distributed element is available
481    * @return A reference to the MediaPackageElement that has been distributed.
482    * @throws DistributionException
483    */
484   public MediaPackageElement distributeElement(String channelId, final MediaPackage mediaPackage,
485           MediaPackageElement element, boolean checkAvailability) throws DistributionException {
486     notNull(channelId, "channelId");
487     notNull(mediaPackage, "mediapackage");
488     notNull(element, "element");
489 
490     try {
491       return distributeElement(channelId, mediaPackage, element, checkAvailability, workspace.get(element.getURI()));
492     } catch (NotFoundException e) {
493       throw new DistributionException("Unable to find " + element.getURI() + " in the workspace", e);
494     } catch (IOException e) {
495       throw new DistributionException("Error loading " + element.getURI() + " from the workspace", e);
496     }
497   }
498 
499   private MediaPackageElement distributeElement(String channelId, final MediaPackage mediaPackage,
500           MediaPackageElement element, boolean checkAvailability, File source) throws DistributionException {
501 
502     // Use TransferManager to take advantage of multipart upload.
503     // TransferManager processes all transfers asynchronously, so this call will return immediately.
504     try {
505       String objectName = buildObjectName(channelId, mediaPackage.getIdentifier().toString(), element);
506       logger.info("Uploading {} to bucket {}...", objectName, bucketName);
507       Upload upload = s3TransferManager.upload(bucketName, objectName, source);
508       long start = System.currentTimeMillis();
509 
510       try {
511         // Block and wait for the upload to finish
512         upload.waitForCompletion();
513         logger.info("Upload of {} to bucket {} completed in {} seconds", objectName, bucketName,
514                 (System.currentTimeMillis() - start) / 1000);
515       } catch (AmazonClientException e) {
516         throw new DistributionException("AWS error: " + e.getMessage(), e);
517       }
518 
519       // Create a representation of the distributed file in the media package
520       MediaPackageElement distributedElement = (MediaPackageElement) element.clone();
521       try {
522         distributedElement.setURI(getDistributionUri(objectName));
523       } catch (URISyntaxException e) {
524         throw new DistributionException("Distributed element produces an invalid URI", e);
525       }
526 
527       logger.info("Distributed element {}, object {}", element.getIdentifier(), objectName);
528 
529       if (checkAvailability) {
530         URI uri = distributedElement.getURI();
531         String distributedElementUriStr = uri.toString();
532         int tries = 0;
533         CloseableHttpResponse response = null;
534         boolean success = false;
535         while (tries < MAX_TRIES) {
536           try {
537             if (presignedUrl) {
538               // 5 minutes should be enough for check availability for presigned URL.
539               Date fiveMinutesLater = new Date(System.currentTimeMillis() + 5 * 60 * 1000);
540               uri = s3.generatePresignedUrl(bucketName, objectName, fiveMinutesLater, HttpMethod.HEAD).toURI();
541             }
542             CloseableHttpClient httpClient = HttpClients.createDefault();
543             logger.trace("Trying to access {}", uri);
544             response = httpClient.execute(new HttpHead(uri));
545             if (response.getStatusLine().getStatusCode() == HttpServletResponse.SC_OK) {
546               logger.trace("Successfully got {}", uri);
547               success = true;
548               break; // Exit the loop, response is closed
549             } else {
550               logger.debug("Http status code when checking distributed element {} is {}", objectName,
551                       response.getStatusLine().getStatusCode());
552             }
553           } catch (Exception e) {
554             logger.info("Checking availability of {} threw exception {}. Trying again.", objectName, e.getMessage());
555             // Just try again
556           } finally {
557             if (null != response) {
558               response.close();
559             }
560           }
561           tries++;
562           logger.trace("Sleeping for {} seconds...", SLEEP_INTERVAL / 1000);
563           Thread.sleep(SLEEP_INTERVAL);
564         }
565         if (!success) {
566           logger.warn("Could not check availability of distributed file {}", uri);
567           // throw new DistributionException("Unable to load distributed file " + uri.toString());
568         }
569       }
570 
571       return distributedElement;
572     } catch (Exception e) {
573       logger.warn("Error distributing element " + element.getIdentifier() + " of media package " + mediaPackage, e);
574       if (e instanceof DistributionException) {
575         throw (DistributionException) e;
576       } else {
577         throw new DistributionException(e);
578       }
579     }
580   }
581 
582   @Override
583   public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
584     Set<String> elementIds = new HashSet<>();
585     elementIds.add(elementId);
586     return retract(channelId, mediapackage, elementIds);
587   }
588 
589   @Override
590   public Job retract(String channelId, MediaPackage mediapackage, Set<String> elementIds) throws DistributionException {
591     notNull(mediapackage, "mediapackage");
592     notNull(elementIds, "elementIds");
593     notNull(channelId, "channelId");
594     try {
595       return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
596               Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
597               retractJobLoad);
598     } catch (ServiceRegistryException e) {
599       throw new DistributionException("Unable to create a job", e);
600     }
601   }
602 
603   @Override
604   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, String elementId)
605           throws DistributionException, MediaPackageException {
606     Set<String> elementIds = new HashSet<String>();
607     elementIds.add(elementId);
608     return distributeSync(channelId, mediapackage, elementIds, true);
609   }
610 
611   @Override
612   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
613           boolean checkAvailability) throws DistributionException {
614     final MediaPackageElement[] distributedElements = distributeElements(channelId, mediapackage, elementIds,
615             checkAvailability);
616     if (distributedElements == null) {
617       return null;
618     }
619     return Arrays.asList(distributedElements);
620   }
621 
622   @Override
623   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediapackage, String elementId)
624           throws DistributionException {
625     Set<String> elementIds = new HashSet<String>();
626     elementIds.add(elementId);
627     return retractSync(channelId, mediapackage, elementIds);
628   }
629 
630   @Override
631   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
632           throws DistributionException {
633     final MediaPackageElement[] retractedElements = retractElements(channelId, mediaPackage, elementIds);
634     if (retractedElements == null) {
635       return null;
636     }
637     return Arrays.asList(retractedElements);
638   }
639 
640   /**
641    * Retracts the media package element with the given identifier from the distribution channel.
642    *
643    * @param channelId
644    *          the channel id
645    * @param mediaPackage
646    *          the media package
647    * @param element
648    *          the element
649    * @return the retracted element or <code>null</code> if the element was not retracted
650    */
651   protected MediaPackageElement retractElement(String channelId, MediaPackage mediaPackage, MediaPackageElement element)
652           throws DistributionException {
653     notNull(mediaPackage, "mediaPackage");
654     notNull(element, "element");
655 
656     try {
657       String objectName = getDistributedObjectName(element);
658       if (objectName != null) {
659         s3.deleteObject(bucketName, objectName);
660         logger.info("Retracted element {}, object {}", element.getIdentifier(), objectName);
661       }
662       return element;
663     } catch (AmazonClientException e) {
664       throw new DistributionException("AWS error: " + e.getMessage(), e);
665     }
666   }
667 
668   /**
669    * Retract a media package element from the distribution channel. The retracted element must not necessarily be the
670    * one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated. This way
671    * you are able to retract elements by providing the "original" element here.
672    *
673    * @param channelId
674    *          the channel id
675    * @param mediapackage
676    *          the mediapackage
677    * @param elementIds
678    *          the element identifiers
679    * @return the retracted element or <code>null</code> if the element was not retracted
680    * @throws org.opencastproject.distribution.api.DistributionException
681    *           in case of an error
682    */
683   protected MediaPackageElement[] retractElements(String channelId, MediaPackage mediapackage, Set<String> elementIds)
684           throws DistributionException {
685     notNull(mediapackage, "mediapackage");
686     notNull(elementIds, "elementIds");
687     notNull(channelId, "channelId");
688 
689     Set<MediaPackageElement> elements = getElements(mediapackage, elementIds);
690     List<MediaPackageElement> retractedElements = new ArrayList<>();
691 
692     for (MediaPackageElement element : elements) {
693       MediaPackageElement retractedElement = retractElement(channelId, mediapackage, element);
694       retractedElements.add(retractedElement);
695     }
696     return retractedElements.toArray(new MediaPackageElement[retractedElements.size()]);
697   }
698 
699   /**
700    * Builds the aws s3 object name.
701    *
702    * @param channelId
703    * @param mpId
704    * @param element
705    * @return
706    */
707   protected String buildObjectName(String channelId, String mpId, MediaPackageElement element) {
708     // Something like ORG_ID/CHANNEL_ID/MP_ID/ELEMENT_ID/FILE_NAME.EXTENSION
709     final String orgId = securityService.getOrganization().getId();
710     String uriString = element.getURI().toString();
711     String fileName = FilenameUtils.getName(uriString);
712     return buildObjectName(orgId, channelId, mpId, element.getIdentifier(), fileName);
713   }
714 
715   /**
716    * Builds the aws s3 object name using the raw elementID and filename
717    *
718    * @param orgId
719    * @param channelId
720    * @param mpId
721    * @param elementId
722    * @param fileName
723    * @return
724    */
725   protected String buildObjectName(String orgId, String channelId, String mpId, String elementId, String fileName) {
726     return StringUtils.join(new String[] { orgId, channelId, mpId, elementId, fileName }, "/");
727   }
728 
729   /**
730    * Gets the URI for the element to be distributed.
731    *
732    * @return The resulting URI after distribution
733    * @throws URISyntaxException
734    *           if the concrete implementation tries to create a malformed uri
735    */
736   protected URI getDistributionUri(String objectName) throws URISyntaxException {
737     // Something like https://OPENCAST_DOWNLOAD_URL/ORG_ID/CHANNEL_ID/MP_ID/ELEMENT_ID/FILE_NAME.EXTENSION
738     return new URI(opencastDistributionUrl + objectName);
739   }
740 
741   /**
742    * Gets the distributed object's name.
743    *
744    * @return The distributed object name
745    */
746   protected String getDistributedObjectName(MediaPackageElement element) {
747     // Something like https://OPENCAST_DOWNLOAD_URL/ORG_ID/CHANNEL_ID/MP_ID/ORIGINAL_ELEMENT_ID/FILE_NAME.EXTENSION
748     String uriString = element.getURI().toString();
749 
750     // String directoryName = distributionDirectory.getAbsolutePath();
751     if (uriString.startsWith(opencastDistributionUrl) && uriString.length() > opencastDistributionUrl.length()) {
752       return uriString.substring(opencastDistributionUrl.length());
753     } else {
754       // Cannot retract
755       logger.warn(
756           "Cannot retract {}. Uri must be in the format "
757               + "https://host/bucketName/orgId/channelId/mpId/originalElementId/fileName.extension",
758           uriString);
759       return null;
760     }
761   }
762 
763   /**
764    * Distribute static items, create a temp directory for playlists, modify them to fix references, then publish the new
765    * list and then delete the temp files. This is used if there are any HLS playlists in the mediapackage, all the
766    * videos in the publication should be HLS or progressive, but not both. However, If this is called with non HLS
767    * files, it will distribute them anyway.
768    *
769    * @param channelId
770    *          - distribution channel
771    * @param mediapackage
772    *          - that holds all the files
773    * @param elements
774    *          - all the elements for publication
775    * @param checkAvailability
776    *          - check before pub
777    * @return distributed elements
778    * @throws DistributionException
779    * @throws IOException
780    */
781   private MediaPackageElement[] distributeHLSElements(String channelId, MediaPackage mediapackage,
782           Set<MediaPackageElement> elements, boolean checkAvailability) throws DistributionException {
783 
784     List<MediaPackageElement> distributedElements = new ArrayList<MediaPackageElement>();
785     List<MediaPackageElement> nontrackElements = elements.stream()
786             .filter(e -> e.getElementType() != MediaPackageElement.Type.Track).collect(Collectors.toList());
787     // Distribute non track items
788     for (MediaPackageElement element : nontrackElements) {
789       MediaPackageElement distributedElement = distributeElement(channelId, mediapackage, element, checkAvailability);
790       distributedElements.add(distributedElement);
791     }
792     // Then get all tracks from mediapackage and sort them by flavor
793     // Each flavor is one video with multiple renditions
794     List<Track> trackElements = elements.stream().filter(e -> e.getElementType() == MediaPackageElement.Type.Track)
795             .map(e -> (Track) e).collect(Collectors.toList());
796     HashMap<MediaPackageElementFlavor, List<Track>> trackElementsMap
797         = new HashMap<MediaPackageElementFlavor, List<Track>>();
798     for (Track t : trackElements) {
799       List<Track> l = trackElementsMap.get(t.getFlavor());
800       if (l == null) {
801         l = new ArrayList<Track>();
802       }
803       l.add(t);
804       trackElementsMap.put(t.getFlavor(), l);
805     }
806 
807     Path tmpDir = null;
808     try {
809       tmpDir = Files.createTempDirectory(tmpPath, mediapackage.getIdentifier().toString());
810       // Run distribution one flavor at a time
811       for (Entry<MediaPackageElementFlavor, List<Track>> elementSet : trackElementsMap.entrySet()) {
812         List<Track> tracks = elementSet.getValue();
813         try {
814           List<Track> transformedTracks = new ArrayList<Track>();
815           // If there are playlists in this flavor
816           if (tracks.stream().anyMatch(AdaptivePlaylist.isHLSTrackPred)) {
817             // For each adaptive playlist, get all the HLS files from the track URI
818             // and put them into a temporary directory
819             List<Track> tmpTracks = new ArrayList<Track>();
820             for (Track t : tracks) {
821 
822               Track tcopy = (Track) t.clone();
823               String newName = "./" + t.getURI().getPath();
824               Path newPath = tmpDir.resolve(newName).normalize();
825               Files.createDirectories(newPath.getParent());
826               // If this flavor is a HLS playlist and therefore has internal references
827               if (AdaptivePlaylist.isPlaylist(t)) {
828                 File f = workspace.get(t.getURI()); // Get actual file
829                 Path plcopy = Files.copy(f.toPath(), newPath);
830                 tcopy.setURI(plcopy.toUri()); // make it into an URI from filesystem
831               } else {
832                 Path plcopy = Files.createFile(newPath); // new Empty File, only care about the URI
833                 tcopy.setURI(plcopy.toUri());
834               }
835               tmpTracks.add(tcopy);
836             }
837             // The playlists' references are then replaced with relative links
838             tmpTracks = AdaptivePlaylist.fixReferences(tmpTracks, tmpDir.toFile()); // replace with fixed elements
839             // after fixing it, we retrieve the new playlist files and discard the old
840             // we collect the mp4 tracks and the playlists and put them into transformedTracks
841             tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred.negate()).forEach(t -> transformedTracks.add(t));
842             tmpTracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).forEach(t -> transformedTracks.add(t));
843           } else {
844             transformedTracks.addAll(tracks); // not playlists, distribute anyway
845           }
846           for (Track track : transformedTracks) {
847             MediaPackageElement distributedElement;
848             if (AdaptivePlaylist.isPlaylist(track)) {
849               distributedElement = distributeElement(channelId, mediapackage, track, checkAvailability,
850                       new File(track.getURI()));
851             } else {
852               distributedElement = distributeElement(channelId, mediapackage, track, checkAvailability);
853             }
854             distributedElements.add(distributedElement);
855           }
856         } catch (MediaPackageException | NotFoundException | IOException e1) {
857           logger.error("HLS Prepare failed for mediapackage {} in {}", elementSet.getKey(), mediapackage, e1);
858           throw new DistributionException("Cannot distribute " + mediapackage);
859         } catch (URISyntaxException e1) {
860           logger.error("HLS Prepare failed - Bad URI syntax {} in {}", elementSet.getKey(), mediapackage, e1);
861           throw new DistributionException("Cannot distribute - BAD URI syntax " + mediapackage);
862         }
863       }
864     } catch (IOException e2) {
865       throw new DistributionException("Cannot create tmp dir to process HLS:" + mediapackage + e2.getMessage());
866     } finally {
867       // Clean up temp dir
868       try (Stream<Path> walk = Files.walk(tmpDir)) {
869         walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
870       } catch (IOException e) {
871         logger.warn("Cannot delete tmp dir for processing HLS mp {}, path {}", mediapackage, tmpPath, e);
872       }
873     }
874     return distributedElements.toArray(new MediaPackageElement[distributedElements.size()]);
875   }
876 
877   /**
878    * {@inheritDoc}
879    *
880    * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
881    */
882   @Override
883   protected String process(Job job) throws Exception {
884     Operation op = null;
885     String operation = job.getOperation();
886     List<String> arguments = job.getArguments();
887     try {
888       op = Operation.valueOf(operation);
889       String channelId = arguments.get(0);
890       MediaPackage mediaPackage = MediaPackageParser.getFromXml(arguments.get(1));
891       Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() {
892       }.getType());
893       switch (op) {
894         case Distribute:
895           Boolean checkAvailability = Boolean.parseBoolean(arguments.get(3));
896           MediaPackageElement[] distributedElements = distributeElements(channelId, mediaPackage, elementIds,
897                   checkAvailability);
898           return (distributedElements != null)
899                   ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(distributedElements))
900                   : null;
901         case Retract:
902           MediaPackageElement[] retractedElements = retractElements(channelId, mediaPackage, elementIds);
903           return (retractedElements != null) ? MediaPackageElementParser.getArrayAsXml(Arrays.asList(retractedElements))
904                   : null;
905         default:
906           throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
907       }
908     } catch (IllegalArgumentException e) {
909       throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
910     } catch (IndexOutOfBoundsException e) {
911       throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
912     } catch (Exception e) {
913       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
914     }
915   }
916 
917   /**
918    * Creates the AWS S3 bucket if it doesn't exist yet.
919    */
920   protected void createAWSBucket() {
921     // Does bucket exist?
922     try {
923       s3.listObjects(bucketName);
924     } catch (AmazonServiceException e) {
925       if (e.getStatusCode() == 404) {
926         // Create the bucket
927         try {
928           s3.createBucket(bucketName);
929           // Allow public read
930           Statement allowPublicReadStatement = new Statement(Statement.Effect.Allow).withPrincipals(Principal.AllUsers)
931                   .withActions(S3Actions.GetObject).withResources(new S3ObjectResource(bucketName, "*"));
932           Policy policy = new Policy().withStatements(allowPublicReadStatement);
933           s3.setBucketPolicy(bucketName, policy.toJson());
934 
935           // Set the website configuration. This needs to be static-site-enabled currently.
936           BucketWebsiteConfiguration defaultWebsite = new BucketWebsiteConfiguration();
937           // These files don't actually exist, but that doesn't matter since no one should be looking around in the
938           // bucket anyway.
939           defaultWebsite.setIndexDocumentSuffix("index.html");
940           defaultWebsite.setErrorDocument("error.html");
941           s3.setBucketWebsiteConfiguration(new SetBucketWebsiteConfigurationRequest(bucketName, defaultWebsite));
942           logger.info("AWS S3 bucket {} created", bucketName);
943         } catch (Exception e2) {
944           throw new ConfigurationException("Bucket " + bucketName + " cannot be created: " + e2.getMessage(), e2);
945         }
946       } else {
947         throw new ConfigurationException("Bucket " + bucketName + " exists, but we can't access it: " + e.getMessage(),
948                 e);
949       }
950     }
951   }
952 
953   public URI presignedURI(URI uri) throws URISyntaxException {
954     if (!presignedUrl) {
955       return uri;
956     }
957     String s3UrlPrefix = s3.getUrl(bucketName, "").toString();
958 
959     // Only handle URIs match s3 domain and bucket
960     if (uri.toString().startsWith(s3UrlPrefix)) {
961       String objectName = uri.toString().substring(s3UrlPrefix.length());
962       Date validUntil = new Date(System.currentTimeMillis() + presignedUrlValidDuration);
963       return s3.generatePresignedUrl(bucketName, objectName, validUntil).toURI();
964     } else {
965       return uri;
966     }
967   }
968 
969   /** The methods below are used by the test class */
970 
971   protected void setS3(AmazonS3 s3) {
972     this.s3 = s3;
973   }
974 
975   protected void setS3TransferManager(TransferManager s3TransferManager) {
976     this.s3TransferManager = s3TransferManager;
977   }
978 
979   protected void setBucketName(String bucketName) {
980     this.bucketName = bucketName;
981   }
982 
983   protected void setOpencastDistributionUrl(String distributionUrl) {
984     opencastDistributionUrl = distributionUrl;
985   }
986 
987   // Use by unit test
988   protected void setStorageTmp(String path) {
989     this.tmpPath = Paths.get(path, DEFAULT_TEMP_DIR);
990     try {
991       Files.createDirectories(tmpPath);
992     } catch (IOException e) {
993       logger.info("AWS S3 bucket cannot create {} ", tmpPath);
994     }
995   }
996 
997   @Reference
998   @Override
999   public void setWorkspace(Workspace workspace) {
1000     super.setWorkspace(workspace);
1001   }
1002 
1003   @Reference
1004   @Override
1005   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1006     super.setServiceRegistry(serviceRegistry);
1007   }
1008 
1009   @Reference
1010   @Override
1011   public void setSecurityService(SecurityService securityService) {
1012     super.setSecurityService(securityService);
1013   }
1014 
1015   @Reference
1016   @Override
1017   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1018     super.setUserDirectoryService(userDirectoryService);
1019   }
1020 
1021   @Reference
1022   @Override
1023   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1024     super.setOrganizationDirectoryService(organizationDirectoryService);
1025   }
1026 
1027 }