1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.distribution.streaming.remote;
23
24 import static java.lang.String.format;
25 import static org.opencastproject.util.HttpUtil.param;
26 import static org.opencastproject.util.HttpUtil.post;
27
28 import org.opencastproject.distribution.api.DistributionException;
29 import org.opencastproject.distribution.api.DistributionService;
30 import org.opencastproject.distribution.api.StreamingDistributionService;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.mediapackage.MediaPackage;
33 import org.opencastproject.mediapackage.MediaPackageElement;
34 import org.opencastproject.mediapackage.MediaPackageException;
35 import org.opencastproject.mediapackage.MediaPackageParser;
36 import org.opencastproject.security.api.TrustedHttpClient;
37 import org.opencastproject.serviceregistry.api.RemoteBase;
38 import org.opencastproject.serviceregistry.api.ServiceRegistry;
39 import org.opencastproject.util.JobUtil;
40 import org.opencastproject.util.OsgiUtil;
41
42 import com.google.gson.Gson;
43
44 import org.apache.http.HttpResponse;
45 import org.apache.http.client.methods.HttpGet;
46 import org.apache.http.client.methods.HttpPost;
47 import org.osgi.service.component.ComponentContext;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Reference;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import java.io.BufferedReader;
55 import java.io.InputStreamReader;
56 import java.util.HashSet;
57 import java.util.List;
58 import java.util.Optional;
59 import java.util.Set;
60
61
62
63
64 @Component(
65 immediate = true,
66 service = { DistributionService.class, StreamingDistributionService.class },
67 property = {
68 "service.description=Distribution (Streaming) Remote Service Proxy",
69 "distribution.channel=streaming"
70 }
71 )
72 public class StreamingDistributionServiceRemoteImpl extends RemoteBase implements StreamingDistributionService {
73
74
75 private static final Logger logger = LoggerFactory.getLogger(StreamingDistributionServiceRemoteImpl.class);
76
77
78 protected String distributionChannel;
79
80
81 private static final String PARAM_CHANNEL_ID = "channelId";
82 private static final String PARAM_MEDIAPACKAGE = "mediapackage";
83 private static final String PARAM_ELEMENT_IDS = "elementIds";
84
85 private static final Gson gson = new Gson();
86
87 public StreamingDistributionServiceRemoteImpl() {
88
89 super("waiting for activation");
90 }
91
92
93 @Activate
94 protected void activate(ComponentContext cc) {
95 this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
96 super.serviceType = JOB_TYPE_PREFIX + this.distributionChannel;
97 }
98
99 public String getDistributionType() {
100 return this.distributionChannel;
101 }
102
103 @Override
104 public Job distribute(String channelId, MediaPackage mediaPackage, String elementId)
105 throws DistributionException, MediaPackageException {
106 Set<String> elementIds = new HashSet<String>();
107 elementIds.add(elementId);
108 return distribute(channelId, mediaPackage, elementIds);
109 }
110
111 @Override
112 public boolean publishToStreaming() {
113 HttpGet get = new HttpGet("/publishToStreaming");
114 HttpResponse response = getResponse(get);
115
116 if (response != null) {
117 String content = null;
118 try (BufferedReader r = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
119 content = r.readLine();
120 } catch (Exception e) {
121 logger.error("Failed to read response from remote service: ", e);
122 } finally {
123 closeConnection(response);
124 }
125 if (content != null) {
126 return Boolean.parseBoolean(content.trim());
127 }
128 }
129 return false;
130 }
131
132 @Override
133 public Job distribute(String channelId, final MediaPackage mediaPackage, Set<String> elementIds)
134 throws DistributionException, MediaPackageException {
135 logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
136 final HttpPost req = post(param(PARAM_CHANNEL_ID, channelId),
137 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
138 param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)));
139 Optional<Job> job = runRequest(req, JobUtil::jobFromHttpResponse);
140 if (job.isPresent()) {
141 return job.get();
142 }
143 throw new DistributionException(format("Unable to distribute '%s' elements of "
144 + "mediapackage '%s' using a remote destribution service proxy",
145 elementIds.size(), mediaPackage.getIdentifier().toString()));
146 }
147
148 @Override
149 public Job retract(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
150 Set<String> elementIds = new HashSet<String>();
151 elementIds.add(elementId);
152 return retract(channelId, mediaPackage, elementIds);
153 }
154
155 @Override
156 public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
157 logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
158 final HttpPost req = post("/retract",
159 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
160 param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)),
161 param(PARAM_CHANNEL_ID, channelId));
162 Optional<Job> job = runRequest(req, JobUtil::jobFromHttpResponse);
163 if (job.isPresent()) {
164 return job.get();
165 }
166 throw new DistributionException(format("Unable to retract '%s' elements of "
167 + "mediapackage '%s' using a remote destribution service proxy",
168 elementIds.size(), mediaPackage.getIdentifier().toString()));
169 }
170
171 @Override
172 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
173 throws DistributionException {
174 Set<String> elementIds = new HashSet<String>();
175 elementIds.add(elementId);
176 return distributeSync(channelId, mediaPackage, elementIds);
177 }
178
179 @Override
180 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds)
181 throws DistributionException {
182 logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
183 final HttpPost req = post("/distributesync", param(PARAM_CHANNEL_ID, channelId),
184 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediapackage)),
185 param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)));
186 Optional<List<MediaPackageElement>> elements = runRequest(req, RemoteBase::elementsFromHttpResponse);
187 if (elements.isPresent()) {
188 return elements.get();
189 }
190 throw new DistributionException(format("Unable to distribute '%s' elements of "
191 + "mediapackage '%s' using a remote destribution service proxy",
192 elementIds.size(), mediapackage.getIdentifier().toString()));
193 }
194
195 @Override
196 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
197 throws DistributionException {
198 Set<String> elementIds = new HashSet<String>();
199 elementIds.add(elementId);
200 return retractSync(channelId, mediaPackage, elementIds);
201 }
202
203 @Override
204 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
205 throws DistributionException {
206 logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
207 final HttpPost req = post("/retract",
208 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
209 param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)),
210 param(PARAM_CHANNEL_ID, channelId));
211 Optional<List<MediaPackageElement>> elements = runRequest(req, RemoteBase::elementsFromHttpResponse);
212 if (elements.isPresent()) {
213 return elements.get();
214 }
215 throw new DistributionException(format("Unable to retract '%s' elements of "
216 + "mediapackage '%s' using a remote destribution service proxy",
217 elementIds.size(), mediaPackage.getIdentifier().toString()));
218 }
219
220 @Reference
221 @Override
222 public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
223 super.setTrustedHttpClient(trustedHttpClient);
224 }
225 @Reference
226 @Override
227 public void setRemoteServiceManager(ServiceRegistry serviceRegistry) {
228 super.setRemoteServiceManager(serviceRegistry);
229 }
230
231 }