1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.distribution;
23
24 import static org.apache.commons.lang3.StringUtils.isBlank;
25 import static org.opencastproject.workflow.handler.distribution.EngagePublicationChannel.CHANNEL_ID;
26
27 import org.opencastproject.distribution.api.DownloadDistributionService;
28 import org.opencastproject.distribution.api.StreamingDistributionService;
29 import org.opencastproject.job.api.Job;
30 import org.opencastproject.job.api.JobContext;
31 import org.opencastproject.mediapackage.MediaPackage;
32 import org.opencastproject.mediapackage.MediaPackageElement;
33 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34 import org.opencastproject.mediapackage.MediaPackageException;
35 import org.opencastproject.mediapackage.Publication;
36 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
37 import org.opencastproject.search.api.SearchException;
38 import org.opencastproject.search.api.SearchService;
39 import org.opencastproject.security.api.UnauthorizedException;
40 import org.opencastproject.serviceregistry.api.ServiceRegistry;
41 import org.opencastproject.util.NotFoundException;
42 import org.opencastproject.workflow.api.WorkflowInstance;
43 import org.opencastproject.workflow.api.WorkflowOperationException;
44 import org.opencastproject.workflow.api.WorkflowOperationHandler;
45 import org.opencastproject.workflow.api.WorkflowOperationInstance;
46 import org.opencastproject.workflow.api.WorkflowOperationResult;
47
48 import org.apache.commons.lang3.StringUtils;
49 import org.osgi.service.component.ComponentContext;
50 import org.osgi.service.component.annotations.Activate;
51 import org.osgi.service.component.annotations.Component;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import java.util.List;
57 import java.util.stream.Collectors;
58
59
60
61
62 @Component(
63 immediate = true,
64 service = WorkflowOperationHandler.class,
65 property = {
66 "service.description=Engage Partial Retraction Workflow Operation Handler",
67 "workflow.operation=retract-partial"
68 }
69 )
70 public class PartialRetractEngageWorkflowOperationHandler extends RetractEngageWorkflowOperationHandler {
71
72
73 private static final Logger logger = LoggerFactory.getLogger(PartialRetractEngageWorkflowOperationHandler.class);
74
75 private static final String RETRACT_FLAVORS = "retract-flavors";
76 private static final String RETRACT_TAGS = "retract-tags";
77
78 @Activate
79 @Override
80 public void activate(ComponentContext cc) {
81 super.activate(cc);
82 }
83
84
85
86
87
88
89 @Override
90 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
91 throws WorkflowOperationException {
92 WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
93
94
95 String retractTargetTags = StringUtils.trimToEmpty(op.getConfiguration(RETRACT_TAGS));
96 String retractTargetFlavors = StringUtils.trimToEmpty(op.getConfiguration(RETRACT_FLAVORS));
97
98 String[] retractTags = StringUtils.split(retractTargetTags, ",");
99 String[] retractFlavors = StringUtils.split(retractTargetFlavors, ",");
100
101
102
103 SimpleElementSelector retractElementSelector = new SimpleElementSelector();
104 for (String flavor : retractFlavors) {
105 MediaPackageElementFlavor f = MediaPackageElementFlavor.parseFlavor(flavor);
106 logger.debug("Selecting for flavor {}", f);
107 retractElementSelector.addFlavor(f);
108 }
109 for (String tag : retractTags) {
110 logger.debug("Selecting for tag {}", tag);
111 retractElementSelector.addTag(tag);
112 }
113
114 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
115 MediaPackage searchMP = null;
116 logger.info("Partially retracting {}", mediaPackage.getIdentifier());
117 List<Job> jobs;
118 try {
119 try {
120 searchMP = searchService.get(mediaPackage.getIdentifier().toString());
121 } catch (NotFoundException e) {
122 logger.info("The search service doesn't know media package {}", mediaPackage);
123 return createResult(mediaPackage, WorkflowOperationResult.Action.SKIP);
124 } catch (UnauthorizedException e) {
125 throw new WorkflowOperationException("Not allowed to access media package " + mediaPackage);
126 }
127 var retractElements = retractElementSelector.select(searchMP, false);
128 var retractElementIds = retractElements.stream()
129 .map(MediaPackageElement::getIdentifier)
130 .collect(Collectors.toSet());
131 logger.debug("Found {} matching elements", retractElementIds.size());
132
133
134 jobs = retractElements(retractElementIds, searchMP);
135 if (jobs.size() < 1) {
136 logger.debug("No matching elements found");
137 return createResult(mediaPackage, WorkflowOperationResult.Action.CONTINUE);
138 }
139
140 for (MediaPackageElement element : retractElements) {
141 logger.debug("Removing {} from mediapackage", element.getIdentifier());
142
143 mediaPackage.remove(element);
144 searchMP.remove(element);
145 }
146
147
148 if (!waitForStatus(jobs.toArray(new Job[0])).isSuccess()) {
149 throw new WorkflowOperationException("The retract jobs did not complete successfully");
150 }
151 Job deleteSearchJob = null;
152 logger.info("Retracting publication for mediapackage: {}", mediaPackage.getIdentifier().toString());
153 deleteSearchJob = searchService.delete(mediaPackage.getIdentifier().toString());
154 if (!waitForStatus(deleteSearchJob).isSuccess()) {
155 throw new WorkflowOperationException("Mediapackage " + mediaPackage.getIdentifier()
156 + " could not be retracted");
157 }
158
159 logger.info("Retraction operations complete, republishing updated mediapackage");
160
161 if (!isPublishable(mediaPackage)) {
162 throw new WorkflowOperationException("Media package does not meet criteria for publication");
163 }
164
165
166 Job publishJob = null;
167 try {
168 publishJob = searchService.add(searchMP);
169 if (!waitForStatus(publishJob).isSuccess()) {
170 throw new WorkflowOperationException("Mediapackage " + mediaPackage.getIdentifier()
171 + " could not be published");
172 }
173 } catch (SearchException e) {
174 throw new WorkflowOperationException("Error publishing media package", e);
175 } catch (MediaPackageException e) {
176 throw new WorkflowOperationException("Error parsing media package", e);
177 }
178
179 logger.info("Publication of {} complete", mediaPackage.getIdentifier());
180 return createResult(mediaPackage, WorkflowOperationResult.Action.CONTINUE);
181 } catch (Exception e) {
182 if (e instanceof WorkflowOperationException) {
183 throw (WorkflowOperationException) e;
184 } else {
185 throw new WorkflowOperationException(e);
186 }
187 }
188 }
189
190 private Publication findPublicationElement(MediaPackage mediaPackage) throws WorkflowOperationException {
191 for (Publication element : mediaPackage.getPublications()) {
192 if (CHANNEL_ID.equals(element.getChannel())) {
193 logger.debug("Found the publication element");
194 return element;
195 }
196 }
197 throw new WorkflowOperationException("Unable to find publication element!");
198 }
199
200
201
202
203 private boolean isPublishable(MediaPackage mp) {
204 boolean hasTitle = !isBlank(mp.getTitle());
205 if (!hasTitle) {
206 logger.warn("Media package does not meet criteria for publication: There is no title");
207 }
208
209 boolean hasTracks = mp.hasTracks();
210 if (!hasTracks) {
211 logger.warn("Media package does not meet criteria for publication: There are no tracks");
212 }
213
214 return hasTitle && hasTracks;
215 }
216
217 @Reference(target = "(distribution.channel=download)")
218 @Override
219 public void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
220 super.setDownloadDistributionService(downloadDistributionService);
221 }
222
223 @Reference
224 @Override
225 public void setSearchService(SearchService searchService) {
226 super.setSearchService(searchService);
227 }
228
229 @Reference
230 @Override
231 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
232 super.setServiceRegistry(serviceRegistry);
233 }
234
235 @Reference(target = "(distribution.channel=streaming)")
236 @Override
237 public void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
238 super.setStreamingDistributionService(streamingDistributionService);
239 }
240
241 }