1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.assetmanager.aws.s3;
23
24 import static java.lang.String.format;
25
26 import org.opencastproject.assetmanager.api.storage.AssetStore;
27 import org.opencastproject.assetmanager.api.storage.AssetStoreException;
28 import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
29 import org.opencastproject.assetmanager.api.storage.StoragePath;
30 import org.opencastproject.assetmanager.aws.AwsAbstractArchive;
31 import org.opencastproject.assetmanager.aws.AwsUploadOperationResult;
32 import org.opencastproject.assetmanager.aws.persistence.AwsAssetDatabase;
33 import org.opencastproject.assetmanager.aws.persistence.AwsAssetDatabaseException;
34 import org.opencastproject.assetmanager.aws.persistence.AwsAssetMapping;
35 import org.opencastproject.util.ConfigurationException;
36 import org.opencastproject.util.MimeType;
37 import org.opencastproject.util.OsgiUtil;
38 import org.opencastproject.util.data.Option;
39 import org.opencastproject.workspace.api.Workspace;
40
41 import com.amazonaws.AmazonServiceException;
42 import com.amazonaws.ClientConfiguration;
43 import com.amazonaws.HttpMethod;
44 import com.amazonaws.SdkClientException;
45 import com.amazonaws.auth.AWSCredentialsProvider;
46 import com.amazonaws.auth.AWSStaticCredentialsProvider;
47 import com.amazonaws.auth.BasicAWSCredentials;
48 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
49 import com.amazonaws.client.builder.AwsClientBuilder;
50 import com.amazonaws.services.s3.AmazonS3;
51 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
52 import com.amazonaws.services.s3.model.BucketVersioningConfiguration;
53 import com.amazonaws.services.s3.model.CopyObjectRequest;
54 import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
55 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
56 import com.amazonaws.services.s3.model.GetObjectTaggingResult;
57 import com.amazonaws.services.s3.model.ObjectMetadata;
58 import com.amazonaws.services.s3.model.ObjectTagging;
59 import com.amazonaws.services.s3.model.RestoreObjectRequest;
60 import com.amazonaws.services.s3.model.SetBucketVersioningConfigurationRequest;
61 import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
62 import com.amazonaws.services.s3.model.StorageClass;
63 import com.amazonaws.services.s3.model.Tag;
64 import com.amazonaws.services.s3.transfer.TransferManager;
65 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
66 import com.amazonaws.services.s3.transfer.Upload;
67
68 import org.apache.commons.lang3.BooleanUtils;
69 import org.apache.commons.lang3.StringUtils;
70 import org.osgi.service.component.ComponentContext;
71 import org.osgi.service.component.annotations.Activate;
72 import org.osgi.service.component.annotations.Component;
73 import org.osgi.service.component.annotations.Reference;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 import java.io.File;
78 import java.io.IOException;
79 import java.io.InputStream;
80 import java.net.URL;
81 import java.util.ArrayList;
82 import java.util.Date;
83 import java.util.Dictionary;
84 import java.util.List;
85 import java.util.Optional;
86
87 @Component(
88 property = {
89 "service.description=Amazon S3 based asset store",
90 "store.type=aws-s3"
91 },
92 immediate = true,
93 service = { RemoteAssetStore.class, AwsS3AssetStore.class }
94 )
95 public class AwsS3AssetStore extends AwsAbstractArchive implements RemoteAssetStore {
96
97
98 private static final Logger logger = LoggerFactory.getLogger(AwsS3AssetStore.class);
99
100 private static final Tag freezable = new Tag("Freezable", "true");
101 private static final Integer RESTORE_MIN_WAIT = 1080000;
102 private static final Integer RESTORE_POLL = 900000;
103
104
105
106 public static final String AWS_S3_ENABLED = "org.opencastproject.assetmanager.aws.s3.enabled";
107 public static final String AWS_S3_ACCESS_KEY_ID_CONFIG = "org.opencastproject.assetmanager.aws.s3.access.id";
108 public static final String AWS_S3_SECRET_ACCESS_KEY_CONFIG = "org.opencastproject.assetmanager.aws.s3.secret.key";
109 public static final String AWS_S3_REGION_CONFIG = "org.opencastproject.assetmanager.aws.s3.region";
110 public static final String AWS_S3_BUCKET_CONFIG = "org.opencastproject.assetmanager.aws.s3.bucket";
111 public static final String AWS_S3_ENDPOINT_CONFIG = "org.opencastproject.assetmanager.aws.s3.endpoint";
112 public static final String AWS_S3_PATH_STYLE_CONFIG = "org.opencastproject.assetmanager.aws.s3.path.style";
113 public static final String AWS_S3_MAX_CONNECTIONS = "org.opencastproject.assetmanager.aws.s3.max.connections";
114 public static final String AWS_S3_CONNECTION_TIMEOUT = "org.opencastproject.assetmanager.aws.s3.connection.timeout";
115 public static final String AWS_S3_MAX_RETRIES = "org.opencastproject.assetmanager.aws.s3.max.retries";
116 public static final String AWS_GLACIER_RESTORE_DAYS = "org.opencastproject.assetmanager.aws.s3.glacier.restore.days";
117
118 public static final Integer AWS_S3_GLACIER_RESTORE_DAYS_DEFAULT = 2;
119
120
121 public static final int DEFAULT_MAX_CONNECTIONS = 50;
122 public static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
123 public static final int DEFAULT_MAX_RETRIES = 100;
124
125 public static final long DOWNLOAD_URL_EXPIRATION_MS = 30 * 60 * 1000;
126
127
128 private AmazonS3 s3 = null;
129 private TransferManager s3TransferManager = null;
130
131
132 private String bucketName = null;
133
134 private String endpoint = null;
135
136 private boolean pathStyle = false;
137
138
139 private Integer restorePeriod;
140
141 protected boolean bucketCreated = false;
142
143
144 @Override
145 @Reference
146 public void setWorkspace(Workspace workspace) {
147 super.setWorkspace(workspace);
148 }
149
150
151 @Override
152 @Reference
153 public void setDatabase(AwsAssetDatabase db) {
154 super.setDatabase(db);
155 }
156
157
158
159
160
161
162
163 @Activate
164 public void activate(final ComponentContext cc) throws IllegalStateException, ConfigurationException {
165
166 if (cc != null) {
167 @SuppressWarnings("rawtypes")
168 Dictionary properties = cc.getProperties();
169
170 boolean enabled = Boolean.parseBoolean(StringUtils.trimToEmpty((String) properties.get(AWS_S3_ENABLED)));
171 if (!enabled) {
172 logger.info("AWS S3 asset store is disabled");
173 return;
174 }
175
176
177 storeType = StringUtils.trimToEmpty((String) properties.get(AssetStore.STORE_TYPE_PROPERTY));
178 if (StringUtils.isEmpty(storeType)) {
179 throw new ConfigurationException("Invalid store type value");
180 }
181 logger.info("{} is: {}", AssetStore.STORE_TYPE_PROPERTY, storeType);
182
183
184 bucketName = getAWSConfigKey(cc, AWS_S3_BUCKET_CONFIG);
185 logger.info("AWS S3 bucket name is {}", bucketName);
186
187
188 regionName = getAWSConfigKey(cc, AWS_S3_REGION_CONFIG);
189 logger.info("AWS region is {}", regionName);
190
191 endpoint = OsgiUtil.getComponentContextProperty(
192 cc, AWS_S3_ENDPOINT_CONFIG, "s3." + regionName + ".amazonaws.com");
193 logger.info("AWS endpoint is {}", endpoint);
194
195 pathStyle = BooleanUtils.toBoolean(OsgiUtil.getComponentContextProperty(cc, AWS_S3_PATH_STYLE_CONFIG, "false"));
196 logger.info("AWS path style is {}", pathStyle);
197
198
199 restorePeriod = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_GLACIER_RESTORE_DAYS)
200 .getOrElse(AWS_S3_GLACIER_RESTORE_DAYS_DEFAULT);
201
202
203 AWSCredentialsProvider provider = null;
204 Option<String> accessKeyIdOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_ACCESS_KEY_ID_CONFIG);
205 Option<String> accessKeySecretOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_SECRET_ACCESS_KEY_CONFIG);
206
207
208
209
210 if (accessKeyIdOpt.isNone() && accessKeySecretOpt.isNone()) {
211 provider = new DefaultAWSCredentialsProviderChain();
212 } else {
213 provider = new AWSStaticCredentialsProvider(
214 new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
215 }
216
217
218 ClientConfiguration clientConfiguration = new ClientConfiguration();
219
220 int maxConnections = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_CONNECTIONS)
221 .getOrElse(DEFAULT_MAX_CONNECTIONS);
222 logger.debug("Max Connections: {}", maxConnections);
223 clientConfiguration.setMaxConnections(maxConnections);
224
225 int connectionTimeout = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_CONNECTION_TIMEOUT)
226 .getOrElse(DEFAULT_CONNECTION_TIMEOUT);
227 logger.debug("Connection Output: {}", connectionTimeout);
228 clientConfiguration.setConnectionTimeout(connectionTimeout);
229
230 int maxRetries = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_RETRIES)
231 .getOrElse(DEFAULT_MAX_RETRIES);
232 logger.debug("Max Retry: {}", maxRetries);
233 clientConfiguration.setMaxErrorRetry(maxRetries);
234
235
236 s3 = AmazonS3ClientBuilder.standard()
237 .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint
238 , regionName))
239 .withClientConfiguration(clientConfiguration)
240 .withPathStyleAccessEnabled(pathStyle)
241 .withCredentials(provider)
242 .build();
243
244 s3TransferManager = TransferManagerBuilder.standard().withS3Client(s3).build();
245
246 logger.info("AwsS3ArchiveAssetStore activated!");
247 }
248
249 }
250
251
252
253
254 void createAWSBucket() {
255
256 try {
257 s3.listObjects(bucketName);
258 } catch (AmazonServiceException e) {
259 if (e.getStatusCode() == 404) {
260
261 try {
262 s3.createBucket(bucketName);
263
264 BucketVersioningConfiguration configuration = new BucketVersioningConfiguration().withStatus("Enabled");
265 SetBucketVersioningConfigurationRequest configRequest = new SetBucketVersioningConfigurationRequest(
266 bucketName, configuration);
267 s3.setBucketVersioningConfiguration(configRequest);
268 logger.info("AWS S3 ARCHIVE bucket {} created and versioning enabled", bucketName);
269 } catch (Exception e2) {
270 throw new IllegalStateException(
271 "ARCHIVE bucket " + bucketName + " cannot be created: " + e2.getMessage(), e2);
272 }
273 } else {
274 throw new IllegalStateException("ARCHIVE bucket " + bucketName + " exists, but we can't access it: "
275 + e.getMessage(), e);
276 }
277 }
278
279 bucketCreated = true;
280 }
281
282
283
284
285 @Override
286 protected AwsUploadOperationResult uploadObject(File origin, String objectName, Optional<MimeType> mimeType)
287 throws AssetStoreException {
288
289 if (!bucketCreated) {
290 createAWSBucket();
291 }
292
293
294
295
296 logger.info("Uploading {} to archive bucket {}...", objectName, bucketName);
297
298 try {
299 Upload upload = s3TransferManager.upload(bucketName, objectName, origin);
300 long start = System.currentTimeMillis();
301
302 upload.waitForCompletion();
303 logger.info("Upload of {} to archive bucket {} completed in {} seconds",
304 new Object[] { objectName, bucketName, (System.currentTimeMillis() - start) / 1000 });
305 ObjectMetadata objMetadata = s3.getObjectMetadata(bucketName, objectName);
306 logger.trace("Got object metadata for: {}, version is {}", objectName, objMetadata.getVersionId());
307
308
309
310 if (mimeType.isPresent()) {
311 switch (mimeType.get().getType()) {
312 case "audio":
313 case "image":
314 case "video":
315 logger.debug("Tagging S3 object {} as Freezable", objectName);
316 List<Tag> tags = new ArrayList<>();
317 tags.add(freezable);
318 s3.setObjectTagging(new SetObjectTaggingRequest(bucketName, objectName, new ObjectTagging(tags)));
319 break;
320 default:
321 break;
322 }
323 }
324
325
326 String versionId = objMetadata.getVersionId();
327 if (null == versionId) {
328 return new AwsUploadOperationResult(objectName, "-1");
329 }
330 return new AwsUploadOperationResult(objectName, versionId);
331 } catch (InterruptedException e) {
332 throw new AssetStoreException("Operation interrupted", e);
333 } catch (Exception e) {
334 throw new AssetStoreException("Upload failed", e);
335 }
336 }
337
338
339
340
341
342 public String getAssetObjectKey(StoragePath storagePath) throws AssetStoreException {
343 try {
344 AwsAssetMapping map = database.findMapping(storagePath);
345 return map.getObjectKey();
346 } catch (AwsAssetDatabaseException e) {
347 throw new AssetStoreException(e);
348 }
349 }
350
351
352
353
354
355 public String getAssetStorageClass(StoragePath storagePath) throws AssetStoreException {
356 if (!contains(storagePath)) {
357 return "NONE";
358 }
359 return getObjectStorageClass(getAssetObjectKey(storagePath));
360 }
361
362 private String getObjectStorageClass(String objectName) throws AssetStoreException {
363 try {
364 String storageClass = s3.getObjectMetadata(bucketName, objectName).getStorageClass();
365 return storageClass == null ? StorageClass.Standard.toString() : storageClass;
366 } catch (SdkClientException e) {
367 throw new AssetStoreException(e);
368 }
369 }
370
371
372
373
374
375
376
377 public String modifyAssetStorageClass(StoragePath storagePath, String storageClassId) throws AssetStoreException {
378 try {
379 StorageClass storageClass = StorageClass.fromValue(storageClassId);
380 AwsAssetMapping map = database.findMapping(storagePath);
381 return modifyObjectStorageClass(map.getObjectKey(), storageClass).toString();
382 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
383 throw new AssetStoreException(e);
384 }
385 }
386
387 private StorageClass modifyObjectStorageClass(String objectName, StorageClass storageClass)
388 throws AssetStoreException {
389 try {
390 StorageClass objectStorageClass = StorageClass.fromValue(getObjectStorageClass(objectName));
391
392 if (storageClass != objectStorageClass) {
393
394 if (objectStorageClass == StorageClass.Glacier || objectStorageClass == StorageClass.DeepArchive) {
395 boolean isRestoring = isRestoring(objectName);
396 boolean isRestored = null != s3.getObjectMetadata(bucketName, objectName).getRestoreExpirationTime();
397 if (!isRestoring && !isRestored) {
398 logger.warn("S3 Object {} can not be moved from storage class {} to {} without restoring the object first",
399 objectName, objectStorageClass, storageClass);
400 return objectStorageClass;
401 }
402 }
403
404
405 if (storageClass == StorageClass.Glacier || objectStorageClass == StorageClass.DeepArchive) {
406 GetObjectTaggingRequest gotr = new GetObjectTaggingRequest(bucketName, objectName);
407 GetObjectTaggingResult objectTaggingRequest = s3.getObjectTagging(gotr);
408 if (!objectTaggingRequest.getTagSet().contains(freezable)) {
409 logger.info("S3 object {} is not suitable for storage class {}", objectName, storageClass);
410 return objectStorageClass;
411 }
412 }
413
414 CopyObjectRequest copyRequest = new CopyObjectRequest(bucketName, objectName, bucketName, objectName)
415 .withStorageClass(storageClass);
416 s3.copyObject(copyRequest);
417 logger.info("S3 object {} moved to storage class {}", objectName, storageClass);
418 } else {
419 logger.info("S3 object {} already in storage class {}", objectName, storageClass);
420 }
421
422 return storageClass;
423 } catch (SdkClientException e) {
424 throw new AssetStoreException(e);
425 }
426 }
427
428
429
430
431 @Override
432 protected InputStream getObject(AwsAssetMapping map) {
433 String storageClassId = getObjectStorageClass(map.getObjectKey());
434
435 if (StorageClass.Glacier.name().equals(storageClassId) || StorageClass.DeepArchive.name().equals(storageClassId)) {
436
437 restoreGlacierObject(map.getObjectKey(), restorePeriod, true);
438 }
439
440 try {
441
442
443 String objectKey = map.getObjectKey();
444 Date expiration = new Date(System.currentTimeMillis() + DOWNLOAD_URL_EXPIRATION_MS);
445 GeneratePresignedUrlRequest generatePresignedUrlRequest = new GeneratePresignedUrlRequest(bucketName, objectKey)
446 .withMethod(HttpMethod.GET).withExpiration(expiration);
447 URL signedUrl = s3.generatePresignedUrl(generatePresignedUrlRequest);
448 logger.debug("Returning pre-signed URL stream for '{}': {}", map, signedUrl);
449 return signedUrl.openStream();
450 } catch (IOException e) {
451 throw new AssetStoreException(e);
452 }
453 }
454
455 public String getAssetRestoreStatusString(StoragePath storagePath) {
456 try {
457 AwsAssetMapping map = database.findMapping(storagePath);
458
459 Date expirationTime = s3.getObjectMetadata(bucketName, map.getObjectKey()).getRestoreExpirationTime();
460 if (expirationTime != null) {
461 return format("RESTORED, expires in %s", expirationTime.toString());
462 }
463
464 Boolean prevOngoingRestore = s3.getObjectMetadata(bucketName, map.getObjectKey()).getOngoingRestore();
465 if (prevOngoingRestore != null && prevOngoingRestore) {
466 return "RESTORING";
467 }
468
469 return "NONE";
470 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
471 throw new AssetStoreException(e);
472 }
473 }
474
475
476
477
478
479
480
481 public void initiateRestoreAsset(StoragePath storagePath, Integer assetRestorePeriod) throws AssetStoreException {
482 try {
483 AwsAssetMapping map = database.findMapping(storagePath);
484 restoreGlacierObject(map.getObjectKey(), assetRestorePeriod, false);
485 } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
486 throw new AssetStoreException(e);
487 }
488 }
489
490 private boolean isRestoring(String objectName) {
491 Boolean prevOngoingRestore = s3.getObjectMetadata(bucketName, objectName).getOngoingRestore();
492
493
494
495 if (prevOngoingRestore != null && prevOngoingRestore) {
496 logger.info("Object {} is already being restored", objectName);
497 return true;
498 }
499 logger.info("Object {} is not currently being restored", objectName);
500 return false;
501 }
502
503 private void restoreGlacierObject(String objectName, Integer objectRestorePeriod, Boolean wait) {
504 boolean newRestore = false;
505 if (isRestoring(objectName)) {
506 if (!wait) {
507 return;
508 }
509 logger.info("Waiting for object {}", objectName);
510 } else {
511 RestoreObjectRequest requestRestore = new RestoreObjectRequest(bucketName, objectName, objectRestorePeriod);
512 s3.restoreObjectV2(requestRestore);
513 newRestore = true;
514 }
515
516
517
518 if (s3.getObjectMetadata(bucketName, objectName).getRestoreExpirationTime() == null) {
519 logger.info("Restoring object {} from Glacier class storage", objectName);
520
521
522 if (!wait) {
523 return;
524 }
525
526
527
528 try {
529 if (newRestore) {
530 Thread.sleep(RESTORE_MIN_WAIT);
531 }
532
533 while (s3.getObjectMetadata(bucketName, objectName).getOngoingRestore()) {
534 Thread.sleep(RESTORE_POLL);
535 }
536
537 logger.info("Object {} has been restored from Glacier class storage, for {} days", objectName,
538 objectRestorePeriod);
539 } catch (InterruptedException e) {
540 logger.error("Object {} has not yet been restored from Glacier class storage", objectName);
541 }
542 } else {
543 logger.info("Object {} has already been restored, further extended by {} days", objectName, objectRestorePeriod);
544 }
545 }
546
547
548
549
550
551 @Override
552 protected void deleteObject(AwsAssetMapping map) {
553 s3.deleteObject(bucketName, map.getObjectKey());
554 }
555
556 public Integer getRestorePeriod() {
557 return restorePeriod;
558 }
559
560
561 void setS3(AmazonS3 s3) {
562 this.s3 = s3;
563 }
564
565 void setS3TransferManager(TransferManager s3TransferManager) {
566 this.s3TransferManager = s3TransferManager;
567 }
568
569 void setBucketName(String bucketName) {
570 this.bucketName = bucketName;
571 }
572
573 }