1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.api;
23
24 import org.opencastproject.mediapackage.MediaPackageElement;
25 import org.opencastproject.mediapackage.MediaPackageElementParser;
26 import org.opencastproject.security.api.TrustedHttpClient;
27 import org.opencastproject.util.UrlSupport;
28
29 import org.apache.commons.io.IOUtils;
30 import org.apache.commons.lang3.StringUtils;
31 import org.apache.http.HttpResponse;
32 import org.apache.http.HttpStatus;
33 import org.apache.http.StatusLine;
34 import org.apache.http.client.methods.HttpRequestBase;
35 import org.joda.time.DateTimeConstants;
36 import org.osgi.service.component.annotations.Reference;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import java.io.IOException;
41 import java.io.InputStream;
42 import java.net.URI;
43 import java.nio.charset.Charset;
44 import java.util.ArrayList;
45 import java.util.Arrays;
46 import java.util.List;
47 import java.util.Optional;
48
49
50
51
52 public class RemoteBase {
53
54 private static final int TIMEOUT = 10000;
55
56
57 private static final Logger logger = LoggerFactory.getLogger(RemoteBase.class);
58
59
60 protected String serviceType = null;
61
62
63 protected TrustedHttpClient client = null;
64
65
66 protected ServiceRegistry remoteServiceManager = null;
67
68
69 private static final List<Integer> knownHttpStatuses = Arrays.asList(HttpStatus.SC_SERVICE_UNAVAILABLE);
70
71
72
73
74
75
76
77 protected RemoteBase(String type) {
78 if (type == null) {
79 throw new IllegalArgumentException("Service type must not be null");
80 }
81 this.serviceType = type;
82 }
83
84
85
86
87
88
89 @Reference
90 public void setTrustedHttpClient(TrustedHttpClient client) {
91 this.client = client;
92 }
93
94
95
96
97
98
99 @Reference
100 public void setRemoteServiceManager(ServiceRegistry remoteServiceManager) {
101 this.remoteServiceManager = remoteServiceManager;
102 }
103
104 protected <A> Optional<A> runRequest(HttpRequestBase req, java.util.function.Function<HttpResponse, Optional<A>> f) {
105 HttpResponse res = null;
106 try {
107 res = getResponse(req);
108 return res != null ? f.apply(res) : Optional.empty();
109 } finally {
110 closeConnection(res);
111 }
112 }
113
114 public static Optional<List<MediaPackageElement>> elementsFromHttpResponse(HttpResponse response) {
115 try {
116 String xml = IOUtils.toString(response.getEntity().getContent(), Charset.forName("utf-8"));
117 List<MediaPackageElement> result = new ArrayList<>(MediaPackageElementParser.getArrayFromXml(xml));
118 return Optional.of(result);
119 } catch (Exception e) {
120 logger.error("Error parsing MediaPackage elements from HTTP response", e);
121 return Optional.empty();
122 }
123 }
124
125
126
127
128
129
130
131
132
133
134
135 protected HttpResponse getResponse(HttpRequestBase httpRequest) {
136 return getResponse(httpRequest, HttpStatus.SC_OK);
137 }
138
139
140
141
142
143
144
145
146
147
148
149
150
151 protected HttpResponse getResponse(HttpRequestBase httpRequest, Integer... expectedHttpStatus) {
152
153 final long maxWaitTimeMillis = System.currentTimeMillis() + DateTimeConstants.MILLIS_PER_DAY;
154 boolean warnedUnavailability = false;
155
156
157 while (true) {
158
159 List<ServiceRegistration> remoteServices = null;
160 List<String> servicesInWarningState = new ArrayList<String>();
161 List<String> servicesInKnownState = new ArrayList<String>();
162
163
164 boolean warned = false;
165 while (remoteServices == null || remoteServices.size() == 0) {
166 try {
167 remoteServices = remoteServiceManager.getServiceRegistrationsByLoad(serviceType);
168 if (remoteServices == null || remoteServices.size() == 0) {
169 if (!warned) {
170 logger.warn("No services of type '{}' found, waiting...", serviceType);
171 warned = true;
172 }
173 logger.debug("Still no services of type '{}' found, waiting...", serviceType);
174 try {
175 Thread.sleep(TIMEOUT);
176 } catch (InterruptedException e) {
177 logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
178 return null;
179 }
180 }
181 } catch (ServiceRegistryException e) {
182 logger.warn("Unable to obtain a list of remote services", e);
183 return null;
184 }
185 }
186
187 URI originalUri = httpRequest.getURI();
188 String uriSuffix = null;
189 if (originalUri != null && StringUtils.isNotBlank(originalUri.toString())) {
190 uriSuffix = originalUri.toString();
191 }
192
193
194 String fullUrl = null;
195 for (ServiceRegistration remoteService : remoteServices) {
196 HttpResponse response = null;
197 try {
198 if (uriSuffix == null) {
199 fullUrl = UrlSupport.concat(remoteService.getHost(), remoteService.getPath());
200 } else {
201 fullUrl = UrlSupport.concat(new String[] { remoteService.getHost(), remoteService.getPath(), uriSuffix });
202 }
203
204 logger.debug("Connecting to remote service of type '{}' at {}", serviceType, fullUrl);
205
206 URI uri = new URI(fullUrl);
207 httpRequest.setURI(uri);
208 response = client.execute(httpRequest);
209 StatusLine status = response.getStatusLine();
210 if (Arrays.asList(expectedHttpStatus).contains(status.getStatusCode())) {
211 if (servicesInWarningState.contains(fullUrl) || servicesInKnownState.contains(fullUrl)) {
212 logger.warn("Service at {} is back to normal with expected status code {}", fullUrl,
213 status.getStatusCode());
214 }
215 return response;
216 } else {
217 if (!knownHttpStatuses.contains(status.getStatusCode()) && !servicesInWarningState.contains(fullUrl)) {
218 logger.warn("Service at {} returned unexpected response code {}", fullUrl, status.getStatusCode());
219 servicesInWarningState.add(fullUrl);
220 servicesInKnownState.remove(fullUrl);
221 } else if (knownHttpStatuses.contains(status.getStatusCode()) && !servicesInKnownState.contains(fullUrl)) {
222 logger.info("Service at {} returned known response code {}", fullUrl, status.getStatusCode());
223 servicesInKnownState.add(fullUrl);
224 servicesInWarningState.remove(fullUrl);
225 }
226 }
227 } catch (Exception e) {
228 logger.error("Exception while trying to dispatch job to {}", fullUrl, e);
229 servicesInWarningState.add(fullUrl);
230 }
231 closeConnection(response);
232 }
233
234 if (servicesInKnownState.isEmpty()) {
235 logger.warn("All services of type '{}' are in unknown state, abort remote call {}", serviceType, originalUri);
236 return null;
237 }
238
239
240 httpRequest.setURI(originalUri);
241
242
243 if (!warnedUnavailability) {
244 logger.warn("No service of type '{}' is currently readily available", serviceType);
245 warnedUnavailability = true;
246 } else {
247 logger.debug("All services of type '{}' are still unavailable", serviceType);
248 }
249
250 try {
251 if (System.currentTimeMillis() > maxWaitTimeMillis) {
252 logger.warn(
253 "Still no service of type '{}' available while waiting for more than one day, abort remote call {}",
254 serviceType, originalUri);
255 return null;
256 }
257 Thread.sleep(TIMEOUT);
258 } catch (InterruptedException e) {
259 logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
260 return null;
261 }
262
263 }
264 }
265
266
267
268
269 protected void closeConnection(HttpResponse response) {
270 if (response != null) {
271 try {
272 client.close(response);
273 } catch (IOException e) {
274
275 }
276 }
277 }
278
279
280
281
282
283 public class HttpClientClosingInputStream extends InputStream {
284
285
286 protected InputStream delegateStream = null;
287
288
289 protected HttpResponse httpResponse = null;
290
291
292
293
294
295
296
297 public HttpClientClosingInputStream(HttpResponse resp) throws IllegalStateException, IOException {
298 this.delegateStream = resp.getEntity().getContent();
299 this.httpResponse = resp;
300 }
301
302
303
304
305
306
307 @Override
308 public int read() throws IOException {
309 return delegateStream.read();
310 }
311
312
313
314
315
316
317 @Override
318 public int available() throws IOException {
319 return delegateStream.available();
320 }
321
322
323
324
325
326 @Override
327 public void close() throws IOException {
328 delegateStream.close();
329 closeConnection(httpResponse);
330 }
331
332
333
334
335
336 @Override
337 public void mark(int readlimit) {
338 delegateStream.mark(readlimit);
339 }
340
341
342
343
344
345 @Override
346 public boolean markSupported() {
347 return delegateStream.markSupported();
348 }
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369 @Override
370 public int read(byte[] b, int off, int len) throws IOException {
371 return delegateStream.read(b, off, len);
372 }
373
374
375
376
377
378
379
380
381
382
383
384
385
386 @Override
387 public int read(byte[] b) throws IOException {
388 return delegateStream.read(b);
389 }
390
391
392
393
394
395 @Override
396 public void reset() throws IOException {
397 delegateStream.reset();
398 }
399
400
401
402
403
404
405
406
407
408 @Override
409 public long skip(long n) throws IOException {
410 return delegateStream.skip(n);
411 }
412
413
414
415
416
417 @Override
418 public String toString() {
419 return getClass().getName() + " : " + delegateStream.toString();
420 }
421 }
422
423 }