1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.assetmanager.aws.s3;
23
24 import static java.lang.String.format;
25
26 import org.opencastproject.assetmanager.api.storage.AssetStore;
27 import org.opencastproject.assetmanager.api.storage.AssetStoreException;
28 import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
29 import org.opencastproject.assetmanager.api.storage.StoragePath;
30 import org.opencastproject.assetmanager.aws.AwsAbstractArchive;
31 import org.opencastproject.assetmanager.aws.AwsUploadOperationResult;
32 import org.opencastproject.assetmanager.aws.persistence.AwsAssetDatabase;
33 import org.opencastproject.assetmanager.aws.persistence.AwsAssetDatabaseException;
34 import org.opencastproject.assetmanager.aws.persistence.AwsAssetMapping;
35 import org.opencastproject.util.ConfigurationException;
36 import org.opencastproject.util.MimeType;
37 import org.opencastproject.util.OsgiUtil;
38 import org.opencastproject.util.data.Option;
39 import org.opencastproject.workspace.api.Workspace;
40
41 import com.amazonaws.AmazonServiceException;
42 import com.amazonaws.ClientConfiguration;
43 import com.amazonaws.HttpMethod;
44 import com.amazonaws.SdkClientException;
45 import com.amazonaws.auth.AWSCredentialsProvider;
46 import com.amazonaws.auth.AWSStaticCredentialsProvider;
47 import com.amazonaws.auth.BasicAWSCredentials;
48 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
49 import com.amazonaws.client.builder.AwsClientBuilder;
50 import com.amazonaws.services.s3.AmazonS3;
51 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
52 import com.amazonaws.services.s3.model.BucketVersioningConfiguration;
53 import com.amazonaws.services.s3.model.CopyObjectRequest;
54 import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
55 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
56 import com.amazonaws.services.s3.model.GetObjectTaggingResult;
57 import com.amazonaws.services.s3.model.ObjectMetadata;
58 import com.amazonaws.services.s3.model.ObjectTagging;
59 import com.amazonaws.services.s3.model.RestoreObjectRequest;
60 import com.amazonaws.services.s3.model.SetBucketVersioningConfigurationRequest;
61 import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
62 import com.amazonaws.services.s3.model.StorageClass;
63 import com.amazonaws.services.s3.model.Tag;
64 import com.amazonaws.services.s3.transfer.TransferManager;
65 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
66 import com.amazonaws.services.s3.transfer.Upload;
67
68 import org.apache.commons.lang3.BooleanUtils;
69 import org.apache.commons.lang3.StringUtils;
70 import org.osgi.service.component.ComponentContext;
71 import org.osgi.service.component.annotations.Activate;
72 import org.osgi.service.component.annotations.Component;
73 import org.osgi.service.component.annotations.Reference;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 import java.io.File;
78 import java.io.IOException;
79 import java.io.InputStream;
80 import java.net.URL;
81 import java.util.ArrayList;
82 import java.util.Collections;
83 import java.util.Date;
84 import java.util.Dictionary;
85 import java.util.HashMap;
86 import java.util.List;
87 import java.util.Map;
88 import java.util.Optional;
89
90 @Component(
91 property = {
92 "service.description=Amazon S3 based asset store",
93 "store.type=aws-s3"
94 },
95 immediate = true,
96 service = { RemoteAssetStore.class, AwsS3AssetStore.class }
97 )
98 public class AwsS3AssetStore extends AwsAbstractArchive implements RemoteAssetStore {
99
100
/** Logger for this class. */
private static final Logger logger = LoggerFactory.getLogger(AwsS3AssetStore.class);

/**
 * Tag set on uploaded audio, image and video objects; objects must carry it before they are
 * moved to Glacier storage.
 */
private static final Tag freezable = new Tag("Freezable", "true");
/** Time slept after requesting a new restore before polling starts: 1080000 ms (18 minutes). */
private static final Integer RESTORE_MIN_WAIT = 1080000;
/** Time slept between two polls of an ongoing restore: 900000 ms (15 minutes). */
private static final Integer RESTORE_POLL = 900000;



// Configuration keys read from the component context in activate()

/** Key of the switch that enables this asset store; anything but "true" disables it. */
public static final String AWS_S3_ENABLED = "org.opencastproject.assetmanager.aws.s3.enabled";
/** Key of the optional static access key id. */
public static final String AWS_S3_ACCESS_KEY_ID_CONFIG = "org.opencastproject.assetmanager.aws.s3.access.id";
/** Key of the optional static secret access key. */
public static final String AWS_S3_SECRET_ACCESS_KEY_CONFIG = "org.opencastproject.assetmanager.aws.s3.secret.key";
/** Key of the AWS region name. */
public static final String AWS_S3_REGION_CONFIG = "org.opencastproject.assetmanager.aws.s3.region";
/** Key of the default bucket name (stored under {@link #DEFAULT_ORG_KEY}). */
public static final String AWS_S3_BUCKET_CONFIG = "org.opencastproject.assetmanager.aws.s3.bucket";
/** Prefix of the per-organization bucket keys; the rest of the key is taken as the organization id. */
public static final String AWS_S3_BUCKET_CONFIG_PREFIX = "org.opencastproject.assetmanager.aws.s3.bucket.";
/** Key of the S3 endpoint; defaults to "s3.&lt;region&gt;.amazonaws.com" when not set. */
public static final String AWS_S3_ENDPOINT_CONFIG = "org.opencastproject.assetmanager.aws.s3.endpoint";
/** Key of the switch enabling path style bucket access; defaults to "false". */
public static final String AWS_S3_PATH_STYLE_CONFIG = "org.opencastproject.assetmanager.aws.s3.path.style";
/** Key of the maximum number of client connections. */
public static final String AWS_S3_MAX_CONNECTIONS = "org.opencastproject.assetmanager.aws.s3.max.connections";
/** Key of the client connection timeout. */
public static final String AWS_S3_CONNECTION_TIMEOUT = "org.opencastproject.assetmanager.aws.s3.connection.timeout";
/** Key of the maximum number of client error retries. */
public static final String AWS_S3_MAX_RETRIES = "org.opencastproject.assetmanager.aws.s3.max.retries";
/** Key of the number of days a restored Glacier object stays available. */
public static final String AWS_GLACIER_RESTORE_DAYS = "org.opencastproject.assetmanager.aws.s3.glacier.restore.days";

/** Restore period in days used when {@link #AWS_GLACIER_RESTORE_DAYS} is not configured. */
public static final Integer AWS_S3_GLACIER_RESTORE_DAYS_DEFAULT = 2;


/** Maximum number of connections used when {@link #AWS_S3_MAX_CONNECTIONS} is not configured. */
public static final int DEFAULT_MAX_CONNECTIONS = 50;
/**
 * Connection timeout used when {@link #AWS_S3_CONNECTION_TIMEOUT} is not configured
 * (presumably milliseconds, as expected by the AWS client configuration; verify against the SDK).
 */
public static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
/** Maximum number of error retries used when {@link #AWS_S3_MAX_RETRIES} is not configured. */
public static final int DEFAULT_MAX_RETRIES = 100;

/** Lifetime of the pre-signed download URLs handed out by getObject(): 30 minutes in milliseconds. */
public static final long DOWNLOAD_URL_EXPIRATION_MS = 30 * 60 * 1000;

/** Key under which the default bucket is kept in the organization-to-bucket map. */
public static final String DEFAULT_ORG_KEY = "*";


/** The S3 client; built in activate() or injected through setS3(). */
private AmazonS3 s3 = null;
/** The transfer manager used for uploads; built in activate() or injected through setS3TransferManager(). */
private TransferManager s3TransferManager = null;


/** Maps organization ids (or {@link #DEFAULT_ORG_KEY}) to bucket names. */
private Map<String, String> orgBucketNameMap = new HashMap<>();

/** The configured S3 endpoint. */
private String endpoint = null;

/** Whether path style access is enabled on the S3 client. */
private boolean pathStyle = false;


/** Number of days a Glacier object is restored for when it is requested through getObject(). */
private Integer restorePeriod;

/** Set once createAWSBucket() has checked (or created) a configured bucket; uploadObject() checks it. */
protected boolean bucketCreated = false;
148
149
/**
 * OSGi reference setter for the workspace; hands the service on to the superclass.
 *
 * @param workspace the workspace service
 */
@Override
@Reference
public void setWorkspace(Workspace workspace) {
super.setWorkspace(workspace);
}
155
156
/**
 * OSGi reference setter for the AWS asset database; hands the service on to the superclass.
 *
 * @param db the database holding the asset-to-object mappings
 */
@Override
@Reference
public void setDatabase(AwsAssetDatabase db) {
super.setDatabase(db);
}
162
163
164
165
166
167
168
169 @Activate
170 public void activate(final ComponentContext cc) throws IllegalStateException, ConfigurationException {
171
172 if (cc != null) {
173 @SuppressWarnings("rawtypes")
174 Dictionary properties = cc.getProperties();
175
176 boolean enabled = Boolean.parseBoolean(StringUtils.trimToEmpty((String) properties.get(AWS_S3_ENABLED)));
177 if (!enabled) {
178 logger.info("AWS S3 asset store is disabled");
179 return;
180 }
181
182
183 storeType = StringUtils.trimToEmpty((String) properties.get(AssetStore.STORE_TYPE_PROPERTY));
184 if (StringUtils.isEmpty(storeType)) {
185 throw new ConfigurationException("Invalid store type value");
186 }
187 logger.info("{} is: {}", AssetStore.STORE_TYPE_PROPERTY, storeType);
188
189
190 Option<String> defaultBucketNameOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_BUCKET_CONFIG);
191 if (defaultBucketNameOpt.isSome()) {
192 orgBucketNameMap.put(DEFAULT_ORG_KEY, defaultBucketNameOpt.get());
193 logger.info("AWS S3 default bucket name is {}", defaultBucketNameOpt.get());
194 }
195
196
197 Collections.list(cc.getProperties().keys()).stream()
198 .filter(s -> s.startsWith(AWS_S3_BUCKET_CONFIG_PREFIX))
199 .forEach(s -> {
200 String orgId = s.substring(AWS_S3_BUCKET_CONFIG_PREFIX.length());
201 String bucketName = OsgiUtil.getComponentContextProperty(cc, s);
202 orgBucketNameMap.put(orgId, bucketName);
203 });
204
205 if (orgBucketNameMap.isEmpty()) {
206 throw new ConfigurationException("AWS S3 asset store is enabled, but no buckets are configured");
207 }
208
209
210 regionName = getAWSConfigKey(cc, AWS_S3_REGION_CONFIG);
211 logger.info("AWS region is {}", regionName);
212
213 endpoint = OsgiUtil.getComponentContextProperty(
214 cc, AWS_S3_ENDPOINT_CONFIG, "s3." + regionName + ".amazonaws.com");
215 logger.info("AWS endpoint is {}", endpoint);
216
217 pathStyle = BooleanUtils.toBoolean(OsgiUtil.getComponentContextProperty(cc, AWS_S3_PATH_STYLE_CONFIG, "false"));
218 logger.info("AWS path style is {}", pathStyle);
219
220
221 restorePeriod = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_GLACIER_RESTORE_DAYS)
222 .getOrElse(AWS_S3_GLACIER_RESTORE_DAYS_DEFAULT);
223
224
225 AWSCredentialsProvider provider = null;
226 Option<String> accessKeyIdOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_ACCESS_KEY_ID_CONFIG);
227 Option<String> accessKeySecretOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_SECRET_ACCESS_KEY_CONFIG);
228
229
230
231
232 if (accessKeyIdOpt.isNone() && accessKeySecretOpt.isNone()) {
233 provider = new DefaultAWSCredentialsProviderChain();
234 } else {
235 provider = new AWSStaticCredentialsProvider(
236 new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
237 }
238
239
240 ClientConfiguration clientConfiguration = new ClientConfiguration();
241
242 int maxConnections = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_CONNECTIONS)
243 .getOrElse(DEFAULT_MAX_CONNECTIONS);
244 logger.debug("Max Connections: {}", maxConnections);
245 clientConfiguration.setMaxConnections(maxConnections);
246
247 int connectionTimeout = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_CONNECTION_TIMEOUT)
248 .getOrElse(DEFAULT_CONNECTION_TIMEOUT);
249 logger.debug("Connection Output: {}", connectionTimeout);
250 clientConfiguration.setConnectionTimeout(connectionTimeout);
251
252 int maxRetries = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_RETRIES)
253 .getOrElse(DEFAULT_MAX_RETRIES);
254 logger.debug("Max Retry: {}", maxRetries);
255 clientConfiguration.setMaxErrorRetry(maxRetries);
256
257
258 s3 = AmazonS3ClientBuilder.standard()
259 .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint
260 , regionName))
261 .withClientConfiguration(clientConfiguration)
262 .withPathStyleAccessEnabled(pathStyle)
263 .withCredentials(provider)
264 .build();
265
266 s3TransferManager = TransferManagerBuilder.standard().withS3Client(s3).build();
267
268 logger.info("AwsS3ArchiveAssetStore activated!");
269 }
270
271 }
272
273
274
275
276 void createAWSBucket() {
277 orgBucketNameMap.forEach((org, bucketName) -> {
278
279 try {
280 s3.listObjects(bucketName);
281 } catch (AmazonServiceException e) {
282 if (e.getStatusCode() == 404) {
283
284 try {
285 s3.createBucket(bucketName);
286
287 BucketVersioningConfiguration configuration = new BucketVersioningConfiguration().withStatus("Enabled");
288 SetBucketVersioningConfigurationRequest configRequest = new SetBucketVersioningConfigurationRequest(
289 bucketName, configuration);
290 s3.setBucketVersioningConfiguration(configRequest);
291 logger.info("AWS S3 ARCHIVE bucket {} created and versioning enabled", bucketName);
292 } catch (Exception e2) {
293 throw new IllegalStateException(
294 "ARCHIVE bucket " + bucketName + " cannot be created: " + e2.getMessage(), e2);
295 }
296 } else {
297 throw new IllegalStateException("ARCHIVE bucket " + bucketName + " exists, but we can't access it: "
298 + e.getMessage(), e);
299 }
300 }
301
302 bucketCreated = true;
303 });
304 }
305
306
307
308
309 @Override
310 protected AwsUploadOperationResult uploadObject(String orgId, File origin, String objectName,
311 Optional<MimeType> mimeType) throws AssetStoreException {
312
313 if (!bucketCreated) {
314 createAWSBucket();
315 }
316
317 String bucketName = getBucketName(orgId);
318
319
320
321
322 logger.info("Uploading {} to archive bucket {}...", objectName, bucketName);
323
324 try {
325 Upload upload = s3TransferManager.upload(bucketName, objectName, origin);
326 long start = System.currentTimeMillis();
327
328 upload.waitForCompletion();
329 logger.info("Upload of {} to archive bucket {} completed in {} seconds",
330 new Object[] { objectName, bucketName, (System.currentTimeMillis() - start) / 1000 });
331 ObjectMetadata objMetadata = s3.getObjectMetadata(bucketName, objectName);
332 logger.trace("Got object metadata for: {}, version is {}", objectName, objMetadata.getVersionId());
333
334
335
336 if (mimeType.isPresent()) {
337 switch (mimeType.get().getType()) {
338 case "audio":
339 case "image":
340 case "video":
341 logger.debug("Tagging S3 object {} as Freezable", objectName);
342 List<Tag> tags = new ArrayList<>();
343 tags.add(freezable);
344 s3.setObjectTagging(new SetObjectTaggingRequest(bucketName, objectName, new ObjectTagging(tags)));
345 break;
346 default:
347 break;
348 }
349 }
350
351
352 String versionId = objMetadata.getVersionId();
353 if (null == versionId) {
354 return new AwsUploadOperationResult(objectName, "-1");
355 }
356 return new AwsUploadOperationResult(objectName, versionId);
357 } catch (InterruptedException e) {
358 throw new AssetStoreException("Operation interrupted", e);
359 } catch (Exception e) {
360 throw new AssetStoreException("Upload failed", e);
361 }
362 }
363
364
365
366
367
368 public String getAssetObjectKey(StoragePath storagePath) throws AssetStoreException {
369 try {
370 AwsAssetMapping map = database.findMapping(storagePath);
371 return map.getObjectKey();
372 } catch (AwsAssetDatabaseException e) {
373 throw new AssetStoreException(e);
374 }
375 }
376
377
378
379
380
381 public String getAssetStorageClass(StoragePath storagePath) throws AssetStoreException {
382 if (!contains(storagePath)) {
383 return "NONE";
384 }
385 return getObjectStorageClass(storagePath.getOrganizationId(), getAssetObjectKey(storagePath));
386 }
387
388 private String getObjectStorageClass(String orgId, String objectName) throws AssetStoreException {
389 try {
390 String bucketName = getBucketName(orgId);
391 String storageClass = s3.getObjectMetadata(bucketName, objectName).getStorageClass();
392 return storageClass == null ? StorageClass.Standard.toString() : storageClass;
393 } catch (SdkClientException e) {
394 throw new AssetStoreException(e);
395 }
396 }
397
398
399
400
401
402
403
404 public String modifyAssetStorageClass(StoragePath storagePath, String storageClassId) throws AssetStoreException {
405 try {
406 StorageClass storageClass = StorageClass.fromValue(storageClassId);
407 AwsAssetMapping map = database.findMapping(storagePath);
408 return modifyObjectStorageClass(map.getOrganizationId(), map.getObjectKey(), storageClass).toString();
409 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
410 throw new AssetStoreException(e);
411 }
412 }
413
414 private StorageClass modifyObjectStorageClass(String orgId, String objectName, StorageClass storageClass)
415 throws AssetStoreException {
416 try {
417 StorageClass objectStorageClass = StorageClass.fromValue(getObjectStorageClass(orgId, objectName));
418 String bucketName = getBucketName(orgId);
419
420 if (storageClass != objectStorageClass) {
421
422 if (objectStorageClass == StorageClass.Glacier || objectStorageClass == StorageClass.DeepArchive) {
423 boolean isRestoring = isRestoring(orgId, objectName);
424 boolean isRestored = null != s3.getObjectMetadata(bucketName, objectName).getRestoreExpirationTime();
425 if (!isRestoring && !isRestored) {
426 logger.warn("S3 Object {} can not be moved from storage class {} to {} without restoring the object first",
427 objectName, objectStorageClass, storageClass);
428 return objectStorageClass;
429 }
430 }
431
432
433 if (storageClass == StorageClass.Glacier || objectStorageClass == StorageClass.DeepArchive) {
434 GetObjectTaggingRequest gotr = new GetObjectTaggingRequest(bucketName, objectName);
435 GetObjectTaggingResult objectTaggingRequest = s3.getObjectTagging(gotr);
436 if (!objectTaggingRequest.getTagSet().contains(freezable)) {
437 logger.info("S3 object {} is not suitable for storage class {}", objectName, storageClass);
438 return objectStorageClass;
439 }
440 }
441
442 CopyObjectRequest copyRequest = new CopyObjectRequest(bucketName, objectName, bucketName, objectName)
443 .withStorageClass(storageClass);
444 s3.copyObject(copyRequest);
445 logger.info("S3 object {} moved to storage class {}", objectName, storageClass);
446 } else {
447 logger.info("S3 object {} already in storage class {}", objectName, storageClass);
448 }
449
450 return storageClass;
451 } catch (SdkClientException e) {
452 throw new AssetStoreException(e);
453 }
454 }
455
456
457
458
459 @Override
460 protected InputStream getObject(AwsAssetMapping map) {
461 String storageClassId = getObjectStorageClass(map.getOrganizationId(), map.getObjectKey());
462
463 if (StorageClass.Glacier.name().equals(storageClassId) || StorageClass.DeepArchive.name().equals(storageClassId)) {
464
465 restoreGlacierObject(map.getOrganizationId(), map.getObjectKey(), restorePeriod, true);
466 }
467
468 try {
469
470
471 String bucketName = getBucketName(map.getOrganizationId());
472 String objectKey = map.getObjectKey();
473 Date expiration = new Date(System.currentTimeMillis() + DOWNLOAD_URL_EXPIRATION_MS);
474 GeneratePresignedUrlRequest generatePresignedUrlRequest = new GeneratePresignedUrlRequest(bucketName, objectKey)
475 .withMethod(HttpMethod.GET).withExpiration(expiration);
476 URL signedUrl = s3.generatePresignedUrl(generatePresignedUrlRequest);
477 logger.debug("Returning pre-signed URL stream for '{}': {}", map, signedUrl);
478 return signedUrl.openStream();
479 } catch (IOException e) {
480 throw new AssetStoreException(e);
481 }
482 }
483
484 public String getAssetRestoreStatusString(StoragePath storagePath) {
485 try {
486 AwsAssetMapping map = database.findMapping(storagePath);
487 String bucketName = getBucketName(map.getOrganizationId());
488
489 Date expirationTime = s3.getObjectMetadata(bucketName, map.getObjectKey()).getRestoreExpirationTime();
490 if (expirationTime != null) {
491 return format("RESTORED, expires in %s", expirationTime.toString());
492 }
493
494 Boolean prevOngoingRestore = s3.getObjectMetadata(bucketName, map.getObjectKey()).getOngoingRestore();
495 if (prevOngoingRestore != null && prevOngoingRestore) {
496 return "RESTORING";
497 }
498
499 return "NONE";
500 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
501 throw new AssetStoreException(e);
502 }
503 }
504
505
506
507
508
509
510
511 public void initiateRestoreAsset(StoragePath storagePath, Integer assetRestorePeriod) throws AssetStoreException {
512 try {
513 AwsAssetMapping map = database.findMapping(storagePath);
514 restoreGlacierObject(map.getOrganizationId(), map.getObjectKey(), assetRestorePeriod, false);
515 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
516 throw new AssetStoreException(e);
517 }
518 }
519
520 private boolean isRestoring(String orgId, String objectName) {
521 String bucketName = getBucketName(orgId);
522 Boolean prevOngoingRestore = s3.getObjectMetadata(bucketName, objectName).getOngoingRestore();
523
524
525
526 if (prevOngoingRestore != null && prevOngoingRestore) {
527 logger.info("Object {} is already being restored", objectName);
528 return true;
529 }
530 logger.info("Object {} is not currently being restored", objectName);
531 return false;
532 }
533
534 private void restoreGlacierObject(String orgId, String objectName, Integer objectRestorePeriod, Boolean wait) {
535 String bucketName = getBucketName(orgId);
536 boolean newRestore = false;
537 if (isRestoring(orgId, objectName)) {
538 if (!wait) {
539 return;
540 }
541 logger.info("Waiting for object {}", objectName);
542 } else {
543 RestoreObjectRequest requestRestore = new RestoreObjectRequest(bucketName, objectName, objectRestorePeriod);
544 s3.restoreObjectV2(requestRestore);
545 newRestore = true;
546 }
547
548
549
550 if (s3.getObjectMetadata(bucketName, objectName).getRestoreExpirationTime() == null) {
551 logger.info("Restoring object {} from Glacier class storage", objectName);
552
553
554 if (!wait) {
555 return;
556 }
557
558
559
560 try {
561 if (newRestore) {
562 Thread.sleep(RESTORE_MIN_WAIT);
563 }
564
565 while (s3.getObjectMetadata(bucketName, objectName).getOngoingRestore()) {
566 Thread.sleep(RESTORE_POLL);
567 }
568
569 logger.info("Object {} has been restored from Glacier class storage, for {} days", objectName,
570 objectRestorePeriod);
571 } catch (InterruptedException e) {
572 logger.error("Object {} has not yet been restored from Glacier class storage", objectName);
573 }
574 } else {
575 logger.info("Object {} has already been restored, further extended by {} days", objectName, objectRestorePeriod);
576 }
577 }
578
579 private String getBucketName(String orgId) {
580 String bucketName = orgBucketNameMap.get(orgId);
581 if (bucketName == null) {
582
583 bucketName = orgBucketNameMap.get(DEFAULT_ORG_KEY);
584 if (bucketName == null) {
585 throw new ConfigurationException("No bucket configured for organization " + orgId);
586 }
587 }
588 return bucketName;
589 }
590
591
592
593
594 @Override
595 protected void deleteObject(AwsAssetMapping map) {
596 String bucketName = getBucketName(map.getOrganizationId());
597 s3.deleteObject(bucketName, map.getObjectKey());
598 }
599
/**
 * Returns the restore period read from the configuration during activation.
 *
 * @return the restore period in days, or {@code null} if the store has not been configured
 */
public Integer getRestorePeriod() {
return restorePeriod;
}
603
604
/**
 * Replaces the S3 client; package-private (presumably for tests — confirm against callers).
 *
 * @param s3 the S3 client to use
 */
void setS3(AmazonS3 s3) {
this.s3 = s3;
}
608
/**
 * Replaces the transfer manager used for uploads; package-private (presumably for tests — confirm
 * against callers).
 *
 * @param s3TransferManager the transfer manager to use
 */
void setS3TransferManager(TransferManager s3TransferManager) {
this.s3TransferManager = s3TransferManager;
}
612
/**
 * Registers (or overwrites) the bucket of an organization in the organization-to-bucket map.
 *
 * @param orgId the organization id, or {@link #DEFAULT_ORG_KEY} for the default bucket
 * @param bucketName the bucket name
 */
void setBucketName(String orgId, String bucketName) {
orgBucketNameMap.put(orgId, bucketName);
}
616 }