View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.api;
23  
24  import static org.opencastproject.util.data.Option.none;
25  import static org.opencastproject.util.data.Option.some;
26  
27  import org.opencastproject.mediapackage.MediaPackageElement;
28  import org.opencastproject.mediapackage.MediaPackageElementParser;
29  import org.opencastproject.security.api.TrustedHttpClient;
30  import org.opencastproject.util.UrlSupport;
31  import org.opencastproject.util.data.Function;
32  import org.opencastproject.util.data.Option;
33  
34  import org.apache.commons.io.IOUtils;
35  import org.apache.commons.lang3.StringUtils;
36  import org.apache.http.HttpResponse;
37  import org.apache.http.HttpStatus;
38  import org.apache.http.StatusLine;
39  import org.apache.http.client.methods.HttpRequestBase;
40  import org.joda.time.DateTimeConstants;
41  import org.osgi.service.component.annotations.Reference;
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  import java.io.IOException;
46  import java.io.InputStream;
47  import java.net.URI;
48  import java.nio.charset.Charset;
49  import java.util.ArrayList;
50  import java.util.Arrays;
51  import java.util.List;
52  
53  /**
54   * Base class serving as a convenience implementation for remote services.
55   */
56  public class RemoteBase {
57  
58    private static final int TIMEOUT = 10000;
59  
60    /** The logger */
61    private static final Logger logger = LoggerFactory.getLogger(RemoteBase.class);
62  
63    /** The service type, used to look up remote implementations */
64    protected String serviceType = null;
65  
66    /** The http client to use when connecting to remote servers */
67    protected TrustedHttpClient client = null;
68  
69    /** The http client */
70    protected ServiceRegistry remoteServiceManager = null;
71  
72    /** A list of known http statuses */
73    private static final List<Integer> knownHttpStatuses = Arrays.asList(HttpStatus.SC_SERVICE_UNAVAILABLE);
74  
75    /**
76     * Creates a remote implementation for the given type of service.
77     *
78     * @param type
79     *          the service type
80     */
81    protected RemoteBase(String type) {
82      if (type == null)
83        throw new IllegalArgumentException("Service type must not be null");
84      this.serviceType = type;
85    }
86  
87    /**
88     * Sets the trusted http client
89     *
90     * @param client
91     */
92    @Reference
93    public void setTrustedHttpClient(TrustedHttpClient client) {
94      this.client = client;
95    }
96  
97    /**
98     * Sets the remote service manager.
99     *
100    * @param remoteServiceManager
101    */
102   @Reference
103   public void setRemoteServiceManager(ServiceRegistry remoteServiceManager) {
104     this.remoteServiceManager = remoteServiceManager;
105   }
106 
107   protected <A> Option<A> runRequest(HttpRequestBase req, Function<HttpResponse, A> f) {
108     HttpResponse res = null;
109     try {
110       res = getResponse(req);
111       return res != null ? some(f.apply(res)) : Option.<A> none();
112     } finally {
113       closeConnection(res);
114     }
115   }
116 
117 
118 
119   public static final Function<HttpResponse, Option<List<MediaPackageElement>>> elementsFromHttpResponse =
120     new Function<HttpResponse, Option<List<MediaPackageElement>>>() {
121     @Override
122     public Option<List<MediaPackageElement>> apply(HttpResponse response) {
123       try {
124         final String xml = IOUtils.toString(response.getEntity().getContent(), Charset.forName("utf-8"));
125         List<MediaPackageElement> result = new ArrayList<>(MediaPackageElementParser.getArrayFromXml(xml));
126         return some(result);
127       } catch (Exception e) {
128         logger.error("Error parsing Job from HTTP response", e);
129         return none();
130       }
131     }
132   };
133 
134   /**
135    * Makes a request to all available remote services and returns the response as soon as the first of them returns the
136    * {@link HttpStatus#SC_OK} as the status code.
137    *
138    * @param httpRequest
139    *          the http request. If the URI is specified, it should include only the path beyond the service endpoint.
140    *          For example, a request intended for http://{host}/{service}/extra/path/info.xml should include the URI
141    *          "/extra/path/info.xml".
142    * @return the response object, or null if we can not connect to any services
143    */
144   protected HttpResponse getResponse(HttpRequestBase httpRequest) {
145     return getResponse(httpRequest, HttpStatus.SC_OK);
146   }
147 
148   /**
149    * Makes a request to all available remote services and returns the response as soon as the first of them returns the
150    * expected http status code.
151    *
152    * @param httpRequest
153    *          the http request. If the URI is specified, it should include only the path beyond the service endpoint.
154    *          For example, a request intended for http://{host}/{service}/extra/path/info.xml should include the URI
155    *          "/extra/path/info.xml".
156    * @param expectedHttpStatus
157    *          any expected status codes to include in the return.
158    * @return the response object, or null if we can not connect to any services
159    */
160   protected HttpResponse getResponse(HttpRequestBase httpRequest, Integer... expectedHttpStatus) {
161 
162     final long maxWaitTimeMillis = System.currentTimeMillis() + DateTimeConstants.MILLIS_PER_DAY;
163     boolean warnedUnavailability = false;
164 
165     // Try forever
166     while (true) {
167 
168       List<ServiceRegistration> remoteServices = null;
169       List<String> servicesInWarningState = new ArrayList<String>();
170       List<String> servicesInKnownState = new ArrayList<String>();
171 
172       // Find available services
173       boolean warned = false;
174       while (remoteServices == null || remoteServices.size() == 0) {
175         try {
176           remoteServices = remoteServiceManager.getServiceRegistrationsByLoad(serviceType);
177           if (remoteServices == null || remoteServices.size() == 0) {
178             if (!warned) {
179               logger.warn("No services of type '{}' found, waiting...", serviceType);
180               warned = true;
181             }
182             logger.debug("Still no services of type '{}' found, waiting...", serviceType);
183             try {
184               Thread.sleep(TIMEOUT);
185             } catch (InterruptedException e) {
186               logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
187               return null;
188             }
189           }
190         } catch (ServiceRegistryException e) {
191           logger.warn("Unable to obtain a list of remote services", e);
192           return null;
193         }
194       }
195 
196       URI originalUri = httpRequest.getURI();
197       String uriSuffix = null;
198       if (originalUri != null && StringUtils.isNotBlank(originalUri.toString())) {
199         uriSuffix = originalUri.toString();
200       }
201 
202       // Try each available service
203       String fullUrl = null;
204       for (ServiceRegistration remoteService : remoteServices) {
205         HttpResponse response = null;
206         try {
207           if (uriSuffix == null) {
208             fullUrl = UrlSupport.concat(remoteService.getHost(), remoteService.getPath());
209           } else {
210             fullUrl = UrlSupport.concat(new String[] { remoteService.getHost(), remoteService.getPath(), uriSuffix });
211           }
212 
213           logger.debug("Connecting to remote service of type '{}' at {}", serviceType, fullUrl);
214 
215           URI uri = new URI(fullUrl);
216           httpRequest.setURI(uri);
217           response = client.execute(httpRequest);
218           StatusLine status = response.getStatusLine();
219           if (Arrays.asList(expectedHttpStatus).contains(status.getStatusCode())) {
220             if (servicesInWarningState.contains(fullUrl) || servicesInKnownState.contains(fullUrl)) {
221               logger.warn("Service at {} is back to normal with expected status code {}", fullUrl,
222                       status.getStatusCode());
223             }
224             return response;
225           } else {
226             if (!knownHttpStatuses.contains(status.getStatusCode()) && !servicesInWarningState.contains(fullUrl)) {
227               logger.warn("Service at {} returned unexpected response code {}", fullUrl, status.getStatusCode());
228               servicesInWarningState.add(fullUrl);
229               servicesInKnownState.remove(fullUrl);
230             } else if (knownHttpStatuses.contains(status.getStatusCode()) && !servicesInKnownState.contains(fullUrl)) {
231               logger.info("Service at {} returned known response code {}", fullUrl, status.getStatusCode());
232               servicesInKnownState.add(fullUrl);
233               servicesInWarningState.remove(fullUrl);
234             }
235           }
236         } catch (Exception e) {
237           logger.error("Exception while trying to dispatch job to {}", fullUrl, e);
238           servicesInWarningState.add(fullUrl);
239         }
240         closeConnection(response);
241       }
242 
243       if (servicesInKnownState.isEmpty()) {
244         logger.warn("All services of type '{}' are in unknown state, abort remote call {}", serviceType, originalUri);
245         return null;
246       }
247 
248       // Reset Original URI
249       httpRequest.setURI(originalUri);
250 
251       // If none of them accepted the request, let's wait and retry
252       if (!warnedUnavailability) {
253         logger.warn("No service of type '{}' is currently readily available", serviceType);
254         warnedUnavailability = true;
255       } else {
256         logger.debug("All services of type '{}' are still unavailable", serviceType);
257       }
258 
259       try {
260         if (System.currentTimeMillis() > maxWaitTimeMillis) {
261           logger.warn(
262                   "Still no service of type '{}' available while waiting for more than one day, abort remote call {}",
263                   serviceType, originalUri);
264           return null;
265         }
266         Thread.sleep(TIMEOUT);
267       } catch (InterruptedException e) {
268         logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
269         return null;
270       }
271 
272     }
273   }
274 
275   /**
276    * Closes any http connections kept open by this http response.
277    */
278   protected void closeConnection(HttpResponse response) {
279     if (response != null)
280       try {
281         client.close(response);
282       } catch (IOException e) {
283         // ignore
284       }
285   }
286 
287   /**
288    * A stream wrapper that closes the http response when the stream is closed. If a remote service proxy returns an
289    * inputstream, this implementation should be used to ensure that the http connection is closed properly.
290    */
291   public class HttpClientClosingInputStream extends InputStream {
292 
293     /** The input stream delivering the actual data */
294     protected InputStream delegateStream = null;
295 
296     /** The http response to close when the stream is closed */
297     protected HttpResponse httpResponse = null;
298 
299     /**
300      * Constructs an HttpClientClosingInputStream from a source stream and an http response.
301      *
302      * @throws IOException
303      * @throws IllegalStateException
304      */
305     public HttpClientClosingInputStream(HttpResponse resp) throws IllegalStateException, IOException {
306       this.delegateStream = resp.getEntity().getContent();
307       this.httpResponse = resp;
308     }
309 
310     /**
311      * {@inheritDoc}
312      *
313      * @see java.io.InputStream#read()
314      */
315     @Override
316     public int read() throws IOException {
317       return delegateStream.read();
318     }
319 
320     /**
321      * {@inheritDoc}
322      *
323      * @see java.io.InputStream#available()
324      */
325     @Override
326     public int available() throws IOException {
327       return delegateStream.available();
328     }
329 
330     /**
331      * @throws IOException
332      * @see java.io.InputStream#close()
333      */
334     @Override
335     public void close() throws IOException {
336       delegateStream.close();
337       closeConnection(httpResponse);
338     }
339 
340     /**
341      * @param readlimit
342      * @see java.io.InputStream#mark(int)
343      */
344     @Override
345     public void mark(int readlimit) {
346       delegateStream.mark(readlimit);
347     }
348 
349     /**
350      * @return whether this stream supports marking
351      * @see java.io.InputStream#markSupported()
352      */
353     @Override
354     public boolean markSupported() {
355       return delegateStream.markSupported();
356     }
357 
358     /**
359      * @param b
360      *          the buffer into which the data is read.
361      * @param off
362      *          the start offset in array <code>b</code> at which the data is written.
363      * @param len
364      *          the maximum number of bytes to read.
365      * @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
366      *         end of the stream has been reached.
367      * @exception IOException
368      *              If the first byte cannot be read for any reason other than end of file, or if the input stream has
369      *              been closed, or if some other I/O error occurs.
370      * @exception NullPointerException
371      *              If <code>b</code> is <code>null</code>.
372      * @exception IndexOutOfBoundsException
373      *              If <code>off</code> is negative, <code>len</code> is negative, or <code>len</code> is greater than
374      *              <code>b.length - off</code>
375      * @see java.io.InputStream#read(byte[], int, int)
376      */
377     @Override
378     public int read(byte[] b, int off, int len) throws IOException {
379       return delegateStream.read(b, off, len);
380     }
381 
382     /**
383      * @param b
384      *          the buffer into which the data is read.
385      * @return the total number of bytes read into the buffer, or <code>-1</code> is there is no more data because the
386      *         end of the stream has been reached.
387      * @exception IOException
388      *              If the first byte cannot be read for any reason other than the end of the file, if the input stream
389      *              has been closed, or if some other I/O error occurs.
390      * @exception NullPointerException
391      *              if <code>b</code> is <code>null</code>.
392      * @see java.io.InputStream#read(byte[])
393      */
394     @Override
395     public int read(byte[] b) throws IOException {
396       return delegateStream.read(b);
397     }
398 
399     /**
400      * @throws IOException
401      * @see java.io.InputStream#reset()
402      */
403     @Override
404     public void reset() throws IOException {
405       delegateStream.reset();
406     }
407 
408     /**
409      * @param n
410      *          the number of bytes to be skipped.
411      * @return the actual number of bytes skipped.
412      * @exception IOException
413      *              if the stream does not support seek, or if some other I/O error occurs.
414      * @see java.io.InputStream#skip(long)
415      */
416     @Override
417     public long skip(long n) throws IOException {
418       return delegateStream.skip(n);
419     }
420 
421     /**
422      * @return
423      * @see java.lang.Object#toString()
424      */
425     @Override
426     public String toString() {
427       return getClass().getName() + " : " + delegateStream.toString();
428     }
429   }
430 
431 }