View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.transcription.microsoft.azure;
23  
24  import org.opencastproject.assetmanager.api.AssetManager;
25  import org.opencastproject.assetmanager.api.fn.Enrichments;
26  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
27  import org.opencastproject.assetmanager.api.query.AResult;
28  import org.opencastproject.assetmanager.util.Workflows;
29  import org.opencastproject.job.api.AbstractJobProducer;
30  import org.opencastproject.job.api.Job;
31  import org.opencastproject.mediapackage.MediaPackageElement;
32  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
33  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34  import org.opencastproject.mediapackage.MediaPackageElementParser;
35  import org.opencastproject.mediapackage.MediaPackageException;
36  import org.opencastproject.mediapackage.Track;
37  import org.opencastproject.security.api.DefaultOrganization;
38  import org.opencastproject.security.api.Organization;
39  import org.opencastproject.security.api.OrganizationDirectoryService;
40  import org.opencastproject.security.api.SecurityService;
41  import org.opencastproject.security.api.UserDirectoryService;
42  import org.opencastproject.security.util.SecurityUtil;
43  import org.opencastproject.serviceregistry.api.ServiceRegistry;
44  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
45  import org.opencastproject.systems.OpencastConstants;
46  import org.opencastproject.transcription.api.TranscriptionService;
47  import org.opencastproject.transcription.api.TranscriptionServiceException;
48  import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscription;
49  import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionFile;
50  import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionFiles;
51  import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionJson;
52  import org.opencastproject.transcription.persistence.TranscriptionDatabase;
53  import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
54  import org.opencastproject.transcription.persistence.TranscriptionJobControl;
55  import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
56  import org.opencastproject.util.NotFoundException;
57  import org.opencastproject.util.OsgiUtil;
58  import org.opencastproject.util.data.Option;
59  import org.opencastproject.workflow.api.ConfiguredWorkflow;
60  import org.opencastproject.workflow.api.WorkflowDatabaseException;
61  import org.opencastproject.workflow.api.WorkflowDefinition;
62  import org.opencastproject.workflow.api.WorkflowInstance;
63  import org.opencastproject.workflow.api.WorkflowService;
64  import org.opencastproject.workspace.api.Workspace;
65  
66  import org.apache.commons.io.FilenameUtils;
67  import org.apache.commons.lang3.NotImplementedException;
68  import org.apache.commons.lang3.StringUtils;
69  import org.osgi.service.component.ComponentContext;
70  import org.osgi.service.component.annotations.Activate;
71  import org.osgi.service.component.annotations.Component;
72  import org.osgi.service.component.annotations.Deactivate;
73  import org.osgi.service.component.annotations.Modified;
74  import org.osgi.service.component.annotations.Reference;
75  import org.slf4j.Logger;
76  import org.slf4j.LoggerFactory;
77  
78  import java.io.File;
79  import java.io.IOException;
80  import java.net.URI;
81  import java.net.URL;
82  import java.time.Instant;
83  import java.time.temporal.ChronoUnit;
84  import java.util.ArrayList;
85  import java.util.Arrays;
86  import java.util.Date;
87  import java.util.HashMap;
88  import java.util.HashSet;
89  import java.util.List;
90  import java.util.Locale;
91  import java.util.Map;
92  import java.util.Set;
93  import java.util.concurrent.Executors;
94  import java.util.concurrent.ScheduledExecutorService;
95  import java.util.concurrent.TimeUnit;
96  
97  @Component(immediate = true, service = {
98      TranscriptionService.class, MicrosoftAzureTranscriptionService.class }, property = {
99      "service.description=Microsoft Azure Transcription Service", "provider=microsoft.azure" })
100 public class MicrosoftAzureTranscriptionService extends AbstractJobProducer implements TranscriptionService {
101 
  /** Logger of this service. */
  private static final Logger logger = LoggerFactory.getLogger(MicrosoftAzureTranscriptionService.class);

  /** Job type handed to the {@link AbstractJobProducer} constructor and used when creating jobs. */
  private static final String JOB_TYPE = "org.opencastproject.transcription.microsoft.azure";
  /** Provider name stored together with each job control entry in the transcription database. */
  private static final String PROVIDER = "microsoft-azure-speech-services";
  /** Workflow definition used when the {@code workflow} configuration key is not set. */
  private static final String DEFAULT_WORKFLOW_DEFINITION_ID = "microsoft-azure-attach-transcription";
  /** Language used when the {@code language} configuration key is not set. */
  private static final String DEFAULT_LANGUAGE = "en-GB";

  // Defaults for the optional configuration values read in modified().
  private static final String DEFAULT_AZURE_BLOB_PATH = "";
  private static final String DEFAULT_AZURE_CONTAINER_NAME = "opencast-transcriptions";
  private static final float DEFAULT_MIN_CONFIDENCE = 1.0f;
  private static final int DEFAULT_SPLIT_TEXT_LINE_LENGTH = 100;
  private static final String DEFAULT_OUTPUT_FILE_FORMAT = "vtt";

  // Names of the configuration properties read in modified().
  private static final String KEY_ENABLED = "enabled";
  private static final String KEY_LANGUAGE = "language";
  private static final String KEY_AUTO_DETECT_LANGUAGES = "auto.detect.languages";
  private static final String KEY_WORKFLOW = "workflow";
  private static final String KEY_AZURE_STORAGE_ACCOUNT_NAME = "azure_storage_account_name";
  private static final String KEY_AZURE_ACCOUNT_ACCESS_KEY = "azure_account_access_key";
  // NOTE(review): "BOLB" looks like a typo for "BLOB"; the property name itself ("azure_blob_path") is spelled
  // correctly, so only the constant identifier is affected.
  private static final String KEY_AZURE_BOLB_PATH = "azure_blob_path";
  private static final String KEY_AZURE_CONTAINER_NAME = "azure_container_name";
  private static final String KEY_AZURE_SPEECH_SERVICES_ENDPOINT = "azure_speech_services_endpoint";
  private static final String KEY_COGNITIVE_SERVICES_SUBSCRIPTION_KEY = "azure_cognitive_services_subscription_key";
  private static final String KEY_AZURE_SPEECH_RECOGNITION_MIN_CONFIDENCE = "azure_speech_recognition_min_confidence";
  private static final String KEY_SPLIT_TEXT_LINE_LENGTH = "split.text.line.length";
  private static final String KEY_OUTPUT_FILE_FORMAT = "output.file.format";


  // Collaborating services. Their setters are not part of this section of the file;
  // presumably they are injected as OSGi references -- TODO confirm.
  private AssetManager assetManager;
  private OrganizationDirectoryService organizationDirectoryService;
  private SecurityService securityService;
  private ServiceRegistry serviceRegistry;
  private TranscriptionDatabase database;
  private UserDirectoryService userDirectoryService;
  private WorkflowService workflowService;
  private Workspace workspace;
  /** Executor running the periodic WorkflowDispatcher task; (re-)created in modified(). */
  private ScheduledExecutorService scheduledExecutorService;
  private Workflows wfUtil;
  /** System (digest) account name read from the component context in activate(). */
  private String systemAccount;

  // Configuration state, (re-)populated in modified().
  private boolean enabled;
  private String language;
  private List<String> autodetectLanguages;
  private String workflowDefinitionId;
  private String azureStorageAccountName;
  private String azureAccountAccessKey;
  private String azureBlobPath;
  private String azureContainerName;
  private String azureSpeechServicesEndpoint;
  private String azureCognitiveServicesSubscriptionKey;

  // Azure clients; created in modified() and reset to null in deactivate().
  private MicrosoftAzureAuthorization azureAuthorization;
  private MicrosoftAzureStorageClient azureStorageClient;
  private MicrosoftAzureSpeechServicesClient azureSpeechServicesClient;

  // Settings passed to MicrosoftAzureSpeechServicesClient.writeTranscriptionFile(...).
  private Float azureSpeechRecognitionMinConfidence;
  private Integer splitTextLineLength;
  private String outputFileFormat;
156 
  /** Job operations this service can process; the constant name is used as the job's operation string. */
  private enum Operation {
    StartTranscription
  }
160 
  /**
   * A public no-argument constructor, required by OSGi.
   * Passes {@link #JOB_TYPE} to the {@link AbstractJobProducer} constructor.
   */
  public MicrosoftAzureTranscriptionService() {
    super(JOB_TYPE);
  }
167 
  /**
   * OSGi activation callback.
   * Activates the job producer base class, reads the system (digest) account name from the
   * component context and then applies the configuration by delegating to {@link #modified(ComponentContext)}.
   *
   * @param cc the OSGi component context
   */
  @Activate
  public void activate(ComponentContext cc) {
    super.activate(cc);
    systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
    logger.debug("Activating...");
    // the configuration handling is shared with the @Modified callback
    modified(cc);
  }
175 
176   @Modified
177   public void modified(ComponentContext cc) {
178     logger.debug("Updating config...");
179     Option<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), KEY_ENABLED);
180     if (enabledOpt.isSome()) {
181       enabled = enabledOpt.get();
182     } else {
183       deactivate();
184     }
185 
186     if (!enabled) {
187       logger.info("Microsoft Azure transcription service disabled."
188           + " If you want to enable it, please update the service configuration.");
189       return;
190     }
191 
192     Option<String> azureStorageAccountNameKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
193         KEY_AZURE_STORAGE_ACCOUNT_NAME);
194     if (azureStorageAccountNameKeyOpt.isSome()) {
195       azureStorageAccountName = azureStorageAccountNameKeyOpt.get();
196     } else {
197       logger.warn("Azure storage account name key was not set. Disabling Microsoft Azure transcription service.");
198       deactivate();
199       return;
200     }
201 
202     Option<String> azureAccountAccessKeyKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_ACCOUNT_ACCESS_KEY);
203     if (azureAccountAccessKeyKeyOpt.isSome()) {
204       azureAccountAccessKey = azureAccountAccessKeyKeyOpt.get();
205     } else {
206       logger.warn("Azure storage account access key was not set. Disabling Microsoft Azure transcription service.");
207       deactivate();
208       return;
209     }
210 
211     Option<String> azureSpeechServicesKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
212         KEY_AZURE_SPEECH_SERVICES_ENDPOINT);
213     if (azureSpeechServicesKeyOpt.isSome()) {
214       azureSpeechServicesEndpoint = azureSpeechServicesKeyOpt.get();
215     } else {
216       logger.warn("Azure speech services endpoint was not set. Disabling Microsoft Azure transcription service.");
217       deactivate();
218       return;
219     }
220 
221     Option<String> azureCognitiveServicesSubscriptionKeyKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
222         KEY_COGNITIVE_SERVICES_SUBSCRIPTION_KEY);
223     if (azureCognitiveServicesSubscriptionKeyKeyOpt.isSome()) {
224       azureCognitiveServicesSubscriptionKey = azureCognitiveServicesSubscriptionKeyKeyOpt.get();
225     } else {
226       logger.warn("Azure cognitive services subscription key was not set. "
227           + "Disabling Microsoft Azure transcription service.");
228       deactivate();
229       return;
230     }
231 
232     // optional values
233     Option<String> workflowKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_WORKFLOW);
234     if (workflowKeyOpt.isSome()) {
235       workflowDefinitionId = workflowKeyOpt.get();
236       logger.info("Workflow is set to '{}'.", workflowDefinitionId);
237     } else {
238       workflowDefinitionId = DEFAULT_WORKFLOW_DEFINITION_ID;
239       logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
240     }
241 
242     Option<String> azureBlobPathKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_BOLB_PATH);
243     if (azureBlobPathKeyOpt.isSome()) {
244       azureBlobPath = azureBlobPathKeyOpt.get();
245     } else {
246       logger.debug("Azure blob path was not set, using default path.");
247       azureBlobPath = DEFAULT_AZURE_BLOB_PATH;
248     }
249 
250     Option<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_LANGUAGE);
251     if (languageOpt.isSome()) {
252       language = languageOpt.get();
253       logger.info("Default language is set to '{}'.", language);
254     } else {
255       language = DEFAULT_LANGUAGE;
256       logger.info("Default language '{}' will be used.", language);
257     }
258 
259     autodetectLanguages = new ArrayList<>();
260     Option<String> autoDetectLanguagesOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AUTO_DETECT_LANGUAGES);
261     if (languageOpt.isSome()) {
262       for (String lang : StringUtils.split(autoDetectLanguagesOpt.get(), ",")) {
263         if (StringUtils.isNotBlank(lang)) {
264           autodetectLanguages.add(StringUtils.trimToEmpty(lang));
265         }
266       }
267     }
268 
269     Option<String> azureContainerNameKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_CONTAINER_NAME);
270     if (azureContainerNameKeyOpt.isSome()) {
271       azureContainerName = azureContainerNameKeyOpt.get();
272     } else {
273       logger.debug("Azure storage container name was not set, using default path.");
274       azureContainerName = DEFAULT_AZURE_CONTAINER_NAME;
275     }
276 
277     Option<String> azureSpeechRecognitionMinConfidenceKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
278         KEY_AZURE_SPEECH_RECOGNITION_MIN_CONFIDENCE);
279     if (azureSpeechRecognitionMinConfidenceKeyOpt.isSome()) {
280       String azureSpeechRecognitionMinConfidenceStr = azureSpeechRecognitionMinConfidenceKeyOpt.get();
281       try {
282         azureSpeechRecognitionMinConfidence = Float.valueOf(azureSpeechRecognitionMinConfidenceStr);
283       } catch (NumberFormatException e) {
284         logger.error("Azure speech recognition min confidence value is not valid. "
285             + "Please set a value between 0.0 and 1.0. "
286             + "Setting to default value of {}.", DEFAULT_MIN_CONFIDENCE);
287         azureSpeechRecognitionMinConfidence = DEFAULT_MIN_CONFIDENCE;
288       }
289     } else {
290       logger.debug("Azure speech recognition min confidence value was not set. Setting to default value of {}.",
291           DEFAULT_MIN_CONFIDENCE);
292       azureSpeechRecognitionMinConfidence = DEFAULT_MIN_CONFIDENCE;
293     }
294 
295     Option<String> splitTextLineLengthOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_SPLIT_TEXT_LINE_LENGTH);
296     if (splitTextLineLengthOpt.isSome()) {
297       try {
298         splitTextLineLength = Integer.parseInt(splitTextLineLengthOpt.get());
299       } catch (NumberFormatException e) {
300         splitTextLineLength = DEFAULT_SPLIT_TEXT_LINE_LENGTH;
301         logger.error("Invalid configuration value for '{}'. Set default value {}.", KEY_SPLIT_TEXT_LINE_LENGTH,
302             DEFAULT_SPLIT_TEXT_LINE_LENGTH);
303       }
304     } else {
305       logger.debug("Configuration value for '{}' was not set. Setting to default value of {}.",
306           KEY_SPLIT_TEXT_LINE_LENGTH, DEFAULT_MIN_CONFIDENCE);
307       splitTextLineLength = DEFAULT_SPLIT_TEXT_LINE_LENGTH;
308     }
309 
310     Option<String> outputFileFormatOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_OUTPUT_FILE_FORMAT);
311     if (outputFileFormatOpt.isSome()) {
312       outputFileFormat = outputFileFormatOpt.get();
313       switch (outputFileFormat) {
314         case "srt":
315         case "vtt":
316           break;
317         default:
318           logger.debug("Azure output file format not valid, using default format {}.",
319               DEFAULT_OUTPUT_FILE_FORMAT);
320           outputFileFormat = DEFAULT_OUTPUT_FILE_FORMAT;
321           break;
322       }
323     } else {
324       logger.debug("Azure output file format not set, using default format {}.", DEFAULT_OUTPUT_FILE_FORMAT);
325       outputFileFormat = DEFAULT_OUTPUT_FILE_FORMAT;
326     }
327     logger.info("Transcription output format is set to '{}'.", outputFileFormat);
328 
329     //// create Azure storage client
330     try {
331       azureAuthorization = new MicrosoftAzureAuthorization(azureStorageAccountName, azureAccountAccessKey);
332       azureStorageClient = new MicrosoftAzureStorageClient(azureAuthorization);
333     } catch (MicrosoftAzureStorageClientException e) {
334       logger.error("Unable to create Microsoft Azure storage client. "
335           + "Deactivating Microsoft Azure Transcription service.", e);
336       deactivate();
337       return;
338     }
339 
340     // create Azure Speech Services client
341     azureSpeechServicesClient = new MicrosoftAzureSpeechServicesClient(
342         azureSpeechServicesEndpoint, azureCognitiveServicesSubscriptionKey);
343 
344     if (scheduledExecutorService != null) {
345       scheduledExecutorService.shutdown();
346       try {
347         scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
348       } catch (InterruptedException e) {
349         // pending task took to long
350         // pending task will be restarted on next run
351       }
352     }
353     scheduledExecutorService = Executors.newScheduledThreadPool(2);
354     scheduledExecutorService.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, 120, TimeUnit.SECONDS);
355     logger.info("Activated.");
356   }
357 
358   @Deactivate
359   public void deactivate() {
360     enabled = false;
361     if (scheduledExecutorService != null) {
362       scheduledExecutorService.shutdown();
363       try {
364         scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
365       } catch (InterruptedException e) {
366         // pending task took to long
367         // pending task will be restarted on next run
368       }
369     }
370     azureAuthorization = null;
371     azureStorageClient = null;
372     azureSpeechServicesClient = null;
373     logger.info("Deactivated.");
374   }
375 
376   @Override
377   protected String process(Job job) throws Exception {
378     Operation op = null;
379     String operation = job.getOperation();
380     List<String> arguments = job.getArguments();
381     op = Operation.valueOf(operation);
382     switch (op) {
383       case StartTranscription:
384         long jobId = job.getId();
385         String mpId = arguments.get(0);
386         Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
387         String languageCode = arguments.get(2);
388         if (StringUtils.isBlank(languageCode)) {
389           languageCode = getLanguage();
390         }
391         return createTranscriptionJob(jobId, mpId, track, languageCode);
392       default:
393         throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
394     }
395   }
396 
  /**
   * Creates a transcription job for the given track using the configured default language
   * (see {@link #getLanguage()}).
   *
   * @param mpId the media package ID
   * @param track the track to transcribe
   * @return the created job
   * @throws TranscriptionServiceException if the job can not be created
   */
  @Override
  public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
    return startTranscription(mpId, track, getLanguage());
  }
401 
402   @Override
403   public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
404     try {
405       List<String> jobArgs = new ArrayList<>(2 + args.length);
406       jobArgs.add(mpId);
407       jobArgs.add(MediaPackageElementParser.getAsXml(track));
408       jobArgs.addAll(Arrays.asList(args));
409       return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.toString(), jobArgs);
410     } catch (ServiceRegistryException e) {
411       throw new TranscriptionServiceException(String.format(
412           "Unable to create transcription job for media package '%s'.", mpId), e);
413     } catch (MediaPackageException e) {
414       throw new TranscriptionServiceException(String.format(
415           "Unable to to parse track from media package '%s'.", mpId), e);
416     }
417   }
418 
419   @Override
420   public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
421           throws TranscriptionServiceException {
422     if (type != MediaPackageElement.Type.Track && type != MediaPackageElement.Type.Attachment) {
423       throw new IllegalArgumentException("Target type must be a Track or Attachment.");
424     }
425     MicrosoftAzureSpeechTranscription transcription;
426     try {
427       transcription = azureSpeechServicesClient.getTranscriptionById(jobId);
428     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException
429              | MicrosoftAzureNotFoundException e) {
430       throw new TranscriptionServiceException(String.format(
431           "Unable to get transcription '%s' for media package '%s'.", jobId, mpId), e);
432     }
433     MicrosoftAzureSpeechTranscriptionJson transcriptionJson;
434     URI transcriptionFileUri;
435     try {
436       transcriptionJson = getTranscriptionJson(mpId, transcription);
437       transcriptionFileUri = MicrosoftAzureSpeechServicesClient.writeTranscriptionFile(transcriptionJson,
438           workspace, outputFileFormat, azureSpeechRecognitionMinConfidence, splitTextLineLength);
439     } catch (IOException | MicrosoftAzureNotFoundException e) {
440       throw new TranscriptionServiceException(String.format(
441           "Unable to download transcription file for media package '%s'.", mpId), e);
442     }
443     MediaPackageElement transcriptionElement =  MediaPackageElementBuilderFactory.newInstance().newElementBuilder()
444         .elementFromURI(transcriptionFileUri, type, new MediaPackageElementFlavor("captions", outputFileFormat));
445     if (type.equals(MediaPackageElement.Type.Track)) {
446       // apply predefined tags as documented in https://docs.opencast.org/develop/admin/#configuration/subtitles/
447       transcriptionElement.setTags(new String[] { "generator-type:auto", "generator:azure", "type:subtitle"});
448     }
449     return transcriptionElement;
450   }
451 
452   @Override
453   public void transcriptionDone(String mpId, Object results) throws TranscriptionServiceException {
454     MicrosoftAzureSpeechTranscription transcription = (MicrosoftAzureSpeechTranscription) results;
455     logger.info("Transcription job {} for media package {} done.", transcription.getID(), mpId);
456     // delete audio source files in Azure storage
457     try {
458       deleteTranscriptionSourceFiles(mpId, transcription.getID());
459     } catch (TranscriptionServiceException e) {
460       logger.warn("Unable to delete transcription source files for media package {} after transcription job done.",
461           mpId, e);
462     }
463     try {
464       database.updateJobControl(transcription.getID(), TranscriptionJobControl.Status.TranscriptionComplete.name());
465     } catch (TranscriptionDatabaseException e) {
466       throw new TranscriptionServiceException(String.format(
467           "Transcription job for media package '%s' succeeded but storing job status in the database failed."
468           , mpId), e);
469     }
470   }
471 
472   @Override
473   public void transcriptionError(String mpId, Object results) throws TranscriptionServiceException {
474     MicrosoftAzureSpeechTranscription transcription = (MicrosoftAzureSpeechTranscription) results;
475     String message = "";
476     if (transcription != null && transcription.properties != null && transcription.properties.containsKey("error")) {
477       Map<String, Object> errorInfo = (Map<String, Object>) transcription.properties.get("error");
478       message = String.format(" Microsoft error code %s: %s", errorInfo.getOrDefault("code", "UNKNOWN"),
479           errorInfo.getOrDefault("message", "No info"));
480     }
481     logger.info("Transcription job {} for media package {} failed.{}", transcription.getID(), mpId, message);
482     // delete audio source files in Azure storage
483     try {
484       deleteTranscriptionSourceFiles(mpId, transcription.getID());
485     } catch (TranscriptionServiceException e) {
486       logger.warn("Unable to delete transcription source files for media package {} after transcription kob failure.",
487           mpId, e);
488     }
489     try {
490       database.updateJobControl(transcription.getID(), TranscriptionJobControl.Status.Error.name());
491     } catch (TranscriptionDatabaseException e) {
492       throw new TranscriptionServiceException(String.format(
493           "Transcription job for media package '%s' failed and storing job status in the database failed too."
494           , mpId), e);
495     }
496   }
497 
  /**
   * Returns the default transcription language, i.e. the configured {@code language} value or
   * {@link #DEFAULT_LANGUAGE} as set in {@link #modified(ComponentContext)}.
   *
   * @return the default language code
   */
  @Override
  public String getLanguage() {
    return language;
  }
502 
  /**
   * Not implemented by this provider.
   *
   * @param mpId the media package ID (unused)
   * @param jobId the transcription job ID (unused)
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
    throw new NotImplementedException();
  }
507 
508   public String createTranscriptionJob(long jobId, String mpId, Track track, String language)
509           throws TranscriptionServiceException {
510     // load media file into workspace
511     File trackFile;
512     try {
513       trackFile = workspace.get(track.getURI());
514     } catch (NotFoundException e) {
515       throw new TranscriptionServiceException(String.format("Track %s not found.", track.getURI()), e);
516     } catch (IOException e) {
517       throw new TranscriptionServiceException(String.format(
518           "Unable to get track %s for transcription.", track.getURI()), e);
519     }
520     // upload media file to azure storage
521     //// assure azure storage container exists
522     try {
523       azureStorageClient.createContainer(azureContainerName);
524     } catch (IOException | MicrosoftAzureStorageClientException | MicrosoftAzureNotAllowedException e) {
525       throw new TranscriptionServiceException(String.format(
526           "Unable to query or create a storage container '%s' on Microsoft Azure.", azureContainerName), e);
527     }
528     //// upload file to azure storage container
529     String azureBlobUrl;
530     try {
531       String filename = String.format("%d-%s.%s", jobId, mpId, FilenameUtils.getExtension(trackFile.getName()));
532       azureBlobUrl = azureStorageClient.uploadFile(trackFile, azureContainerName, azureBlobPath, filename);
533     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureStorageClientException e) {
534       throw new TranscriptionServiceException(String.format(
535           "Unable to upload track %s from media package '%s' to Microsoft Azure storage container '%s'.",
536           track.getURI(), mpId, azureContainerName), e);
537     }
538     // start azure transcription job
539     List<String> contentUrls = Arrays.asList(azureBlobUrl);
540     String azureDestContainerUrl = String.format("%s?%s", azureStorageClient.getContainerUrl(azureContainerName),
541         azureAuthorization.generateServiceSasToken("cw", null, null, azureContainerName, "c"));
542     MicrosoftAzureSpeechTranscription transcription;
543     try {
544       transcription = azureSpeechServicesClient.createTranscription(contentUrls,
545           azureDestContainerUrl, String.format("Transcription job %d", jobId), language, autodetectLanguages,
546           null, null);
547       logger.info("Started transcription of {} from media package '{}' on Microsoft Azure Speech Services at {}",
548           track.getURI(), mpId, transcription.self);
549     } catch (MicrosoftAzureNotAllowedException | IOException | MicrosoftAzureSpeechClientException e) {
550       throw new TranscriptionServiceException(String.format(
551           "Unable to create transcription of track %s from media package '%s' "
552               + "in Microsoft Azure storage container '%s'.",
553           track.getURI(), mpId, azureContainerName), e);
554     }
555     // store transcription job ID and status
556     try {
557       database.storeJobControl(mpId, track.getIdentifier(), transcription.getID(),
558           TranscriptionJobControl.Status.InProgress.name(),
559           track.getDuration() == null ? 0 : track.getDuration(), new Date(), PROVIDER);
560     } catch (TranscriptionDatabaseException e) {
561       throw new TranscriptionServiceException(String.format(
562           "Unable to store transcription job of track %s from media package '%s' in the database.",
563           track.getURI(), mpId), e);
564     }
565     // return transcription job ID
566     return transcription.getID();
567   }
568 
569   MicrosoftAzureSpeechTranscriptionJson getTranscriptionJson(String mpId,
570       MicrosoftAzureSpeechTranscription transcription)
571           throws TranscriptionServiceException, MicrosoftAzureNotFoundException {
572     if (!transcription.isSucceeded()) {
573       if (transcription.isRunning()) {
574         throw new TranscriptionServiceException(String.format("Unable to get generated transcription. "
575             + "Transcription job '%s' for media package '%s' is currently running.", transcription.getID(), mpId));
576       } else if (transcription.isFailed()) {
577         throw new TranscriptionServiceException(String.format("Unable to get generated transcription. "
578             + "Transcription job '%s' for media package '%s' is failed.", transcription.getID(), mpId));
579       }
580     }
581     // query transcription files
582     MicrosoftAzureSpeechTranscriptionFiles transcriptionFiles;
583     try {
584       transcriptionFiles = azureSpeechServicesClient.getTranscriptionFilesById(transcription.getID());
585     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
586       throw new TranscriptionServiceException(String.format(
587           "Unable to get transcription files '%s' for media package '%s'.", transcription.getID(), mpId), e);
588     }
589     // download transcription file to workspace
590     MicrosoftAzureSpeechTranscriptionFile transcriptionFile = null;
591     for (MicrosoftAzureSpeechTranscriptionFile tf : transcriptionFiles.values) {
592       if (tf.isTranscriptionFile()) {
593         transcriptionFile = tf;
594         break;
595       }
596     }
597     if (transcriptionFile == null) {
598       // get more files with transcriptionFiles.nextLink
599       // TODO
600       throw new NotImplementedException("At least one transcription file should be provided.");
601     }
602 
603     try {
604       return MicrosoftAzureSpeechServicesClient
605           .getTranscriptionJson(transcriptionFile);
606     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException
607              | MicrosoftAzureNotFoundException e) {
608       throw new TranscriptionServiceException(String.format(
609           "Unable to download transcription file '%s' for media package '%s'.", transcriptionFile.self, mpId), e);
610     }
611   }
612 
613   String startWorkflow(String mpId, MicrosoftAzureSpeechTranscription transcription)
614           throws TranscriptionDatabaseException, NotFoundException, WorkflowDatabaseException,
615           TranscriptionServiceException, MicrosoftAzureNotFoundException {
616     MicrosoftAzureSpeechTranscriptionJson transcriptionJson = getTranscriptionJson(mpId, transcription);
617     String transcriptionLocale = transcriptionJson.getRecognizedLocale();
618 
619     DefaultOrganization defaultOrg = new DefaultOrganization();
620     securityService.setOrganization(defaultOrg);
621     securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
622 
623     // Find the episode
624     final AQueryBuilder q = assetManager.createQuery();
625     final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
626     if (r.getSize() == 0) {
627       // Media package not archived yet? Skip until next time.
628       logger.warn("Media package {} has not been archived yet. Skipped.", mpId);
629       return null;
630     }
631 
632     String org = Enrichments.enrich(r).getSnapshots().get(0).getOrganizationId();
633     Organization organization = organizationDirectoryService.getOrganization(org);
634     if (organization == null) {
635       logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
636       return null;
637     }
638     securityService.setOrganization(organization);
639 
640     // Build workflow
641     Map<String, String> params = new HashMap<>();
642     params.put("transcriptionJobId", transcription.getID());
643     String locale = "";
644     String language = "";
645     if (StringUtils.isNotBlank(transcriptionLocale)) {
646       locale = transcriptionLocale;
647       language = Locale.forLanguageTag(transcriptionLocale).getLanguage();
648     }
649     params.put("transcriptionLocale", locale);
650     params.put("transcriptionLocaleSet", Boolean.toString(!StringUtils.isEmpty(locale)));
651     params.put("transcriptionLocaleSubtypeSuffix", !StringUtils.isEmpty(locale) ? "+" + locale : "");
652     params.put("transcriptionLocaleTag", !StringUtils.isEmpty(locale) ? "lang:" + locale : "");
653     params.put("transcriptionLanguage", language);
654     params.put("transcriptionLanguageSet", Boolean.toString(!StringUtils.isEmpty(language)));
655     params.put("transcriptionLanguageSubtypeSuffix", !StringUtils.isEmpty(language) ? "+" + language : "");
656     params.put("transcriptionLanguageTag", !StringUtils.isEmpty(language) ? "lang:" + language : "");
657     WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
658 
659     // Apply workflow
660     // wfUtil is only used by unit tests
661     Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
662     Set<String> mpIds = new HashSet<>();
663     mpIds.add(mpId);
664     List<WorkflowInstance> wfList = workflows
665         .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
666     return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
667   }
668 
669   public void deleteTranscription(String mpId, String transcriptionId)
670           throws TranscriptionServiceException, TranscriptionDatabaseException {
671     TranscriptionJobControl transcriptionJobControl = database.findByJob(transcriptionId);
672     TranscriptionJobControl.Status transcriptionJobControlStatus = TranscriptionJobControl.Status.valueOf(
673         transcriptionJobControl.getStatus());
674     if (transcriptionJobControlStatus != TranscriptionJobControl.Status.Closed
675         && transcriptionJobControlStatus != TranscriptionJobControl.Status.Canceled
676         && transcriptionJobControlStatus != TranscriptionJobControl.Status.Error) {
677       throw new TranscriptionServiceException(String.format("Abort deleting transcription %s with invalid status '%s'.",
678           transcriptionId, transcriptionJobControl.getStatus()));
679     }
680     deleteTranscriptionSourceFiles(mpId, transcriptionId);
681     try {
682       azureSpeechServicesClient.deleteTranscription(transcriptionId);
683     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
684       throw new TranscriptionServiceException(String.format(
685           "Unable to delete transcription '%s' for media package '%s'.", transcriptionId, mpId), e);
686     }
687     database.deleteJobControl(transcriptionJobControl.getTranscriptionJobId());
688   }
689 
690   public void deleteTranscriptionSourceFiles(String mpId, String transcriptionId)
691           throws TranscriptionServiceException {
692     MicrosoftAzureSpeechTranscriptionFiles transcriptionFiles;
693     try {
694       transcriptionFiles = azureSpeechServicesClient.getTranscriptionFilesById(transcriptionId);
695     } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
696       throw new TranscriptionServiceException(String.format(
697           "Unable to get for transcription '%s' from media package '%s'.", transcriptionId, mpId), e);
698     } catch (MicrosoftAzureNotFoundException e) {
699       // catch deleting non-existing file
700       logger.debug("Failed to get non existing transcription files from media package {} for deleting.", mpId, e);
701       return;
702     }
703     for (MicrosoftAzureSpeechTranscriptionFile transcriptionFile : transcriptionFiles.values) {
704       if (!transcriptionFile.isTranscriptionFile()) {
705         continue;
706       }
707       MicrosoftAzureSpeechTranscriptionJson transcriptionJson;
708       try {
709         transcriptionJson = MicrosoftAzureSpeechServicesClient
710             .getTranscriptionJson(transcriptionFile);
711       } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
712         throw new TranscriptionServiceException(String.format(
713             "Unable to download transcription file '%s' for media package '%s'.", transcriptionFile.self, mpId), e);
714       } catch (MicrosoftAzureNotFoundException e) {
715         // catch deleting non-existing file
716         logger.debug("Failed to get non existing transcription file {} from media package {} for deleting.",
717             transcriptionFile.self, mpId, e);
718         continue;
719       }
720       if (StringUtils.isNotBlank(transcriptionJson.source)) {
721         try {
722           azureStorageClient.deleteFile(new URL(transcriptionJson.source));
723         } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureStorageClientException e) {
724           throw new TranscriptionServiceException(String.format(
725               "Unable to delete audio source file for media package %s.", mpId, e));
726         }
727       }
728     }
729   }
730 
731   class WorkflowDispatcher implements Runnable {
732 
733     @Override
734     public void run() {
735       if (!enabled) {
736         logger.debug("Service disabled, cancel processing.");
737         return;
738       }
739       logger.debug("Run jobs handling loop for transcription provider {}.", PROVIDER);
740       long providerId;
741       try {
742         TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
743         if (providerInfo != null) {
744           providerId = providerInfo.getId();
745         } else {
746           logger.debug("No jobs yet for provider {}.", PROVIDER);
747           return;
748         }
749         // handle jobs in progress
750         for (TranscriptionJobControl jobControl : database.findByStatus(
751             TranscriptionJobControl.Status.InProgress.name())) {
752           if (providerId != jobControl.getProviderId()) {
753             continue;
754           }
755           String mpId = jobControl.getMediaPackageId();
756           String transcriptionId = jobControl.getTranscriptionJobId();
757           try {
758             MicrosoftAzureSpeechTranscription transcription = azureSpeechServicesClient.getTranscriptionById(
759                 transcriptionId);
760             // check and update job status
761             if (!transcription.isRunning()) {
762               if (transcription.isFailed()) {
763                 transcriptionError(mpId, transcription);
764               } else if (transcription.isSucceeded()) {
765                 transcriptionDone(mpId, transcription);
766               }
767             }
768           } catch (MicrosoftAzureNotAllowedException | IOException | MicrosoftAzureSpeechClientException e) {
769             logger.error("Unable to get or update transcription {} or transcription file from media package {}.",
770                 transcriptionId, mpId, e);
771           } catch (MicrosoftAzureNotFoundException e) {
772             logger.warn("Transcription {} from media package {} not found.", transcriptionId, mpId);
773             database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Error.name());
774           } catch (TranscriptionServiceException e) {
775             logger.error(e.getMessage(), e);
776           }
777         }
778         // handle completed jobs
779         for (TranscriptionJobControl jobControl : database.findByStatus(
780             TranscriptionJobControl.Status.TranscriptionComplete.name())) {
781           if (providerId != jobControl.getProviderId()) {
782             continue;
783           }
784           String mpId = jobControl.getMediaPackageId();
785           String transcriptionId = jobControl.getTranscriptionJobId();
786           try {
787             MicrosoftAzureSpeechTranscription transcription = azureSpeechServicesClient.getTranscriptionById(
788                 transcriptionId);
789             // start workflow
790             String workflowId = startWorkflow(mpId, transcription);
791             // update db
792             if (workflowId != null) {
793               database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Closed.name());
794               logger.info("Attach transcription workflow {} scheduled for mp {}, microsoft azure transcription job {}",
795                   workflowId, mpId, transcriptionId);
796             }
797           } catch (MicrosoftAzureNotAllowedException | IOException
798                    | MicrosoftAzureSpeechClientException e) {
799             logger.warn("Unable to get transcription {} or transcription file from media package {}.",
800                 transcriptionId, mpId, e);
801           } catch (MicrosoftAzureNotFoundException e) {
802             logger.warn("Transcription {} from media package {} not found.", transcriptionId, mpId);
803             database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Error.name());
804           } catch (TranscriptionServiceException e) {
805             logger.warn(e.getMessage(), e);
806           } catch (NotFoundException e) {
807             logger.warn("Unable to load organization.", e);
808           } catch (IllegalStateException e) {
809             logger.debug(e.getMessage());
810           }
811         }
812         // cleanup all old jobs
813         for (TranscriptionJobControl jobControl : database.findByStatus(
814             TranscriptionJobControl.Status.Closed.name(), TranscriptionJobControl.Status.Error.name())) {
815           if (providerId != jobControl.getProviderId()) {
816             continue;
817           }
818           String mpId = jobControl.getMediaPackageId();
819           String transcriptionId = jobControl.getTranscriptionJobId();
820           if (Instant.now().minus(7, ChronoUnit.DAYS).isAfter(jobControl.getDateCreated().toInstant())) {
821             try {
822               deleteTranscription(jobControl.getMediaPackageId(), jobControl.getTranscriptionJobId());
823             } catch (TranscriptionServiceException e) {
824               logger.error("Unable to delete transcription {} or transcription files from media package {}.",
825                   transcriptionId, mpId, e);
826             }
827           }
828         }
829       } catch (TranscriptionDatabaseException e) {
830         logger.warn("Could not read or update transcription job control database", e);
831       } catch (WorkflowDatabaseException e) {
832         logger.warn("Unable to get workflow definition.", e);
833       } catch (Throwable e) {
834         // catch all
835         logger.error("Something went wrong in transcription job processing loop. Exception unhandled!!!", e);
836       }
837     }
838   }
839 
840   // Only used by unit tests!
841   void setWfUtil(Workflows wfUtil) {
842     this.wfUtil = wfUtil;
843   }
844 
845   @Override
846   protected ServiceRegistry getServiceRegistry() {
847     return serviceRegistry;
848   }
849 
850   @Override
851   protected SecurityService getSecurityService() {
852     return securityService;
853   }
854 
855   @Override
856   protected UserDirectoryService getUserDirectoryService() {
857     return userDirectoryService;
858   }
859 
860   @Override
861   protected OrganizationDirectoryService getOrganizationDirectoryService() {
862     return organizationDirectoryService;
863   }
864 
865   @Reference
866   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
867     this.serviceRegistry = serviceRegistry;
868   }
869 
870   @Reference
871   public void setSecurityService(SecurityService securityService) {
872     this.securityService = securityService;
873   }
874 
875   @Reference
876   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
877     this.userDirectoryService = userDirectoryService;
878   }
879 
880   @Reference
881   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
882     this.organizationDirectoryService = organizationDirectoryService;
883   }
884 
885   @Reference
886   public void setWorkspace(Workspace ws) {
887     this.workspace = ws;
888   }
889 
890 //  @Reference
891 //  public void setWorkingFileRepository(WorkingFileRepository wfr) {
892 //    this.wfr = wfr;
893 //  }
894 
895   @Reference
896   public void setDatabase(TranscriptionDatabase service) {
897     this.database = service;
898   }
899 
900   @Reference
901   public void setAssetManager(AssetManager service) {
902     this.assetManager = service;
903   }
904 
905   @Reference
906   public void setWorkflowService(WorkflowService service) {
907     this.workflowService = service;
908   }
909 }