1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.distribution.streaming.remote;
23
24 import static java.lang.String.format;
25 import static org.opencastproject.util.HttpUtil.param;
26 import static org.opencastproject.util.HttpUtil.post;
27 import static org.opencastproject.util.JobUtil.jobFromHttpResponse;
28 import static org.opencastproject.util.data.functions.Options.join;
29
30 import org.opencastproject.distribution.api.DistributionException;
31 import org.opencastproject.distribution.api.DistributionService;
32 import org.opencastproject.distribution.api.StreamingDistributionService;
33 import org.opencastproject.job.api.Job;
34 import org.opencastproject.mediapackage.MediaPackage;
35 import org.opencastproject.mediapackage.MediaPackageElement;
36 import org.opencastproject.mediapackage.MediaPackageException;
37 import org.opencastproject.mediapackage.MediaPackageParser;
38 import org.opencastproject.security.api.TrustedHttpClient;
39 import org.opencastproject.serviceregistry.api.RemoteBase;
40 import org.opencastproject.serviceregistry.api.ServiceRegistry;
41 import org.opencastproject.util.OsgiUtil;
42
43 import com.google.gson.Gson;
44
45 import org.apache.http.HttpResponse;
46 import org.apache.http.client.methods.HttpGet;
47 import org.apache.http.client.methods.HttpPost;
48 import org.osgi.service.component.ComponentContext;
49 import org.osgi.service.component.annotations.Activate;
50 import org.osgi.service.component.annotations.Component;
51 import org.osgi.service.component.annotations.Reference;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import java.io.BufferedReader;
56 import java.io.InputStreamReader;
57 import java.util.HashSet;
58 import java.util.List;
59 import java.util.Set;
60
61
62
63
64 @Component(
65 immediate = true,
66 service = { DistributionService.class, StreamingDistributionService.class },
67 property = {
68 "service.description=Distribution (Streaming) Remote Service Proxy",
69 "distribution.channel=streaming"
70 }
71 )
72 public class StreamingDistributionServiceRemoteImpl extends RemoteBase implements StreamingDistributionService {
73
74
75 private static final Logger logger = LoggerFactory.getLogger(StreamingDistributionServiceRemoteImpl.class);
76
77
78 protected String distributionChannel;
79
80
81 private static final String PARAM_CHANNEL_ID = "channelId";
82 private static final String PARAM_MEDIAPACKAGE = "mediapackage";
83 private static final String PARAM_ELEMENT_IDS = "elementIds";
84
85 private static final Gson gson = new Gson();
86
87 public StreamingDistributionServiceRemoteImpl() {
88
89 super("waiting for activation");
90 }
91
92
93 @Activate
94 protected void activate(ComponentContext cc) {
95 this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
96 super.serviceType = JOB_TYPE_PREFIX + this.distributionChannel;
97 }
98
99 public String getDistributionType() {
100 return this.distributionChannel;
101 }
102
103 @Override
104 public Job distribute(String channelId, MediaPackage mediaPackage, String elementId)
105 throws DistributionException, MediaPackageException {
106 Set<String> elementIds = new HashSet<String>();
107 elementIds.add(elementId);
108 return distribute(channelId, mediaPackage, elementIds);
109 }
110
111 @Override
112 public boolean publishToStreaming() {
113 HttpGet get = new HttpGet("/publishToStreaming");
114 HttpResponse response = getResponse(get);
115
116 if (response != null) {
117 String content = null;
118 try (BufferedReader r = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
119 content = r.readLine();
120 } catch (Exception e) {
121 logger.error("Failed to read response from remote service: ", e);
122 } finally {
123 closeConnection(response);
124 }
125 if (content != null) {
126 return Boolean.parseBoolean(content.trim());
127 }
128 }
129 return false;
130 }
131
132 @Override
133 public Job distribute(String channelId, final MediaPackage mediaPackage, Set<String> elementIds)
134 throws DistributionException, MediaPackageException {
135 logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
136 final HttpPost req = post(param(PARAM_CHANNEL_ID, channelId),
137 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
138 param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)));
139 for (Job job : join(runRequest(req, jobFromHttpResponse))) {
140 return job;
141 }
142 throw new DistributionException(format("Unable to distribute '%s' elements of "
143 + "mediapackage '%s' using a remote destribution service proxy",
144 elementIds.size(), mediaPackage.getIdentifier().toString()));
145 }
146
147 @Override
148 public Job retract(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
149 Set<String> elementIds = new HashSet<String>();
150 elementIds.add(elementId);
151 return retract(channelId, mediaPackage, elementIds);
152 }
153
154 @Override
155 public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
156 logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
157 final HttpPost req = post("/retract",
158 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
159 param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)),
160 param(PARAM_CHANNEL_ID, channelId));
161 for (Job job : join(runRequest(req, jobFromHttpResponse))) {
162 return job;
163 }
164 throw new DistributionException(format("Unable to retract '%s' elements of "
165 + "mediapackage '%s' using a remote destribution service proxy",
166 elementIds.size(), mediaPackage.getIdentifier().toString()));
167 }
168
169 @Override
170 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
171 throws DistributionException {
172 Set<String> elementIds = new HashSet<String>();
173 elementIds.add(elementId);
174 return distributeSync(channelId, mediaPackage, elementIds);
175 }
176
177 @Override
178 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds)
179 throws DistributionException {
180 logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
181 final HttpPost req = post("/distributesync", param(PARAM_CHANNEL_ID, channelId),
182 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediapackage)),
183 param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)));
184 for (List<MediaPackageElement> elements : join(runRequest(req, elementsFromHttpResponse))) {
185 return elements;
186 }
187 throw new DistributionException(format("Unable to distribute '%s' elements of "
188 + "mediapackage '%s' using a remote destribution service proxy",
189 elementIds.size(), mediapackage.getIdentifier().toString()));
190 }
191
192 @Override
193 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
194 throws DistributionException {
195 Set<String> elementIds = new HashSet<String>();
196 elementIds.add(elementId);
197 return retractSync(channelId, mediaPackage, elementIds);
198 }
199
200 @Override
201 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
202 throws DistributionException {
203 logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
204 final HttpPost req = post("/retract",
205 param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
206 param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)),
207 param(PARAM_CHANNEL_ID, channelId));
208 for (List<MediaPackageElement> elements : join(runRequest(req, elementsFromHttpResponse))) {
209 return elements;
210 }
211 throw new DistributionException(format("Unable to retract '%s' elements of "
212 + "mediapackage '%s' using a remote destribution service proxy",
213 elementIds.size(), mediaPackage.getIdentifier().toString()));
214 }
215
216 @Reference
217 @Override
218 public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
219 super.setTrustedHttpClient(trustedHttpClient);
220 }
221 @Reference
222 @Override
223 public void setRemoteServiceManager(ServiceRegistry serviceRegistry) {
224 super.setRemoteServiceManager(serviceRegistry);
225 }
226
227 }