View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.transcription.microsoft.azure;
23  
24  import org.opencastproject.assetmanager.api.AssetManager;
25  import org.opencastproject.assetmanager.api.Snapshot;
26  import org.opencastproject.assetmanager.util.Workflows;
27  import org.opencastproject.job.api.AbstractJobProducer;
28  import org.opencastproject.job.api.Job;
29  import org.opencastproject.mediapackage.MediaPackageElement;
30  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
31  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32  import org.opencastproject.mediapackage.MediaPackageElementParser;
33  import org.opencastproject.mediapackage.MediaPackageException;
34  import org.opencastproject.mediapackage.Track;
35  import org.opencastproject.security.api.DefaultOrganization;
36  import org.opencastproject.security.api.Organization;
37  import org.opencastproject.security.api.OrganizationDirectoryService;
38  import org.opencastproject.security.api.SecurityService;
39  import org.opencastproject.security.api.UserDirectoryService;
40  import org.opencastproject.security.util.SecurityUtil;
41  import org.opencastproject.serviceregistry.api.ServiceRegistry;
42  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
43  import org.opencastproject.systems.OpencastConstants;
44  import org.opencastproject.transcription.api.TranscriptionService;
45  import org.opencastproject.transcription.api.TranscriptionServiceException;
46  import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscription;
47  import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionFile;
48  import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionFiles;
49  import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionJson;
50  import org.opencastproject.transcription.persistence.TranscriptionDatabase;
51  import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
52  import org.opencastproject.transcription.persistence.TranscriptionJobControl;
53  import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
54  import org.opencastproject.util.NotFoundException;
55  import org.opencastproject.util.OsgiUtil;
56  import org.opencastproject.workflow.api.ConfiguredWorkflow;
57  import org.opencastproject.workflow.api.WorkflowDatabaseException;
58  import org.opencastproject.workflow.api.WorkflowDefinition;
59  import org.opencastproject.workflow.api.WorkflowInstance;
60  import org.opencastproject.workflow.api.WorkflowService;
61  import org.opencastproject.workspace.api.Workspace;
62  
63  import org.apache.commons.io.FilenameUtils;
64  import org.apache.commons.lang3.NotImplementedException;
65  import org.apache.commons.lang3.StringUtils;
66  import org.osgi.service.component.ComponentContext;
67  import org.osgi.service.component.annotations.Activate;
68  import org.osgi.service.component.annotations.Component;
69  import org.osgi.service.component.annotations.Deactivate;
70  import org.osgi.service.component.annotations.Modified;
71  import org.osgi.service.component.annotations.Reference;
72  import org.slf4j.Logger;
73  import org.slf4j.LoggerFactory;
74  
75  import java.io.File;
76  import java.io.IOException;
77  import java.net.URI;
78  import java.net.URL;
79  import java.time.Instant;
80  import java.time.temporal.ChronoUnit;
81  import java.util.ArrayList;
82  import java.util.Arrays;
83  import java.util.Date;
84  import java.util.HashMap;
85  import java.util.HashSet;
86  import java.util.List;
87  import java.util.Locale;
88  import java.util.Map;
89  import java.util.Optional;
90  import java.util.Set;
91  import java.util.concurrent.Executors;
92  import java.util.concurrent.ScheduledExecutorService;
93  import java.util.concurrent.TimeUnit;
94  
95  @Component(immediate = true, service = {
96      TranscriptionService.class, MicrosoftAzureTranscriptionService.class }, property = {
97      "service.description=Microsoft Azure Transcription Service", "provider=microsoft.azure" })
98  public class MicrosoftAzureTranscriptionService extends AbstractJobProducer implements TranscriptionService {
99  
  private static final Logger logger = LoggerFactory.getLogger(MicrosoftAzureTranscriptionService.class);

  // Job type handed to the AbstractJobProducer constructor and used when creating jobs.
  private static final String JOB_TYPE = "org.opencastproject.transcription.microsoft.azure";
  // Provider name stored together with every job control record.
  private static final String PROVIDER = "microsoft-azure-speech-services";
  // Fallback values used by modified() when the corresponding configuration key is absent.
  private static final String DEFAULT_WORKFLOW_DEFINITION_ID = "microsoft-azure-attach-transcription";
  private static final String DEFAULT_LANGUAGE = "en-GB";

  private static final String DEFAULT_AZURE_BLOB_PATH = "";
  private static final String DEFAULT_AZURE_CONTAINER_NAME = "opencast-transcriptions";
  private static final float DEFAULT_MIN_CONFIDENCE = 1.0f;
  private static final int DEFAULT_SPLIT_TEXT_LINE_LENGTH = 100;
  private static final String DEFAULT_OUTPUT_FILE_FORMAT = "vtt";
  // Configuration keys read from the component properties in modified().
  private static final String KEY_ENABLED = "enabled";
  private static final String KEY_LANGUAGE = "language";
  // Comma separated list of languages; split and trimmed in modified().
  private static final String KEY_AUTO_DETECT_LANGUAGES = "auto.detect.languages";
  private static final String KEY_WORKFLOW = "workflow";
  private static final String KEY_AZURE_STORAGE_ACCOUNT_NAME = "azure_storage_account_name";
  private static final String KEY_AZURE_ACCOUNT_ACCESS_KEY = "azure_account_access_key";
  // NOTE(review): constant name is misspelled ("BOLB"); the configuration key itself is "azure_blob_path".
  private static final String KEY_AZURE_BOLB_PATH = "azure_blob_path";
  private static final String KEY_AZURE_CONTAINER_NAME = "azure_container_name";
  private static final String KEY_AZURE_SPEECH_SERVICES_ENDPOINT = "azure_speech_services_endpoint";
  private static final String KEY_COGNITIVE_SERVICES_SUBSCRIPTION_KEY = "azure_cognitive_services_subscription_key";
  private static final String KEY_AZURE_SPEECH_RECOGNITION_MIN_CONFIDENCE = "azure_speech_recognition_min_confidence";
  private static final String KEY_SPLIT_TEXT_LINE_LENGTH = "split.text.line.length";
  // Only "srt" and "vtt" are accepted by modified(); anything else falls back to the default format.
  private static final String KEY_OUTPUT_FILE_FORMAT = "output.file.format";


  // Collaborating services; presumably injected through OSGi @Reference setters outside this view.
  private AssetManager assetManager;
  private OrganizationDirectoryService organizationDirectoryService;
  private SecurityService securityService;
  private ServiceRegistry serviceRegistry;
  private TranscriptionDatabase database;
  private UserDirectoryService userDirectoryService;
  private WorkflowService workflowService;
  private Workspace workspace;
  // Runs the periodic WorkflowDispatcher; (re)created in modified() and shut down in deactivate().
  private ScheduledExecutorService scheduledExecutorService;
  // Optional Workflows override; startWorkflow() creates its own instance when this is null.
  private Workflows wfUtil;
  // Digest user read from the component context in activate().
  private String systemAccount;
  // Configuration state populated by modified().
  private boolean enabled;
  private String language;
  private List<String> autodetectLanguages;
  private String workflowDefinitionId;
  private String azureStorageAccountName;
  private String azureAccountAccessKey;
  private String azureBlobPath;
  private String azureContainerName;
  private String azureSpeechServicesEndpoint;
  private String azureCognitiveServicesSubscriptionKey;
  // Azure clients built in modified() and reset to null in deactivate().
  private MicrosoftAzureAuthorization azureAuthorization;
  private MicrosoftAzureStorageClient azureStorageClient;
  private MicrosoftAzureSpeechServicesClient azureSpeechServicesClient;
  // Passed to MicrosoftAzureSpeechServicesClient.writeTranscriptionFile() when the captions file is written.
  private Float azureSpeechRecognitionMinConfidence;
  private Integer splitTextLineLength;
  private String outputFileFormat;
154 
155   private enum Operation {
156     StartTranscription
157   }
158 
159   /**
160    * A public constructor, required by OSGi.
161    */
162   public MicrosoftAzureTranscriptionService() {
163     super(JOB_TYPE);
164   }
165 
166   @Activate
167   public void activate(ComponentContext cc) {
168     super.activate(cc);
169     systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
170     logger.debug("Activating...");
171     modified(cc);
172   }
173 
174   @Modified
175   public void modified(ComponentContext cc) {
176     logger.debug("Updating config...");
177     Optional<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), KEY_ENABLED);
178     if (enabledOpt.isPresent()) {
179       enabled = enabledOpt.get();
180     } else {
181       deactivate();
182     }
183 
184     if (!enabled) {
185       logger.info("Microsoft Azure transcription service disabled."
186           + " If you want to enable it, please update the service configuration.");
187       return;
188     }
189 
190     Optional<String> azureStorageAccountNameKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
191         KEY_AZURE_STORAGE_ACCOUNT_NAME);
192     if (azureStorageAccountNameKeyOpt.isPresent()) {
193       azureStorageAccountName = azureStorageAccountNameKeyOpt.get();
194     } else {
195       logger.warn("Azure storage account name key was not set. Disabling Microsoft Azure transcription service.");
196       deactivate();
197       return;
198     }
199 
200     Optional<String> azureAccountAccessKeyKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_ACCOUNT_ACCESS_KEY);
201     if (azureAccountAccessKeyKeyOpt.isPresent()) {
202       azureAccountAccessKey = azureAccountAccessKeyKeyOpt.get();
203     } else {
204       logger.warn("Azure storage account access key was not set. Disabling Microsoft Azure transcription service.");
205       deactivate();
206       return;
207     }
208 
209     Optional<String> azureSpeechServicesKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
210         KEY_AZURE_SPEECH_SERVICES_ENDPOINT);
211     if (azureSpeechServicesKeyOpt.isPresent()) {
212       azureSpeechServicesEndpoint = azureSpeechServicesKeyOpt.get();
213     } else {
214       logger.warn("Azure speech services endpoint was not set. Disabling Microsoft Azure transcription service.");
215       deactivate();
216       return;
217     }
218 
219     Optional<String> azureCognitiveServicesSubscriptionKeyKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
220         KEY_COGNITIVE_SERVICES_SUBSCRIPTION_KEY);
221     if (azureCognitiveServicesSubscriptionKeyKeyOpt.isPresent()) {
222       azureCognitiveServicesSubscriptionKey = azureCognitiveServicesSubscriptionKeyKeyOpt.get();
223     } else {
224       logger.warn("Azure cognitive services subscription key was not set. "
225           + "Disabling Microsoft Azure transcription service.");
226       deactivate();
227       return;
228     }
229 
230     // optional values
231     Optional<String> workflowKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_WORKFLOW);
232     if (workflowKeyOpt.isPresent()) {
233       workflowDefinitionId = workflowKeyOpt.get();
234       logger.info("Workflow is set to '{}'.", workflowDefinitionId);
235     } else {
236       workflowDefinitionId = DEFAULT_WORKFLOW_DEFINITION_ID;
237       logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
238     }
239 
240     Optional<String> azureBlobPathKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_BOLB_PATH);
241     if (azureBlobPathKeyOpt.isPresent()) {
242       azureBlobPath = azureBlobPathKeyOpt.get();
243     } else {
244       logger.debug("Azure blob path was not set, using default path.");
245       azureBlobPath = DEFAULT_AZURE_BLOB_PATH;
246     }
247 
248     Optional<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_LANGUAGE);
249     if (languageOpt.isPresent()) {
250       language = languageOpt.get();
251       logger.info("Default language is set to '{}'.", language);
252     } else {
253       language = DEFAULT_LANGUAGE;
254       logger.info("Default language '{}' will be used.", language);
255     }
256 
257     autodetectLanguages = new ArrayList<>();
258     Optional<String> autoDetectLanguagesOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AUTO_DETECT_LANGUAGES);
259     if (languageOpt.isPresent()) {
260       for (String lang : StringUtils.split(autoDetectLanguagesOpt.get(), ",")) {
261         if (StringUtils.isNotBlank(lang)) {
262           autodetectLanguages.add(StringUtils.trimToEmpty(lang));
263         }
264       }
265     }
266 
267     Optional<String> azureContainerNameKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_CONTAINER_NAME);
268     if (azureContainerNameKeyOpt.isPresent()) {
269       azureContainerName = azureContainerNameKeyOpt.get();
270     } else {
271       logger.debug("Azure storage container name was not set, using default path.");
272       azureContainerName = DEFAULT_AZURE_CONTAINER_NAME;
273     }
274 
275     Optional<String> azureSpeechRecognitionMinConfidenceKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
276         KEY_AZURE_SPEECH_RECOGNITION_MIN_CONFIDENCE);
277     if (azureSpeechRecognitionMinConfidenceKeyOpt.isPresent()) {
278       String azureSpeechRecognitionMinConfidenceStr = azureSpeechRecognitionMinConfidenceKeyOpt.get();
279       try {
280         azureSpeechRecognitionMinConfidence = Float.valueOf(azureSpeechRecognitionMinConfidenceStr);
281       } catch (NumberFormatException e) {
282         logger.error("Azure speech recognition min confidence value is not valid. "
283             + "Please set a value between 0.0 and 1.0. "
284             + "Setting to default value of {}.", DEFAULT_MIN_CONFIDENCE);
285         azureSpeechRecognitionMinConfidence = DEFAULT_MIN_CONFIDENCE;
286       }
287     } else {
288       logger.debug("Azure speech recognition min confidence value was not set. Setting to default value of {}.",
289           DEFAULT_MIN_CONFIDENCE);
290       azureSpeechRecognitionMinConfidence = DEFAULT_MIN_CONFIDENCE;
291     }
292 
293     Optional<String> splitTextLineLengthOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_SPLIT_TEXT_LINE_LENGTH);
294     if (splitTextLineLengthOpt.isPresent()) {
295       try {
296         splitTextLineLength = Integer.parseInt(splitTextLineLengthOpt.get());
297       } catch (NumberFormatException e) {
298         splitTextLineLength = DEFAULT_SPLIT_TEXT_LINE_LENGTH;
299         logger.error("Invalid configuration value for '{}'. Set default value {}.", KEY_SPLIT_TEXT_LINE_LENGTH,
300             DEFAULT_SPLIT_TEXT_LINE_LENGTH);
301       }
302     } else {
303       logger.debug("Configuration value for '{}' was not set. Setting to default value of {}.",
304           KEY_SPLIT_TEXT_LINE_LENGTH, DEFAULT_MIN_CONFIDENCE);
305       splitTextLineLength = DEFAULT_SPLIT_TEXT_LINE_LENGTH;
306     }
307 
308     Optional<String> outputFileFormatOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_OUTPUT_FILE_FORMAT);
309     if (outputFileFormatOpt.isPresent()) {
310       outputFileFormat = outputFileFormatOpt.get();
311       switch (outputFileFormat) {
312         case "srt":
313         case "vtt":
314           break;
315         default:
316           logger.debug("Azure output file format not valid, using default format {}.",
317               DEFAULT_OUTPUT_FILE_FORMAT);
318           outputFileFormat = DEFAULT_OUTPUT_FILE_FORMAT;
319           break;
320       }
321     } else {
322       logger.debug("Azure output file format not set, using default format {}.", DEFAULT_OUTPUT_FILE_FORMAT);
323       outputFileFormat = DEFAULT_OUTPUT_FILE_FORMAT;
324     }
325     logger.info("Transcription output format is set to '{}'.", outputFileFormat);
326 
327     //// create Azure storage client
328     try {
329       azureAuthorization = new MicrosoftAzureAuthorization(azureStorageAccountName, azureAccountAccessKey);
330       azureStorageClient = new MicrosoftAzureStorageClient(azureAuthorization);
331     } catch (MicrosoftAzureStorageClientException e) {
332       logger.error("Unable to create Microsoft Azure storage client. "
333           + "Deactivating Microsoft Azure Transcription service.", e);
334       deactivate();
335       return;
336     }
337 
338     // create Azure Speech Services client
339     azureSpeechServicesClient = new MicrosoftAzureSpeechServicesClient(
340         azureSpeechServicesEndpoint, azureCognitiveServicesSubscriptionKey);
341 
342     if (scheduledExecutorService != null) {
343       scheduledExecutorService.shutdown();
344       try {
345         scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
346       } catch (InterruptedException e) {
347         // pending task took to long
348         // pending task will be restarted on next run
349       }
350     }
351     scheduledExecutorService = Executors.newScheduledThreadPool(2);
352     scheduledExecutorService.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, 120, TimeUnit.SECONDS);
353     logger.info("Activated.");
354   }
355 
356   @Deactivate
357   public void deactivate() {
358     enabled = false;
359     if (scheduledExecutorService != null) {
360       scheduledExecutorService.shutdown();
361       try {
362         scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
363       } catch (InterruptedException e) {
364         // pending task took to long
365         // pending task will be restarted on next run
366       }
367     }
368     azureAuthorization = null;
369     azureStorageClient = null;
370     azureSpeechServicesClient = null;
371     logger.info("Deactivated.");
372   }
373 
374   @Override
375   protected String process(Job job) throws Exception {
376     Operation op = null;
377     String operation = job.getOperation();
378     List<String> arguments = job.getArguments();
379     op = Operation.valueOf(operation);
380     switch (op) {
381       case StartTranscription:
382         long jobId = job.getId();
383         String mpId = arguments.get(0);
384         Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
385         String languageCode = arguments.get(2);
386         if (StringUtils.isBlank(languageCode)) {
387           languageCode = getLanguage();
388         }
389         return createTranscriptionJob(jobId, mpId, track, languageCode);
390       default:
391         throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
392     }
393   }
394 
395   @Override
396   public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
397     return startTranscription(mpId, track, getLanguage());
398   }
399 
400   @Override
401   public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
402     try {
403       List<String> jobArgs = new ArrayList<>(2 + args.length);
404       jobArgs.add(mpId);
405       jobArgs.add(MediaPackageElementParser.getAsXml(track));
406       jobArgs.addAll(Arrays.asList(args));
407       return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.toString(), jobArgs);
408     } catch (ServiceRegistryException e) {
409       throw new TranscriptionServiceException(String.format(
410           "Unable to create transcription job for media package '%s'.", mpId), e);
411     } catch (MediaPackageException e) {
412       throw new TranscriptionServiceException(String.format(
413           "Unable to to parse track from media package '%s'.", mpId), e);
414     }
415   }
416 
417   @Override
418   public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
419           throws TranscriptionServiceException {
420     if (type != MediaPackageElement.Type.Track && type != MediaPackageElement.Type.Attachment) {
421       throw new IllegalArgumentException("Target type must be a Track or Attachment.");
422     }
423     MicrosoftAzureSpeechTranscription transcription;
424     try {
425       transcription = azureSpeechServicesClient.getTranscriptionById(jobId);
426     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException
427              | MicrosoftAzureNotFoundException e) {
428       throw new TranscriptionServiceException(String.format(
429           "Unable to get transcription '%s' for media package '%s'.", jobId, mpId), e);
430     }
431     MicrosoftAzureSpeechTranscriptionJson transcriptionJson;
432     URI transcriptionFileUri;
433     try {
434       transcriptionJson = getTranscriptionJson(mpId, transcription);
435       transcriptionFileUri = MicrosoftAzureSpeechServicesClient.writeTranscriptionFile(transcriptionJson,
436           workspace, outputFileFormat, azureSpeechRecognitionMinConfidence, splitTextLineLength);
437     } catch (IOException | MicrosoftAzureNotFoundException e) {
438       throw new TranscriptionServiceException(String.format(
439           "Unable to download transcription file for media package '%s'.", mpId), e);
440     }
441     MediaPackageElement transcriptionElement =  MediaPackageElementBuilderFactory.newInstance().newElementBuilder()
442         .elementFromURI(transcriptionFileUri, type, new MediaPackageElementFlavor("captions", outputFileFormat));
443     if (type.equals(MediaPackageElement.Type.Track)) {
444       // apply predefined tags as documented in https://docs.opencast.org/develop/admin/#configuration/subtitles/
445       transcriptionElement.setTags(new String[] { "generator-type:auto", "generator:azure", "type:subtitle"});
446     }
447     return transcriptionElement;
448   }
449 
450   @Override
451   public void transcriptionDone(String mpId, Object results) throws TranscriptionServiceException {
452     MicrosoftAzureSpeechTranscription transcription = (MicrosoftAzureSpeechTranscription) results;
453     logger.info("Transcription job {} for media package {} done.", transcription.getID(), mpId);
454     // delete audio source files in Azure storage
455     try {
456       deleteTranscriptionSourceFiles(mpId, transcription.getID());
457     } catch (TranscriptionServiceException e) {
458       logger.warn("Unable to delete transcription source files for media package {} after transcription job done.",
459           mpId, e);
460     }
461     try {
462       database.updateJobControl(transcription.getID(), TranscriptionJobControl.Status.TranscriptionComplete.name());
463     } catch (TranscriptionDatabaseException e) {
464       throw new TranscriptionServiceException(String.format(
465           "Transcription job for media package '%s' succeeded but storing job status in the database failed."
466           , mpId), e);
467     }
468   }
469 
470   @Override
471   public void transcriptionError(String mpId, Object results) throws TranscriptionServiceException {
472     MicrosoftAzureSpeechTranscription transcription = (MicrosoftAzureSpeechTranscription) results;
473     String message = "";
474     if (transcription != null && transcription.properties != null && transcription.properties.containsKey("error")) {
475       Map<String, Object> errorInfo = (Map<String, Object>) transcription.properties.get("error");
476       message = String.format(" Microsoft error code %s: %s", errorInfo.getOrDefault("code", "UNKNOWN"),
477           errorInfo.getOrDefault("message", "No info"));
478     }
479     logger.info("Transcription job {} for media package {} failed.{}", transcription.getID(), mpId, message);
480     // delete audio source files in Azure storage
481     try {
482       deleteTranscriptionSourceFiles(mpId, transcription.getID());
483     } catch (TranscriptionServiceException e) {
484       logger.warn("Unable to delete transcription source files for media package {} after transcription kob failure.",
485           mpId, e);
486     }
487     try {
488       database.updateJobControl(transcription.getID(), TranscriptionJobControl.Status.Error.name());
489     } catch (TranscriptionDatabaseException e) {
490       throw new TranscriptionServiceException(String.format(
491           "Transcription job for media package '%s' failed and storing job status in the database failed too."
492           , mpId), e);
493     }
494   }
495 
496   @Override
497   public String getLanguage() {
498     return language;
499   }
500 
501   @Override
502   public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
503     throw new NotImplementedException();
504   }
505 
506   public String createTranscriptionJob(long jobId, String mpId, Track track, String language)
507           throws TranscriptionServiceException {
508     // load media file into workspace
509     File trackFile;
510     try {
511       trackFile = workspace.get(track.getURI());
512     } catch (NotFoundException e) {
513       throw new TranscriptionServiceException(String.format("Track %s not found.", track.getURI()), e);
514     } catch (IOException e) {
515       throw new TranscriptionServiceException(String.format(
516           "Unable to get track %s for transcription.", track.getURI()), e);
517     }
518     // upload media file to azure storage
519     //// assure azure storage container exists
520     try {
521       azureStorageClient.createContainer(azureContainerName);
522     } catch (IOException | MicrosoftAzureStorageClientException | MicrosoftAzureNotAllowedException e) {
523       throw new TranscriptionServiceException(String.format(
524           "Unable to query or create a storage container '%s' on Microsoft Azure.", azureContainerName), e);
525     }
526     //// upload file to azure storage container
527     String azureBlobUrl;
528     try {
529       String filename = String.format("%d-%s.%s", jobId, mpId, FilenameUtils.getExtension(trackFile.getName()));
530       azureBlobUrl = azureStorageClient.uploadFile(trackFile, azureContainerName, azureBlobPath, filename);
531     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureStorageClientException e) {
532       throw new TranscriptionServiceException(String.format(
533           "Unable to upload track %s from media package '%s' to Microsoft Azure storage container '%s'.",
534           track.getURI(), mpId, azureContainerName), e);
535     }
536     // start azure transcription job
537     List<String> contentUrls = Arrays.asList(azureBlobUrl);
538     String azureDestContainerUrl = String.format("%s?%s", azureStorageClient.getContainerUrl(azureContainerName),
539         azureAuthorization.generateServiceSasToken("cw", null, null, azureContainerName, "c"));
540     MicrosoftAzureSpeechTranscription transcription;
541     try {
542       transcription = azureSpeechServicesClient.createTranscription(contentUrls,
543           azureDestContainerUrl, String.format("Transcription job %d", jobId), language, autodetectLanguages,
544           null, null);
545       logger.info("Started transcription of {} from media package '{}' on Microsoft Azure Speech Services at {}",
546           track.getURI(), mpId, transcription.self);
547     } catch (MicrosoftAzureNotAllowedException | IOException | MicrosoftAzureSpeechClientException e) {
548       throw new TranscriptionServiceException(String.format(
549           "Unable to create transcription of track %s from media package '%s' "
550               + "in Microsoft Azure storage container '%s'.",
551           track.getURI(), mpId, azureContainerName), e);
552     }
553     // store transcription job ID and status
554     try {
555       database.storeJobControl(mpId, track.getIdentifier(), transcription.getID(),
556           TranscriptionJobControl.Status.InProgress.name(),
557           track.getDuration() == null ? 0 : track.getDuration(), new Date(), PROVIDER);
558     } catch (TranscriptionDatabaseException e) {
559       throw new TranscriptionServiceException(String.format(
560           "Unable to store transcription job of track %s from media package '%s' in the database.",
561           track.getURI(), mpId), e);
562     }
563     // return transcription job ID
564     return transcription.getID();
565   }
566 
567   MicrosoftAzureSpeechTranscriptionJson getTranscriptionJson(String mpId,
568       MicrosoftAzureSpeechTranscription transcription)
569           throws TranscriptionServiceException, MicrosoftAzureNotFoundException {
570     if (!transcription.isSucceeded()) {
571       if (transcription.isRunning()) {
572         throw new TranscriptionServiceException(String.format("Unable to get generated transcription. "
573             + "Transcription job '%s' for media package '%s' is currently running.", transcription.getID(), mpId));
574       } else if (transcription.isFailed()) {
575         throw new TranscriptionServiceException(String.format("Unable to get generated transcription. "
576             + "Transcription job '%s' for media package '%s' is failed.", transcription.getID(), mpId));
577       }
578     }
579     // query transcription files
580     MicrosoftAzureSpeechTranscriptionFiles transcriptionFiles;
581     try {
582       transcriptionFiles = azureSpeechServicesClient.getTranscriptionFilesById(transcription.getID());
583     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
584       throw new TranscriptionServiceException(String.format(
585           "Unable to get transcription files '%s' for media package '%s'.", transcription.getID(), mpId), e);
586     }
587     // download transcription file to workspace
588     MicrosoftAzureSpeechTranscriptionFile transcriptionFile = null;
589     for (MicrosoftAzureSpeechTranscriptionFile tf : transcriptionFiles.values) {
590       if (tf.isTranscriptionFile()) {
591         transcriptionFile = tf;
592         break;
593       }
594     }
595     if (transcriptionFile == null) {
596       // get more files with transcriptionFiles.nextLink
597       // TODO
598       throw new NotImplementedException("At least one transcription file should be provided.");
599     }
600 
601     try {
602       return MicrosoftAzureSpeechServicesClient
603           .getTranscriptionJson(transcriptionFile);
604     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException
605              | MicrosoftAzureNotFoundException e) {
606       throw new TranscriptionServiceException(String.format(
607           "Unable to download transcription file '%s' for media package '%s'.", transcriptionFile.self, mpId), e);
608     }
609   }
610 
611   String startWorkflow(String mpId, MicrosoftAzureSpeechTranscription transcription)
612           throws TranscriptionDatabaseException, NotFoundException, WorkflowDatabaseException,
613           TranscriptionServiceException, MicrosoftAzureNotFoundException {
614     MicrosoftAzureSpeechTranscriptionJson transcriptionJson = getTranscriptionJson(mpId, transcription);
615     String transcriptionLocale = transcriptionJson.getRecognizedLocale();
616 
617     DefaultOrganization defaultOrg = new DefaultOrganization();
618     securityService.setOrganization(defaultOrg);
619     securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
620 
621     // Find the episode
622     Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
623     if (snapshot.isEmpty()) {
624       // Media package not archived yet? Skip until next time.
625       logger.warn("Media package {} has not been archived yet. Skipped.", mpId);
626       return null;
627     }
628 
629     String org = snapshot.get().getOrganizationId();
630     Organization organization = organizationDirectoryService.getOrganization(org);
631     if (organization == null) {
632       logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
633       return null;
634     }
635     securityService.setOrganization(organization);
636 
637     // Build workflow
638     Map<String, String> params = new HashMap<>();
639     params.put("transcriptionJobId", transcription.getID());
640     String locale = "";
641     String language = "";
642     if (StringUtils.isNotBlank(transcriptionLocale)) {
643       locale = transcriptionLocale;
644       language = Locale.forLanguageTag(transcriptionLocale).getLanguage();
645     }
646     params.put("transcriptionLocale", locale);
647     params.put("transcriptionLocaleSet", Boolean.toString(!StringUtils.isEmpty(locale)));
648     params.put("transcriptionLocaleSubtypeSuffix", !StringUtils.isEmpty(locale) ? "+" + locale : "");
649     params.put("transcriptionLocaleTag", !StringUtils.isEmpty(locale) ? "lang:" + locale : "");
650     params.put("transcriptionLanguage", language);
651     params.put("transcriptionLanguageSet", Boolean.toString(!StringUtils.isEmpty(language)));
652     params.put("transcriptionLanguageSubtypeSuffix", !StringUtils.isEmpty(language) ? "+" + language : "");
653     params.put("transcriptionLanguageTag", !StringUtils.isEmpty(language) ? "lang:" + language : "");
654     WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
655 
656     // Apply workflow
657     // wfUtil is only used by unit tests
658     Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
659     Set<String> mpIds = new HashSet<>();
660     mpIds.add(mpId);
661     List<WorkflowInstance> wfList = workflows
662         .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
663     return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
664   }
665 
666   public void deleteTranscription(String mpId, String transcriptionId)
667           throws TranscriptionServiceException, TranscriptionDatabaseException {
668     TranscriptionJobControl transcriptionJobControl = database.findByJob(transcriptionId);
669     TranscriptionJobControl.Status transcriptionJobControlStatus = TranscriptionJobControl.Status.valueOf(
670         transcriptionJobControl.getStatus());
671     if (transcriptionJobControlStatus != TranscriptionJobControl.Status.Closed
672         && transcriptionJobControlStatus != TranscriptionJobControl.Status.Canceled
673         && transcriptionJobControlStatus != TranscriptionJobControl.Status.Error) {
674       throw new TranscriptionServiceException(String.format("Abort deleting transcription %s with invalid status '%s'.",
675           transcriptionId, transcriptionJobControl.getStatus()));
676     }
677     deleteTranscriptionSourceFiles(mpId, transcriptionId);
678     try {
679       azureSpeechServicesClient.deleteTranscription(transcriptionId);
680     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
681       throw new TranscriptionServiceException(String.format(
682           "Unable to delete transcription '%s' for media package '%s'.", transcriptionId, mpId), e);
683     }
684     database.deleteJobControl(transcriptionJobControl.getTranscriptionJobId());
685   }
686 
687   public void deleteTranscriptionSourceFiles(String mpId, String transcriptionId)
688           throws TranscriptionServiceException {
689     MicrosoftAzureSpeechTranscriptionFiles transcriptionFiles;
690     try {
691       transcriptionFiles = azureSpeechServicesClient.getTranscriptionFilesById(transcriptionId);
692     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
693       throw new TranscriptionServiceException(String.format(
694           "Unable to get for transcription '%s' from media package '%s'.", transcriptionId, mpId), e);
695     } catch (MicrosoftAzureNotFoundException e) {
696       // catch deleting non-existing file
697       logger.debug("Failed to get non existing transcription files from media package {} for deleting.", mpId, e);
698       return;
699     }
700     for (MicrosoftAzureSpeechTranscriptionFile transcriptionFile : transcriptionFiles.values) {
701       if (!transcriptionFile.isTranscriptionFile()) {
702         continue;
703       }
704       MicrosoftAzureSpeechTranscriptionJson transcriptionJson;
705       try {
706         transcriptionJson = MicrosoftAzureSpeechServicesClient
707             .getTranscriptionJson(transcriptionFile);
708       } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
709         throw new TranscriptionServiceException(String.format(
710             "Unable to download transcription file '%s' for media package '%s'.", transcriptionFile.self, mpId), e);
711       } catch (MicrosoftAzureNotFoundException e) {
712         // catch deleting non-existing file
713         logger.debug("Failed to get non existing transcription file {} from media package {} for deleting.",
714             transcriptionFile.self, mpId, e);
715         continue;
716       }
717       if (StringUtils.isNotBlank(transcriptionJson.source)) {
718         try {
719           azureStorageClient.deleteFile(new URL(transcriptionJson.source));
720         } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureStorageClientException e) {
721           throw new TranscriptionServiceException(String.format(
722               "Unable to delete audio source file for media package %s.", mpId, e));
723         }
724       }
725     }
726   }
727 
728   class WorkflowDispatcher implements Runnable {
729 
730     @Override
731     public void run() {
732       if (!enabled) {
733         logger.debug("Service disabled, cancel processing.");
734         return;
735       }
736       logger.debug("Run jobs handling loop for transcription provider {}.", PROVIDER);
737       long providerId;
738       try {
739         TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
740         if (providerInfo != null) {
741           providerId = providerInfo.getId();
742         } else {
743           logger.debug("No jobs yet for provider {}.", PROVIDER);
744           return;
745         }
746         // handle jobs in progress
747         for (TranscriptionJobControl jobControl : database.findByStatus(
748             TranscriptionJobControl.Status.InProgress.name())) {
749           if (providerId != jobControl.getProviderId()) {
750             continue;
751           }
752           String mpId = jobControl.getMediaPackageId();
753           String transcriptionId = jobControl.getTranscriptionJobId();
754           try {
755             MicrosoftAzureSpeechTranscription transcription = azureSpeechServicesClient.getTranscriptionById(
756                 transcriptionId);
757             // check and update job status
758             if (!transcription.isRunning()) {
759               if (transcription.isFailed()) {
760                 transcriptionError(mpId, transcription);
761               } else if (transcription.isSucceeded()) {
762                 transcriptionDone(mpId, transcription);
763               }
764             }
765           } catch (MicrosoftAzureNotAllowedException | IOException | MicrosoftAzureSpeechClientException e) {
766             logger.error("Unable to get or update transcription {} or transcription file from media package {}.",
767                 transcriptionId, mpId, e);
768           } catch (MicrosoftAzureNotFoundException e) {
769             logger.warn("Transcription {} from media package {} not found.", transcriptionId, mpId);
770             database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Error.name());
771           } catch (TranscriptionServiceException e) {
772             logger.error(e.getMessage(), e);
773           }
774         }
775         // handle completed jobs
776         for (TranscriptionJobControl jobControl : database.findByStatus(
777             TranscriptionJobControl.Status.TranscriptionComplete.name())) {
778           if (providerId != jobControl.getProviderId()) {
779             continue;
780           }
781           String mpId = jobControl.getMediaPackageId();
782           String transcriptionId = jobControl.getTranscriptionJobId();
783           try {
784             MicrosoftAzureSpeechTranscription transcription = azureSpeechServicesClient.getTranscriptionById(
785                 transcriptionId);
786             // start workflow
787             String workflowId = startWorkflow(mpId, transcription);
788             // update db
789             if (workflowId != null) {
790               database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Closed.name());
791               logger.info("Attach transcription workflow {} scheduled for mp {}, microsoft azure transcription job {}",
792                   workflowId, mpId, transcriptionId);
793             }
794           } catch (MicrosoftAzureNotAllowedException | IOException
795                    | MicrosoftAzureSpeechClientException e) {
796             logger.warn("Unable to get transcription {} or transcription file from media package {}.",
797                 transcriptionId, mpId, e);
798           } catch (MicrosoftAzureNotFoundException e) {
799             logger.warn("Transcription {} from media package {} not found.", transcriptionId, mpId);
800             database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Error.name());
801           } catch (TranscriptionServiceException e) {
802             logger.warn(e.getMessage(), e);
803           } catch (NotFoundException e) {
804             logger.warn("Unable to load organization.", e);
805           } catch (IllegalStateException e) {
806             logger.debug(e.getMessage());
807           }
808         }
809         // cleanup all old jobs
810         for (TranscriptionJobControl jobControl : database.findByStatus(
811             TranscriptionJobControl.Status.Closed.name(), TranscriptionJobControl.Status.Error.name())) {
812           if (providerId != jobControl.getProviderId()) {
813             continue;
814           }
815           String mpId = jobControl.getMediaPackageId();
816           String transcriptionId = jobControl.getTranscriptionJobId();
817           if (Instant.now().minus(7, ChronoUnit.DAYS).isAfter(jobControl.getDateCreated().toInstant())) {
818             try {
819               deleteTranscription(jobControl.getMediaPackageId(), jobControl.getTranscriptionJobId());
820             } catch (TranscriptionServiceException e) {
821               logger.error("Unable to delete transcription {} or transcription files from media package {}.",
822                   transcriptionId, mpId, e);
823             }
824           }
825         }
826       } catch (TranscriptionDatabaseException e) {
827         logger.warn("Could not read or update transcription job control database", e);
828       } catch (WorkflowDatabaseException e) {
829         logger.warn("Unable to get workflow definition.", e);
830       } catch (Throwable e) {
831         // catch all
832         logger.error("Something went wrong in transcription job processing loop. Exception unhandled!!!", e);
833       }
834     }
835   }
836 
  /**
   * Sets the {@link Workflows} helper that is used to apply workflows instead of a newly created one.
   * Only used by unit tests!
   *
   * @param wfUtil
   *          the workflows helper to use
   */
  void setWfUtil(Workflows wfUtil) {
    this.wfUtil = wfUtil;
  }
841 
  /**
   * Returns the service registry held by this service.
   *
   * @return the service registry
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }
846 
  /**
   * Returns the security service held by this service.
   *
   * @return the security service
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }
851 
  /**
   * Returns the user directory service held by this service.
   *
   * @return the user directory service
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }
856 
  /**
   * Returns the organization directory service held by this service.
   *
   * @return the organization directory service
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }
861 
  /**
   * Sets the service registry.
   * NOTE(review): annotated with {@code @Reference}, so presumably injected by the component framework - confirm.
   *
   * @param serviceRegistry
   *          the service registry
   */
  @Reference
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }
866 
  /**
   * Sets the security service, which is used to switch the organization and user when starting workflows.
   *
   * @param securityService
   *          the security service
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }
871 
  /**
   * Sets the user directory service.
   *
   * @param userDirectoryService
   *          the user directory service
   */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }
876 
  /**
   * Sets the organization directory service, which is used to resolve the organization owning a media package
   * snapshot.
   *
   * @param organizationDirectoryService
   *          the organization directory service
   */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
    this.organizationDirectoryService = organizationDirectoryService;
  }
881 
  /**
   * Sets the workspace.
   *
   * @param ws
   *          the workspace
   */
  @Reference
  public void setWorkspace(Workspace ws) {
    this.workspace = ws;
  }
886 
887 //  @Reference
888 //  public void setWorkingFileRepository(WorkingFileRepository wfr) {
889 //    this.wfr = wfr;
890 //  }
891 
  /**
   * Sets the transcription database, which holds the job control records of the transcription jobs.
   *
   * @param service
   *          the transcription database
   */
  @Reference
  public void setDatabase(TranscriptionDatabase service) {
    this.database = service;
  }
896 
  /**
   * Sets the asset manager, which is used to look up the latest snapshot of a media package.
   *
   * @param service
   *          the asset manager
   */
  @Reference
  public void setAssetManager(AssetManager service) {
    this.assetManager = service;
  }
901 
  /**
   * Sets the workflow service, which is used to load the workflow definition and to start workflows.
   *
   * @param service
   *          the workflow service
   */
  @Reference
  public void setWorkflowService(WorkflowService service) {
    this.workflowService = service;
  }
906 }