1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.distribution;
23
24 import static org.opencastproject.workflow.handler.distribution.EngagePublicationChannel.CHANNEL_ID;
25
26 import org.opencastproject.distribution.api.DistributionException;
27 import org.opencastproject.distribution.api.DownloadDistributionService;
28 import org.opencastproject.distribution.api.StreamingDistributionService;
29 import org.opencastproject.job.api.Job;
30 import org.opencastproject.job.api.JobContext;
31 import org.opencastproject.mediapackage.MediaPackage;
32 import org.opencastproject.mediapackage.MediaPackageElement;
33 import org.opencastproject.mediapackage.Publication;
34 import org.opencastproject.search.api.SearchService;
35 import org.opencastproject.serviceregistry.api.ServiceRegistry;
36 import org.opencastproject.util.NotFoundException;
37 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
38 import org.opencastproject.workflow.api.WorkflowInstance;
39 import org.opencastproject.workflow.api.WorkflowOperationException;
40 import org.opencastproject.workflow.api.WorkflowOperationHandler;
41 import org.opencastproject.workflow.api.WorkflowOperationResult;
42 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
43
44 import org.osgi.service.component.ComponentContext;
45 import org.osgi.service.component.annotations.Activate;
46 import org.osgi.service.component.annotations.Component;
47 import org.osgi.service.component.annotations.Reference;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.util.ArrayList;
52 import java.util.Arrays;
53 import java.util.Collections;
54 import java.util.List;
55 import java.util.Set;
56 import java.util.stream.Collectors;
57
58
59
60
61 @Component(
62 immediate = true,
63 service = WorkflowOperationHandler.class,
64 property = {
65 "service.description=Engage Retraction Workflow Operation Handler",
66 "workflow.operation=retract-engage"
67 }
68 )
69 public class RetractEngageWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
70
71
72 private static final Logger logger = LoggerFactory.getLogger(RetractEngageWorkflowOperationHandler.class);
73
74
75 protected StreamingDistributionService streamingDistributionService = null;
76
77
78 protected DownloadDistributionService downloadDistributionService = null;
79
80
81 protected SearchService searchService = null;
82
83
84
85
86
87
88
89 @Reference(target = "(distribution.channel=streaming)")
90 protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
91 this.streamingDistributionService = streamingDistributionService;
92 }
93
94
95
96
97
98
99
100
101 @Reference(target = "(distribution.channel=download)")
102 protected void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
103 this.downloadDistributionService = downloadDistributionService;
104 }
105
106
107
108
109
110
111
112
113
114 @Reference
115 protected void setSearchService(SearchService searchService) {
116 this.searchService = searchService;
117 }
118
119 @Reference
120 @Override public void setServiceRegistry(ServiceRegistry serviceRegistry) {
121 super.setServiceRegistry(serviceRegistry);
122 }
123
124 @Override
125 @Activate
126 protected void activate(ComponentContext cc) {
127 super.activate(cc);
128 }
129
130
131
132
133
134
135
136
137 protected List<Job> retractElements(Set<String> retractElementIds, MediaPackage searchMediaPackage) throws
138 DistributionException {
139 if (retractElementIds.isEmpty()) {
140 return Collections.emptyList();
141 }
142
143 List<Job> jobs = new ArrayList<>();
144
145 jobs.add(downloadDistributionService.retract(CHANNEL_ID, searchMediaPackage, retractElementIds));
146
147 if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
148 jobs.add(streamingDistributionService.retract(CHANNEL_ID, searchMediaPackage, retractElementIds));
149 }
150
151 return jobs;
152 }
153
154
155
156
157
158
159 @Override
160 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
161 throws WorkflowOperationException {
162 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
163 List<Job> jobs;
164 try {
165 MediaPackage searchMediaPackage = null;
166 try {
167 searchMediaPackage = searchService.get(mediaPackage.getIdentifier().toString());
168 } catch (NotFoundException e) {
169 logger.info("The search service doesn't know media package {}", mediaPackage);
170 return createResult(mediaPackage, Action.SKIP);
171 }
172 logger.info("Retracting media package {} from download/streaming distribution channel", searchMediaPackage);
173 var retractElementIds = Arrays.stream(searchMediaPackage.getElements())
174 .map(MediaPackageElement::getIdentifier)
175 .collect(Collectors.toSet());
176 jobs = retractElements(retractElementIds, searchMediaPackage);
177
178
179 if (!waitForStatus(jobs.toArray(new Job[0])).isSuccess()) {
180 throw new WorkflowOperationException("One of the download/streaming retract job did not complete successfully");
181 }
182
183 logger.debug("Retraction operation complete");
184
185 logger.info("Removing media package {} from the search index", mediaPackage);
186 Job deleteFromSearch = searchService.delete(mediaPackage.getIdentifier().toString());
187 if (!waitForStatus(deleteFromSearch).isSuccess()) {
188 throw new WorkflowOperationException("Removing media package from search did not complete successfully");
189 }
190
191 logger.debug("Remove from search operation complete");
192
193
194 logger.info("Removing engage publication element from media package {}", mediaPackage);
195 Publication[] publications = mediaPackage.getPublications();
196 for (Publication publication : publications) {
197 if (CHANNEL_ID.equals(publication.getChannel())) {
198 mediaPackage.remove(publication);
199 logger.debug("Remove engage publication element '{}' complete", publication);
200 }
201 }
202
203 return createResult(mediaPackage, Action.CONTINUE);
204 } catch (Throwable t) {
205 throw new WorkflowOperationException(t);
206 }
207 }
208
209 }