/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
21  
22  package org.opencastproject.event.handler;
23  
24  import static org.opencastproject.job.api.Job.Status.FINISHED;
25  import static org.opencastproject.mediapackage.MediaPackageElementParser.getFromXml;
26  import static org.opencastproject.mediapackage.MediaPackageElements.XACML_POLICY_EPISODE;
27  import static org.opencastproject.workflow.handler.distribution.EngagePublicationChannel.CHANNEL_ID;
28  
29  import org.opencastproject.distribution.api.DistributionException;
30  import org.opencastproject.distribution.api.DistributionService;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.job.api.JobBarrier;
33  import org.opencastproject.job.api.JobBarrier.Result;
34  import org.opencastproject.mediapackage.Attachment;
35  import org.opencastproject.mediapackage.Catalog;
36  import org.opencastproject.mediapackage.MediaPackage;
37  import org.opencastproject.mediapackage.MediaPackageElement;
38  import org.opencastproject.mediapackage.MediaPackageElements;
39  import org.opencastproject.mediapackage.MediaPackageException;
40  import org.opencastproject.message.broker.api.series.SeriesItem;
41  import org.opencastproject.metadata.dublincore.DublinCore;
42  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
43  import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
44  import org.opencastproject.metadata.dublincore.DublinCoreUtil;
45  import org.opencastproject.search.api.SearchException;
46  import org.opencastproject.search.api.SearchService;
47  import org.opencastproject.security.api.AclScope;
48  import org.opencastproject.security.api.AuthorizationService;
49  import org.opencastproject.security.api.Organization;
50  import org.opencastproject.security.api.OrganizationDirectoryService;
51  import org.opencastproject.security.api.SecurityService;
52  import org.opencastproject.security.api.UnauthorizedException;
53  import org.opencastproject.security.api.User;
54  import org.opencastproject.security.util.SecurityUtil;
55  import org.opencastproject.serviceregistry.api.ServiceRegistry;
56  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
57  import org.opencastproject.util.NotFoundException;
58  import org.opencastproject.workspace.api.Workspace;
59  
60  import org.apache.commons.io.FilenameUtils;
61  import org.osgi.framework.BundleContext;
62  import org.osgi.service.component.annotations.Activate;
63  import org.osgi.service.component.annotations.Component;
64  import org.osgi.service.component.annotations.Reference;
65  import org.slf4j.Logger;
66  import org.slf4j.LoggerFactory;
67  
68  import java.io.IOException;
69  import java.net.URI;
70  import java.util.List;
71  
72  /** Responds to series events by re-distributing metadata and security policy files for published mediapackages. */
73  @Component(
74      immediate = true,
75      service = {
76          SearchUpdatedEventHandler.class
77      },
78      property = {
79          "service.description=Search Updated Event Handler"
80      }
81  )
82  public class SearchUpdatedEventHandler {
83  
84    /** The logger */
85    protected static final Logger logger = LoggerFactory.getLogger(SearchUpdatedEventHandler.class);
86  
87    /** The service registry */
88    protected ServiceRegistry serviceRegistry = null;
89  
90    /** The distribution service */
91    protected DistributionService distributionService = null;
92  
93    /** The search service */
94    protected SearchService searchService = null;
95  
96    /** The security service */
97    protected SecurityService securityService = null;
98  
99    /** The authorization service */
100   protected AuthorizationService authorizationService = null;
101 
102   /** The organization directory */
103   protected OrganizationDirectoryService organizationDirectoryService = null;
104 
105   /** Dublin core catalog service */
106   protected DublinCoreCatalogService dublinCoreService = null;
107 
108   /** The workspace */
109   protected Workspace workspace = null;
110 
111   /** The system account to use for running asynchronous events */
112   protected String systemAccount = null;
113 
114   /**
115    * OSGI callback for component activation.
116    *
117    * @param bundleContext
118    *          the OSGI bundle context
119    */
120   @Activate
121   protected void activate(BundleContext bundleContext) {
122     this.systemAccount = bundleContext.getProperty("org.opencastproject.security.digest.user");
123   }
124 
125   /**
126    * @param serviceRegistry
127    *          the serviceRegistry to set
128    */
129   @Reference
130   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
131     this.serviceRegistry = serviceRegistry;
132   }
133 
134   /**
135    * @param workspace
136    *          the workspace to set
137    */
138   @Reference
139   public void setWorkspace(Workspace workspace) {
140     this.workspace = workspace;
141   }
142 
143   /**
144    * @param dublinCoreService
145    *          the dublin core service to set
146    */
147   @Reference
148   public void setDublinCoreCatalogService(DublinCoreCatalogService dublinCoreService) {
149     this.dublinCoreService = dublinCoreService;
150   }
151 
152   /**
153    * @param distributionService
154    *          the distributionService to set
155    */
156   @Reference(target = "(distribution.channel=download)")
157   public void setDistributionService(DistributionService distributionService) {
158     this.distributionService = distributionService;
159   }
160 
161   /**
162    * @param searchService
163    *          the searchService to set
164    */
165   @Reference
166   public void setSearchService(SearchService searchService) {
167     this.searchService = searchService;
168   }
169 
170   /**
171    * @param securityService
172    *          the securityService to set
173    */
174   @Reference
175   public void setSecurityService(SecurityService securityService) {
176     this.securityService = securityService;
177   }
178 
179   /**
180    * @param authorizationService
181    *          the authorizationService to set
182    */
183   @Reference
184   public void setAuthorizationService(AuthorizationService authorizationService) {
185     this.authorizationService = authorizationService;
186   }
187 
188   /**
189    * @param organizationDirectoryService
190    *          the organizationDirectoryService to set
191    */
192   @Reference
193   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
194     this.organizationDirectoryService = organizationDirectoryService;
195   }
196 
197   public void handleEvent(final SeriesItem seriesItem) {
198     // A series or its ACL has been updated. Find any mediapackages with that series, and update them.
199     logger.debug("Handling {}", seriesItem);
200     String seriesId = seriesItem.getSeriesId();
201     int jobBarrierPollingRate = 100;    // in ms
202 
203     // We must be an administrative user to make this query
204     final User prevUser = securityService.getUser();
205     final Organization prevOrg = securityService.getOrganization();
206     try {
207       securityService.setUser(SecurityUtil.createSystemUser(systemAccount, prevOrg));
208 
209       for (var seriesData: searchService.getSeries(seriesId)) {
210         var mp = seriesData.getRight();
211         Organization org = seriesData.getLeft();
212         securityService.setOrganization(org);
213 
214         // If the security policy has been updated, make sure to distribute that change
215         // to the distribution channels as well
216         if (SeriesItem.Type.UpdateAcl.equals(seriesItem.getType())) {
217           if (Boolean.TRUE.equals(seriesItem.getOverrideEpisodeAcl())) {
218 
219             MediaPackageElement[] distributedEpisodeAcls = mp.getElementsByFlavor(XACML_POLICY_EPISODE);
220             for (MediaPackageElement distributedEpisodeAcl : distributedEpisodeAcls) {
221               List<MediaPackageElement> mpes = distributionService.retractSync(CHANNEL_ID, mp,
222                       distributedEpisodeAcl.getIdentifier());
223               if (mpes == null) {
224                 logger.error("Unable to retract episode XACML {}", distributedEpisodeAcl.getIdentifier());
225               } else {
226                 authorizationService.removeAcl(mp, AclScope.Episode);
227               }
228             }
229           }
230 
231           Attachment fileRepoCopy = authorizationService.setAcl(mp, AclScope.Series, seriesItem.getAcl()).getB();
232 
233           // Distribute the updated XACML file
234           List<MediaPackageElement> mpes = distributionService.distributeSync(CHANNEL_ID, mp,
235                   fileRepoCopy.getIdentifier());
236           if (mpes != null && mpes.size() == 1) {
237             mp.remove(fileRepoCopy);
238             mp.add(mpes.get(0));
239           } else {
240             logger.error("Unable to distribute series XACML {}", fileRepoCopy.getIdentifier());
241             continue;
242           }
243         }
244 
245         // Update the series dublin core
246         if (SeriesItem.Type.UpdateCatalog.equals(seriesItem.getType())) {
247           DublinCoreCatalog seriesDublinCore = seriesItem.getMetadata();
248           mp.setSeriesTitle(seriesDublinCore.getFirst(DublinCore.PROPERTY_TITLE));
249 
250           // Update the series dublin core
251           Catalog[] seriesCatalogs = mp.getCatalogs(MediaPackageElements.SERIES);
252           if (seriesCatalogs.length == 1) {
253             Catalog c = seriesCatalogs[0];
254             String filename = FilenameUtils.getName(c.getURI().toString());
255             URI uri = workspace.put(mp.getIdentifier().toString(), c.getIdentifier(), filename,
256                     dublinCoreService.serialize(seriesDublinCore));
257             c.setURI(uri);
258             // setting the URI to a new source so the checksum will most like be invalid
259             c.setChecksum(null);
260 
261             // Distribute the updated series dc
262             List<MediaPackageElement> mpes = distributionService.distributeSync(CHANNEL_ID, mp, c.getIdentifier());
263             if (mpes != null && mpes.size() == 1) {
264               mp.remove(c);
265               mp.add(mpes.get(0));
266             } else {
267               logger.error("Unable to distribute series catalog {}", c.getIdentifier());
268               continue;
269             }
270           }
271         }
272 
273         // Remove the series catalog and isPartOf from episode catalog
274         if (SeriesItem.Type.Delete.equals(seriesItem.getType())) {
275           mp.setSeries(null);
276           mp.setSeriesTitle(null);
277 
278           boolean retractSeriesCatalog = retractSeriesCatalog(mp);
279           boolean updateEpisodeCatalog = updateEpisodeCatalog(mp);
280 
281           if (!retractSeriesCatalog || !updateEpisodeCatalog) {
282             continue;
283           }
284         }
285 
286         // Update the search index with the modified mediapackage
287         searchService.addSynchronously(mp);
288       }
289       //We remove the episode->series links above, which effectively orphaned the series in the index, now we remove it
290       if (SeriesItem.Type.Delete.equals(seriesItem.getType())) {
291         searchService.deleteSeries(seriesId);
292       }
293     } catch (SearchException e) {
294       logger.warn("Unable to find mediapackages for series {} in search: {}", seriesItem, e.getMessage());
295     } catch (UnauthorizedException | MediaPackageException | ServiceRegistryException
296              | NotFoundException | IOException | DistributionException e) {
297       logger.warn("Unable to update mediapackages for series {} for user {}: {} {}",
298                   seriesId, prevUser.getUsername(), e.getClass().getSimpleName(), e.getMessage());
299     } finally {
300       securityService.setOrganization(prevOrg);
301       securityService.setUser(prevUser);
302     }
303   }
304 
305   private boolean retractSeriesCatalog(MediaPackage mp) throws DistributionException {
306     // Retract the series catalog
307     for (Catalog c : mp.getCatalogs(MediaPackageElements.SERIES)) {
308       Job retractJob = distributionService.retract(CHANNEL_ID, mp, c.getIdentifier());
309       JobBarrier barrier = new JobBarrier(null, serviceRegistry, retractJob);
310       Result jobResult = barrier.waitForJobs();
311       if (jobResult.getStatus().get(retractJob).equals(FINISHED)) {
312         mp.remove(c);
313       } else {
314         logger.error("Unable to retract series catalog {}", c.getIdentifier());
315         return false;
316       }
317     }
318     return true;
319   }
320 
321   private boolean updateEpisodeCatalog(MediaPackage mp) throws DistributionException, MediaPackageException,
322           NotFoundException, ServiceRegistryException, IllegalArgumentException, IOException {
323     // Update the episode catalog
324     for (Catalog episodeCatalog : mp.getCatalogs(MediaPackageElements.EPISODE)) {
325       DublinCoreCatalog episodeDublinCore = DublinCoreUtil.loadDublinCore(workspace, episodeCatalog);
326       episodeDublinCore.remove(DublinCore.PROPERTY_IS_PART_OF);
327       String filename = FilenameUtils.getName(episodeCatalog.getURI().toString());
328       URI uri = workspace.put(mp.getIdentifier().toString(), episodeCatalog.getIdentifier(), filename,
329               dublinCoreService.serialize(episodeDublinCore));
330       episodeCatalog.setURI(uri);
331       // setting the URI to a new source so the checksum will most like be invalid
332       episodeCatalog.setChecksum(null);
333 
334       // Distribute the updated episode dublincore
335       Job distributionJob = distributionService.distribute(CHANNEL_ID, mp, episodeCatalog.getIdentifier());
336       JobBarrier barrier = new JobBarrier(null, serviceRegistry, distributionJob);
337       Result jobResult = barrier.waitForJobs();
338       if (jobResult.getStatus().get(distributionJob).equals(FINISHED)) {
339         mp.remove(episodeCatalog);
340         mp.add(getFromXml(serviceRegistry.getJob(distributionJob.getId()).getPayload()));
341       } else {
342         logger.error("Unable to distribute episode catalog {}", episodeCatalog.getIdentifier());
343         return false;
344       }
345     }
346     return true;
347   }
348 }