1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.transcription.amberscript;
22
23 import org.opencastproject.assetmanager.api.AssetManager;
24 import org.opencastproject.assetmanager.api.Snapshot;
25 import org.opencastproject.assetmanager.util.Workflows;
26 import org.opencastproject.job.api.AbstractJobProducer;
27 import org.opencastproject.job.api.Job;
28 import org.opencastproject.mediapackage.Catalog;
29 import org.opencastproject.mediapackage.MediaPackageElement;
30 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
31 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
32 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
33 import org.opencastproject.mediapackage.MediaPackageElementParser;
34 import org.opencastproject.mediapackage.MediaPackageElements;
35 import org.opencastproject.mediapackage.MediaPackageException;
36 import org.opencastproject.mediapackage.Track;
37 import org.opencastproject.metadata.dublincore.DublinCore;
38 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
39 import org.opencastproject.metadata.dublincore.DublinCoreValue;
40 import org.opencastproject.metadata.dublincore.DublinCores;
41 import org.opencastproject.security.api.DefaultOrganization;
42 import org.opencastproject.security.api.Organization;
43 import org.opencastproject.security.api.OrganizationDirectoryService;
44 import org.opencastproject.security.api.SecurityService;
45 import org.opencastproject.security.api.UserDirectoryService;
46 import org.opencastproject.security.util.SecurityUtil;
47 import org.opencastproject.serviceregistry.api.ServiceRegistry;
48 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
49 import org.opencastproject.systems.OpencastConstants;
50 import org.opencastproject.transcription.api.TranscriptionService;
51 import org.opencastproject.transcription.api.TranscriptionServiceException;
52 import org.opencastproject.transcription.persistence.TranscriptionDatabase;
53 import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
54 import org.opencastproject.transcription.persistence.TranscriptionJobControl;
55 import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
56 import org.opencastproject.util.NotFoundException;
57 import org.opencastproject.util.OsgiUtil;
58 import org.opencastproject.workflow.api.ConfiguredWorkflow;
59 import org.opencastproject.workflow.api.WorkflowDefinition;
60 import org.opencastproject.workflow.api.WorkflowInstance;
61 import org.opencastproject.workflow.api.WorkflowService;
62 import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
63 import org.opencastproject.workspace.api.Workspace;
64
65 import org.apache.commons.lang3.StringUtils;
66 import org.apache.http.HttpEntity;
67 import org.apache.http.HttpStatus;
68 import org.apache.http.client.config.RequestConfig;
69 import org.apache.http.client.methods.CloseableHttpResponse;
70 import org.apache.http.client.methods.HttpGet;
71 import org.apache.http.client.methods.HttpPost;
72 import org.apache.http.entity.ContentType;
73 import org.apache.http.entity.mime.HttpMultipartMode;
74 import org.apache.http.entity.mime.MultipartEntityBuilder;
75 import org.apache.http.entity.mime.content.FileBody;
76 import org.apache.http.impl.client.CloseableHttpClient;
77 import org.apache.http.impl.client.HttpClientBuilder;
78 import org.apache.http.util.EntityUtils;
79 import org.json.simple.JSONObject;
80 import org.json.simple.parser.JSONParser;
81 import org.json.simple.parser.ParseException;
82 import org.osgi.service.component.ComponentContext;
83 import org.osgi.service.component.annotations.Activate;
84 import org.osgi.service.component.annotations.Component;
85 import org.osgi.service.component.annotations.Deactivate;
86 import org.osgi.service.component.annotations.Reference;
87 import org.slf4j.Logger;
88 import org.slf4j.LoggerFactory;
89
90 import java.io.IOException;
91 import java.io.InputStream;
92 import java.net.URI;
93 import java.util.Arrays;
94 import java.util.Date;
95 import java.util.HashMap;
96 import java.util.HashSet;
97 import java.util.List;
98 import java.util.Map;
99 import java.util.Optional;
100 import java.util.Set;
101 import java.util.concurrent.Executors;
102 import java.util.concurrent.ScheduledExecutorService;
103 import java.util.concurrent.TimeUnit;
104
105 @Component(
106 immediate = true,
107 service = { TranscriptionService.class,AmberscriptTranscriptionService.class },
108 property = {
109 "service.description=AmberScript Transcription Service",
110 "provider=amberscript"
111 }
112 )
113 public class AmberscriptTranscriptionService extends AbstractJobProducer implements TranscriptionService {
114
/** Logger used by this service. */
private static final Logger logger = LoggerFactory.getLogger(AmberscriptTranscriptionService.class);

/** Job type handed to the job producer super class and used when creating jobs in the service registry. */
private static final String JOB_TYPE = "org.opencastproject.transcription.amberscript";

/** Name of the submission collection (not referenced in the visible part of this class). */
public static final String SUBMISSION_COLLECTION = "amberscript-submission";
/** Workspace collection in which retrieved transcripts are stored as {@code <jobId>.srt}. */
private static final String TRANSCRIPT_COLLECTION = "amberscript-transcripts";

/** Connect and connection-request timeout applied to HTTP clients created by this service. */
private static final int CONNECTION_TIMEOUT = 60000;
/** Socket timeout used when no explicit timeout is passed to {@code makeHttpClient}. */
private static final int SOCKET_TIMEOUT = 60000;



/** Base URL all AmberScript API request URLs are built from. */
private static final String BASE_URL = "https://qs.amberscript.com";
/** Job status value: the job has not finished yet. */
private static final String STATUS_OPEN = "OPEN";
/** Job status value: the job has finished and its result can be fetched. */
private static final String STATUS_DONE = "DONE";
/** Job status value: the job failed. */
private static final String STATUS_ERROR = "ERROR";

/** Error message compared against the job's {@code errorMsg} when a job is reported as failed. */
private static final String ERROR_NO_SPEECH = "No speech found";

/** Provider name stored together with each job control entry. */
private static final String PROVIDER = "amberscript";
135
// Collaborating services. NOTE(review): presumably injected through OSGi reference setters that are
// outside the visible part of this file; confirm before relying on them being non-null.
private AssetManager assetManager;
private OrganizationDirectoryService organizationDirectoryService;
/** Executor running the periodic tasks; only created when activation completes with the service enabled. */
private ScheduledExecutorService scheduledExecutor;
private SecurityService securityService;
private ServiceRegistry serviceRegistry;
/** Persistence for transcription job control entries. */
private TranscriptionDatabase database;
private UserDirectoryService userDirectoryService;
private WorkflowService workflowService;
private WorkingFileRepository wfr;
/** Workspace used to read catalogs and tracks and to store retrieved transcripts. */
private Workspace workspace;


// Workflow helper; not used in the visible part of this class.
private Workflows wfUtil;
149
/** Operations this job producer can process; the constant name is stored as the job's operation. */
private enum Operation {
  StartTranscription
}
153
154
155
/**
 * Dublin Core fields from which the number of speakers is derived: creators, contributors, or both.
 * The constant names are the values accepted for the speaker metadata field configuration.
 */
private enum SpeakerMetadataField {
  creator, contributor, both
}
159
160
// Keys of the service configuration properties read during activation.
/** Whether the service is enabled. */
private static final String ENABLED_CONFIG = "enabled";
/** AmberScript API key. */
private static final String CLIENT_KEY = "client.key";
/** Default transcription language. */
private static final String LANGUAGE = "language";
/** Whether the language should be taken from the episode Dublin Core catalog. */
private static final String LANGUAGE_FROM_DUBLINCORE = "language.from.dublincore";
/** Additional language code mappings, given as comma separated {@code key:value} pairs. */
private static final String LANGUAGE_CODE_MAP = "language.code.map";
/** Default AmberScript job type. */
private static final String AMBERSCRIPTJOBTYPE = "jobtype";
/** Id of the workflow definition to use. */
private static final String WORKFLOW_CONFIG = "workflow";
/** Interval, in seconds, between runs of the workflow dispatcher. */
private static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
/** Maximum processing time of a transcription job, in seconds. */
private static final String MAX_PROCESSING_TIME_CONFIG = "max.overdue.time";
/** Number of days after which result files are cleaned up. */
private static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
/** Default number of speakers. */
private static final String SPEAKER = "speaker";
/** Whether the number of speakers should be derived from the episode Dublin Core catalog. */
private static final String SPEAKER_FROM_DUBLINCORE = "speaker.from.dublincore";
/** Dublin Core field(s) used to count speakers. */
private static final String SPEAKER_METADATA_FIELD = "speaker.metadata.field";
/** Default transcription type. */
private static final String TRANSCRIPTIONTYPE = "transcriptiontype";
/** Default glossary. */
private static final String GLOSSARY = "glossary";
// NOTE(review): this key is "cleanread", which is also the default transcription style value, while the
// neighbouring keys are named after their setting ("transcriptiontype", "targetlanguage"). It looks like the
// key was meant to be "transcriptionstyle" -- confirm against the shipped configuration file before changing.
private static final String TRANSCRIPTIONSTYLE = "cleanread";
/** Default target language for translations. */
private static final String TARGETLANGUAGE = "targetlanguage";
178
179
// Effective configuration values; the initializers are the defaults used when a property is not configured.
/** Whether the service is enabled; disabled unless configured otherwise. */
private boolean enabled = false;
/** AmberScript API key appended to every request URL. */
private String clientKey;
/** Default transcription language. */
private String language = "en";

/** Whether the language is looked up in the episode Dublin Core catalog first. */
private boolean languageFromDublinCore;
/** Default AmberScript job type. */
private String amberscriptJobType = "direct";
/** Id of the workflow definition to use. */
private String workflowDefinitionId = "amberscript-attach-transcripts";
/** Delay between two runs of the workflow dispatcher, in seconds. */
private long workflowDispatchIntervalSeconds = 60;
/** Maximum processing time of a transcription job, in seconds (default: 8 days). */
private long maxProcessingSeconds = 8 * 24 * 60 * 60;
/** Number of days after which result files are cleaned up. */
private int cleanupResultDays = 7;
/** Default number of speakers. */
private int numberOfSpeakers = 1;
/** Whether the number of speakers is derived from the episode Dublin Core catalog first. */
private boolean speakerFromDublinCore = true;
/** Dublin Core field(s) whose distinct values are counted as speakers. */
private SpeakerMetadataField speakerMetadataField = SpeakerMetadataField.creator;
/** Default transcription type. */
private String transcriptionType = "transcription";
/** Default glossary; blank means no glossary parameter is sent. */
private String glossary = "";
/** Default transcription style. */
private String transcriptionStyle = "cleanread";
/** Default target language; blank means no target language parameter is sent. */
private String targetLanguage = "";





/** Helper translating language values into AmberScript language codes; set during activation. */
private AmberscriptLangUtil amberscriptLangUtil;

/** Value of the digest user context property, read during activation. */
private String systemAccount;
205
/** Creates the service and registers its job type with the job producer super class. */
public AmberscriptTranscriptionService() {
  super(JOB_TYPE);
}
209
210 @Activate
211 public void activate(ComponentContext cc) {
212
213 Optional<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG);
214 if (enabledOpt.isPresent()) {
215 enabled = enabledOpt.get();
216 }
217
218 if (!enabled) {
219 logger.info("Amberscript Transcription Service disabled."
220 + " If you want to enable it, please update the service configuration.");
221 return;
222 }
223
224 Optional<String> clientKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLIENT_KEY);
225 if (clientKeyOpt.isPresent()) {
226 clientKey = clientKeyOpt.get();
227 } else {
228 logger.warn("API key was not set.");
229 return;
230 }
231
232 Optional<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE);
233 if (languageOpt.isPresent()) {
234 language = languageOpt.get();
235 logger.info("Default language is set to '{}'.", language);
236 } else {
237 logger.info("Default language '{}' will be used.", language);
238 }
239
240 Optional<String> languageFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_FROM_DUBLINCORE);
241 if (languageFromDublinCoreOpt.isPresent()) {
242 try {
243 languageFromDublinCore = Boolean.parseBoolean(languageFromDublinCoreOpt.get());
244 } catch (Exception e) {
245 logger.warn("Configuration value for '{}' is invalid, defaulting to false.", LANGUAGE_FROM_DUBLINCORE);
246 }
247 }
248 logger.info("Configuration value for '{}' is set to '{}'.", LANGUAGE_FROM_DUBLINCORE, languageFromDublinCore);
249
250 amberscriptLangUtil = AmberscriptLangUtil.getInstance();
251 int customMapEntriesCount = 0;
252 Optional<String> langCodeMapOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_CODE_MAP);
253 if (langCodeMapOpt.isPresent()) {
254 try {
255 String langCodeMapStr = langCodeMapOpt.get();
256 if (langCodeMapStr != null) {
257 for (String mapping : langCodeMapStr.split(",")) {
258 String[] mapEntries = mapping.split(":");
259 amberscriptLangUtil.addCustomMapping(mapEntries[0], mapEntries[1]);
260 customMapEntriesCount += 1;
261 }
262 }
263 } catch (Exception e) {
264 logger.warn("Configuration '{}' is invalid. Using just default mapping.", LANGUAGE_CODE_MAP);
265 }
266 }
267 logger.info("Language code map was set. Added '{}' additional entries.", customMapEntriesCount);
268
269 Optional<String> amberscriptJobTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), AMBERSCRIPTJOBTYPE);
270 if (amberscriptJobTypeOpt.isPresent()) {
271 amberscriptJobType = amberscriptJobTypeOpt.get();
272 logger.info("Default Amberscript job type is set to '{}'.", amberscriptJobType);
273 } else {
274 logger.info("Default Amberscript job type '{}' will be used.", amberscriptJobType);
275 }
276
277 Optional<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
278 if (wfOpt.isPresent()) {
279 workflowDefinitionId = wfOpt.get();
280 logger.info("Workflow is set to '{}'.", workflowDefinitionId);
281 } else {
282 logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
283 }
284
285 Optional<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
286 if (intervalOpt.isPresent()) {
287 try {
288 workflowDispatchIntervalSeconds = Long.parseLong(intervalOpt.get());
289 } catch (NumberFormatException e) {
290 logger.warn("Configured '{}' is invalid. Using default.", DISPATCH_WORKFLOW_INTERVAL_CONFIG);
291 }
292 }
293 logger.info("Workflow dispatch interval is {} seconds.", workflowDispatchIntervalSeconds);
294
295 Optional<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
296 if (maxProcessingOpt.isPresent()) {
297 try {
298 maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
299 } catch (NumberFormatException e) {
300 logger.warn("Configured '{}' is invalid. Using default.", MAX_PROCESSING_TIME_CONFIG);
301 }
302 }
303 logger.info("Maximum processing time for transcription job is {} seconds.", maxProcessingSeconds);
304
305 Optional<String> cleanupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
306 if (cleanupOpt.isPresent()) {
307 try {
308 cleanupResultDays = Integer.parseInt(cleanupOpt.get());
309 } catch (NumberFormatException e) {
310 logger.warn("Configured '{}' is invalid. Using default.", CLEANUP_RESULTS_DAYS_CONFIG);
311 }
312 }
313 logger.info("Cleanup result files after {} days.", cleanupResultDays);
314
315 Optional<String> speakerOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER);
316 if (speakerOpt.isPresent()) {
317 try {
318 numberOfSpeakers = Integer.parseInt(speakerOpt.get());
319 } catch (NumberFormatException e) {
320 logger.warn("Configured '{}' is invalid. Using default.", SPEAKER);
321 }
322 }
323 logger.info("Default number of speakers is set to '{}'.", numberOfSpeakers);
324
325 Optional<String> speakerFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_FROM_DUBLINCORE);
326 if (speakerFromDublinCoreOpt.isPresent()) {
327 try {
328 speakerFromDublinCore = Boolean.parseBoolean(speakerFromDublinCoreOpt.get());
329 } catch (Exception e) {
330 logger.warn("Configuration value for '{}' is invalid, defaulting to true.", SPEAKER_FROM_DUBLINCORE);
331 }
332 }
333 logger.info("Configuration value for '{}' is set to '{}'.", SPEAKER_FROM_DUBLINCORE, speakerFromDublinCore);
334
335 Optional<String> speakerMetadataFieldOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_METADATA_FIELD);
336 if (speakerMetadataFieldOpt.isPresent()) {
337 try {
338 speakerMetadataField = SpeakerMetadataField.valueOf(speakerMetadataFieldOpt.get());
339 } catch (IllegalArgumentException e) {
340 logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
341 speakerMetadataFieldOpt.get(), SPEAKER_METADATA_FIELD, speakerMetadataField);
342 }
343 }
344 logger.info("Default metadata field for calculating the amount of speakers is set to '{}'.", speakerMetadataField);
345
346 Optional<String> transcriptionTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONTYPE);
347 if (transcriptionTypeOpt.isPresent()) {
348 if (List.of("transcription", "captions", "translatedSubtitles").contains(transcriptionType)) {
349 transcriptionType = transcriptionTypeOpt.get();
350 logger.info("Default transcription type is set to '{}'.", transcriptionType);
351 } else {
352 logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
353 transcriptionTypeOpt.get(), TRANSCRIPTIONTYPE, transcriptionType);
354 }
355 } else {
356 logger.info("Default transcription type '{}' will be used.", transcriptionType);
357 }
358
359 Optional<String> glossaryOpt = OsgiUtil.getOptCfg(cc.getProperties(), GLOSSARY);
360 if (glossaryOpt.isPresent()) {
361 glossary = glossaryOpt.get();
362 logger.info("Default glossary is set to '{}'.", glossary);
363 } else {
364 logger.info("No glossary will be used by default");
365 }
366
367 Optional<String> transcriptionStyleOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONSTYLE);
368 if (transcriptionStyleOpt.isPresent()) {
369 if (List.of("cleanread", "verbatim").contains(transcriptionStyle)) {
370 transcriptionStyle = transcriptionStyleOpt.get();
371 logger.info("Default transcription style is set to '{}'.", transcriptionStyle);
372 } else {
373 logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
374 transcriptionStyleOpt.get(), TRANSCRIPTIONSTYLE, transcriptionStyle);
375 }
376 } else {
377 logger.info("Default transcription style '{}' will be used.", transcriptionStyle);
378 }
379
380 Optional<String> targetLanguageOpt = OsgiUtil.getOptCfg(cc.getProperties(), TARGETLANGUAGE);
381 if (targetLanguageOpt.isPresent()) {
382 targetLanguage = targetLanguageOpt.get();
383 logger.info("Default target language is set to '{}'.", targetLanguage);
384 } else {
385 logger.info("Transcriptions won't be translated");
386 }
387
388 systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
389
390 scheduledExecutor = Executors.newScheduledThreadPool(2);
391
392 scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchIntervalSeconds,
393 TimeUnit.SECONDS);
394
395 scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
396
397 logger.info("Activated.");
398 }
399
400 @Deactivate
401 public void deactivate() {
402 if (scheduledExecutor != null) {
403 scheduledExecutor.shutdown();
404 }
405 }
406
407 @Override
408 public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
409 throw new UnsupportedOperationException("Not supported.");
410 }
411
412 @Override
413 public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
414 if (!enabled) {
415 throw new TranscriptionServiceException("AmberScript Transcription Service disabled."
416 + " If you want to enable it, please update the service configuration.");
417 }
418
419 String language = null;
420
421 if (languageFromDublinCore) {
422 for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
423 try (InputStream in = workspace.read(catalog.getURI())) {
424 DublinCoreCatalog dublinCatalog = DublinCores.read(in);
425 String dublinCoreLang = dublinCatalog.getFirst(DublinCore.PROPERTY_LANGUAGE);
426 if (dublinCoreLang != null) {
427 language = amberscriptLangUtil.getLanguageCodeOrNull(dublinCoreLang);
428 }
429 if (language != null) {
430 break;
431 }
432 } catch (IOException | NotFoundException e) {
433 logger.error(String.format("Unable to load dublin core catalog for event '%s'",
434 track.getMediaPackage().getIdentifier()), e);
435 }
436 }
437 }
438
439 if (language == null) {
440 if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
441 language = args[0];
442 } else {
443 language = getLanguage();
444 }
445 }
446
447 String jobType;
448 if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
449 jobType = args[1];
450 } else {
451 jobType = getAmberscriptJobType();
452 }
453
454 int numberOfSpeakers = 0;
455 if (speakerFromDublinCore) {
456 Set<String> speakers = new HashSet<>();
457 for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
458 try (InputStream in = workspace.read(catalog.getURI())) {
459 DublinCoreCatalog dublinCatalog = DublinCores.read(in);
460 if (speakerMetadataField.equals(SpeakerMetadataField.creator)
461 || speakerMetadataField.equals(SpeakerMetadataField.both)) {
462 dublinCatalog.get(DublinCore.PROPERTY_CREATOR).stream()
463 .map(DublinCoreValue::getValue).forEach(speakers::add);
464 }
465 if (speakerMetadataField.equals(SpeakerMetadataField.contributor)
466 || speakerMetadataField.equals(SpeakerMetadataField.both)) {
467 dublinCatalog.get(DublinCore.PROPERTY_CONTRIBUTOR).stream()
468 .map(DublinCoreValue::getValue).forEach(speakers::add);
469 }
470
471 } catch (IOException | NotFoundException e) {
472 logger.error("Unable to load dublin core catalog for event '{}'",
473 track.getMediaPackage().getIdentifier(), e);
474 }
475 }
476
477 if (speakers.size() >= 1) {
478 numberOfSpeakers = speakers.size();
479 }
480 }
481
482 if (numberOfSpeakers == 0) {
483 if (args.length > 2 && StringUtils.isNotBlank(args[2])) {
484 numberOfSpeakers = Integer.parseInt(args[2]);
485 } else {
486 numberOfSpeakers = getNumberOfSpeakers();
487 }
488 }
489
490 String transcriptionType;
491 if (args.length > 3 && StringUtils.isNotBlank(args[3])) {
492 transcriptionType = args[3];
493 } else {
494 transcriptionType = getTranscriptionType();
495 }
496
497 String glossary;
498 if (args.length > 4 && args[4] != null) {
499 glossary = args[4];
500 } else {
501 glossary = getGlossary();
502 }
503
504 String transcriptionStyle;
505 if (args.length > 5 && StringUtils.isNotBlank(args[5])) {
506 transcriptionStyle = args[5];
507 } else {
508 transcriptionStyle = getTranscriptionStyle();
509 }
510
511 String targetLanguage;
512 if (args.length > 6 && args[6] != null) {
513 targetLanguage = args[6];
514 } else {
515 targetLanguage = getTargetLanguage();
516 }
517
518 logger.info("New transcription job for mpId '{}' language '{}' job type '{}' speakers '{}' transcription type '{}'"
519 + "glossary '{}'.", mpId, language, jobType, numberOfSpeakers, transcriptionType, glossary);
520
521 try {
522 return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(), Arrays.asList(
523 mpId, MediaPackageElementParser.getAsXml(track), language, jobType, Integer.toString(numberOfSpeakers),
524 transcriptionType, glossary, transcriptionStyle, targetLanguage));
525 } catch (ServiceRegistryException e) {
526 throw new TranscriptionServiceException("Unable to create a job", e);
527 } catch (MediaPackageException e) {
528 throw new TranscriptionServiceException("Invalid track '" + track.toString() + "'", e);
529 }
530 }
531
532 @Override
533 public void transcriptionDone(String mpId, Object results) { }
534
535 private void transcriptionDone(String mpId, String jobId) {
536 try {
537 logger.info("Transcription done for mpId '{}'.", mpId);
538 if (getAndSaveJobResult(jobId)) {
539 database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
540 } else {
541 logger.debug("Unable to get and save the transcription result for mpId '{}'.", mpId);
542 }
543 } catch (IOException | TranscriptionServiceException e) {
544 logger.warn("Could not save transcription results file for mpId '{}': {}", mpId, e.toString());
545 } catch (TranscriptionDatabaseException e) {
546 logger.warn("Transcription results file were saved but state in db not updated for mpId '{}': ", mpId, e);
547 }
548 }
549
550 @Override
551 public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
552 JSONObject jsonObj = null;
553 String jobId = null;
554 try {
555 jsonObj = (JSONObject) obj;
556 jobId = (String) jsonObj.get("name");
557
558 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
559 TranscriptionJobControl jobControl = database.findByJob(jobId);
560 logger.warn(String.format("Error received for media package %s, job id %s",
561 jobControl.getMediaPackageId(), jobId));
562
563 } catch (TranscriptionDatabaseException e) {
564 logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
565 throw new TranscriptionServiceException("Could not update transcription job control db", e);
566 }
567 }
568
569 @Override
570 public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
571 throw new TranscriptionServiceException("Method not implemented");
572 }
573
574 @Override
575 public String getLanguage() {
576 return language;
577 }
578
579 public String getAmberscriptJobType() {
580 return amberscriptJobType;
581 }
582
583 public int getNumberOfSpeakers() {
584 return numberOfSpeakers;
585 }
586
587 public String getTranscriptionType() {
588 return transcriptionType;
589 }
590
591 public String getGlossary() {
592 return glossary;
593 }
594
595 public String getTranscriptionStyle() {
596 return transcriptionStyle;
597 }
598
599 public String getTargetLanguage() {
600 return targetLanguage;
601 }
602
603
604 @Override
605 protected String process(Job job) throws Exception {
606 Operation op = null;
607 String operation = job.getOperation();
608 List<String> arguments = job.getArguments();
609 String result = "";
610 op = Operation.valueOf(operation);
611 switch (op) {
612 case StartTranscription:
613 String mpId = arguments.get(0);
614 Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
615 String languageCode = arguments.get(2);
616 String jobType = arguments.get(3);
617 String numberOfSpeakers = arguments.get(4);
618 String transcriptionType = arguments.get(5);
619 String glossary = arguments.get(6);
620 String transcriptionStyle = arguments.get(7);
621 String targetLanguage = arguments.get(8);
622 createRecognitionsJob(mpId, track, languageCode, jobType, numberOfSpeakers, transcriptionType, glossary,
623 transcriptionStyle, targetLanguage);
624 break;
625 default:
626 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
627 }
628 return result;
629 }
630
631 void createRecognitionsJob(String mpId, Track track, String languageCode, String jobType, String numberOfSpeakers,
632 String transcriptionType, String glossary, String transcriptionStyle, String targetLanguage)
633 throws TranscriptionServiceException {
634
635
636 CloseableHttpClient httpClient = makeHttpClient(3 * 3600 * 1000);
637 CloseableHttpResponse response = null;
638
639 String submitUrl = BASE_URL + "/jobs/upload-media"
640 + "?apiKey=" + clientKey
641 + "&language=" + languageCode
642 + "&jobType=" + jobType
643 + "&numberOfSpeakers=" + numberOfSpeakers
644 + "&transcriptionType=" + transcriptionType
645 + "&transcriptionStyle=" + transcriptionStyle;
646 if (StringUtils.isNotBlank(glossary)) {
647 submitUrl += "&glossary=" + glossary;
648 }
649 if (StringUtils.isNotBlank(targetLanguage)) {
650 submitUrl += "&targetLanguage=" + targetLanguage;
651 }
652
653 try {
654 FileBody fileBody = new FileBody(workspace.get(track.getURI()), ContentType.DEFAULT_BINARY);
655 MultipartEntityBuilder builder = MultipartEntityBuilder.create();
656 builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
657 builder.addPart("file", fileBody);
658 HttpEntity multipartEntity = builder.build();
659
660 HttpPost httpPost = new HttpPost(submitUrl);
661 httpPost.setEntity(multipartEntity);
662
663 response = httpClient.execute(httpPost);
664 int code = response.getStatusLine().getStatusCode();
665 HttpEntity entity = response.getEntity();
666
667 String jsonString = EntityUtils.toString(response.getEntity());
668 JSONParser jsonParser = new JSONParser();
669 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
670
671 logger.debug("Submitting new transcription job: {}" + System.lineSeparator()
672 + "Response: {}", removePrivateInfo(submitUrl), jsonString);
673
674 JSONObject result = (JSONObject) jsonObject.get("jobStatus");
675 String jobId = (String) result.get("jobId");
676
677 switch (code) {
678 case HttpStatus.SC_OK:
679 logger.info("mp {} has been submitted to AmberScript service with jobId {}.", mpId, jobId);
680 database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
681 track.getDuration() == null ? 0 : track.getDuration().longValue(), new Date(), PROVIDER);
682 EntityUtils.consume(entity);
683 return;
684 default:
685 String error = (String) result.get("error");
686 String message = (String) result.get("message");
687 String msg = String.format("Unable to submit job: API returned %s - %s: %s", code, error, message);
688 logger.warn(msg);
689 throw new TranscriptionServiceException(msg);
690 }
691 } catch (Exception e) {
692 logger.warn("Exception when calling the captions endpoint", e);
693 throw new TranscriptionServiceException("Exception when calling the captions endpoint", e);
694 } finally {
695 try {
696 httpClient.close();
697 if (response != null) {
698 response.close();
699 }
700 } catch (IOException e) {
701 }
702 }
703 }
704
705 boolean checkJobResults(String jobId) throws TranscriptionServiceException {
706
707 String mpId = "unknown";
708
709 CloseableHttpClient httpClient = makeHttpClient();
710 CloseableHttpResponse response = null;
711
712 String checkUrl = BASE_URL + "/jobs/status?jobId=" + jobId + "&apiKey=" + clientKey;
713
714 try {
715 HttpGet httpGet = new HttpGet(checkUrl);
716 response = httpClient.execute(httpGet);
717 int code = response.getStatusLine().getStatusCode();
718
719 HttpEntity entity = response.getEntity();
720 String jsonString = EntityUtils.toString(entity);
721 EntityUtils.consume(entity);
722
723 logger.debug("AmberScript API call was '{}'." + System.lineSeparator() + "Response: {}",
724 removePrivateInfo(checkUrl), jsonString);
725
726 JSONParser jsonParser = new JSONParser();
727 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
728
729 switch (code) {
730 case HttpStatus.SC_OK:
731 JSONObject result = (JSONObject) jsonObject.get("jobStatus");
732 String status = (String) result.get("status");
733 switch (status) {
734 case STATUS_OPEN:
735 logger.debug("Captions job '{}' has not finished yet.", jobId);
736 return false;
737 case STATUS_ERROR:
738 var errorMsg = (String) result.get("errorMsg");
739 throw new TranscriptionServiceException(
740 String.format("Captions job '%s' failed: %s", jobId, errorMsg),
741 code,
742 ERROR_NO_SPEECH.equals(errorMsg));
743 case STATUS_DONE:
744 logger.info("Captions job '{}' has finished.", jobId);
745 TranscriptionJobControl jc = database.findByJob(jobId);
746 if (jc != null) {
747 mpId = jc.getMediaPackageId();
748 }
749 transcriptionDone(mpId, jobId);
750 return true;
751 default:
752 return false;
753 }
754 default:
755 String error = (String) jsonObject.get("error");
756 String errorMessage = (String) jsonObject.get("errorMessage");
757 logger.warn("Error while checking status: {}."
758 + System.lineSeparator() + "{}: {}", code, error, errorMessage);
759 throw new TranscriptionServiceException(
760 String.format("Captions job '%s' failed: Return Code %d", jobId, code), code);
761 }
762 } catch (TranscriptionDatabaseException | IOException | ParseException e) {
763 logger.warn("Error while checking status: {}", e.toString());
764 } finally {
765 try {
766 httpClient.close();
767 if (response != null) {
768 response.close();
769 }
770 } catch (IOException e) {
771 }
772 }
773 return false;
774 }
775
776 private boolean getAndSaveJobResult(String jobId) throws TranscriptionServiceException, IOException {
777
778 CloseableHttpClient httpClient = makeHttpClient();
779 CloseableHttpResponse response = null;
780
781 String transcriptUrl = BASE_URL + "/jobs/export?format=srt&jobId=" + jobId + "&apiKey=" + clientKey;
782
783 boolean done = false;
784
785 try {
786 HttpGet httpGet = new HttpGet(transcriptUrl);
787
788 response = httpClient.execute(httpGet);
789 int code = response.getStatusLine().getStatusCode();
790
791 logger.debug("AmberScript API {} http response {}", removePrivateInfo(transcriptUrl), code);
792
793 switch (code) {
794 case HttpStatus.SC_OK:
795 HttpEntity entity = response.getEntity();
796 logger.info("Retrieved details for transcription with jobid: '{}'", jobId);
797
798
799 workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".srt", entity.getContent());
800 done = true;
801 break;
802
803 default:
804 logger.warn("Error retrieving details for transcription with jobid: '{}', return status: {}.", jobId, code);
805 break;
806 }
807 } catch (Exception e) {
808 throw new TranscriptionServiceException(String.format(
809 "Exception when calling the transcription service for jobid: %s", jobId), e);
810 } finally {
811 try {
812 httpClient.close();
813 if (response != null) {
814 response.close();
815 }
816 } catch (IOException e) {
817 }
818 }
819
820 return done;
821 }
822
823 @Override
824
825 public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
826 throws TranscriptionServiceException {
827 try {
828
829 if (jobId == null || "null".equals(jobId)) {
830 jobId = null;
831 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
832 if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
833 || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
834 jobId = jc.getTranscriptionJobId();
835 }
836 }
837 }
838
839 if (jobId == null) {
840 throw new TranscriptionServiceException(
841 "No completed or closed transcription job found in database for media package " + mpId);
842 }
843
844
845 URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".srt");
846
847 logger.info("Looking for transcript at URI: {}", uri);
848
849 try {
850 workspace.get(uri);
851 logger.info("Found captions at URI: {}", uri);
852 } catch (Exception e) {
853 logger.info("Results not saved: getting from service for jobId {}", jobId);
854
855 checkJobResults(jobId);
856 }
857 MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
858 logger.debug("Returning MPE with results file URI: {}", uri);
859 return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "srt"));
860 } catch (TranscriptionDatabaseException e) {
861 throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
862 }
863 }
864
865
866
867
868
869
870
871
872 public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
873 try {
874 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
875 return jc.getStatus();
876 }
877 } catch (TranscriptionDatabaseException e) {
878 throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
879 }
880 return "Unknown";
881 }
882
883
884
885
886
887
888 protected CloseableHttpClient makeHttpClient() {
889 return makeHttpClient(SOCKET_TIMEOUT);
890 }
891
892
893
894
895
896
897 protected CloseableHttpClient makeHttpClient(int socketTimeout) {
898 RequestConfig reqConfig = RequestConfig.custom()
899 .setConnectTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
900 .setSocketTimeout(socketTimeout)
901 .setConnectionRequestTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
902 .build();
903 CloseableHttpClient httpClient = HttpClientBuilder.create()
904 .useSystemProperties()
905 .setDefaultRequestConfig(reqConfig)
906 .build();
907 return httpClient;
908 }
909
910
911 protected void deleteStorageFile(String filename) {
912 try {
913 logger.debug("Removing {} from collection {}.", filename, SUBMISSION_COLLECTION);
914 wfr.deleteFromCollection(SUBMISSION_COLLECTION, filename, false);
915 } catch (IOException e) {
916 logger.warn("Unable to remove submission file {} from collection {}", filename, SUBMISSION_COLLECTION);
917 }
918 }
919
920 @Reference
921 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
922 this.serviceRegistry = serviceRegistry;
923 }
924
925 @Reference
926 public void setSecurityService(SecurityService securityService) {
927 this.securityService = securityService;
928 }
929
930 @Reference
931 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
932 this.userDirectoryService = userDirectoryService;
933 }
934
935 @Reference
936 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
937 this.organizationDirectoryService = organizationDirectoryService;
938 }
939
940 @Reference
941 public void setWorkspace(Workspace ws) {
942 this.workspace = ws;
943 }
944
945 @Reference
946 public void setWorkingFileRepository(WorkingFileRepository wfr) {
947 this.wfr = wfr;
948 }
949
950 @Reference
951 public void setDatabase(TranscriptionDatabase service) {
952 this.database = service;
953 }
954
955 @Reference
956 public void setAssetManager(AssetManager service) {
957 this.assetManager = service;
958 }
959
960 @Reference
961 public void setWorkflowService(WorkflowService service) {
962 this.workflowService = service;
963 }
964
965 @Override
966 protected ServiceRegistry getServiceRegistry() {
967 return serviceRegistry;
968 }
969
970 @Override
971 protected SecurityService getSecurityService() {
972 return securityService;
973 }
974
975 @Override
976 protected UserDirectoryService getUserDirectoryService() {
977 return userDirectoryService;
978 }
979
980 @Override
981 protected OrganizationDirectoryService getOrganizationDirectoryService() {
982 return organizationDirectoryService;
983 }
984
985
986 void setWfUtil(Workflows wfUtil) {
987 this.wfUtil = wfUtil;
988 }
989
990 class WorkflowDispatcher implements Runnable {
991
992
993
994
995
996
997 @Override
998 public void run() {
999 logger.debug("WorkflowDispatcher waking up...");
1000
1001 try {
1002
1003
1004 long providerId;
1005 TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
1006 if (providerInfo != null) {
1007 providerId = providerInfo.getId();
1008 } else {
1009 logger.debug("No jobs yet for provider {}.", PROVIDER);
1010 return;
1011 }
1012
1013 List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
1014 TranscriptionJobControl.Status.TranscriptionComplete.name());
1015
1016 for (TranscriptionJobControl j : jobs) {
1017
1018
1019 if (j.getProviderId() != providerId) {
1020 continue;
1021 }
1022
1023 String mpId = j.getMediaPackageId();
1024 String jobId = j.getTranscriptionJobId();
1025
1026
1027 if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
1028
1029 if (j.getDateExpected().getTime() < System.currentTimeMillis()) {
1030 try {
1031 if (!checkJobResults(jobId)) {
1032
1033 if (j.getDateExpected().getTime() + maxProcessingSeconds * 1000 < System.currentTimeMillis()) {
1034
1035 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1036 }
1037
1038 continue;
1039 }
1040 } catch (TranscriptionServiceException e) {
1041 try {
1042 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1043 continue;
1044 } catch (TranscriptionDatabaseException ex) {
1045 logger.warn("Could not cancel job '{}'.", jobId);
1046 }
1047 }
1048 } else {
1049 continue;
1050 }
1051 }
1052
1053
1054 try {
1055 DefaultOrganization defaultOrg = new DefaultOrganization();
1056 securityService.setOrganization(defaultOrg);
1057 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1058
1059
1060 Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
1061 if (snapshot.isEmpty()) {
1062 logger.warn("Media package {} no longer exists in the asset manager. It was likely deleted. "
1063 + "Dropping the generated transcription.", mpId);
1064 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
1065 continue;
1066 }
1067
1068 String org = snapshot.get().getOrganizationId();
1069 Organization organization = organizationDirectoryService.getOrganization(org);
1070 if (organization == null) {
1071 logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
1072 continue;
1073 }
1074 securityService.setOrganization(organization);
1075
1076
1077 Map<String, String> params = new HashMap<String, String>();
1078 params.put("transcriptionJobId", jobId);
1079 WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
1080
1081
1082
1083 Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
1084 Set<String> mpIds = new HashSet<String>();
1085 mpIds.add(mpId);
1086 List<WorkflowInstance> wfList = workflows
1087 .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1088 String wfId = wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : "Unknown";
1089
1090
1091 database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1092 logger.info("Attach transcription workflow {} scheduled for mp {}, transcription service job {}",
1093 wfId, mpId, jobId);
1094 } catch (Exception e) {
1095 logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, amberscript job {}, {}: {}",
1096 mpId, jobId, e.getClass().getName(), e.getMessage());
1097 }
1098 }
1099 } catch (TranscriptionDatabaseException e) {
1100 logger.warn("Could not read transcription job control database: {}", e.getMessage());
1101 }
1102 }
1103 }
1104
1105 class ResultsFileCleanup implements Runnable {
1106
1107 @Override
1108 public void run() {
1109 logger.info("ResultsFileCleanup waking up...");
1110 try {
1111
1112 wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1113 wfr.cleanupOldFilesFromCollection(SUBMISSION_COLLECTION, cleanupResultDays);
1114 } catch (IOException e) {
1115 logger.warn("Could not cleanup old submission and transcript results files", e);
1116 }
1117 }
1118 }
1119
1120 private String removePrivateInfo(String unsafeString) {
1121 String safeString = unsafeString.replace(clientKey, "__api-key-was-hidden__");
1122 return safeString;
1123 }
1124 }