1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.assetmanager.aws.s3;
23
24 import static java.lang.String.format;
25
26 import org.opencastproject.assetmanager.api.storage.AssetStore;
27 import org.opencastproject.assetmanager.api.storage.AssetStoreException;
28 import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
29 import org.opencastproject.assetmanager.api.storage.StoragePath;
30 import org.opencastproject.assetmanager.aws.AwsAbstractArchive;
31 import org.opencastproject.assetmanager.aws.AwsUploadOperationResult;
32 import org.opencastproject.assetmanager.aws.persistence.AwsAssetDatabase;
33 import org.opencastproject.assetmanager.aws.persistence.AwsAssetDatabaseException;
34 import org.opencastproject.assetmanager.aws.persistence.AwsAssetMapping;
35 import org.opencastproject.util.ConfigurationException;
36 import org.opencastproject.util.MimeType;
37 import org.opencastproject.util.OsgiUtil;
38 import org.opencastproject.workspace.api.Workspace;
39
40 import com.amazonaws.AmazonServiceException;
41 import com.amazonaws.ClientConfiguration;
42 import com.amazonaws.HttpMethod;
43 import com.amazonaws.SdkClientException;
44 import com.amazonaws.auth.AWSCredentialsProvider;
45 import com.amazonaws.auth.AWSStaticCredentialsProvider;
46 import com.amazonaws.auth.BasicAWSCredentials;
47 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
48 import com.amazonaws.client.builder.AwsClientBuilder;
49 import com.amazonaws.services.s3.AmazonS3;
50 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
51 import com.amazonaws.services.s3.model.BucketVersioningConfiguration;
52 import com.amazonaws.services.s3.model.CopyObjectRequest;
53 import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
54 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
55 import com.amazonaws.services.s3.model.GetObjectTaggingResult;
56 import com.amazonaws.services.s3.model.ObjectMetadata;
57 import com.amazonaws.services.s3.model.ObjectTagging;
58 import com.amazonaws.services.s3.model.RestoreObjectRequest;
59 import com.amazonaws.services.s3.model.SetBucketVersioningConfigurationRequest;
60 import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
61 import com.amazonaws.services.s3.model.StorageClass;
62 import com.amazonaws.services.s3.model.Tag;
63 import com.amazonaws.services.s3.transfer.TransferManager;
64 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
65 import com.amazonaws.services.s3.transfer.Upload;
66
67 import org.apache.commons.lang3.BooleanUtils;
68 import org.apache.commons.lang3.StringUtils;
69 import org.osgi.service.component.ComponentContext;
70 import org.osgi.service.component.annotations.Activate;
71 import org.osgi.service.component.annotations.Component;
72 import org.osgi.service.component.annotations.Reference;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75
76 import java.io.File;
77 import java.io.IOException;
78 import java.io.InputStream;
79 import java.net.URL;
80 import java.util.ArrayList;
81 import java.util.Collections;
82 import java.util.Date;
83 import java.util.Dictionary;
84 import java.util.HashMap;
85 import java.util.List;
86 import java.util.Map;
87 import java.util.Optional;
88
89 @Component(
90 property = {
91 "service.description=Amazon S3 based asset store",
92 "store.type=aws-s3"
93 },
94 immediate = true,
95 service = { RemoteAssetStore.class, AwsS3AssetStore.class }
96 )
97 public class AwsS3AssetStore extends AwsAbstractArchive implements RemoteAssetStore {
98
99
100 private static final Logger logger = LoggerFactory.getLogger(AwsS3AssetStore.class);
101
102 private static final Tag freezable = new Tag("Freezable", "true");
103 private static final Integer RESTORE_MIN_WAIT = 1080000;
104 private static final Integer RESTORE_POLL = 900000;
105
106
107
108 public static final String AWS_S3_ENABLED = "org.opencastproject.assetmanager.aws.s3.enabled";
109 public static final String AWS_S3_ACCESS_KEY_ID_CONFIG = "org.opencastproject.assetmanager.aws.s3.access.id";
110 public static final String AWS_S3_SECRET_ACCESS_KEY_CONFIG = "org.opencastproject.assetmanager.aws.s3.secret.key";
111 public static final String AWS_S3_REGION_CONFIG = "org.opencastproject.assetmanager.aws.s3.region";
112 public static final String AWS_S3_BUCKET_CONFIG = "org.opencastproject.assetmanager.aws.s3.bucket";
113 public static final String AWS_S3_BUCKET_CONFIG_PREFIX = "org.opencastproject.assetmanager.aws.s3.bucket.";
114 public static final String AWS_S3_ENDPOINT_CONFIG = "org.opencastproject.assetmanager.aws.s3.endpoint";
115 public static final String AWS_S3_PATH_STYLE_CONFIG = "org.opencastproject.assetmanager.aws.s3.path.style";
116 public static final String AWS_S3_MAX_CONNECTIONS = "org.opencastproject.assetmanager.aws.s3.max.connections";
117 public static final String AWS_S3_CONNECTION_TIMEOUT = "org.opencastproject.assetmanager.aws.s3.connection.timeout";
118 public static final String AWS_S3_MAX_RETRIES = "org.opencastproject.assetmanager.aws.s3.max.retries";
119 public static final String AWS_GLACIER_RESTORE_DAYS = "org.opencastproject.assetmanager.aws.s3.glacier.restore.days";
120
121 public static final Integer AWS_S3_GLACIER_RESTORE_DAYS_DEFAULT = 2;
122
123
124 public static final int DEFAULT_MAX_CONNECTIONS = 50;
125 public static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
126 public static final int DEFAULT_MAX_RETRIES = 100;
127
128 public static final long DOWNLOAD_URL_EXPIRATION_MS = 30 * 60 * 1000;
129
130 public static final String DEFAULT_ORG_KEY = "*";
131
132
133 private AmazonS3 s3 = null;
134 private TransferManager s3TransferManager = null;
135
136
137 private Map<String, String> orgBucketNameMap = new HashMap<>();
138
139 private String endpoint = null;
140
141 private boolean pathStyle = false;
142
143
144 private Integer restorePeriod;
145
146 protected boolean bucketCreated = false;
147
148
149 @Override
150 @Reference
151 public void setWorkspace(Workspace workspace) {
152 super.setWorkspace(workspace);
153 }
154
155
156 @Override
157 @Reference
158 public void setDatabase(AwsAssetDatabase db) {
159 super.setDatabase(db);
160 }
161
162
163
164
165
166
167
168 @Activate
169 public void activate(final ComponentContext cc) throws IllegalStateException, ConfigurationException {
170
171 if (cc != null) {
172 @SuppressWarnings("rawtypes")
173 Dictionary properties = cc.getProperties();
174
175 boolean enabled = Boolean.parseBoolean(StringUtils.trimToEmpty((String) properties.get(AWS_S3_ENABLED)));
176 if (!enabled) {
177 logger.info("AWS S3 asset store is disabled");
178 return;
179 }
180
181
182 storeType = StringUtils.trimToEmpty((String) properties.get(AssetStore.STORE_TYPE_PROPERTY));
183 if (StringUtils.isEmpty(storeType)) {
184 throw new ConfigurationException("Invalid store type value");
185 }
186 logger.info("{} is: {}", AssetStore.STORE_TYPE_PROPERTY, storeType);
187
188
189 Optional<String> defaultBucketNameOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_BUCKET_CONFIG);
190 if (defaultBucketNameOpt.isPresent()) {
191 orgBucketNameMap.put(DEFAULT_ORG_KEY, defaultBucketNameOpt.get());
192 logger.info("AWS S3 default bucket name is {}", defaultBucketNameOpt.get());
193 }
194
195
196 Collections.list(cc.getProperties().keys()).stream()
197 .filter(s -> s.startsWith(AWS_S3_BUCKET_CONFIG_PREFIX))
198 .forEach(s -> {
199 String orgId = s.substring(AWS_S3_BUCKET_CONFIG_PREFIX.length());
200 String bucketName = OsgiUtil.getComponentContextProperty(cc, s);
201 orgBucketNameMap.put(orgId, bucketName);
202 });
203
204 if (orgBucketNameMap.isEmpty()) {
205 throw new ConfigurationException("AWS S3 asset store is enabled, but no buckets are configured");
206 }
207
208
209 regionName = getAWSConfigKey(cc, AWS_S3_REGION_CONFIG);
210 logger.info("AWS region is {}", regionName);
211
212 endpoint = OsgiUtil.getComponentContextProperty(
213 cc, AWS_S3_ENDPOINT_CONFIG, "s3." + regionName + ".amazonaws.com");
214 logger.info("AWS endpoint is {}", endpoint);
215
216 pathStyle = BooleanUtils.toBoolean(OsgiUtil.getComponentContextProperty(cc, AWS_S3_PATH_STYLE_CONFIG, "false"));
217 logger.info("AWS path style is {}", pathStyle);
218
219
220 restorePeriod = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_GLACIER_RESTORE_DAYS)
221 .orElse(AWS_S3_GLACIER_RESTORE_DAYS_DEFAULT);
222
223
224 AWSCredentialsProvider provider = null;
225 Optional<String> accessKeyIdOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_ACCESS_KEY_ID_CONFIG);
226 Optional<String> accessKeySecretOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_SECRET_ACCESS_KEY_CONFIG);
227
228
229
230
231 if (accessKeyIdOpt.isEmpty() && accessKeySecretOpt.isEmpty()) {
232 provider = new DefaultAWSCredentialsProviderChain();
233 } else {
234 provider = new AWSStaticCredentialsProvider(
235 new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
236 }
237
238
239 ClientConfiguration clientConfiguration = new ClientConfiguration();
240
241 int maxConnections = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_CONNECTIONS)
242 .orElse(DEFAULT_MAX_CONNECTIONS);
243 logger.debug("Max Connections: {}", maxConnections);
244 clientConfiguration.setMaxConnections(maxConnections);
245
246 int connectionTimeout = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_CONNECTION_TIMEOUT)
247 .orElse(DEFAULT_CONNECTION_TIMEOUT);
248 logger.debug("Connection Output: {}", connectionTimeout);
249 clientConfiguration.setConnectionTimeout(connectionTimeout);
250
251 int maxRetries = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_RETRIES)
252 .orElse(DEFAULT_MAX_RETRIES);
253 logger.debug("Max Retry: {}", maxRetries);
254 clientConfiguration.setMaxErrorRetry(maxRetries);
255
256
257 s3 = AmazonS3ClientBuilder.standard()
258 .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint
259 , regionName))
260 .withClientConfiguration(clientConfiguration)
261 .withPathStyleAccessEnabled(pathStyle)
262 .withCredentials(provider)
263 .build();
264
265 s3TransferManager = TransferManagerBuilder.standard().withS3Client(s3).build();
266
267 logger.info("AwsS3ArchiveAssetStore activated!");
268 }
269
270 }
271
272
273
274
275 void createAWSBucket() {
276 orgBucketNameMap.forEach((org, bucketName) -> {
277
278 try {
279 s3.listObjects(bucketName);
280 } catch (AmazonServiceException e) {
281 if (e.getStatusCode() == 404) {
282
283 try {
284 s3.createBucket(bucketName);
285
286 BucketVersioningConfiguration configuration = new BucketVersioningConfiguration().withStatus("Enabled");
287 SetBucketVersioningConfigurationRequest configRequest = new SetBucketVersioningConfigurationRequest(
288 bucketName, configuration);
289 s3.setBucketVersioningConfiguration(configRequest);
290 logger.info("AWS S3 ARCHIVE bucket {} created and versioning enabled", bucketName);
291 } catch (Exception e2) {
292 throw new IllegalStateException(
293 "ARCHIVE bucket " + bucketName + " cannot be created: " + e2.getMessage(), e2);
294 }
295 } else {
296 throw new IllegalStateException("ARCHIVE bucket " + bucketName + " exists, but we can't access it: "
297 + e.getMessage(), e);
298 }
299 }
300
301 bucketCreated = true;
302 });
303 }
304
305
306
307
308 @Override
309 protected AwsUploadOperationResult uploadObject(String orgId, File origin, String objectName,
310 Optional<MimeType> mimeType) throws AssetStoreException {
311
312 if (!bucketCreated) {
313 createAWSBucket();
314 }
315
316 String bucketName = getBucketName(orgId);
317
318
319
320
321 logger.info("Uploading {} to archive bucket {}...", objectName, bucketName);
322
323 try {
324 Upload upload = s3TransferManager.upload(bucketName, objectName, origin);
325 long start = System.currentTimeMillis();
326
327 upload.waitForCompletion();
328 logger.info("Upload of {} to archive bucket {} completed in {} seconds",
329 new Object[] { objectName, bucketName, (System.currentTimeMillis() - start) / 1000 });
330 ObjectMetadata objMetadata = s3.getObjectMetadata(bucketName, objectName);
331 logger.trace("Got object metadata for: {}, version is {}", objectName, objMetadata.getVersionId());
332
333
334
335 if (mimeType.isPresent()) {
336 switch (mimeType.get().getType()) {
337 case "audio":
338 case "image":
339 case "video":
340 logger.debug("Tagging S3 object {} as Freezable", objectName);
341 List<Tag> tags = new ArrayList<>();
342 tags.add(freezable);
343 s3.setObjectTagging(new SetObjectTaggingRequest(bucketName, objectName, new ObjectTagging(tags)));
344 break;
345 default:
346 break;
347 }
348 }
349
350
351 String versionId = objMetadata.getVersionId();
352 if (null == versionId) {
353 return new AwsUploadOperationResult(objectName, "-1");
354 }
355 return new AwsUploadOperationResult(objectName, versionId);
356 } catch (InterruptedException e) {
357 throw new AssetStoreException("Operation interrupted", e);
358 } catch (Exception e) {
359 throw new AssetStoreException("Upload failed", e);
360 }
361 }
362
363
364
365
366
367 public String getAssetObjectKey(StoragePath storagePath) throws AssetStoreException {
368 try {
369 AwsAssetMapping map = database.findMapping(storagePath);
370 return map.getObjectKey();
371 } catch (AwsAssetDatabaseException e) {
372 throw new AssetStoreException(e);
373 }
374 }
375
376
377
378
379
380 public String getAssetStorageClass(StoragePath storagePath) throws AssetStoreException {
381 if (!contains(storagePath)) {
382 return "NONE";
383 }
384 return getObjectStorageClass(storagePath.getOrganizationId(), getAssetObjectKey(storagePath));
385 }
386
387 private String getObjectStorageClass(String orgId, String objectName) throws AssetStoreException {
388 try {
389 String bucketName = getBucketName(orgId);
390 String storageClass = s3.getObjectMetadata(bucketName, objectName).getStorageClass();
391 return storageClass == null ? StorageClass.Standard.toString() : storageClass;
392 } catch (SdkClientException e) {
393 throw new AssetStoreException(e);
394 }
395 }
396
397
398
399
400
401
402
403 public String modifyAssetStorageClass(StoragePath storagePath, String storageClassId) throws AssetStoreException {
404 try {
405 StorageClass storageClass = StorageClass.fromValue(storageClassId);
406 AwsAssetMapping map = database.findMapping(storagePath);
407 return modifyObjectStorageClass(map.getOrganizationId(), map.getObjectKey(), storageClass).toString();
408 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
409 throw new AssetStoreException(e);
410 }
411 }
412
413 private StorageClass modifyObjectStorageClass(String orgId, String objectName, StorageClass storageClass)
414 throws AssetStoreException {
415 try {
416 StorageClass objectStorageClass = StorageClass.fromValue(getObjectStorageClass(orgId, objectName));
417 String bucketName = getBucketName(orgId);
418
419 if (storageClass != objectStorageClass) {
420
421 if (objectStorageClass == StorageClass.Glacier || objectStorageClass == StorageClass.DeepArchive) {
422 boolean isRestoring = isRestoring(orgId, objectName);
423 boolean isRestored = null != s3.getObjectMetadata(bucketName, objectName).getRestoreExpirationTime();
424 if (!isRestoring && !isRestored) {
425 logger.warn("S3 Object {} can not be moved from storage class {} to {} without restoring the object first",
426 objectName, objectStorageClass, storageClass);
427 return objectStorageClass;
428 }
429 }
430
431
432 if (storageClass == StorageClass.Glacier || objectStorageClass == StorageClass.DeepArchive) {
433 GetObjectTaggingRequest gotr = new GetObjectTaggingRequest(bucketName, objectName);
434 GetObjectTaggingResult objectTaggingRequest = s3.getObjectTagging(gotr);
435 if (!objectTaggingRequest.getTagSet().contains(freezable)) {
436 logger.info("S3 object {} is not suitable for storage class {}", objectName, storageClass);
437 return objectStorageClass;
438 }
439 }
440
441 CopyObjectRequest copyRequest = new CopyObjectRequest(bucketName, objectName, bucketName, objectName)
442 .withStorageClass(storageClass);
443 s3.copyObject(copyRequest);
444 logger.info("S3 object {} moved to storage class {}", objectName, storageClass);
445 } else {
446 logger.info("S3 object {} already in storage class {}", objectName, storageClass);
447 }
448
449 return storageClass;
450 } catch (SdkClientException e) {
451 throw new AssetStoreException(e);
452 }
453 }
454
455
456
457
458 @Override
459 protected InputStream getObject(AwsAssetMapping map) {
460 String storageClassId = getObjectStorageClass(map.getOrganizationId(), map.getObjectKey());
461
462 if (StorageClass.Glacier.name().equals(storageClassId) || StorageClass.DeepArchive.name().equals(storageClassId)) {
463
464 restoreGlacierObject(map.getOrganizationId(), map.getObjectKey(), restorePeriod, true);
465 }
466
467 try {
468
469
470 String bucketName = getBucketName(map.getOrganizationId());
471 String objectKey = map.getObjectKey();
472 Date expiration = new Date(System.currentTimeMillis() + DOWNLOAD_URL_EXPIRATION_MS);
473 GeneratePresignedUrlRequest generatePresignedUrlRequest = new GeneratePresignedUrlRequest(bucketName, objectKey)
474 .withMethod(HttpMethod.GET).withExpiration(expiration);
475 URL signedUrl = s3.generatePresignedUrl(generatePresignedUrlRequest);
476 logger.debug("Returning pre-signed URL stream for '{}': {}", map, signedUrl);
477 return signedUrl.openStream();
478 } catch (IOException e) {
479 throw new AssetStoreException(e);
480 }
481 }
482
483 public String getAssetRestoreStatusString(StoragePath storagePath) {
484 try {
485 AwsAssetMapping map = database.findMapping(storagePath);
486 String bucketName = getBucketName(map.getOrganizationId());
487
488 Date expirationTime = s3.getObjectMetadata(bucketName, map.getObjectKey()).getRestoreExpirationTime();
489 if (expirationTime != null) {
490 return format("RESTORED, expires in %s", expirationTime.toString());
491 }
492
493 Boolean prevOngoingRestore = s3.getObjectMetadata(bucketName, map.getObjectKey()).getOngoingRestore();
494 if (prevOngoingRestore != null && prevOngoingRestore) {
495 return "RESTORING";
496 }
497
498 return "NONE";
499 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
500 throw new AssetStoreException(e);
501 }
502 }
503
504
505
506
507
508
509
510 public void initiateRestoreAsset(StoragePath storagePath, Integer assetRestorePeriod) throws AssetStoreException {
511 try {
512 AwsAssetMapping map = database.findMapping(storagePath);
513 restoreGlacierObject(map.getOrganizationId(), map.getObjectKey(), assetRestorePeriod, false);
514 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
515 throw new AssetStoreException(e);
516 }
517 }
518
519 private boolean isRestoring(String orgId, String objectName) {
520 String bucketName = getBucketName(orgId);
521 Boolean prevOngoingRestore = s3.getObjectMetadata(bucketName, objectName).getOngoingRestore();
522
523
524
525 if (prevOngoingRestore != null && prevOngoingRestore) {
526 logger.info("Object {} is already being restored", objectName);
527 return true;
528 }
529 logger.info("Object {} is not currently being restored", objectName);
530 return false;
531 }
532
533 private void restoreGlacierObject(String orgId, String objectName, Integer objectRestorePeriod, Boolean wait) {
534 String bucketName = getBucketName(orgId);
535 boolean newRestore = false;
536 if (isRestoring(orgId, objectName)) {
537 if (!wait) {
538 return;
539 }
540 logger.info("Waiting for object {}", objectName);
541 } else {
542 RestoreObjectRequest requestRestore = new RestoreObjectRequest(bucketName, objectName, objectRestorePeriod);
543 s3.restoreObjectV2(requestRestore);
544 newRestore = true;
545 }
546
547
548
549 if (s3.getObjectMetadata(bucketName, objectName).getRestoreExpirationTime() == null) {
550 logger.info("Restoring object {} from Glacier class storage", objectName);
551
552
553 if (!wait) {
554 return;
555 }
556
557
558
559 try {
560 if (newRestore) {
561 Thread.sleep(RESTORE_MIN_WAIT);
562 }
563
564 while (s3.getObjectMetadata(bucketName, objectName).getOngoingRestore()) {
565 Thread.sleep(RESTORE_POLL);
566 }
567
568 logger.info("Object {} has been restored from Glacier class storage, for {} days", objectName,
569 objectRestorePeriod);
570 } catch (InterruptedException e) {
571 logger.error("Object {} has not yet been restored from Glacier class storage", objectName);
572 }
573 } else {
574 logger.info("Object {} has already been restored, further extended by {} days", objectName, objectRestorePeriod);
575 }
576 }
577
578 private String getBucketName(String orgId) {
579 String bucketName = orgBucketNameMap.get(orgId);
580 if (bucketName == null) {
581
582 bucketName = orgBucketNameMap.get(DEFAULT_ORG_KEY);
583 if (bucketName == null) {
584 throw new ConfigurationException("No bucket configured for organization " + orgId);
585 }
586 }
587 return bucketName;
588 }
589
590
591
592
593 @Override
594 protected void deleteObject(AwsAssetMapping map) {
595 String bucketName = getBucketName(map.getOrganizationId());
596 s3.deleteObject(bucketName, map.getObjectKey());
597 }
598
599 public Integer getRestorePeriod() {
600 return restorePeriod;
601 }
602
603
604 void setS3(AmazonS3 s3) {
605 this.s3 = s3;
606 }
607
608 void setS3TransferManager(TransferManager s3TransferManager) {
609 this.s3TransferManager = s3TransferManager;
610 }
611
612 void setBucketName(String orgId, String bucketName) {
613 orgBucketNameMap.put(orgId, bucketName);
614 }
615 }