1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.transcription.googlespeech;
22
23 import org.opencastproject.assetmanager.api.AssetManager;
24 import org.opencastproject.assetmanager.api.Snapshot;
25 import org.opencastproject.assetmanager.util.Workflows;
26 import org.opencastproject.job.api.AbstractJobProducer;
27 import org.opencastproject.job.api.Job;
28 import org.opencastproject.kernel.mail.SmtpService;
29 import org.opencastproject.mediapackage.MediaPackageElement;
30 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
31 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
32 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
33 import org.opencastproject.mediapackage.MediaPackageElementParser;
34 import org.opencastproject.mediapackage.MediaPackageException;
35 import org.opencastproject.mediapackage.Track;
36 import org.opencastproject.security.api.DefaultOrganization;
37 import org.opencastproject.security.api.Organization;
38 import org.opencastproject.security.api.OrganizationDirectoryService;
39 import org.opencastproject.security.api.SecurityService;
40 import org.opencastproject.security.api.UserDirectoryService;
41 import org.opencastproject.security.util.SecurityUtil;
42 import org.opencastproject.serviceregistry.api.ServiceRegistry;
43 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
44 import org.opencastproject.systems.OpencastConstants;
45 import org.opencastproject.transcription.api.TranscriptionService;
46 import org.opencastproject.transcription.api.TranscriptionServiceException;
47 import org.opencastproject.transcription.persistence.TranscriptionDatabase;
48 import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
49 import org.opencastproject.transcription.persistence.TranscriptionJobControl;
50 import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
51 import org.opencastproject.util.NotFoundException;
52 import org.opencastproject.util.OsgiUtil;
53 import org.opencastproject.util.UrlSupport;
54 import org.opencastproject.workflow.api.ConfiguredWorkflow;
55 import org.opencastproject.workflow.api.WorkflowDatabaseException;
56 import org.opencastproject.workflow.api.WorkflowDefinition;
57 import org.opencastproject.workflow.api.WorkflowInstance;
58 import org.opencastproject.workflow.api.WorkflowService;
59 import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
60 import org.opencastproject.workspace.api.Workspace;
61
62 import org.apache.commons.io.FilenameUtils;
63 import org.apache.commons.lang3.StringUtils;
64 import org.apache.http.HttpEntity;
65 import org.apache.http.HttpHeaders;
66 import org.apache.http.HttpStatus;
67 import org.apache.http.client.config.RequestConfig;
68 import org.apache.http.client.methods.CloseableHttpResponse;
69 import org.apache.http.client.methods.HttpGet;
70 import org.apache.http.client.methods.HttpPost;
71 import org.apache.http.entity.StringEntity;
72 import org.apache.http.impl.client.CloseableHttpClient;
73 import org.apache.http.impl.client.HttpClients;
74 import org.apache.http.util.EntityUtils;
75 import org.json.simple.JSONArray;
76 import org.json.simple.JSONObject;
77 import org.json.simple.parser.JSONParser;
78 import org.osgi.service.component.ComponentContext;
79 import org.osgi.service.component.annotations.Activate;
80 import org.osgi.service.component.annotations.Component;
81 import org.osgi.service.component.annotations.Reference;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 import java.io.ByteArrayInputStream;
86 import java.io.File;
87 import java.io.IOException;
88 import java.net.URI;
89 import java.util.Arrays;
90 import java.util.HashMap;
91 import java.util.HashSet;
92 import java.util.List;
93 import java.util.Map;
94 import java.util.Optional;
95 import java.util.Set;
96 import java.util.concurrent.Executors;
97 import java.util.concurrent.ScheduledExecutorService;
98 import java.util.concurrent.TimeUnit;
99
100 @Component(
101 immediate = true,
102 service = { TranscriptionService.class,GoogleSpeechTranscriptionService.class },
103 property = {
104 "service.description=Google Speech Transcription Service",
105 "provider=google.speech"
106 }
107 )
108 public class GoogleSpeechTranscriptionService extends AbstractJobProducer implements TranscriptionService {
109
110
111
112
/** The logger */
private static final Logger logger = LoggerFactory.getLogger(GoogleSpeechTranscriptionService.class);

/** Job type handed to the {@code AbstractJobProducer} constructor and used when creating jobs. */
private static final String JOB_TYPE = "org.opencastproject.transcription.googlespeech";

/** Workspace collection in which the transcription result files ({@code <jobId>.json}) are stored. */
static final String TRANSCRIPT_COLLECTION = "transcripts";
/** First argument passed to {@code sendEmail} when a transcription fails (presumably the subject; confirm). */
static final String TRANSCRIPTION_ERROR = "Transcription ERROR";
static final String TRANSCRIPTION_JOB_ID_KEY = "transcriptionJobId";
/** JSON key of the access token in the token endpoint response. */
static final String ACCESS_TOKEN_NAME = "access_token";
/** JSON key of the token lifetime in the token endpoint response; the value is multiplied by 1000 to get ms. */
static final String ACCESS_TOKEN_EXPIRY_NAME = "expires_in";
/** Connect and connection-request timeout handed to the HTTP client request configuration. */
private static final int CONNECTION_TIMEOUT = 60000;
/** Socket timeout handed to the HTTP client request configuration. */
private static final int SOCKET_TIMEOUT = 60000;
/** Minimum remaining lifetime (ms) a cached access token must have before it is refreshed instead of reused. */
private static final int ACCESS_TOKEN_MINIMUM_TIME = 60000;

/** Workflow definition id used when none is configured. */
public static final String DEFAULT_WF_DEF = "google-speech-attach-transcripts";
/** Default completion check buffer, in seconds. */
private static final long DEFAULT_COMPLETION_BUFFER = 300;
/** Default delay between two runs of the workflow dispatcher, in seconds. */
private static final long DEFAULT_DISPATCH_INTERVAL = 60;
/** Default maximum processing time, in seconds (5 hours). */
private static final long DEFAULT_MAX_PROCESSING_TIME = 5 * 60 * 60;

/** Default number of days after which result files are cleaned up. */
private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;
private static final boolean DEFAULT_PROFANITY_FILTER = false;
private static final String DEFAULT_LANGUAGE = "en-US";
private static final boolean DEFAULT_ENABLE_PUNCTUATION = false;
private static final String DEFAULT_MODEL = "default";
/** Base URL of the Google Speech REST API. */
private static final String GOOGLE_SPEECH_URL = "https://speech.googleapis.com/v1";
/** Default OAuth2 token endpoint; can be overridden through configuration. */
private static final String GOOGLE_AUTH2_URL = "https://www.googleapis.com/oauth2/v4/token";
/** Path appended to the base URL to submit a recognition job. */
private static final String REQUEST_METHOD = "speech:longrunningrecognize";
/** Path appended to the base URL (followed by the job id) to query a job. */
private static final String RESULT_PATH = "operations";
/** Sentinel value meaning "no usable access token". */
private static final String INVALID_TOKEN = "-1";
/** Provider name stored with each job control record. */
private static final String PROVIDER = "Google Speech";
private static final String DEFAULT_ENCODING = "flac";
144
/** Environment name read from the bundle context during activation; empty if not configured. */
private String clusterName = "";

// Service dependencies. Their setters are not part of this section of the file;
// presumably they are injected by OSGi declarative services -- confirm.
private ServiceRegistry serviceRegistry;
private SecurityService securityService;
private UserDirectoryService userDirectoryService;
private OrganizationDirectoryService organizationDirectoryService;
private Workspace workspace;
private TranscriptionDatabase database;
private AssetManager assetManager;
private WorkflowService workflowService;
private WorkingFileRepository wfr;
private SmtpService smtpService;


// Workflow helper; its initialization is not visible in this section of the file.
private Workflows wfUtil;
163
/** Job operations this producer handles; the constant name is stored as the job's operation. */
private enum Operation {
  StartTranscription
}
167
168
169
170
// Keys of the service configuration read in activate().
/** Whether the service is enabled. */
public static final String ENABLED_CONFIG = "enabled";
/** Default language used when a job does not specify one. */
public static final String GOOGLE_SPEECH_LANGUAGE = "google.speech.language";
public static final String PROFANITY_FILTER = "google.speech.profanity.filter";
public static final String ENABLE_PUNCTUATION = "google.speech.transcription.punctuation";
public static final String TRANSCRIPTION_MODEL = "google.speech.transcription.model";
/** Id of the workflow definition to use. */
public static final String WORKFLOW_CONFIG = "workflow";
/** Delay between two workflow dispatcher runs, in seconds. */
public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
/** Completion check buffer, in seconds. */
public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
/** Maximum processing time, in seconds. */
public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
/** Address notifications are sent to; the admin email context property is the fallback. */
public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
/** Number of days after which result files are cleaned up. */
public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
public static final String GOOGLE_CLOUD_CLIENT_ID = "google.cloud.client.id";
public static final String GOOGLE_CLOUD_CLIENT_SECRET = "google.cloud.client.secret";
public static final String GOOGLE_CLOUD_REFRESH_TOKEN = "google.cloud.refresh.token";
public static final String GOOGLE_CLOUD_BUCKET = "google.cloud.storage.bucket";
/** Optional override of the OAuth2 token endpoint. */
public static final String GOOGLE_CLOUD_TOKEN_ENDPOINT_URL = "google.cloud.token.endpoint.url";
public static final String ENCODING_EXTENSION = "encoding.extension";
188
189
190
191
// Effective configuration; defaults apply until activate() overrides them.
private boolean enabled = false;
private boolean profanityFilter = DEFAULT_PROFANITY_FILTER;
private boolean enablePunctuation = DEFAULT_ENABLE_PUNCTUATION;
private String model = DEFAULT_MODEL;
private String defaultLanguage = DEFAULT_LANGUAGE;
private String defaultEncoding = DEFAULT_ENCODING;
private String workflowDefinitionId = DEFAULT_WF_DEF;
// The following three values are in seconds.
private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
// Stays null when neither the notification nor the admin email is configured.
private String toEmailAddress;
private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
private String clientId;
private String clientSecret;
// Holds the configured refresh token (GOOGLE_CLOUD_REFRESH_TOKEN).
private String clientToken;
// Cached access token; INVALID_TOKEN until the first successful refresh.
private String accessToken = INVALID_TOKEN;
private String tokenEndpoint = GOOGLE_AUTH2_URL;
private String storageBucket;
// Expiry of the cached access token, in epoch milliseconds.
private long tokenExpiryTime = 0;
private String systemAccount;
// Runs the workflow dispatcher and the results file cleanup scheduled in activate().
private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
213
/** Creates the service and registers {@code JOB_TYPE} as its job type with the job producer base class. */
public GoogleSpeechTranscriptionService() {
  super(JOB_TYPE);
}
217
218 @Activate
219 public void activate(ComponentContext cc) {
220
221 enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).get();
222 if (!enabled) {
223 logger.info("Service disabled. If you want to enable it, please update the service configuration.");
224 return;
225 }
226
227 clientId = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_ID);
228 clientSecret = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_SECRET);
229 clientToken = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_REFRESH_TOKEN);
230 storageBucket = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_BUCKET);
231
232
233 Optional<String> tokenOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_CLOUD_TOKEN_ENDPOINT_URL);
234 if (tokenOpt.isPresent()) {
235 tokenEndpoint = tokenOpt.get();
236 logger.info("Access token endpoint is set to {}", tokenEndpoint);
237 } else {
238 logger.info("Default access token endpoint will be used");
239 }
240
241
242 Optional<String> profanityOpt = OsgiUtil.getOptCfg(cc.getProperties(), PROFANITY_FILTER);
243 if (profanityOpt.isPresent()) {
244 profanityFilter = Boolean.parseBoolean(profanityOpt.get());
245 logger.info("Profanity filter is set to {}", profanityFilter);
246 } else {
247 logger.info("Default profanity filter will be used");
248 }
249
250 Optional<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_SPEECH_LANGUAGE);
251 if (languageOpt.isPresent()) {
252 defaultLanguage = languageOpt.get();
253 logger.info("Language used is {}", defaultLanguage);
254 } else {
255 logger.info("Default language will be used");
256 }
257
258 Optional<String> punctuationOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENABLE_PUNCTUATION);
259 if (punctuationOpt.isPresent()) {
260 enablePunctuation = Boolean.parseBoolean(punctuationOpt.get());
261 logger.info("Enable punctuation is set to {}", enablePunctuation);
262 } else {
263 logger.info("Default punctuation setting will be used");
264 }
265
266 Optional<String> transModel = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTION_MODEL);
267 if (transModel.isPresent()) {
268 model = transModel.get();
269 logger.info("Transcription model used is {}", model);
270 } else {
271 logger.info("Default Transcription model will be used");
272 }
273
274 Optional<String> encodingOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENCODING_EXTENSION);
275 if (encodingOpt.isPresent()) {
276 defaultEncoding = encodingOpt.get();
277 logger.info("Encoding used is {}", defaultEncoding);
278 } else {
279 logger.info("Default encoding will be used");
280 }
281
282
283 Optional<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
284 if (wfOpt.isPresent()) {
285 workflowDefinitionId = wfOpt.get();
286 }
287 logger.info("Workflow definition is {}", workflowDefinitionId);
288
289 Optional<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
290 if (intervalOpt.isPresent()) {
291 try {
292 workflowDispatchInterval = Long.parseLong(intervalOpt.get());
293 } catch (NumberFormatException e) {
294
295 logger.warn("Invalid configuration for Workflow dispatch interval. Default used instead: {}",
296 workflowDispatchInterval);
297 }
298 }
299 logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
300
301 Optional<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
302 if (bufferOpt.isPresent()) {
303 try {
304 completionCheckBuffer = Long.parseLong(bufferOpt.get());
305 } catch (NumberFormatException e) {
306
307 logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
308 new Object[]{COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer});
309 }
310 }
311 logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
312
313 Optional<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
314 if (maxProcessingOpt.isPresent()) {
315 try {
316 maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
317 } catch (NumberFormatException e) {
318
319 logger.warn("Invalid configuration for maximum processing time. Default used instead: {}",
320 maxProcessingSeconds);
321 }
322 }
323 logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
324
325 Optional<String> cleaupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
326 if (cleaupOpt.isPresent()) {
327 try {
328 cleanupResultDays = Integer.parseInt(cleaupOpt.get());
329 } catch (NumberFormatException e) {
330
331 logger.warn("Invalid configuration for clean up days. Default used instead: {}", cleanupResultDays);
332 }
333 }
334 logger.info("Cleanup result files after {} days", cleanupResultDays);
335
336 systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
337
338
339 scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
340 TimeUnit.SECONDS);
341
342
343 scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
344
345
346 Optional<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
347 if (optTo.isPresent()) {
348 toEmailAddress = optTo.get();
349 } else {
350
351 optTo = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ADMIN_EMAIL_PROPERTY);
352 if (optTo.isPresent()) {
353 toEmailAddress = optTo.get();
354 }
355 }
356 if (toEmailAddress != null) {
357 logger.info("Notification email set to {}", toEmailAddress);
358 } else {
359 logger.warn("Email notification disabled");
360 }
361
362 Optional<String> optCluster = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ENVIRONMENT_NAME_PROPERTY);
363 if (optCluster.isPresent()) {
364 clusterName = optCluster.get();
365 }
366 logger.info("Environment name is {}", clusterName);
367
368 logger.info("Activated!");
369 }
370
371 @Override
372 public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
373 if (!enabled) {
374 throw new TranscriptionServiceException(
375 "This service is disabled. If you want to enable it, please update the service configuration.");
376 }
377
378 if (args.length == 0) {
379 throw new IllegalArgumentException("Additional language argument is required.");
380 }
381
382 try {
383 return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(),
384 Arrays.asList(mpId, MediaPackageElementParser.getAsXml(track), args[0]));
385 } catch (ServiceRegistryException e) {
386 throw new TranscriptionServiceException("Unable to create a job", e);
387 } catch (MediaPackageException e) {
388 throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
389 }
390 }
391
/**
 * Not supported by this service; use the variant that takes additional arguments.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
  throw new UnsupportedOperationException("Not supported.");
}
396
397 @Override
398 public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
399 JSONObject jsonObj = null;
400 String jobId = null;
401 String token = INVALID_TOKEN;
402 try {
403 token = getRefreshAccessToken();
404 } catch (IOException ex) {
405 logger.error("Unable to create access token, error: {}", ex.toString());
406 }
407 if (token.equals(INVALID_TOKEN)) {
408 throw new TranscriptionServiceException("Invalid access token");
409 }
410 try {
411 jsonObj = (JSONObject) obj;
412 jobId = (String) jsonObj.get("name");
413 logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
414 JSONArray resultsArray = getTranscriptionResult(jsonObj);
415
416
417
418
419 database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
420
421
422 deleteStorageFile(mpId, token);
423
424
425 if (resultsArray != null) {
426 saveResults(jobId, jsonObj);
427 }
428 } catch (IOException e) {
429 if (jsonObj == null) {
430 logger.warn("Could not save transcription results file for mpId {}, jobId {}: null",
431 mpId, jobId);
432 } else {
433 logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
434 mpId, jobId, jsonObj.toJSONString());
435 }
436 throw new TranscriptionServiceException("Could not save transcription results file", e);
437 } catch (TranscriptionDatabaseException e) {
438 logger.warn("Transcription results file were saved but state in db not updated for mpId {}, jobId {}", mpId,
439 jobId);
440 throw new TranscriptionServiceException("Could not update transcription job control db", e);
441 }
442 }
443
/**
 * Returns the language used when a job does not specify one.
 *
 * @return the configured default language, or the built-in default if none was configured
 */
@Override
public String getLanguage() {
  return defaultLanguage;
}
448
/**
 * Not implemented by this service.
 *
 * @throws TranscriptionServiceException always
 */
@Override
public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
  throw new TranscriptionServiceException("Method not implemented");
}
453
454 @Override
455 public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
456 JSONObject jsonObj = null;
457 String jobId = null;
458 try {
459 jsonObj = (JSONObject) obj;
460 jobId = (String) jsonObj.get("name");
461
462 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
463 TranscriptionJobControl jobControl = database.findByJob(jobId);
464 logger.warn("Error received for media package {}, job id {}",
465 jobControl.getMediaPackageId(), jobId);
466
467 sendEmail(TRANSCRIPTION_ERROR,
468 String.format("There was a transcription error for for media package %s, job id %s.",
469 jobControl.getMediaPackageId(), jobId));
470 } catch (TranscriptionDatabaseException e) {
471 logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
472 throw new TranscriptionServiceException("Could not update transcription job control db", e);
473 }
474 }
475
476 @Override
477 protected String process(Job job) throws Exception {
478 Operation op = null;
479 String operation = job.getOperation();
480 List<String> arguments = job.getArguments();
481 String result = "";
482 op = Operation.valueOf(operation);
483 switch (op) {
484 case StartTranscription:
485 String mpId = arguments.get(0);
486 Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
487 String languageCode = arguments.get(2);
488 createRecognitionsJob(mpId, track, languageCode);
489 break;
490 default:
491 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
492 }
493 return result;
494 }
495
496
497
498
499
500 void createRecognitionsJob(String mpId, Track track, String languageCode)
501 throws TranscriptionServiceException, IOException {
502
503 if (StringUtils.isBlank(languageCode)) {
504 languageCode = defaultLanguage;
505 }
506 String audioUrl;
507 audioUrl = uploadAudioFileToGoogleStorage(mpId, track);
508 CloseableHttpClient httpClient = makeHttpClient();
509 CloseableHttpResponse response = null;
510 String token = getRefreshAccessToken();
511 if (token.equals(INVALID_TOKEN) || audioUrl == null) {
512 throw new TranscriptionServiceException("Could not create recognition job. Audio file or access token invalid");
513 }
514
515
516 JSONObject configValues = new JSONObject();
517 JSONObject audioValues = new JSONObject();
518 JSONObject container = new JSONObject();
519 configValues.put("languageCode", languageCode);
520 configValues.put("enableWordTimeOffsets", true);
521 configValues.put("profanityFilter", profanityFilter);
522 configValues.put("enableAutomaticPunctuation", enablePunctuation);
523 configValues.put("model", model);
524 audioValues.put("uri", audioUrl);
525 container.put("config", configValues);
526 container.put("audio", audioValues);
527
528 try {
529 HttpPost httpPost = new HttpPost(UrlSupport.concat(GOOGLE_SPEECH_URL, REQUEST_METHOD));
530 logger.debug("Url to invoke Google speech service: {}", httpPost.getURI().toString());
531 StringEntity params = new StringEntity(container.toJSONString());
532 httpPost.addHeader("Authorization", "Bearer " + token);
533 httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8");
534 httpPost.setEntity(params);
535 response = httpClient.execute(httpPost);
536 int code = response.getStatusLine().getStatusCode();
537 HttpEntity entity = response.getEntity();
538 String jsonString = EntityUtils.toString(response.getEntity());
539 JSONParser jsonParser = new JSONParser();
540 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
541
542 switch (code) {
543 case HttpStatus.SC_OK:
544 logger.info("Recognitions job has been successfully created");
545
546
547
548
549
550
551
552
553 String jobId = (String) jsonObject.get("name");
554 logger.info(
555 "Transcription for mp {} has been submitted. Job id: {}", mpId,
556 jobId);
557
558 database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
559 track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
560 EntityUtils.consume(entity);
561 return;
562 default:
563 JSONObject errorObj = (JSONObject) jsonObject.get("error");
564 logger.warn("Invalid argument returned, status: {} with message: {}", code, (String) errorObj.get("message"));
565 break;
566 }
567 throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
568 } catch (Exception e) {
569 logger.warn("Exception when calling the recognitions endpoint", e);
570 throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
571 } finally {
572 try {
573 httpClient.close();
574 if (response != null) {
575 response.close();
576 }
577 } catch (IOException e) {
578 }
579 }
580 }
581
582
583
584
585
586
587
588
589
590
591
592
593
594 boolean getAndSaveJobResults(String jobId) throws TranscriptionServiceException, IOException {
595 CloseableHttpClient httpClient = makeHttpClient();
596 CloseableHttpResponse response = null;
597 String mpId = "unknown";
598 JSONArray resultsArray = null;
599 String token = getRefreshAccessToken();
600 if (token.equals(INVALID_TOKEN)) {
601 return false;
602 }
603 try {
604 HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
605 logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
606
607 httpGet.addHeader("Authorization", "Bearer " + token);
608 response = httpClient.execute(httpGet);
609 int code = response.getStatusLine().getStatusCode();
610
611 switch (code) {
612 case HttpStatus.SC_OK:
613 HttpEntity entity = response.getEntity();
614
615 String jsonString = EntityUtils.toString(entity);
616 JSONParser jsonParser = new JSONParser();
617 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
618 Boolean jobDone = (Boolean) jsonObject.get("done");
619 TranscriptionJobControl jc = database.findByJob(jobId);
620 if (jc != null) {
621 mpId = jc.getMediaPackageId();
622 }
623 if (jobDone) {
624 resultsArray = getTranscriptionResult(jsonObject);
625 }
626 logger.info("Recognitions job {} has been found, completed status {}", jobId, jobDone.toString());
627 EntityUtils.consume(entity);
628
629 if (jobDone && resultsArray != null) {
630 transcriptionDone(mpId, jsonObject);
631 return true;
632 }
633 return false;
634 case HttpStatus.SC_NOT_FOUND:
635 logger.warn("Job not found: {}", jobId);
636 break;
637 case HttpStatus.SC_SERVICE_UNAVAILABLE:
638 logger.warn("Service unavailable returned, status: {}", code);
639 break;
640 default:
641 logger.warn("Error return status: {}.", code);
642 break;
643 }
644 throw new TranscriptionServiceException(
645 String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
646 mpId, jobId, code), code);
647 } catch (TranscriptionServiceException e) {
648 throw e;
649 } catch (Exception e) {
650 if (hasTranscriptionRequestExpired(jobId)) {
651
652 cancelTranscription(jobId, "Google Transcription job canceled due to errors");
653 logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
654 }
655 String msg = String.format("Exception when calling the recognitions endpoint for media package %s, job id %s",
656 mpId, jobId);
657 logger.warn(msg, e);
658 throw new TranscriptionServiceException(String.format(
659 "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
660 } finally {
661 try {
662 httpClient.close();
663 if (response != null) {
664 response.close();
665 }
666 } catch (IOException e) {
667 }
668 }
669 }
670
671
672
673
674
675
676
677
678
679
680 public String getTranscriptionResults(String jobId)
681 throws TranscriptionServiceException, IOException {
682 CloseableHttpClient httpClient = makeHttpClient();
683 CloseableHttpResponse response = null;
684 String token = getRefreshAccessToken();
685 if (token.equals(INVALID_TOKEN)) {
686 logger.warn("Invalid access token");
687 return "No results found";
688 }
689 try {
690 HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
691 logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
692
693 httpGet.addHeader("Authorization", "Bearer " + token);
694 response = httpClient.execute(httpGet);
695 int code = response.getStatusLine().getStatusCode();
696
697 switch (code) {
698 case HttpStatus.SC_OK:
699 HttpEntity entity = response.getEntity();
700 logger.info("Retrieved details for transcription with job id: '{}'", jobId);
701 return EntityUtils.toString(entity);
702 default:
703 logger.warn("Error retrieving details for transcription with job id: '{}', return status: {}.", jobId, code);
704 break;
705 }
706 } catch (Exception e) {
707 logger.warn("Exception when calling the transcription service for job id: {}", jobId, e);
708 throw new TranscriptionServiceException(String.format(
709 "Exception when calling the transcription service for job id: %s", jobId), e);
710 } finally {
711 try {
712 httpClient.close();
713 if (response != null) {
714 response.close();
715 }
716 } catch (IOException e) {
717 }
718 }
719 return "No results found";
720 }
721
722 private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
723 JSONArray resultsArray = getTranscriptionResult(jsonObj);
724 if (resultsArray != null) {
725
726 workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json",
727 new ByteArrayInputStream(jsonObj.toJSONString().getBytes()));
728 }
729 }
730
731 @Override
732 public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
733 throws TranscriptionServiceException {
734 try {
735
736 if (jobId == null || "null".equals(jobId)) {
737 jobId = null;
738 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
739 if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
740 || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
741 jobId = jc.getTranscriptionJobId();
742 }
743 }
744 }
745
746 if (jobId == null) {
747 throw new TranscriptionServiceException(
748 "No completed or closed transcription job found in database for media package " + mpId);
749 }
750
751
752 URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".json");
753 try {
754 workspace.get(uri);
755 } catch (Exception e) {
756 try {
757
758 getAndSaveJobResults(jobId);
759 } catch (IOException ex) {
760 logger.error("Unable to retrieve transcription job, error: {}", ex.toString());
761 }
762 }
763 MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
764 return builder.elementFromURI(uri, type,
765 new MediaPackageElementFlavor("captions", "google-speech-json"));
766 } catch (TranscriptionDatabaseException e) {
767 throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
768 }
769 }
770
771
772
773
774
775
776
777
778 public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
779 try {
780 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
781 return jc.getStatus();
782 }
783 } catch (TranscriptionDatabaseException e) {
784 throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
785 }
786 return "Unknown";
787 }
788
789 protected CloseableHttpClient makeHttpClient() throws IOException {
790 RequestConfig reqConfig = RequestConfig.custom()
791 .setConnectTimeout(CONNECTION_TIMEOUT)
792 .setSocketTimeout(SOCKET_TIMEOUT)
793 .setConnectionRequestTimeout(CONNECTION_TIMEOUT)
794 .build();
795 return HttpClients.custom().setDefaultRequestConfig(reqConfig).build();
796 }
797
798 protected String refreshAccessToken(String clientId, String clientSecret, String refreshToken)
799 throws TranscriptionServiceException, IOException {
800 CloseableHttpClient httpClient = makeHttpClient();
801 CloseableHttpResponse response = null;
802
803 try {
804 HttpPost httpPost = new HttpPost(tokenEndpoint + String.format(
805 "?client_id=%s&client_secret=%s&refresh_token=%s&grant_type=refresh_token",
806 clientId, clientSecret, refreshToken));
807 httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
808 response = httpClient.execute(httpPost);
809 int code = response.getStatusLine().getStatusCode();
810 String jsonString = EntityUtils.toString(response.getEntity());
811 JSONParser jsonParser = new JSONParser();
812 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
813 switch (code) {
814 case HttpStatus.SC_OK:
815 accessToken = (String) jsonObject.get(ACCESS_TOKEN_NAME);
816 long duration = (long) jsonObject.get(ACCESS_TOKEN_EXPIRY_NAME);
817 tokenExpiryTime = (System.currentTimeMillis() + (duration * 1000));
818 if (!INVALID_TOKEN.equals(accessToken)) {
819 logger.info("Google Cloud Service access token created");
820 return accessToken;
821 }
822 throw new TranscriptionServiceException(
823 String.format("Created token is invalid. Status returned: %d", code), code);
824 case HttpStatus.SC_BAD_REQUEST:
825 case HttpStatus.SC_UNAUTHORIZED:
826 String error = (String) jsonObject.get("error");
827 String errorDetails = (String) jsonObject.get("error_description");
828 logger.warn("Invalid argument returned, status: {}", code);
829 logger.warn("Unable to refresh Google Cloud Service token, error: {}, error details: {}",
830 error, errorDetails);
831 break;
832 default:
833 logger.warn("Invalid argument returned, status: {}", code);
834 }
835 throw new TranscriptionServiceException(
836 String.format("Could not create Google access token. Status returned: %d", code), code);
837 } catch (TranscriptionServiceException e) {
838 throw e;
839 } catch (Exception e) {
840 logger.warn("Unable to generate access token for Google Cloud Services");
841 return INVALID_TOKEN;
842 } finally {
843 try {
844 httpClient.close();
845 if (response != null) {
846 response.close();
847 }
848 } catch (IOException e) {
849 }
850 }
851 }
852
853 protected String getRefreshAccessToken() throws TranscriptionServiceException, IOException {
854
855 if ((!INVALID_TOKEN.equals(accessToken))
856 && (System.currentTimeMillis() < (tokenExpiryTime - ACCESS_TOKEN_MINIMUM_TIME))) {
857 return accessToken;
858 }
859 return refreshAccessToken(clientId, clientSecret, clientToken);
860 }
861
862 protected String uploadAudioFileToGoogleStorage(String mpId, Track track)
863 throws TranscriptionServiceException, IOException {
864 File audioFile;
865 String audioUrl = null;
866 String fileExtension;
867 int audioResponse;
868 CloseableHttpClient httpClientStorage = makeHttpClient();
869 GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
870 try {
871 audioFile = workspace.get(track.getURI());
872 fileExtension = FilenameUtils.getExtension(audioFile.getName());
873 long fileSize = audioFile.length();
874 String contentType = track.getMimeType().toString();
875 String token = getRefreshAccessToken();
876
877 audioResponse = storage.startUpload(httpClientStorage, storageBucket, mpId, fileExtension,
878 audioFile, String.valueOf(fileSize), contentType, token);
879 if (audioResponse == HttpStatus.SC_OK) {
880 audioUrl = String.format("gs://%s/%s.%s", storageBucket, mpId, fileExtension);
881 return audioUrl;
882 }
883 logger.error("Error when uploading audio to Google Storage, error code: {}", audioResponse);
884 return audioUrl;
885 } catch (Exception e) {
886 throw new TranscriptionServiceException("Error reading audio track", e);
887 }
888 }
889
890 private JSONArray getTranscriptionResult(JSONObject jsonObj) {
891 JSONObject responseObj = (JSONObject) jsonObj.get("response");
892 JSONArray resultsArray = (JSONArray) responseObj.get("results");
893 return resultsArray;
894 }
895
896 protected void deleteStorageFile(String mpId, String token) throws IOException {
897 CloseableHttpClient httpClientDel = makeHttpClient();
898 GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
899 storage.deleteGoogleStorageFile(httpClientDel, storageBucket, mpId + "." + defaultEncoding, token);
900 }
901
902 private void sendEmail(String subject, String body) {
903 if (toEmailAddress == null) {
904 logger.info("Skipping sending email notification. Message is {}.", body);
905 return;
906 }
907 try {
908 logger.debug("Sending e-mail notification to {}", toEmailAddress);
909 smtpService.send(toEmailAddress, String.format("%s (%s)", subject, clusterName), body);
910 logger.info("Sent e-mail notification to {}", toEmailAddress);
911 } catch (Exception e) {
912 logger.error("Could not send email: {}\n{}", subject, body, e);
913 }
914 }
915
916 private void cancelTranscription(String jobId, String message) {
917 try {
918 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
919 String mpId = database.findByJob(jobId).getMediaPackageId();
920 try {
921
922 String token = getRefreshAccessToken();
923 deleteStorageFile(mpId, token);
924 } catch (Exception ex) {
925 logger.warn(String.format("could not delete file %s.%s from Google cloud storage", mpId, defaultEncoding), ex);
926 } finally {
927
928 sendEmail("Transcription ERROR", String.format("%s(media package %s, job id %s).", message, mpId, jobId));
929 }
930 } catch (Exception e) {
931 logger.error(String.format("ERROR while deleting transcription job: %s", jobId), e);
932 }
933 }
934
935 private boolean hasTranscriptionRequestExpired(String jobId) {
936 try {
937
938 if (database.findByJob(jobId).getDateCreated().getTime() + database.findByJob(jobId).getTrackDuration()
939 + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
940 return true;
941 }
942 } catch (Exception e) {
943 logger.error(String.format("ERROR while calculating transcription request expiration for job: %s", jobId), e);
944
945 return true;
946 }
947 return false;
948 }
949
950 private long getRemainingTranscriptionExpireTimeInMin(String jobId) {
951 try {
952 long expiredTime = (database.findByJob(jobId).getDateCreated().getTime()
953 + database.findByJob(jobId).getTrackDuration()
954 + (completionCheckBuffer + maxProcessingSeconds) * 1000)
955 - (System.currentTimeMillis());
956
957 if (expiredTime < 0) {
958 expiredTime = 0;
959 }
960 return TimeUnit.MILLISECONDS.toMinutes(expiredTime);
961 } catch (Exception e) {
962 logger.error("Unable to calculate remaining transcription expired time for transcription job {}", jobId);
963 }
964 return 0;
965 }
966
967 @Reference
968 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
969 this.serviceRegistry = serviceRegistry;
970 }
971
972 @Reference
973 public void setSecurityService(SecurityService securityService) {
974 this.securityService = securityService;
975 }
976
977 @Reference
978 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
979 this.userDirectoryService = userDirectoryService;
980 }
981
982 @Reference
983 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
984 this.organizationDirectoryService = organizationDirectoryService;
985 }
986
987 @Reference
988 public void setSmtpService(SmtpService service) {
989 this.smtpService = service;
990 }
991
992 @Reference
993 public void setWorkspace(Workspace ws) {
994 this.workspace = ws;
995 }
996
997 @Reference
998 public void setWorkingFileRepository(WorkingFileRepository wfr) {
999 this.wfr = wfr;
1000 }
1001
1002 @Reference
1003 public void setDatabase(TranscriptionDatabase service) {
1004 this.database = service;
1005 }
1006
1007 @Reference
1008 public void setAssetManager(AssetManager service) {
1009 this.assetManager = service;
1010 }
1011
1012 @Reference
1013 public void setWorkflowService(WorkflowService service) {
1014 this.workflowService = service;
1015 }
1016
1017 @Override
1018 protected ServiceRegistry getServiceRegistry() {
1019 return serviceRegistry;
1020 }
1021
1022 @Override
1023 protected SecurityService getSecurityService() {
1024 return securityService;
1025 }
1026
1027 @Override
1028 protected UserDirectoryService getUserDirectoryService() {
1029 return userDirectoryService;
1030 }
1031
1032 @Override
1033 protected OrganizationDirectoryService getOrganizationDirectoryService() {
1034 return organizationDirectoryService;
1035 }
1036
1037
1038 void setWfUtil(Workflows wfUtil) {
1039 this.wfUtil = wfUtil;
1040 }
1041
1042 class WorkflowDispatcher implements Runnable {
1043
1044
1045
1046
1047
1048
1049 @Override
1050 public void run() {
1051 logger.debug("WorkflowDispatcher waking up...");
1052
1053 try {
1054
1055
1056 long providerId;
1057 TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
1058 if (providerInfo != null) {
1059 providerId = providerInfo.getId();
1060 } else {
1061 logger.debug("No jobs yet for provider {}", PROVIDER);
1062 return;
1063 }
1064
1065 List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
1066 TranscriptionJobControl.Status.TranscriptionComplete.name());
1067 for (TranscriptionJobControl j : jobs) {
1068
1069
1070 if (j.getProviderId() != providerId) {
1071 continue;
1072 }
1073
1074 String mpId = j.getMediaPackageId();
1075 String jobId = j.getTranscriptionJobId();
1076
1077
1078 if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
1079
1080
1081
1082 if (j.getDateCreated().getTime() + (j.getTrackDuration() / 3) + completionCheckBuffer * 1000 < System
1083 .currentTimeMillis()) {
1084 try {
1085 if (!getAndSaveJobResults(jobId)) {
1086
1087 if (hasTranscriptionRequestExpired(jobId)) {
1088
1089 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1090
1091 String token = getRefreshAccessToken();
1092 deleteStorageFile(mpId, token);
1093
1094 sendEmail(TRANSCRIPTION_ERROR, String.format(
1095 "Transcription job was in processing state for too long and was marked "
1096 + "as cancelled (media package %s, job id %s).",
1097 mpId, jobId));
1098 }
1099
1100 continue;
1101 }
1102 } catch (TranscriptionServiceException e) {
1103 if (e.getCode() == 404) {
1104
1105 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1106
1107 sendEmail(TRANSCRIPTION_ERROR,
1108 String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
1109 }
1110 continue;
1111 } catch (IOException ex) {
1112 logger.error("Transcription job not found, error: {}", ex.toString());
1113 }
1114 } else {
1115 continue;
1116 }
1117 }
1118
1119
1120 try {
1121
1122
1123 Map<String, String> params = new HashMap<String, String>();
1124 params.put(TRANSCRIPTION_JOB_ID_KEY, jobId);
1125 String wfId = startWorkflow(mpId, workflowDefinitionId, jobId, params);
1126 if (wfId == null) {
1127 logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}",
1128 mpId, jobId);
1129 continue;
1130 }
1131
1132 database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1133 logger.info("Attach transcription workflow {} scheduled for mp {}, google speech job {}",
1134 wfId, mpId, jobId);
1135 } catch (Exception e) {
1136 logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}, {}: {}",
1137 mpId, jobId, e.getClass().getName(), e.getMessage());
1138 }
1139 }
1140 } catch (TranscriptionDatabaseException e) {
1141 logger.warn("Could not read transcription job control database: {}", e.getMessage());
1142 }
1143 }
1144 }
1145
1146 private String startWorkflow(String mpId, String wfDefId, String jobId, Map<String, String> params) {
1147 DefaultOrganization defaultOrg = new DefaultOrganization();
1148 securityService.setOrganization(defaultOrg);
1149 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1150
1151
1152 Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
1153 if (snapshot.isEmpty()) {
1154 if (!hasTranscriptionRequestExpired(jobId)) {
1155
1156 logger.warn("Media package {} has not been archived yet or has been deleted. Will keep trying for {} "
1157 + "more minutes before cancelling transcription job {}.",
1158 mpId, getRemainingTranscriptionExpireTimeInMin(jobId), jobId);
1159 } else {
1160
1161 cancelTranscription(jobId, " Google Transcription job canceled, archived media package not found");
1162 logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
1163 }
1164 return null;
1165 }
1166
1167 String org = snapshot.get().getOrganizationId();
1168 Organization organization = null;
1169 try {
1170 organization = organizationDirectoryService.getOrganization(org);
1171 if (organization == null) {
1172 logger.warn("Media package {} has an unknown organization {}.", mpId, org);
1173 return null;
1174 }
1175 } catch (NotFoundException e) {
1176 logger.warn("Organization {} not found for media package {}.", org, mpId);
1177 return null;
1178 }
1179 securityService.setOrganization(organization);
1180
1181 try {
1182 WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
1183 Workflows workflows;
1184 if (wfUtil != null) {
1185 workflows = wfUtil;
1186 } else {
1187 workflows = new Workflows(assetManager, workflowService);
1188 }
1189 Set<String> mpIds = new HashSet<String>();
1190 mpIds.add(mpId);
1191 List<WorkflowInstance> wfList = workflows
1192 .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1193 return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
1194 } catch (NotFoundException | WorkflowDatabaseException e) {
1195 logger.warn("Could not get workflow definition: {}", wfDefId);
1196 }
1197
1198 return null;
1199 }
1200
1201 class ResultsFileCleanup implements Runnable {
1202
1203 @Override
1204 public void run() {
1205 logger.info("ResultsFileCleanup waking up...");
1206 try {
1207
1208 wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1209 } catch (IOException e) {
1210 logger.warn("Could not cleanup old transcript results files", e);
1211 }
1212 }
1213 }
1214
1215 }