View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.distribution.streaming.remote;
23  
24  import static java.lang.String.format;
25  import static org.opencastproject.util.HttpUtil.param;
26  import static org.opencastproject.util.HttpUtil.post;
27  
28  import org.opencastproject.distribution.api.DistributionException;
29  import org.opencastproject.distribution.api.DistributionService;
30  import org.opencastproject.distribution.api.StreamingDistributionService;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.mediapackage.MediaPackage;
33  import org.opencastproject.mediapackage.MediaPackageElement;
34  import org.opencastproject.mediapackage.MediaPackageException;
35  import org.opencastproject.mediapackage.MediaPackageParser;
36  import org.opencastproject.security.api.TrustedHttpClient;
37  import org.opencastproject.serviceregistry.api.RemoteBase;
38  import org.opencastproject.serviceregistry.api.ServiceRegistry;
39  import org.opencastproject.util.JobUtil;
40  import org.opencastproject.util.OsgiUtil;
41  
42  import com.google.gson.Gson;
43  
44  import org.apache.http.HttpResponse;
45  import org.apache.http.client.methods.HttpGet;
46  import org.apache.http.client.methods.HttpPost;
47  import org.osgi.service.component.ComponentContext;
48  import org.osgi.service.component.annotations.Activate;
49  import org.osgi.service.component.annotations.Component;
50  import org.osgi.service.component.annotations.Reference;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  
54  import java.io.BufferedReader;
55  import java.io.InputStreamReader;
56  import java.util.HashSet;
57  import java.util.List;
58  import java.util.Optional;
59  import java.util.Set;
60  
61  /**
62   * A remote distribution service invoker.
63   */
64  @Component(
65      immediate = true,
66      service = { DistributionService.class, StreamingDistributionService.class },
67      property = {
68          "service.description=Distribution (Streaming) Remote Service Proxy",
69          "distribution.channel=streaming"
70      }
71  )
72  public class StreamingDistributionServiceRemoteImpl extends RemoteBase implements StreamingDistributionService {
73  
74    /** The logger */
75    private static final Logger logger = LoggerFactory.getLogger(StreamingDistributionServiceRemoteImpl.class);
76  
77    /** The distribution channel identifier */
78    protected String distributionChannel;
79  
80      /** The property to look up and append to REMOTE_SERVICE_TYPE_PREFIX */
81    private static final String PARAM_CHANNEL_ID = "channelId";
82    private static final String PARAM_MEDIAPACKAGE = "mediapackage";
83    private static final String PARAM_ELEMENT_IDS = "elementIds";
84  
85    private static final Gson gson  = new Gson();
86  
87    public StreamingDistributionServiceRemoteImpl() {
88      // the service type is not available at construction time. we need to wait for activation to set this value
89      super("waiting for activation");
90    }
91  
92    /** activates the component */
93    @Activate
94    protected void activate(ComponentContext cc) {
95      this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
96      super.serviceType = JOB_TYPE_PREFIX + this.distributionChannel;
97    }
98  
99    public String getDistributionType() {
100     return this.distributionChannel;
101   }
102 
103   @Override
104   public Job distribute(String channelId, MediaPackage mediaPackage, String elementId)
105           throws DistributionException, MediaPackageException {
106     Set<String> elementIds = new HashSet<String>();
107     elementIds.add(elementId);
108     return distribute(channelId, mediaPackage, elementIds);
109   }
110 
111   @Override
112   public boolean publishToStreaming() {
113     HttpGet get = new HttpGet("/publishToStreaming");
114     HttpResponse response = getResponse(get);
115 
116     if (response != null) {
117       String content = null;
118       try (BufferedReader r = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
119         content = r.readLine();
120       } catch (Exception e) {
121         logger.error("Failed to read response from remote service: ", e);
122       } finally {
123         closeConnection(response);
124       }
125       if (content != null) {
126         return Boolean.parseBoolean(content.trim());
127       }
128     }
129     return false;
130   }
131 
132   @Override
133   public Job distribute(String channelId, final MediaPackage mediaPackage, Set<String> elementIds)
134           throws DistributionException,  MediaPackageException {
135     logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
136     final HttpPost req = post(param(PARAM_CHANNEL_ID, channelId),
137                               param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
138                               param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)));
139     Optional<Job> job = runRequest(req, JobUtil::jobFromHttpResponse);
140     if (job.isPresent()) {
141       return job.get();
142     }
143     throw new DistributionException(format("Unable to distribute '%s' elements of "
144                                                    + "mediapackage '%s' using a remote destribution service proxy",
145                                            elementIds.size(), mediaPackage.getIdentifier().toString()));
146   }
147 
148   @Override
149   public Job retract(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
150     Set<String> elementIds = new HashSet<String>();
151     elementIds.add(elementId);
152     return retract(channelId, mediaPackage, elementIds);
153   }
154 
155   @Override
156   public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
157     logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
158     final HttpPost req = post("/retract",
159                               param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
160                               param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)),
161                               param(PARAM_CHANNEL_ID, channelId));
162     Optional<Job> job = runRequest(req, JobUtil::jobFromHttpResponse);
163     if (job.isPresent()) {
164       return job.get();
165     }
166     throw new DistributionException(format("Unable to retract '%s' elements of "
167                                                    + "mediapackage '%s' using a remote destribution service proxy",
168                                            elementIds.size(), mediaPackage.getIdentifier().toString()));
169   }
170 
171   @Override
172   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
173           throws DistributionException {
174     Set<String> elementIds = new HashSet<String>();
175     elementIds.add(elementId);
176     return distributeSync(channelId, mediaPackage, elementIds);
177   }
178 
179   @Override
180   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds)
181           throws DistributionException {
182     logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
183     final HttpPost req = post("/distributesync", param(PARAM_CHANNEL_ID, channelId),
184         param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediapackage)),
185         param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)));
186     Optional<List<MediaPackageElement>> elements = runRequest(req, RemoteBase::elementsFromHttpResponse);
187     if (elements.isPresent()) {
188       return elements.get();
189     }
190     throw new DistributionException(format("Unable to distribute '%s' elements of "
191             + "mediapackage '%s' using a remote destribution service proxy",
192         elementIds.size(), mediapackage.getIdentifier().toString()));
193   }
194 
195   @Override
196   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
197           throws DistributionException {
198     Set<String> elementIds = new HashSet<String>();
199     elementIds.add(elementId);
200     return retractSync(channelId, mediaPackage, elementIds);
201   }
202 
203   @Override
204   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
205           throws DistributionException {
206     logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
207     final HttpPost req = post("/retract",
208         param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
209         param(PARAM_ELEMENT_IDS, gson.toJson(elementIds)),
210         param(PARAM_CHANNEL_ID, channelId));
211     Optional<List<MediaPackageElement>> elements = runRequest(req, RemoteBase::elementsFromHttpResponse);
212     if (elements.isPresent()) {
213       return elements.get();
214     }
215     throw new DistributionException(format("Unable to retract '%s' elements of "
216             + "mediapackage '%s' using a remote destribution service proxy",
217         elementIds.size(), mediaPackage.getIdentifier().toString()));
218   }
219 
220   @Reference
221   @Override
222   public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
223     super.setTrustedHttpClient(trustedHttpClient);
224   }
225   @Reference
226   @Override
227   public void setRemoteServiceManager(ServiceRegistry serviceRegistry) {
228     super.setRemoteServiceManager(serviceRegistry);
229   }
230 
231 }