1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.transcription.microsoft.azure;
23
24 import org.opencastproject.assetmanager.api.AssetManager;
25 import org.opencastproject.assetmanager.api.fn.Enrichments;
26 import org.opencastproject.assetmanager.api.query.AQueryBuilder;
27 import org.opencastproject.assetmanager.api.query.AResult;
28 import org.opencastproject.assetmanager.util.Workflows;
29 import org.opencastproject.job.api.AbstractJobProducer;
30 import org.opencastproject.job.api.Job;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
33 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34 import org.opencastproject.mediapackage.MediaPackageElementParser;
35 import org.opencastproject.mediapackage.MediaPackageException;
36 import org.opencastproject.mediapackage.Track;
37 import org.opencastproject.security.api.DefaultOrganization;
38 import org.opencastproject.security.api.Organization;
39 import org.opencastproject.security.api.OrganizationDirectoryService;
40 import org.opencastproject.security.api.SecurityService;
41 import org.opencastproject.security.api.UserDirectoryService;
42 import org.opencastproject.security.util.SecurityUtil;
43 import org.opencastproject.serviceregistry.api.ServiceRegistry;
44 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
45 import org.opencastproject.systems.OpencastConstants;
46 import org.opencastproject.transcription.api.TranscriptionService;
47 import org.opencastproject.transcription.api.TranscriptionServiceException;
48 import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscription;
49 import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionFile;
50 import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionFiles;
51 import org.opencastproject.transcription.microsoft.azure.model.MicrosoftAzureSpeechTranscriptionJson;
52 import org.opencastproject.transcription.persistence.TranscriptionDatabase;
53 import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
54 import org.opencastproject.transcription.persistence.TranscriptionJobControl;
55 import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
56 import org.opencastproject.util.NotFoundException;
57 import org.opencastproject.util.OsgiUtil;
58 import org.opencastproject.util.data.Option;
59 import org.opencastproject.workflow.api.ConfiguredWorkflow;
60 import org.opencastproject.workflow.api.WorkflowDatabaseException;
61 import org.opencastproject.workflow.api.WorkflowDefinition;
62 import org.opencastproject.workflow.api.WorkflowInstance;
63 import org.opencastproject.workflow.api.WorkflowService;
64 import org.opencastproject.workspace.api.Workspace;
65
66 import org.apache.commons.io.FilenameUtils;
67 import org.apache.commons.lang3.NotImplementedException;
68 import org.apache.commons.lang3.StringUtils;
69 import org.osgi.service.component.ComponentContext;
70 import org.osgi.service.component.annotations.Activate;
71 import org.osgi.service.component.annotations.Component;
72 import org.osgi.service.component.annotations.Deactivate;
73 import org.osgi.service.component.annotations.Modified;
74 import org.osgi.service.component.annotations.Reference;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77
78 import java.io.File;
79 import java.io.IOException;
80 import java.net.URI;
81 import java.net.URL;
82 import java.time.Instant;
83 import java.time.temporal.ChronoUnit;
84 import java.util.ArrayList;
85 import java.util.Arrays;
86 import java.util.Date;
87 import java.util.HashMap;
88 import java.util.HashSet;
89 import java.util.List;
90 import java.util.Locale;
91 import java.util.Map;
92 import java.util.Set;
93 import java.util.concurrent.Executors;
94 import java.util.concurrent.ScheduledExecutorService;
95 import java.util.concurrent.TimeUnit;
96
97 @Component(immediate = true, service = {
98 TranscriptionService.class, MicrosoftAzureTranscriptionService.class }, property = {
99 "service.description=Microsoft Azure Transcription Service", "provider=microsoft.azure" })
100 public class MicrosoftAzureTranscriptionService extends AbstractJobProducer implements TranscriptionService {
101
// Class-wide logger.
102 private static final Logger logger = LoggerFactory.getLogger(MicrosoftAzureTranscriptionService.class);
103
// Job type handed to the AbstractJobProducer constructor and used when creating jobs in the service registry.
104 private static final String JOB_TYPE = "org.opencastproject.transcription.microsoft.azure";
// Provider name stored with every job control entry and used by the dispatcher to look up the provider id.
105 private static final String PROVIDER = "microsoft-azure-speech-services";
// Workflow definition applied when the "workflow" configuration key is not set.
106 private static final String DEFAULT_WORKFLOW_DEFINITION_ID = "microsoft-azure-attach-transcription";
// Language used when the "language" configuration key is not set.
107 private static final String DEFAULT_LANGUAGE = "en-GB";
108
// Fallback values for optional configuration keys (see modified()).
109 private static final String DEFAULT_AZURE_BLOB_PATH = "";
110 private static final String DEFAULT_AZURE_CONTAINER_NAME = "opencast-transcriptions";
111 private static final float DEFAULT_MIN_CONFIDENCE = 1.0f;
112 private static final int DEFAULT_SPLIT_TEXT_LINE_LENGTH = 100;
113 private static final String DEFAULT_OUTPUT_FILE_FORMAT = "vtt";
// Names of the configuration properties read in modified().
114 private static final String KEY_ENABLED = "enabled";
115 private static final String KEY_LANGUAGE = "language";
// Comma separated list of languages; blank entries are skipped when parsing.
116 private static final String KEY_AUTO_DETECT_LANGUAGES = "auto.detect.languages";
117 private static final String KEY_WORKFLOW = "workflow";
118 private static final String KEY_AZURE_STORAGE_ACCOUNT_NAME = "azure_storage_account_name";
119 private static final String KEY_AZURE_ACCOUNT_ACCESS_KEY = "azure_account_access_key";
// NOTE(review): the identifier is misspelled ("BOLB"); the property name itself is "azure_blob_path".
120 private static final String KEY_AZURE_BOLB_PATH = "azure_blob_path";
121 private static final String KEY_AZURE_CONTAINER_NAME = "azure_container_name";
122 private static final String KEY_AZURE_SPEECH_SERVICES_ENDPOINT = "azure_speech_services_endpoint";
123 private static final String KEY_COGNITIVE_SERVICES_SUBSCRIPTION_KEY = "azure_cognitive_services_subscription_key";
124 private static final String KEY_AZURE_SPEECH_RECOGNITION_MIN_CONFIDENCE = "azure_speech_recognition_min_confidence";
125 private static final String KEY_SPLIT_TEXT_LINE_LENGTH = "split.text.line.length";
// Only "srt" and "vtt" are accepted; any other value falls back to the default format.
126 private static final String KEY_OUTPUT_FILE_FORMAT = "output.file.format";
127
128
// Collaborating services. NOTE(review): presumably injected through OSGi references declared outside this
// view - confirm against the setter methods.
129 private AssetManager assetManager;
130 private OrganizationDirectoryService organizationDirectoryService;
131 private SecurityService securityService;
132 private ServiceRegistry serviceRegistry;
133 private TranscriptionDatabase database;
134 private UserDirectoryService userDirectoryService;
135 private WorkflowService workflowService;
136 private Workspace workspace;
// Runs the WorkflowDispatcher periodically; (re)created in modified() and shut down in deactivate().
137 private ScheduledExecutorService scheduledExecutorService;
// Optional Workflows helper; startWorkflow() creates a fresh instance when this is null.
138 private Workflows wfUtil;
// Read from the component context in activate(); used to create the system user in startWorkflow().
139 private String systemAccount;
// Configuration state, populated by modified().
140 private boolean enabled;
141 private String language;
142 private List<String> autodetectLanguages;
143 private String workflowDefinitionId;
144 private String azureStorageAccountName;
145 private String azureAccountAccessKey;
146 private String azureBlobPath;
147 private String azureContainerName;
148 private String azureSpeechServicesEndpoint;
149 private String azureCognitiveServicesSubscriptionKey;
// Azure helpers, created in modified() and reset to null in deactivate().
150 private MicrosoftAzureAuthorization azureAuthorization;
151 private MicrosoftAzureStorageClient azureStorageClient;
152 private MicrosoftAzureSpeechServicesClient azureSpeechServicesClient;
// Passed to MicrosoftAzureSpeechServicesClient.writeTranscriptionFile() when the caption file is written.
153 private Float azureSpeechRecognitionMinConfidence;
154 private Integer splitTextLineLength;
155 private String outputFileFormat;
156
157 private enum Operation {
158 StartTranscription
159 }
160
161
162
163
/**
 * Creates the service, handing this producer's job type to the {@code AbstractJobProducer} base class.
 */
public MicrosoftAzureTranscriptionService() {
  super(JOB_TYPE);
}
167
168 @Activate
169 public void activate(ComponentContext cc) {
170 super.activate(cc);
171 systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
172 logger.debug("Activating...");
173 modified(cc);
174 }
175
176 @Modified
177 public void modified(ComponentContext cc) {
178 logger.debug("Updating config...");
179 Option<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), KEY_ENABLED);
180 if (enabledOpt.isSome()) {
181 enabled = enabledOpt.get();
182 } else {
183 deactivate();
184 }
185
186 if (!enabled) {
187 logger.info("Microsoft Azure transcription service disabled."
188 + " If you want to enable it, please update the service configuration.");
189 return;
190 }
191
192 Option<String> azureStorageAccountNameKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
193 KEY_AZURE_STORAGE_ACCOUNT_NAME);
194 if (azureStorageAccountNameKeyOpt.isSome()) {
195 azureStorageAccountName = azureStorageAccountNameKeyOpt.get();
196 } else {
197 logger.warn("Azure storage account name key was not set. Disabling Microsoft Azure transcription service.");
198 deactivate();
199 return;
200 }
201
202 Option<String> azureAccountAccessKeyKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_ACCOUNT_ACCESS_KEY);
203 if (azureAccountAccessKeyKeyOpt.isSome()) {
204 azureAccountAccessKey = azureAccountAccessKeyKeyOpt.get();
205 } else {
206 logger.warn("Azure storage account access key was not set. Disabling Microsoft Azure transcription service.");
207 deactivate();
208 return;
209 }
210
211 Option<String> azureSpeechServicesKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
212 KEY_AZURE_SPEECH_SERVICES_ENDPOINT);
213 if (azureSpeechServicesKeyOpt.isSome()) {
214 azureSpeechServicesEndpoint = azureSpeechServicesKeyOpt.get();
215 } else {
216 logger.warn("Azure speech services endpoint was not set. Disabling Microsoft Azure transcription service.");
217 deactivate();
218 return;
219 }
220
221 Option<String> azureCognitiveServicesSubscriptionKeyKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
222 KEY_COGNITIVE_SERVICES_SUBSCRIPTION_KEY);
223 if (azureCognitiveServicesSubscriptionKeyKeyOpt.isSome()) {
224 azureCognitiveServicesSubscriptionKey = azureCognitiveServicesSubscriptionKeyKeyOpt.get();
225 } else {
226 logger.warn("Azure cognitive services subscription key was not set. "
227 + "Disabling Microsoft Azure transcription service.");
228 deactivate();
229 return;
230 }
231
232
233 Option<String> workflowKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_WORKFLOW);
234 if (workflowKeyOpt.isSome()) {
235 workflowDefinitionId = workflowKeyOpt.get();
236 logger.info("Workflow is set to '{}'.", workflowDefinitionId);
237 } else {
238 workflowDefinitionId = DEFAULT_WORKFLOW_DEFINITION_ID;
239 logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
240 }
241
242 Option<String> azureBlobPathKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_BOLB_PATH);
243 if (azureBlobPathKeyOpt.isSome()) {
244 azureBlobPath = azureBlobPathKeyOpt.get();
245 } else {
246 logger.debug("Azure blob path was not set, using default path.");
247 azureBlobPath = DEFAULT_AZURE_BLOB_PATH;
248 }
249
250 Option<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_LANGUAGE);
251 if (languageOpt.isSome()) {
252 language = languageOpt.get();
253 logger.info("Default language is set to '{}'.", language);
254 } else {
255 language = DEFAULT_LANGUAGE;
256 logger.info("Default language '{}' will be used.", language);
257 }
258
259 autodetectLanguages = new ArrayList<>();
260 Option<String> autoDetectLanguagesOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AUTO_DETECT_LANGUAGES);
261 if (languageOpt.isSome()) {
262 for (String lang : StringUtils.split(autoDetectLanguagesOpt.get(), ",")) {
263 if (StringUtils.isNotBlank(lang)) {
264 autodetectLanguages.add(StringUtils.trimToEmpty(lang));
265 }
266 }
267 }
268
269 Option<String> azureContainerNameKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_AZURE_CONTAINER_NAME);
270 if (azureContainerNameKeyOpt.isSome()) {
271 azureContainerName = azureContainerNameKeyOpt.get();
272 } else {
273 logger.debug("Azure storage container name was not set, using default path.");
274 azureContainerName = DEFAULT_AZURE_CONTAINER_NAME;
275 }
276
277 Option<String> azureSpeechRecognitionMinConfidenceKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(),
278 KEY_AZURE_SPEECH_RECOGNITION_MIN_CONFIDENCE);
279 if (azureSpeechRecognitionMinConfidenceKeyOpt.isSome()) {
280 String azureSpeechRecognitionMinConfidenceStr = azureSpeechRecognitionMinConfidenceKeyOpt.get();
281 try {
282 azureSpeechRecognitionMinConfidence = Float.valueOf(azureSpeechRecognitionMinConfidenceStr);
283 } catch (NumberFormatException e) {
284 logger.error("Azure speech recognition min confidence value is not valid. "
285 + "Please set a value between 0.0 and 1.0. "
286 + "Setting to default value of {}.", DEFAULT_MIN_CONFIDENCE);
287 azureSpeechRecognitionMinConfidence = DEFAULT_MIN_CONFIDENCE;
288 }
289 } else {
290 logger.debug("Azure speech recognition min confidence value was not set. Setting to default value of {}.",
291 DEFAULT_MIN_CONFIDENCE);
292 azureSpeechRecognitionMinConfidence = DEFAULT_MIN_CONFIDENCE;
293 }
294
295 Option<String> splitTextLineLengthOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_SPLIT_TEXT_LINE_LENGTH);
296 if (splitTextLineLengthOpt.isSome()) {
297 try {
298 splitTextLineLength = Integer.parseInt(splitTextLineLengthOpt.get());
299 } catch (NumberFormatException e) {
300 splitTextLineLength = DEFAULT_SPLIT_TEXT_LINE_LENGTH;
301 logger.error("Invalid configuration value for '{}'. Set default value {}.", KEY_SPLIT_TEXT_LINE_LENGTH,
302 DEFAULT_SPLIT_TEXT_LINE_LENGTH);
303 }
304 } else {
305 logger.debug("Configuration value for '{}' was not set. Setting to default value of {}.",
306 KEY_SPLIT_TEXT_LINE_LENGTH, DEFAULT_MIN_CONFIDENCE);
307 splitTextLineLength = DEFAULT_SPLIT_TEXT_LINE_LENGTH;
308 }
309
310 Option<String> outputFileFormatOpt = OsgiUtil.getOptCfg(cc.getProperties(), KEY_OUTPUT_FILE_FORMAT);
311 if (outputFileFormatOpt.isSome()) {
312 outputFileFormat = outputFileFormatOpt.get();
313 switch (outputFileFormat) {
314 case "srt":
315 case "vtt":
316 break;
317 default:
318 logger.debug("Azure output file format not valid, using default format {}.",
319 DEFAULT_OUTPUT_FILE_FORMAT);
320 outputFileFormat = DEFAULT_OUTPUT_FILE_FORMAT;
321 break;
322 }
323 } else {
324 logger.debug("Azure output file format not set, using default format {}.", DEFAULT_OUTPUT_FILE_FORMAT);
325 outputFileFormat = DEFAULT_OUTPUT_FILE_FORMAT;
326 }
327 logger.info("Transcription output format is set to '{}'.", outputFileFormat);
328
329
330 try {
331 azureAuthorization = new MicrosoftAzureAuthorization(azureStorageAccountName, azureAccountAccessKey);
332 azureStorageClient = new MicrosoftAzureStorageClient(azureAuthorization);
333 } catch (MicrosoftAzureStorageClientException e) {
334 logger.error("Unable to create Microsoft Azure storage client. "
335 + "Deactivating Microsoft Azure Transcription service.", e);
336 deactivate();
337 return;
338 }
339
340
341 azureSpeechServicesClient = new MicrosoftAzureSpeechServicesClient(
342 azureSpeechServicesEndpoint, azureCognitiveServicesSubscriptionKey);
343
344 if (scheduledExecutorService != null) {
345 scheduledExecutorService.shutdown();
346 try {
347 scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
348 } catch (InterruptedException e) {
349
350
351 }
352 }
353 scheduledExecutorService = Executors.newScheduledThreadPool(2);
354 scheduledExecutorService.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, 120, TimeUnit.SECONDS);
355 logger.info("Activated.");
356 }
357
358 @Deactivate
359 public void deactivate() {
360 enabled = false;
361 if (scheduledExecutorService != null) {
362 scheduledExecutorService.shutdown();
363 try {
364 scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
365 } catch (InterruptedException e) {
366
367
368 }
369 }
370 azureAuthorization = null;
371 azureStorageClient = null;
372 azureSpeechServicesClient = null;
373 logger.info("Deactivated.");
374 }
375
376 @Override
377 protected String process(Job job) throws Exception {
378 Operation op = null;
379 String operation = job.getOperation();
380 List<String> arguments = job.getArguments();
381 op = Operation.valueOf(operation);
382 switch (op) {
383 case StartTranscription:
384 long jobId = job.getId();
385 String mpId = arguments.get(0);
386 Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
387 String languageCode = arguments.get(2);
388 if (StringUtils.isBlank(languageCode)) {
389 languageCode = getLanguage();
390 }
391 return createTranscriptionJob(jobId, mpId, track, languageCode);
392 default:
393 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
394 }
395 }
396
397 @Override
398 public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
399 return startTranscription(mpId, track, getLanguage());
400 }
401
402 @Override
403 public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
404 try {
405 List<String> jobArgs = new ArrayList<>(2 + args.length);
406 jobArgs.add(mpId);
407 jobArgs.add(MediaPackageElementParser.getAsXml(track));
408 jobArgs.addAll(Arrays.asList(args));
409 return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.toString(), jobArgs);
410 } catch (ServiceRegistryException e) {
411 throw new TranscriptionServiceException(String.format(
412 "Unable to create transcription job for media package '%s'.", mpId), e);
413 } catch (MediaPackageException e) {
414 throw new TranscriptionServiceException(String.format(
415 "Unable to to parse track from media package '%s'.", mpId), e);
416 }
417 }
418
419 @Override
420 public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
421 throws TranscriptionServiceException {
422 if (type != MediaPackageElement.Type.Track && type != MediaPackageElement.Type.Attachment) {
423 throw new IllegalArgumentException("Target type must be a Track or Attachment.");
424 }
425 MicrosoftAzureSpeechTranscription transcription;
426 try {
427 transcription = azureSpeechServicesClient.getTranscriptionById(jobId);
428 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException
429 | MicrosoftAzureNotFoundException e) {
430 throw new TranscriptionServiceException(String.format(
431 "Unable to get transcription '%s' for media package '%s'.", jobId, mpId), e);
432 }
433 MicrosoftAzureSpeechTranscriptionJson transcriptionJson;
434 URI transcriptionFileUri;
435 try {
436 transcriptionJson = getTranscriptionJson(mpId, transcription);
437 transcriptionFileUri = MicrosoftAzureSpeechServicesClient.writeTranscriptionFile(transcriptionJson,
438 workspace, outputFileFormat, azureSpeechRecognitionMinConfidence, splitTextLineLength);
439 } catch (IOException | MicrosoftAzureNotFoundException e) {
440 throw new TranscriptionServiceException(String.format(
441 "Unable to download transcription file for media package '%s'.", mpId), e);
442 }
443 MediaPackageElement transcriptionElement = MediaPackageElementBuilderFactory.newInstance().newElementBuilder()
444 .elementFromURI(transcriptionFileUri, type, new MediaPackageElementFlavor("captions", outputFileFormat));
445 if (type.equals(MediaPackageElement.Type.Track)) {
446
447 transcriptionElement.setTags(new String[] { "generator-type:auto", "generator:azure", "type:subtitle"});
448 }
449 return transcriptionElement;
450 }
451
452 @Override
453 public void transcriptionDone(String mpId, Object results) throws TranscriptionServiceException {
454 MicrosoftAzureSpeechTranscription transcription = (MicrosoftAzureSpeechTranscription) results;
455 logger.info("Transcription job {} for media package {} done.", transcription.getID(), mpId);
456
457 try {
458 deleteTranscriptionSourceFiles(mpId, transcription.getID());
459 } catch (TranscriptionServiceException e) {
460 logger.warn("Unable to delete transcription source files for media package {} after transcription job done.",
461 mpId, e);
462 }
463 try {
464 database.updateJobControl(transcription.getID(), TranscriptionJobControl.Status.TranscriptionComplete.name());
465 } catch (TranscriptionDatabaseException e) {
466 throw new TranscriptionServiceException(String.format(
467 "Transcription job for media package '%s' succeeded but storing job status in the database failed."
468 , mpId), e);
469 }
470 }
471
472 @Override
473 public void transcriptionError(String mpId, Object results) throws TranscriptionServiceException {
474 MicrosoftAzureSpeechTranscription transcription = (MicrosoftAzureSpeechTranscription) results;
475 String message = "";
476 if (transcription != null && transcription.properties != null && transcription.properties.containsKey("error")) {
477 Map<String, Object> errorInfo = (Map<String, Object>) transcription.properties.get("error");
478 message = String.format(" Microsoft error code %s: %s", errorInfo.getOrDefault("code", "UNKNOWN"),
479 errorInfo.getOrDefault("message", "No info"));
480 }
481 logger.info("Transcription job {} for media package {} failed.{}", transcription.getID(), mpId, message);
482
483 try {
484 deleteTranscriptionSourceFiles(mpId, transcription.getID());
485 } catch (TranscriptionServiceException e) {
486 logger.warn("Unable to delete transcription source files for media package {} after transcription kob failure.",
487 mpId, e);
488 }
489 try {
490 database.updateJobControl(transcription.getID(), TranscriptionJobControl.Status.Error.name());
491 } catch (TranscriptionDatabaseException e) {
492 throw new TranscriptionServiceException(String.format(
493 "Transcription job for media package '%s' failed and storing job status in the database failed too."
494 , mpId), e);
495 }
496 }
497
498 @Override
499 public String getLanguage() {
500 return language;
501 }
502
503 @Override
504 public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
505 throw new NotImplementedException();
506 }
507
508 public String createTranscriptionJob(long jobId, String mpId, Track track, String language)
509 throws TranscriptionServiceException {
510
511 File trackFile;
512 try {
513 trackFile = workspace.get(track.getURI());
514 } catch (NotFoundException e) {
515 throw new TranscriptionServiceException(String.format("Track %s not found.", track.getURI()), e);
516 } catch (IOException e) {
517 throw new TranscriptionServiceException(String.format(
518 "Unable to get track %s for transcription.", track.getURI()), e);
519 }
520
521
522 try {
523 azureStorageClient.createContainer(azureContainerName);
524 } catch (IOException | MicrosoftAzureStorageClientException | MicrosoftAzureNotAllowedException e) {
525 throw new TranscriptionServiceException(String.format(
526 "Unable to query or create a storage container '%s' on Microsoft Azure.", azureContainerName), e);
527 }
528
529 String azureBlobUrl;
530 try {
531 String filename = String.format("%d-%s.%s", jobId, mpId, FilenameUtils.getExtension(trackFile.getName()));
532 azureBlobUrl = azureStorageClient.uploadFile(trackFile, azureContainerName, azureBlobPath, filename);
533 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureStorageClientException e) {
534 throw new TranscriptionServiceException(String.format(
535 "Unable to upload track %s from media package '%s' to Microsoft Azure storage container '%s'.",
536 track.getURI(), mpId, azureContainerName), e);
537 }
538
539 List<String> contentUrls = Arrays.asList(azureBlobUrl);
540 String azureDestContainerUrl = String.format("%s?%s", azureStorageClient.getContainerUrl(azureContainerName),
541 azureAuthorization.generateServiceSasToken("cw", null, null, azureContainerName, "c"));
542 MicrosoftAzureSpeechTranscription transcription;
543 try {
544 transcription = azureSpeechServicesClient.createTranscription(contentUrls,
545 azureDestContainerUrl, String.format("Transcription job %d", jobId), language, autodetectLanguages,
546 null, null);
547 logger.info("Started transcription of {} from media package '{}' on Microsoft Azure Speech Services at {}",
548 track.getURI(), mpId, transcription.self);
549 } catch (MicrosoftAzureNotAllowedException | IOException | MicrosoftAzureSpeechClientException e) {
550 throw new TranscriptionServiceException(String.format(
551 "Unable to create transcription of track %s from media package '%s' "
552 + "in Microsoft Azure storage container '%s'.",
553 track.getURI(), mpId, azureContainerName), e);
554 }
555
556 try {
557 database.storeJobControl(mpId, track.getIdentifier(), transcription.getID(),
558 TranscriptionJobControl.Status.InProgress.name(),
559 track.getDuration() == null ? 0 : track.getDuration(), new Date(), PROVIDER);
560 } catch (TranscriptionDatabaseException e) {
561 throw new TranscriptionServiceException(String.format(
562 "Unable to store transcription job of track %s from media package '%s' in the database.",
563 track.getURI(), mpId), e);
564 }
565
566 return transcription.getID();
567 }
568
569 MicrosoftAzureSpeechTranscriptionJson getTranscriptionJson(String mpId,
570 MicrosoftAzureSpeechTranscription transcription)
571 throws TranscriptionServiceException, MicrosoftAzureNotFoundException {
572 if (!transcription.isSucceeded()) {
573 if (transcription.isRunning()) {
574 throw new TranscriptionServiceException(String.format("Unable to get generated transcription. "
575 + "Transcription job '%s' for media package '%s' is currently running.", transcription.getID(), mpId));
576 } else if (transcription.isFailed()) {
577 throw new TranscriptionServiceException(String.format("Unable to get generated transcription. "
578 + "Transcription job '%s' for media package '%s' is failed.", transcription.getID(), mpId));
579 }
580 }
581
582 MicrosoftAzureSpeechTranscriptionFiles transcriptionFiles;
583 try {
584 transcriptionFiles = azureSpeechServicesClient.getTranscriptionFilesById(transcription.getID());
585 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
586 throw new TranscriptionServiceException(String.format(
587 "Unable to get transcription files '%s' for media package '%s'.", transcription.getID(), mpId), e);
588 }
589
590 MicrosoftAzureSpeechTranscriptionFile transcriptionFile = null;
591 for (MicrosoftAzureSpeechTranscriptionFile tf : transcriptionFiles.values) {
592 if (tf.isTranscriptionFile()) {
593 transcriptionFile = tf;
594 break;
595 }
596 }
597 if (transcriptionFile == null) {
598
599
600 throw new NotImplementedException("At least one transcription file should be provided.");
601 }
602
603 try {
604 return MicrosoftAzureSpeechServicesClient
605 .getTranscriptionJson(transcriptionFile);
606 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException
607 | MicrosoftAzureNotFoundException e) {
608 throw new TranscriptionServiceException(String.format(
609 "Unable to download transcription file '%s' for media package '%s'.", transcriptionFile.self, mpId), e);
610 }
611 }
612
613 String startWorkflow(String mpId, MicrosoftAzureSpeechTranscription transcription)
614 throws TranscriptionDatabaseException, NotFoundException, WorkflowDatabaseException,
615 TranscriptionServiceException, MicrosoftAzureNotFoundException {
616 MicrosoftAzureSpeechTranscriptionJson transcriptionJson = getTranscriptionJson(mpId, transcription);
617 String transcriptionLocale = transcriptionJson.getRecognizedLocale();
618
619 DefaultOrganization defaultOrg = new DefaultOrganization();
620 securityService.setOrganization(defaultOrg);
621 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
622
623
624 final AQueryBuilder q = assetManager.createQuery();
625 final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
626 if (r.getSize() == 0) {
627
628 logger.warn("Media package {} has not been archived yet. Skipped.", mpId);
629 return null;
630 }
631
632 String org = Enrichments.enrich(r).getSnapshots().get(0).getOrganizationId();
633 Organization organization = organizationDirectoryService.getOrganization(org);
634 if (organization == null) {
635 logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
636 return null;
637 }
638 securityService.setOrganization(organization);
639
640
641 Map<String, String> params = new HashMap<>();
642 params.put("transcriptionJobId", transcription.getID());
643 String locale = "";
644 String language = "";
645 if (StringUtils.isNotBlank(transcriptionLocale)) {
646 locale = transcriptionLocale;
647 language = Locale.forLanguageTag(transcriptionLocale).getLanguage();
648 }
649 params.put("transcriptionLocale", locale);
650 params.put("transcriptionLocaleSet", Boolean.toString(!StringUtils.isEmpty(locale)));
651 params.put("transcriptionLocaleSubtypeSuffix", !StringUtils.isEmpty(locale) ? "+" + locale : "");
652 params.put("transcriptionLocaleTag", !StringUtils.isEmpty(locale) ? "lang:" + locale : "");
653 params.put("transcriptionLanguage", language);
654 params.put("transcriptionLanguageSet", Boolean.toString(!StringUtils.isEmpty(language)));
655 params.put("transcriptionLanguageSubtypeSuffix", !StringUtils.isEmpty(language) ? "+" + language : "");
656 params.put("transcriptionLanguageTag", !StringUtils.isEmpty(language) ? "lang:" + language : "");
657 WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
658
659
660
661 Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
662 Set<String> mpIds = new HashSet<>();
663 mpIds.add(mpId);
664 List<WorkflowInstance> wfList = workflows
665 .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
666 return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
667 }
668
669 public void deleteTranscription(String mpId, String transcriptionId)
670 throws TranscriptionServiceException, TranscriptionDatabaseException {
671 TranscriptionJobControl transcriptionJobControl = database.findByJob(transcriptionId);
672 TranscriptionJobControl.Status transcriptionJobControlStatus = TranscriptionJobControl.Status.valueOf(
673 transcriptionJobControl.getStatus());
674 if (transcriptionJobControlStatus != TranscriptionJobControl.Status.Closed
675 && transcriptionJobControlStatus != TranscriptionJobControl.Status.Canceled
676 && transcriptionJobControlStatus != TranscriptionJobControl.Status.Error) {
677 throw new TranscriptionServiceException(String.format("Abort deleting transcription %s with invalid status '%s'.",
678 transcriptionId, transcriptionJobControl.getStatus()));
679 }
680 deleteTranscriptionSourceFiles(mpId, transcriptionId);
681 try {
682 azureSpeechServicesClient.deleteTranscription(transcriptionId);
683 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
684 throw new TranscriptionServiceException(String.format(
685 "Unable to delete transcription '%s' for media package '%s'.", transcriptionId, mpId), e);
686 }
687 database.deleteJobControl(transcriptionJobControl.getTranscriptionJobId());
688 }
689
690 public void deleteTranscriptionSourceFiles(String mpId, String transcriptionId)
691 throws TranscriptionServiceException {
692 MicrosoftAzureSpeechTranscriptionFiles transcriptionFiles;
693 try {
694 transcriptionFiles = azureSpeechServicesClient.getTranscriptionFilesById(transcriptionId);
695 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
696 throw new TranscriptionServiceException(String.format(
697 "Unable to get for transcription '%s' from media package '%s'.", transcriptionId, mpId), e);
698 } catch (MicrosoftAzureNotFoundException e) {
699
700 logger.debug("Failed to get non existing transcription files from media package {} for deleting.", mpId, e);
701 return;
702 }
703 for (MicrosoftAzureSpeechTranscriptionFile transcriptionFile : transcriptionFiles.values) {
704 if (!transcriptionFile.isTranscriptionFile()) {
705 continue;
706 }
707 MicrosoftAzureSpeechTranscriptionJson transcriptionJson;
708 try {
709 transcriptionJson = MicrosoftAzureSpeechServicesClient
710 .getTranscriptionJson(transcriptionFile);
711 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureSpeechClientException e) {
712 throw new TranscriptionServiceException(String.format(
713 "Unable to download transcription file '%s' for media package '%s'.", transcriptionFile.self, mpId), e);
714 } catch (MicrosoftAzureNotFoundException e) {
715
716 logger.debug("Failed to get non existing transcription file {} from media package {} for deleting.",
717 transcriptionFile.self, mpId, e);
718 continue;
719 }
720 if (StringUtils.isNotBlank(transcriptionJson.source)) {
721 try {
722 azureStorageClient.deleteFile(new URL(transcriptionJson.source));
723 } catch (IOException | MicrosoftAzureNotAllowedException | MicrosoftAzureStorageClientException e) {
724 throw new TranscriptionServiceException(String.format(
725 "Unable to delete audio source file for media package %s.", mpId, e));
726 }
727 }
728 }
729 }
730
731 class WorkflowDispatcher implements Runnable {
732
733 @Override
734 public void run() {
735 if (!enabled) {
736 logger.debug("Service disabled, cancel processing.");
737 return;
738 }
739 logger.debug("Run jobs handling loop for transcription provider {}.", PROVIDER);
740 long providerId;
741 try {
742 TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
743 if (providerInfo != null) {
744 providerId = providerInfo.getId();
745 } else {
746 logger.debug("No jobs yet for provider {}.", PROVIDER);
747 return;
748 }
749
750 for (TranscriptionJobControl jobControl : database.findByStatus(
751 TranscriptionJobControl.Status.InProgress.name())) {
752 if (providerId != jobControl.getProviderId()) {
753 continue;
754 }
755 String mpId = jobControl.getMediaPackageId();
756 String transcriptionId = jobControl.getTranscriptionJobId();
757 try {
758 MicrosoftAzureSpeechTranscription transcription = azureSpeechServicesClient.getTranscriptionById(
759 transcriptionId);
760
761 if (!transcription.isRunning()) {
762 if (transcription.isFailed()) {
763 transcriptionError(mpId, transcription);
764 } else if (transcription.isSucceeded()) {
765 transcriptionDone(mpId, transcription);
766 }
767 }
768 } catch (MicrosoftAzureNotAllowedException | IOException | MicrosoftAzureSpeechClientException e) {
769 logger.error("Unable to get or update transcription {} or transcription file from media package {}.",
770 transcriptionId, mpId, e);
771 } catch (MicrosoftAzureNotFoundException e) {
772 logger.warn("Transcription {} from media package {} not found.", transcriptionId, mpId);
773 database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Error.name());
774 } catch (TranscriptionServiceException e) {
775 logger.error(e.getMessage(), e);
776 }
777 }
778
779 for (TranscriptionJobControl jobControl : database.findByStatus(
780 TranscriptionJobControl.Status.TranscriptionComplete.name())) {
781 if (providerId != jobControl.getProviderId()) {
782 continue;
783 }
784 String mpId = jobControl.getMediaPackageId();
785 String transcriptionId = jobControl.getTranscriptionJobId();
786 try {
787 MicrosoftAzureSpeechTranscription transcription = azureSpeechServicesClient.getTranscriptionById(
788 transcriptionId);
789
790 String workflowId = startWorkflow(mpId, transcription);
791
792 if (workflowId != null) {
793 database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Closed.name());
794 logger.info("Attach transcription workflow {} scheduled for mp {}, microsoft azure transcription job {}",
795 workflowId, mpId, transcriptionId);
796 }
797 } catch (MicrosoftAzureNotAllowedException | IOException
798 | MicrosoftAzureSpeechClientException e) {
799 logger.warn("Unable to get transcription {} or transcription file from media package {}.",
800 transcriptionId, mpId, e);
801 } catch (MicrosoftAzureNotFoundException e) {
802 logger.warn("Transcription {} from media package {} not found.", transcriptionId, mpId);
803 database.updateJobControl(transcriptionId, TranscriptionJobControl.Status.Error.name());
804 } catch (TranscriptionServiceException e) {
805 logger.warn(e.getMessage(), e);
806 } catch (NotFoundException e) {
807 logger.warn("Unable to load organization.", e);
808 } catch (IllegalStateException e) {
809 logger.debug(e.getMessage());
810 }
811 }
812
813 for (TranscriptionJobControl jobControl : database.findByStatus(
814 TranscriptionJobControl.Status.Closed.name(), TranscriptionJobControl.Status.Error.name())) {
815 if (providerId != jobControl.getProviderId()) {
816 continue;
817 }
818 String mpId = jobControl.getMediaPackageId();
819 String transcriptionId = jobControl.getTranscriptionJobId();
820 if (Instant.now().minus(7, ChronoUnit.DAYS).isAfter(jobControl.getDateCreated().toInstant())) {
821 try {
822 deleteTranscription(jobControl.getMediaPackageId(), jobControl.getTranscriptionJobId());
823 } catch (TranscriptionServiceException e) {
824 logger.error("Unable to delete transcription {} or transcription files from media package {}.",
825 transcriptionId, mpId, e);
826 }
827 }
828 }
829 } catch (TranscriptionDatabaseException e) {
830 logger.warn("Could not read or update transcription job control database", e);
831 } catch (WorkflowDatabaseException e) {
832 logger.warn("Unable to get workflow definition.", e);
833 } catch (Throwable e) {
834
835 logger.error("Something went wrong in transcription job processing loop. Exception unhandled!!!", e);
836 }
837 }
838 }
839
840
841 void setWfUtil(Workflows wfUtil) {
842 this.wfUtil = wfUtil;
843 }
844
845 @Override
846 protected ServiceRegistry getServiceRegistry() {
847 return serviceRegistry;
848 }
849
850 @Override
851 protected SecurityService getSecurityService() {
852 return securityService;
853 }
854
855 @Override
856 protected UserDirectoryService getUserDirectoryService() {
857 return userDirectoryService;
858 }
859
860 @Override
861 protected OrganizationDirectoryService getOrganizationDirectoryService() {
862 return organizationDirectoryService;
863 }
864
865 @Reference
866 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
867 this.serviceRegistry = serviceRegistry;
868 }
869
870 @Reference
871 public void setSecurityService(SecurityService securityService) {
872 this.securityService = securityService;
873 }
874
875 @Reference
876 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
877 this.userDirectoryService = userDirectoryService;
878 }
879
880 @Reference
881 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
882 this.organizationDirectoryService = organizationDirectoryService;
883 }
884
885 @Reference
886 public void setWorkspace(Workspace ws) {
887 this.workspace = ws;
888 }
889
890
891
892
893
894
895 @Reference
896 public void setDatabase(TranscriptionDatabase service) {
897 this.database = service;
898 }
899
900 @Reference
901 public void setAssetManager(AssetManager service) {
902 this.assetManager = service;
903 }
904
905 @Reference
906 public void setWorkflowService(WorkflowService service) {
907 this.workflowService = service;
908 }
909 }