View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.publication.configurable;
22  
23  import org.opencastproject.distribution.api.DistributionException;
24  import org.opencastproject.distribution.api.DownloadDistributionService;
25  import org.opencastproject.job.api.AbstractJobProducer;
26  import org.opencastproject.job.api.Job;
27  import org.opencastproject.mediapackage.MediaPackage;
28  import org.opencastproject.mediapackage.MediaPackageElement;
29  import org.opencastproject.mediapackage.MediaPackageElementParser;
30  import org.opencastproject.mediapackage.MediaPackageException;
31  import org.opencastproject.mediapackage.MediaPackageParser;
32  import org.opencastproject.mediapackage.Publication;
33  import org.opencastproject.mediapackage.PublicationImpl;
34  import org.opencastproject.publication.api.ConfigurablePublicationService;
35  import org.opencastproject.publication.api.PublicationException;
36  import org.opencastproject.security.api.OrganizationDirectoryService;
37  import org.opencastproject.security.api.SecurityService;
38  import org.opencastproject.security.api.UserDirectoryService;
39  import org.opencastproject.serviceregistry.api.ServiceRegistry;
40  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
41  import org.opencastproject.util.JobUtil;
42  
43  import com.google.gson.Gson;
44  import com.google.gson.reflect.TypeToken;
45  
46  import org.osgi.service.component.ComponentContext;
47  import org.osgi.service.component.annotations.Component;
48  import org.osgi.service.component.annotations.Reference;
49  
50  import java.util.Arrays;
51  import java.util.Collection;
52  import java.util.HashSet;
53  import java.util.List;
54  import java.util.Optional;
55  import java.util.Set;
56  import java.util.UUID;
57  
58  @Component(
59      immediate = true,
60      service = ConfigurablePublicationService.class,
61      property = {
62          "service.description=Publication Service (Configurable)"
63      }
64  )
65  public class ConfigurablePublicationServiceImpl extends AbstractJobProducer implements ConfigurablePublicationService {
66  
67    /* Gson is thread-safe so we use a single instance */
68    private Gson gson = new Gson();
69  
70    @Override
71    public void activate(ComponentContext cc) {
72      super.activate(cc);
73    }
74  
75    public ConfigurablePublicationServiceImpl() {
76      super(JOB_TYPE);
77    }
78  
79    @Override
80    public String getJobType() {
81      return super.getJobType();
82    }
83  
84    public enum Operation {
85      Replace
86    }
87  
88    private DownloadDistributionService distributionService;
89  
90    private SecurityService securityService;
91  
92    private UserDirectoryService userDirectoryService;
93  
94    private OrganizationDirectoryService organizationDirectoryService;
95  
96    private ServiceRegistry serviceRegistry;
97  
98    @Override
99    public Job replace(final MediaPackage mediaPackage, final String channelId,
100           final Collection<? extends MediaPackageElement> addElements, final Set<String> retractElementIds)
101           throws PublicationException, MediaPackageException {
102     try {
103       return serviceRegistry.createJob(JOB_TYPE, Operation.Replace.toString(),
104               Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), channelId,
105                       MediaPackageElementParser.getArrayAsXml(addElements), gson.toJson(retractElementIds)));
106     } catch (final ServiceRegistryException e) {
107       throw new PublicationException("Unable to create job", e);
108     }
109   }
110 
111   @Override
112   public Publication replaceSync(
113       MediaPackage mediaPackage, String channelId, Collection<? extends MediaPackageElement> addElements,
114       Set<String> retractElementIds) throws PublicationException, MediaPackageException {
115     try {
116       return doReplaceSync(mediaPackage, channelId, addElements, retractElementIds);
117     } catch (DistributionException e) {
118       throw new PublicationException(e);
119     }
120   }
121 
122   @Override
123   protected String process(final Job job) throws Exception {
124     final List<String> arguments = job.getArguments();
125     final MediaPackage mediaPackage = MediaPackageParser.getFromXml(arguments.get(0));
126     final String channelId = arguments.get(1);
127     final Collection<? extends MediaPackageElement> addElements = MediaPackageElementParser
128             .getArrayFromXml(arguments.get(2));
129     Set<String> retractElementIds = gson.fromJson(arguments.get(3), new TypeToken<Set<String>>() { }.getType());
130 
131     Publication result = null;
132     switch (Operation.valueOf(job.getOperation())) {
133       case Replace:
134         result = doReplace(mediaPackage, channelId, addElements, retractElementIds);
135         break;
136       default:
137         break;
138     }
139     if (result != null) {
140       return MediaPackageElementParser.getAsXml(result);
141     } else {
142       return null;
143     }
144   }
145 
146   private void distributeMany(final MediaPackage mp, final String channelId,
147           final Collection<? extends MediaPackageElement> elements)
148           throws DistributionException, MediaPackageException {
149 
150     final Optional<Publication> publicationOpt = getPublication(mp, channelId);
151 
152     if (publicationOpt.isPresent()) {
153 
154       final Publication publication = publicationOpt.get();
155 
156       // Add all the elements top-level so the distribution service knows what to do
157       elements.forEach(mp::add);
158 
159       Set<String> elementIds = new HashSet<>();
160       for (final MediaPackageElement mpe : elements) {
161         elementIds.add(mpe.getIdentifier());
162       }
163 
164       try {
165         Job job = distributionService.distribute(channelId, mp, elementIds, false);
166 
167         if (!JobUtil.waitForJob(serviceRegistry, job).isSuccess()) {
168           throw new DistributionException("At least one of the publication jobs did not complete successfully");
169         }
170         List<? extends MediaPackageElement> distributedElements
171             = MediaPackageElementParser.getArrayFromXml(job.getPayload());
172         for (MediaPackageElement mpe : distributedElements) {
173           PublicationImpl.addElementToPublication(publication, mpe);
174         }
175       } finally {
176         // Remove our changes
177         elements.stream().map(MediaPackageElement::getIdentifier).forEach(mp::removeElementById);
178       }
179     }
180   }
181 
182   private void distributeManySync(final MediaPackage mp, final String channelId,
183           final Collection<? extends MediaPackageElement> elements) throws DistributionException {
184 
185     final Optional<Publication> publicationOpt = getPublication(mp, channelId);
186 
187     if (publicationOpt.isPresent()) {
188 
189       final Publication publication = publicationOpt.get();
190 
191       // Add all the elements top-level so the distribution service knows what to do
192       elements.forEach(mp::add);
193 
194       Set<String> elementIds = new HashSet<>();
195       for (final MediaPackageElement mpe : elements) {
196         elementIds.add(mpe.getIdentifier());
197       }
198 
199       try {
200         List<? extends MediaPackageElement> distributedElements = distributionService.distributeSync(channelId, mp,
201             elementIds, false);
202         for (MediaPackageElement mpe : distributedElements) {
203           PublicationImpl.addElementToPublication(publication, mpe);
204         }
205       } finally {
206         // Remove our changes
207         elements.stream().map(MediaPackageElement::getIdentifier).forEach(mp::removeElementById);
208       }
209     }
210   }
211 
212   private Publication doReplace(final MediaPackage mp, final String channelId,
213           final Collection<? extends MediaPackageElement> addElementIds, final Set<String> retractElementIds)
214           throws DistributionException, MediaPackageException {
215     // Retract old elements
216     final Job retractJob = distributionService.retract(channelId, mp, retractElementIds);
217 
218     if (!JobUtil.waitForJobs(serviceRegistry, retractJob).isSuccess()) {
219       throw new DistributionException("At least one of the retraction jobs did not complete successfully");
220     }
221 
222     final Optional<Publication> priorPublication = getPublication(mp, channelId);
223 
224     final Publication publication;
225 
226     if (priorPublication.isPresent()) {
227       publication = priorPublication.get();
228     } else {
229       final String publicationUUID = UUID.randomUUID().toString();
230       publication = PublicationImpl.publication(publicationUUID, channelId, null, null);
231       mp.add(publication);
232     }
233 
234     retractElementIds.forEach(publication::removeAttachmentById);
235 
236     distributeMany(mp, channelId, addElementIds);
237 
238     return publication;
239   }
240 
241   private Publication doReplaceSync(final MediaPackage mp, final String channelId,
242           final Collection<? extends MediaPackageElement> addElementIds, final Set<String> retractElementIds)
243           throws DistributionException {
244     // Retract old elements
245     distributionService.retractSync(channelId, mp, retractElementIds);
246 
247     final Optional<Publication> priorPublication = getPublication(mp, channelId);
248 
249     final Publication publication;
250 
251     if (priorPublication.isPresent()) {
252       publication = priorPublication.get();
253     } else {
254       final String publicationUUID = UUID.randomUUID().toString();
255       publication = PublicationImpl.publication(publicationUUID, channelId, null, null);
256       mp.add(publication);
257     }
258 
259     retractElementIds.forEach(publication::removeAttachmentById);
260 
261     distributeManySync(mp, channelId, addElementIds);
262 
263     return publication;
264   }
265 
266   private Optional<Publication> getPublication(final MediaPackage mp, final String channelId) {
267     return Arrays.stream(mp.getPublications()).filter(p -> p.getChannel().equalsIgnoreCase(channelId)).findAny();
268   }
269 
270   @Override
271   protected ServiceRegistry getServiceRegistry() {
272     return this.serviceRegistry;
273   }
274 
275   @Override
276   protected SecurityService getSecurityService() {
277     return this.securityService;
278   }
279 
280   @Override
281   protected UserDirectoryService getUserDirectoryService() {
282     return this.userDirectoryService;
283   }
284 
285   @Override
286   protected OrganizationDirectoryService getOrganizationDirectoryService() {
287     return this.organizationDirectoryService;
288   }
289 
290   @Reference
291   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
292     this.serviceRegistry = serviceRegistry;
293   }
294 
295   @Reference
296   public void setSecurityService(SecurityService securityService) {
297     this.serviceRegistry = serviceRegistry;
298   }
299 
300   @Reference
301   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
302     this.userDirectoryService = userDirectoryService;
303   }
304 
305   @Reference
306   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
307     this.organizationDirectoryService = organizationDirectoryService;
308   }
309 
310   @Reference(target = "(distribution.channel=download)")
311   public void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
312     this.distributionService = downloadDistributionService;
313   }
314 
315 }