View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.distribution.aws.s3.remote;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.util.HttpUtil.param;
25  import static org.opencastproject.util.HttpUtil.post;
26  
27  import org.opencastproject.distribution.api.DistributionException;
28  import org.opencastproject.distribution.api.DistributionService;
29  import org.opencastproject.distribution.api.DownloadDistributionService;
30  import org.opencastproject.distribution.aws.s3.api.AwsS3DistributionService;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.mediapackage.MediaPackage;
33  import org.opencastproject.mediapackage.MediaPackageElement;
34  import org.opencastproject.mediapackage.MediaPackageException;
35  import org.opencastproject.mediapackage.MediaPackageParser;
36  import org.opencastproject.security.api.TrustedHttpClient;
37  import org.opencastproject.serviceregistry.api.RemoteBase;
38  import org.opencastproject.serviceregistry.api.ServiceRegistry;
39  import org.opencastproject.util.JobUtil;
40  import org.opencastproject.util.OsgiUtil;
41  
42  import com.google.gson.Gson;
43  
44  import org.apache.http.client.methods.HttpPost;
45  import org.osgi.service.component.ComponentContext;
46  import org.osgi.service.component.annotations.Activate;
47  import org.osgi.service.component.annotations.Component;
48  import org.osgi.service.component.annotations.Reference;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  import java.util.HashSet;
53  import java.util.List;
54  import java.util.Optional;
55  import java.util.Set;
56  
57  /**
58   * This is a copy of DownloadDistributionServiceRemoteImpl.
59   *
60   * @author rsantos
61   */
62  @Component(
63      immediate = true,
64      service = { DistributionService.class, DownloadDistributionService.class, AwsS3DistributionService.class },
65      property = {
66          "service.description=Distribution (AWS S3) Remote Service Proxy",
67          "distribution.channel=aws.s3"
68      }
69  )
70  public class AwsS3DistributionServiceRemoteImpl extends RemoteBase implements AwsS3DistributionService,
71          DownloadDistributionService {
72    /** The logger */
73    private static final Logger logger = LoggerFactory.getLogger(AwsS3DistributionServiceRemoteImpl.class);
74  
75    private static final String PARAM_CHANNEL_ID = "channelId";
76    private static final String PARAM_MEDIAPACKAGE = "mediapackage";
77    private static final String PARAM_ELEMENT_ID = "elementId";
78    private static final String PARAM_FILENAME = "fileName";
79    private static final String PARAM_CHECK_AVAILABILITY = "checkAvailability";
80  
81    private final Gson gson = new Gson();
82  
83    /** The distribution channel identifier */
84    private String distributionChannel;
85  
86    public AwsS3DistributionServiceRemoteImpl() {
87      // the service type is not available at construction time. we need to wait for activation to set this value
88      super("waiting for activation");
89    }
90  
91    /** activates the component */
92    @Activate
93    protected void activate(ComponentContext cc) {
94      this.distributionChannel = OsgiUtil.getComponentContextProperty(cc, CONFIG_KEY_STORE_TYPE);
95      super.serviceType = JOB_TYPE_PREFIX + this.distributionChannel;
96    }
97  
98    public String getDistributionType() {
99      return this.distributionChannel;
100   }
101 
102   @Override
103   public Job distribute(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
104     return distribute(channelId, mediaPackage, elementId, true);
105   }
106 
107   @Override
108   public Job distribute(String channelId, MediaPackage mediaPackage, String elementId, boolean checkAvailability)
109           throws DistributionException {
110     Set<String> elementIds = new HashSet<String>();
111     elementIds.add(elementId);
112     return distribute(channelId, mediaPackage, elementIds, checkAvailability);
113   }
114 
115   @Override
116   public Job distribute(String channelId, final MediaPackage mediaPackage, Set<String> elementIds,
117           boolean checkAvailability)
118           throws DistributionException {
119     logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
120     final HttpPost req = post(param(PARAM_CHANNEL_ID, channelId),
121             param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
122             param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
123             param(PARAM_CHECK_AVAILABILITY, Boolean.toString(checkAvailability)));
124     Optional<Job> job = runRequest(req, JobUtil::jobFromHttpResponse);
125     if (job.isPresent()) {
126       return job.get();
127     }
128     throw new DistributionException(format("Unable to distribute '%s' elements of "
129                     + "mediapackage '%s' using a remote destribution service proxy",
130             elementIds.size(), mediaPackage.getIdentifier().toString()));
131   }
132 
133   @Override
134   public Job retract(String channelId, MediaPackage mediaPackage, String elementId) throws DistributionException {
135     Set<String> elementIds = new HashSet<String>();
136     elementIds.add(elementId);
137     return retract(channelId, mediaPackage, elementIds);
138   }
139 
140   @Override
141   public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
142     logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
143     final HttpPost req = post("/retract",
144             param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
145             param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
146             param(PARAM_CHANNEL_ID, channelId));
147     Optional<Job> job = runRequest(req, JobUtil::jobFromHttpResponse);
148     if (job.isPresent()) {
149       return job.get();
150     }
151     throw new DistributionException(format("Unable to retract '%s' elements of "
152                     + "mediapackage '%s' using a remote destribution service proxy",
153             elementIds.size(), mediaPackage.getIdentifier().toString()));
154   }
155 
156   @Override
157   public List<MediaPackageElement>  distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
158           throws DistributionException {
159     Set<String> elementIds = new HashSet<String>();
160     elementIds.add(elementId);
161     return distributeSync(channelId, mediaPackage, elementIds, true);
162   }
163 
164   @Override
165   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediapackage, Set<String> elementIds,
166          boolean checkAvailability) throws DistributionException {
167     logger.info("Distributing {} elements to {}@{}", elementIds.size(), channelId, distributionChannel);
168     final HttpPost req = post("/distributesync", param(PARAM_CHANNEL_ID, channelId),
169         param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediapackage)),
170         param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
171         param(PARAM_CHECK_AVAILABILITY, Boolean.toString(checkAvailability)));
172     Optional<List<MediaPackageElement>> elements = runRequest(req, RemoteBase::elementsFromHttpResponse);
173     if (elements.isPresent()) {
174       return elements.get();
175     }
176     throw new DistributionException(format("Unable to distribute '%s' elements of "
177             + "mediapackage '%s' using a remote destribution service proxy",
178         elementIds.size(), mediapackage.getIdentifier().toString()));
179   }
180 
181   @Override
182   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
183           throws DistributionException {
184     Set<String> elementIds = new HashSet<String>();
185     elementIds.add(elementId);
186     return retractSync(channelId, mediaPackage, elementIds);
187   }
188 
189   @Override
190   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
191           throws DistributionException {
192     logger.info("Retracting {} elements from {}@{}", elementIds.size(), channelId, distributionChannel);
193     final HttpPost req = post("/retractsync",
194         param(PARAM_MEDIAPACKAGE, MediaPackageParser.getAsXml(mediaPackage)),
195         param(PARAM_ELEMENT_ID, gson.toJson(elementIds)),
196         param(PARAM_CHANNEL_ID, channelId));
197     Optional<List<MediaPackageElement>> elements = runRequest(req, RemoteBase::elementsFromHttpResponse);
198     if (elements.isPresent()) {
199       return elements.get();
200     }
201     throw new DistributionException(format("Unable to retract '%s' elements of "
202             + "mediapackage '%s' using a remote destribution service proxy",
203         elementIds.size(), mediaPackage.getIdentifier().toString()));
204   }
205 
206   @Override
207   public Job distribute(String pubChannelId, MediaPackage mediaPackage, Set<String> downloadIds,
208       boolean checkAvailability, boolean preserveReference) throws DistributionException, MediaPackageException {
209     throw new UnsupportedOperationException("Not supported yet.");
210   //stub function
211   }
212 
213   @Reference
214   @Override
215   public void setTrustedHttpClient(TrustedHttpClient trustedHttpClient) {
216     super.setTrustedHttpClient(trustedHttpClient);
217   }
218 
219   @Reference
220   @Override
221   public void setRemoteServiceManager(ServiceRegistry serviceRegistry) {
222     super.setRemoteServiceManager(serviceRegistry);
223   }
224 
225 }