AwsS3AssetStore.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.assetmanager.aws.s3;
import static java.lang.String.format;
import org.opencastproject.assetmanager.api.storage.AssetStore;
import org.opencastproject.assetmanager.api.storage.AssetStoreException;
import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
import org.opencastproject.assetmanager.api.storage.StoragePath;
import org.opencastproject.assetmanager.aws.AwsAbstractArchive;
import org.opencastproject.assetmanager.aws.AwsUploadOperationResult;
import org.opencastproject.assetmanager.aws.persistence.AwsAssetDatabase;
import org.opencastproject.assetmanager.aws.persistence.AwsAssetDatabaseException;
import org.opencastproject.assetmanager.aws.persistence.AwsAssetMapping;
import org.opencastproject.util.ConfigurationException;
import org.opencastproject.util.MimeType;
import org.opencastproject.util.OsgiUtil;
import org.opencastproject.util.data.Option;
import org.opencastproject.workspace.api.Workspace;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.BucketVersioningConfiguration;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.RestoreObjectRequest;
import com.amazonaws.services.s3.model.SetBucketVersioningConfigurationRequest;
import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Component(
property = {
"service.description=Amazon S3 based asset store",
"store.type=aws-s3"
},
immediate = true,
service = { RemoteAssetStore.class, AwsS3AssetStore.class }
)
public class AwsS3AssetStore extends AwsAbstractArchive implements RemoteAssetStore {
/** Log facility */
private static final Logger logger = LoggerFactory.getLogger(AwsS3AssetStore.class);
/** Tag set on uploaded audio/image/video objects and checked before an object is moved to Glacier. */
private static final Tag freezable = new Tag("Freezable", "true");
/*
 * Sleep in milliseconds after issuing a new restore request before polling starts.
 * NOTE(review): 1080000 ms is 18 minutes; the previous comment said "3h", which would be
 * 10800000 ms. Confirm which of the two is intended.
 */
private static final Integer RESTORE_MIN_WAIT = 1080000; // 18 min (was annotated "3h")
/* Sleep in milliseconds between two restore status polls. */
private static final Integer RESTORE_POLL = 900000; // 15m
// Service configuration keys, read from the component properties in activate()
/** Must parse to true for the store to be activated. */
public static final String AWS_S3_ENABLED = "org.opencastproject.assetmanager.aws.s3.enabled";
/** Optional explicit access key id. */
public static final String AWS_S3_ACCESS_KEY_ID_CONFIG = "org.opencastproject.assetmanager.aws.s3.access.id";
/** Optional explicit secret access key. */
public static final String AWS_S3_SECRET_ACCESS_KEY_CONFIG = "org.opencastproject.assetmanager.aws.s3.secret.key";
/** AWS region name. */
public static final String AWS_S3_REGION_CONFIG = "org.opencastproject.assetmanager.aws.s3.region";
/** Default bucket, stored under {@link #DEFAULT_ORG_KEY}. */
public static final String AWS_S3_BUCKET_CONFIG = "org.opencastproject.assetmanager.aws.s3.bucket";
/** Prefix of per-organization bucket keys; the remainder of the key is the organization id. */
public static final String AWS_S3_BUCKET_CONFIG_PREFIX = "org.opencastproject.assetmanager.aws.s3.bucket.";
/** Optional endpoint; defaults to "s3.&lt;region&gt;.amazonaws.com". */
public static final String AWS_S3_ENDPOINT_CONFIG = "org.opencastproject.assetmanager.aws.s3.endpoint";
/** Whether path style access is enabled on the client; defaults to "false". */
public static final String AWS_S3_PATH_STYLE_CONFIG = "org.opencastproject.assetmanager.aws.s3.path.style";
/** Maximum number of client connections. */
public static final String AWS_S3_MAX_CONNECTIONS = "org.opencastproject.assetmanager.aws.s3.max.connections";
/** Client connection timeout. */
public static final String AWS_S3_CONNECTION_TIMEOUT = "org.opencastproject.assetmanager.aws.s3.connection.timeout";
/** Maximum number of client error retries. */
public static final String AWS_S3_MAX_RETRIES = "org.opencastproject.assetmanager.aws.s3.max.retries";
/** Number of days passed to restore requests issued from getObject(). */
public static final String AWS_GLACIER_RESTORE_DAYS = "org.opencastproject.assetmanager.aws.s3.glacier.restore.days";
/** Restore period used when {@link #AWS_GLACIER_RESTORE_DAYS} is not configured. */
public static final Integer AWS_S3_GLACIER_RESTORE_DAYS_DEFAULT = 2;
// defaults
public static final int DEFAULT_MAX_CONNECTIONS = 50;
public static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
public static final int DEFAULT_MAX_RETRIES = 100;
/** Lifetime of the pre-signed download URLs created in getObject(). */
public static final long DOWNLOAD_URL_EXPIRATION_MS = 30 * 60 * 1000; // 30 min
/** Key in the org-to-bucket map under which the default bucket is stored. */
public static final String DEFAULT_ORG_KEY = "*";
/** The AWS client and transfer manager */
private AmazonS3 s3 = null;
private TransferManager s3TransferManager = null;
/** The AWS S3 org to bucket name mapping */
private Map<String, String> orgBucketNameMap = new HashMap<>();
/** Endpoint and path style flag used to build the S3 client */
private String endpoint = null;
private boolean pathStyle = false;
/** The restore period (in days) used for Glacier class objects */
private Integer restorePeriod;
/** Set by createAWSBucket(); uploadObject() only calls createAWSBucket() while this is false */
protected boolean bucketCreated = false;
/**
 * OSGi DI callback that hands the workspace to the superclass setter.
 *
 * @param workspace
 *          the workspace service
 */
@Override
@Reference
public void setWorkspace(Workspace workspace) {
super.setWorkspace(workspace);
}
/**
 * OSGi DI callback that hands the asset database to the superclass setter.
 *
 * @param db
 *          the AWS asset mapping database
 */
@Override
@Reference
public void setDatabase(AwsAssetDatabase db) {
super.setDatabase(db);
}
/**
 * Service activator, called via declarative services configuration.
 * <p>
 * Reads the store configuration from the component properties and builds the S3 client and the
 * transfer manager. Nothing is configured when {@code cc} is {@code null} or when the store is not
 * enabled via {@link #AWS_S3_ENABLED}.
 *
 * @param cc
 *          the component context
 * @throws ConfigurationException
 *           if the store type is empty, if no bucket is configured, or if only one of access key id
 *           and secret access key is configured
 */
@Activate
public void activate(final ComponentContext cc) throws IllegalStateException, ConfigurationException {
  if (cc == null) {
    return;
  }

  // Get the configuration
  @SuppressWarnings("rawtypes")
  Dictionary properties = cc.getProperties();

  boolean enabled = Boolean.parseBoolean(StringUtils.trimToEmpty((String) properties.get(AWS_S3_ENABLED)));
  if (!enabled) {
    logger.info("AWS S3 asset store is disabled");
    return;
  }

  // Store type: "aws-s3"
  storeType = StringUtils.trimToEmpty((String) properties.get(AssetStore.STORE_TYPE_PROPERTY));
  if (StringUtils.isEmpty(storeType)) {
    throw new ConfigurationException("Invalid store type value");
  }
  logger.info("{} is: {}", AssetStore.STORE_TYPE_PROPERTY, storeType);

  // AWS S3 default bucket name
  Option<String> defaultBucketNameOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_BUCKET_CONFIG);
  if (defaultBucketNameOpt.isSome()) {
    orgBucketNameMap.put(DEFAULT_ORG_KEY, defaultBucketNameOpt.get());
    logger.info("AWS S3 default bucket name is {}", defaultBucketNameOpt.get());
  }

  // AWS S3 org bucket name mapping: the part of the key after the prefix is the organization id
  Collections.list(cc.getProperties().keys()).stream()
      .filter(s -> s.startsWith(AWS_S3_BUCKET_CONFIG_PREFIX))
      .forEach(s -> {
        String orgId = s.substring(AWS_S3_BUCKET_CONFIG_PREFIX.length());
        String bucketName = OsgiUtil.getComponentContextProperty(cc, s);
        orgBucketNameMap.put(orgId, bucketName);
      });

  if (orgBucketNameMap.isEmpty()) {
    throw new ConfigurationException("AWS S3 asset store is enabled, but no buckets are configured");
  }

  // AWS region
  regionName = getAWSConfigKey(cc, AWS_S3_REGION_CONFIG);
  logger.info("AWS region is {}", regionName);

  endpoint = OsgiUtil.getComponentContextProperty(
      cc, AWS_S3_ENDPOINT_CONFIG, "s3." + regionName + ".amazonaws.com");
  logger.info("AWS endpoint is {}", endpoint);

  pathStyle = BooleanUtils.toBoolean(OsgiUtil.getComponentContextProperty(cc, AWS_S3_PATH_STYLE_CONFIG, "false"));
  logger.info("AWS path style is {}", pathStyle);

  // Glacier storage class restore period
  restorePeriod = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_GLACIER_RESTORE_DAYS)
      .getOrElse(AWS_S3_GLACIER_RESTORE_DAYS_DEFAULT);

  // Explicit credentials are optional.
  Option<String> accessKeyIdOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_ACCESS_KEY_ID_CONFIG);
  Option<String> accessKeySecretOpt = OsgiUtil.getOptCfg(cc.getProperties(), AWS_S3_SECRET_ACCESS_KEY_CONFIG);

  // A half-configured key pair can never work, so reject it with a clear message instead of
  // failing on Option.get() below.
  if (accessKeyIdOpt.isSome() != accessKeySecretOpt.isSome()) {
    throw new ConfigurationException("AWS S3 access key id and secret access key must either both be configured"
        + " or both be omitted");
  }

  AWSCredentialsProvider provider;
  if (accessKeyIdOpt.isNone()) {
    // Keys not informed so use default credentials provider chain, which
    // will look at the environment variables, java system props, credential files, and instance
    // profile credentials
    provider = new DefaultAWSCredentialsProviderChain();
  } else {
    provider = new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
  }

  // S3 client configuration
  ClientConfiguration clientConfiguration = new ClientConfiguration();

  int maxConnections = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_CONNECTIONS)
      .getOrElse(DEFAULT_MAX_CONNECTIONS);
  logger.debug("Max Connections: {}", maxConnections);
  clientConfiguration.setMaxConnections(maxConnections);

  int connectionTimeout = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_CONNECTION_TIMEOUT)
      .getOrElse(DEFAULT_CONNECTION_TIMEOUT);
  logger.debug("Connection Timeout: {}", connectionTimeout);
  clientConfiguration.setConnectionTimeout(connectionTimeout);

  int maxRetries = OsgiUtil.getOptCfgAsInt(cc.getProperties(), AWS_S3_MAX_RETRIES)
      .getOrElse(DEFAULT_MAX_RETRIES);
  logger.debug("Max Retry: {}", maxRetries);
  clientConfiguration.setMaxErrorRetry(maxRetries);

  // Create AWS client.
  s3 = AmazonS3ClientBuilder.standard()
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regionName))
      .withClientConfiguration(clientConfiguration)
      .withPathStyleAccessEnabled(pathStyle)
      .withCredentials(provider)
      .build();

  s3TransferManager = TransferManagerBuilder.standard().withS3Client(s3).build();

  logger.info("AwsS3ArchiveAssetStore activated!");
}
/**
 * Creates the configured AWS S3 buckets that don't exist yet.
 * <p>
 * Each configured bucket is probed by listing its objects. A 404 response leads to the bucket being
 * created with versioning enabled; any other service error is reported as an inaccessible bucket.
 * {@code bucketCreated} is only set once every configured bucket has passed, so a failure on any
 * bucket causes the check to be repeated on the next call.
 *
 * @throws IllegalStateException
 *           if a bucket cannot be created or exists but cannot be accessed
 */
void createAWSBucket() {
  for (String bucketName : orgBucketNameMap.values()) {
    // Does bucket exist?
    try {
      s3.listObjects(bucketName);
    } catch (AmazonServiceException e) {
      if (e.getStatusCode() != 404) {
        throw new IllegalStateException("ARCHIVE bucket " + bucketName + " exists, but we can't access it: "
            + e.getMessage(), e);
      }
      // Create the bucket
      try {
        s3.createBucket(bucketName);
        // Enable versioning
        BucketVersioningConfiguration configuration = new BucketVersioningConfiguration().withStatus("Enabled");
        SetBucketVersioningConfigurationRequest configRequest = new SetBucketVersioningConfigurationRequest(
            bucketName, configuration);
        s3.setBucketVersioningConfiguration(configRequest);
        logger.info("AWS S3 ARCHIVE bucket {} created and versioning enabled", bucketName);
      } catch (Exception e2) {
        throw new IllegalStateException(
            "ARCHIVE bucket " + bucketName + " cannot be created: " + e2.getMessage(), e2);
      }
    }
  }
  // All buckets already existed or were just created. With no bucket configured nothing was
  // checked, so the flag stays untouched in that case.
  if (!orgBucketNameMap.isEmpty()) {
    bucketCreated = true;
  }
}
/**
 * Uploads a file to the organization's bucket and returns the resulting object name and version.
 * <p>
 * The call blocks until the transfer has finished. Objects whose mime type is audio, image or video
 * are tagged as freezable afterwards.
 *
 * @param orgId
 *          the organization id used to look up the bucket
 * @param origin
 *          the file to upload
 * @param objectName
 *          the S3 object key to store the file under
 * @param mimeType
 *          the optional mime type of the file
 * @return the object name together with the S3 version id, or "-1" as version when S3 reports none
 * @throws AssetStoreException
 *           if the upload is interrupted or fails
 */
@Override
protected AwsUploadOperationResult uploadObject(String orgId, File origin, String objectName,
        Optional<MimeType> mimeType) throws AssetStoreException {
  // Check first if bucket is there.
  if (!bucketCreated) {
    createAWSBucket();
  }

  String bucketName = getBucketName(orgId);

  // Upload file to AWS S3
  // Use TransferManager to take advantage of multipart upload.
  // TransferManager processes all transfers asynchronously, so this call will return immediately.
  logger.info("Uploading {} to archive bucket {}...", objectName, bucketName);

  try {
    Upload upload = s3TransferManager.upload(bucketName, objectName, origin);
    long start = System.currentTimeMillis();
    // Block and wait for the upload to finish
    upload.waitForCompletion();
    logger.info("Upload of {} to archive bucket {} completed in {} seconds",
        objectName, bucketName, (System.currentTimeMillis() - start) / 1000);

    ObjectMetadata objMetadata = s3.getObjectMetadata(bucketName, objectName);
    logger.trace("Got object metadata for: {}, version is {}", objectName, objMetadata.getVersionId());

    // Tag objects that are suitable for Glacier storage class
    // NOTE: Use of S3TransferManager means that tagging has to be done as a separate request
    if (mimeType.isPresent()) {
      switch (mimeType.get().getType()) {
        case "audio":
        case "image":
        case "video":
          logger.debug("Tagging S3 object {} as Freezable", objectName);
          List<Tag> tags = new ArrayList<>();
          tags.add(freezable);
          s3.setObjectTagging(new SetObjectTaggingRequest(bucketName, objectName, new ObjectTagging(tags)));
          break;
        default:
          break;
      }
    }

    // If bucket versioning is disabled the versionId is null, so return a -1 to indicate no version
    String versionId = objMetadata.getVersionId();
    if (null == versionId) {
      return new AwsUploadOperationResult(objectName, "-1");
    }
    return new AwsUploadOperationResult(objectName, versionId);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the interruption
    Thread.currentThread().interrupt();
    throw new AssetStoreException("Operation interrupted", e);
  } catch (Exception e) {
    throw new AssetStoreException("Upload failed", e);
  }
}
/**
 * Looks up the database mapping of an asset and returns its S3 object key.
 *
 * @param storagePath
 *          asset storage path
 * @return the object key stored in the asset's mapping
 * @throws AssetStoreException
 *           if the mapping lookup fails
 */
public String getAssetObjectKey(StoragePath storagePath) throws AssetStoreException {
  final AwsAssetMapping mapping;
  try {
    mapping = database.findMapping(storagePath);
  } catch (AwsAssetDatabaseException dbError) {
    throw new AssetStoreException(dbError);
  }
  return mapping.getObjectKey();
}
/**
 * Reports the S3 storage class of an asset.
 *
 * @param storagePath
 *          asset storage path
 * @return the storage class of the asset's S3 object, or "NONE" if the store does not contain the asset
 * @throws AssetStoreException
 *           if the object key or the storage class cannot be determined
 */
public String getAssetStorageClass(StoragePath storagePath) throws AssetStoreException {
  if (contains(storagePath)) {
    final String organization = storagePath.getOrganizationId();
    final String objectKey = getAssetObjectKey(storagePath);
    return getObjectStorageClass(organization, objectKey);
  }
  return "NONE";
}
/**
 * Reads the storage class of an object from its S3 metadata.
 *
 * @param orgId
 *          the organization id used to look up the bucket
 * @param objectName
 *          the S3 object key
 * @return the storage class reported by S3, or the standard class id when S3 reports none
 * @throws AssetStoreException
 *           if the S3 client fails
 */
private String getObjectStorageClass(String orgId, String objectName) throws AssetStoreException {
  try {
    final ObjectMetadata metadata = s3.getObjectMetadata(getBucketName(orgId), objectName);
    final String reported = metadata.getStorageClass();
    if (reported != null) {
      return reported;
    }
    // No storage class in the metadata is treated as the standard class
    return StorageClass.Standard.toString();
  } catch (SdkClientException sdkError) {
    throw new AssetStoreException(sdkError);
  }
}
/**
 * Moves the S3 object of an asset to another storage class, if possible.
 *
 * @param storagePath
 *          asset storage path
 * @param storageClassId
 *          id of the target storage class
 * @return the id of the storage class the object is in after the call
 * @throws AssetStoreException
 *           if the mapping lookup fails or the storage class id is not valid
 * @see <a href="https://aws.amazon.com/s3/storage-classes/">The S3 storage class docs</a>
 */
public String modifyAssetStorageClass(StoragePath storagePath, String storageClassId) throws AssetStoreException {
  try {
    final StorageClass target = StorageClass.fromValue(storageClassId);
    final AwsAssetMapping mapping = database.findMapping(storagePath);
    final StorageClass resulting = modifyObjectStorageClass(
        mapping.getOrganizationId(), mapping.getObjectKey(), target);
    return resulting.toString();
  } catch (AwsAssetDatabaseException | IllegalArgumentException failure) {
    throw new AssetStoreException(failure);
  }
}
/**
 * Moves an S3 object to the given storage class by copying it onto itself.
 * <p>
 * The object is left untouched when it already is in the target class, when it currently is in a
 * Glacier class and is neither restored nor being restored, or when the target is a Glacier class and
 * the object does not carry the freezable tag.
 *
 * @param orgId
 *          the organization id used to look up the bucket
 * @param objectName
 *          the S3 object key
 * @param storageClass
 *          the target storage class
 * @return the target storage class if the object is (now) in it, otherwise the object's current class
 * @throws AssetStoreException
 *           if the S3 client fails
 */
private StorageClass modifyObjectStorageClass(String orgId, String objectName, StorageClass storageClass)
        throws AssetStoreException {
  try {
    StorageClass objectStorageClass = StorageClass.fromValue(getObjectStorageClass(orgId, objectName));
    String bucketName = getBucketName(orgId);

    if (storageClass == objectStorageClass) {
      logger.info("S3 object {} already in storage class {}", objectName, storageClass);
      return storageClass;
    }

    /* objects can only be retrieved from Glacier not moved */
    if (objectStorageClass == StorageClass.Glacier || objectStorageClass == StorageClass.DeepArchive) {
      boolean isRestoring = isRestoring(orgId, objectName);
      boolean isRestored = null != s3.getObjectMetadata(bucketName, objectName).getRestoreExpirationTime();
      if (!isRestoring && !isRestored) {
        logger.warn("S3 Object {} can not be moved from storage class {} to {} without restoring the object first",
            objectName, objectStorageClass, storageClass);
        return objectStorageClass;
      }
    }

    /* Only put suitable objects in Glacier: the check applies to the TARGET class */
    if (storageClass == StorageClass.Glacier || storageClass == StorageClass.DeepArchive) {
      GetObjectTaggingRequest gotr = new GetObjectTaggingRequest(bucketName, objectName);
      GetObjectTaggingResult objectTagging = s3.getObjectTagging(gotr);
      if (!objectTagging.getTagSet().contains(freezable)) {
        logger.info("S3 object {} is not suitable for storage class {}", objectName, storageClass);
        return objectStorageClass;
      }
    }

    // Copying the object onto itself with a new storage class is how the class gets changed
    CopyObjectRequest copyRequest = new CopyObjectRequest(bucketName, objectName, bucketName, objectName)
        .withStorageClass(storageClass);
    s3.copyObject(copyRequest);
    logger.info("S3 object {} moved to storage class {}", objectName, storageClass);
    return storageClass;
  } catch (SdkClientException e) {
    throw new AssetStoreException(e);
  }
}
/**
 * Opens a stream on the S3 object of the given mapping.
 * <p>
 * Objects in a Glacier class are restored first, waiting for the restore if necessary. The stream
 * is opened on a pre-signed GET URL that expires after {@link #DOWNLOAD_URL_EXPIRATION_MS}.
 *
 * @param map
 *          the asset mapping naming organization and object key
 * @return a stream on the object's content
 * @throws AssetStoreException
 *           if the storage class cannot be read or the stream cannot be opened
 */
@Override
protected InputStream getObject(AwsAssetMapping map) {
  String storageClassId = getObjectStorageClass(map.getOrganizationId(), map.getObjectKey());

  // S3 reports the storage class by its id ("GLACIER", "DEEP_ARCHIVE"), which is what toString()
  // yields; the enum constant name() ("Glacier", "DeepArchive") never matches it.
  if (StorageClass.Glacier.toString().equals(storageClassId)
      || StorageClass.DeepArchive.toString().equals(storageClassId)) {
    // restore object and wait until available if necessary
    restoreGlacierObject(map.getOrganizationId(), map.getObjectKey(), restorePeriod, true);
  }

  try {
    // Do not use S3 object stream anymore because the S3 object needs to be closed to release
    // the http connection so create the stream using the object url (signed).
    String bucketName = getBucketName(map.getOrganizationId());
    String objectKey = map.getObjectKey();
    Date expiration = new Date(System.currentTimeMillis() + DOWNLOAD_URL_EXPIRATION_MS);
    GeneratePresignedUrlRequest generatePresignedUrlRequest = new GeneratePresignedUrlRequest(bucketName, objectKey)
        .withMethod(HttpMethod.GET).withExpiration(expiration);
    URL signedUrl = s3.generatePresignedUrl(generatePresignedUrlRequest);
    logger.debug("Returning pre-signed URL stream for '{}': {}", map, signedUrl);
    return signedUrl.openStream();
  } catch (IOException e) {
    throw new AssetStoreException(e);
  }
}
/**
 * Describes the restore state of an asset's S3 object.
 * <p>
 * Both values are read from a single metadata request so that they describe the same state.
 *
 * @param storagePath
 *          asset storage path
 * @return "RESTORED, expires in ..." if the object has a restore expiration time, "RESTORING" if a
 *         restore is ongoing, "NONE" otherwise
 * @throws AssetStoreException
 *           if the mapping lookup fails
 */
public String getAssetRestoreStatusString(StoragePath storagePath) {
  try {
    AwsAssetMapping map = database.findMapping(storagePath);
    String bucketName = getBucketName(map.getOrganizationId());
    ObjectMetadata metadata = s3.getObjectMetadata(bucketName, map.getObjectKey());

    Date expirationTime = metadata.getRestoreExpirationTime();
    if (expirationTime != null) {
      return format("RESTORED, expires in %s", expirationTime.toString());
    }

    // getOngoingRestore() may be null, which is treated as "no restore ongoing"
    if (Boolean.TRUE.equals(metadata.getOngoingRestore())) {
      return "RESTORING";
    }

    return "NONE";
  } catch (AwsAssetDatabaseException | IllegalArgumentException e) {
    throw new AssetStoreException(e);
  }
}
/**
 * Starts restoring a frozen asset from a Glacier class without waiting for the restore to finish.
 *
 * @param storagePath
 *          asset storage path
 * @param assetRestorePeriod
 *          number of days to restore the asset for
 * @throws AssetStoreException
 *           if the mapping lookup fails or an argument is rejected
 * @see <a href="https://aws.amazon.com/s3/storage-classes/">The S3 storage class docs</a>
 */
public void initiateRestoreAsset(StoragePath storagePath, Integer assetRestorePeriod) throws AssetStoreException {
  try {
    final AwsAssetMapping mapping = database.findMapping(storagePath);
    final String organization = mapping.getOrganizationId();
    final String objectKey = mapping.getObjectKey();
    // false: only trigger the restore, do not block until it is done
    restoreGlacierObject(organization, objectKey, assetRestorePeriod, false);
  } catch (AwsAssetDatabaseException | IllegalArgumentException failure) {
    throw new AssetStoreException(failure);
  }
}
/**
 * Tells whether S3 reports an ongoing restore for the object, and logs the outcome.
 *
 * @param orgId
 *          the organization id used to look up the bucket
 * @param objectName
 *          the S3 object key
 * @return {@code true} only if the metadata's ongoing-restore flag is present and true
 */
private boolean isRestoring(String orgId, String objectName) {
  final Boolean ongoingFlag = s3.getObjectMetadata(getBucketName(orgId), objectName).getOngoingRestore();
  // FIXME: the flag is null when the object isn't being restored for some reason.
  // The javadocs for getOngoingRestore don't say anything about returning null, and it doesn't make a ton of
  // sense, so I'm guessing this is a bug in the library itself that's not present in the version Manchester
  // is using. A null flag is therefore treated like false.
  final boolean restoring = Boolean.TRUE.equals(ongoingFlag);
  if (restoring) {
    logger.info("Object {} is already being restored", objectName);
  } else {
    logger.info("Object {} is not currently being restored", objectName);
  }
  return restoring;
}
/**
 * Requests the restore of an object from a Glacier class and optionally waits for it to finish.
 * <p>
 * If a restore is already ongoing no new request is sent. When waiting, the thread sleeps
 * {@code RESTORE_MIN_WAIT} after a new request and then polls every {@code RESTORE_POLL}
 * milliseconds. If the thread is interrupted while waiting, the method logs an error, restores
 * the interrupt flag and returns without the object necessarily being restored.
 *
 * @param orgId
 *          the organization id used to look up the bucket
 * @param objectName
 *          the S3 object key
 * @param objectRestorePeriod
 *          number of days the restored copy should be kept
 * @param wait
 *          whether to block until the restore has finished
 */
private void restoreGlacierObject(String orgId, String objectName, Integer objectRestorePeriod, Boolean wait) {
  String bucketName = getBucketName(orgId);
  boolean newRestore = false;

  if (isRestoring(orgId, objectName)) {
    if (!wait) {
      return;
    }
    logger.info("Waiting for object {}", objectName);
  } else {
    RestoreObjectRequest requestRestore = new RestoreObjectRequest(bucketName, objectName, objectRestorePeriod);
    s3.restoreObjectV2(requestRestore);
    newRestore = true;
  }

  // if the object had already been restored the restore request will just
  // increase the expiration time
  if (s3.getObjectMetadata(bucketName, objectName).getRestoreExpirationTime() != null) {
    logger.info("Object {} has already been restored, further extended by {} days", objectName, objectRestorePeriod);
    return;
  }

  logger.info("Restoring object {} from Glacier class storage", objectName);

  // Just initiate restore?
  if (!wait) {
    return;
  }

  // Check the restoration status of the object.
  // Wait min restore time and then poll after that
  try {
    if (newRestore) {
      Thread.sleep(RESTORE_MIN_WAIT);
    }

    // getOngoingRestore() may return null once no restore is ongoing; unboxing it directly
    // would throw a NullPointerException, so compare against TRUE instead.
    while (Boolean.TRUE.equals(s3.getObjectMetadata(bucketName, objectName).getOngoingRestore())) {
      Thread.sleep(RESTORE_POLL);
    }

    logger.info("Object {} has been restored from Glacier class storage, for {} days", objectName,
        objectRestorePeriod);
  } catch (InterruptedException e) {
    // Keep the interrupt visible to the caller's thread handling
    Thread.currentThread().interrupt();
    logger.error("Object {} has not yet been restored from Glacier class storage", objectName);
  }
}
/**
 * Resolves the bucket of an organization, falling back to the default bucket.
 *
 * @param orgId
 *          the organization id
 * @return the bucket mapped to the organization, or the bucket stored under {@link #DEFAULT_ORG_KEY}
 * @throws ConfigurationException
 *           if neither an organization bucket nor a default bucket is configured
 */
private String getBucketName(String orgId) {
  final String orgBucket = orgBucketNameMap.get(orgId);
  // check if we have a default bucket name when the organization has none of its own
  final String resolved = orgBucket != null ? orgBucket : orgBucketNameMap.get(DEFAULT_ORG_KEY);
  if (resolved == null) {
    throw new ConfigurationException("No bucket configured for organization " + orgId);
  }
  return resolved;
}
/**
 * Deletes the S3 object of the given mapping from the organization's bucket.
 *
 * @param map
 *          the asset mapping naming organization and object key
 */
@Override
protected void deleteObject(AwsAssetMapping map) {
  s3.deleteObject(getBucketName(map.getOrganizationId()), map.getObjectKey());
}
/**
 * Returns the configured restore period.
 *
 * @return the restore period in days as set during activation; null if it has not been set
 */
public Integer getRestorePeriod() {
return restorePeriod;
}
// For running tests
/**
 * Replaces the S3 client.
 *
 * @param s3
 *          the client to use
 */
void setS3(AmazonS3 s3) {
this.s3 = s3;
}
/**
 * Replaces the transfer manager used for uploads (package-private; grouped with the test setters).
 *
 * @param s3TransferManager
 *          the transfer manager to use
 */
void setS3TransferManager(TransferManager s3TransferManager) {
this.s3TransferManager = s3TransferManager;
}
/**
 * Adds or replaces the bucket mapped to an organization (package-private; grouped with the test setters).
 *
 * @param orgId
 *          the organization id, or {@link #DEFAULT_ORG_KEY} for the default bucket
 * @param bucketName
 *          the bucket name
 */
void setBucketName(String orgId, String bucketName) {
orgBucketNameMap.put(orgId, bucketName);
}
}