1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.event.handler;
23
24 import static org.opencastproject.job.api.Job.Status.FINISHED;
25 import static org.opencastproject.mediapackage.MediaPackageElementParser.getFromXml;
26 import static org.opencastproject.mediapackage.MediaPackageElements.XACML_POLICY_EPISODE;
27 import static org.opencastproject.workflow.handler.distribution.EngagePublicationChannel.CHANNEL_ID;
28
29 import org.opencastproject.distribution.api.DistributionException;
30 import org.opencastproject.distribution.api.DistributionService;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.job.api.JobBarrier;
33 import org.opencastproject.job.api.JobBarrier.Result;
34 import org.opencastproject.mediapackage.Attachment;
35 import org.opencastproject.mediapackage.Catalog;
36 import org.opencastproject.mediapackage.MediaPackage;
37 import org.opencastproject.mediapackage.MediaPackageElement;
38 import org.opencastproject.mediapackage.MediaPackageElements;
39 import org.opencastproject.mediapackage.MediaPackageException;
40 import org.opencastproject.message.broker.api.series.SeriesItem;
41 import org.opencastproject.metadata.dublincore.DublinCore;
42 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
43 import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
44 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
45 import org.opencastproject.search.api.SearchException;
46 import org.opencastproject.search.api.SearchService;
47 import org.opencastproject.security.api.AclScope;
48 import org.opencastproject.security.api.AuthorizationService;
49 import org.opencastproject.security.api.Organization;
50 import org.opencastproject.security.api.OrganizationDirectoryService;
51 import org.opencastproject.security.api.SecurityService;
52 import org.opencastproject.security.api.UnauthorizedException;
53 import org.opencastproject.security.api.User;
54 import org.opencastproject.security.util.SecurityUtil;
55 import org.opencastproject.serviceregistry.api.ServiceRegistry;
56 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
57 import org.opencastproject.util.NotFoundException;
58 import org.opencastproject.workspace.api.Workspace;
59
60 import org.apache.commons.io.FilenameUtils;
61 import org.osgi.framework.BundleContext;
62 import org.osgi.service.component.annotations.Activate;
63 import org.osgi.service.component.annotations.Component;
64 import org.osgi.service.component.annotations.Reference;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 import java.io.IOException;
69 import java.net.URI;
70 import java.util.List;
71
72
73 @Component(
74 immediate = true,
75 service = {
76 SearchUpdatedEventHandler.class
77 },
78 property = {
79 "service.description=Search Updated Event Handler"
80 }
81 )
82 public class SearchUpdatedEventHandler {
83
84
85 protected static final Logger logger = LoggerFactory.getLogger(SearchUpdatedEventHandler.class);
86
87
88 protected ServiceRegistry serviceRegistry = null;
89
90
91 protected DistributionService distributionService = null;
92
93
94 protected SearchService searchService = null;
95
96
97 protected SecurityService securityService = null;
98
99
100 protected AuthorizationService authorizationService = null;
101
102
103 protected OrganizationDirectoryService organizationDirectoryService = null;
104
105
106 protected DublinCoreCatalogService dublinCoreService = null;
107
108
109 protected Workspace workspace = null;
110
111
112 protected String systemAccount = null;
113
114
115
116
117
118
119
120 @Activate
121 protected void activate(BundleContext bundleContext) {
122 this.systemAccount = bundleContext.getProperty("org.opencastproject.security.digest.user");
123 }
124
125
126
127
128
129 @Reference
130 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
131 this.serviceRegistry = serviceRegistry;
132 }
133
134
135
136
137
138 @Reference
139 public void setWorkspace(Workspace workspace) {
140 this.workspace = workspace;
141 }
142
143
144
145
146
147 @Reference
148 public void setDublinCoreCatalogService(DublinCoreCatalogService dublinCoreService) {
149 this.dublinCoreService = dublinCoreService;
150 }
151
152
153
154
155
156 @Reference(target = "(distribution.channel=download)")
157 public void setDistributionService(DistributionService distributionService) {
158 this.distributionService = distributionService;
159 }
160
161
162
163
164
165 @Reference
166 public void setSearchService(SearchService searchService) {
167 this.searchService = searchService;
168 }
169
170
171
172
173
174 @Reference
175 public void setSecurityService(SecurityService securityService) {
176 this.securityService = securityService;
177 }
178
179
180
181
182
183 @Reference
184 public void setAuthorizationService(AuthorizationService authorizationService) {
185 this.authorizationService = authorizationService;
186 }
187
188
189
190
191
192 @Reference
193 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
194 this.organizationDirectoryService = organizationDirectoryService;
195 }
196
197 public void handleEvent(final SeriesItem seriesItem) {
198
199 logger.debug("Handling {}", seriesItem);
200 String seriesId = seriesItem.getSeriesId();
201 int jobBarrierPollingRate = 100;
202
203
204 final User prevUser = securityService.getUser();
205 final Organization prevOrg = securityService.getOrganization();
206 try {
207 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, prevOrg));
208
209 for (var seriesData: searchService.getSeries(seriesId)) {
210 var mp = seriesData.getRight();
211 Organization org = seriesData.getLeft();
212 securityService.setOrganization(org);
213
214
215
216 if (SeriesItem.Type.UpdateAcl.equals(seriesItem.getType())) {
217 if (Boolean.TRUE.equals(seriesItem.getOverrideEpisodeAcl())) {
218
219 MediaPackageElement[] distributedEpisodeAcls = mp.getElementsByFlavor(XACML_POLICY_EPISODE);
220 for (MediaPackageElement distributedEpisodeAcl : distributedEpisodeAcls) {
221 List<MediaPackageElement> mpes = distributionService.retractSync(CHANNEL_ID, mp,
222 distributedEpisodeAcl.getIdentifier());
223 if (mpes == null) {
224 logger.error("Unable to retract episode XACML {}", distributedEpisodeAcl.getIdentifier());
225 } else {
226 authorizationService.removeAcl(mp, AclScope.Episode);
227 }
228 }
229 }
230
231 Attachment fileRepoCopy = authorizationService.setAcl(mp, AclScope.Series, seriesItem.getAcl()).getB();
232
233
234 List<MediaPackageElement> mpes = distributionService.distributeSync(CHANNEL_ID, mp,
235 fileRepoCopy.getIdentifier());
236 if (mpes != null && mpes.size() == 1) {
237 mp.remove(fileRepoCopy);
238 mp.add(mpes.get(0));
239 } else {
240 logger.error("Unable to distribute series XACML {}", fileRepoCopy.getIdentifier());
241 continue;
242 }
243 }
244
245
246 if (SeriesItem.Type.UpdateCatalog.equals(seriesItem.getType())) {
247 DublinCoreCatalog seriesDublinCore = seriesItem.getMetadata();
248 mp.setSeriesTitle(seriesDublinCore.getFirst(DublinCore.PROPERTY_TITLE));
249
250
251 Catalog[] seriesCatalogs = mp.getCatalogs(MediaPackageElements.SERIES);
252 if (seriesCatalogs.length == 1) {
253 Catalog c = seriesCatalogs[0];
254 String filename = FilenameUtils.getName(c.getURI().toString());
255 URI uri = workspace.put(mp.getIdentifier().toString(), c.getIdentifier(), filename,
256 dublinCoreService.serialize(seriesDublinCore));
257 c.setURI(uri);
258
259 c.setChecksum(null);
260
261
262 List<MediaPackageElement> mpes = distributionService.distributeSync(CHANNEL_ID, mp, c.getIdentifier());
263 if (mpes != null && mpes.size() == 1) {
264 mp.remove(c);
265 mp.add(mpes.get(0));
266 } else {
267 logger.error("Unable to distribute series catalog {}", c.getIdentifier());
268 continue;
269 }
270 }
271 }
272
273
274 if (SeriesItem.Type.Delete.equals(seriesItem.getType())) {
275 mp.setSeries(null);
276 mp.setSeriesTitle(null);
277
278 boolean retractSeriesCatalog = retractSeriesCatalog(mp);
279 boolean updateEpisodeCatalog = updateEpisodeCatalog(mp);
280
281 if (!retractSeriesCatalog || !updateEpisodeCatalog) {
282 continue;
283 }
284 }
285
286
287 searchService.addSynchronously(mp);
288 }
289
290 if (SeriesItem.Type.Delete.equals(seriesItem.getType())) {
291 searchService.deleteSeries(seriesId);
292 }
293 } catch (SearchException e) {
294 logger.warn("Unable to find mediapackages for series {} in search: {}", seriesItem, e.getMessage());
295 } catch (UnauthorizedException | MediaPackageException | ServiceRegistryException
296 | NotFoundException | IOException | DistributionException e) {
297 logger.warn("Unable to update mediapackages for series {} for user {}: {} {}",
298 seriesId, prevUser.getUsername(), e.getClass().getSimpleName(), e.getMessage());
299 } finally {
300 securityService.setOrganization(prevOrg);
301 securityService.setUser(prevUser);
302 }
303 }
304
305 private boolean retractSeriesCatalog(MediaPackage mp) throws DistributionException {
306
307 for (Catalog c : mp.getCatalogs(MediaPackageElements.SERIES)) {
308 Job retractJob = distributionService.retract(CHANNEL_ID, mp, c.getIdentifier());
309 JobBarrier barrier = new JobBarrier(null, serviceRegistry, retractJob);
310 Result jobResult = barrier.waitForJobs();
311 if (jobResult.getStatus().get(retractJob).equals(FINISHED)) {
312 mp.remove(c);
313 } else {
314 logger.error("Unable to retract series catalog {}", c.getIdentifier());
315 return false;
316 }
317 }
318 return true;
319 }
320
321 private boolean updateEpisodeCatalog(MediaPackage mp) throws DistributionException, MediaPackageException,
322 NotFoundException, ServiceRegistryException, IllegalArgumentException, IOException {
323
324 for (Catalog episodeCatalog : mp.getCatalogs(MediaPackageElements.EPISODE)) {
325 DublinCoreCatalog episodeDublinCore = DublinCoreUtil.loadDublinCore(workspace, episodeCatalog);
326 episodeDublinCore.remove(DublinCore.PROPERTY_IS_PART_OF);
327 String filename = FilenameUtils.getName(episodeCatalog.getURI().toString());
328 URI uri = workspace.put(mp.getIdentifier().toString(), episodeCatalog.getIdentifier(), filename,
329 dublinCoreService.serialize(episodeDublinCore));
330 episodeCatalog.setURI(uri);
331
332 episodeCatalog.setChecksum(null);
333
334
335 Job distributionJob = distributionService.distribute(CHANNEL_ID, mp, episodeCatalog.getIdentifier());
336 JobBarrier barrier = new JobBarrier(null, serviceRegistry, distributionJob);
337 Result jobResult = barrier.waitForJobs();
338 if (jobResult.getStatus().get(distributionJob).equals(FINISHED)) {
339 mp.remove(episodeCatalog);
340 mp.add(getFromXml(serviceRegistry.getJob(distributionJob.getId()).getPayload()));
341 } else {
342 logger.error("Unable to distribute episode catalog {}", episodeCatalog.getIdentifier());
343 return false;
344 }
345 }
346 return true;
347 }
348 }