1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.composer;
23
24 import org.opencastproject.composer.api.ComposerService;
25 import org.opencastproject.composer.api.EncoderException;
26 import org.opencastproject.composer.api.EncodingProfile;
27 import org.opencastproject.job.api.Job;
28 import org.opencastproject.job.api.JobContext;
29 import org.opencastproject.mediapackage.MediaPackage;
30 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
31 import org.opencastproject.mediapackage.MediaPackageElementParser;
32 import org.opencastproject.mediapackage.MediaPackageException;
33 import org.opencastproject.mediapackage.Track;
34 import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
35 import org.opencastproject.mediapackage.selector.TrackSelector;
36 import org.opencastproject.serviceregistry.api.ServiceRegistry;
37 import org.opencastproject.util.NotFoundException;
38 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
39 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
40 import org.opencastproject.workflow.api.WorkflowInstance;
41 import org.opencastproject.workflow.api.WorkflowOperationException;
42 import org.opencastproject.workflow.api.WorkflowOperationHandler;
43 import org.opencastproject.workflow.api.WorkflowOperationInstance;
44 import org.opencastproject.workflow.api.WorkflowOperationResult;
45 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
46 import org.opencastproject.workspace.api.Workspace;
47
48 import org.apache.commons.lang3.StringUtils;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Reference;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import java.io.IOException;
55 import java.util.Collection;
56 import java.util.HashMap;
57 import java.util.List;
58 import java.util.Map;
59
60
61
62
63 @Component(
64 immediate = true,
65 service = WorkflowOperationHandler.class,
66 property = {
67 "service.description=Demux Workflow Operation Handler",
68 "workflow.operation=demux"
69 }
70 )
71 public class DemuxWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
72
73
74 private static final Logger logger = LoggerFactory.getLogger(DemuxWorkflowOperationHandler.class);
75
76
77 private ComposerService composerService = null;
78
79
80 private Workspace workspace = null;
81
82
83
84
85
86
87
88 @Reference
89 protected void setComposerService(ComposerService composerService) {
90 this.composerService = composerService;
91 }
92
93
94
95
96
97
98
99
100 @Reference
101 public void setWorkspace(Workspace workspace) {
102 this.workspace = workspace;
103 }
104
105
106
107
108
109
110
111 @Override
112 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
113 throws WorkflowOperationException {
114 logger.debug("Running demux workflow operation on workflow {}", workflowInstance.getId());
115
116 try {
117 return demux(workflowInstance.getMediaPackage(), workflowInstance);
118 } catch (Exception e) {
119 throw new WorkflowOperationException(e);
120 }
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 private WorkflowOperationResult demux(MediaPackage src, WorkflowInstance workflowInstance)
141 throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
142 MediaPackage mediaPackage = (MediaPackage) src.clone();
143 WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
144 final String sectionSeparator = ";";
145
146 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance, Configuration.many, Configuration.many, Configuration.many, Configuration.many);
147
148 List<String> sourceTagsOption = tagsAndFlavors.getSrcTags();
149 List<MediaPackageElementFlavor> sourceFlavorsOption = tagsAndFlavors.getSrcFlavors();
150
151 String targetTagsOption = StringUtils.trimToNull(operation.getConfiguration("target-tags"));
152 List<MediaPackageElementFlavor> targetFlavorsOption = tagsAndFlavors.getTargetFlavors();
153 String encodingProfile = StringUtils.trimToEmpty(operation.getConfiguration("encoding-profile"));
154
155
156 if (sourceTagsOption.isEmpty() && sourceFlavorsOption.isEmpty()) {
157 logger.info("No source tags or flavors have been specified, not matching anything");
158 return createResult(mediaPackage, Action.CONTINUE);
159 }
160
161 List<MediaPackageElementFlavor> targetFlavors = targetFlavorsOption;
162 String[] targetTags = StringUtils.split(targetTagsOption, sectionSeparator);
163 AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
164
165
166 for (MediaPackageElementFlavor flavor : sourceFlavorsOption) {
167 elementSelector.addFlavor(flavor);
168 }
169
170
171 for (String tag : sourceTagsOption) {
172 elementSelector.addTag(tag);
173 }
174
175
176 EncodingProfile profile = composerService.getProfile(encodingProfile);
177 if (profile == null) {
178 throw new WorkflowOperationException(String.format("Encoding profile '%s' was not found", encodingProfile));
179 }
180
181 Collection<Track> sourceTracks = elementSelector.select(mediaPackage, false);
182 if (sourceTracks.isEmpty()) {
183 logger.info("No matching tracks found");
184 return createResult(mediaPackage, Action.CONTINUE);
185 }
186
187 long totalTimeInQueue = 0;
188 Map<Job, Track> encodingJobs = new HashMap<>();
189 for (Track track : sourceTracks) {
190 logger.info("Demuxing track {} using encoding profile '{}'", track, profile);
191
192 encodingJobs.put(composerService.demux(track, profile.getIdentifier()), track);
193 }
194
195
196 if (!waitForStatus(encodingJobs.keySet().toArray(new Job[encodingJobs.size()])).isSuccess()) {
197 throw new WorkflowOperationException("One of the encoding jobs did not complete successfully");
198 }
199
200
201 for (Map.Entry<Job, Track> entry : encodingJobs.entrySet()) {
202 Job job = entry.getKey();
203 Track sourceTrack = entry.getValue();
204
205
206 totalTimeInQueue += job.getQueueTime();
207
208
209 if (job.getPayload().length() <= 0) {
210 logger.warn("No output from Demux operation");
211 continue;
212 }
213
214 List<Track> composedTracks = (List<Track>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
215 if (composedTracks.size() != targetFlavors.size() && targetFlavors.size() != 1) {
216 throw new WorkflowOperationException(String.format("Number of target flavors (%d) and output tracks (%d) do "
217 + "not match", targetFlavors.size(), composedTracks.size()));
218 }
219 if (composedTracks.size() != targetTags.length && targetTags.length != 1 && targetTags.length != 0) {
220 throw new WorkflowOperationException(String.format("Number of target tag groups (%d) and output tracks (%d) "
221 + "do not match", targetTags.length, composedTracks.size()));
222 }
223
224
225 int flavorIndex = 0;
226 int tagsIndex = 0;
227 for (Track composedTrack : composedTracks) {
228
229 composedTrack.setFlavor(newFlavor(sourceTrack, targetFlavors.get(flavorIndex).toString()));
230 if (targetFlavors.size() > 1) {
231 flavorIndex++;
232 }
233 if (targetTags.length > 0) {
234 asList(targetTags[tagsIndex]).forEach(composedTrack::addTag);
235 logger.trace("Tagging composed track with '{}'", targetTags[tagsIndex]);
236 if (targetTags.length > 1) {
237 tagsIndex++;
238 }
239 }
240
241 String fileName = getFileNameFromElements(sourceTrack, composedTrack);
242 composedTrack.setURI(workspace.moveTo(composedTrack.getURI(), mediaPackage.getIdentifier().toString(),
243 composedTrack.getIdentifier(), fileName));
244 mediaPackage.addDerived(composedTrack, sourceTrack);
245 }
246 }
247
248 logger.debug("Demux operation completed");
249 return createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
250 }
251
252 private MediaPackageElementFlavor newFlavor(Track track, String flavor) throws WorkflowOperationException {
253 MediaPackageElementFlavor targetFlavor;
254
255 try {
256 targetFlavor = MediaPackageElementFlavor.parseFlavor(flavor);
257 String flavorType = targetFlavor.getType();
258 String flavorSubtype = targetFlavor.getSubtype();
259
260 if ("*".equals(flavorType))
261 flavorType = track.getFlavor().getType();
262 if ("*".equals(flavorSubtype))
263 flavorSubtype = track.getFlavor().getSubtype();
264 return (new MediaPackageElementFlavor(flavorType, flavorSubtype));
265 } catch (IllegalArgumentException e) {
266 throw new WorkflowOperationException(String.format("Target flavor '%s' is malformed", flavor));
267 }
268 }
269
270 @Reference
271 @Override
272 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
273 super.setServiceRegistry(serviceRegistry);
274 }
275
276 }