1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.transcription.microsoft.azure;
23
24 import org.opencastproject.assetmanager.api.AssetManager;
25 import org.opencastproject.assetmanager.api.Snapshot;
26 import org.opencastproject.assetmanager.util.Workflows;
27 import org.opencastproject.job.api.AbstractJobProducer;
28 import org.opencastproject.job.api.Job;
29 import org.opencastproject.mediapackage.MediaPackageElement;
30 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
31 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32 import org.opencastproject.mediapackage.MediaPackageElementParser;
33 import org.opencastproject.mediapackage.MediaPackageException;
34 import org.opencastproject.mediapackage.Track;
35 import org.opencastproject.security.api.DefaultOrganization;
36 import org.opencastproject.security.api.Organization;
37 import org.opencastproject.security.api.OrganizationDirectoryService;
38 import org.opencastproject.security.api.SecurityService;
39 import org.opencastproject.security.api.UserDirectoryService;
40 import org.opencastproject.security.util.SecurityUtil;
41 import org.opencastproject.serviceregistry.api.ServiceRegistry;
42 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
43 import org.opencastproject.systems.OpencastConstants;
44 import org.opencastproject.transcription.api.TranscriptionService;
45 import org.opencastproject.transcription.api.TranscriptionServiceException;
46 import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscription;
47 import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionFile;
48 import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionFiles;
49 import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionJson;
50 import org.opencastproject.transcription.persistence.TranscriptionDatabase;
51 import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
52 import org.opencastproject.transcription.persistence.TranscriptionJobControl;
53 import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
54 import org.opencastproject.util.NotFoundException;
55 import org.opencastproject.util.OsgiUtil;
56 import org.opencastproject.workflow.api.ConfiguredWorkflow;
57 import org.opencastproject.workflow.api.WorkflowDatabaseException;
58 import org.opencastproject.workflow.api.WorkflowDefinition;
59 import org.opencastproject.workflow.api.WorkflowInstance;
60 import org.opencastproject.workflow.api.WorkflowService;
61 import org.opencastproject.workspace.api.Workspace;
62
63 import org.apache.commons.io.FilenameUtils;
64 import org.apache.commons.lang3.NotImplementedException;
65 import org.apache.commons.lang3.StringUtils;
66 import org.osgi.service.component.ComponentContext;
67 import org.osgi.service.component.annotations.Activate;
68 import org.osgi.service.component.annotations.Component;
69 import org.osgi.service.component.annotations.Deactivate;
70 import org.osgi.service.component.annotations.Modified;
71 import org.osgi.service.component.annotations.Reference;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74
75 import java.io.File;
76 import java.io.IOException;
77 import java.net.URI;
78 import java.net.URL;
79 import java.time.Instant;
80 import java.time.temporal.ChronoUnit;
81 import java.util.ArrayList;
82 import java.util.Arrays;
83 import java.util.Date;
84 import java.util.HashMap;
85 import java.util.HashSet;
86 import java.util.List;
87 import java.util.Locale;
88 import java.util.Map;
89 import java.util.Optional;
90 import java.util.Set;
91 import java.util.concurrent.Executors;
92 import java.util.concurrent.ScheduledExecutorService;
93 import java.util.concurrent.TimeUnit;
94
95 @Component(immediate = true, service = {
96 TranscriptionService.class, MicrosoftAzureTranscriptionService.class }, property = {
97 "service.description=Microsoft Azure Transcription Service", "provider=microsoft.azure" })
98 public class MicrosoftAzureTranscriptionService extends AbstractJobProducer implements TranscriptionService {
99
100 private static final Logger logger = LoggerFactory.getLogger(MicrosoftAzureTranscriptionService.class);
101
102 private static final String JOB_TYPE = "org.opencastproject.transcription.microsoft.azure";
103 private static final String PROVIDER = "microsoft-azure-speech-services";
104 private static final String DEFAULT_WORKFLOW_DEFINITION_ID = "microsoft-azure-attach-transcription";
105 private static final String DEFAULT_LANGUAGE = "en-GB";
106
107 private static final String DEFAULT_AZURE_BLOB_PATH = "";
108 private static final String DEFAULT_AZURE_CONTAINER_NAME = "opencast-transcriptions";
109 private static final float DEFAULT_MIN_CONFIDENCE = 1.0f;
110 private static final int DEFAULT_SPLIT_TEXT_LINE_LENGTH = 100;
111 private static final String DEFAULT_OUTPUT_FILE_FORMAT = "vtt";
112 private static final String KEY_ENABLED = "enabled";
113 private static final String KEY_LANGUAGE = "language";
114 private static final String KEY_AUTO_DETECT_LANGUAGES = "auto.detect.languages";
115 private static final String KEY_WORKFLOW = "workflow";
116 private static final String KEY_AZURE_STORAGE_ACCOUNT_NAME = "azure_storage_account_name";
117 private static final String KEY_AZURE_ACCOUNT_ACCESS_KEY = "azure_account_access_key";
118 private static final String KEY_AZURE_BOLB_PATH = "azure_blob_path";
119 private static final String KEY_AZURE_CONTAINER_NAME = "azure_container_name";
120 private static final String KEY_AZURE_SPEECH_SERVICES_ENDPOINT = "azure_speech_services_endpoint";
121 private static final String KEY_COGNITIVE_SERVICES_SUBSCRIPTION_KEY = "azure_cognitive_services_subscription_key";
122 private static final String KEY_AZURE_SPEECH_RECOGNITION_MIN_CONFIDENCE = "azure_speech_recognition_min_confidence";
123 private static final String KEY_SPLIT_TEXT_LINE_LENGTH = "split.text.line.length";
124 private static final String KEY_OUTPUT_FILE_FORMAT = "output.file.format";
125
126
127 private AssetManager assetManager;
128 private OrganizationDirectoryService organizationDirectoryService;
129 private SecurityService securityService;
130 private ServiceRegistry serviceRegistry;
131 private TranscriptionDatabase database;
132 private UserDirectoryService userDirectoryService;
133 private WorkflowService workflowService;
134 private Workspace workspace;
135 private ScheduledExecutorService scheduledExecutorService;
136 private Workflows wfUtil;
137 private String systemAccount;
138 private boolean enabled;
139 private String language;
140 private List<String> autodetectLanguages;
141 private String workflowDefinitionId;
142 private String azureStorageAccountName;
143 private String azureAccountAccessKey;
144 private String azureBlobPath;
145 private String azureContainerName;
146 private String azureSpeechServicesEndpoint;
147 private String azureCognitiveServicesSubscriptionKey;
148 private MicrosoftAzureAuthorization azureAuthorization;
149 private MicrosoftAzureStorageClient azureStorageClient;
150 private MicrosoftAzureSpeechServicesClient azureSpeechServicesClient;
151 private Float azureSpeechRecognitionMinConfidence;
152 private Integer splitTextLineLength;
153 private String outputFileFormat;
154
155 private enum Operation {
156 StartTranscription
157 }
158
159
160
161
/**
 * Creates the service, handing {@code JOB_TYPE} to the {@code AbstractJobProducer} constructor.
 */
public MicrosoftAzureTranscriptionService() {
  super(MicrosoftAzureTranscriptionService.JOB_TYPE);
}
165
166 @Activate
167 public void activate(ComponentContext cc) {
168 super.activate(cc);
169 systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
170 logger.debug("Activating...");
171 modified(cc);
172 }
173
174 @Modified
175 public void modified(ComponentContext cc) {
176 logger.debug("Updating config...");
177 Optional<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), KEY_ENABLED);
178 if (enabledOpt.isPresent()) {
179 enabled = enabledOpt.get();
180 } else {
181 deactivate();
182 }
183
184 if (!enabled) {
185 logger.info("Microsoft Azure transcription service disabled."
186 + " If you want to enable it, please update the service configuration.");
187 return;
188 }
189
190 Optional<String> azureStorageAccountNameKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
191 KEY_AZURE_STORAGE_ACCOUNT_NAME);
192 if (azureStorageAccountNameKeyOpt.isPresent()) {
193 azureStorageAccountName = azureStorageAccountNameKeyOpt.get();
194 } else {
195 logger.warn("Azure storage account name key was not set. Disabling Microsoft Azure transcription service.");
196 deactivate();
197 return;
198 }
199
200 Optional<String> azureAccountAccessKeyKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_ACCOUNT_ACCESS_KEY);
201 if (azureAccountAccessKeyKeyOpt.isPresent()) {
202 azureAccountAccessKey = azureAccountAccessKeyKeyOpt.get();
203 } else {
204 logger.warn("Azure storage account access key was not set. Disabling Microsoft Azure transcription service.");
205 deactivate();
206 return;
207 }
208
209 Optional<String> azureSpeechServicesKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
210 KEY_AZURE_SPEECH_SERVICES_ENDPOINT);
211 if (azureSpeechServicesKeyOpt.isPresent()) {
212 azureSpeechServicesEndpoint = azureSpeechServicesKeyOpt.get();
213 } else {
214 logger.warn("Azure speech services endpoint was not set. Disabling Microsoft Azure transcription service.");
215 deactivate();
216 return;
217 }
218
219 Optional<String> azureCognitiveServicesSubscriptionKeyKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
220 KEY_COGNITIVE_SERVICES_SUBSCRIPTION_KEY);
221 if (azureCognitiveServicesSubscriptionKeyKeyOpt.isPresent()) {
222 azureCognitiveServicesSubscriptionKey = azureCognitiveServicesSubscriptionKeyKeyOpt.get();
223 } else {
224 logger.warn("Azure cognitive services subscription key was not set. "
225 + "Disabling Microsoft Azure transcription service.");
226 deactivate();
227 return;
228 }
229
230
231 Optional<String> workflowKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_WORKFLOW);
232 if (workflowKeyOpt.isPresent()) {
233 workflowDefinitionId = workflowKeyOpt.get();
234 logger.info("Workflow is set to '{}'.", workflowDefinitionId);
235 } else {
236 workflowDefinitionId = DEFAULT_WORKFLOW_DEFINITION_ID;
237 logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
238 }
239
240 Optional<String> azureBlobPathKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_BOLB_PATH);
241 if (azureBlobPathKeyOpt.isPresent()) {
242 azureBlobPath = azureBlobPathKeyOpt.get();
243 } else {
244 logger.debug("Azure blob path was not set, using default path.");
245 azureBlobPath = DEFAULT_AZURE_BLOB_PATH;
246 }
247
248 Optional<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_LANGUAGE);
249 if (languageOpt.isPresent()) {
250 language = languageOpt.get();
251 logger.info("Default language is set to '{}'.", language);
252 } else {
253 language = DEFAULT_LANGUAGE;
254 logger.info("Default language '{}' will be used.", language);
255 }
256
257 autodetectLanguages = new ArrayList<>();
258 Optional<String> autoDetectLanguagesOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AUTO_DETECT_LANGUAGES);
259 if (languageOpt.isPresent()) {
260 for (String lang : StringUtils.split(autoDetectLanguagesOpt.get(), ",")) {
261 if (StringUtils.isNotBlank(lang)) {
262 autodetectLanguages.add(StringUtils.trimToEmpty(lang));
263 }
264 }
265 }
266
267 Optional<String> azureContainerNameKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_CONTAINER_NAME);
268 if (azureContainerNameKeyOpt.isPresent()) {
269 azureContainerName = azureContainerNameKeyOpt.get();
270 } else {
271 logger.debug("Azure storage container name was not set, using default path.");
272 azureContainerName = DEFAULT_AZURE_CONTAINER_NAME;
273 }
274
275 Optional<String> azureSpeechRecognitionMinConfidenceKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
276 KEY_AZURE_SPEECH_RECOGNITION_MIN_CONFIDENCE);
277 if (azureSpeechRecognitionMinConfidenceKeyOpt.isPresent()) {
278 String azureSpeechRecognitionMinConfidenceStr = azureSpeechRecognitionMinConfidenceKeyOpt.get();
279 try {
280 azureSpeechRecognitionMinConfidence = Float.valueOf(azureSpeechRecognitionMinConfidenceStr);
281 } catch (NumberFormatException e) {
282 logger.error("Azure speech recognition min confidence value is not valid. "
283 + "Please set a value between 0.0 and 1.0. "
284 + "Setting to default value of {}.", DEFAULT_MIN_CONFIDENCE);
285 azureSpeechRecognitionMinConfidence = DEFAULT_MIN_CONFIDENCE;
286 }
287 } else {
288 logger.debug("Azure speech recognition min confidence value was not set. Setting to default value of {}.",
289 DEFAULT_MIN_CONFIDENCE);
290 azureSpeechRecognitionMinConfidence = DEFAULT_MIN_CONFIDENCE;
291 }
292
293 Optional<String> splitTextLineLengthOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_SPLIT_TEXT_LINE_LENGTH);
294 if (splitTextLineLengthOpt.isPresent()) {
295 try {
296 splitTextLineLength = Integer.parseInt(splitTextLineLengthOpt.get());
297 } catch (NumberFormatException e) {
298 splitTextLineLength = DEFAULT_SPLIT_TEXT_LINE_LENGTH;
299 logger.error("Invalid configuration value for '{}'. Set default value {}.", KEY_SPLIT_TEXT_LINE_LENGTH,
300 DEFAULT_SPLIT_TEXT_LINE_LENGTH);
301 }
302 } else {
303 logger.debug("Configuration value for '{}' was not set. Setting to default value of {}.",
304 KEY_SPLIT_TEXT_LINE_LENGTH, DEFAULT_MIN_CONFIDENCE);
305 splitTextLineLength = DEFAULT_SPLIT_TEXT_LINE_LENGTH;
306 }
307
308 Optional<String> outputFileFormatOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_OUTPUT_FILE_FORMAT);
309 if (outputFileFormatOpt.isPresent()) {
310 outputFileFormat = outputFileFormatOpt.get();
311 switch (outputFileFormat) {
312 case "srt":
313 case "vtt":
314 break;
315 default:
316 logger.debug("Azure output file format not valid, using default format {}.",
317 DEFAULT_OUTPUT_FILE_FORMAT);
318 outputFileFormat = DEFAULT_OUTPUT_FILE_FORMAT;
319 break;
320 }
321 } else {
322 logger.debug("Azure output file format not set, using default format {}.", DEFAULT_OUTPUT_FILE_FORMAT);
323 outputFileFormat = DEFAULT_OUTPUT_FILE_FORMAT;
324 }
325 logger.info("Transcription output format is set to '{}'.", outputFileFormat);
326
327
328 try {
329 azureAuthorization = new MicrosoftAzureAuthorization(azureStorageAccountName, azureAccountAccessKey);
330 azureStorageClient = new MicrosoftAzureStorageClient(azureAuthorization);
331 } catch (MicrosoftAzureStorageClientException e) {
332 logger.error("Unable to create Microsoft Azure storage client. "
333 + "Deactivating Microsoft Azure Transcription service.", e);
334 deactivate();
335 return;
336 }
337
338
339 azureSpeechServicesClient = new MicrosoftAzureSpeechServicesClient(
340 azureSpeechServicesEndpoint, azureCognitiveServicesSubscriptionKey);
341
342 if (scheduledExecutorService != null) {
343 scheduledExecutorService.shutdown();
344 try {
345 scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
346 } catch (InterruptedException e) {
347
348
349 }
350 }
351 scheduledExecutorService = Executors.newScheduledThreadPool(2);
352 scheduledExecutorService.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, 120, TimeUnit.SECONDS);
353 logger.info("Activated.");
354 }
355
356 @Deactivate
357 public void deactivate() {
358 enabled = false;
359 if (scheduledExecutorService != null) {
360 scheduledExecutorService.shutdown();
361 try {
362 scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
363 } catch (InterruptedException e) {
364
365
366 }
367 }
368 azureAuthorization = null;
369 azureStorageClient = null;
370 azureSpeechServicesClient = null;
371 logger.info("Deactivated.");
372 }
373
374 @Override
375 protected String process(Job job) throws Exception {
376 Operation op = null;
377 String operation = job.getOperation();
378 List<String> arguments = job.getArguments();
379 op = Operation.valueOf(operation);
380 switch (op) {
381 case StartTranscription:
382 long jobId = job.getId();
383 String mpId = arguments.get(0);
384 Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
385 String languageCode = arguments.get(2);
386 if (StringUtils.isBlank(languageCode)) {
387 languageCode = getLanguage();
388 }
389 return createTranscriptionJob(jobId, mpId, track, languageCode);
390 default:
391 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
392 }
393 }
394
395 @Override
396 public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
397 return startTranscription(mpId, track, getLanguage());
398 }
399
400 @Override
401 public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
402 try {
403 List<String> jobArgs = new ArrayList<>(2 + args.length);
404 jobArgs.add(mpId);
405 jobArgs.add(MediaPackageElementParser.getAsXml(track));
406 jobArgs.addAll(Arrays.asList(args));
407 return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.toString(), jobArgs);
408 } catch (ServiceRegistryException e) {
409 throw new TranscriptionServiceException(String.format(
410 "Unable to create transcription job for media package '%s'.", mpId), e);
411 } catch (MediaPackageException e) {
412 throw new TranscriptionServiceException(String.format(
413 "Unable to to parse track from media package '%s'.", mpId), e);
414 }
415 }
416
417 @Override
418 public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
419 throws TranscriptionServiceException {
420 if (type != MediaPackageElement.Type.Track && type != MediaPackageElement.Type.Attachment) {
421 throw new IllegalArgumentException("Target type must be a Track or Attachment.");
422 }
423 MicrosoftAzureSpeechTranscription transcription;
424 try {
425 transcription = azureSpeechServicesClient.getTranscriptionById(jobId);
426 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException
427 | MicrosoftAzureNotFoundException e) {
428 throw new TranscriptionServiceException(String.format(
429 "Unable to get transcription '%s' for media package '%s'.", jobId, mpId), e);
430 }
431 MicrosoftAzureSpeechTranscriptionJson transcriptionJson;
432 URI transcriptionFileUri;
433 try {
434 transcriptionJson = getTranscriptionJson(mpId, transcription);
435 transcriptionFileUri = MicrosoftAzureSpeechServicesClient.writeTranscriptionFile(transcriptionJson,
436 workspace, outputFileFormat, azureSpeechRecognitionMinConfidence, splitTextLineLength);
437 } catch (IOException | MicrosoftAzureNotFoundException e) {
438 throw new TranscriptionServiceException(String.format(
439 "Unable to download transcription file for media package '%s'.", mpId), e);
440 }
441 MediaPackageElement transcriptionElement = MediaPackageElementBuilderFactory.newInstance().newElementBuilder()
442 .elementFromURI(transcriptionFileUri, type, new MediaPackageElementFlavor("captions", outputFileFormat));
443 if (type.equals(MediaPackageElement.Type.Track)) {
444
445 transcriptionElement.setTags(new String[] { "generator-type:auto", "generator:azure", "type:subtitle"});
446 }
447 return transcriptionElement;
448 }
449
450 @Override
451 public void transcriptionDone(String mpId, Object results) throws TranscriptionServiceException {
452 MicrosoftAzureSpeechTranscription transcription = (MicrosoftAzureSpeechTranscription) results;
453 logger.info("Transcription job {} for media package {} done.", transcription.getID(), mpId);
454
455 try {
456 deleteTranscriptionSourceFiles(mpId, transcription.getID());
457 } catch (TranscriptionServiceException e) {
458 logger.warn("Unable to delete transcription source files for media package {} after transcription job done.",
459 mpId, e);
460 }
461 try {
462 database.updateJobControl(transcription.getID(), TranscriptionJobControl.Status.TranscriptionComplete.name());
463 } catch (TranscriptionDatabaseException e) {
464 throw new TranscriptionServiceException(String.format(
465 "Transcription job for media package '%s' succeeded but storing job status in the database failed."
466 , mpId), e);
467 }
468 }
469
470 @Override
471 public void transcriptionError(String mpId, Object results) throws TranscriptionServiceException {
472 MicrosoftAzureSpeechTranscription transcription = (MicrosoftAzureSpeechTranscription) results;
473 String message = "";
474 if (transcription != null && transcription.properties != null && transcription.properties.containsKey("error")) {
475 Map<String, Object> errorInfo = (Map<String, Object>) transcription.properties.get("error");
476 message = String.format(" Microsoft error code %s: %s", errorInfo.getOrDefault("code", "UNKNOWN"),
477 errorInfo.getOrDefault("message", "No info"));
478 }
479 logger.info("Transcription job {} for media package {} failed.{}", transcription.getID(), mpId, message);
480
481 try {
482 deleteTranscriptionSourceFiles(mpId, transcription.getID());
483 } catch (TranscriptionServiceException e) {
484 logger.warn("Unable to delete transcription source files for media package {} after transcription kob failure.",
485 mpId, e);
486 }
487 try {
488 database.updateJobControl(transcription.getID(), TranscriptionJobControl.Status.Error.name());
489 } catch (TranscriptionDatabaseException e) {
490 throw new TranscriptionServiceException(String.format(
491 "Transcription job for media package '%s' failed and storing job status in the database failed too."
492 , mpId), e);
493 }
494 }
495
496 @Override
497 public String getLanguage() {
498 return language;
499 }
500
501 @Override
502 public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
503 throw new NotImplementedException();
504 }
505
506 public String createTranscriptionJob(long jobId, String mpId, Track track, String language)
507 throws TranscriptionServiceException {
508
509 File trackFile;
510 try {
511 trackFile = workspace.get(track.getURI());
512 } catch (NotFoundException e) {
513 throw new TranscriptionServiceException(String.format("Track %s not found.", track.getURI()), e);
514 } catch (IOException e) {
515 throw new TranscriptionServiceException(String.format(
516 "Unable to get track %s for transcription.", track.getURI()), e);
517 }
518
519
520 try {
521 azureStorageClient.createContainer(azureContainerName);
522 } catch (IOException | MicrosoftAzureStorageClientException | MicrosoftAzureNotAllowedException e) {
523 throw new TranscriptionServiceException(String.format(
524 "Unable to query or create a storage container '%s' on Microsoft Azure.", azureContainerName), e);
525 }
526
527 String azureBlobUrl;
528 try {
529 String filename = String.format("%d-%s.%s", jobId, mpId, FilenameUtils.getExtension(trackFile.getName()));
530 azureBlobUrl = azureStorageClient.uploadFile(trackFile, azureContainerName, azureBlobPath, filename);
531 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureStorageClientException e) {
532 throw new TranscriptionServiceException(String.format(
533 "Unable to upload track %s from media package '%s' to Microsoft Azure storage container '%s'.",
534 track.getURI(), mpId, azureContainerName), e);
535 }
536
537 List<String> contentUrls = Arrays.asList(azureBlobUrl);
538 String azureDestContainerUrl = String.format("%s?%s", azureStorageClient.getContainerUrl(azureContainerName),
539 azureAuthorization.generateServiceSasToken("cw", null, null, azureContainerName, "c"));
540 MicrosoftAzureSpeechTranscription transcription;
541 try {
542 transcription = azureSpeechServicesClient.createTranscription(contentUrls,
543 azureDestContainerUrl, String.format("Transcription job %d", jobId), language, autodetectLanguages,
544 null, null);
545 logger.info("Started transcription of {} from media package '{}' on Microsoft Azure Speech Services at {}",
546 track.getURI(), mpId, transcription.self);
547 } catch (MicrosoftAzureNotAllowedException | IOException | MicrosoftAzureSpeechClientException e) {
548 throw new TranscriptionServiceException(String.format(
549 "Unable to create transcription of track %s from media package '%s' "
550 + "in Microsoft Azure storage container '%s'.",
551 track.getURI(), mpId, azureContainerName), e);
552 }
553
554 try {
555 database.storeJobControl(mpId, track.getIdentifier(), transcription.getID(),
556 TranscriptionJobControl.Status.InProgress.name(),
557 track.getDuration() == null ? 0 : track.getDuration(), new Date(), PROVIDER);
558 } catch (TranscriptionDatabaseException e) {
559 throw new TranscriptionServiceException(String.format(
560 "Unable to store transcription job of track %s from media package '%s' in the database.",
561 track.getURI(), mpId), e);
562 }
563
564 return transcription.getID();
565 }
566
567 MicrosoftAzureSpeechTranscriptionJson getTranscriptionJson(String mpId,
568 MicrosoftAzureSpeechTranscription transcription)
569 throws TranscriptionServiceException, MicrosoftAzureNotFoundException {
570 if (!transcription.isSucceeded()) {
571 if (transcription.isRunning()) {
572 throw new TranscriptionServiceException(String.format("Unable to get generated transcription. "
573 + "Transcription job '%s' for media package '%s' is currently running.", transcription.getID(), mpId));
574 } else if (transcription.isFailed()) {
575 throw new TranscriptionServiceException(String.format("Unable to get generated transcription. "
576 + "Transcription job '%s' for media package '%s' is failed.", transcription.getID(), mpId));
577 }
578 }
579
580 MicrosoftAzureSpeechTranscriptionFiles transcriptionFiles;
581 try {
582 transcriptionFiles = azureSpeechServicesClient.getTranscriptionFilesById(transcription.getID());
583 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
584 throw new TranscriptionServiceException(String.format(
585 "Unable to get transcription files '%s' for media package '%s'.", transcription.getID(), mpId), e);
586 }
587
588 MicrosoftAzureSpeechTranscriptionFile transcriptionFile = null;
589 for (MicrosoftAzureSpeechTranscriptionFile tf : transcriptionFiles.values) {
590 if (tf.isTranscriptionFile()) {
591 transcriptionFile = tf;
592 break;
593 }
594 }
595 if (transcriptionFile == null) {
596
597
598 throw new NotImplementedException("At least one transcription file should be provided.");
599 }
600
601 try {
602 return MicrosoftAzureSpeechServicesClient
603 .getTranscriptionJson(transcriptionFile);
604 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException
605 | MicrosoftAzureNotFoundException e) {
606 throw new TranscriptionServiceException(String.format(
607 "Unable to download transcription file '%s' for media package '%s'.", transcriptionFile.self, mpId), e);
608 }
609 }
610
611 String startWorkflow(String mpId, MicrosoftAzureSpeechTranscription transcription)
612 throws TranscriptionDatabaseException, NotFoundException, WorkflowDatabaseException,
613 TranscriptionServiceException, MicrosoftAzureNotFoundException {
614 MicrosoftAzureSpeechTranscriptionJson transcriptionJson = getTranscriptionJson(mpId, transcription);
615 String transcriptionLocale = transcriptionJson.getRecognizedLocale();
616
617 DefaultOrganization defaultOrg = new DefaultOrganization();
618 securityService.setOrganization(defaultOrg);
619 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
620
621
622 Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
623 if (snapshot.isEmpty()) {
624
625 logger.warn("Media package {} has not been archived yet. Skipped.", mpId);
626 return null;
627 }
628
629 String org = snapshot.get().getOrganizationId();
630 Organization organization = organizationDirectoryService.getOrganization(org);
631 if (organization == null) {
632 logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
633 return null;
634 }
635 securityService.setOrganization(organization);
636
637
638 Map<String, String> params = new HashMap<>();
639 params.put("transcriptionJobId", transcription.getID());
640 String locale = "";
641 String language = "";
642 if (StringUtils.isNotBlank(transcriptionLocale)) {
643 locale = transcriptionLocale;
644 language = Locale.forLanguageTag(transcriptionLocale).getLanguage();
645 }
646 params.put("transcriptionLocale", locale);
647 params.put("transcriptionLocaleSet", Boolean.toString(!StringUtils.isEmpty(locale)));
648 params.put("transcriptionLocaleSubtypeSuffix", !StringUtils.isEmpty(locale) ? "+" + locale : "");
649 params.put("transcriptionLocaleTag", !StringUtils.isEmpty(locale) ? "lang:" + locale : "");
650 params.put("transcriptionLanguage", language);
651 params.put("transcriptionLanguageSet", Boolean.toString(!StringUtils.isEmpty(language)));
652 params.put("transcriptionLanguageSubtypeSuffix", !StringUtils.isEmpty(language) ? "+" + language : "");
653 params.put("transcriptionLanguageTag", !StringUtils.isEmpty(language) ? "lang:" + language : "");
654 WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
655
656
657
658 Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
659 Set<String> mpIds = new HashSet<>();
660 mpIds.add(mpId);
661 List<WorkflowInstance> wfList = workflows
662 .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
663 return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
664 }
665
666 public void deleteTranscription(String mpId, String transcriptionId)
667 throws TranscriptionServiceException, TranscriptionDatabaseException {
668 TranscriptionJobControl transcriptionJobControl = database.findByJob(transcriptionId);
669 TranscriptionJobControl.Status transcriptionJobControlStatus = TranscriptionJobControl.Status.valueOf(
670 transcriptionJobControl.getStatus());
671 if (transcriptionJobControlStatus != TranscriptionJobControl.Status.Closed
672 && transcriptionJobControlStatus != TranscriptionJobControl.Status.Canceled
673 && transcriptionJobControlStatus != TranscriptionJobControl.Status.Error) {
674 throw new TranscriptionServiceException(String.format("Abort deleting transcription %s with invalid status '%s'.",
675 transcriptionId, transcriptionJobControl.getStatus()));
676 }
677 deleteTranscriptionSourceFiles(mpId, transcriptionId);
678 try {
679 azureSpeechServicesClient.deleteTranscription(transcriptionId);
680 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
681 throw new TranscriptionServiceException(String.format(
682 "Unable to delete transcription '%s' for media package '%s'.", transcriptionId, mpId), e);
683 }
684 database.deleteJobControl(transcriptionJobControl.getTranscriptionJobId());
685 }
686
687 public void deleteTranscriptionSourceFiles(String mpId, String transcriptionId)
688 throws TranscriptionServiceException {
689 MicrosoftAzureSpeechTranscriptionFiles transcriptionFiles;
690 try {
691 transcriptionFiles = azureSpeechServicesClient.getTranscriptionFilesById(transcriptionId);
692 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
693 throw new TranscriptionServiceException(String.format(
694 "Unable to get for transcription '%s' from media package '%s'.", transcriptionId, mpId), e);
695 } catch (MicrosoftAzureNotFoundException e) {
696
697 logger.debug("Failed to get non existing transcription files from media package {} for deleting.", mpId, e);
698 return;
699 }
700 for (MicrosoftAzureSpeechTranscriptionFile transcriptionFile : transcriptionFiles.values) {
701 if (!transcriptionFile.isTranscriptionFile()) {
702 continue;
703 }
704 MicrosoftAzureSpeechTranscriptionJson transcriptionJson;
705 try {
706 transcriptionJson = MicrosoftAzureSpeechServicesClient
707 .getTranscriptionJson(transcriptionFile);
708 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
709 throw new TranscriptionServiceException(String.format(
710 "Unable to download transcription file '%s' for media package '%s'.", transcriptionFile.self, mpId), e);
711 } catch (MicrosoftAzureNotFoundException e) {
712
713 logger.debug("Failed to get non existing transcription file {} from media package {} for deleting.",
714 transcriptionFile.self, mpId, e);
715 continue;
716 }
717 if (StringUtils.isNotBlank(transcriptionJson.source)) {
718 try {
719 azureStorageClient.deleteFile(new URL(transcriptionJson.source));
720 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureStorageClientException e) {
721 throw new TranscriptionServiceException(String.format(
722 "Unable to delete audio source file for media package %s.", mpId, e));
723 }
724 }
725 }
726 }
727
728 class WorkflowDispatcher implements Runnable {
729
730 @Override
731 public void run() {
732 if (!enabled) {
733 logger.debug("Service disabled, cancel processing.");
734 return;
735 }
736 logger.debug("Run jobs handling loop for transcription provider {}.", PROVIDER);
737 long providerId;
738 try {
739 TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
740 if (providerInfo != null) {
741 providerId = providerInfo.getId();
742 } else {
743 logger.debug("No jobs yet for provider {}.", PROVIDER);
744 return;
745 }
746
747 for (TranscriptionJobControl jobControl : database.findByStatus(
748 TranscriptionJobControl.Status.InProgress.name())) {
749 if (providerId != jobControl.getProviderId()) {
750 continue;
751 }
752 String mpId = jobControl.getMediaPackageId();
753 String transcriptionId = jobControl.getTranscriptionJobId();
754 try {
755 MicrosoftAzureSpeechTranscription transcription = azureSpeechServicesClient.getTranscriptionById(
756 transcriptionId);
757
758 if (!transcription.isRunning()) {
759 if (transcription.isFailed()) {
760 transcriptionError(mpId, transcription);
761 } else if (transcription.isSucceeded()) {
762 transcriptionDone(mpId, transcription);
763 }
764 }
765 } catch (MicrosoftAzureNotAllowedException | IOException | MicrosoftAzureSpeechClientException e) {
766 logger.error("Unable to get or update transcription {} or transcription file from media package {}.",
767 transcriptionId, mpId, e);
768 } catch (MicrosoftAzureNotFoundException e) {
769 logger.warn("Transcription {} from media package {} not found.", transcriptionId, mpId);
770 database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Error.name());
771 } catch (TranscriptionServiceException e) {
772 logger.error(e.getMessage(), e);
773 }
774 }
775
776 for (TranscriptionJobControl jobControl : database.findByStatus(
777 TranscriptionJobControl.Status.TranscriptionComplete.name())) {
778 if (providerId != jobControl.getProviderId()) {
779 continue;
780 }
781 String mpId = jobControl.getMediaPackageId();
782 String transcriptionId = jobControl.getTranscriptionJobId();
783 try {
784 MicrosoftAzureSpeechTranscription transcription = azureSpeechServicesClient.getTranscriptionById(
785 transcriptionId);
786
787 String workflowId = startWorkflow(mpId, transcription);
788
789 if (workflowId != null) {
790 database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Closed.name());
791 logger.info("Attach transcription workflow {} scheduled for mp {}, microsoft azure transcription job {}",
792 workflowId, mpId, transcriptionId);
793 }
794 } catch (MicrosoftAzureNotAllowedException | IOException
795 | MicrosoftAzureSpeechClientException e) {
796 logger.warn("Unable to get transcription {} or transcription file from media package {}.",
797 transcriptionId, mpId, e);
798 } catch (MicrosoftAzureNotFoundException e) {
799 logger.warn("Transcription {} from media package {} not found.", transcriptionId, mpId);
800 database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Error.name());
801 } catch (TranscriptionServiceException e) {
802 logger.warn(e.getMessage(), e);
803 } catch (NotFoundException e) {
804 logger.warn("Unable to load organization.", e);
805 } catch (IllegalStateException e) {
806 logger.debug(e.getMessage());
807 }
808 }
809
810 for (TranscriptionJobControl jobControl : database.findByStatus(
811 TranscriptionJobControl.Status.Closed.name(), TranscriptionJobControl.Status.Error.name())) {
812 if (providerId != jobControl.getProviderId()) {
813 continue;
814 }
815 String mpId = jobControl.getMediaPackageId();
816 String transcriptionId = jobControl.getTranscriptionJobId();
817 if (Instant.now().minus(7, ChronoUnit.DAYS).isAfter(jobControl.getDateCreated().toInstant())) {
818 try {
819 deleteTranscription(jobControl.getMediaPackageId(), jobControl.getTranscriptionJobId());
820 } catch (TranscriptionServiceException e) {
821 logger.error("Unable to delete transcription {} or transcription files from media package {}.",
822 transcriptionId, mpId, e);
823 }
824 }
825 }
826 } catch (TranscriptionDatabaseException e) {
827 logger.warn("Could not read or update transcription job control database", e);
828 } catch (WorkflowDatabaseException e) {
829 logger.warn("Unable to get workflow definition.", e);
830 } catch (Throwable e) {
831
832 logger.error("Something went wrong in transcription job processing loop. Exception unhandled!!!", e);
833 }
834 }
835 }
836
837
838 void setWfUtil(Workflows wfUtil) {
839 this.wfUtil = wfUtil;
840 }
841
842 @Override
843 protected ServiceRegistry getServiceRegistry() {
844 return serviceRegistry;
845 }
846
847 @Override
848 protected SecurityService getSecurityService() {
849 return securityService;
850 }
851
852 @Override
853 protected UserDirectoryService getUserDirectoryService() {
854 return userDirectoryService;
855 }
856
857 @Override
858 protected OrganizationDirectoryService getOrganizationDirectoryService() {
859 return organizationDirectoryService;
860 }
861
862 @Reference
863 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
864 this.serviceRegistry = serviceRegistry;
865 }
866
867 @Reference
868 public void setSecurityService(SecurityService securityService) {
869 this.securityService = securityService;
870 }
871
872 @Reference
873 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
874 this.userDirectoryService = userDirectoryService;
875 }
876
877 @Reference
878 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
879 this.organizationDirectoryService = organizationDirectoryService;
880 }
881
882 @Reference
883 public void setWorkspace(Workspace ws) {
884 this.workspace = ws;
885 }
886
887
888
889
890
891
892 @Reference
893 public void setDatabase(TranscriptionDatabase service) {
894 this.database = service;
895 }
896
897 @Reference
898 public void setAssetManager(AssetManager service) {
899 this.assetManager = service;
900 }
901
902 @Reference
903 public void setWorkflowService(WorkflowService service) {
904 this.workflowService = service;
905 }
906 }