View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.api;
23  
24  import org.opencastproject.mediapackage.MediaPackageElement;
25  import org.opencastproject.mediapackage.MediaPackageElementParser;
26  import org.opencastproject.security.api.TrustedHttpClient;
27  import org.opencastproject.util.UrlSupport;
28  
29  import org.apache.commons.io.IOUtils;
30  import org.apache.commons.lang3.StringUtils;
31  import org.apache.http.HttpResponse;
32  import org.apache.http.HttpStatus;
33  import org.apache.http.StatusLine;
34  import org.apache.http.client.methods.HttpRequestBase;
35  import org.joda.time.DateTimeConstants;
36  import org.osgi.service.component.annotations.Reference;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import java.io.IOException;
41  import java.io.InputStream;
42  import java.net.URI;
43  import java.nio.charset.Charset;
44  import java.util.ArrayList;
45  import java.util.Arrays;
46  import java.util.List;
47  import java.util.Optional;
48  
49  /**
50   * Base class serving as a convenience implementation for remote services.
51   */
52  public class RemoteBase {
53  
54    private static final int TIMEOUT = 10000;
55  
56    /** The logger */
57    private static final Logger logger = LoggerFactory.getLogger(RemoteBase.class);
58  
59    /** The service type, used to look up remote implementations */
60    protected String serviceType = null;
61  
62    /** The http client to use when connecting to remote servers */
63    protected TrustedHttpClient client = null;
64  
65    /** The http client */
66    protected ServiceRegistry remoteServiceManager = null;
67  
68    /** A list of known http statuses */
69    private static final List<Integer> knownHttpStatuses = Arrays.asList(HttpStatus.SC_SERVICE_UNAVAILABLE);
70  
71    /**
72     * Creates a remote implementation for the given type of service.
73     *
74     * @param type
75     *          the service type
76     */
77    protected RemoteBase(String type) {
78      if (type == null)
79        throw new IllegalArgumentException("Service type must not be null");
80      this.serviceType = type;
81    }
82  
83    /**
84     * Sets the trusted http client
85     *
86     * @param client
87     */
88    @Reference
89    public void setTrustedHttpClient(TrustedHttpClient client) {
90      this.client = client;
91    }
92  
93    /**
94     * Sets the remote service manager.
95     *
96     * @param remoteServiceManager
97     */
98    @Reference
99    public void setRemoteServiceManager(ServiceRegistry remoteServiceManager) {
100     this.remoteServiceManager = remoteServiceManager;
101   }
102 
103   protected <A> Optional<A> runRequest(HttpRequestBase req, java.util.function.Function<HttpResponse, Optional<A>> f) {
104     HttpResponse res = null;
105     try {
106       res = getResponse(req);
107       return res != null ? f.apply(res) : Optional.empty();
108     } finally {
109       closeConnection(res);
110     }
111   }
112 
113   public static Optional<List<MediaPackageElement>> elementsFromHttpResponse(HttpResponse response) {
114     try {
115       String xml = IOUtils.toString(response.getEntity().getContent(), Charset.forName("utf-8"));
116       List<MediaPackageElement> result = new ArrayList<>(MediaPackageElementParser.getArrayFromXml(xml));
117       return Optional.of(result);
118     } catch (Exception e) {
119       logger.error("Error parsing MediaPackage elements from HTTP response", e);
120       return Optional.empty();
121     }
122   }
123 
124   /**
125    * Makes a request to all available remote services and returns the response as soon as the first of them returns the
126    * {@link HttpStatus#SC_OK} as the status code.
127    *
128    * @param httpRequest
129    *          the http request. If the URI is specified, it should include only the path beyond the service endpoint.
130    *          For example, a request intended for http://{host}/{service}/extra/path/info.xml should include the URI
131    *          "/extra/path/info.xml".
132    * @return the response object, or null if we can not connect to any services
133    */
134   protected HttpResponse getResponse(HttpRequestBase httpRequest) {
135     return getResponse(httpRequest, HttpStatus.SC_OK);
136   }
137 
138   /**
139    * Makes a request to all available remote services and returns the response as soon as the first of them returns the
140    * expected http status code.
141    *
142    * @param httpRequest
143    *          the http request. If the URI is specified, it should include only the path beyond the service endpoint.
144    *          For example, a request intended for http://{host}/{service}/extra/path/info.xml should include the URI
145    *          "/extra/path/info.xml".
146    * @param expectedHttpStatus
147    *          any expected status codes to include in the return.
148    * @return the response object, or null if we can not connect to any services
149    */
150   protected HttpResponse getResponse(HttpRequestBase httpRequest, Integer... expectedHttpStatus) {
151 
152     final long maxWaitTimeMillis = System.currentTimeMillis() + DateTimeConstants.MILLIS_PER_DAY;
153     boolean warnedUnavailability = false;
154 
155     // Try forever
156     while (true) {
157 
158       List<ServiceRegistration> remoteServices = null;
159       List<String> servicesInWarningState = new ArrayList<String>();
160       List<String> servicesInKnownState = new ArrayList<String>();
161 
162       // Find available services
163       boolean warned = false;
164       while (remoteServices == null || remoteServices.size() == 0) {
165         try {
166           remoteServices = remoteServiceManager.getServiceRegistrationsByLoad(serviceType);
167           if (remoteServices == null || remoteServices.size() == 0) {
168             if (!warned) {
169               logger.warn("No services of type '{}' found, waiting...", serviceType);
170               warned = true;
171             }
172             logger.debug("Still no services of type '{}' found, waiting...", serviceType);
173             try {
174               Thread.sleep(TIMEOUT);
175             } catch (InterruptedException e) {
176               logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
177               return null;
178             }
179           }
180         } catch (ServiceRegistryException e) {
181           logger.warn("Unable to obtain a list of remote services", e);
182           return null;
183         }
184       }
185 
186       URI originalUri = httpRequest.getURI();
187       String uriSuffix = null;
188       if (originalUri != null && StringUtils.isNotBlank(originalUri.toString())) {
189         uriSuffix = originalUri.toString();
190       }
191 
192       // Try each available service
193       String fullUrl = null;
194       for (ServiceRegistration remoteService : remoteServices) {
195         HttpResponse response = null;
196         try {
197           if (uriSuffix == null) {
198             fullUrl = UrlSupport.concat(remoteService.getHost(), remoteService.getPath());
199           } else {
200             fullUrl = UrlSupport.concat(new String[] { remoteService.getHost(), remoteService.getPath(), uriSuffix });
201           }
202 
203           logger.debug("Connecting to remote service of type '{}' at {}", serviceType, fullUrl);
204 
205           URI uri = new URI(fullUrl);
206           httpRequest.setURI(uri);
207           response = client.execute(httpRequest);
208           StatusLine status = response.getStatusLine();
209           if (Arrays.asList(expectedHttpStatus).contains(status.getStatusCode())) {
210             if (servicesInWarningState.contains(fullUrl) || servicesInKnownState.contains(fullUrl)) {
211               logger.warn("Service at {} is back to normal with expected status code {}", fullUrl,
212                       status.getStatusCode());
213             }
214             return response;
215           } else {
216             if (!knownHttpStatuses.contains(status.getStatusCode()) && !servicesInWarningState.contains(fullUrl)) {
217               logger.warn("Service at {} returned unexpected response code {}", fullUrl, status.getStatusCode());
218               servicesInWarningState.add(fullUrl);
219               servicesInKnownState.remove(fullUrl);
220             } else if (knownHttpStatuses.contains(status.getStatusCode()) && !servicesInKnownState.contains(fullUrl)) {
221               logger.info("Service at {} returned known response code {}", fullUrl, status.getStatusCode());
222               servicesInKnownState.add(fullUrl);
223               servicesInWarningState.remove(fullUrl);
224             }
225           }
226         } catch (Exception e) {
227           logger.error("Exception while trying to dispatch job to {}", fullUrl, e);
228           servicesInWarningState.add(fullUrl);
229         }
230         closeConnection(response);
231       }
232 
233       if (servicesInKnownState.isEmpty()) {
234         logger.warn("All services of type '{}' are in unknown state, abort remote call {}", serviceType, originalUri);
235         return null;
236       }
237 
238       // Reset Original URI
239       httpRequest.setURI(originalUri);
240 
241       // If none of them accepted the request, let's wait and retry
242       if (!warnedUnavailability) {
243         logger.warn("No service of type '{}' is currently readily available", serviceType);
244         warnedUnavailability = true;
245       } else {
246         logger.debug("All services of type '{}' are still unavailable", serviceType);
247       }
248 
249       try {
250         if (System.currentTimeMillis() > maxWaitTimeMillis) {
251           logger.warn(
252                   "Still no service of type '{}' available while waiting for more than one day, abort remote call {}",
253                   serviceType, originalUri);
254           return null;
255         }
256         Thread.sleep(TIMEOUT);
257       } catch (InterruptedException e) {
258         logger.warn("Interrupted while waiting for remote service of type '{}'", serviceType);
259         return null;
260       }
261 
262     }
263   }
264 
265   /**
266    * Closes any http connections kept open by this http response.
267    */
268   protected void closeConnection(HttpResponse response) {
269     if (response != null)
270       try {
271         client.close(response);
272       } catch (IOException e) {
273         // ignore
274       }
275   }
276 
277   /**
278    * A stream wrapper that closes the http response when the stream is closed. If a remote service proxy returns an
279    * inputstream, this implementation should be used to ensure that the http connection is closed properly.
280    */
281   public class HttpClientClosingInputStream extends InputStream {
282 
283     /** The input stream delivering the actual data */
284     protected InputStream delegateStream = null;
285 
286     /** The http response to close when the stream is closed */
287     protected HttpResponse httpResponse = null;
288 
289     /**
290      * Constructs an HttpClientClosingInputStream from a source stream and an http response.
291      *
292      * @throws IOException
293      * @throws IllegalStateException
294      */
295     public HttpClientClosingInputStream(HttpResponse resp) throws IllegalStateException, IOException {
296       this.delegateStream = resp.getEntity().getContent();
297       this.httpResponse = resp;
298     }
299 
300     /**
301      * {@inheritDoc}
302      *
303      * @see java.io.InputStream#read()
304      */
305     @Override
306     public int read() throws IOException {
307       return delegateStream.read();
308     }
309 
310     /**
311      * {@inheritDoc}
312      *
313      * @see java.io.InputStream#available()
314      */
315     @Override
316     public int available() throws IOException {
317       return delegateStream.available();
318     }
319 
320     /**
321      * @throws IOException
322      * @see java.io.InputStream#close()
323      */
324     @Override
325     public void close() throws IOException {
326       delegateStream.close();
327       closeConnection(httpResponse);
328     }
329 
330     /**
331      * @param readlimit
332      * @see java.io.InputStream#mark(int)
333      */
334     @Override
335     public void mark(int readlimit) {
336       delegateStream.mark(readlimit);
337     }
338 
339     /**
340      * @return whether this stream supports marking
341      * @see java.io.InputStream#markSupported()
342      */
343     @Override
344     public boolean markSupported() {
345       return delegateStream.markSupported();
346     }
347 
348     /**
349      * @param b
350      *          the buffer into which the data is read.
351      * @param off
352      *          the start offset in array <code>b</code> at which the data is written.
353      * @param len
354      *          the maximum number of bytes to read.
355      * @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
356      *         end of the stream has been reached.
357      * @exception IOException
358      *              If the first byte cannot be read for any reason other than end of file, or if the input stream has
359      *              been closed, or if some other I/O error occurs.
360      * @exception NullPointerException
361      *              If <code>b</code> is <code>null</code>.
362      * @exception IndexOutOfBoundsException
363      *              If <code>off</code> is negative, <code>len</code> is negative, or <code>len</code> is greater than
364      *              <code>b.length - off</code>
365      * @see java.io.InputStream#read(byte[], int, int)
366      */
367     @Override
368     public int read(byte[] b, int off, int len) throws IOException {
369       return delegateStream.read(b, off, len);
370     }
371 
372     /**
373      * @param b
374      *          the buffer into which the data is read.
375      * @return the total number of bytes read into the buffer, or <code>-1</code> is there is no more data because the
376      *         end of the stream has been reached.
377      * @exception IOException
378      *              If the first byte cannot be read for any reason other than the end of the file, if the input stream
379      *              has been closed, or if some other I/O error occurs.
380      * @exception NullPointerException
381      *              if <code>b</code> is <code>null</code>.
382      * @see java.io.InputStream#read(byte[])
383      */
384     @Override
385     public int read(byte[] b) throws IOException {
386       return delegateStream.read(b);
387     }
388 
389     /**
390      * @throws IOException
391      * @see java.io.InputStream#reset()
392      */
393     @Override
394     public void reset() throws IOException {
395       delegateStream.reset();
396     }
397 
398     /**
399      * @param n
400      *          the number of bytes to be skipped.
401      * @return the actual number of bytes skipped.
402      * @exception IOException
403      *              if the stream does not support seek, or if some other I/O error occurs.
404      * @see java.io.InputStream#skip(long)
405      */
406     @Override
407     public long skip(long n) throws IOException {
408       return delegateStream.skip(n);
409     }
410 
411     /**
412      * @return
413      * @see java.lang.Object#toString()
414      */
415     @Override
416     public String toString() {
417       return getClass().getName() + " : " + delegateStream.toString();
418     }
419   }
420 
421 }