1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.distribution.aws.s3.remote;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.util.HttpUtil.param;
25 import static org.opencastproject.util.HttpUtil.post;
26 import static org.opencastproject.util.JobUtil.jobFromHttpResponse;
27 import static org.opencastproject.util.data.functions.Options.join;
28
29 import org.opencastproject.distribution.api.DistributionException;
30 import org.opencastproject.distribution.api.DistributionService;
31 import org.opencastproject.distribution.api.DownloadDistributionService;
32 import org.opencastproject.distribution.aws.s3.api.AwsS3DistributionService;
33 import org.opencastproject.job.api.Job;
34 import org.opencastproject.mediapackage.MediaPackage;
35 import org.opencastproject.mediapackage.MediaPackageElement;
36 import org.opencastproject.mediapackage.MediaPackageException;
37 import org.opencastproject.mediapackage.MediaPackageParser;
38 import org.opencastproject.security.api.TrustedHttpClient;
39 import org.opencastproject.serviceregistry.api.RemoteBase;
40 import org.opencastproject.serviceregistry.api.ServiceRegistry;
41 import org.opencastproject.util.OsgiUtil;
42
43 import com.google.gson.Gson;
44
45 import org.apache.http.client.methods.HttpPost;
46 import org.osgi.service.component.ComponentContext;
47 import org.osgi.service.component.annotations.Activate;
48 import org.osgi.service.component.annotations.Component;
49 import org.osgi.service.component.annotations.Reference;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import java.util.HashSet;
54 import java.util.List;
55 import java.util.Set;
56
57
58
59
60
61
62 @Component(
63 immediate = true,
64 service = { DistributionService.class, DownloadDistributionService.class, AwsS3DistributionService.class },
65 property = {
66 "service.description=Distribution (AWS S3) Remote Service Proxy",
67 "distribution.channel=aws.s3"
68 }
69 )
70 public class AwsS3DistributionServiceRemoteImpl extends RemoteBase implements AwsS3DistributionService,
71 DownloadDistributionService {
72
73 private static final Logger logger = LoggerFactory.getLogger(AwsS3DistributionServiceRemoteImpl.class);
74
75 private static final String PARAM_CHANNEL_ID = "channelId";
76 private static final String PARAM_MEDIAPACKAGE = "mediapackage";
77 private static final String PARAM_ELEMENT_ID = "elementId";
78 private static final String PARAM_FILENAME = "fileName";
79 private static final String PARAM_CHECK_AVAILABILITY = "checkAvailability";
80
81 private final Gson gson = new Gson();
82
83
84 private String distributionChannel;
85
86 public AwsS3DistributionServiceRemoteImpl() {
87
88 super("waiting for activation");
89 }
90
91
92 @Activate
93 protected void activate(ComponentContext cc) {
94 this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
95 super.serviceType = JOB_TYPE_PREFIX + this.distributionChannel;
96 }
97
98 public String getDistributionType() {
99 return this.distributionChannel;
100 }
101
102 @Override
103 public Job distribute(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
104 return distribute(channelId, mediaPackage, elementId, true);
105 }
106
107 @Override
108 public Job distribute(String channelId, MediaPackage mediaPackage, String elementId, boolean checkAvailability)
109 throws DistributionException {
110 Set<String> elementIds = new HashSet<String>();
111 elementIds.add(elementId);
112 return distribute(channelId, mediaPackage, elementIds, checkAvailability);
113 }
114
115 @Override
116 public Job distribute(String channelId, final MediaPackage mediaPackage, Set<String> elementIds,
117 boolean checkAvailability)
118 throws DistributionException {
119 logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
120 final HttpPost req = post(param(PARAM_CHANNEL_ID, channelId),
121 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
122 param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
123 param(PARAM_CHECK_AVAILABILITY, Boolean.toString(checkAvailability)));
124 for (Job job : join(runRequest(req, jobFromHttpResponse))) {
125 return job;
126 }
127 throw new DistributionException(format("Unable to distribute '%s' elements of "
128 + "mediapackage '%s' using a remote destribution service proxy",
129 elementIds.size(), mediaPackage.getIdentifier().toString()));
130 }
131
132 @Override
133 public Job retract(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
134 Set<String> elementIds = new HashSet<String>();
135 elementIds.add(elementId);
136 return retract(channelId, mediaPackage, elementIds);
137 }
138
139 @Override
140 public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
141 logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
142 final HttpPost req = post("/retract",
143 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
144 param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
145 param(PARAM_CHANNEL_ID, channelId));
146 for (Job job : join(runRequest(req, jobFromHttpResponse))) {
147 return job;
148 }
149 throw new DistributionException(format("Unable to retract '%s' elements of "
150 + "mediapackage '%s' using a remote destribution service proxy",
151 elementIds.size(), mediaPackage.getIdentifier().toString()));
152 }
153
154 @Override
155 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
156 throws DistributionException {
157 Set<String> elementIds = new HashSet<String>();
158 elementIds.add(elementId);
159 return distributeSync(channelId, mediaPackage, elementIds, true);
160 }
161
162 @Override
163 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
164 boolean checkAvailability) throws DistributionException {
165 logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
166 final HttpPost req = post("/distributesync", param(PARAM_CHANNEL_ID, channelId),
167 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediapackage)),
168 param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
169 param(PARAM_CHECK_AVAILABILITY, Boolean.toString(checkAvailability)));
170 for (List<MediaPackageElement> elements : join(runRequest(req, elementsFromHttpResponse))) {
171 return elements;
172 }
173 throw new DistributionException(format("Unable to distribute '%s' elements of "
174 + "mediapackage '%s' using a remote destribution service proxy",
175 elementIds.size(), mediapackage.getIdentifier().toString()));
176 }
177
178 @Override
179 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
180 throws DistributionException {
181 Set<String> elementIds = new HashSet<String>();
182 elementIds.add(elementId);
183 return retractSync(channelId, mediaPackage, elementIds);
184 }
185
186 @Override
187 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
188 throws DistributionException {
189 logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
190 final HttpPost req = post("/retractsync",
191 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
192 param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
193 param(PARAM_CHANNEL_ID, channelId));
194 for (List<MediaPackageElement> elements : join(runRequest(req, elementsFromHttpResponse))) {
195 return elements;
196 }
197 throw new DistributionException(format("Unable to retract '%s' elements of "
198 + "mediapackage '%s' using a remote destribution service proxy",
199 elementIds.size(), mediaPackage.getIdentifier().toString()));
200 }
201
202 @Override
203 public Job distribute(String pubChannelId, MediaPackage mediaPackage, Set<String> downloadIds,
204 boolean checkAvailability, boolean preserveReference) throws DistributionException, MediaPackageException {
205 throw new UnsupportedOperationException("Not supported yet.");
206
207 }
208
209 @Reference
210 @Override
211 public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
212 super.setTrustedHttpClient(trustedHttpClient);
213 }
214
215 @Reference
216 @Override
217 public void setRemoteServiceManager(ServiceRegistry serviceRegistry) {
218 super.setRemoteServiceManager(serviceRegistry);
219 }
220
221 }