View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.api;
23  
24  import org.opencastproject.mediapackage.MediaPackageElement;
25  import org.opencastproject.mediapackage.MediaPackageElementParser;
26  import org.opencastproject.security.api.TrustedHttpClient;
27  import org.opencastproject.util.UrlSupport;
28  
29  import org.apache.commons.io.IOUtils;
30  import org.apache.commons.lang3.StringUtils;
31  import org.apache.http.HttpResponse;
32  import org.apache.http.HttpStatus;
33  import org.apache.http.StatusLine;
34  import org.apache.http.client.methods.HttpRequestBase;
35  import org.joda.time.DateTimeConstants;
36  import org.osgi.service.component.annotations.Reference;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import java.io.IOException;
41  import java.io.InputStream;
42  import java.net.URI;
43  import java.nio.charset.Charset;
44  import java.util.ArrayList;
45  import java.util.Arrays;
46  import java.util.List;
47  import java.util.Optional;
48  
49  /**
50   * Base class serving as a convenience implementation for remote services.
51   */
52  public class RemoteBase {
53  
54    private static final int TIMEOUT = 10000;
55  
56    /** The logger */
57    private static final Logger logger = LoggerFactory.getLogger(RemoteBase.class);
58  
59    /** The service type, used to look up remote implementations */
60    protected String serviceType = null;
61  
62    /** The http client to use when connecting to remote servers */
63    protected TrustedHttpClient client = null;
64  
65    /** The http client */
66    protected ServiceRegistry remoteServiceManager = null;
67  
68    /** A list of known http statuses */
69    private static final List<Integer> knownHttpStatuses = Arrays.asList(HttpStatus.SC_SERVICE_UNAVAILABLE);
70  
71    /**
72     * Creates a remote implementation for the given type of service.
73     *
74     * @param type
75     *          the service type
76     */
77    protected RemoteBase(String type) {
78      if (type == null) {
79        throw new IllegalArgumentException("Service type must not be null");
80      }
81      this.serviceType = type;
82    }
83  
84    /**
85     * Sets the trusted http client
86     *
87     * @param client
88     */
89    @Reference
90    public void setTrustedHttpClient(TrustedHttpClient client) {
91      this.client = client;
92    }
93  
94    /**
95     * Sets the remote service manager.
96     *
97     * @param remoteServiceManager
98     */
99    @Reference
100   public void setRemoteServiceManager(ServiceRegistry remoteServiceManager) {
101     this.remoteServiceManager = remoteServiceManager;
102   }
103 
104   protected <A> Optional<A> runRequest(HttpRequestBase req, java.util.function.Function<HttpResponse, Optional<A>> f) {
105     HttpResponse res = null;
106     try {
107       res = getResponse(req);
108       return res != null ? f.apply(res) : Optional.empty();
109     } finally {
110       closeConnection(res);
111     }
112   }
113 
114   public static Optional<List<MediaPackageElement>> elementsFromHttpResponse(HttpResponse response) {
115     try {
116       String xml = IOUtils.toString(response.getEntity().getContent(), Charset.forName("utf-8"));
117       List<MediaPackageElement> result = new ArrayList<>(MediaPackageElementParser.getArrayFromXml(xml));
118       return Optional.of(result);
119     } catch (Exception e) {
120       logger.error("Error parsing MediaPackage elements from HTTP response", e);
121       return Optional.empty();
122     }
123   }
124 
125   /**
126    * Makes a request to all available remote services and returns the response as soon as the first of them returns the
127    * {@link HttpStatus#SC_OK} as the status code.
128    *
129    * @param httpRequest
130    *          the http request. If the URI is specified, it should include only the path beyond the service endpoint.
131    *          For example, a request intended for http://{host}/{service}/extra/path/info.xml should include the URI
132    *          "/extra/path/info.xml".
133    * @return the response object, or null if we can not connect to any services
134    */
135   protected HttpResponse getResponse(HttpRequestBase httpRequest) {
136     return getResponse(httpRequest, HttpStatus.SC_OK);
137   }
138 
139   /**
140    * Makes a request to all available remote services and returns the response as soon as the first of them returns the
141    * expected http status code.
142    *
143    * @param httpRequest
144    *          the http request. If the URI is specified, it should include only the path beyond the service endpoint.
145    *          For example, a request intended for http://{host}/{service}/extra/path/info.xml should include the URI
146    *          "/extra/path/info.xml".
147    * @param expectedHttpStatus
148    *          any expected status codes to include in the return.
149    * @return the response object, or null if we can not connect to any services
150    */
151   protected HttpResponse getResponse(HttpRequestBase httpRequest, Integer... expectedHttpStatus) {
152 
153     final long maxWaitTimeMillis = System.currentTimeMillis() + DateTimeConstants.MILLIS_PER_DAY;
154     boolean warnedUnavailability = false;
155 
156     // Try forever
157     while (true) {
158 
159       List<ServiceRegistration> remoteServices = null;
160       List<String> servicesInWarningState = new ArrayList<String>();
161       List<String> servicesInKnownState = new ArrayList<String>();
162 
163       // Find available services
164       boolean warned = false;
165       while (remoteServices == null || remoteServices.size() == 0) {
166         try {
167           remoteServices = remoteServiceManager.getServiceRegistrationsByLoad(serviceType);
168           if (remoteServices == null || remoteServices.size() == 0) {
169             if (!warned) {
170               logger.warn("No services of type '{}' found, waiting...", serviceType);
171               warned = true;
172             }
173             logger.debug("Still no services of type '{}' found, waiting...", serviceType);
174             try {
175               Thread.sleep(TIMEOUT);
176             } catch (InterruptedException e) {
177               logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
178               return null;
179             }
180           }
181         } catch (ServiceRegistryException e) {
182           logger.warn("Unable to obtain a list of remote services", e);
183           return null;
184         }
185       }
186 
187       URI originalUri = httpRequest.getURI();
188       String uriSuffix = null;
189       if (originalUri != null && StringUtils.isNotBlank(originalUri.toString())) {
190         uriSuffix = originalUri.toString();
191       }
192 
193       // Try each available service
194       String fullUrl = null;
195       for (ServiceRegistration remoteService : remoteServices) {
196         HttpResponse response = null;
197         try {
198           if (uriSuffix == null) {
199             fullUrl = UrlSupport.concat(remoteService.getHost(), remoteService.getPath());
200           } else {
201             fullUrl = UrlSupport.concat(new String[] { remoteService.getHost(), remoteService.getPath(), uriSuffix });
202           }
203 
204           logger.debug("Connecting to remote service of type '{}' at {}", serviceType, fullUrl);
205 
206           URI uri = new URI(fullUrl);
207           httpRequest.setURI(uri);
208           response = client.execute(httpRequest);
209           StatusLine status = response.getStatusLine();
210           if (Arrays.asList(expectedHttpStatus).contains(status.getStatusCode())) {
211             if (servicesInWarningState.contains(fullUrl) || servicesInKnownState.contains(fullUrl)) {
212               logger.warn("Service at {} is back to normal with expected status code {}", fullUrl,
213                       status.getStatusCode());
214             }
215             return response;
216           } else {
217             if (!knownHttpStatuses.contains(status.getStatusCode()) && !servicesInWarningState.contains(fullUrl)) {
218               logger.warn("Service at {} returned unexpected response code {}", fullUrl, status.getStatusCode());
219               servicesInWarningState.add(fullUrl);
220               servicesInKnownState.remove(fullUrl);
221             } else if (knownHttpStatuses.contains(status.getStatusCode()) && !servicesInKnownState.contains(fullUrl)) {
222               logger.info("Service at {} returned known response code {}", fullUrl, status.getStatusCode());
223               servicesInKnownState.add(fullUrl);
224               servicesInWarningState.remove(fullUrl);
225             }
226           }
227         } catch (Exception e) {
228           logger.error("Exception while trying to dispatch job to {}", fullUrl, e);
229           servicesInWarningState.add(fullUrl);
230         }
231         closeConnection(response);
232       }
233 
234       if (servicesInKnownState.isEmpty()) {
235         logger.warn("All services of type '{}' are in unknown state, abort remote call {}", serviceType, originalUri);
236         return null;
237       }
238 
239       // Reset Original URI
240       httpRequest.setURI(originalUri);
241 
242       // If none of them accepted the request, let's wait and retry
243       if (!warnedUnavailability) {
244         logger.warn("No service of type '{}' is currently readily available", serviceType);
245         warnedUnavailability = true;
246       } else {
247         logger.debug("All services of type '{}' are still unavailable", serviceType);
248       }
249 
250       try {
251         if (System.currentTimeMillis() > maxWaitTimeMillis) {
252           logger.warn(
253                   "Still no service of type '{}' available while waiting for more than one day, abort remote call {}",
254                   serviceType, originalUri);
255           return null;
256         }
257         Thread.sleep(TIMEOUT);
258       } catch (InterruptedException e) {
259         logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
260         return null;
261       }
262 
263     }
264   }
265 
266   /**
267    * Closes any http connections kept open by this http response.
268    */
269   protected void closeConnection(HttpResponse response) {
270     if (response != null) {
271       try {
272         client.close(response);
273       } catch (IOException e) {
274         // ignore
275       }
276     }
277   }
278 
279   /**
280    * A stream wrapper that closes the http response when the stream is closed. If a remote service proxy returns an
281    * inputstream, this implementation should be used to ensure that the http connection is closed properly.
282    */
283   public class HttpClientClosingInputStream extends InputStream {
284 
285     /** The input stream delivering the actual data */
286     protected InputStream delegateStream = null;
287 
288     /** The http response to close when the stream is closed */
289     protected HttpResponse httpResponse = null;
290 
291     /**
292      * Constructs an HttpClientClosingInputStream from a source stream and an http response.
293      *
294      * @throws IOException
295      * @throws IllegalStateException
296      */
297     public HttpClientClosingInputStream(HttpResponse resp) throws IllegalStateException, IOException {
298       this.delegateStream = resp.getEntity().getContent();
299       this.httpResponse = resp;
300     }
301 
302     /**
303      * {@inheritDoc}
304      *
305      * @see java.io.InputStream#read()
306      */
307     @Override
308     public int read() throws IOException {
309       return delegateStream.read();
310     }
311 
312     /**
313      * {@inheritDoc}
314      *
315      * @see java.io.InputStream#available()
316      */
317     @Override
318     public int available() throws IOException {
319       return delegateStream.available();
320     }
321 
322     /**
323      * @throws IOException
324      * @see java.io.InputStream#close()
325      */
326     @Override
327     public void close() throws IOException {
328       delegateStream.close();
329       closeConnection(httpResponse);
330     }
331 
332     /**
333      * @param readlimit
334      * @see java.io.InputStream#mark(int)
335      */
336     @Override
337     public void mark(int readlimit) {
338       delegateStream.mark(readlimit);
339     }
340 
341     /**
342      * @return whether this stream supports marking
343      * @see java.io.InputStream#markSupported()
344      */
345     @Override
346     public boolean markSupported() {
347       return delegateStream.markSupported();
348     }
349 
350     /**
351      * @param b
352      *          the buffer into which the data is read.
353      * @param off
354      *          the start offset in array <code>b</code> at which the data is written.
355      * @param len
356      *          the maximum number of bytes to read.
357      * @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
358      *         end of the stream has been reached.
359      * @exception IOException
360      *              If the first byte cannot be read for any reason other than end of file, or if the input stream has
361      *              been closed, or if some other I/O error occurs.
362      * @exception NullPointerException
363      *              If <code>b</code> is <code>null</code>.
364      * @exception IndexOutOfBoundsException
365      *              If <code>off</code> is negative, <code>len</code> is negative, or <code>len</code> is greater than
366      *              <code>b.length - off</code>
367      * @see java.io.InputStream#read(byte[], int, int)
368      */
369     @Override
370     public int read(byte[] b, int off, int len) throws IOException {
371       return delegateStream.read(b, off, len);
372     }
373 
374     /**
375      * @param b
376      *          the buffer into which the data is read.
377      * @return the total number of bytes read into the buffer, or <code>-1</code> is there is no more data because the
378      *         end of the stream has been reached.
379      * @exception IOException
380      *              If the first byte cannot be read for any reason other than the end of the file, if the input stream
381      *              has been closed, or if some other I/O error occurs.
382      * @exception NullPointerException
383      *              if <code>b</code> is <code>null</code>.
384      * @see java.io.InputStream#read(byte[])
385      */
386     @Override
387     public int read(byte[] b) throws IOException {
388       return delegateStream.read(b);
389     }
390 
391     /**
392      * @throws IOException
393      * @see java.io.InputStream#reset()
394      */
395     @Override
396     public void reset() throws IOException {
397       delegateStream.reset();
398     }
399 
400     /**
401      * @param n
402      *          the number of bytes to be skipped.
403      * @return the actual number of bytes skipped.
404      * @exception IOException
405      *              if the stream does not support seek, or if some other I/O error occurs.
406      * @see java.io.InputStream#skip(long)
407      */
408     @Override
409     public long skip(long n) throws IOException {
410       return delegateStream.skip(n);
411     }
412 
413     /**
414      * @return
415      * @see java.lang.Object#toString()
416      */
417     @Override
418     public String toString() {
419       return getClass().getName() + " : " + delegateStream.toString();
420     }
421   }
422 
423 }