1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.transcription.amberscript;
22
23 import org.opencastproject.assetmanager.api.AssetManager;
24 import org.opencastproject.assetmanager.api.fn.Enrichments;
25 import org.opencastproject.assetmanager.api.query.AQueryBuilder;
26 import org.opencastproject.assetmanager.api.query.AResult;
27 import org.opencastproject.assetmanager.util.Workflows;
28 import org.opencastproject.job.api.AbstractJobProducer;
29 import org.opencastproject.job.api.Job;
30 import org.opencastproject.mediapackage.Catalog;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35 import org.opencastproject.mediapackage.MediaPackageElementParser;
36 import org.opencastproject.mediapackage.MediaPackageElements;
37 import org.opencastproject.mediapackage.MediaPackageException;
38 import org.opencastproject.mediapackage.Track;
39 import org.opencastproject.metadata.dublincore.DublinCore;
40 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
41 import org.opencastproject.metadata.dublincore.DublinCoreValue;
42 import org.opencastproject.metadata.dublincore.DublinCores;
43 import org.opencastproject.security.api.DefaultOrganization;
44 import org.opencastproject.security.api.Organization;
45 import org.opencastproject.security.api.OrganizationDirectoryService;
46 import org.opencastproject.security.api.SecurityService;
47 import org.opencastproject.security.api.UserDirectoryService;
48 import org.opencastproject.security.util.SecurityUtil;
49 import org.opencastproject.serviceregistry.api.ServiceRegistry;
50 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
51 import org.opencastproject.systems.OpencastConstants;
52 import org.opencastproject.transcription.api.TranscriptionService;
53 import org.opencastproject.transcription.api.TranscriptionServiceException;
54 import org.opencastproject.transcription.persistence.TranscriptionDatabase;
55 import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
56 import org.opencastproject.transcription.persistence.TranscriptionJobControl;
57 import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
58 import org.opencastproject.util.NotFoundException;
59 import org.opencastproject.util.OsgiUtil;
60 import org.opencastproject.util.data.Option;
61 import org.opencastproject.workflow.api.ConfiguredWorkflow;
62 import org.opencastproject.workflow.api.WorkflowDefinition;
63 import org.opencastproject.workflow.api.WorkflowInstance;
64 import org.opencastproject.workflow.api.WorkflowService;
65 import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
66 import org.opencastproject.workspace.api.Workspace;
67
68 import org.apache.commons.lang3.StringUtils;
69 import org.apache.http.HttpEntity;
70 import org.apache.http.HttpStatus;
71 import org.apache.http.client.config.RequestConfig;
72 import org.apache.http.client.methods.CloseableHttpResponse;
73 import org.apache.http.client.methods.HttpGet;
74 import org.apache.http.client.methods.HttpPost;
75 import org.apache.http.entity.ContentType;
76 import org.apache.http.entity.mime.HttpMultipartMode;
77 import org.apache.http.entity.mime.MultipartEntityBuilder;
78 import org.apache.http.entity.mime.content.FileBody;
79 import org.apache.http.impl.client.CloseableHttpClient;
80 import org.apache.http.impl.client.HttpClientBuilder;
81 import org.apache.http.util.EntityUtils;
82 import org.json.simple.JSONObject;
83 import org.json.simple.parser.JSONParser;
84 import org.json.simple.parser.ParseException;
85 import org.osgi.service.component.ComponentContext;
86 import org.osgi.service.component.annotations.Activate;
87 import org.osgi.service.component.annotations.Component;
88 import org.osgi.service.component.annotations.Deactivate;
89 import org.osgi.service.component.annotations.Reference;
90 import org.slf4j.Logger;
91 import org.slf4j.LoggerFactory;
92
93 import java.io.IOException;
94 import java.io.InputStream;
95 import java.net.URI;
96 import java.util.Arrays;
97 import java.util.Date;
98 import java.util.HashMap;
99 import java.util.HashSet;
100 import java.util.List;
101 import java.util.Map;
102 import java.util.Set;
103 import java.util.concurrent.Executors;
104 import java.util.concurrent.ScheduledExecutorService;
105 import java.util.concurrent.TimeUnit;
106
107 @Component(
108 immediate = true,
109 service = { TranscriptionService.class,AmberscriptTranscriptionService.class },
110 property = {
111 "service.description=AmberScript Transcription Service",
112 "provider=amberscript"
113 }
114 )
115 public class AmberscriptTranscriptionService extends AbstractJobProducer implements TranscriptionService {
116
/** Logger shared by all instances of this service. */
private static final Logger logger = LoggerFactory.getLogger(AmberscriptTranscriptionService.class);

/** Job type passed to the super constructor and used when creating jobs in the service registry. */
private static final String JOB_TYPE = "org.opencastproject.transcription.amberscript";

/** NOTE(review): not referenced in the visible part of this file; presumably a collection for submitted media. */
public static final String SUBMISSION_COLLECTION = "amberscript-submission";
/** Workspace collection in which downloaded transcripts are stored as "{jobId}.srt". */
private static final String TRANSCRIPT_COLLECTION = "amberscript-transcripts";

/** Connect and connection-request timeout handed to the HTTP request configuration. */
private static final int CONNECTION_TIMEOUT = 60000;
/** Socket timeout used by {@code makeHttpClient()} when no explicit timeout is given. */
private static final int SOCKET_TIMEOUT = 60000;



/** Base URL of the AmberScript API; every request URL in this class is built by appending to it. */
private static final String BASE_URL = "https://qs.amberscript.com";
/** Values of the "status" field in the job status response that this service distinguishes. */
private static final String STATUS_OPEN = "OPEN";
private static final String STATUS_DONE = "DONE";
private static final String STATUS_ERROR = "ERROR";

/** Value of "errorMsg" that is compared against when a job failed, to flag the failure as "no speech". */
private static final String ERROR_NO_SPEECH = "No speech found";

/** Provider name stored together with each transcription job control record. */
private static final String PROVIDER = "amberscript";
137
// Collaborating services. They are not assigned anywhere in the visible part of this file;
// presumably they are injected through OSGi @Reference setters further down — TODO confirm.
private AssetManager assetManager;
private OrganizationDirectoryService organizationDirectoryService;
/** Executor created in activate() to run the periodic workflow dispatcher and result cleanup. */
private ScheduledExecutorService scheduledExecutor;
private SecurityService securityService;
private ServiceRegistry serviceRegistry;
/** Persistence for transcription job control records (status, media package id, job id). */
private TranscriptionDatabase database;
private UserDirectoryService userDirectoryService;
private WorkflowService workflowService;
private WorkingFileRepository wfr;
/** Used to read catalogs and tracks and to store/fetch transcript files. */
private Workspace workspace;


// NOTE(review): not used in the visible part of this file.
private Workflows wfUtil;
151
/** Operations this job producer can process; the constant name is used as the job's operation string. */
private enum Operation {
  StartTranscription
}
155
156
157
/**
 * Dublin Core field(s) whose distinct values are counted to derive the number of speakers:
 * the creators, the contributors, or both.
 */
private enum SpeakerMetadataField {
  creator, contributor, both
}
161
162
// Keys of the service configuration properties read in activate().
private static final String ENABLED_CONFIG = "enabled";
private static final String CLIENT_KEY = "client.key";
private static final String LANGUAGE = "language";
private static final String LANGUAGE_FROM_DUBLINCORE = "language.from.dublincore";
private static final String LANGUAGE_CODE_MAP = "language.code.map";
private static final String AMBERSCRIPTJOBTYPE = "jobtype";
private static final String WORKFLOW_CONFIG = "workflow";
private static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
private static final String MAX_PROCESSING_TIME_CONFIG = "max.overdue.time";
private static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
private static final String SPEAKER = "speaker";
private static final String SPEAKER_FROM_DUBLINCORE = "speaker.from.dublincore";
private static final String SPEAKER_METADATA_FIELD = "speaker.metadata.field";
private static final String TRANSCRIPTIONTYPE = "transcriptiontype";
private static final String GLOSSARY = "glossary";
// NOTE(review): the key for the transcription style is "cleanread", which equals the default
// style value rather than a name like "transcriptionstyle" — confirm this is intended before changing,
// since existing configurations may rely on it.
private static final String TRANSCRIPTIONSTYLE = "cleanread";
private static final String TARGETLANGUAGE = "targetlanguage";
180
181
// Effective configuration. The initializers are the defaults; activate() overrides them
// with values from the service configuration where present and valid.

/** Whether the service is enabled; startTranscription refuses to work while this is false. */
private boolean enabled = false;
/** API key appended to every AmberScript request URL. */
private String clientKey;
/** Default transcription language. */
private String language = "en";

/** If true, the language is taken from the episode Dublin Core catalog when it can be mapped. */
private boolean languageFromDublinCore;
private String amberscriptJobType = "direct";
private String workflowDefinitionId = "amberscript-attach-transcripts";
/** Delay in seconds between runs of the workflow dispatcher. */
private long workflowDispatchIntervalSeconds = 60;
/** Maximum processing time for a transcription job, in seconds (default: 8 days). */
private long maxProcessingSeconds = 8 * 24 * 60 * 60;
/** Number of days after which result files are cleaned up. */
private int cleanupResultDays = 7;
/** Default number of speakers when neither metadata nor arguments provide one. */
private int numberOfSpeakers = 1;
/** If true, the number of speakers is derived from the episode Dublin Core catalog. */
private boolean speakerFromDublinCore = true;
private SpeakerMetadataField speakerMetadataField = SpeakerMetadataField.creator;
private String transcriptionType = "transcription";
/** Default glossary; blank means no glossary parameter is sent. */
private String glossary = "";
private String transcriptionStyle = "cleanread";
/** Default target language; blank means no targetLanguage parameter is sent. */
private String targetLanguage = "";





/** Maps Dublin Core language values to AmberScript language codes; obtained in activate(). */
private AmberscriptLangUtil amberscriptLangUtil;

/** Digest user read from the bundle context in activate(). */
private String systemAccount;
207
/** Creates the service and passes {@code JOB_TYPE} to the {@code AbstractJobProducer} constructor. */
public AmberscriptTranscriptionService() {
  super(JOB_TYPE);
}
211
212 @Activate
213 public void activate(ComponentContext cc) {
214
215 Option<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG);
216 if (enabledOpt.isSome()) {
217 enabled = enabledOpt.get();
218 }
219
220 if (!enabled) {
221 logger.info("Amberscript Transcription Service disabled."
222 + " If you want to enable it, please update the service configuration.");
223 return;
224 }
225
226 Option<String> clientKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLIENT_KEY);
227 if (clientKeyOpt.isSome()) {
228 clientKey = clientKeyOpt.get();
229 } else {
230 logger.warn("API key was not set.");
231 return;
232 }
233
234 Option<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE);
235 if (languageOpt.isSome()) {
236 language = languageOpt.get();
237 logger.info("Default language is set to '{}'.", language);
238 } else {
239 logger.info("Default language '{}' will be used.", language);
240 }
241
242 Option<String> languageFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_FROM_DUBLINCORE);
243 if (languageFromDublinCoreOpt.isSome()) {
244 try {
245 languageFromDublinCore = Boolean.parseBoolean(languageFromDublinCoreOpt.get());
246 } catch (Exception e) {
247 logger.warn("Configuration value for '{}' is invalid, defaulting to false.", LANGUAGE_FROM_DUBLINCORE);
248 }
249 }
250 logger.info("Configuration value for '{}' is set to '{}'.", LANGUAGE_FROM_DUBLINCORE, languageFromDublinCore);
251
252 amberscriptLangUtil = AmberscriptLangUtil.getInstance();
253 int customMapEntriesCount = 0;
254 Option<String> langCodeMapOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_CODE_MAP);
255 if (langCodeMapOpt.isSome()) {
256 try {
257 String langCodeMapStr = langCodeMapOpt.get();
258 if (langCodeMapStr != null) {
259 for (String mapping : langCodeMapStr.split(",")) {
260 String[] mapEntries = mapping.split(":");
261 amberscriptLangUtil.addCustomMapping(mapEntries[0], mapEntries[1]);
262 customMapEntriesCount += 1;
263 }
264 }
265 } catch (Exception e) {
266 logger.warn("Configuration '{}' is invalid. Using just default mapping.", LANGUAGE_CODE_MAP);
267 }
268 }
269 logger.info("Language code map was set. Added '{}' additional entries.", customMapEntriesCount);
270
271 Option<String> amberscriptJobTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), AMBERSCRIPTJOBTYPE);
272 if (amberscriptJobTypeOpt.isSome()) {
273 amberscriptJobType = amberscriptJobTypeOpt.get();
274 logger.info("Default Amberscript job type is set to '{}'.", amberscriptJobType);
275 } else {
276 logger.info("Default Amberscript job type '{}' will be used.", amberscriptJobType);
277 }
278
279 Option<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
280 if (wfOpt.isSome()) {
281 workflowDefinitionId = wfOpt.get();
282 logger.info("Workflow is set to '{}'.", workflowDefinitionId);
283 } else {
284 logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
285 }
286
287 Option<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
288 if (intervalOpt.isSome()) {
289 try {
290 workflowDispatchIntervalSeconds = Long.parseLong(intervalOpt.get());
291 } catch (NumberFormatException e) {
292 logger.warn("Configured '{}' is invalid. Using default.", DISPATCH_WORKFLOW_INTERVAL_CONFIG);
293 }
294 }
295 logger.info("Workflow dispatch interval is {} seconds.", workflowDispatchIntervalSeconds);
296
297 Option<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
298 if (maxProcessingOpt.isSome()) {
299 try {
300 maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
301 } catch (NumberFormatException e) {
302 logger.warn("Configured '{}' is invalid. Using default.", MAX_PROCESSING_TIME_CONFIG);
303 }
304 }
305 logger.info("Maximum processing time for transcription job is {} seconds.", maxProcessingSeconds);
306
307 Option<String> cleanupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
308 if (cleanupOpt.isSome()) {
309 try {
310 cleanupResultDays = Integer.parseInt(cleanupOpt.get());
311 } catch (NumberFormatException e) {
312 logger.warn("Configured '{}' is invalid. Using default.", CLEANUP_RESULTS_DAYS_CONFIG);
313 }
314 }
315 logger.info("Cleanup result files after {} days.", cleanupResultDays);
316
317 Option<String> speakerOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER);
318 if (speakerOpt.isSome()) {
319 try {
320 numberOfSpeakers = Integer.parseInt(speakerOpt.get());
321 } catch (NumberFormatException e) {
322 logger.warn("Configured '{}' is invalid. Using default.", SPEAKER);
323 }
324 }
325 logger.info("Default number of speakers is set to '{}'.", numberOfSpeakers);
326
327 Option<String> speakerFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_FROM_DUBLINCORE);
328 if (speakerFromDublinCoreOpt.isSome()) {
329 try {
330 speakerFromDublinCore = Boolean.parseBoolean(speakerFromDublinCoreOpt.get());
331 } catch (Exception e) {
332 logger.warn("Configuration value for '{}' is invalid, defaulting to true.", SPEAKER_FROM_DUBLINCORE);
333 }
334 }
335 logger.info("Configuration value for '{}' is set to '{}'.", SPEAKER_FROM_DUBLINCORE, speakerFromDublinCore);
336
337 Option<String> speakerMetadataFieldOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_METADATA_FIELD);
338 if (speakerMetadataFieldOpt.isSome()) {
339 try {
340 speakerMetadataField = SpeakerMetadataField.valueOf(speakerMetadataFieldOpt.get());
341 } catch (IllegalArgumentException e) {
342 logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
343 speakerMetadataFieldOpt.get(), SPEAKER_METADATA_FIELD, speakerMetadataField);
344 }
345 }
346 logger.info("Default metadata field for calculating the amount of speakers is set to '{}'.", speakerMetadataField);
347
348 Option<String> transcriptionTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONTYPE);
349 if (transcriptionTypeOpt.isSome()) {
350 if (List.of("transcription", "captions", "translatedSubtitles").contains(transcriptionType)) {
351 transcriptionType = transcriptionTypeOpt.get();
352 logger.info("Default transcription type is set to '{}'.", transcriptionType);
353 } else {
354 logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
355 transcriptionTypeOpt.get(), TRANSCRIPTIONTYPE, transcriptionType);
356 }
357 } else {
358 logger.info("Default transcription type '{}' will be used.", transcriptionType);
359 }
360
361 Option<String> glossaryOpt = OsgiUtil.getOptCfg(cc.getProperties(), GLOSSARY);
362 if (glossaryOpt.isSome()) {
363 glossary = glossaryOpt.get();
364 logger.info("Default glossary is set to '{}'.", glossary);
365 } else {
366 logger.info("No glossary will be used by default");
367 }
368
369 Option<String> transcriptionStyleOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONSTYLE);
370 if (transcriptionStyleOpt.isSome()) {
371 if (List.of("cleanread", "verbatim").contains(transcriptionStyle)) {
372 transcriptionStyle = transcriptionStyleOpt.get();
373 logger.info("Default transcription style is set to '{}'.", transcriptionStyle);
374 } else {
375 logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
376 transcriptionStyleOpt.get(), TRANSCRIPTIONSTYLE, transcriptionStyle);
377 }
378 } else {
379 logger.info("Default transcription style '{}' will be used.", transcriptionStyle);
380 }
381
382 Option<String> targetLanguageOpt = OsgiUtil.getOptCfg(cc.getProperties(), TARGETLANGUAGE);
383 if (targetLanguageOpt.isSome()) {
384 targetLanguage = targetLanguageOpt.get();
385 logger.info("Default target language is set to '{}'.", targetLanguage);
386 } else {
387 logger.info("Transcriptions won't be translated");
388 }
389
390 systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
391
392 scheduledExecutor = Executors.newScheduledThreadPool(2);
393
394 scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchIntervalSeconds,
395 TimeUnit.SECONDS);
396
397 scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
398
399 logger.info("Activated.");
400 }
401
402 @Deactivate
403 public void deactivate() {
404 if (scheduledExecutor != null) {
405 scheduledExecutor.shutdown();
406 }
407 }
408
/**
 * Not supported by this provider; use the overload taking additional arguments instead.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
  throw new UnsupportedOperationException("Not supported.");
}
413
414 @Override
415 public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
416 if (!enabled) {
417 throw new TranscriptionServiceException("AmberScript Transcription Service disabled."
418 + " If you want to enable it, please update the service configuration.");
419 }
420
421 String language = null;
422
423 if (languageFromDublinCore) {
424 for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
425 try (InputStream in = workspace.read(catalog.getURI())) {
426 DublinCoreCatalog dublinCatalog = DublinCores.read(in);
427 String dublinCoreLang = dublinCatalog.getFirst(DublinCore.PROPERTY_LANGUAGE);
428 if (dublinCoreLang != null) {
429 language = amberscriptLangUtil.getLanguageCodeOrNull(dublinCoreLang);
430 }
431 if (language != null) {
432 break;
433 }
434 } catch (IOException | NotFoundException e) {
435 logger.error(String.format("Unable to load dublin core catalog for event '%s'",
436 track.getMediaPackage().getIdentifier()), e);
437 }
438 }
439 }
440
441 if (language == null) {
442 if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
443 language = args[0];
444 } else {
445 language = getLanguage();
446 }
447 }
448
449 String jobType;
450 if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
451 jobType = args[1];
452 } else {
453 jobType = getAmberscriptJobType();
454 }
455
456 int numberOfSpeakers = 0;
457 if (speakerFromDublinCore) {
458 Set<String> speakers = new HashSet<>();
459 for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
460 try (InputStream in = workspace.read(catalog.getURI())) {
461 DublinCoreCatalog dublinCatalog = DublinCores.read(in);
462 if (speakerMetadataField.equals(SpeakerMetadataField.creator)
463 || speakerMetadataField.equals(SpeakerMetadataField.both)) {
464 dublinCatalog.get(DublinCore.PROPERTY_CREATOR).stream()
465 .map(DublinCoreValue::getValue).forEach(speakers::add);
466 }
467 if (speakerMetadataField.equals(SpeakerMetadataField.contributor)
468 || speakerMetadataField.equals(SpeakerMetadataField.both)) {
469 dublinCatalog.get(DublinCore.PROPERTY_CONTRIBUTOR).stream()
470 .map(DublinCoreValue::getValue).forEach(speakers::add);
471 }
472
473 } catch (IOException | NotFoundException e) {
474 logger.error("Unable to load dublin core catalog for event '{}'",
475 track.getMediaPackage().getIdentifier(), e);
476 }
477 }
478
479 if (speakers.size() >= 1) {
480 numberOfSpeakers = speakers.size();
481 }
482 }
483
484 if (numberOfSpeakers == 0) {
485 if (args.length > 2 && StringUtils.isNotBlank(args[2])) {
486 numberOfSpeakers = Integer.parseInt(args[2]);
487 } else {
488 numberOfSpeakers = getNumberOfSpeakers();
489 }
490 }
491
492 String transcriptionType;
493 if (args.length > 3 && StringUtils.isNotBlank(args[3])) {
494 transcriptionType = args[3];
495 } else {
496 transcriptionType = getTranscriptionType();
497 }
498
499 String glossary;
500 if (args.length > 4 && args[4] != null) {
501 glossary = args[4];
502 } else {
503 glossary = getGlossary();
504 }
505
506 String transcriptionStyle;
507 if (args.length > 5 && StringUtils.isNotBlank(args[5])) {
508 transcriptionStyle = args[5];
509 } else {
510 transcriptionStyle = getTranscriptionStyle();
511 }
512
513 String targetLanguage;
514 if (args.length > 6 && args[6] != null) {
515 targetLanguage = args[6];
516 } else {
517 targetLanguage = getTargetLanguage();
518 }
519
520 logger.info("New transcription job for mpId '{}' language '{}' job type '{}' speakers '{}' transcription type '{}'"
521 + "glossary '{}'.", mpId, language, jobType, numberOfSpeakers, transcriptionType, glossary);
522
523 try {
524 return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(), Arrays.asList(
525 mpId, MediaPackageElementParser.getAsXml(track), language, jobType, Integer.toString(numberOfSpeakers),
526 transcriptionType, glossary, transcriptionStyle, targetLanguage));
527 } catch (ServiceRegistryException e) {
528 throw new TranscriptionServiceException("Unable to create a job", e);
529 } catch (MediaPackageException e) {
530 throw new TranscriptionServiceException("Invalid track '" + track.toString() + "'", e);
531 }
532 }
533
/**
 * Intentionally empty: in the visible code, completion is handled by the private
 * {@code transcriptionDone(String, String)} overload, which is called from {@code checkJobResults}.
 */
@Override
public void transcriptionDone(String mpId, Object results) { }
536
537 private void transcriptionDone(String mpId, String jobId) {
538 try {
539 logger.info("Transcription done for mpId '{}'.", mpId);
540 if (getAndSaveJobResult(jobId)) {
541 database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
542 } else {
543 logger.debug("Unable to get and save the transcription result for mpId '{}'.", mpId);
544 }
545 } catch (IOException | TranscriptionServiceException e) {
546 logger.warn("Could not save transcription results file for mpId '{}': {}", mpId, e.toString());
547 } catch (TranscriptionDatabaseException e) {
548 logger.warn("Transcription results file were saved but state in db not updated for mpId '{}': ", mpId, e);
549 }
550 }
551
552 @Override
553 public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
554 JSONObject jsonObj = null;
555 String jobId = null;
556 try {
557 jsonObj = (JSONObject) obj;
558 jobId = (String) jsonObj.get("name");
559
560 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
561 TranscriptionJobControl jobControl = database.findByJob(jobId);
562 logger.warn(String.format("Error received for media package %s, job id %s",
563 jobControl.getMediaPackageId(), jobId));
564
565 } catch (TranscriptionDatabaseException e) {
566 logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
567 throw new TranscriptionServiceException("Could not update transcription job control db", e);
568 }
569 }
570
/**
 * Not implemented for this provider.
 *
 * @throws TranscriptionServiceException always
 */
@Override
public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
  throw new TranscriptionServiceException("Method not implemented");
}
575
/** Returns the default transcription language ("en" unless overridden by configuration). */
@Override
public String getLanguage() {
  return language;
}
580
/** Returns the default AmberScript job type ("direct" unless overridden by configuration). */
public String getAmberscriptJobType() {
  return amberscriptJobType;
}
584
/** Returns the default number of speakers (1 unless overridden by configuration). */
public int getNumberOfSpeakers() {
  return numberOfSpeakers;
}
588
/** Returns the default transcription type ("transcription" unless overridden by configuration). */
public String getTranscriptionType() {
  return transcriptionType;
}
592
/** Returns the default glossary; an empty string unless one is configured. */
public String getGlossary() {
  return glossary;
}
596
/** Returns the default transcription style ("cleanread" unless overridden by configuration). */
public String getTranscriptionStyle() {
  return transcriptionStyle;
}
600
/** Returns the default target language; an empty string unless one is configured. */
public String getTargetLanguage() {
  return targetLanguage;
}
604
605
606 @Override
607 protected String process(Job job) throws Exception {
608 Operation op = null;
609 String operation = job.getOperation();
610 List<String> arguments = job.getArguments();
611 String result = "";
612 op = Operation.valueOf(operation);
613 switch (op) {
614 case StartTranscription:
615 String mpId = arguments.get(0);
616 Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
617 String languageCode = arguments.get(2);
618 String jobType = arguments.get(3);
619 String numberOfSpeakers = arguments.get(4);
620 String transcriptionType = arguments.get(5);
621 String glossary = arguments.get(6);
622 String transcriptionStyle = arguments.get(7);
623 String targetLanguage = arguments.get(8);
624 createRecognitionsJob(mpId, track, languageCode, jobType, numberOfSpeakers, transcriptionType, glossary,
625 transcriptionStyle, targetLanguage);
626 break;
627 default:
628 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
629 }
630 return result;
631 }
632
633 void createRecognitionsJob(String mpId, Track track, String languageCode, String jobType, String numberOfSpeakers,
634 String transcriptionType, String glossary, String transcriptionStyle, String targetLanguage)
635 throws TranscriptionServiceException {
636
637
638 CloseableHttpClient httpClient = makeHttpClient(3 * 3600 * 1000);
639 CloseableHttpResponse response = null;
640
641 String submitUrl = BASE_URL + "/jobs/upload-media"
642 + "?apiKey=" + clientKey
643 + "&language=" + languageCode
644 + "&jobType=" + jobType
645 + "&numberOfSpeakers=" + numberOfSpeakers
646 + "&transcriptionType=" + transcriptionType
647 + "&transcriptionStyle=" + transcriptionStyle;
648 if (StringUtils.isNotBlank(glossary)) {
649 submitUrl += "&glossary=" + glossary;
650 }
651 if (StringUtils.isNotBlank(targetLanguage)) {
652 submitUrl += "&targetLanguage=" + targetLanguage;
653 }
654
655 try {
656 FileBody fileBody = new FileBody(workspace.get(track.getURI()), ContentType.DEFAULT_BINARY);
657 MultipartEntityBuilder builder = MultipartEntityBuilder.create();
658 builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
659 builder.addPart("file", fileBody);
660 HttpEntity multipartEntity = builder.build();
661
662 HttpPost httpPost = new HttpPost(submitUrl);
663 httpPost.setEntity(multipartEntity);
664
665 response = httpClient.execute(httpPost);
666 int code = response.getStatusLine().getStatusCode();
667 HttpEntity entity = response.getEntity();
668
669 String jsonString = EntityUtils.toString(response.getEntity());
670 JSONParser jsonParser = new JSONParser();
671 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
672
673 logger.debug("Submitting new transcription job: {}" + System.lineSeparator()
674 + "Response: {}", removePrivateInfo(submitUrl), jsonString);
675
676 JSONObject result = (JSONObject) jsonObject.get("jobStatus");
677 String jobId = (String) result.get("jobId");
678
679 switch (code) {
680 case HttpStatus.SC_OK:
681 logger.info("mp {} has been submitted to AmberScript service with jobId {}.", mpId, jobId);
682 database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
683 track.getDuration() == null ? 0 : track.getDuration().longValue(), new Date(), PROVIDER);
684 EntityUtils.consume(entity);
685 return;
686 default:
687 String error = (String) result.get("error");
688 String message = (String) result.get("message");
689 String msg = String.format("Unable to submit job: API returned %s - %s: %s", code, error, message);
690 logger.warn(msg);
691 throw new TranscriptionServiceException(msg);
692 }
693 } catch (Exception e) {
694 logger.warn("Exception when calling the captions endpoint", e);
695 throw new TranscriptionServiceException("Exception when calling the captions endpoint", e);
696 } finally {
697 try {
698 httpClient.close();
699 if (response != null) {
700 response.close();
701 }
702 } catch (IOException e) {
703 }
704 }
705 }
706
707 boolean checkJobResults(String jobId) throws TranscriptionServiceException {
708
709 String mpId = "unknown";
710
711 CloseableHttpClient httpClient = makeHttpClient();
712 CloseableHttpResponse response = null;
713
714 String checkUrl = BASE_URL + "/jobs/status?jobId=" + jobId + "&apiKey=" + clientKey;
715
716 try {
717 HttpGet httpGet = new HttpGet(checkUrl);
718 response = httpClient.execute(httpGet);
719 int code = response.getStatusLine().getStatusCode();
720
721 HttpEntity entity = response.getEntity();
722 String jsonString = EntityUtils.toString(entity);
723 EntityUtils.consume(entity);
724
725 logger.debug("AmberScript API call was '{}'." + System.lineSeparator() + "Response: {}",
726 removePrivateInfo(checkUrl), jsonString);
727
728 JSONParser jsonParser = new JSONParser();
729 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
730
731 switch (code) {
732 case HttpStatus.SC_OK:
733 JSONObject result = (JSONObject) jsonObject.get("jobStatus");
734 String status = (String) result.get("status");
735 switch (status) {
736 case STATUS_OPEN:
737 logger.debug("Captions job '{}' has not finished yet.", jobId);
738 return false;
739 case STATUS_ERROR:
740 var errorMsg = (String) result.get("errorMsg");
741 throw new TranscriptionServiceException(
742 String.format("Captions job '%s' failed: %s", jobId, errorMsg),
743 code,
744 ERROR_NO_SPEECH.equals(errorMsg));
745 case STATUS_DONE:
746 logger.info("Captions job '{}' has finished.", jobId);
747 TranscriptionJobControl jc = database.findByJob(jobId);
748 if (jc != null) {
749 mpId = jc.getMediaPackageId();
750 }
751 transcriptionDone(mpId, jobId);
752 return true;
753 default:
754 return false;
755 }
756 default:
757 String error = (String) jsonObject.get("error");
758 String errorMessage = (String) jsonObject.get("errorMessage");
759 logger.warn("Error while checking status: {}."
760 + System.lineSeparator() + "{}: {}", code, error, errorMessage);
761 throw new TranscriptionServiceException(
762 String.format("Captions job '%s' failed: Return Code %d", jobId, code), code);
763 }
764 } catch (TranscriptionDatabaseException | IOException | ParseException e) {
765 logger.warn("Error while checking status: {}", e.toString());
766 } finally {
767 try {
768 httpClient.close();
769 if (response != null) {
770 response.close();
771 }
772 } catch (IOException e) {
773 }
774 }
775 return false;
776 }
777
778 private boolean getAndSaveJobResult(String jobId) throws TranscriptionServiceException, IOException {
779
780 CloseableHttpClient httpClient = makeHttpClient();
781 CloseableHttpResponse response = null;
782
783 String transcriptUrl = BASE_URL + "/jobs/export?format=srt&jobId=" + jobId + "&apiKey=" + clientKey;
784
785 boolean done = false;
786
787 try {
788 HttpGet httpGet = new HttpGet(transcriptUrl);
789
790 response = httpClient.execute(httpGet);
791 int code = response.getStatusLine().getStatusCode();
792
793 logger.debug("AmberScript API {} http response {}", removePrivateInfo(transcriptUrl), code);
794
795 switch (code) {
796 case HttpStatus.SC_OK:
797 HttpEntity entity = response.getEntity();
798 logger.info("Retrieved details for transcription with jobid: '{}'", jobId);
799
800
801 workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".srt", entity.getContent());
802 done = true;
803 break;
804
805 default:
806 logger.warn("Error retrieving details for transcription with jobid: '{}', return status: {}.", jobId, code);
807 break;
808 }
809 } catch (Exception e) {
810 throw new TranscriptionServiceException(String.format(
811 "Exception when calling the transcription service for jobid: %s", jobId), e);
812 } finally {
813 try {
814 httpClient.close();
815 if (response != null) {
816 response.close();
817 }
818 } catch (IOException e) {
819 }
820 }
821
822 return done;
823 }
824
825 @Override
826
827 public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
828 throws TranscriptionServiceException {
829 try {
830
831 if (jobId == null || "null".equals(jobId)) {
832 jobId = null;
833 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
834 if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
835 || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
836 jobId = jc.getTranscriptionJobId();
837 }
838 }
839 }
840
841 if (jobId == null) {
842 throw new TranscriptionServiceException(
843 "No completed or closed transcription job found in database for media package " + mpId);
844 }
845
846
847 URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".srt");
848
849 logger.info("Looking for transcript at URI: {}", uri);
850
851 try {
852 workspace.get(uri);
853 logger.info("Found captions at URI: {}", uri);
854 } catch (Exception e) {
855 logger.info("Results not saved: getting from service for jobId {}", jobId);
856
857 checkJobResults(jobId);
858 }
859 MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
860 logger.debug("Returning MPE with results file URI: {}", uri);
861 return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "srt"));
862 } catch (TranscriptionDatabaseException e) {
863 throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
864 }
865 }
866
867
868
869
870
871
872
873
874 public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
875 try {
876 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
877 return jc.getStatus();
878 }
879 } catch (TranscriptionDatabaseException e) {
880 throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
881 }
882 return "Unknown";
883 }
884
885
886
887
888
889
890 protected CloseableHttpClient makeHttpClient() {
891 return makeHttpClient(SOCKET_TIMEOUT);
892 }
893
894
895
896
897
898
899 protected CloseableHttpClient makeHttpClient(int socketTimeout) {
900 RequestConfig reqConfig = RequestConfig.custom()
901 .setConnectTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
902 .setSocketTimeout(socketTimeout)
903 .setConnectionRequestTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
904 .build();
905 CloseableHttpClient httpClient = HttpClientBuilder.create()
906 .useSystemProperties()
907 .setDefaultRequestConfig(reqConfig)
908 .build();
909 return httpClient;
910 }
911
912
913 protected void deleteStorageFile(String filename) {
914 try {
915 logger.debug("Removing {} from collection {}.", filename, SUBMISSION_COLLECTION);
916 wfr.deleteFromCollection(SUBMISSION_COLLECTION, filename, false);
917 } catch (IOException e) {
918 logger.warn("Unable to remove submission file {} from collection {}", filename, SUBMISSION_COLLECTION);
919 }
920 }
921
922 @Reference
923 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
924 this.serviceRegistry = serviceRegistry;
925 }
926
927 @Reference
928 public void setSecurityService(SecurityService securityService) {
929 this.securityService = securityService;
930 }
931
932 @Reference
933 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
934 this.userDirectoryService = userDirectoryService;
935 }
936
937 @Reference
938 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
939 this.organizationDirectoryService = organizationDirectoryService;
940 }
941
942 @Reference
943 public void setWorkspace(Workspace ws) {
944 this.workspace = ws;
945 }
946
947 @Reference
948 public void setWorkingFileRepository(WorkingFileRepository wfr) {
949 this.wfr = wfr;
950 }
951
952 @Reference
953 public void setDatabase(TranscriptionDatabase service) {
954 this.database = service;
955 }
956
957 @Reference
958 public void setAssetManager(AssetManager service) {
959 this.assetManager = service;
960 }
961
962 @Reference
963 public void setWorkflowService(WorkflowService service) {
964 this.workflowService = service;
965 }
966
967 @Override
968 protected ServiceRegistry getServiceRegistry() {
969 return serviceRegistry;
970 }
971
972 @Override
973 protected SecurityService getSecurityService() {
974 return securityService;
975 }
976
977 @Override
978 protected UserDirectoryService getUserDirectoryService() {
979 return userDirectoryService;
980 }
981
982 @Override
983 protected OrganizationDirectoryService getOrganizationDirectoryService() {
984 return organizationDirectoryService;
985 }
986
987
988 void setWfUtil(Workflows wfUtil) {
989 this.wfUtil = wfUtil;
990 }
991
992 class WorkflowDispatcher implements Runnable {
993
994
995
996
997
998
999 @Override
1000 public void run() {
1001 logger.debug("WorkflowDispatcher waking up...");
1002
1003 try {
1004
1005
1006 long providerId;
1007 TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
1008 if (providerInfo != null) {
1009 providerId = providerInfo.getId();
1010 } else {
1011 logger.debug("No jobs yet for provider {}.", PROVIDER);
1012 return;
1013 }
1014
1015 List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
1016 TranscriptionJobControl.Status.TranscriptionComplete.name());
1017
1018 for (TranscriptionJobControl j : jobs) {
1019
1020
1021 if (j.getProviderId() != providerId) {
1022 continue;
1023 }
1024
1025 String mpId = j.getMediaPackageId();
1026 String jobId = j.getTranscriptionJobId();
1027
1028
1029 if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
1030
1031 if (j.getDateExpected().getTime() < System.currentTimeMillis()) {
1032 try {
1033 if (!checkJobResults(jobId)) {
1034
1035 if (j.getDateExpected().getTime() + maxProcessingSeconds * 1000 < System.currentTimeMillis()) {
1036
1037 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1038 }
1039
1040 continue;
1041 }
1042 } catch (TranscriptionServiceException e) {
1043 try {
1044 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1045 continue;
1046 } catch (TranscriptionDatabaseException ex) {
1047 logger.warn("Could not cancel job '{}'.", jobId);
1048 }
1049 }
1050 } else {
1051 continue;
1052 }
1053 }
1054
1055
1056 try {
1057 DefaultOrganization defaultOrg = new DefaultOrganization();
1058 securityService.setOrganization(defaultOrg);
1059 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1060
1061
1062 final AQueryBuilder q = assetManager.createQuery();
1063 final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
1064 if (r.getSize() == 0) {
1065 logger.warn("Media package {} no longer exists in the asset manager. It was likely deleted. "
1066 + "Dropping the generated transcription.", mpId);
1067 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
1068 continue;
1069 }
1070
1071 String org = Enrichments.enrich(r).getSnapshots().stream().findFirst().get().getOrganizationId();
1072 Organization organization = organizationDirectoryService.getOrganization(org);
1073 if (organization == null) {
1074 logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
1075 continue;
1076 }
1077 securityService.setOrganization(organization);
1078
1079
1080 Map<String, String> params = new HashMap<String, String>();
1081 params.put("transcriptionJobId", jobId);
1082 WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
1083
1084
1085
1086 Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
1087 Set<String> mpIds = new HashSet<String>();
1088 mpIds.add(mpId);
1089 List<WorkflowInstance> wfList = workflows
1090 .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params)).toList();
1091 String wfId = wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : "Unknown";
1092
1093
1094 database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1095 logger.info("Attach transcription workflow {} scheduled for mp {}, transcription service job {}",
1096 wfId, mpId, jobId);
1097 } catch (Exception e) {
1098 logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, amberscript job {}, {}: {}",
1099 mpId, jobId, e.getClass().getName(), e.getMessage());
1100 }
1101 }
1102 } catch (TranscriptionDatabaseException e) {
1103 logger.warn("Could not read transcription job control database: {}", e.getMessage());
1104 }
1105 }
1106 }
1107
1108 class ResultsFileCleanup implements Runnable {
1109
1110 @Override
1111 public void run() {
1112 logger.info("ResultsFileCleanup waking up...");
1113 try {
1114
1115 wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1116 wfr.cleanupOldFilesFromCollection(SUBMISSION_COLLECTION, cleanupResultDays);
1117 } catch (IOException e) {
1118 logger.warn("Could not cleanup old submission and transcript results files", e);
1119 }
1120 }
1121 }
1122
1123 private String removePrivateInfo(String unsafeString) {
1124 String safeString = unsafeString.replace(clientKey, "__api-key-was-hidden__");
1125 return safeString;
1126 }
1127 }