View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.transcription.amberscript;
22  
23  import org.opencastproject.assetmanager.api.AssetManager;
24  import org.opencastproject.assetmanager.api.Snapshot;
25  import org.opencastproject.assetmanager.util.Workflows;
26  import org.opencastproject.job.api.AbstractJobProducer;
27  import org.opencastproject.job.api.Job;
28  import org.opencastproject.mediapackage.Catalog;
29  import org.opencastproject.mediapackage.MediaPackageElement;
30  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
31  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
32  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
33  import org.opencastproject.mediapackage.MediaPackageElementParser;
34  import org.opencastproject.mediapackage.MediaPackageElements;
35  import org.opencastproject.mediapackage.MediaPackageException;
36  import org.opencastproject.mediapackage.Track;
37  import org.opencastproject.metadata.dublincore.DublinCore;
38  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
39  import org.opencastproject.metadata.dublincore.DublinCoreValue;
40  import org.opencastproject.metadata.dublincore.DublinCores;
41  import org.opencastproject.security.api.DefaultOrganization;
42  import org.opencastproject.security.api.Organization;
43  import org.opencastproject.security.api.OrganizationDirectoryService;
44  import org.opencastproject.security.api.SecurityService;
45  import org.opencastproject.security.api.UserDirectoryService;
46  import org.opencastproject.security.util.SecurityUtil;
47  import org.opencastproject.serviceregistry.api.ServiceRegistry;
48  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
49  import org.opencastproject.systems.OpencastConstants;
50  import org.opencastproject.transcription.api.TranscriptionService;
51  import org.opencastproject.transcription.api.TranscriptionServiceException;
52  import org.opencastproject.transcription.persistence.TranscriptionDatabase;
53  import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
54  import org.opencastproject.transcription.persistence.TranscriptionJobControl;
55  import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
56  import org.opencastproject.util.NotFoundException;
57  import org.opencastproject.util.OsgiUtil;
58  import org.opencastproject.workflow.api.ConfiguredWorkflow;
59  import org.opencastproject.workflow.api.WorkflowDefinition;
60  import org.opencastproject.workflow.api.WorkflowInstance;
61  import org.opencastproject.workflow.api.WorkflowService;
62  import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
63  import org.opencastproject.workspace.api.Workspace;
64  
65  import org.apache.commons.lang3.StringUtils;
66  import org.apache.http.HttpEntity;
67  import org.apache.http.HttpStatus;
68  import org.apache.http.client.config.RequestConfig;
69  import org.apache.http.client.methods.CloseableHttpResponse;
70  import org.apache.http.client.methods.HttpGet;
71  import org.apache.http.client.methods.HttpPost;
72  import org.apache.http.entity.ContentType;
73  import org.apache.http.entity.mime.HttpMultipartMode;
74  import org.apache.http.entity.mime.MultipartEntityBuilder;
75  import org.apache.http.entity.mime.content.FileBody;
76  import org.apache.http.impl.client.CloseableHttpClient;
77  import org.apache.http.impl.client.HttpClientBuilder;
78  import org.apache.http.util.EntityUtils;
79  import org.json.simple.JSONObject;
80  import org.json.simple.parser.JSONParser;
81  import org.json.simple.parser.ParseException;
82  import org.osgi.service.component.ComponentContext;
83  import org.osgi.service.component.annotations.Activate;
84  import org.osgi.service.component.annotations.Component;
85  import org.osgi.service.component.annotations.Deactivate;
86  import org.osgi.service.component.annotations.Reference;
87  import org.slf4j.Logger;
88  import org.slf4j.LoggerFactory;
89  
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
104 
105 @Component(
106     immediate = true,
107     service = { TranscriptionService.class,AmberscriptTranscriptionService.class },
108     property = {
109         "service.description=AmberScript Transcription Service",
110         "provider=amberscript"
111     }
112 )
113 public class AmberscriptTranscriptionService extends AbstractJobProducer implements TranscriptionService {
114 
  /** Logger of this service. */
  private static final Logger logger = LoggerFactory.getLogger(AmberscriptTranscriptionService.class);

  /** Job type handed to the {@link AbstractJobProducer} constructor and used when creating jobs. */
  private static final String JOB_TYPE = "org.opencastproject.transcription.amberscript";

  // Collection names; their use is not visible in this part of the file
  // (presumably working file repository collections -- confirm).
  public static final String SUBMISSION_COLLECTION = "amberscript-submission";
  private static final String TRANSCRIPT_COLLECTION = "amberscript-transcripts";

  private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
  private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute



  /** Base URL of the AmberScript API; request paths and query parameters are appended to it. */
  private static final String BASE_URL = "https://qs.amberscript.com";
  // Values of the "status" field inside the "jobStatus" object of a status response
  private static final String STATUS_OPEN = "OPEN";
  private static final String STATUS_DONE = "DONE";
  private static final String STATUS_ERROR = "ERROR";

  /** Value of "errorMsg" that is compared against when a job reports the ERROR status. */
  private static final String ERROR_NO_SPEECH = "No speech found";

  /** Provider name stored together with each job control entry. */
  private static final String PROVIDER = "amberscript";

  // Collaborating services. How they are injected is not visible in this part of the file
  // (presumably via @Reference setters further down -- confirm).
  private AssetManager assetManager;
  private OrganizationDirectoryService organizationDirectoryService;
  /** Executor created in {@code activate} for the periodic background tasks; null until then. */
  private ScheduledExecutorService scheduledExecutor;
  private SecurityService securityService;
  private ServiceRegistry serviceRegistry;
  private TranscriptionDatabase database;
  private UserDirectoryService userDirectoryService;
  private WorkflowService workflowService;
  private WorkingFileRepository wfr;
  private Workspace workspace;

  // Only used by unit tests
  private Workflows wfUtil;
149 
  /**
   * Operations this service can process as a job. The constant name is what gets stored as the
   * job's operation and is parsed back with {@code valueOf} when the job is processed.
   */
  private enum Operation {
    StartTranscription
  }
153 
  /**
   * Selects which dublin core field(s) are used to determine the number of speakers of an event:
   * the creators, the contributors, or the union of both. The constant names are the values
   * accepted by the corresponding configuration option.
   */
  private enum SpeakerMetadataField {
    creator, contributor, both
  }
159 
  // service configuration keys (looked up in the component properties during activation)
  private static final String ENABLED_CONFIG = "enabled";
  private static final String CLIENT_KEY = "client.key";
  private static final String LANGUAGE = "language";
  private static final String LANGUAGE_FROM_DUBLINCORE = "language.from.dublincore";
  private static final String LANGUAGE_CODE_MAP = "language.code.map";
  private static final String AMBERSCRIPTJOBTYPE = "jobtype";
  private static final String WORKFLOW_CONFIG = "workflow";
  private static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
  private static final String MAX_PROCESSING_TIME_CONFIG = "max.overdue.time";
  private static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
  private static final String SPEAKER = "speaker";
  private static final String SPEAKER_FROM_DUBLINCORE = "speaker.from.dublincore";
  private static final String SPEAKER_METADATA_FIELD = "speaker.metadata.field";
  private static final String TRANSCRIPTIONTYPE = "transcriptiontype";
  private static final String GLOSSARY = "glossary";
  // NOTE(review): this key is "cleanread", which is also one of the accepted transcription style
  // values. Presumably "transcriptionstyle" was intended -- confirm against the shipped
  // configuration file before changing, since existing installations may rely on this key.
  private static final String TRANSCRIPTIONSTYLE = "cleanread";
  private static final String TARGETLANGUAGE = "targetlanguage";

  // service configuration default values (kept when the corresponding key is not configured)
  private boolean enabled = false;
  private String clientKey;
  private String language = "en";
  /** determines if the transcription language should be taken from the dublincore */
  private boolean languageFromDublinCore;
  private String amberscriptJobType = "direct";
  private String workflowDefinitionId = "amberscript-attach-transcripts";
  private long workflowDispatchIntervalSeconds = 60;
  private long maxProcessingSeconds = 8 * 24 * 60 * 60; // maximum runtime for jobType perfect is 8 days
  private int cleanupResultDays = 7;
  private int numberOfSpeakers = 1;
  private boolean speakerFromDublinCore = true;
  private SpeakerMetadataField speakerMetadataField = SpeakerMetadataField.creator;
  private String transcriptionType = "transcription";
  private String glossary = "";
  private String transcriptionStyle = "cleanread";
  private String targetLanguage = "";

  /**
   * Contains mappings from several possible ways of writing a language name/code to the
   * corresponding amberscript language code
   */
  private AmberscriptLangUtil amberscriptLangUtil;

  /** Value of the digest user context property, read during activation. */
  private String systemAccount;
205 
  /**
   * Creates the service, passing {@link #JOB_TYPE} to the {@link AbstractJobProducer} constructor.
   */
  public AmberscriptTranscriptionService() {
    super(JOB_TYPE);
  }
209 
210   @Activate
211   public void activate(ComponentContext cc) {
212 
213     Optional<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG);
214     if (enabledOpt.isPresent()) {
215       enabled = enabledOpt.get();
216     }
217 
218     if (!enabled) {
219       logger.info("Amberscript Transcription Service disabled."
220               + " If you want to enable it, please update the service configuration.");
221       return;
222     }
223 
224     Optional<String> clientKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLIENT_KEY);
225     if (clientKeyOpt.isPresent()) {
226       clientKey = clientKeyOpt.get();
227     } else {
228       logger.warn("API key was not set.");
229       return;
230     }
231 
232     Optional<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE);
233     if (languageOpt.isPresent()) {
234       language = languageOpt.get();
235       logger.info("Default language is set to '{}'.", language);
236     } else {
237       logger.info("Default language '{}' will be used.", language);
238     }
239 
240     Optional<String> languageFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_FROM_DUBLINCORE);
241     if (languageFromDublinCoreOpt.isPresent()) {
242       try {
243         languageFromDublinCore = Boolean.parseBoolean(languageFromDublinCoreOpt.get());
244       } catch (Exception e) {
245         logger.warn("Configuration value for '{}' is invalid, defaulting to false.", LANGUAGE_FROM_DUBLINCORE);
246       }
247     }
248     logger.info("Configuration value for '{}' is set to '{}'.", LANGUAGE_FROM_DUBLINCORE, languageFromDublinCore);
249 
250     amberscriptLangUtil = AmberscriptLangUtil.getInstance();
251     int customMapEntriesCount = 0;
252     Optional<String> langCodeMapOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_CODE_MAP);
253     if (langCodeMapOpt.isPresent()) {
254       try {
255         String langCodeMapStr = langCodeMapOpt.get();
256         if (langCodeMapStr != null) {
257           for (String mapping : langCodeMapStr.split(",")) {
258             String[] mapEntries = mapping.split(":");
259             amberscriptLangUtil.addCustomMapping(mapEntries[0], mapEntries[1]);
260             customMapEntriesCount += 1;
261           }
262         }
263       } catch (Exception e) {
264         logger.warn("Configuration '{}' is invalid. Using just default mapping.", LANGUAGE_CODE_MAP);
265       }
266     }
267     logger.info("Language code map was set. Added '{}' additional entries.", customMapEntriesCount);
268 
269     Optional<String> amberscriptJobTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), AMBERSCRIPTJOBTYPE);
270     if (amberscriptJobTypeOpt.isPresent()) {
271       amberscriptJobType = amberscriptJobTypeOpt.get();
272       logger.info("Default Amberscript job type is set to '{}'.", amberscriptJobType);
273     } else {
274       logger.info("Default Amberscript job type '{}' will be used.", amberscriptJobType);
275     }
276 
277     Optional<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
278     if (wfOpt.isPresent()) {
279       workflowDefinitionId = wfOpt.get();
280       logger.info("Workflow is set to '{}'.", workflowDefinitionId);
281     } else {
282       logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
283     }
284 
285     Optional<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
286     if (intervalOpt.isPresent()) {
287       try {
288         workflowDispatchIntervalSeconds = Long.parseLong(intervalOpt.get());
289       } catch (NumberFormatException e) {
290         logger.warn("Configured '{}' is invalid. Using default.", DISPATCH_WORKFLOW_INTERVAL_CONFIG);
291       }
292     }
293     logger.info("Workflow dispatch interval is {} seconds.", workflowDispatchIntervalSeconds);
294 
295     Optional<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
296     if (maxProcessingOpt.isPresent()) {
297       try {
298         maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
299       } catch (NumberFormatException e) {
300         logger.warn("Configured '{}' is invalid. Using default.", MAX_PROCESSING_TIME_CONFIG);
301       }
302     }
303     logger.info("Maximum processing time for transcription job is {} seconds.", maxProcessingSeconds);
304 
305     Optional<String> cleanupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
306     if (cleanupOpt.isPresent()) {
307       try {
308         cleanupResultDays = Integer.parseInt(cleanupOpt.get());
309       } catch (NumberFormatException e) {
310         logger.warn("Configured '{}' is invalid. Using default.", CLEANUP_RESULTS_DAYS_CONFIG);
311       }
312     }
313     logger.info("Cleanup result files after {} days.", cleanupResultDays);
314 
315     Optional<String> speakerOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER);
316     if (speakerOpt.isPresent()) {
317       try {
318         numberOfSpeakers = Integer.parseInt(speakerOpt.get());
319       } catch (NumberFormatException e) {
320         logger.warn("Configured '{}' is invalid. Using default.", SPEAKER);
321       }
322     }
323     logger.info("Default number of speakers is set to '{}'.", numberOfSpeakers);
324 
325     Optional<String> speakerFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_FROM_DUBLINCORE);
326     if (speakerFromDublinCoreOpt.isPresent()) {
327       try {
328         speakerFromDublinCore = Boolean.parseBoolean(speakerFromDublinCoreOpt.get());
329       } catch (Exception e) {
330         logger.warn("Configuration value for '{}' is invalid, defaulting to true.", SPEAKER_FROM_DUBLINCORE);
331       }
332     }
333     logger.info("Configuration value for '{}' is set to '{}'.", SPEAKER_FROM_DUBLINCORE, speakerFromDublinCore);
334 
335     Optional<String> speakerMetadataFieldOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_METADATA_FIELD);
336     if (speakerMetadataFieldOpt.isPresent()) {
337       try {
338         speakerMetadataField = SpeakerMetadataField.valueOf(speakerMetadataFieldOpt.get());
339       } catch (IllegalArgumentException e) {
340         logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
341             speakerMetadataFieldOpt.get(), SPEAKER_METADATA_FIELD, speakerMetadataField);
342       }
343     }
344     logger.info("Default metadata field for calculating the amount of speakers is set to '{}'.", speakerMetadataField);
345 
346     Optional<String> transcriptionTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONTYPE);
347     if (transcriptionTypeOpt.isPresent()) {
348       if (List.of("transcription", "captions", "translatedSubtitles").contains(transcriptionType)) {
349         transcriptionType = transcriptionTypeOpt.get();
350         logger.info("Default transcription type is set to '{}'.", transcriptionType);
351       } else {
352         logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
353             transcriptionTypeOpt.get(), TRANSCRIPTIONTYPE, transcriptionType);
354       }
355     } else {
356       logger.info("Default transcription type '{}' will be used.", transcriptionType);
357     }
358 
359     Optional<String> glossaryOpt = OsgiUtil.getOptCfg(cc.getProperties(), GLOSSARY);
360     if (glossaryOpt.isPresent()) {
361       glossary = glossaryOpt.get();
362       logger.info("Default glossary is set to '{}'.", glossary);
363     } else {
364       logger.info("No glossary will be used by default");
365     }
366 
367     Optional<String> transcriptionStyleOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONSTYLE);
368     if (transcriptionStyleOpt.isPresent()) {
369       if (List.of("cleanread", "verbatim").contains(transcriptionStyle)) {
370         transcriptionStyle = transcriptionStyleOpt.get();
371         logger.info("Default transcription style is set to '{}'.", transcriptionStyle);
372       } else {
373         logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
374             transcriptionStyleOpt.get(), TRANSCRIPTIONSTYLE, transcriptionStyle);
375       }
376     } else {
377       logger.info("Default transcription style '{}' will be used.", transcriptionStyle);
378     }
379 
380     Optional<String> targetLanguageOpt = OsgiUtil.getOptCfg(cc.getProperties(), TARGETLANGUAGE);
381     if (targetLanguageOpt.isPresent()) {
382       targetLanguage = targetLanguageOpt.get();
383       logger.info("Default target language is set to '{}'.", targetLanguage);
384     } else {
385       logger.info("Transcriptions won't be translated");
386     }
387 
388     systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
389 
390     scheduledExecutor = Executors.newScheduledThreadPool(2);
391 
392     scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchIntervalSeconds,
393             TimeUnit.SECONDS);
394 
395     scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
396 
397     logger.info("Activated.");
398   }
399 
400   @Deactivate
401   public void deactivate() {
402     if (scheduledExecutor != null) {
403       scheduledExecutor.shutdown();
404     }
405   }
406 
  /**
   * Not supported by this service; use the variant taking additional arguments instead.
   *
   * @throws UnsupportedOperationException
   *           always
   */
  @Override
  public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
    throw new UnsupportedOperationException("Not supported.");
  }
411 
412   @Override
413   public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
414     if (!enabled) {
415       throw new TranscriptionServiceException("AmberScript Transcription Service disabled."
416               + " If you want to enable it, please update the service configuration.");
417     }
418 
419     String language = null;
420 
421     if (languageFromDublinCore) {
422       for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
423         try (InputStream in = workspace.read(catalog.getURI())) {
424           DublinCoreCatalog dublinCatalog = DublinCores.read(in);
425           String dublinCoreLang = dublinCatalog.getFirst(DublinCore.PROPERTY_LANGUAGE);
426           if (dublinCoreLang != null) {
427             language = amberscriptLangUtil.getLanguageCodeOrNull(dublinCoreLang);
428           }
429           if (language != null) {
430             break;
431           }
432         } catch (IOException | NotFoundException e) {
433           logger.error(String.format("Unable to load dublin core catalog for event '%s'",
434               track.getMediaPackage().getIdentifier()), e);
435         }
436       }
437     }
438 
439     if (language == null) {
440       if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
441         language = args[0];
442       } else {
443         language = getLanguage();
444       }
445     }
446 
447     String jobType;
448     if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
449       jobType = args[1];
450     } else {
451       jobType = getAmberscriptJobType();
452     }
453 
454     int numberOfSpeakers = 0;
455     if (speakerFromDublinCore) {
456       Set<String> speakers = new HashSet<>();
457       for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
458         try (InputStream in = workspace.read(catalog.getURI())) {
459           DublinCoreCatalog dublinCatalog = DublinCores.read(in);
460           if (speakerMetadataField.equals(SpeakerMetadataField.creator)
461                   || speakerMetadataField.equals(SpeakerMetadataField.both)) {
462             dublinCatalog.get(DublinCore.PROPERTY_CREATOR).stream()
463                     .map(DublinCoreValue::getValue).forEach(speakers::add);
464           }
465           if (speakerMetadataField.equals(SpeakerMetadataField.contributor)
466                   || speakerMetadataField.equals(SpeakerMetadataField.both)) {
467             dublinCatalog.get(DublinCore.PROPERTY_CONTRIBUTOR).stream()
468                     .map(DublinCoreValue::getValue).forEach(speakers::add);
469           }
470 
471         } catch (IOException | NotFoundException e) {
472           logger.error("Unable to load dublin core catalog for event '{}'",
473                   track.getMediaPackage().getIdentifier(), e);
474         }
475       }
476 
477       if (speakers.size() >= 1) {
478         numberOfSpeakers = speakers.size();
479       }
480     }
481 
482     if (numberOfSpeakers == 0) {
483       if (args.length > 2 && StringUtils.isNotBlank(args[2])) {
484         numberOfSpeakers = Integer.parseInt(args[2]);
485       } else {
486         numberOfSpeakers = getNumberOfSpeakers();
487       }
488     }
489 
490     String transcriptionType;
491     if (args.length > 3 && StringUtils.isNotBlank(args[3])) {
492       transcriptionType = args[3];
493     } else {
494       transcriptionType = getTranscriptionType();
495     }
496 
497     String glossary;
498     if (args.length > 4 && args[4] != null) {
499       glossary = args[4];
500     } else {
501       glossary = getGlossary();
502     }
503 
504     String transcriptionStyle;
505     if (args.length > 5 && StringUtils.isNotBlank(args[5])) {
506       transcriptionStyle = args[5];
507     } else {
508       transcriptionStyle = getTranscriptionStyle();
509     }
510 
511     String targetLanguage;
512     if (args.length > 6 && args[6] != null) {
513       targetLanguage = args[6];
514     } else {
515       targetLanguage = getTargetLanguage();
516     }
517 
518     logger.info("New transcription job for mpId '{}' language '{}' job type '{}' speakers '{}' transcription type '{}'"
519             + "glossary '{}'.", mpId, language, jobType, numberOfSpeakers, transcriptionType, glossary);
520 
521     try {
522       return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(), Arrays.asList(
523           mpId, MediaPackageElementParser.getAsXml(track), language, jobType, Integer.toString(numberOfSpeakers),
524           transcriptionType, glossary, transcriptionStyle, targetLanguage));
525     } catch (ServiceRegistryException e) {
526       throw new TranscriptionServiceException("Unable to create a job", e);
527     } catch (MediaPackageException e) {
528       throw new TranscriptionServiceException("Invalid track '" + track.toString() + "'", e);
529     }
530   }
531 
  /**
   * No-op in this implementation.
   *
   * @param mpId
   *          the media package identifier (unused)
   * @param results
   *          the results object (unused)
   */
  @Override
  public void transcriptionDone(String mpId, Object results) { }
534 
535   private void transcriptionDone(String mpId, String jobId) {
536     try {
537       logger.info("Transcription done for mpId '{}'.", mpId);
538       if (getAndSaveJobResult(jobId)) {
539         database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
540       } else {
541         logger.debug("Unable to get and save the transcription result for mpId '{}'.", mpId);
542       }
543     } catch (IOException | TranscriptionServiceException e) {
544       logger.warn("Could not save transcription results file for mpId '{}': {}", mpId, e.toString());
545     } catch (TranscriptionDatabaseException e) {
546       logger.warn("Transcription results file were saved but state in db not updated for mpId '{}': ", mpId, e);
547     }
548   }
549 
550   @Override
551   public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
552     JSONObject jsonObj = null;
553     String jobId = null;
554     try {
555       jsonObj = (JSONObject) obj;
556       jobId = (String) jsonObj.get("name");
557       // Update state in database
558       database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
559       TranscriptionJobControl jobControl = database.findByJob(jobId);
560       logger.warn(String.format("Error received for media package %s, job id %s",
561               jobControl.getMediaPackageId(), jobId));
562       // Send notification email
563     } catch (TranscriptionDatabaseException e) {
564       logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
565       throw new TranscriptionServiceException("Could not update transcription job control db", e);
566     }
567   }
568 
  /**
   * Not implemented by this service.
   *
   * @throws TranscriptionServiceException
   *           always
   */
  @Override
  public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
    throw new TranscriptionServiceException("Method not implemented");
  }
573 
  /**
   * @return the default language: "en" unless overridden by the service configuration
   */
  @Override
  public String getLanguage() {
    return language;
  }
578 
  /**
   * @return the default AmberScript job type: "direct" unless overridden by the service configuration
   */
  public String getAmberscriptJobType() {
    return amberscriptJobType;
  }
582 
  /**
   * @return the default number of speakers: 1 unless overridden by the service configuration
   */
  public int getNumberOfSpeakers() {
    return numberOfSpeakers;
  }
586 
  /**
   * @return the default transcription type: "transcription" unless overridden by the service
   *         configuration
   */
  public String getTranscriptionType() {
    return transcriptionType;
  }
590 
  /**
   * @return the default glossary: an empty string (no glossary) unless overridden by the service
   *         configuration
   */
  public String getGlossary() {
    return glossary;
  }
594 
  /**
   * @return the default transcription style: "cleanread" unless overridden by the service
   *         configuration
   */
  public String getTranscriptionStyle() {
    return transcriptionStyle;
  }
598 
  /**
   * @return the default target language: an empty string (no translation) unless overridden by the
   *         service configuration
   */
  public String getTargetLanguage() {
    return targetLanguage;
  }
602 
603   // Called by workflow
604   @Override
605   protected String process(Job job) throws Exception {
606     Operation op = null;
607     String operation = job.getOperation();
608     List<String> arguments = job.getArguments();
609     String result = "";
610     op = Operation.valueOf(operation);
611     switch (op) {
612       case StartTranscription:
613         String mpId = arguments.get(0);
614         Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
615         String languageCode = arguments.get(2);
616         String jobType = arguments.get(3);
617         String numberOfSpeakers = arguments.get(4);
618         String transcriptionType = arguments.get(5);
619         String glossary = arguments.get(6);
620         String transcriptionStyle = arguments.get(7);
621         String targetLanguage = arguments.get(8);
622         createRecognitionsJob(mpId, track, languageCode, jobType, numberOfSpeakers, transcriptionType, glossary,
623                 transcriptionStyle, targetLanguage);
624         break;
625       default:
626         throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
627     }
628     return result;
629   }
630 
631   void createRecognitionsJob(String mpId, Track track, String languageCode, String jobType, String numberOfSpeakers,
632           String transcriptionType, String glossary, String transcriptionStyle, String targetLanguage)
633           throws TranscriptionServiceException {
634     // Timeout 3 hours (needs to include the time for the remote service to
635     // fetch the media URL before sending final response)
636     CloseableHttpClient httpClient = makeHttpClient(3 * 3600 * 1000);
637     CloseableHttpResponse response = null;
638 
639     String submitUrl = BASE_URL + "/jobs/upload-media"
640             + "?apiKey=" + clientKey
641             + "&language=" + languageCode
642             + "&jobType=" + jobType
643             + "&numberOfSpeakers=" + numberOfSpeakers
644             + "&transcriptionType=" + transcriptionType
645             + "&transcriptionStyle=" + transcriptionStyle;
646     if (StringUtils.isNotBlank(glossary)) {
647       submitUrl += "&glossary=" + glossary;
648     }
649     if (StringUtils.isNotBlank(targetLanguage)) {
650       submitUrl += "&targetLanguage=" + targetLanguage;
651     }
652 
653     try {
654       FileBody fileBody = new FileBody(workspace.get(track.getURI()), ContentType.DEFAULT_BINARY);
655       MultipartEntityBuilder builder = MultipartEntityBuilder.create();
656       builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
657       builder.addPart("file", fileBody);
658       HttpEntity multipartEntity = builder.build();
659 
660       HttpPost httpPost = new HttpPost(submitUrl);
661       httpPost.setEntity(multipartEntity);
662 
663       response = httpClient.execute(httpPost);
664       int code = response.getStatusLine().getStatusCode();
665       HttpEntity entity = response.getEntity();
666 
667       String jsonString = EntityUtils.toString(response.getEntity());
668       JSONParser jsonParser = new JSONParser();
669       JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
670 
671       logger.debug("Submitting new transcription job: {}" + System.lineSeparator()
672               + "Response: {}", removePrivateInfo(submitUrl), jsonString);
673 
674       JSONObject result = (JSONObject) jsonObject.get("jobStatus");
675       String jobId = (String) result.get("jobId");
676 
677       switch (code) {
678         case HttpStatus.SC_OK: // 200
679           logger.info("mp {} has been submitted to AmberScript service with jobId {}.", mpId, jobId);
680           database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
681                   track.getDuration() == null ? 0 : track.getDuration().longValue(), new Date(), PROVIDER);
682           EntityUtils.consume(entity);
683           return;
684         default:
685           String error = (String) result.get("error");
686           String message = (String) result.get("message");
687           String msg = String.format("Unable to submit job: API returned %s - %s: %s", code, error, message);
688           logger.warn(msg);
689           throw new TranscriptionServiceException(msg);
690       }
691     } catch (Exception e) {
692       logger.warn("Exception when calling the captions endpoint", e);
693       throw new TranscriptionServiceException("Exception when calling the captions endpoint", e);
694     } finally {
695       try {
696         httpClient.close();
697         if (response != null) {
698           response.close();
699         }
700       } catch (IOException e) {
701       }
702     }
703   }
704 
705   boolean checkJobResults(String jobId) throws TranscriptionServiceException {
706 
707     String mpId = "unknown";
708 
709     CloseableHttpClient httpClient = makeHttpClient();
710     CloseableHttpResponse response = null;
711 
712     String checkUrl = BASE_URL + "/jobs/status?jobId=" + jobId + "&apiKey=" + clientKey;
713 
714     try {
715       HttpGet httpGet = new HttpGet(checkUrl);
716       response = httpClient.execute(httpGet);
717       int code = response.getStatusLine().getStatusCode();
718 
719       HttpEntity entity = response.getEntity();
720       String jsonString = EntityUtils.toString(entity);
721       EntityUtils.consume(entity);
722 
723       logger.debug("AmberScript API call was '{}'." + System.lineSeparator() + "Response: {}",
724               removePrivateInfo(checkUrl), jsonString);
725 
726       JSONParser jsonParser = new JSONParser();
727       JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
728 
729       switch (code) {
730         case HttpStatus.SC_OK:
731           JSONObject result = (JSONObject) jsonObject.get("jobStatus");
732           String status = (String) result.get("status");
733           switch (status) {
734             case STATUS_OPEN:
735               logger.debug("Captions job '{}' has not finished yet.", jobId);
736               return false;
737             case STATUS_ERROR:
738               var errorMsg = (String) result.get("errorMsg");
739               throw new TranscriptionServiceException(
740                       String.format("Captions job '%s' failed: %s", jobId, errorMsg),
741                       code,
742                       ERROR_NO_SPEECH.equals(errorMsg));
743             case STATUS_DONE:
744               logger.info("Captions job '{}' has finished.", jobId);
745               TranscriptionJobControl jc = database.findByJob(jobId);
746               if (jc != null) {
747                 mpId = jc.getMediaPackageId();
748               }
749               transcriptionDone(mpId, jobId);
750               return true;
751             default:
752               return false; // only here to obey checkstyle
753           }
754         default:
755           String error = (String) jsonObject.get("error");
756           String errorMessage = (String) jsonObject.get("errorMessage");
757           logger.warn("Error while checking status: {}."
758                   + System.lineSeparator() + "{}: {}", code, error, errorMessage);
759           throw new TranscriptionServiceException(
760                   String.format("Captions job '%s' failed: Return Code %d", jobId, code), code);
761       }
762     } catch (TranscriptionDatabaseException | IOException | ParseException e) {
763       logger.warn("Error while checking status: {}", e.toString());
764     } finally {
765       try {
766         httpClient.close();
767         if (response != null) {
768           response.close();
769         }
770       } catch (IOException e) {
771       }
772     }
773     return false;
774   }
775 
776   private boolean getAndSaveJobResult(String jobId) throws TranscriptionServiceException, IOException {
777 
778     CloseableHttpClient httpClient = makeHttpClient();
779     CloseableHttpResponse response = null;
780 
781     String transcriptUrl = BASE_URL + "/jobs/export?format=srt&jobId=" + jobId + "&apiKey=" + clientKey;
782 
783     boolean done = false;
784 
785     try {
786       HttpGet httpGet = new HttpGet(transcriptUrl);
787 
788       response = httpClient.execute(httpGet);
789       int code = response.getStatusLine().getStatusCode();
790 
791       logger.debug("AmberScript API {} http response {}", removePrivateInfo(transcriptUrl), code);
792 
793       switch (code) {
794         case HttpStatus.SC_OK: // 200
795           HttpEntity entity = response.getEntity();
796           logger.info("Retrieved details for transcription with jobid: '{}'", jobId);
797 
798           // Save the result subrip (srt) file into a collection
799           workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".srt", entity.getContent());
800           done = true;
801           break;
802 
803         default:
804           logger.warn("Error retrieving details for transcription with jobid: '{}', return status: {}.", jobId, code);
805           break;
806       }
807     } catch (Exception e) {
808       throw new TranscriptionServiceException(String.format(
809               "Exception when calling the transcription service for jobid: %s", jobId), e);
810     } finally {
811       try {
812         httpClient.close();
813         if (response != null) {
814           response.close();
815         }
816       } catch (IOException e) {
817       }
818     }
819 
820     return done;
821   }
822 
823   @Override
824   // Called by the attach workflow operation
825   public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
826           throws TranscriptionServiceException {
827     try {
828       // If jobId is unknown, look for all jobs associated to that mpId
829       if (jobId == null || "null".equals(jobId)) {
830         jobId = null;
831         for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
832           if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
833                   || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
834             jobId = jc.getTranscriptionJobId();
835           }
836         }
837       }
838 
839       if (jobId == null) {
840         throw new TranscriptionServiceException(
841                 "No completed or closed transcription job found in database for media package " + mpId);
842       }
843 
844       // Results already saved?
845       URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".srt");
846 
847       logger.info("Looking for transcript at URI: {}", uri);
848 
849       try {
850         workspace.get(uri);
851         logger.info("Found captions at URI: {}", uri);
852       } catch (Exception e) {
853         logger.info("Results not saved: getting from service for jobId {}", jobId);
854         // Not saved yet so call the transcription service to get the results
855         checkJobResults(jobId);
856       }
857       MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
858       logger.debug("Returning MPE with results file URI: {}", uri);
859       return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "srt"));
860     } catch (TranscriptionDatabaseException e) {
861       throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
862     }
863   }
864 
865   /**
866    * Get mediapackage transcription status
867    *
868    * @param mpId, mediapackage id
869    * @return transcription status
870    * @throws TranscriptionServiceException
871    */
872   public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
873     try {
874       for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
875         return jc.getStatus();
876       }
877     } catch (TranscriptionDatabaseException e) {
878       throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
879     }
880     return "Unknown";
881   }
882 
883   /**
884    * Creates a closable http client with default configuration.
885    *
886    * @return closable http client
887    */
888   protected CloseableHttpClient makeHttpClient() {
889     return makeHttpClient(SOCKET_TIMEOUT);
890   }
891 
892   /**
893    * Creates a closable http client.
894    *
895    * @param socketTimeout http socket timeout value in milliseconds
896    */
897   protected CloseableHttpClient makeHttpClient(int socketTimeout) {
898     RequestConfig reqConfig = RequestConfig.custom()
899         .setConnectTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
900         .setSocketTimeout(socketTimeout)
901         .setConnectionRequestTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
902         .build();
903     CloseableHttpClient httpClient = HttpClientBuilder.create()
904         .useSystemProperties()
905         .setDefaultRequestConfig(reqConfig)
906         .build();
907     return httpClient;
908   }
909 
910   // Called when a transcription job has been submitted
911   protected void deleteStorageFile(String filename) {
912     try {
913       logger.debug("Removing {} from collection {}.", filename, SUBMISSION_COLLECTION);
914       wfr.deleteFromCollection(SUBMISSION_COLLECTION, filename, false);
915     } catch (IOException e) {
916       logger.warn("Unable to remove submission file {} from collection {}", filename, SUBMISSION_COLLECTION);
917     }
918   }
919 
  /**
   * Sets the service registry that {@link #getServiceRegistry()} returns.
   *
   * @param serviceRegistry the service registry to use
   */
  @Reference
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }
924 
  /**
   * Sets the security service that {@link #getSecurityService()} returns.
   *
   * @param securityService the security service to use
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }
929 
  /**
   * Sets the user directory service that {@link #getUserDirectoryService()} returns.
   *
   * @param userDirectoryService the user directory service to use
   */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }
934 
  /**
   * Sets the organization directory service that {@link #getOrganizationDirectoryService()} returns.
   *
   * @param organizationDirectoryService the organization directory service to use
   */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
    this.organizationDirectoryService = organizationDirectoryService;
  }
939 
  /**
   * Sets the workspace in which this service stores and looks up transcript files.
   *
   * @param ws the workspace to use
   */
  @Reference
  public void setWorkspace(Workspace ws) {
    this.workspace = ws;
  }
944 
  /**
   * Sets the working file repository used to delete and clean up files in this service's collections.
   *
   * @param wfr the working file repository to use
   */
  @Reference
  public void setWorkingFileRepository(WorkingFileRepository wfr) {
    this.wfr = wfr;
  }
949 
  /**
   * Sets the transcription database in which job control records are stored and looked up.
   *
   * @param service the transcription database to use
   */
  @Reference
  public void setDatabase(TranscriptionDatabase service) {
    this.database = service;
  }
954 
  /**
   * Sets the asset manager used to look up the latest snapshot of a media package.
   *
   * @param service the asset manager to use
   */
  @Reference
  public void setAssetManager(AssetManager service) {
    this.assetManager = service;
  }
959 
  /**
   * Sets the workflow service used to look up the workflow definition that attaches transcriptions.
   *
   * @param service the workflow service to use
   */
  @Reference
  public void setWorkflowService(WorkflowService service) {
    this.workflowService = service;
  }
964 
  /**
   * Returns the service registry set through {@link #setServiceRegistry(ServiceRegistry)}.
   *
   * @return the service registry
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }
969 
  /**
   * Returns the security service set through {@link #setSecurityService(SecurityService)}.
   *
   * @return the security service
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }
974 
  /**
   * Returns the user directory service set through {@link #setUserDirectoryService(UserDirectoryService)}.
   *
   * @return the user directory service
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }
979 
  /**
   * Returns the organization directory service set through
   * {@link #setOrganizationDirectoryService(OrganizationDirectoryService)}.
   *
   * @return the organization directory service
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }
984 
  // Only used by unit tests!
  // When set, the workflow dispatcher uses this instance instead of creating a new Workflows object.
  void setWfUtil(Workflows wfUtil) {
    this.wfUtil = wfUtil;
  }
989 
990   class WorkflowDispatcher implements Runnable {
991 
992     /**
993      * {@inheritDoc}
994      *
995      * @see java.lang.Thread#run()
996      */
997     @Override
998     public void run() {
999       logger.debug("WorkflowDispatcher waking up...");
1000 
1001       try {
1002         // Find jobs that are in progress and jobs that had transcription complete
1003 
1004         long providerId;
1005         TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
1006         if (providerInfo != null) {
1007           providerId = providerInfo.getId();
1008         } else {
1009           logger.debug("No jobs yet for provider {}.", PROVIDER);
1010           return;
1011         }
1012 
1013         List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
1014                 TranscriptionJobControl.Status.TranscriptionComplete.name());
1015 
1016         for (TranscriptionJobControl j : jobs) {
1017 
1018           // Don't process jobs for other services
1019           if (j.getProviderId() != providerId) {
1020             continue;
1021           }
1022 
1023           String mpId = j.getMediaPackageId();
1024           String jobId = j.getTranscriptionJobId();
1025 
1026           // If the job in progress, check if it should already have finished.
1027           if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
1028             // If job should already have been completed, try to get the results.
1029             if (j.getDateExpected().getTime() < System.currentTimeMillis()) {
1030               try {
1031                 if (!checkJobResults(jobId)) {
1032                   // Job still running, not finished, so check if it should have finished more than N seconds ago
1033                   if (j.getDateExpected().getTime() + maxProcessingSeconds * 1000 < System.currentTimeMillis()) {
1034                     // Processing for too long, mark job as canceled and don't check anymore
1035                     database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1036                   }
1037                   // else Job still running, not finished
1038                   continue;
1039                 }
1040               } catch (TranscriptionServiceException e) {
1041                 try {
1042                   database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1043                   continue;
1044                 } catch (TranscriptionDatabaseException ex) {
1045                   logger.warn("Could not cancel job '{}'.", jobId);
1046                 }
1047               }
1048             } else {
1049               continue; // Not time to check yet
1050             }
1051           }
1052 
1053           // Jobs that get here have state TranscriptionCompleted
1054           try {
1055             DefaultOrganization defaultOrg = new DefaultOrganization();
1056             securityService.setOrganization(defaultOrg);
1057             securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1058 
1059             // Find the episode
1060             Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
1061             if (snapshot.isEmpty()) {
1062               logger.warn("Media package {} no longer exists in the asset manager. It was likely deleted. "
1063                   + "Dropping the generated transcription.", mpId);
1064               database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
1065               continue;
1066             }
1067 
1068             String org = snapshot.get().getOrganizationId();
1069             Organization organization = organizationDirectoryService.getOrganization(org);
1070             if (organization == null) {
1071               logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
1072               continue;
1073             }
1074             securityService.setOrganization(organization);
1075 
1076             // Build workflow
1077             Map<String, String> params = new HashMap<String, String>();
1078             params.put("transcriptionJobId", jobId);
1079             WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
1080 
1081             // Apply workflow
1082             // wfUtil is only used by unit tests
1083             Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
1084             Set<String> mpIds = new HashSet<String>();
1085             mpIds.add(mpId);
1086             List<WorkflowInstance> wfList = workflows
1087                     .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1088             String wfId = wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : "Unknown";
1089 
1090             // Update state in the database
1091             database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1092             logger.info("Attach transcription workflow {} scheduled for mp {}, transcription service job {}",
1093                     wfId, mpId, jobId);
1094           } catch (Exception e) {
1095             logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, amberscript job {}, {}: {}",
1096                     mpId, jobId, e.getClass().getName(), e.getMessage());
1097           }
1098         }
1099       } catch (TranscriptionDatabaseException e) {
1100         logger.warn("Could not read transcription job control database: {}", e.getMessage());
1101       }
1102     }
1103   }
1104 
1105   class ResultsFileCleanup implements Runnable {
1106 
1107     @Override
1108     public void run() {
1109       logger.info("ResultsFileCleanup waking up...");
1110       try {
1111         // Cleans up results files older than CLEANUP_RESULT_FILES_DAYS days
1112         wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1113         wfr.cleanupOldFilesFromCollection(SUBMISSION_COLLECTION, cleanupResultDays);
1114       } catch (IOException e) {
1115         logger.warn("Could not cleanup old submission and transcript results files", e);
1116       }
1117     }
1118   }
1119 
1120   private String removePrivateInfo(String unsafeString) {
1121     String safeString = unsafeString.replace(clientKey, "__api-key-was-hidden__");
1122     return safeString;
1123   }
1124 }