1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.distribution.aws.s3.remote;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.util.HttpUtil.param;
25 import static org.opencastproject.util.HttpUtil.post;
26
27 import org.opencastproject.distribution.api.DistributionException;
28 import org.opencastproject.distribution.api.DistributionService;
29 import org.opencastproject.distribution.api.DownloadDistributionService;
30 import org.opencastproject.distribution.aws.s3.api.AwsS3DistributionService;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.mediapackage.MediaPackage;
33 import org.opencastproject.mediapackage.MediaPackageElement;
34 import org.opencastproject.mediapackage.MediaPackageException;
35 import org.opencastproject.mediapackage.MediaPackageParser;
36 import org.opencastproject.security.api.TrustedHttpClient;
37 import org.opencastproject.serviceregistry.api.RemoteBase;
38 import org.opencastproject.serviceregistry.api.ServiceRegistry;
39 import org.opencastproject.util.JobUtil;
40 import org.opencastproject.util.OsgiUtil;
41
42 import com.google.gson.Gson;
43
44 import org.apache.http.client.methods.HttpPost;
45 import org.osgi.service.component.ComponentContext;
46 import org.osgi.service.component.annotations.Activate;
47 import org.osgi.service.component.annotations.Component;
48 import org.osgi.service.component.annotations.Reference;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import java.util.HashSet;
53 import java.util.List;
54 import java.util.Optional;
55 import java.util.Set;
56
57
58
59
60
61
62 @Component(
63 immediate = true,
64 service = { DistributionService.class, DownloadDistributionService.class, AwsS3DistributionService.class },
65 property = {
66 "service.description=Distribution (AWS S3) Remote Service Proxy",
67 "distribution.channel=aws.s3"
68 }
69 )
70 public class AwsS3DistributionServiceRemoteImpl extends RemoteBase implements AwsS3DistributionService,
71 DownloadDistributionService {
72
73 private static final Logger logger = LoggerFactory.getLogger(AwsS3DistributionServiceRemoteImpl.class);
74
75 private static final String PARAM_CHANNEL_ID = "channelId";
76 private static final String PARAM_MEDIAPACKAGE = "mediapackage";
77 private static final String PARAM_ELEMENT_ID = "elementId";
78 private static final String PARAM_FILENAME = "fileName";
79 private static final String PARAM_CHECK_AVAILABILITY = "checkAvailability";
80
81 private final Gson gson = new Gson();
82
83
84 private String distributionChannel;
85
86 public AwsS3DistributionServiceRemoteImpl() {
87
88 super("waiting for activation");
89 }
90
91
92 @Activate
93 protected void activate(ComponentContext cc) {
94 this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
95 super.serviceType = JOB_TYPE_PREFIX + this.distributionChannel;
96 }
97
98 public String getDistributionType() {
99 return this.distributionChannel;
100 }
101
102 @Override
103 public Job distribute(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
104 return distribute(channelId, mediaPackage, elementId, true);
105 }
106
107 @Override
108 public Job distribute(String channelId, MediaPackage mediaPackage, String elementId, boolean checkAvailability)
109 throws DistributionException {
110 Set<String> elementIds = new HashSet<String>();
111 elementIds.add(elementId);
112 return distribute(channelId, mediaPackage, elementIds, checkAvailability);
113 }
114
115 @Override
116 public Job distribute(String channelId, final MediaPackage mediaPackage, Set<String> elementIds,
117 boolean checkAvailability)
118 throws DistributionException {
119 logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
120 final HttpPost req = post(param(PARAM_CHANNEL_ID, channelId),
121 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
122 param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
123 param(PARAM_CHECK_AVAILABILITY, Boolean.toString(checkAvailability)));
124 Optional<Job> job = runRequest(req, JobUtil::jobFromHttpResponse);
125 if (job.isPresent()) {
126 return job.get();
127 }
128 throw new DistributionException(format("Unable to distribute '%s' elements of "
129 + "mediapackage '%s' using a remote destribution service proxy",
130 elementIds.size(), mediaPackage.getIdentifier().toString()));
131 }
132
133 @Override
134 public Job retract(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
135 Set<String> elementIds = new HashSet<String>();
136 elementIds.add(elementId);
137 return retract(channelId, mediaPackage, elementIds);
138 }
139
140 @Override
141 public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
142 logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
143 final HttpPost req = post("/retract",
144 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
145 param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
146 param(PARAM_CHANNEL_ID, channelId));
147 Optional<Job> job = runRequest(req, JobUtil::jobFromHttpResponse);
148 if (job.isPresent()) {
149 return job.get();
150 }
151 throw new DistributionException(format("Unable to retract '%s' elements of "
152 + "mediapackage '%s' using a remote destribution service proxy",
153 elementIds.size(), mediaPackage.getIdentifier().toString()));
154 }
155
156 @Override
157 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
158 throws DistributionException {
159 Set<String> elementIds = new HashSet<String>();
160 elementIds.add(elementId);
161 return distributeSync(channelId, mediaPackage, elementIds, true);
162 }
163
164 @Override
165 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
166 boolean checkAvailability) throws DistributionException {
167 logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
168 final HttpPost req = post("/distributesync", param(PARAM_CHANNEL_ID, channelId),
169 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediapackage)),
170 param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
171 param(PARAM_CHECK_AVAILABILITY, Boolean.toString(checkAvailability)));
172 Optional<List<MediaPackageElement>> elements = runRequest(req, RemoteBase::elementsFromHttpResponse);
173 if (elements.isPresent()) {
174 return elements.get();
175 }
176 throw new DistributionException(format("Unable to distribute '%s' elements of "
177 + "mediapackage '%s' using a remote destribution service proxy",
178 elementIds.size(), mediapackage.getIdentifier().toString()));
179 }
180
181 @Override
182 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
183 throws DistributionException {
184 Set<String> elementIds = new HashSet<String>();
185 elementIds.add(elementId);
186 return retractSync(channelId, mediaPackage, elementIds);
187 }
188
189 @Override
190 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
191 throws DistributionException {
192 logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
193 final HttpPost req = post("/retractsync",
194 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
195 param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
196 param(PARAM_CHANNEL_ID, channelId));
197 Optional<List<MediaPackageElement>> elements = runRequest(req, RemoteBase::elementsFromHttpResponse);
198 if (elements.isPresent()) {
199 return elements.get();
200 }
201 throw new DistributionException(format("Unable to retract '%s' elements of "
202 + "mediapackage '%s' using a remote destribution service proxy",
203 elementIds.size(), mediaPackage.getIdentifier().toString()));
204 }
205
206 @Override
207 public Job distribute(String pubChannelId, MediaPackage mediaPackage, Set<String> downloadIds,
208 boolean checkAvailability, boolean preserveReference) throws DistributionException, MediaPackageException {
209 throw new UnsupportedOperationException("Not supported yet.");
210
211 }
212
213 @Reference
214 @Override
215 public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
216 super.setTrustedHttpClient(trustedHttpClient);
217 }
218
219 @Reference
220 @Override
221 public void setRemoteServiceManager(ServiceRegistry serviceRegistry) {
222 super.setRemoteServiceManager(serviceRegistry);
223 }
224
225 }