1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.composer;
23
24 import org.opencastproject.composer.api.ComposerService;
25 import org.opencastproject.composer.api.EncoderException;
26 import org.opencastproject.composer.api.EncodingProfile;
27 import org.opencastproject.composer.api.EncodingProfile.MediaType;
28 import org.opencastproject.job.api.Job;
29 import org.opencastproject.job.api.JobContext;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32 import org.opencastproject.mediapackage.MediaPackageElementParser;
33 import org.opencastproject.mediapackage.MediaPackageException;
34 import org.opencastproject.mediapackage.Track;
35 import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
36 import org.opencastproject.mediapackage.selector.TrackSelector;
37 import org.opencastproject.serviceregistry.api.ServiceRegistry;
38 import org.opencastproject.util.NotFoundException;
39 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
40 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
41 import org.opencastproject.workflow.api.WorkflowInstance;
42 import org.opencastproject.workflow.api.WorkflowOperationException;
43 import org.opencastproject.workflow.api.WorkflowOperationHandler;
44 import org.opencastproject.workflow.api.WorkflowOperationInstance;
45 import org.opencastproject.workflow.api.WorkflowOperationResult;
46 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
47 import org.opencastproject.workspace.api.Workspace;
48
49 import org.apache.commons.lang3.StringUtils;
50 import org.osgi.service.component.annotations.Component;
51 import org.osgi.service.component.annotations.Reference;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import java.io.IOException;
56 import java.util.ArrayList;
57 import java.util.Collection;
58 import java.util.HashMap;
59 import java.util.List;
60 import java.util.Map;
61
62
63
64
65 @Component(
66 immediate = true,
67 service = WorkflowOperationHandler.class,
68 property = {
69 "service.description=Encode Workflow Operation Handler",
70 "workflow.operation=encode"
71 }
72 )
73 public class EncodeWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
74
75
76 private static final Logger logger = LoggerFactory.getLogger(EncodeWorkflowOperationHandler.class);
77
78
79 private ComposerService composerService = null;
80
81
82 private Workspace workspace = null;
83
84
85
86
87
88
89
90 @Reference
91 protected void setComposerService(ComposerService composerService) {
92 this.composerService = composerService;
93 }
94
95
96
97
98
99
100
101
102 @Reference
103 public void setWorkspace(Workspace workspace) {
104 this.workspace = workspace;
105 }
106
107
108
109
110
111
112
113 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
114 throws WorkflowOperationException {
115 logger.debug("Running parallel encoding workflow operation on workflow {}", workflowInstance.getId());
116
117 try {
118 return encode(workflowInstance);
119 } catch (Exception e) {
120 throw new WorkflowOperationException(e);
121 }
122 }
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139 private WorkflowOperationResult encode(WorkflowInstance workflowInstance)
140 throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
141 MediaPackage src = workflowInstance.getMediaPackage();
142 MediaPackage mediaPackage = (MediaPackage) src.clone();
143 WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
144
145 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance,
146 Configuration.many, Configuration.many, Configuration.many, Configuration.one);
147 List<String> sourceTagsOption = tagsAndFlavors.getSrcTags();
148 List<String> targetTagsOption = tagsAndFlavors.getTargetTags();
149 List<MediaPackageElementFlavor> sourceFlavorsOption = tagsAndFlavors.getSrcFlavors();
150 MediaPackageElementFlavor targetFlavor = tagsAndFlavors.getSingleTargetFlavor();
151
152 AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
153
154
155 if (sourceTagsOption.isEmpty() && sourceFlavorsOption.isEmpty()) {
156 logger.warn("No source tags or flavors have been specified, not matching anything");
157 return createResult(mediaPackage, Action.SKIP);
158 }
159
160
161 for (MediaPackageElementFlavor flavor : sourceFlavorsOption) {
162 elementSelector.addFlavor(flavor);
163 }
164
165
166 for (String tag : sourceTagsOption) {
167 elementSelector.addTag(tag);
168 }
169
170
171 String profilesOption = StringUtils.trimToNull(operation.getConfiguration("encoding-profiles"));
172 List<EncodingProfile> profiles = new ArrayList<EncodingProfile>();
173 for (String profileName : asList(profilesOption)) {
174 EncodingProfile profile = composerService.getProfile(profileName);
175 if (profile == null)
176 throw new WorkflowOperationException("Encoding profile '" + profileName + "' was not found");
177 profiles.add(profile);
178 }
179
180
181 String profileOption = StringUtils.trimToNull(operation.getConfiguration("encoding-profile"));
182 if (StringUtils.isNotBlank(profileOption)) {
183 String profileId = StringUtils.trim(profileOption);
184 EncodingProfile profile = composerService.getProfile(profileId);
185 if (profile == null)
186 throw new WorkflowOperationException("Encoding profile '" + profileId + "' was not found");
187 profiles.add(profile);
188 }
189
190
191 if (profiles.isEmpty())
192 throw new WorkflowOperationException("No encoding profile was specified");
193
194
195 Collection<Track> elements = elementSelector.select(mediaPackage, false);
196
197
198 long totalTimeInQueue = 0;
199 Map<Job, JobInformation> encodingJobs = new HashMap<Job, JobInformation>();
200 for (Track track : elements) {
201
202
203 for (EncodingProfile profile : profiles) {
204
205
206 MediaType inputType = profile.getApplicableMediaType();
207 if (inputType.equals(MediaType.Audio) && !track.hasAudio()) {
208 logger.info("Skipping encoding of '{}', since it lacks an audio stream", track);
209 continue;
210 } else if (inputType.equals(MediaType.Visual) && !track.hasVideo()) {
211 logger.info("Skipping encoding of '{}', since it lacks a video stream", track);
212 continue;
213 }
214
215 logger.info("Encoding track {} using encoding profile '{}'", track, profile);
216
217
218 encodingJobs.put(composerService.parallelEncode(track, profile.getIdentifier()), new JobInformation(track, profile));
219 }
220 }
221
222 if (encodingJobs.isEmpty()) {
223 logger.info("No matching tracks found");
224 return createResult(mediaPackage, Action.SKIP);
225 }
226
227
228 if (!waitForStatus(encodingJobs.keySet().toArray(new Job[encodingJobs.size()])).isSuccess()) {
229 throw new WorkflowOperationException("One of the encoding jobs did not complete successfully");
230 }
231
232
233 for (Map.Entry<Job, JobInformation> entry : encodingJobs.entrySet()) {
234 Job job = entry.getKey();
235 Track track = entry.getValue().getTrack();
236
237
238 totalTimeInQueue += job.getQueueTime();
239
240 if (job.getPayload().length() > 0) {
241 List <Track> composedTracks = (List <Track>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
242
243
244 for (Track encodedTrack : composedTracks) {
245 for (String tag : targetTagsOption) {
246 logger.trace("Tagging composed track {} with '{}'", encodedTrack.toString(), tag);
247 encodedTrack.addTag(tag);
248 }
249 }
250
251
252 if (targetFlavor != null) {
253 String flavorType = targetFlavor.getType();
254 String flavorSubtype = targetFlavor.getSubtype();
255 if ("*".equals(flavorType))
256 flavorType = track.getFlavor().getType();
257 if ("*".equals(flavorSubtype))
258 flavorSubtype = track.getFlavor().getSubtype();
259 for (Track encodedTrack : composedTracks) {
260 encodedTrack.setFlavor(new MediaPackageElementFlavor(flavorType, flavorSubtype));
261 logger.debug("Composed track {} has flavor '{}'", encodedTrack.toString(), encodedTrack.getFlavor());
262 }
263 }
264
265
266 for (Track encodedTrack : composedTracks) {
267 mediaPackage.addDerived(encodedTrack, track);
268 String fileName = getFileNameFromElements(track, encodedTrack);
269 encodedTrack.setURI(workspace.moveTo(encodedTrack.getURI(), mediaPackage.getIdentifier().toString(),
270 encodedTrack.getIdentifier(), fileName));
271 }
272 }
273 }
274
275 WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
276 logger.debug("Parallel encode operation completed");
277 return result;
278 }
279
280
281
282
283 private static final class JobInformation {
284
285 private Track track = null;
286 private EncodingProfile profile = null;
287
288 JobInformation(Track track, EncodingProfile profile) {
289 this.track = track;
290 this.profile = profile;
291 }
292
293
294
295
296
297
298 public Track getTrack() {
299 return track;
300 }
301
302
303
304
305
306
307 public EncodingProfile getProfile() {
308 return profile;
309 }
310
311 }
312
313 @Reference
314 @Override
315 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
316 super.setServiceRegistry(serviceRegistry);
317 }
318
319 }