1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.composer;
23
24 import org.opencastproject.composer.api.ComposerService;
25 import org.opencastproject.composer.api.EncoderException;
26 import org.opencastproject.composer.api.EncodingProfile;
27 import org.opencastproject.job.api.Job;
28 import org.opencastproject.job.api.JobContext;
29 import org.opencastproject.mediapackage.MediaPackage;
30 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
31 import org.opencastproject.mediapackage.MediaPackageElementParser;
32 import org.opencastproject.mediapackage.MediaPackageException;
33 import org.opencastproject.mediapackage.Track;
34 import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
35 import org.opencastproject.mediapackage.selector.TrackSelector;
36 import org.opencastproject.serviceregistry.api.ServiceRegistry;
37 import org.opencastproject.util.NotFoundException;
38 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
39 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
40 import org.opencastproject.workflow.api.WorkflowInstance;
41 import org.opencastproject.workflow.api.WorkflowOperationException;
42 import org.opencastproject.workflow.api.WorkflowOperationHandler;
43 import org.opencastproject.workflow.api.WorkflowOperationInstance;
44 import org.opencastproject.workflow.api.WorkflowOperationResult;
45 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
46 import org.opencastproject.workspace.api.Workspace;
47
48 import org.apache.commons.lang3.StringUtils;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Reference;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import java.io.IOException;
55 import java.util.Collection;
56 import java.util.HashMap;
57 import java.util.List;
58 import java.util.Map;
59
60
61
62
63 @Component(
64 immediate = true,
65 service = WorkflowOperationHandler.class,
66 property = {
67 "service.description=Demux Workflow Operation Handler",
68 "workflow.operation=demux"
69 }
70 )
71 public class DemuxWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
72
73
74 private static final Logger logger = LoggerFactory.getLogger(DemuxWorkflowOperationHandler.class);
75
76
77 private ComposerService composerService = null;
78
79
80 private Workspace workspace = null;
81
82
83
84
85
86
87
88 @Reference
89 protected void setComposerService(ComposerService composerService) {
90 this.composerService = composerService;
91 }
92
93
94
95
96
97
98
99
100 @Reference
101 public void setWorkspace(Workspace workspace) {
102 this.workspace = workspace;
103 }
104
105
106
107
108
109
110
111 @Override
112 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
113 throws WorkflowOperationException {
114 logger.debug("Running demux workflow operation on workflow {}", workflowInstance.getId());
115
116 try {
117 return demux(workflowInstance.getMediaPackage(), workflowInstance);
118 } catch (Exception e) {
119 throw new WorkflowOperationException(e);
120 }
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 private WorkflowOperationResult demux(MediaPackage src, WorkflowInstance workflowInstance)
141 throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
142 MediaPackage mediaPackage = (MediaPackage) src.clone();
143 WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
144 final String sectionSeparator = ";";
145
146 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance, Configuration.many,
147 Configuration.many, Configuration.many, Configuration.many);
148
149 List<String> sourceTagsOption = tagsAndFlavors.getSrcTags();
150 List<MediaPackageElementFlavor> sourceFlavorsOption = tagsAndFlavors.getSrcFlavors();
151
152 String targetTagsOption = StringUtils.trimToNull(operation.getConfiguration("target-tags"));
153 List<MediaPackageElementFlavor> targetFlavorsOption = tagsAndFlavors.getTargetFlavors();
154 String encodingProfile = StringUtils.trimToEmpty(operation.getConfiguration("encoding-profile"));
155
156
157 if (sourceTagsOption.isEmpty() && sourceFlavorsOption.isEmpty()) {
158 logger.info("No source tags or flavors have been specified, not matching anything");
159 return createResult(mediaPackage, Action.CONTINUE);
160 }
161
162 List<MediaPackageElementFlavor> targetFlavors = targetFlavorsOption;
163 String[] targetTags = StringUtils.split(targetTagsOption, sectionSeparator);
164 AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
165
166
167 for (MediaPackageElementFlavor flavor : sourceFlavorsOption) {
168 elementSelector.addFlavor(flavor);
169 }
170
171
172 for (String tag : sourceTagsOption) {
173 elementSelector.addTag(tag);
174 }
175
176
177 EncodingProfile profile = composerService.getProfile(encodingProfile);
178 if (profile == null) {
179 throw new WorkflowOperationException(String.format("Encoding profile '%s' was not found", encodingProfile));
180 }
181
182 Collection<Track> sourceTracks = elementSelector.select(mediaPackage, false);
183 if (sourceTracks.isEmpty()) {
184 logger.info("No matching tracks found");
185 return createResult(mediaPackage, Action.CONTINUE);
186 }
187
188 long totalTimeInQueue = 0;
189 Map<Job, Track> encodingJobs = new HashMap<>();
190 for (Track track : sourceTracks) {
191 logger.info("Demuxing track {} using encoding profile '{}'", track, profile);
192
193 encodingJobs.put(composerService.demux(track, profile.getIdentifier()), track);
194 }
195
196
197 if (!waitForStatus(encodingJobs.keySet().toArray(new Job[encodingJobs.size()])).isSuccess()) {
198 throw new WorkflowOperationException("One of the encoding jobs did not complete successfully");
199 }
200
201
202 for (Map.Entry<Job, Track> entry : encodingJobs.entrySet()) {
203 Job job = entry.getKey();
204 Track sourceTrack = entry.getValue();
205
206
207 totalTimeInQueue += job.getQueueTime();
208
209
210 if (job.getPayload().length() <= 0) {
211 logger.warn("No output from Demux operation");
212 continue;
213 }
214
215 List<Track> composedTracks = (List<Track>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
216 if (composedTracks.size() != targetFlavors.size() && targetFlavors.size() != 1) {
217 throw new WorkflowOperationException(String.format("Number of target flavors (%d) and output tracks (%d) do "
218 + "not match", targetFlavors.size(), composedTracks.size()));
219 }
220 if (composedTracks.size() != targetTags.length && targetTags.length != 1 && targetTags.length != 0) {
221 throw new WorkflowOperationException(String.format("Number of target tag groups (%d) and output tracks (%d) "
222 + "do not match", targetTags.length, composedTracks.size()));
223 }
224
225
226 int flavorIndex = 0;
227 int tagsIndex = 0;
228 for (Track composedTrack : composedTracks) {
229
230 composedTrack.setFlavor(newFlavor(sourceTrack, targetFlavors.get(flavorIndex).toString()));
231 if (targetFlavors.size() > 1) {
232 flavorIndex++;
233 }
234 if (targetTags.length > 0) {
235 asList(targetTags[tagsIndex]).forEach(composedTrack::addTag);
236 logger.trace("Tagging composed track with '{}'", targetTags[tagsIndex]);
237 if (targetTags.length > 1) {
238 tagsIndex++;
239 }
240 }
241
242 String fileName = getFileNameFromElements(sourceTrack, composedTrack);
243 composedTrack.setURI(workspace.moveTo(composedTrack.getURI(), mediaPackage.getIdentifier().toString(),
244 composedTrack.getIdentifier(), fileName));
245 mediaPackage.addDerived(composedTrack, sourceTrack);
246 }
247 }
248
249 logger.debug("Demux operation completed");
250 return createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
251 }
252
253 private MediaPackageElementFlavor newFlavor(Track track, String flavor) throws WorkflowOperationException {
254 MediaPackageElementFlavor targetFlavor;
255
256 try {
257 targetFlavor = MediaPackageElementFlavor.parseFlavor(flavor);
258 String flavorType = targetFlavor.getType();
259 String flavorSubtype = targetFlavor.getSubtype();
260
261 if ("*".equals(flavorType)) {
262 flavorType = track.getFlavor().getType();
263 }
264 if ("*".equals(flavorSubtype)) {
265 flavorSubtype = track.getFlavor().getSubtype();
266 }
267 return (new MediaPackageElementFlavor(flavorType, flavorSubtype));
268 } catch (IllegalArgumentException e) {
269 throw new WorkflowOperationException(String.format("Target flavor '%s' is malformed", flavor));
270 }
271 }
272
273 @Reference
274 @Override
275 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
276 super.setServiceRegistry(serviceRegistry);
277 }
278
279 }