View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.transcription.amberscript;
22  
23  import org.opencastproject.assetmanager.api.AssetManager;
24  import org.opencastproject.assetmanager.api.fn.Enrichments;
25  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
26  import org.opencastproject.assetmanager.api.query.AResult;
27  import org.opencastproject.assetmanager.util.Workflows;
28  import org.opencastproject.job.api.AbstractJobProducer;
29  import org.opencastproject.job.api.Job;
30  import org.opencastproject.mediapackage.Catalog;
31  import org.opencastproject.mediapackage.MediaPackageElement;
32  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35  import org.opencastproject.mediapackage.MediaPackageElementParser;
36  import org.opencastproject.mediapackage.MediaPackageElements;
37  import org.opencastproject.mediapackage.MediaPackageException;
38  import org.opencastproject.mediapackage.Track;
39  import org.opencastproject.metadata.dublincore.DublinCore;
40  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
41  import org.opencastproject.metadata.dublincore.DublinCoreValue;
42  import org.opencastproject.metadata.dublincore.DublinCores;
43  import org.opencastproject.security.api.DefaultOrganization;
44  import org.opencastproject.security.api.Organization;
45  import org.opencastproject.security.api.OrganizationDirectoryService;
46  import org.opencastproject.security.api.SecurityService;
47  import org.opencastproject.security.api.UserDirectoryService;
48  import org.opencastproject.security.util.SecurityUtil;
49  import org.opencastproject.serviceregistry.api.ServiceRegistry;
50  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
51  import org.opencastproject.systems.OpencastConstants;
52  import org.opencastproject.transcription.api.TranscriptionService;
53  import org.opencastproject.transcription.api.TranscriptionServiceException;
54  import org.opencastproject.transcription.persistence.TranscriptionDatabase;
55  import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
56  import org.opencastproject.transcription.persistence.TranscriptionJobControl;
57  import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
58  import org.opencastproject.util.NotFoundException;
59  import org.opencastproject.util.OsgiUtil;
60  import org.opencastproject.util.data.Option;
61  import org.opencastproject.workflow.api.ConfiguredWorkflow;
62  import org.opencastproject.workflow.api.WorkflowDefinition;
63  import org.opencastproject.workflow.api.WorkflowInstance;
64  import org.opencastproject.workflow.api.WorkflowService;
65  import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
66  import org.opencastproject.workspace.api.Workspace;
67  
68  import org.apache.commons.lang3.StringUtils;
69  import org.apache.http.HttpEntity;
70  import org.apache.http.HttpStatus;
71  import org.apache.http.client.config.RequestConfig;
72  import org.apache.http.client.methods.CloseableHttpResponse;
73  import org.apache.http.client.methods.HttpGet;
74  import org.apache.http.client.methods.HttpPost;
75  import org.apache.http.entity.ContentType;
76  import org.apache.http.entity.mime.HttpMultipartMode;
77  import org.apache.http.entity.mime.MultipartEntityBuilder;
78  import org.apache.http.entity.mime.content.FileBody;
79  import org.apache.http.impl.client.CloseableHttpClient;
80  import org.apache.http.impl.client.HttpClientBuilder;
81  import org.apache.http.util.EntityUtils;
82  import org.json.simple.JSONObject;
83  import org.json.simple.parser.JSONParser;
84  import org.json.simple.parser.ParseException;
85  import org.osgi.service.component.ComponentContext;
86  import org.osgi.service.component.annotations.Activate;
87  import org.osgi.service.component.annotations.Component;
88  import org.osgi.service.component.annotations.Deactivate;
89  import org.osgi.service.component.annotations.Reference;
90  import org.slf4j.Logger;
91  import org.slf4j.LoggerFactory;
92  
93  import java.io.IOException;
94  import java.io.InputStream;
95  import java.net.URI;
96  import java.util.Arrays;
97  import java.util.Date;
98  import java.util.HashMap;
99  import java.util.HashSet;
100 import java.util.List;
101 import java.util.Map;
102 import java.util.Set;
103 import java.util.concurrent.Executors;
104 import java.util.concurrent.ScheduledExecutorService;
105 import java.util.concurrent.TimeUnit;
106 
107 @Component(
108     immediate = true,
109     service = { TranscriptionService.class,AmberscriptTranscriptionService.class },
110     property = {
111         "service.description=AmberScript Transcription Service",
112         "provider=amberscript"
113     }
114 )
115 public class AmberscriptTranscriptionService extends AbstractJobProducer implements TranscriptionService {
116 
  /** Logger of this service. */
  private static final Logger logger = LoggerFactory.getLogger(AmberscriptTranscriptionService.class);

  /** Job type passed to the job producer constructor and used when creating jobs in the service registry. */
  private static final String JOB_TYPE = "org.opencastproject.transcription.amberscript";

  // Collection names; their use is not visible in this part of the file
  // (presumably working file repository collections -- TODO confirm).
  public static final String SUBMISSION_COLLECTION = "amberscript-submission";
  private static final String TRANSCRIPT_COLLECTION = "amberscript-transcripts";

  private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
  private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute



  /** Base URL that the upload and status request URLs are built on. */
  private static final String BASE_URL = "https://qs.amberscript.com";
  // Values of the "status" entry inside the "jobStatus" object of an API response.
  private static final String STATUS_OPEN = "OPEN";
  private static final String STATUS_DONE = "DONE";
  private static final String STATUS_ERROR = "ERROR";

  /** Error message of a failed job that is compared against the "errorMsg" entry of the job status. */
  private static final String ERROR_NO_SPEECH = "No speech found";

  /** Provider name stored together with every job control entry. */
  private static final String PROVIDER = "amberscript";

  // Collaborating services. How they are assigned is not visible in this part of the file
  // (presumably OSGi @Reference setters -- TODO confirm).
  private AssetManager assetManager;
  private OrganizationDirectoryService organizationDirectoryService;
  /** Executor created in activate() that runs the workflow dispatcher and the results cleanup. */
  private ScheduledExecutorService scheduledExecutor;
  private SecurityService securityService;
  private ServiceRegistry serviceRegistry;
  private TranscriptionDatabase database;
  private UserDirectoryService userDirectoryService;
  private WorkflowService workflowService;
  private WorkingFileRepository wfr;
  private Workspace workspace;

  // Only used by unit tests
  private Workflows wfUtil;
151 
  /** Operations of jobs created by this service; the constant name is used as the job's operation string. */
  private enum Operation {
    StartTranscription
  }
155 
  // Enum for configuring which metadata field shall be used
  // for determining the number of speakers of an event:
  // the dublin core creators, the contributors, or the union of both.
  // The constant names are the values accepted by the 'speaker.metadata.field' configuration key.
  private enum SpeakerMetadataField {
    creator, contributor, both
  }
161 
  // service configuration keys (looked up in the component properties during activation)
  private static final String ENABLED_CONFIG = "enabled";
  private static final String CLIENT_KEY = "client.key";
  private static final String LANGUAGE = "language";
  private static final String LANGUAGE_FROM_DUBLINCORE = "language.from.dublincore";
  // Value format: comma separated list of '<key>:<amberscript language code>' pairs
  private static final String LANGUAGE_CODE_MAP = "language.code.map";
  private static final String AMBERSCRIPTJOBTYPE = "jobtype";
  private static final String WORKFLOW_CONFIG = "workflow";
  private static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
  private static final String MAX_PROCESSING_TIME_CONFIG = "max.overdue.time";
  private static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
  private static final String SPEAKER = "speaker";
  private static final String SPEAKER_FROM_DUBLINCORE = "speaker.from.dublincore";
  private static final String SPEAKER_METADATA_FIELD = "speaker.metadata.field";
  private static final String TRANSCRIPTIONTYPE = "transcriptiontype";
  private static final String GLOSSARY = "glossary";
  // NOTE(review): unlike its siblings this key is "cleanread", which is also the default transcription style
  // value. It looks like the key was meant to be "transcriptionstyle" -- confirm against the shipped
  // configuration file before changing it, since existing installations may rely on the current key.
  private static final String TRANSCRIPTIONSTYLE = "cleanread";
  private static final String TARGETLANGUAGE = "targetlanguage";
180 
  // service configuration default values (overwritten during activation when the matching key is configured)
  private boolean enabled = false;
  /** API key that is sent as the 'apiKey' query parameter of every request. */
  private String clientKey;
  private String language = "en";
  /** determines if the transcription language should be taken from the dublincore */
  private boolean languageFromDublinCore;
  private String amberscriptJobType = "direct";
  private String workflowDefinitionId = "amberscript-attach-transcripts";
  private long workflowDispatchIntervalSeconds = 60;
  private long maxProcessingSeconds = 8 * 24 * 60 * 60; // maximum runtime for jobType perfect is 8 days
  private int cleanupResultDays = 7;
  private int numberOfSpeakers = 1;
  /** determines if the number of speakers should be derived from the dublincore */
  private boolean speakerFromDublinCore = true;
  private SpeakerMetadataField speakerMetadataField = SpeakerMetadataField.creator;
  private String transcriptionType = "transcription";
  private String glossary = "";
  private String transcriptionStyle = "cleanread";
  private String targetLanguage = "";

  /**
   * Contains mappings from several possible ways of writing a language name/code to the
   * corresponding amberscript language code
   */
  private AmberscriptLangUtil amberscriptLangUtil;

  /** Value of the digest user context property, read during activation. */
  private String systemAccount;
207 
  /** Creates the service and registers it as producer of jobs of type {@link #JOB_TYPE}. */
  public AmberscriptTranscriptionService() {
    super(JOB_TYPE);
  }
211 
212   @Activate
213   public void activate(ComponentContext cc) {
214 
215     Option<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG);
216     if (enabledOpt.isSome()) {
217       enabled = enabledOpt.get();
218     }
219 
220     if (!enabled) {
221       logger.info("Amberscript Transcription Service disabled."
222               + " If you want to enable it, please update the service configuration.");
223       return;
224     }
225 
226     Option<String> clientKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLIENT_KEY);
227     if (clientKeyOpt.isSome()) {
228       clientKey = clientKeyOpt.get();
229     } else {
230       logger.warn("API key was not set.");
231       return;
232     }
233 
234     Option<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE);
235     if (languageOpt.isSome()) {
236       language = languageOpt.get();
237       logger.info("Default language is set to '{}'.", language);
238     } else {
239       logger.info("Default language '{}' will be used.", language);
240     }
241 
242     Option<String> languageFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_FROM_DUBLINCORE);
243     if (languageFromDublinCoreOpt.isSome()) {
244       try {
245         languageFromDublinCore = Boolean.parseBoolean(languageFromDublinCoreOpt.get());
246       } catch (Exception e) {
247         logger.warn("Configuration value for '{}' is invalid, defaulting to false.", LANGUAGE_FROM_DUBLINCORE);
248       }
249     }
250     logger.info("Configuration value for '{}' is set to '{}'.", LANGUAGE_FROM_DUBLINCORE, languageFromDublinCore);
251 
252     amberscriptLangUtil = AmberscriptLangUtil.getInstance();
253     int customMapEntriesCount = 0;
254     Option<String> langCodeMapOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_CODE_MAP);
255     if (langCodeMapOpt.isSome()) {
256       try {
257         String langCodeMapStr = langCodeMapOpt.get();
258         if (langCodeMapStr != null) {
259           for (String mapping : langCodeMapStr.split(",")) {
260             String[] mapEntries = mapping.split(":");
261             amberscriptLangUtil.addCustomMapping(mapEntries[0], mapEntries[1]);
262             customMapEntriesCount += 1;
263           }
264         }
265       } catch (Exception e) {
266         logger.warn("Configuration '{}' is invalid. Using just default mapping.", LANGUAGE_CODE_MAP);
267       }
268     }
269     logger.info("Language code map was set. Added '{}' additional entries.", customMapEntriesCount);
270 
271     Option<String> amberscriptJobTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), AMBERSCRIPTJOBTYPE);
272     if (amberscriptJobTypeOpt.isSome()) {
273       amberscriptJobType = amberscriptJobTypeOpt.get();
274       logger.info("Default Amberscript job type is set to '{}'.", amberscriptJobType);
275     } else {
276       logger.info("Default Amberscript job type '{}' will be used.", amberscriptJobType);
277     }
278 
279     Option<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
280     if (wfOpt.isSome()) {
281       workflowDefinitionId = wfOpt.get();
282       logger.info("Workflow is set to '{}'.", workflowDefinitionId);
283     } else {
284       logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
285     }
286 
287     Option<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
288     if (intervalOpt.isSome()) {
289       try {
290         workflowDispatchIntervalSeconds = Long.parseLong(intervalOpt.get());
291       } catch (NumberFormatException e) {
292         logger.warn("Configured '{}' is invalid. Using default.", DISPATCH_WORKFLOW_INTERVAL_CONFIG);
293       }
294     }
295     logger.info("Workflow dispatch interval is {} seconds.", workflowDispatchIntervalSeconds);
296 
297     Option<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
298     if (maxProcessingOpt.isSome()) {
299       try {
300         maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
301       } catch (NumberFormatException e) {
302         logger.warn("Configured '{}' is invalid. Using default.", MAX_PROCESSING_TIME_CONFIG);
303       }
304     }
305     logger.info("Maximum processing time for transcription job is {} seconds.", maxProcessingSeconds);
306 
307     Option<String> cleanupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
308     if (cleanupOpt.isSome()) {
309       try {
310         cleanupResultDays = Integer.parseInt(cleanupOpt.get());
311       } catch (NumberFormatException e) {
312         logger.warn("Configured '{}' is invalid. Using default.", CLEANUP_RESULTS_DAYS_CONFIG);
313       }
314     }
315     logger.info("Cleanup result files after {} days.", cleanupResultDays);
316 
317     Option<String> speakerOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER);
318     if (speakerOpt.isSome()) {
319       try {
320         numberOfSpeakers = Integer.parseInt(speakerOpt.get());
321       } catch (NumberFormatException e) {
322         logger.warn("Configured '{}' is invalid. Using default.", SPEAKER);
323       }
324     }
325     logger.info("Default number of speakers is set to '{}'.", numberOfSpeakers);
326 
327     Option<String> speakerFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_FROM_DUBLINCORE);
328     if (speakerFromDublinCoreOpt.isSome()) {
329       try {
330         speakerFromDublinCore = Boolean.parseBoolean(speakerFromDublinCoreOpt.get());
331       } catch (Exception e) {
332         logger.warn("Configuration value for '{}' is invalid, defaulting to true.", SPEAKER_FROM_DUBLINCORE);
333       }
334     }
335     logger.info("Configuration value for '{}' is set to '{}'.", SPEAKER_FROM_DUBLINCORE, speakerFromDublinCore);
336 
337     Option<String> speakerMetadataFieldOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_METADATA_FIELD);
338     if (speakerMetadataFieldOpt.isSome()) {
339       try {
340         speakerMetadataField = SpeakerMetadataField.valueOf(speakerMetadataFieldOpt.get());
341       } catch (IllegalArgumentException e) {
342         logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
343             speakerMetadataFieldOpt.get(), SPEAKER_METADATA_FIELD, speakerMetadataField);
344       }
345     }
346     logger.info("Default metadata field for calculating the amount of speakers is set to '{}'.", speakerMetadataField);
347 
348     Option<String> transcriptionTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONTYPE);
349     if (transcriptionTypeOpt.isSome()) {
350       if (List.of("transcription", "captions", "translatedSubtitles").contains(transcriptionType)) {
351         transcriptionType = transcriptionTypeOpt.get();
352         logger.info("Default transcription type is set to '{}'.", transcriptionType);
353       } else {
354         logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
355             transcriptionTypeOpt.get(), TRANSCRIPTIONTYPE, transcriptionType);
356       }
357     } else {
358       logger.info("Default transcription type '{}' will be used.", transcriptionType);
359     }
360 
361     Option<String> glossaryOpt = OsgiUtil.getOptCfg(cc.getProperties(), GLOSSARY);
362     if (glossaryOpt.isSome()) {
363       glossary = glossaryOpt.get();
364       logger.info("Default glossary is set to '{}'.", glossary);
365     } else {
366       logger.info("No glossary will be used by default");
367     }
368 
369     Option<String> transcriptionStyleOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONSTYLE);
370     if (transcriptionStyleOpt.isSome()) {
371       if (List.of("cleanread", "verbatim").contains(transcriptionStyle)) {
372         transcriptionStyle = transcriptionStyleOpt.get();
373         logger.info("Default transcription style is set to '{}'.", transcriptionStyle);
374       } else {
375         logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
376             transcriptionStyleOpt.get(), TRANSCRIPTIONSTYLE, transcriptionStyle);
377       }
378     } else {
379       logger.info("Default transcription style '{}' will be used.", transcriptionStyle);
380     }
381 
382     Option<String> targetLanguageOpt = OsgiUtil.getOptCfg(cc.getProperties(), TARGETLANGUAGE);
383     if (targetLanguageOpt.isSome()) {
384       targetLanguage = targetLanguageOpt.get();
385       logger.info("Default target language is set to '{}'.", targetLanguage);
386     } else {
387       logger.info("Transcriptions won't be translated");
388     }
389 
390     systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
391 
392     scheduledExecutor = Executors.newScheduledThreadPool(2);
393 
394     scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchIntervalSeconds,
395             TimeUnit.SECONDS);
396 
397     scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
398 
399     logger.info("Activated.");
400   }
401 
402   @Deactivate
403   public void deactivate() {
404     if (scheduledExecutor != null) {
405       scheduledExecutor.shutdown();
406     }
407   }
408 
409   @Override
410   public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
411     throw new UnsupportedOperationException("Not supported.");
412   }
413 
414   @Override
415   public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
416     if (!enabled) {
417       throw new TranscriptionServiceException("AmberScript Transcription Service disabled."
418               + " If you want to enable it, please update the service configuration.");
419     }
420 
421     String language = null;
422 
423     if (languageFromDublinCore) {
424       for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
425         try (InputStream in = workspace.read(catalog.getURI())) {
426           DublinCoreCatalog dublinCatalog = DublinCores.read(in);
427           String dublinCoreLang = dublinCatalog.getFirst(DublinCore.PROPERTY_LANGUAGE);
428           if (dublinCoreLang != null) {
429             language = amberscriptLangUtil.getLanguageCodeOrNull(dublinCoreLang);
430           }
431           if (language != null) {
432             break;
433           }
434         } catch (IOException | NotFoundException e) {
435           logger.error(String.format("Unable to load dublin core catalog for event '%s'",
436               track.getMediaPackage().getIdentifier()), e);
437         }
438       }
439     }
440 
441     if (language == null) {
442       if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
443         language = args[0];
444       } else {
445         language = getLanguage();
446       }
447     }
448 
449     String jobType;
450     if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
451       jobType = args[1];
452     } else {
453       jobType = getAmberscriptJobType();
454     }
455 
456     int numberOfSpeakers = 0;
457     if (speakerFromDublinCore) {
458       Set<String> speakers = new HashSet<>();
459       for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
460         try (InputStream in = workspace.read(catalog.getURI())) {
461           DublinCoreCatalog dublinCatalog = DublinCores.read(in);
462           if (speakerMetadataField.equals(SpeakerMetadataField.creator)
463                   || speakerMetadataField.equals(SpeakerMetadataField.both)) {
464             dublinCatalog.get(DublinCore.PROPERTY_CREATOR).stream()
465                     .map(DublinCoreValue::getValue).forEach(speakers::add);
466           }
467           if (speakerMetadataField.equals(SpeakerMetadataField.contributor)
468                   || speakerMetadataField.equals(SpeakerMetadataField.both)) {
469             dublinCatalog.get(DublinCore.PROPERTY_CONTRIBUTOR).stream()
470                     .map(DublinCoreValue::getValue).forEach(speakers::add);
471           }
472 
473         } catch (IOException | NotFoundException e) {
474           logger.error("Unable to load dublin core catalog for event '{}'",
475                   track.getMediaPackage().getIdentifier(), e);
476         }
477       }
478 
479       if (speakers.size() >= 1) {
480         numberOfSpeakers = speakers.size();
481       }
482     }
483 
484     if (numberOfSpeakers == 0) {
485       if (args.length > 2 && StringUtils.isNotBlank(args[2])) {
486         numberOfSpeakers = Integer.parseInt(args[2]);
487       } else {
488         numberOfSpeakers = getNumberOfSpeakers();
489       }
490     }
491 
492     String transcriptionType;
493     if (args.length > 3 && StringUtils.isNotBlank(args[3])) {
494       transcriptionType = args[3];
495     } else {
496       transcriptionType = getTranscriptionType();
497     }
498 
499     String glossary;
500     if (args.length > 4 && args[4] != null) {
501       glossary = args[4];
502     } else {
503       glossary = getGlossary();
504     }
505 
506     String transcriptionStyle;
507     if (args.length > 5 && StringUtils.isNotBlank(args[5])) {
508       transcriptionStyle = args[5];
509     } else {
510       transcriptionStyle = getTranscriptionStyle();
511     }
512 
513     String targetLanguage;
514     if (args.length > 6 && args[6] != null) {
515       targetLanguage = args[6];
516     } else {
517       targetLanguage = getTargetLanguage();
518     }
519 
520     logger.info("New transcription job for mpId '{}' language '{}' job type '{}' speakers '{}' transcription type '{}'"
521             + "glossary '{}'.", mpId, language, jobType, numberOfSpeakers, transcriptionType, glossary);
522 
523     try {
524       return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(), Arrays.asList(
525           mpId, MediaPackageElementParser.getAsXml(track), language, jobType, Integer.toString(numberOfSpeakers),
526           transcriptionType, glossary, transcriptionStyle, targetLanguage));
527     } catch (ServiceRegistryException e) {
528       throw new TranscriptionServiceException("Unable to create a job", e);
529     } catch (MediaPackageException e) {
530       throw new TranscriptionServiceException("Invalid track '" + track.toString() + "'", e);
531     }
532   }
533 
534   @Override
535   public void transcriptionDone(String mpId, Object results) { }
536 
537   private void transcriptionDone(String mpId, String jobId) {
538     try {
539       logger.info("Transcription done for mpId '{}'.", mpId);
540       if (getAndSaveJobResult(jobId)) {
541         database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
542       } else {
543         logger.debug("Unable to get and save the transcription result for mpId '{}'.", mpId);
544       }
545     } catch (IOException | TranscriptionServiceException e) {
546       logger.warn("Could not save transcription results file for mpId '{}': {}", mpId, e.toString());
547     } catch (TranscriptionDatabaseException e) {
548       logger.warn("Transcription results file were saved but state in db not updated for mpId '{}': ", mpId, e);
549     }
550   }
551 
552   @Override
553   public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
554     JSONObject jsonObj = null;
555     String jobId = null;
556     try {
557       jsonObj = (JSONObject) obj;
558       jobId = (String) jsonObj.get("name");
559       // Update state in database
560       database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
561       TranscriptionJobControl jobControl = database.findByJob(jobId);
562       logger.warn(String.format("Error received for media package %s, job id %s",
563               jobControl.getMediaPackageId(), jobId));
564       // Send notification email
565     } catch (TranscriptionDatabaseException e) {
566       logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
567       throw new TranscriptionServiceException("Could not update transcription job control db", e);
568     }
569   }
570 
571   @Override
572   public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
573     throw new TranscriptionServiceException("Method not implemented");
574   }
575 
576   @Override
577   public String getLanguage() {
578     return language;
579   }
580 
581   public String getAmberscriptJobType() {
582     return amberscriptJobType;
583   }
584 
585   public int getNumberOfSpeakers() {
586     return numberOfSpeakers;
587   }
588 
589   public String getTranscriptionType() {
590     return transcriptionType;
591   }
592 
593   public String getGlossary() {
594     return glossary;
595   }
596 
597   public String getTranscriptionStyle() {
598     return transcriptionStyle;
599   }
600 
601   public String getTargetLanguage() {
602     return targetLanguage;
603   }
604 
605   // Called by workflow
606   @Override
607   protected String process(Job job) throws Exception {
608     Operation op = null;
609     String operation = job.getOperation();
610     List<String> arguments = job.getArguments();
611     String result = "";
612     op = Operation.valueOf(operation);
613     switch (op) {
614       case StartTranscription:
615         String mpId = arguments.get(0);
616         Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
617         String languageCode = arguments.get(2);
618         String jobType = arguments.get(3);
619         String numberOfSpeakers = arguments.get(4);
620         String transcriptionType = arguments.get(5);
621         String glossary = arguments.get(6);
622         String transcriptionStyle = arguments.get(7);
623         String targetLanguage = arguments.get(8);
624         createRecognitionsJob(mpId, track, languageCode, jobType, numberOfSpeakers, transcriptionType, glossary,
625                 transcriptionStyle, targetLanguage);
626         break;
627       default:
628         throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
629     }
630     return result;
631   }
632 
633   void createRecognitionsJob(String mpId, Track track, String languageCode, String jobType, String numberOfSpeakers,
634           String transcriptionType, String glossary, String transcriptionStyle, String targetLanguage)
635           throws TranscriptionServiceException {
636     // Timeout 3 hours (needs to include the time for the remote service to
637     // fetch the media URL before sending final response)
638     CloseableHttpClient httpClient = makeHttpClient(3 * 3600 * 1000);
639     CloseableHttpResponse response = null;
640 
641     String submitUrl = BASE_URL + "/jobs/upload-media"
642             + "?apiKey=" + clientKey
643             + "&language=" + languageCode
644             + "&jobType=" + jobType
645             + "&numberOfSpeakers=" + numberOfSpeakers
646             + "&transcriptionType=" + transcriptionType
647             + "&transcriptionStyle=" + transcriptionStyle;
648     if (StringUtils.isNotBlank(glossary)) {
649       submitUrl += "&glossary=" + glossary;
650     }
651     if (StringUtils.isNotBlank(targetLanguage)) {
652       submitUrl += "&targetLanguage=" + targetLanguage;
653     }
654 
655     try {
656       FileBody fileBody = new FileBody(workspace.get(track.getURI()), ContentType.DEFAULT_BINARY);
657       MultipartEntityBuilder builder = MultipartEntityBuilder.create();
658       builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
659       builder.addPart("file", fileBody);
660       HttpEntity multipartEntity = builder.build();
661 
662       HttpPost httpPost = new HttpPost(submitUrl);
663       httpPost.setEntity(multipartEntity);
664 
665       response = httpClient.execute(httpPost);
666       int code = response.getStatusLine().getStatusCode();
667       HttpEntity entity = response.getEntity();
668 
669       String jsonString = EntityUtils.toString(response.getEntity());
670       JSONParser jsonParser = new JSONParser();
671       JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
672 
673       logger.debug("Submitting new transcription job: {}" + System.lineSeparator()
674               + "Response: {}", removePrivateInfo(submitUrl), jsonString);
675 
676       JSONObject result = (JSONObject) jsonObject.get("jobStatus");
677       String jobId = (String) result.get("jobId");
678 
679       switch (code) {
680         case HttpStatus.SC_OK: // 200
681           logger.info("mp {} has been submitted to AmberScript service with jobId {}.", mpId, jobId);
682           database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
683                   track.getDuration() == null ? 0 : track.getDuration().longValue(), new Date(), PROVIDER);
684           EntityUtils.consume(entity);
685           return;
686         default:
687           String error = (String) result.get("error");
688           String message = (String) result.get("message");
689           String msg = String.format("Unable to submit job: API returned %s - %s: %s", code, error, message);
690           logger.warn(msg);
691           throw new TranscriptionServiceException(msg);
692       }
693     } catch (Exception e) {
694       logger.warn("Exception when calling the captions endpoint", e);
695       throw new TranscriptionServiceException("Exception when calling the captions endpoint", e);
696     } finally {
697       try {
698         httpClient.close();
699         if (response != null) {
700           response.close();
701         }
702       } catch (IOException e) {
703       }
704     }
705   }
706 
707   boolean checkJobResults(String jobId) throws TranscriptionServiceException {
708 
709     String mpId = "unknown";
710 
711     CloseableHttpClient httpClient = makeHttpClient();
712     CloseableHttpResponse response = null;
713 
714     String checkUrl = BASE_URL + "/jobs/status?jobId=" + jobId + "&apiKey=" + clientKey;
715 
716     try {
717       HttpGet httpGet = new HttpGet(checkUrl);
718       response = httpClient.execute(httpGet);
719       int code = response.getStatusLine().getStatusCode();
720 
721       HttpEntity entity = response.getEntity();
722       String jsonString = EntityUtils.toString(entity);
723       EntityUtils.consume(entity);
724 
725       logger.debug("AmberScript API call was '{}'." + System.lineSeparator() + "Response: {}",
726               removePrivateInfo(checkUrl), jsonString);
727 
728       JSONParser jsonParser = new JSONParser();
729       JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
730 
731       switch (code) {
732         case HttpStatus.SC_OK:
733           JSONObject result = (JSONObject) jsonObject.get("jobStatus");
734           String status = (String) result.get("status");
735           switch (status) {
736             case STATUS_OPEN:
737               logger.debug("Captions job '{}' has not finished yet.", jobId);
738               return false;
739             case STATUS_ERROR:
740               var errorMsg = (String) result.get("errorMsg");
741               throw new TranscriptionServiceException(
742                       String.format("Captions job '%s' failed: %s", jobId, errorMsg),
743                       code,
744                       ERROR_NO_SPEECH.equals(errorMsg));
745             case STATUS_DONE:
746               logger.info("Captions job '{}' has finished.", jobId);
747               TranscriptionJobControl jc = database.findByJob(jobId);
748               if (jc != null) {
749                 mpId = jc.getMediaPackageId();
750               }
751               transcriptionDone(mpId, jobId);
752               return true;
753             default:
754               return false; // only here to obey checkstyle
755           }
756         default:
757           String error = (String) jsonObject.get("error");
758           String errorMessage = (String) jsonObject.get("errorMessage");
759           logger.warn("Error while checking status: {}."
760                   + System.lineSeparator() + "{}: {}", code, error, errorMessage);
761           throw new TranscriptionServiceException(
762                   String.format("Captions job '%s' failed: Return Code %d", jobId, code), code);
763       }
764     } catch (TranscriptionDatabaseException | IOException | ParseException e) {
765       logger.warn("Error while checking status: {}", e.toString());
766     } finally {
767       try {
768         httpClient.close();
769         if (response != null) {
770           response.close();
771         }
772       } catch (IOException e) {
773       }
774     }
775     return false;
776   }
777 
778   private boolean getAndSaveJobResult(String jobId) throws TranscriptionServiceException, IOException {
779 
780     CloseableHttpClient httpClient = makeHttpClient();
781     CloseableHttpResponse response = null;
782 
783     String transcriptUrl = BASE_URL + "/jobs/export?format=srt&jobId=" + jobId + "&apiKey=" + clientKey;
784 
785     boolean done = false;
786 
787     try {
788       HttpGet httpGet = new HttpGet(transcriptUrl);
789 
790       response = httpClient.execute(httpGet);
791       int code = response.getStatusLine().getStatusCode();
792 
793       logger.debug("AmberScript API {} http response {}", removePrivateInfo(transcriptUrl), code);
794 
795       switch (code) {
796         case HttpStatus.SC_OK: // 200
797           HttpEntity entity = response.getEntity();
798           logger.info("Retrieved details for transcription with jobid: '{}'", jobId);
799 
800           // Save the result subrip (srt) file into a collection
801           workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".srt", entity.getContent());
802           done = true;
803           break;
804 
805         default:
806           logger.warn("Error retrieving details for transcription with jobid: '{}', return status: {}.", jobId, code);
807           break;
808       }
809     } catch (Exception e) {
810       throw new TranscriptionServiceException(String.format(
811               "Exception when calling the transcription service for jobid: %s", jobId), e);
812     } finally {
813       try {
814         httpClient.close();
815         if (response != null) {
816           response.close();
817         }
818       } catch (IOException e) {
819       }
820     }
821 
822     return done;
823   }
824 
825   @Override
826   // Called by the attach workflow operation
827   public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
828           throws TranscriptionServiceException {
829     try {
830       // If jobId is unknown, look for all jobs associated to that mpId
831       if (jobId == null || "null".equals(jobId)) {
832         jobId = null;
833         for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
834           if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
835                   || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
836             jobId = jc.getTranscriptionJobId();
837           }
838         }
839       }
840 
841       if (jobId == null) {
842         throw new TranscriptionServiceException(
843                 "No completed or closed transcription job found in database for media package " + mpId);
844       }
845 
846       // Results already saved?
847       URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".srt");
848 
849       logger.info("Looking for transcript at URI: {}", uri);
850 
851       try {
852         workspace.get(uri);
853         logger.info("Found captions at URI: {}", uri);
854       } catch (Exception e) {
855         logger.info("Results not saved: getting from service for jobId {}", jobId);
856         // Not saved yet so call the transcription service to get the results
857         checkJobResults(jobId);
858       }
859       MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
860       logger.debug("Returning MPE with results file URI: {}", uri);
861       return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "srt"));
862     } catch (TranscriptionDatabaseException e) {
863       throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
864     }
865   }
866 
867   /**
868    * Get mediapackage transcription status
869    *
870    * @param mpId, mediapackage id
871    * @return transcription status
872    * @throws TranscriptionServiceException
873    */
874   public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
875     try {
876       for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
877         return jc.getStatus();
878       }
879     } catch (TranscriptionDatabaseException e) {
880       throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
881     }
882     return "Unknown";
883   }
884 
885   /**
886    * Creates a closable http client with default configuration.
887    *
888    * @return closable http client
889    */
890   protected CloseableHttpClient makeHttpClient() {
891     return makeHttpClient(SOCKET_TIMEOUT);
892   }
893 
894   /**
895    * Creates a closable http client.
896    *
897    * @param socketTimeout http socket timeout value in milliseconds
898    */
899   protected CloseableHttpClient makeHttpClient(int socketTimeout) {
900     RequestConfig reqConfig = RequestConfig.custom()
901         .setConnectTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
902         .setSocketTimeout(socketTimeout)
903         .setConnectionRequestTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
904         .build();
905     CloseableHttpClient httpClient = HttpClientBuilder.create()
906         .useSystemProperties()
907         .setDefaultRequestConfig(reqConfig)
908         .build();
909     return httpClient;
910   }
911 
912   // Called when a transcription job has been submitted
913   protected void deleteStorageFile(String filename) {
914     try {
915       logger.debug("Removing {} from collection {}.", filename, SUBMISSION_COLLECTION);
916       wfr.deleteFromCollection(SUBMISSION_COLLECTION, filename, false);
917     } catch (IOException e) {
918       logger.warn("Unable to remove submission file {} from collection {}", filename, SUBMISSION_COLLECTION);
919     }
920   }
921 
922   @Reference
923   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
924     this.serviceRegistry = serviceRegistry;
925   }
926 
927   @Reference
928   public void setSecurityService(SecurityService securityService) {
929     this.securityService = securityService;
930   }
931 
932   @Reference
933   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
934     this.userDirectoryService = userDirectoryService;
935   }
936 
937   @Reference
938   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
939     this.organizationDirectoryService = organizationDirectoryService;
940   }
941 
942   @Reference
943   public void setWorkspace(Workspace ws) {
944     this.workspace = ws;
945   }
946 
947   @Reference
948   public void setWorkingFileRepository(WorkingFileRepository wfr) {
949     this.wfr = wfr;
950   }
951 
952   @Reference
953   public void setDatabase(TranscriptionDatabase service) {
954     this.database = service;
955   }
956 
957   @Reference
958   public void setAssetManager(AssetManager service) {
959     this.assetManager = service;
960   }
961 
962   @Reference
963   public void setWorkflowService(WorkflowService service) {
964     this.workflowService = service;
965   }
966 
967   @Override
968   protected ServiceRegistry getServiceRegistry() {
969     return serviceRegistry;
970   }
971 
972   @Override
973   protected SecurityService getSecurityService() {
974     return securityService;
975   }
976 
977   @Override
978   protected UserDirectoryService getUserDirectoryService() {
979     return userDirectoryService;
980   }
981 
982   @Override
983   protected OrganizationDirectoryService getOrganizationDirectoryService() {
984     return organizationDirectoryService;
985   }
986 
987   // Only used by unit tests!
988   void setWfUtil(Workflows wfUtil) {
989     this.wfUtil = wfUtil;
990   }
991 
992   class WorkflowDispatcher implements Runnable {
993 
994     /**
995      * {@inheritDoc}
996      *
997      * @see java.lang.Thread#run()
998      */
999     @Override
1000     public void run() {
1001       logger.debug("WorkflowDispatcher waking up...");
1002 
1003       try {
1004         // Find jobs that are in progress and jobs that had transcription complete
1005 
1006         long providerId;
1007         TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
1008         if (providerInfo != null) {
1009           providerId = providerInfo.getId();
1010         } else {
1011           logger.debug("No jobs yet for provider {}.", PROVIDER);
1012           return;
1013         }
1014 
1015         List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
1016                 TranscriptionJobControl.Status.TranscriptionComplete.name());
1017 
1018         for (TranscriptionJobControl j : jobs) {
1019 
1020           // Don't process jobs for other services
1021           if (j.getProviderId() != providerId) {
1022             continue;
1023           }
1024 
1025           String mpId = j.getMediaPackageId();
1026           String jobId = j.getTranscriptionJobId();
1027 
1028           // If the job in progress, check if it should already have finished.
1029           if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
1030             // If job should already have been completed, try to get the results.
1031             if (j.getDateExpected().getTime() < System.currentTimeMillis()) {
1032               try {
1033                 if (!checkJobResults(jobId)) {
1034                   // Job still running, not finished, so check if it should have finished more than N seconds ago
1035                   if (j.getDateExpected().getTime() + maxProcessingSeconds * 1000 < System.currentTimeMillis()) {
1036                     // Processing for too long, mark job as canceled and don't check anymore
1037                     database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1038                   }
1039                   // else Job still running, not finished
1040                   continue;
1041                 }
1042               } catch (TranscriptionServiceException e) {
1043                 try {
1044                   database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1045                   continue;
1046                 } catch (TranscriptionDatabaseException ex) {
1047                   logger.warn("Could not cancel job '{}'.", jobId);
1048                 }
1049               }
1050             } else {
1051               continue; // Not time to check yet
1052             }
1053           }
1054 
1055           // Jobs that get here have state TranscriptionCompleted
1056           try {
1057             DefaultOrganization defaultOrg = new DefaultOrganization();
1058             securityService.setOrganization(defaultOrg);
1059             securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1060 
1061             // Find the episode
1062             final AQueryBuilder q = assetManager.createQuery();
1063             final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
1064             if (r.getSize() == 0) {
1065               logger.warn("Media package {} no longer exists in the asset manager. It was likely deleted. "
1066                   + "Dropping the generated transcription.", mpId);
1067               database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
1068               continue;
1069             }
1070 
1071             String org = Enrichments.enrich(r).getSnapshots().stream().findFirst().get().getOrganizationId();
1072             Organization organization = organizationDirectoryService.getOrganization(org);
1073             if (organization == null) {
1074               logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
1075               continue;
1076             }
1077             securityService.setOrganization(organization);
1078 
1079             // Build workflow
1080             Map<String, String> params = new HashMap<String, String>();
1081             params.put("transcriptionJobId", jobId);
1082             WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
1083 
1084             // Apply workflow
1085             // wfUtil is only used by unit tests
1086             Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
1087             Set<String> mpIds = new HashSet<String>();
1088             mpIds.add(mpId);
1089             List<WorkflowInstance> wfList = workflows
1090                     .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1091             String wfId = wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : "Unknown";
1092 
1093             // Update state in the database
1094             database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1095             logger.info("Attach transcription workflow {} scheduled for mp {}, transcription service job {}",
1096                     wfId, mpId, jobId);
1097           } catch (Exception e) {
1098             logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, amberscript job {}, {}: {}",
1099                     mpId, jobId, e.getClass().getName(), e.getMessage());
1100           }
1101         }
1102       } catch (TranscriptionDatabaseException e) {
1103         logger.warn("Could not read transcription job control database: {}", e.getMessage());
1104       }
1105     }
1106   }
1107 
1108   class ResultsFileCleanup implements Runnable {
1109 
1110     @Override
1111     public void run() {
1112       logger.info("ResultsFileCleanup waking up...");
1113       try {
1114         // Cleans up results files older than CLEANUP_RESULT_FILES_DAYS days
1115         wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1116         wfr.cleanupOldFilesFromCollection(SUBMISSION_COLLECTION, cleanupResultDays);
1117       } catch (IOException e) {
1118         logger.warn("Could not cleanup old submission and transcript results files", e);
1119       }
1120     }
1121   }
1122 
1123   private String removePrivateInfo(String unsafeString) {
1124     String safeString = unsafeString.replace(clientKey, "__api-key-was-hidden__");
1125     return safeString;
1126   }
1127 }