1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.api;
23
24 import static org.opencastproject.util.data.Option.none;
25 import static org.opencastproject.util.data.Option.some;
26
27 import org.opencastproject.mediapackage.MediaPackageElement;
28 import org.opencastproject.mediapackage.MediaPackageElementParser;
29 import org.opencastproject.security.api.TrustedHttpClient;
30 import org.opencastproject.util.UrlSupport;
31 import org.opencastproject.util.data.Function;
32 import org.opencastproject.util.data.Option;
33
34 import org.apache.commons.io.IOUtils;
35 import org.apache.commons.lang3.StringUtils;
36 import org.apache.http.HttpResponse;
37 import org.apache.http.HttpStatus;
38 import org.apache.http.StatusLine;
39 import org.apache.http.client.methods.HttpRequestBase;
40 import org.joda.time.DateTimeConstants;
41 import org.osgi.service.component.annotations.Reference;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import java.io.IOException;
46 import java.io.InputStream;
47 import java.net.URI;
48 import java.nio.charset.Charset;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52
53
54
55
56 public class RemoteBase {
57
58 private static final int TIMEOUT = 10000;
59
60
61 private static final Logger logger = LoggerFactory.getLogger(RemoteBase.class);
62
63
64 protected String serviceType = null;
65
66
67 protected TrustedHttpClient client = null;
68
69
70 protected ServiceRegistry remoteServiceManager = null;
71
72
73 private static final List<Integer> knownHttpStatuses = Arrays.asList(HttpStatus.SC_SERVICE_UNAVAILABLE);
74
75
76
77
78
79
80
81 protected RemoteBase(String type) {
82 if (type == null)
83 throw new IllegalArgumentException("Service type must not be null");
84 this.serviceType = type;
85 }
86
87
88
89
90
91
92 @Reference
93 public void setTrustedHttpClient(TrustedHttpClient client) {
94 this.client = client;
95 }
96
97
98
99
100
101
102 @Reference
103 public void setRemoteServiceManager(ServiceRegistry remoteServiceManager) {
104 this.remoteServiceManager = remoteServiceManager;
105 }
106
107 protected <A> Option<A> runRequest(HttpRequestBase req, Function<HttpResponse, A> f) {
108 HttpResponse res = null;
109 try {
110 res = getResponse(req);
111 return res != null ? some(f.apply(res)) : Option.<A> none();
112 } finally {
113 closeConnection(res);
114 }
115 }
116
117
118
119 public static final Function<HttpResponse, Option<List<MediaPackageElement>>> elementsFromHttpResponse =
120 new Function<HttpResponse, Option<List<MediaPackageElement>>>() {
121 @Override
122 public Option<List<MediaPackageElement>> apply(HttpResponse response) {
123 try {
124 final String xml = IOUtils.toString(response.getEntity().getContent(), Charset.forName("utf-8"));
125 List<MediaPackageElement> result = new ArrayList<>(MediaPackageElementParser.getArrayFromXml(xml));
126 return some(result);
127 } catch (Exception e) {
128 logger.error("Error parsing Job from HTTP response", e);
129 return none();
130 }
131 }
132 };
133
134
135
136
137
138
139
140
141
142
143
144 protected HttpResponse getResponse(HttpRequestBase httpRequest) {
145 return getResponse(httpRequest, HttpStatus.SC_OK);
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160 protected HttpResponse getResponse(HttpRequestBase httpRequest, Integer... expectedHttpStatus) {
161
162 final long maxWaitTimeMillis = System.currentTimeMillis() + DateTimeConstants.MILLIS_PER_DAY;
163 boolean warnedUnavailability = false;
164
165
166 while (true) {
167
168 List<ServiceRegistration> remoteServices = null;
169 List<String> servicesInWarningState = new ArrayList<String>();
170 List<String> servicesInKnownState = new ArrayList<String>();
171
172
173 boolean warned = false;
174 while (remoteServices == null || remoteServices.size() == 0) {
175 try {
176 remoteServices = remoteServiceManager.getServiceRegistrationsByLoad(serviceType);
177 if (remoteServices == null || remoteServices.size() == 0) {
178 if (!warned) {
179 logger.warn("No services of type '{}' found, waiting...", serviceType);
180 warned = true;
181 }
182 logger.debug("Still no services of type '{}' found, waiting...", serviceType);
183 try {
184 Thread.sleep(TIMEOUT);
185 } catch (InterruptedException e) {
186 logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
187 return null;
188 }
189 }
190 } catch (ServiceRegistryException e) {
191 logger.warn("Unable to obtain a list of remote services", e);
192 return null;
193 }
194 }
195
196 URI originalUri = httpRequest.getURI();
197 String uriSuffix = null;
198 if (originalUri != null && StringUtils.isNotBlank(originalUri.toString())) {
199 uriSuffix = originalUri.toString();
200 }
201
202
203 String fullUrl = null;
204 for (ServiceRegistration remoteService : remoteServices) {
205 HttpResponse response = null;
206 try {
207 if (uriSuffix == null) {
208 fullUrl = UrlSupport.concat(remoteService.getHost(), remoteService.getPath());
209 } else {
210 fullUrl = UrlSupport.concat(new String[] { remoteService.getHost(), remoteService.getPath(), uriSuffix });
211 }
212
213 logger.debug("Connecting to remote service of type '{}' at {}", serviceType, fullUrl);
214
215 URI uri = new URI(fullUrl);
216 httpRequest.setURI(uri);
217 response = client.execute(httpRequest);
218 StatusLine status = response.getStatusLine();
219 if (Arrays.asList(expectedHttpStatus).contains(status.getStatusCode())) {
220 if (servicesInWarningState.contains(fullUrl) || servicesInKnownState.contains(fullUrl)) {
221 logger.warn("Service at {} is back to normal with expected status code {}", fullUrl,
222 status.getStatusCode());
223 }
224 return response;
225 } else {
226 if (!knownHttpStatuses.contains(status.getStatusCode()) && !servicesInWarningState.contains(fullUrl)) {
227 logger.warn("Service at {} returned unexpected response code {}", fullUrl, status.getStatusCode());
228 servicesInWarningState.add(fullUrl);
229 servicesInKnownState.remove(fullUrl);
230 } else if (knownHttpStatuses.contains(status.getStatusCode()) && !servicesInKnownState.contains(fullUrl)) {
231 logger.info("Service at {} returned known response code {}", fullUrl, status.getStatusCode());
232 servicesInKnownState.add(fullUrl);
233 servicesInWarningState.remove(fullUrl);
234 }
235 }
236 } catch (Exception e) {
237 logger.error("Exception while trying to dispatch job to {}", fullUrl, e);
238 servicesInWarningState.add(fullUrl);
239 }
240 closeConnection(response);
241 }
242
243 if (servicesInKnownState.isEmpty()) {
244 logger.warn("All services of type '{}' are in unknown state, abort remote call {}", serviceType, originalUri);
245 return null;
246 }
247
248
249 httpRequest.setURI(originalUri);
250
251
252 if (!warnedUnavailability) {
253 logger.warn("No service of type '{}' is currently readily available", serviceType);
254 warnedUnavailability = true;
255 } else {
256 logger.debug("All services of type '{}' are still unavailable", serviceType);
257 }
258
259 try {
260 if (System.currentTimeMillis() > maxWaitTimeMillis) {
261 logger.warn(
262 "Still no service of type '{}' available while waiting for more than one day, abort remote call {}",
263 serviceType, originalUri);
264 return null;
265 }
266 Thread.sleep(TIMEOUT);
267 } catch (InterruptedException e) {
268 logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
269 return null;
270 }
271
272 }
273 }
274
275
276
277
278 protected void closeConnection(HttpResponse response) {
279 if (response != null)
280 try {
281 client.close(response);
282 } catch (IOException e) {
283
284 }
285 }
286
287
288
289
290
291 public class HttpClientClosingInputStream extends InputStream {
292
293
294 protected InputStream delegateStream = null;
295
296
297 protected HttpResponse httpResponse = null;
298
299
300
301
302
303
304
305 public HttpClientClosingInputStream(HttpResponse resp) throws IllegalStateException, IOException {
306 this.delegateStream = resp.getEntity().getContent();
307 this.httpResponse = resp;
308 }
309
310
311
312
313
314
315 @Override
316 public int read() throws IOException {
317 return delegateStream.read();
318 }
319
320
321
322
323
324
325 @Override
326 public int available() throws IOException {
327 return delegateStream.available();
328 }
329
330
331
332
333
334 @Override
335 public void close() throws IOException {
336 delegateStream.close();
337 closeConnection(httpResponse);
338 }
339
340
341
342
343
344 @Override
345 public void mark(int readlimit) {
346 delegateStream.mark(readlimit);
347 }
348
349
350
351
352
353 @Override
354 public boolean markSupported() {
355 return delegateStream.markSupported();
356 }
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377 @Override
378 public int read(byte[] b, int off, int len) throws IOException {
379 return delegateStream.read(b, off, len);
380 }
381
382
383
384
385
386
387
388
389
390
391
392
393
394 @Override
395 public int read(byte[] b) throws IOException {
396 return delegateStream.read(b);
397 }
398
399
400
401
402
403 @Override
404 public void reset() throws IOException {
405 delegateStream.reset();
406 }
407
408
409
410
411
412
413
414
415
416 @Override
417 public long skip(long n) throws IOException {
418 return delegateStream.skip(n);
419 }
420
421
422
423
424
425 @Override
426 public String toString() {
427 return getClass().getName() + " : " + delegateStream.toString();
428 }
429 }
430
431 }