1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.api;
23
24 import org.opencastproject.mediapackage.MediaPackageElement;
25 import org.opencastproject.mediapackage.MediaPackageElementParser;
26 import org.opencastproject.security.api.TrustedHttpClient;
27 import org.opencastproject.util.UrlSupport;
28
29 import org.apache.commons.io.IOUtils;
30 import org.apache.commons.lang3.StringUtils;
31 import org.apache.http.HttpResponse;
32 import org.apache.http.HttpStatus;
33 import org.apache.http.StatusLine;
34 import org.apache.http.client.methods.HttpRequestBase;
35 import org.joda.time.DateTimeConstants;
36 import org.osgi.service.component.annotations.Reference;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import java.io.IOException;
41 import java.io.InputStream;
42 import java.net.URI;
43 import java.nio.charset.Charset;
44 import java.util.ArrayList;
45 import java.util.Arrays;
46 import java.util.List;
47 import java.util.Optional;
48
49
50
51
52 public class RemoteBase {
53
54 private static final int TIMEOUT = 10000;
55
56
57 private static final Logger logger = LoggerFactory.getLogger(RemoteBase.class);
58
59
60 protected String serviceType = null;
61
62
63 protected TrustedHttpClient client = null;
64
65
66 protected ServiceRegistry remoteServiceManager = null;
67
68
69 private static final List<Integer> knownHttpStatuses = Arrays.asList(HttpStatus.SC_SERVICE_UNAVAILABLE);
70
71
72
73
74
75
76
/**
 * Creates a remote implementation for the given type of service.
 *
 * @param type
 *          the service type, must not be {@code null}
 * @throws IllegalArgumentException
 *           if {@code type} is {@code null}
 */
protected RemoteBase(String type) {
  if (type == null) {
    throw new IllegalArgumentException("Service type must not be null");
  }
  serviceType = type;
}
82
83
84
85
86
87
88 @Reference
89 public void setTrustedHttpClient(TrustedHttpClient client) {
90 this.client = client;
91 }
92
93
94
95
96
97
98 @Reference
99 public void setRemoteServiceManager(ServiceRegistry remoteServiceManager) {
100 this.remoteServiceManager = remoteServiceManager;
101 }
102
103 protected <A> Optional<A> runRequest(HttpRequestBase req, java.util.function.Function<HttpResponse, Optional<A>> f) {
104 HttpResponse res = null;
105 try {
106 res = getResponse(req);
107 return res != null ? f.apply(res) : Optional.empty();
108 } finally {
109 closeConnection(res);
110 }
111 }
112
113 public static Optional<List<MediaPackageElement>> elementsFromHttpResponse(HttpResponse response) {
114 try {
115 String xml = IOUtils.toString(response.getEntity().getContent(), Charset.forName("utf-8"));
116 List<MediaPackageElement> result = new ArrayList<>(MediaPackageElementParser.getArrayFromXml(xml));
117 return Optional.of(result);
118 } catch (Exception e) {
119 logger.error("Error parsing MediaPackage elements from HTTP response", e);
120 return Optional.empty();
121 }
122 }
123
124
125
126
127
128
129
130
131
132
133
134 protected HttpResponse getResponse(HttpRequestBase httpRequest) {
135 return getResponse(httpRequest, HttpStatus.SC_OK);
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150 protected HttpResponse getResponse(HttpRequestBase httpRequest, Integer... expectedHttpStatus) {
151
152 final long maxWaitTimeMillis = System.currentTimeMillis() + DateTimeConstants.MILLIS_PER_DAY;
153 boolean warnedUnavailability = false;
154
155
156 while (true) {
157
158 List<ServiceRegistration> remoteServices = null;
159 List<String> servicesInWarningState = new ArrayList<String>();
160 List<String> servicesInKnownState = new ArrayList<String>();
161
162
163 boolean warned = false;
164 while (remoteServices == null || remoteServices.size() == 0) {
165 try {
166 remoteServices = remoteServiceManager.getServiceRegistrationsByLoad(serviceType);
167 if (remoteServices == null || remoteServices.size() == 0) {
168 if (!warned) {
169 logger.warn("No services of type '{}' found, waiting...", serviceType);
170 warned = true;
171 }
172 logger.debug("Still no services of type '{}' found, waiting...", serviceType);
173 try {
174 Thread.sleep(TIMEOUT);
175 } catch (InterruptedException e) {
176 logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
177 return null;
178 }
179 }
180 } catch (ServiceRegistryException e) {
181 logger.warn("Unable to obtain a list of remote services", e);
182 return null;
183 }
184 }
185
186 URI originalUri = httpRequest.getURI();
187 String uriSuffix = null;
188 if (originalUri != null && StringUtils.isNotBlank(originalUri.toString())) {
189 uriSuffix = originalUri.toString();
190 }
191
192
193 String fullUrl = null;
194 for (ServiceRegistration remoteService : remoteServices) {
195 HttpResponse response = null;
196 try {
197 if (uriSuffix == null) {
198 fullUrl = UrlSupport.concat(remoteService.getHost(), remoteService.getPath());
199 } else {
200 fullUrl = UrlSupport.concat(new String[] { remoteService.getHost(), remoteService.getPath(), uriSuffix });
201 }
202
203 logger.debug("Connecting to remote service of type '{}' at {}", serviceType, fullUrl);
204
205 URI uri = new URI(fullUrl);
206 httpRequest.setURI(uri);
207 response = client.execute(httpRequest);
208 StatusLine status = response.getStatusLine();
209 if (Arrays.asList(expectedHttpStatus).contains(status.getStatusCode())) {
210 if (servicesInWarningState.contains(fullUrl) || servicesInKnownState.contains(fullUrl)) {
211 logger.warn("Service at {} is back to normal with expected status code {}", fullUrl,
212 status.getStatusCode());
213 }
214 return response;
215 } else {
216 if (!knownHttpStatuses.contains(status.getStatusCode()) && !servicesInWarningState.contains(fullUrl)) {
217 logger.warn("Service at {} returned unexpected response code {}", fullUrl, status.getStatusCode());
218 servicesInWarningState.add(fullUrl);
219 servicesInKnownState.remove(fullUrl);
220 } else if (knownHttpStatuses.contains(status.getStatusCode()) && !servicesInKnownState.contains(fullUrl)) {
221 logger.info("Service at {} returned known response code {}", fullUrl, status.getStatusCode());
222 servicesInKnownState.add(fullUrl);
223 servicesInWarningState.remove(fullUrl);
224 }
225 }
226 } catch (Exception e) {
227 logger.error("Exception while trying to dispatch job to {}", fullUrl, e);
228 servicesInWarningState.add(fullUrl);
229 }
230 closeConnection(response);
231 }
232
233 if (servicesInKnownState.isEmpty()) {
234 logger.warn("All services of type '{}' are in unknown state, abort remote call {}", serviceType, originalUri);
235 return null;
236 }
237
238
239 httpRequest.setURI(originalUri);
240
241
242 if (!warnedUnavailability) {
243 logger.warn("No service of type '{}' is currently readily available", serviceType);
244 warnedUnavailability = true;
245 } else {
246 logger.debug("All services of type '{}' are still unavailable", serviceType);
247 }
248
249 try {
250 if (System.currentTimeMillis() > maxWaitTimeMillis) {
251 logger.warn(
252 "Still no service of type '{}' available while waiting for more than one day, abort remote call {}",
253 serviceType, originalUri);
254 return null;
255 }
256 Thread.sleep(TIMEOUT);
257 } catch (InterruptedException e) {
258 logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
259 return null;
260 }
261
262 }
263 }
264
265
266
267
268 protected void closeConnection(HttpResponse response) {
269 if (response != null)
270 try {
271 client.close(response);
272 } catch (IOException e) {
273
274 }
275 }
276
277
278
279
280
281 public class HttpClientClosingInputStream extends InputStream {
282
283
284 protected InputStream delegateStream = null;
285
286
287 protected HttpResponse httpResponse = null;
288
289
290
291
292
293
294
295 public HttpClientClosingInputStream(HttpResponse resp) throws IllegalStateException, IOException {
296 this.delegateStream = resp.getEntity().getContent();
297 this.httpResponse = resp;
298 }
299
300
301
302
303
304
305 @Override
306 public int read() throws IOException {
307 return delegateStream.read();
308 }
309
310
311
312
313
314
315 @Override
316 public int available() throws IOException {
317 return delegateStream.available();
318 }
319
320
321
322
323
324 @Override
325 public void close() throws IOException {
326 delegateStream.close();
327 closeConnection(httpResponse);
328 }
329
330
331
332
333
334 @Override
335 public void mark(int readlimit) {
336 delegateStream.mark(readlimit);
337 }
338
339
340
341
342
343 @Override
344 public boolean markSupported() {
345 return delegateStream.markSupported();
346 }
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367 @Override
368 public int read(byte[] b, int off, int len) throws IOException {
369 return delegateStream.read(b, off, len);
370 }
371
372
373
374
375
376
377
378
379
380
381
382
383
384 @Override
385 public int read(byte[] b) throws IOException {
386 return delegateStream.read(b);
387 }
388
389
390
391
392
393 @Override
394 public void reset() throws IOException {
395 delegateStream.reset();
396 }
397
398
399
400
401
402
403
404
405
406 @Override
407 public long skip(long n) throws IOException {
408 return delegateStream.skip(n);
409 }
410
411
412
413
414
415 @Override
416 public String toString() {
417 return getClass().getName() + " : " + delegateStream.toString();
418 }
419 }
420
421 }