1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.transcription.googlespeech;
22
23 import org.opencastproject.assetmanager.api.AssetManager;
24 import org.opencastproject.assetmanager.api.fn.Enrichments;
25 import org.opencastproject.assetmanager.api.query.AQueryBuilder;
26 import org.opencastproject.assetmanager.api.query.AResult;
27 import org.opencastproject.assetmanager.util.Workflows;
28 import org.opencastproject.job.api.AbstractJobProducer;
29 import org.opencastproject.job.api.Job;
30 import org.opencastproject.kernel.mail.SmtpService;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35 import org.opencastproject.mediapackage.MediaPackageElementParser;
36 import org.opencastproject.mediapackage.MediaPackageException;
37 import org.opencastproject.mediapackage.Track;
38 import org.opencastproject.security.api.DefaultOrganization;
39 import org.opencastproject.security.api.Organization;
40 import org.opencastproject.security.api.OrganizationDirectoryService;
41 import org.opencastproject.security.api.SecurityService;
42 import org.opencastproject.security.api.UserDirectoryService;
43 import org.opencastproject.security.util.SecurityUtil;
44 import org.opencastproject.serviceregistry.api.ServiceRegistry;
45 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
46 import org.opencastproject.systems.OpencastConstants;
47 import org.opencastproject.transcription.api.TranscriptionService;
48 import org.opencastproject.transcription.api.TranscriptionServiceException;
49 import org.opencastproject.transcription.persistence.TranscriptionDatabase;
50 import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
51 import org.opencastproject.transcription.persistence.TranscriptionJobControl;
52 import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
53 import org.opencastproject.util.NotFoundException;
54 import org.opencastproject.util.OsgiUtil;
55 import org.opencastproject.util.UrlSupport;
56 import org.opencastproject.util.data.Option;
57 import org.opencastproject.workflow.api.ConfiguredWorkflow;
58 import org.opencastproject.workflow.api.WorkflowDatabaseException;
59 import org.opencastproject.workflow.api.WorkflowDefinition;
60 import org.opencastproject.workflow.api.WorkflowInstance;
61 import org.opencastproject.workflow.api.WorkflowService;
62 import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
63 import org.opencastproject.workspace.api.Workspace;
64
65 import org.apache.commons.io.FilenameUtils;
66 import org.apache.commons.lang3.StringUtils;
67 import org.apache.http.HttpEntity;
68 import org.apache.http.HttpHeaders;
69 import org.apache.http.HttpStatus;
70 import org.apache.http.client.config.RequestConfig;
71 import org.apache.http.client.methods.CloseableHttpResponse;
72 import org.apache.http.client.methods.HttpGet;
73 import org.apache.http.client.methods.HttpPost;
74 import org.apache.http.entity.StringEntity;
75 import org.apache.http.impl.client.CloseableHttpClient;
76 import org.apache.http.impl.client.HttpClients;
77 import org.apache.http.util.EntityUtils;
78 import org.json.simple.JSONArray;
79 import org.json.simple.JSONObject;
80 import org.json.simple.parser.JSONParser;
81 import org.osgi.service.component.ComponentContext;
82 import org.osgi.service.component.annotations.Activate;
83 import org.osgi.service.component.annotations.Component;
84 import org.osgi.service.component.annotations.Reference;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 import java.io.ByteArrayInputStream;
89 import java.io.File;
90 import java.io.IOException;
91 import java.net.URI;
92 import java.util.Arrays;
93 import java.util.HashMap;
94 import java.util.HashSet;
95 import java.util.List;
96 import java.util.Map;
97 import java.util.Set;
98 import java.util.concurrent.Executors;
99 import java.util.concurrent.ScheduledExecutorService;
100 import java.util.concurrent.TimeUnit;
101
102 @Component(
103 immediate = true,
104 service = { TranscriptionService.class,GoogleSpeechTranscriptionService.class },
105 property = {
106 "service.description=Google Speech Transcription Service",
107 "provider=google.speech"
108 }
109 )
110 public class GoogleSpeechTranscriptionService extends AbstractJobProducer implements TranscriptionService {
111
112
113
114
/** The logger */
private static final Logger logger = LoggerFactory.getLogger(GoogleSpeechTranscriptionService.class);

/** Job type passed to the job producer base class and used when creating jobs in the service registry. */
private static final String JOB_TYPE = "org.opencastproject.transcription.googlespeech";

/** Workspace collection in which transcription result files ({@code <jobId>.json}) are stored. */
static final String TRANSCRIPT_COLLECTION = "transcripts";
/** First argument handed to {@code sendEmail} on errors (presumably the mail subject; confirm in sendEmail). */
static final String TRANSCRIPTION_ERROR = "Transcription ERROR";
static final String TRANSCRIPTION_JOB_ID_KEY = "transcriptionJobId";
/** JSON key of the access token in the token endpoint response. */
static final String ACCESS_TOKEN_NAME = "access_token";
/** JSON key of the token lifetime in the token endpoint response; multiplied by 1000 to obtain milliseconds. */
static final String ACCESS_TOKEN_EXPIRY_NAME = "expires_in";
/** Connect and connection-request timeout given to the HTTP client's RequestConfig. */
private static final int CONNECTION_TIMEOUT = 60000;
/** Socket timeout given to the HTTP client's RequestConfig. */
private static final int SOCKET_TIMEOUT = 60000;
/** Milliseconds before the recorded expiry time at which a cached access token is no longer reused. */
private static final int ACCESS_TOKEN_MINIMUM_TIME = 60000;

/** Workflow definition id used unless overridden by configuration. */
public static final String DEFAULT_WF_DEF = "google-speech-attach-transcripts";
/** Default completion check buffer, in seconds. */
private static final long DEFAULT_COMPLETION_BUFFER = 300;
/** Default delay between workflow dispatcher runs, in seconds. */
private static final long DEFAULT_DISPATCH_INTERVAL = 60;
/** Default maximum processing time, in seconds (5 hours). */
private static final long DEFAULT_MAX_PROCESSING_TIME = 5 * 60 * 60;

/** Default number of days after which result files are cleaned up. */
private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;
private static final boolean DEFAULT_PROFANITY_FILTER = false;
private static final String DEFAULT_LANGUAGE = "en-US";
private static final boolean DEFAULT_ENABLE_PUNCTUATION = false;
private static final String DEFAULT_MODEL = "default";
/** Base URL of the Google Speech REST API. */
private static final String GOOGLE_SPEECH_URL = "https://speech.googleapis.com/v1";
/** Default OAuth2 token endpoint; can be overridden by configuration. */
private static final String GOOGLE_AUTH2_URL = "https://www.googleapis.com/oauth2/v4/token";
/** Path appended to the base URL to submit a recognition job. */
private static final String REQUEST_METHOD = "speech:longrunningrecognize";
/** Path appended to the base URL (followed by the job id) to query a job. */
private static final String RESULT_PATH = "operations";
/** Sentinel value meaning "no usable access token". */
private static final String INVALID_TOKEN = "-1";
/** Provider name stored with each job control record. */
private static final String PROVIDER = "Google Speech";
private static final String DEFAULT_ENCODING = "flac";
145
146
/** Environment name, taken from the OpencastConstants.ENVIRONMENT_NAME_PROPERTY context property on activation. */
private String clusterName = "";

// Collaborating services. NOTE(review): presumably injected through OSGi reference setters
// defined outside this part of the file -- confirm.
private ServiceRegistry serviceRegistry;
private SecurityService securityService;
private UserDirectoryService userDirectoryService;
private OrganizationDirectoryService organizationDirectoryService;
private Workspace workspace;
private TranscriptionDatabase database;
private AssetManager assetManager;
private WorkflowService workflowService;
private WorkingFileRepository wfr;
private SmtpService smtpService;

// Workflow utility; not initialized in the visible part of the file.
private Workflows wfUtil;
165
166 private enum Operation {
167 StartTranscription
168 }
169
170
171
172
// Keys of the service configuration properties read in activate().

/** Whether the service is enabled. */
public static final String ENABLED_CONFIG = "enabled";
/** Default language code for transcriptions. */
public static final String GOOGLE_SPEECH_LANGUAGE = "google.speech.language";
/** Value sent as "profanityFilter" in the recognition request. */
public static final String PROFANITY_FILTER = "google.speech.profanity.filter";
/** Value sent as "enableAutomaticPunctuation" in the recognition request. */
public static final String ENABLE_PUNCTUATION = "google.speech.transcription.punctuation";
/** Value sent as "model" in the recognition request. */
public static final String TRANSCRIPTION_MODEL = "google.speech.transcription.model";
/** Workflow definition id. */
public static final String WORKFLOW_CONFIG = "workflow";
/** Delay between workflow dispatcher runs, in seconds. */
public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
/** Completion check buffer, in seconds. */
public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
/** Maximum processing time, in seconds. */
public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
/** Address to which notification emails are sent. */
public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
/** Number of days after which result files are cleaned up. */
public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
/** OAuth2 client id used when refreshing the access token. */
public static final String GOOGLE_CLOUD_CLIENT_ID = "google.cloud.client.id";
/** OAuth2 client secret used when refreshing the access token. */
public static final String GOOGLE_CLOUD_CLIENT_SECRET = "google.cloud.client.secret";
/** OAuth2 refresh token used when refreshing the access token. */
public static final String GOOGLE_CLOUD_REFRESH_TOKEN = "google.cloud.refresh.token";
/** Google Cloud Storage bucket to which audio files are uploaded. */
public static final String GOOGLE_CLOUD_BUCKET = "google.cloud.storage.bucket";
/** Optional override of the OAuth2 token endpoint URL. */
public static final String GOOGLE_CLOUD_TOKEN_ENDPOINT_URL = "google.cloud.token.endpoint.url";
/** Encoding (file extension) setting. */
public static final String ENCODING_EXTENSION = "encoding.extension";
190
191
192
193
// Effective configuration values; initialized to the defaults and overwritten in activate().

private boolean enabled = false;
private boolean profanityFilter = DEFAULT_PROFANITY_FILTER;
private boolean enablePunctuation = DEFAULT_ENABLE_PUNCTUATION;
private String model = DEFAULT_MODEL;
private String defaultLanguage = DEFAULT_LANGUAGE;
private String defaultEncoding = DEFAULT_ENCODING;
private String workflowDefinitionId = DEFAULT_WF_DEF;
// The following three values are in seconds.
private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
// Stays null when neither a notification address nor an admin address is configured.
private String toEmailAddress;
private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
private String clientId;
private String clientSecret;
// Holds the OAuth2 refresh token (read from GOOGLE_CLOUD_REFRESH_TOKEN).
private String clientToken;
// Cached access token; INVALID_TOKEN until the first successful refresh.
private String accessToken = INVALID_TOKEN;
private String tokenEndpoint = GOOGLE_AUTH2_URL;
private String storageBucket;
// Expiry of the cached access token, in milliseconds since the epoch.
private long tokenExpiryTime = 0;
private String systemAccount;
// Runs the workflow dispatcher and the results file cleanup task.
private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
215
/**
 * Creates the service, handing the Google Speech job type to the {@link AbstractJobProducer} base class.
 */
public GoogleSpeechTranscriptionService() {
  super(JOB_TYPE);
}
219
220 @Activate
221 public void activate(ComponentContext cc) {
222
223 enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).get();
224 if (!enabled) {
225 logger.info("Service disabled. If you want to enable it, please update the service configuration.");
226 return;
227 }
228
229 clientId = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_ID);
230 clientSecret = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_SECRET);
231 clientToken = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_REFRESH_TOKEN);
232 storageBucket = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_BUCKET);
233
234
235 Option<String> tokenOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_CLOUD_TOKEN_ENDPOINT_URL);
236 if (tokenOpt.isSome()) {
237 tokenEndpoint = tokenOpt.get();
238 logger.info("Access token endpoint is set to {}", tokenEndpoint);
239 } else {
240 logger.info("Default access token endpoint will be used");
241 }
242
243
244 Option<String> profanityOpt = OsgiUtil.getOptCfg(cc.getProperties(), PROFANITY_FILTER);
245 if (profanityOpt.isSome()) {
246 profanityFilter = Boolean.parseBoolean(profanityOpt.get());
247 logger.info("Profanity filter is set to {}", profanityFilter);
248 } else {
249 logger.info("Default profanity filter will be used");
250 }
251
252 Option<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_SPEECH_LANGUAGE);
253 if (languageOpt.isSome()) {
254 defaultLanguage = languageOpt.get();
255 logger.info("Language used is {}", defaultLanguage);
256 } else {
257 logger.info("Default language will be used");
258 }
259
260 Option<String> punctuationOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENABLE_PUNCTUATION);
261 if (punctuationOpt.isSome()) {
262 enablePunctuation = Boolean.parseBoolean(punctuationOpt.get());
263 logger.info("Enable punctuation is set to {}", enablePunctuation);
264 } else {
265 logger.info("Default punctuation setting will be used");
266 }
267
268 Option<String> transModel = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTION_MODEL);
269 if (transModel.isSome()) {
270 model = transModel.get();
271 logger.info("Transcription model used is {}", model);
272 } else {
273 logger.info("Default Transcription model will be used");
274 }
275
276 Option<String> encodingOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENCODING_EXTENSION);
277 if (encodingOpt.isSome()) {
278 defaultEncoding = encodingOpt.get();
279 logger.info("Encoding used is {}", defaultEncoding);
280 } else {
281 logger.info("Default encoding will be used");
282 }
283
284
285 Option<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
286 if (wfOpt.isSome()) {
287 workflowDefinitionId = wfOpt.get();
288 }
289 logger.info("Workflow definition is {}", workflowDefinitionId);
290
291 Option<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
292 if (intervalOpt.isSome()) {
293 try {
294 workflowDispatchInterval = Long.parseLong(intervalOpt.get());
295 } catch (NumberFormatException e) {
296
297 logger.warn("Invalid configuration for Workflow dispatch interval. Default used instead: {}",
298 workflowDispatchInterval);
299 }
300 }
301 logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
302
303 Option<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
304 if (bufferOpt.isSome()) {
305 try {
306 completionCheckBuffer = Long.parseLong(bufferOpt.get());
307 } catch (NumberFormatException e) {
308
309 logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
310 new Object[]{COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer});
311 }
312 }
313 logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
314
315 Option<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
316 if (maxProcessingOpt.isSome()) {
317 try {
318 maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
319 } catch (NumberFormatException e) {
320
321 logger.warn("Invalid configuration for maximum processing time. Default used instead: {}",
322 maxProcessingSeconds);
323 }
324 }
325 logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
326
327 Option<String> cleaupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
328 if (cleaupOpt.isSome()) {
329 try {
330 cleanupResultDays = Integer.parseInt(cleaupOpt.get());
331 } catch (NumberFormatException e) {
332
333 logger.warn("Invalid configuration for clean up days. Default used instead: {}", cleanupResultDays);
334 }
335 }
336 logger.info("Cleanup result files after {} days", cleanupResultDays);
337
338 systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
339
340
341 scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
342 TimeUnit.SECONDS);
343
344
345 scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
346
347
348 Option<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
349 if (optTo.isSome()) {
350 toEmailAddress = optTo.get();
351 } else {
352
353 optTo = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ADMIN_EMAIL_PROPERTY);
354 if (optTo.isSome()) {
355 toEmailAddress = optTo.get();
356 }
357 }
358 if (toEmailAddress != null) {
359 logger.info("Notification email set to {}", toEmailAddress);
360 } else {
361 logger.warn("Email notification disabled");
362 }
363
364 Option<String> optCluster = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ENVIRONMENT_NAME_PROPERTY);
365 if (optCluster.isSome()) {
366 clusterName = optCluster.get();
367 }
368 logger.info("Environment name is {}", clusterName);
369
370 logger.info("Activated!");
371 }
372
373 @Override
374 public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
375 if (!enabled) {
376 throw new TranscriptionServiceException(
377 "This service is disabled. If you want to enable it, please update the service configuration.");
378 }
379
380 if (args.length == 0) {
381 throw new IllegalArgumentException("Additional language argument is required.");
382 }
383
384 try {
385 return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(),
386 Arrays.asList(mpId, MediaPackageElementParser.getAsXml(track), args[0]));
387 } catch (ServiceRegistryException e) {
388 throw new TranscriptionServiceException("Unable to create a job", e);
389 } catch (MediaPackageException e) {
390 throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
391 }
392 }
393
394 @Override
395 public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
396 throw new UnsupportedOperationException("Not supported.");
397 }
398
399 @Override
400 public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
401 JSONObject jsonObj = null;
402 String jobId = null;
403 String token = INVALID_TOKEN;
404 try {
405 token = getRefreshAccessToken();
406 } catch (IOException ex) {
407 logger.error("Unable to create access token, error: {}", ex.toString());
408 }
409 if (token.equals(INVALID_TOKEN)) {
410 throw new TranscriptionServiceException("Invalid access token");
411 }
412 try {
413 jsonObj = (JSONObject) obj;
414 jobId = (String) jsonObj.get("name");
415 logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
416 JSONArray resultsArray = getTranscriptionResult(jsonObj);
417
418
419
420
421 database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
422
423
424 deleteStorageFile(mpId, token);
425
426
427 if (resultsArray != null) {
428 saveResults(jobId, jsonObj);
429 }
430 } catch (IOException e) {
431 if (jsonObj == null) {
432 logger.warn("Could not save transcription results file for mpId {}, jobId {}: null",
433 mpId, jobId);
434 } else {
435 logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
436 mpId, jobId, jsonObj.toJSONString());
437 }
438 throw new TranscriptionServiceException("Could not save transcription results file", e);
439 } catch (TranscriptionDatabaseException e) {
440 logger.warn("Transcription results file were saved but state in db not updated for mpId {}, jobId {}", mpId,
441 jobId);
442 throw new TranscriptionServiceException("Could not update transcription job control db", e);
443 }
444 }
445
446 @Override
447 public String getLanguage() {
448 return defaultLanguage;
449 }
450
451 @Override
452 public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
453 throw new TranscriptionServiceException("Method not implemented");
454 }
455
456 @Override
457 public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
458 JSONObject jsonObj = null;
459 String jobId = null;
460 try {
461 jsonObj = (JSONObject) obj;
462 jobId = (String) jsonObj.get("name");
463
464 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
465 TranscriptionJobControl jobControl = database.findByJob(jobId);
466 logger.warn("Error received for media package {}, job id {}",
467 jobControl.getMediaPackageId(), jobId);
468
469 sendEmail(TRANSCRIPTION_ERROR,
470 String.format("There was a transcription error for for media package %s, job id %s.",
471 jobControl.getMediaPackageId(), jobId));
472 } catch (TranscriptionDatabaseException e) {
473 logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
474 throw new TranscriptionServiceException("Could not update transcription job control db", e);
475 }
476 }
477
478 @Override
479 protected String process(Job job) throws Exception {
480 Operation op = null;
481 String operation = job.getOperation();
482 List<String> arguments = job.getArguments();
483 String result = "";
484 op = Operation.valueOf(operation);
485 switch (op) {
486 case StartTranscription:
487 String mpId = arguments.get(0);
488 Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
489 String languageCode = arguments.get(2);
490 createRecognitionsJob(mpId, track, languageCode);
491 break;
492 default:
493 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
494 }
495 return result;
496 }
497
498
499
500
501
502 void createRecognitionsJob(String mpId, Track track, String languageCode)
503 throws TranscriptionServiceException, IOException {
504
505 if (StringUtils.isBlank(languageCode)) {
506 languageCode = defaultLanguage;
507 }
508 String audioUrl;
509 audioUrl = uploadAudioFileToGoogleStorage(mpId, track);
510 CloseableHttpClient httpClient = makeHttpClient();
511 CloseableHttpResponse response = null;
512 String token = getRefreshAccessToken();
513 if (token.equals(INVALID_TOKEN) || audioUrl == null) {
514 throw new TranscriptionServiceException("Could not create recognition job. Audio file or access token invalid");
515 }
516
517
518 JSONObject configValues = new JSONObject();
519 JSONObject audioValues = new JSONObject();
520 JSONObject container = new JSONObject();
521 configValues.put("languageCode", languageCode);
522 configValues.put("enableWordTimeOffsets", true);
523 configValues.put("profanityFilter", profanityFilter);
524 configValues.put("enableAutomaticPunctuation", enablePunctuation);
525 configValues.put("model", model);
526 audioValues.put("uri", audioUrl);
527 container.put("config", configValues);
528 container.put("audio", audioValues);
529
530 try {
531 HttpPost httpPost = new HttpPost(UrlSupport.concat(GOOGLE_SPEECH_URL, REQUEST_METHOD));
532 logger.debug("Url to invoke Google speech service: {}", httpPost.getURI().toString());
533 StringEntity params = new StringEntity(container.toJSONString());
534 httpPost.addHeader("Authorization", "Bearer " + token);
535 httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8");
536 httpPost.setEntity(params);
537 response = httpClient.execute(httpPost);
538 int code = response.getStatusLine().getStatusCode();
539 HttpEntity entity = response.getEntity();
540 String jsonString = EntityUtils.toString(response.getEntity());
541 JSONParser jsonParser = new JSONParser();
542 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
543
544 switch (code) {
545 case HttpStatus.SC_OK:
546 logger.info("Recognitions job has been successfully created");
547
548
549
550
551
552
553
554
555 String jobId = (String) jsonObject.get("name");
556 logger.info(
557 "Transcription for mp {} has been submitted. Job id: {}", mpId,
558 jobId);
559
560 database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
561 track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
562 EntityUtils.consume(entity);
563 return;
564 default:
565 JSONObject errorObj = (JSONObject) jsonObject.get("error");
566 logger.warn("Invalid argument returned, status: {} with message: {}", code, (String) errorObj.get("message"));
567 break;
568 }
569 throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
570 } catch (Exception e) {
571 logger.warn("Exception when calling the recognitions endpoint", e);
572 throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
573 } finally {
574 try {
575 httpClient.close();
576 if (response != null) {
577 response.close();
578 }
579 } catch (IOException e) {
580 }
581 }
582 }
583
584
585
586
587
588
589
590
591
592
593
594
595
596 boolean getAndSaveJobResults(String jobId) throws TranscriptionServiceException, IOException {
597 CloseableHttpClient httpClient = makeHttpClient();
598 CloseableHttpResponse response = null;
599 String mpId = "unknown";
600 JSONArray resultsArray = null;
601 String token = getRefreshAccessToken();
602 if (token.equals(INVALID_TOKEN)) {
603 return false;
604 }
605 try {
606 HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
607 logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
608
609 httpGet.addHeader("Authorization", "Bearer " + token);
610 response = httpClient.execute(httpGet);
611 int code = response.getStatusLine().getStatusCode();
612
613 switch (code) {
614 case HttpStatus.SC_OK:
615 HttpEntity entity = response.getEntity();
616
617 String jsonString = EntityUtils.toString(entity);
618 JSONParser jsonParser = new JSONParser();
619 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
620 Boolean jobDone = (Boolean) jsonObject.get("done");
621 TranscriptionJobControl jc = database.findByJob(jobId);
622 if (jc != null) {
623 mpId = jc.getMediaPackageId();
624 }
625 if (jobDone) {
626 resultsArray = getTranscriptionResult(jsonObject);
627 }
628 logger.info("Recognitions job {} has been found, completed status {}", jobId, jobDone.toString());
629 EntityUtils.consume(entity);
630
631 if (jobDone && resultsArray != null) {
632 transcriptionDone(mpId, jsonObject);
633 return true;
634 }
635 return false;
636 case HttpStatus.SC_NOT_FOUND:
637 logger.warn("Job not found: {}", jobId);
638 break;
639 case HttpStatus.SC_SERVICE_UNAVAILABLE:
640 logger.warn("Service unavailable returned, status: {}", code);
641 break;
642 default:
643 logger.warn("Error return status: {}.", code);
644 break;
645 }
646 throw new TranscriptionServiceException(
647 String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
648 mpId, jobId, code), code);
649 } catch (TranscriptionServiceException e) {
650 throw e;
651 } catch (Exception e) {
652 if (hasTranscriptionRequestExpired(jobId)) {
653
654 cancelTranscription(jobId, "Google Transcription job canceled due to errors");
655 logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
656 }
657 String msg = String.format("Exception when calling the recognitions endpoint for media package %s, job id %s",
658 mpId, jobId);
659 logger.warn(msg, e);
660 throw new TranscriptionServiceException(String.format(
661 "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
662 } finally {
663 try {
664 httpClient.close();
665 if (response != null) {
666 response.close();
667 }
668 } catch (IOException e) {
669 }
670 }
671 }
672
673
674
675
676
677
678
679
680
681
682 public String getTranscriptionResults(String jobId)
683 throws TranscriptionServiceException, IOException {
684 CloseableHttpClient httpClient = makeHttpClient();
685 CloseableHttpResponse response = null;
686 String token = getRefreshAccessToken();
687 if (token.equals(INVALID_TOKEN)) {
688 logger.warn("Invalid access token");
689 return "No results found";
690 }
691 try {
692 HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
693 logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
694
695 httpGet.addHeader("Authorization", "Bearer " + token);
696 response = httpClient.execute(httpGet);
697 int code = response.getStatusLine().getStatusCode();
698
699 switch (code) {
700 case HttpStatus.SC_OK:
701 HttpEntity entity = response.getEntity();
702 logger.info("Retrieved details for transcription with job id: '{}'", jobId);
703 return EntityUtils.toString(entity);
704 default:
705 logger.warn("Error retrieving details for transcription with job id: '{}', return status: {}.", jobId, code);
706 break;
707 }
708 } catch (Exception e) {
709 logger.warn("Exception when calling the transcription service for job id: {}", jobId, e);
710 throw new TranscriptionServiceException(String.format(
711 "Exception when calling the transcription service for job id: %s", jobId), e);
712 } finally {
713 try {
714 httpClient.close();
715 if (response != null) {
716 response.close();
717 }
718 } catch (IOException e) {
719 }
720 }
721 return "No results found";
722 }
723
724 private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
725 JSONArray resultsArray = getTranscriptionResult(jsonObj);
726 if (resultsArray != null) {
727
728 workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json",
729 new ByteArrayInputStream(jsonObj.toJSONString().getBytes()));
730 }
731 }
732
733 @Override
734 public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
735 throws TranscriptionServiceException {
736 try {
737
738 if (jobId == null || "null".equals(jobId)) {
739 jobId = null;
740 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
741 if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
742 || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
743 jobId = jc.getTranscriptionJobId();
744 }
745 }
746 }
747
748 if (jobId == null) {
749 throw new TranscriptionServiceException(
750 "No completed or closed transcription job found in database for media package " + mpId);
751 }
752
753
754 URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".json");
755 try {
756 workspace.get(uri);
757 } catch (Exception e) {
758 try {
759
760 getAndSaveJobResults(jobId);
761 } catch (IOException ex) {
762 logger.error("Unable to retrieve transcription job, error: {}", ex.toString());
763 }
764 }
765 MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
766 return builder.elementFromURI(uri, type,
767 new MediaPackageElementFlavor("captions", "google-speech-json"));
768 } catch (TranscriptionDatabaseException e) {
769 throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
770 }
771 }
772
773
774
775
776
777
778
779
780 public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
781 try {
782 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
783 return jc.getStatus();
784 }
785 } catch (TranscriptionDatabaseException e) {
786 throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
787 }
788 return "Unknown";
789 }
790
791 protected CloseableHttpClient makeHttpClient() throws IOException {
792 RequestConfig reqConfig = RequestConfig.custom()
793 .setConnectTimeout(CONNECTION_TIMEOUT)
794 .setSocketTimeout(SOCKET_TIMEOUT)
795 .setConnectionRequestTimeout(CONNECTION_TIMEOUT)
796 .build();
797 return HttpClients.custom().setDefaultRequestConfig(reqConfig).build();
798 }
799
800 protected String refreshAccessToken(String clientId, String clientSecret, String refreshToken)
801 throws TranscriptionServiceException, IOException {
802 CloseableHttpClient httpClient = makeHttpClient();
803 CloseableHttpResponse response = null;
804
805 try {
806 HttpPost httpPost = new HttpPost(tokenEndpoint + String.format(
807 "?client_id=%s&client_secret=%s&refresh_token=%s&grant_type=refresh_token",
808 clientId, clientSecret, refreshToken));
809 httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
810 response = httpClient.execute(httpPost);
811 int code = response.getStatusLine().getStatusCode();
812 String jsonString = EntityUtils.toString(response.getEntity());
813 JSONParser jsonParser = new JSONParser();
814 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
815 switch (code) {
816 case HttpStatus.SC_OK:
817 accessToken = (String) jsonObject.get(ACCESS_TOKEN_NAME);
818 long duration = (long) jsonObject.get(ACCESS_TOKEN_EXPIRY_NAME);
819 tokenExpiryTime = (System.currentTimeMillis() + (duration * 1000));
820 if (!INVALID_TOKEN.equals(accessToken)) {
821 logger.info("Google Cloud Service access token created");
822 return accessToken;
823 }
824 throw new TranscriptionServiceException(
825 String.format("Created token is invalid. Status returned: %d", code), code);
826 case HttpStatus.SC_BAD_REQUEST:
827 case HttpStatus.SC_UNAUTHORIZED:
828 String error = (String) jsonObject.get("error");
829 String errorDetails = (String) jsonObject.get("error_description");
830 logger.warn("Invalid argument returned, status: {}", code);
831 logger.warn("Unable to refresh Google Cloud Service token, error: {}, error details: {}",
832 error, errorDetails);
833 break;
834 default:
835 logger.warn("Invalid argument returned, status: {}", code);
836 }
837 throw new TranscriptionServiceException(
838 String.format("Could not create Google access token. Status returned: %d", code), code);
839 } catch (TranscriptionServiceException e) {
840 throw e;
841 } catch (Exception e) {
842 logger.warn("Unable to generate access token for Google Cloud Services");
843 return INVALID_TOKEN;
844 } finally {
845 try {
846 httpClient.close();
847 if (response != null) {
848 response.close();
849 }
850 } catch (IOException e) {
851 }
852 }
853 }
854
855 protected String getRefreshAccessToken() throws TranscriptionServiceException, IOException {
856
857 if ((!INVALID_TOKEN.equals(accessToken))
858 && (System.currentTimeMillis() < (tokenExpiryTime - ACCESS_TOKEN_MINIMUM_TIME))) {
859 return accessToken;
860 }
861 return refreshAccessToken(clientId, clientSecret, clientToken);
862 }
863
864 protected String uploadAudioFileToGoogleStorage(String mpId, Track track)
865 throws TranscriptionServiceException, IOException {
866 File audioFile;
867 String audioUrl = null;
868 String fileExtension;
869 int audioResponse;
870 CloseableHttpClient httpClientStorage = makeHttpClient();
871 GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
872 try {
873 audioFile = workspace.get(track.getURI());
874 fileExtension = FilenameUtils.getExtension(audioFile.getName());
875 long fileSize = audioFile.length();
876 String contentType = track.getMimeType().toString();
877 String token = getRefreshAccessToken();
878
879 audioResponse = storage.startUpload(httpClientStorage, storageBucket, mpId, fileExtension,
880 audioFile, String.valueOf(fileSize), contentType, token);
881 if (audioResponse == HttpStatus.SC_OK) {
882 audioUrl = String.format("gs://%s/%s.%s", storageBucket, mpId, fileExtension);
883 return audioUrl;
884 }
885 logger.error("Error when uploading audio to Google Storage, error code: {}", audioResponse);
886 return audioUrl;
887 } catch (Exception e) {
888 throw new TranscriptionServiceException("Error reading audio track", e);
889 }
890 }
891
892 private JSONArray getTranscriptionResult(JSONObject jsonObj) {
893 JSONObject responseObj = (JSONObject) jsonObj.get("response");
894 JSONArray resultsArray = (JSONArray) responseObj.get("results");
895 return resultsArray;
896 }
897
898 protected void deleteStorageFile(String mpId, String token) throws IOException {
899 CloseableHttpClient httpClientDel = makeHttpClient();
900 GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
901 storage.deleteGoogleStorageFile(httpClientDel, storageBucket, mpId + "." + defaultEncoding, token);
902 }
903
904 private void sendEmail(String subject, String body) {
905 if (toEmailAddress == null) {
906 logger.info("Skipping sending email notification. Message is {}.", body);
907 return;
908 }
909 try {
910 logger.debug("Sending e-mail notification to {}", toEmailAddress);
911 smtpService.send(toEmailAddress, String.format("%s (%s)", subject, clusterName), body);
912 logger.info("Sent e-mail notification to {}", toEmailAddress);
913 } catch (Exception e) {
914 logger.error("Could not send email: {}\n{}", subject, body, e);
915 }
916 }
917
918 private void cancelTranscription(String jobId, String message) {
919 try {
920 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
921 String mpId = database.findByJob(jobId).getMediaPackageId();
922 try {
923
924 String token = getRefreshAccessToken();
925 deleteStorageFile(mpId, token);
926 } catch (Exception ex) {
927 logger.warn(String.format("could not delete file %s.%s from Google cloud storage", mpId, defaultEncoding), ex);
928 } finally {
929
930 sendEmail("Transcription ERROR", String.format("%s(media package %s, job id %s).", message, mpId, jobId));
931 }
932 } catch (Exception e) {
933 logger.error(String.format("ERROR while deleting transcription job: %s", jobId), e);
934 }
935 }
936
937 private boolean hasTranscriptionRequestExpired(String jobId) {
938 try {
939
940 if (database.findByJob(jobId).getDateCreated().getTime() + database.findByJob(jobId).getTrackDuration()
941 + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
942 return true;
943 }
944 } catch (Exception e) {
945 logger.error(String.format("ERROR while calculating transcription request expiration for job: %s", jobId), e);
946
947 return true;
948 }
949 return false;
950 }
951
952 private long getRemainingTranscriptionExpireTimeInMin(String jobId) {
953 try {
954 long expiredTime = (database.findByJob(jobId).getDateCreated().getTime()
955 + database.findByJob(jobId).getTrackDuration()
956 + (completionCheckBuffer + maxProcessingSeconds) * 1000)
957 - (System.currentTimeMillis());
958
959 if (expiredTime < 0) {
960 expiredTime = 0;
961 }
962 return TimeUnit.MILLISECONDS.toMinutes(expiredTime);
963 } catch (Exception e) {
964 logger.error("Unable to calculate remaining transcription expired time for transcription job {}", jobId);
965 }
966 return 0;
967 }
968
969 @Reference
970 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
971 this.serviceRegistry = serviceRegistry;
972 }
973
974 @Reference
975 public void setSecurityService(SecurityService securityService) {
976 this.securityService = securityService;
977 }
978
979 @Reference
980 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
981 this.userDirectoryService = userDirectoryService;
982 }
983
984 @Reference
985 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
986 this.organizationDirectoryService = organizationDirectoryService;
987 }
988
989 @Reference
990 public void setSmtpService(SmtpService service) {
991 this.smtpService = service;
992 }
993
994 @Reference
995 public void setWorkspace(Workspace ws) {
996 this.workspace = ws;
997 }
998
999 @Reference
1000 public void setWorkingFileRepository(WorkingFileRepository wfr) {
1001 this.wfr = wfr;
1002 }
1003
1004 @Reference
1005 public void setDatabase(TranscriptionDatabase service) {
1006 this.database = service;
1007 }
1008
1009 @Reference
1010 public void setAssetManager(AssetManager service) {
1011 this.assetManager = service;
1012 }
1013
1014 @Reference
1015 public void setWorkflowService(WorkflowService service) {
1016 this.workflowService = service;
1017 }
1018
1019 @Override
1020 protected ServiceRegistry getServiceRegistry() {
1021 return serviceRegistry;
1022 }
1023
1024 @Override
1025 protected SecurityService getSecurityService() {
1026 return securityService;
1027 }
1028
1029 @Override
1030 protected UserDirectoryService getUserDirectoryService() {
1031 return userDirectoryService;
1032 }
1033
1034 @Override
1035 protected OrganizationDirectoryService getOrganizationDirectoryService() {
1036 return organizationDirectoryService;
1037 }
1038
1039
1040 void setWfUtil(Workflows wfUtil) {
1041 this.wfUtil = wfUtil;
1042 }
1043
1044 class WorkflowDispatcher implements Runnable {
1045
1046
1047
1048
1049
1050
1051 @Override
1052 public void run() {
1053 logger.debug("WorkflowDispatcher waking up...");
1054
1055 try {
1056
1057
1058 long providerId;
1059 TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
1060 if (providerInfo != null) {
1061 providerId = providerInfo.getId();
1062 } else {
1063 logger.debug("No jobs yet for provider {}", PROVIDER);
1064 return;
1065 }
1066
1067 List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
1068 TranscriptionJobControl.Status.TranscriptionComplete.name());
1069 for (TranscriptionJobControl j : jobs) {
1070
1071
1072 if (j.getProviderId() != providerId) {
1073 continue;
1074 }
1075
1076 String mpId = j.getMediaPackageId();
1077 String jobId = j.getTranscriptionJobId();
1078
1079
1080 if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
1081
1082
1083
1084 if (j.getDateCreated().getTime() + (j.getTrackDuration() / 3) + completionCheckBuffer * 1000 < System
1085 .currentTimeMillis()) {
1086 try {
1087 if (!getAndSaveJobResults(jobId)) {
1088
1089 if (hasTranscriptionRequestExpired(jobId)) {
1090
1091 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1092
1093 String token = getRefreshAccessToken();
1094 deleteStorageFile(mpId, token);
1095
1096 sendEmail(TRANSCRIPTION_ERROR, String.format(
1097 "Transcription job was in processing state for too long and was marked "
1098 + "as cancelled (media package %s, job id %s).",
1099 mpId, jobId));
1100 }
1101
1102 continue;
1103 }
1104 } catch (TranscriptionServiceException e) {
1105 if (e.getCode() == 404) {
1106
1107 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1108
1109 sendEmail(TRANSCRIPTION_ERROR,
1110 String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
1111 }
1112 continue;
1113 } catch (IOException ex) {
1114 logger.error("Transcription job not found, error: {}", ex.toString());
1115 }
1116 } else {
1117 continue;
1118 }
1119 }
1120
1121
1122 try {
1123
1124
1125 Map<String, String> params = new HashMap<String, String>();
1126 params.put(TRANSCRIPTION_JOB_ID_KEY, jobId);
1127 String wfId = startWorkflow(mpId, workflowDefinitionId, jobId, params);
1128 if (wfId == null) {
1129 logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}",
1130 mpId, jobId);
1131 continue;
1132 }
1133
1134 database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1135 logger.info("Attach transcription workflow {} scheduled for mp {}, google speech job {}",
1136 wfId, mpId, jobId);
1137 } catch (Exception e) {
1138 logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}, {}: {}",
1139 mpId, jobId, e.getClass().getName(), e.getMessage());
1140 }
1141 }
1142 } catch (TranscriptionDatabaseException e) {
1143 logger.warn("Could not read transcription job control database: {}", e.getMessage());
1144 }
1145 }
1146 }
1147
1148 private String startWorkflow(String mpId, String wfDefId, String jobId, Map<String, String> params) {
1149 DefaultOrganization defaultOrg = new DefaultOrganization();
1150 securityService.setOrganization(defaultOrg);
1151 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1152
1153
1154 final AQueryBuilder q = assetManager.createQuery();
1155 final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
1156 if (r.getSize() == 0) {
1157 if (!hasTranscriptionRequestExpired(jobId)) {
1158
1159 logger.warn("Media package {} has not been archived yet or has been deleted. Will keep trying for {} "
1160 + "more minutes before cancelling transcription job {}.",
1161 mpId, getRemainingTranscriptionExpireTimeInMin(jobId), jobId);
1162 } else {
1163
1164 cancelTranscription(jobId, " Google Transcription job canceled, archived media package not found");
1165 logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
1166 }
1167 return null;
1168 }
1169
1170 String org = Enrichments.enrich(r).getSnapshots().stream().findFirst().get().getOrganizationId();
1171 Organization organization = null;
1172 try {
1173 organization = organizationDirectoryService.getOrganization(org);
1174 if (organization == null) {
1175 logger.warn("Media package {} has an unknown organization {}.", mpId, org);
1176 return null;
1177 }
1178 } catch (NotFoundException e) {
1179 logger.warn("Organization {} not found for media package {}.", org, mpId);
1180 return null;
1181 }
1182 securityService.setOrganization(organization);
1183
1184 try {
1185 WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
1186 Workflows workflows;
1187 if (wfUtil != null) {
1188 workflows = wfUtil;
1189 } else {
1190 workflows = new Workflows(assetManager, workflowService);
1191 }
1192 Set<String> mpIds = new HashSet<String>();
1193 mpIds.add(mpId);
1194 List<WorkflowInstance> wfList = workflows
1195 .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1196 return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
1197 } catch (NotFoundException | WorkflowDatabaseException e) {
1198 logger.warn("Could not get workflow definition: {}", wfDefId);
1199 }
1200
1201 return null;
1202 }
1203
1204 class ResultsFileCleanup implements Runnable {
1205
1206 @Override
1207 public void run() {
1208 logger.info("ResultsFileCleanup waking up...");
1209 try {
1210
1211 wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1212 } catch (IOException e) {
1213 logger.warn("Could not cleanup old transcript results files", e);
1214 }
1215 }
1216 }
1217
1218 }