1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.transcription.ibmwatson;
22
23 import static org.opencastproject.systems.OpencastConstants.ADMIN_EMAIL_PROPERTY;
24
25 import org.opencastproject.assetmanager.api.AssetManager;
26 import org.opencastproject.assetmanager.api.Snapshot;
27 import org.opencastproject.assetmanager.util.Workflows;
28 import org.opencastproject.job.api.AbstractJobProducer;
29 import org.opencastproject.job.api.Job;
30 import org.opencastproject.kernel.mail.SmtpService;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35 import org.opencastproject.mediapackage.MediaPackageElementParser;
36 import org.opencastproject.mediapackage.MediaPackageException;
37 import org.opencastproject.mediapackage.Track;
38 import org.opencastproject.security.api.DefaultOrganization;
39 import org.opencastproject.security.api.Organization;
40 import org.opencastproject.security.api.OrganizationDirectoryService;
41 import org.opencastproject.security.api.SecurityService;
42 import org.opencastproject.security.api.UserDirectoryService;
43 import org.opencastproject.security.util.SecurityUtil;
44 import org.opencastproject.serviceregistry.api.ServiceRegistry;
45 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
46 import org.opencastproject.systems.OpencastConstants;
47 import org.opencastproject.transcription.api.TranscriptionService;
48 import org.opencastproject.transcription.api.TranscriptionServiceException;
49 import org.opencastproject.transcription.persistence.TranscriptionDatabase;
50 import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
51 import org.opencastproject.transcription.persistence.TranscriptionJobControl;
52 import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
53 import org.opencastproject.util.LoadUtil;
54 import org.opencastproject.util.NotFoundException;
55 import org.opencastproject.util.OsgiUtil;
56 import org.opencastproject.util.UrlSupport;
57 import org.opencastproject.workflow.api.ConfiguredWorkflow;
58 import org.opencastproject.workflow.api.WorkflowDatabaseException;
59 import org.opencastproject.workflow.api.WorkflowDefinition;
60 import org.opencastproject.workflow.api.WorkflowInstance;
61 import org.opencastproject.workflow.api.WorkflowService;
62 import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
63 import org.opencastproject.workspace.api.Workspace;
64
65 import org.apache.commons.lang3.StringUtils;
66 import org.apache.http.HttpEntity;
67 import org.apache.http.HttpHeaders;
68 import org.apache.http.HttpHost;
69 import org.apache.http.HttpStatus;
70 import org.apache.http.auth.AuthScope;
71 import org.apache.http.auth.UsernamePasswordCredentials;
72 import org.apache.http.client.AuthCache;
73 import org.apache.http.client.CredentialsProvider;
74 import org.apache.http.client.config.RequestConfig;
75 import org.apache.http.client.methods.CloseableHttpResponse;
76 import org.apache.http.client.methods.HttpGet;
77 import org.apache.http.client.methods.HttpPost;
78 import org.apache.http.client.protocol.HttpClientContext;
79 import org.apache.http.entity.FileEntity;
80 import org.apache.http.impl.auth.BasicScheme;
81 import org.apache.http.impl.client.BasicAuthCache;
82 import org.apache.http.impl.client.BasicCredentialsProvider;
83 import org.apache.http.impl.client.CloseableHttpClient;
84 import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
85 import org.apache.http.impl.client.HttpClients;
86 import org.apache.http.util.EntityUtils;
87 import org.json.simple.JSONArray;
88 import org.json.simple.JSONObject;
89 import org.json.simple.parser.JSONParser;
90 import org.osgi.service.component.ComponentContext;
91 import org.osgi.service.component.annotations.Activate;
92 import org.osgi.service.component.annotations.Component;
93 import org.osgi.service.component.annotations.Reference;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
112
113 @Component(
114 immediate = true,
115 service = { TranscriptionService.class,IBMWatsonTranscriptionService.class },
116 property = {
117 "service.description=IBM Watson Transcription Service",
118 "provider=ibm.watson"
119 }
120 )
121 public class IBMWatsonTranscriptionService extends AbstractJobProducer implements TranscriptionService {
122
123
124 private static final Logger logger = LoggerFactory.getLogger(IBMWatsonTranscriptionService.class);
125
126 private static final String PROVIDER = "IBM Watson";
127
128 private static final String JOB_TYPE = "org.opencastproject.transcription.ibmwatson";
129
130 static final String TRANSCRIPT_COLLECTION = "transcripts";
131 static final String APIKEY = "apikey";
132 private static final int CONNECTION_TIMEOUT = 60000;
133 private static final int SOCKET_TIMEOUT = 60000;
134
135 public static final String DEFAULT_WF_DEF = "attach-watson-transcripts";
136 private static final long DEFAULT_COMPLETION_BUFFER = 600;
137 private static final long DEFAULT_DISPATCH_INTERVAL = 60;
138 private static final long DEFAULT_MAX_PROCESSING_TIME = 2 * 60 * 60;
139 private static final String DEFAULT_LANGUAGE = "en";
140
141
142 private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;
143
144
145 public static final String ADMIN_URL_PROPERTY = "org.opencastproject.admin.ui.url";
146 private static final String DIGEST_USER_PROPERTY = "org.opencastproject.security.digest.user";
147
148
149 private static final String CLUSTER_NAME_PROPERTY = "org.opencastproject.environment.name";
150 private String clusterName = "";
151
152
153 public static final float DEFAULT_START_TRANSCRIPTION_JOB_LOAD = 0.1f;
154
155
156 public static final String START_TRANSCRIPTION_JOB_LOAD_KEY = "job.load.start.transcription";
157
158
159 private float jobLoad = DEFAULT_START_TRANSCRIPTION_JOB_LOAD;
160
161
/**
 * Event names passed to the Watson service in the {@code events} parameter when a recognitions
 * job is created with a callback url.
 */
public interface JobEvent {
  /** Event name {@code recognitions.completed_with_results}. */
  String COMPLETED_WITH_RESULTS = "recognitions.completed_with_results";

  /** Event name {@code recognitions.failed}. */
  String FAILED = "recognitions.failed";
}
166
/**
 * Values of the {@code status} field found in the JSON description of a Watson recognitions job.
 */
public interface RecognitionJobStatus {
  /** Status value {@code completed}. */
  String COMPLETED = "completed";

  /** Status value {@code failed}. */
  String FAILED = "failed";

  /** Status value {@code processing}. */
  String PROCESSING = "processing";

  /** Status value {@code waiting}. */
  String WAITING = "waiting";
}
173
174
175 private ServiceRegistry serviceRegistry;
176 private SecurityService securityService;
177 private UserDirectoryService userDirectoryService;
178 private OrganizationDirectoryService organizationDirectoryService;
179 private Workspace workspace;
180 private TranscriptionDatabase database;
181 private AssetManager assetManager;
182 private WorkflowService workflowService;
183 private WorkingFileRepository wfr;
184 private SmtpService smtpService;
185
186
187 private Workflows wfUtil;
188
189 private enum Operation {
190 StartTranscription
191 }
192
193 private static final String IBM_WATSON_SERVICE_URL = "https://stream.watsonplatform.net/speech-to-text/api";
194 private static final String API_VERSION = "v1";
195 private static final String REGISTER_CALLBACK = "register_callback";
196 private static final String RECOGNITIONS = "recognitions";
197 private static final String CALLBACK_PATH = "/transcripts/watson/results";
198
199
200 public static final String ENABLED_CONFIG = "enabled";
201 public static final String IBM_WATSON_SERVICE_URL_CONFIG = "ibm.watson.service.url";
202 public static final String IBM_WATSON_USER_CONFIG = "ibm.watson.user";
203 public static final String IBM_WATSON_PSW_CONFIG = "ibm.watson.password";
204 public static final String IBM_WATSON_API_KEY_CONFIG = "ibm.watson.api.key";
205 public static final String IBM_WATSON_MODEL_CONFIG = "ibm.watson.model";
206 public static final String WORKFLOW_CONFIG = "workflow";
207 public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
208 public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
209 public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
210 public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
211 public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
212 public static final String MAX_ATTEMPTS_CONFIG = "max.attempts";
213 public static final String RETRY_WORKLFOW_CONFIG = "retry.workflow";
214
215
216 private boolean enabled = false;
217 private String watsonServiceUrl = UrlSupport.concat(IBM_WATSON_SERVICE_URL, API_VERSION);
218 private String model;
219 private String workflowDefinitionId = DEFAULT_WF_DEF;
220 private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
221 private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
222 private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
223 private String toEmailAddress;
224 private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
225 private String language = DEFAULT_LANGUAGE;
226 private int maxAttempts = 1;
227 private String retryWfDefId = null;
228
229 private String systemAccount;
230 private String serverUrl;
231 private String callbackUrl;
232 private boolean callbackAlreadyRegistered = false;
233 private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
234
235 private AuthCache authCache;
236 private CredentialsProvider credentialsProvider;
237
/** Creates the service and registers it as a job producer for {@code JOB_TYPE}. */
public IBMWatsonTranscriptionService() {
  super(JOB_TYPE);
}
241
242 @Override
243 @Activate
244 public void activate(ComponentContext cc) {
245 if (cc != null) {
246
247 enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).get();
248
249 if (enabled) {
250
251 Optional<String> urlOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_SERVICE_URL_CONFIG);
252 if (urlOpt.isPresent()) {
253 watsonServiceUrl = UrlSupport.concat(urlOpt.get(), API_VERSION);
254 }
255
256
257
258 String user;
259 String psw;
260 Optional<String> keyOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_API_KEY_CONFIG);
261 if (keyOpt.isPresent()) {
262 user = APIKEY;
263 psw = keyOpt.get();
264 logger.info("Using transcription service at {} with api key", watsonServiceUrl);
265 } else {
266
267 user = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_USER_CONFIG);
268
269 psw = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_PSW_CONFIG);
270 logger.info("Using transcription service at {} with username {}", watsonServiceUrl, user);
271 }
272
273 try {
274 URI uri = new URI(watsonServiceUrl);
275 HttpHost targetHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
276 credentialsProvider = new BasicCredentialsProvider();
277 credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, psw));
278 authCache = new BasicAuthCache();
279 authCache.put(targetHost, new BasicScheme());
280 } catch (URISyntaxException e) {
281 throw new RuntimeException("Watson STT service url is not valid: " + watsonServiceUrl, e);
282 }
283
284
285 Optional<String> modelOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_MODEL_CONFIG);
286 if (modelOpt.isPresent()) {
287 model = modelOpt.get();
288 language = StringUtils.substringBefore(model, "-");
289 logger.info("Model is {}", model);
290 } else {
291 logger.info("Default model will be used");
292 }
293
294
295 Optional<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
296 if (wfOpt.isPresent()) {
297 workflowDefinitionId = wfOpt.get();
298 }
299 logger.info("Workflow definition is {}", workflowDefinitionId);
300
301 Optional<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
302 if (intervalOpt.isPresent()) {
303 try {
304 workflowDispatchInterval = Long.parseLong(intervalOpt.get());
305 } catch (NumberFormatException e) {
306
307 }
308 }
309 logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
310
311 Optional<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
312 if (bufferOpt.isPresent()) {
313 try {
314 completionCheckBuffer = Long.parseLong(bufferOpt.get());
315 } catch (NumberFormatException e) {
316
317 logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
318 COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer);
319 }
320 }
321 logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
322
323 Optional<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
324 if (maxProcessingOpt.isPresent()) {
325 try {
326 maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
327 } catch (NumberFormatException e) {
328
329 }
330 }
331 logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
332
333 Optional<String> cleaupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
334 if (cleaupOpt.isPresent()) {
335 try {
336 cleanupResultDays = Integer.parseInt(cleaupOpt.get());
337 } catch (NumberFormatException e) {
338
339 }
340 }
341 logger.info("Cleanup result files after {} days", cleanupResultDays);
342
343
344 Optional<String> maxAttemptsOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_ATTEMPTS_CONFIG);
345 if (maxAttemptsOpt.isPresent()) {
346 try {
347 maxAttempts = Integer.parseInt(maxAttemptsOpt.get());
348 retryWfDefId = OsgiUtil.getComponentContextProperty(cc, RETRY_WORKLFOW_CONFIG);
349 } catch (NumberFormatException e) {
350
351 logger.warn("Invalid configuration for {} : {}. Default used instead: no retries", MAX_ATTEMPTS_CONFIG,
352 maxAttemptsOpt.get());
353 }
354 } else {
355 logger.info("No retries in case of errors");
356 }
357
358 serverUrl = OsgiUtil.getContextProperty(cc, OpencastConstants.SERVER_URL_PROPERTY);
359 systemAccount = OsgiUtil.getContextProperty(cc, DIGEST_USER_PROPERTY);
360
361 jobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), START_TRANSCRIPTION_JOB_LOAD_KEY,
362 DEFAULT_START_TRANSCRIPTION_JOB_LOAD, serviceRegistry);
363
364
365 scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
366 TimeUnit.SECONDS);
367
368
369 scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
370
371
372 Optional<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
373 if (optTo.isPresent()) {
374 toEmailAddress = optTo.get();
375 } else {
376
377 optTo = OsgiUtil.getOptContextProperty(cc, ADMIN_EMAIL_PROPERTY);
378 if (optTo.isPresent()) {
379 toEmailAddress = optTo.get();
380 }
381 }
382 if (toEmailAddress != null) {
383 logger.info("Notification email set to {}", toEmailAddress);
384 } else {
385 logger.warn("Email notification disabled");
386 }
387
388 Optional<String> optCluster = OsgiUtil.getOptContextProperty(cc, CLUSTER_NAME_PROPERTY);
389 if (optCluster.isPresent()) {
390 clusterName = optCluster.get();
391 }
392 logger.info("Environment name is {}", clusterName);
393
394 logger.info("Activated!");
395
396 } else {
397 logger.info("Service disabled. If you want to enable it, please update the service configuration.");
398 }
399 } else {
400 throw new IllegalArgumentException("Missing component context");
401 }
402 }
403
404 @Override
405 public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
406 if (!enabled) {
407 throw new TranscriptionServiceException(
408 "This service is disabled. If you want to enable it, please update the service configuration.");
409 }
410
411 try {
412 return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(),
413 Arrays.asList(mpId, MediaPackageElementParser.getAsXml(track)), jobLoad);
414 } catch (ServiceRegistryException e) {
415 throw new TranscriptionServiceException("Unable to create a job", e);
416 } catch (MediaPackageException e) {
417 throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
418 }
419 }
420
421 @Override
422 public Job startTranscription(String mpId, Track track, String... args) {
423 throw new UnsupportedOperationException("Not supported.");
424 }
425
426 @Override
427 public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
428 JSONObject jsonObj = null;
429 String jobId = null;
430 try {
431 jsonObj = (JSONObject) obj;
432 jobId = (String) jsonObj.get("id");
433
434
435
436
437
438
439
440
441
442 if (jsonObj.get("results") instanceof JSONArray) {
443 JSONArray resultsArray = (JSONArray) jsonObj.get("results");
444 if (resultsArray != null && resultsArray.size() > 0) {
445 String error = (String) ((JSONObject) resultsArray.get(0)).get("error");
446 if (!StringUtils.isEmpty(error)) {
447 retryOrError(jobId, mpId,
448 String.format("Transcription completed with error for mpId %s, jobId %s: %s", mpId, jobId, error));
449 return;
450 }
451 }
452 }
453
454 logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
455
456
457
458
459 database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
460
461
462 if (jsonObj.get("results") != null) {
463 saveResults(jobId, jsonObj);
464 }
465 } catch (IOException e) {
466 logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
467 mpId, jobId, jsonObj == null ? "null" : jsonObj.toJSONString());
468 throw new TranscriptionServiceException("Could not save transcription results file", e);
469 } catch (TranscriptionDatabaseException e) {
470 logger.warn("Error when updating state in database for mpId {}, jobId {}", mpId, jobId);
471 throw new TranscriptionServiceException("Could not update transcription job control db", e);
472 }
473 }
474
475 @Override
476 public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
477 JSONObject jsonObj = (JSONObject) obj;
478 String jobId = (String) jsonObj.get("id");
479 try {
480 retryOrError(jobId, mpId, String.format("Transcription error for media package %s, job id %s", mpId, jobId));
481 } catch (TranscriptionDatabaseException e) {
482 throw new TranscriptionServiceException("Error when updating job state.", e);
483 }
484 }
485
486 @Override
487 public String getLanguage() {
488 return language;
489 }
490
491 @Override
492 public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
493 throw new TranscriptionServiceException("Method not implemented");
494 }
495
496 @Override
497 protected String process(Job job) throws Exception {
498 Operation op = null;
499 String operation = job.getOperation();
500 List<String> arguments = job.getArguments();
501 String result = "";
502
503 op = Operation.valueOf(operation);
504
505 switch (op) {
506 case StartTranscription:
507 String mpId = arguments.get(0);
508 Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
509 createRecognitionsJob(mpId, track);
510 break;
511 default:
512 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
513 }
514
515 return result;
516 }
517
518
519
520
521
522
523
524
525
526
527
528
529 void registerCallback() throws TranscriptionServiceException {
530 if (callbackAlreadyRegistered) {
531 return;
532 }
533
534 Organization org = securityService.getOrganization();
535 String adminUrl = StringUtils.trimToNull(org.getProperties().get(ADMIN_URL_PROPERTY));
536 if (adminUrl != null) {
537 callbackUrl = adminUrl + CALLBACK_PATH;
538 } else {
539 callbackUrl = serverUrl + CALLBACK_PATH;
540 }
541 logger.info("Callback url is {}", callbackUrl);
542
543 CloseableHttpClient httpClient = makeHttpClient();
544 HttpPost httpPost = new HttpPost(
545 UrlSupport.concat(watsonServiceUrl, REGISTER_CALLBACK) + String.format("?callback_url=%s", callbackUrl));
546
547 HttpClientContext context = HttpClientContext.create();
548 context.setCredentialsProvider(credentialsProvider);
549 context.setAuthCache(authCache);
550 CloseableHttpResponse response = null;
551
552 try {
553 response = httpClient.execute(httpPost, context);
554 int code = response.getStatusLine().getStatusCode();
555
556 switch (code) {
557 case HttpStatus.SC_OK:
558 logger.info("Callback url: {} had already already been registered", callbackUrl);
559 callbackAlreadyRegistered = true;
560 EntityUtils.consume(response.getEntity());
561 break;
562 case HttpStatus.SC_CREATED:
563 logger.info("Callback url: {} has been successfully registered", callbackUrl);
564 callbackAlreadyRegistered = true;
565 EntityUtils.consume(response.getEntity());
566 break;
567 case HttpStatus.SC_BAD_REQUEST:
568 logger.warn("Callback url {} could not be verified, status: {}", callbackUrl, code);
569 break;
570 case HttpStatus.SC_SERVICE_UNAVAILABLE:
571 logger.warn("Service unavailable when registering callback url {} status: {}", callbackUrl, code);
572 break;
573 default:
574 logger.warn("Unknown status when registering callback url {}, status: {}", callbackUrl, code);
575 break;
576 }
577 } catch (Exception e) {
578 logger.warn("Exception when calling the the register callback endpoint", e);
579 } finally {
580 try {
581 httpClient.close();
582 if (response != null) {
583 response.close();
584 }
585 } catch (IOException e) {
586 }
587 }
588 }
589
590
591
592
593
594
595
596
597
598
599
600
601
602 void createRecognitionsJob(String mpId, Track track) throws TranscriptionServiceException {
603 if (!callbackAlreadyRegistered) {
604 registerCallback();
605 }
606
607
608 File audioFile = null;
609 try {
610 audioFile = workspace.get(track.getURI());
611 } catch (Exception e) {
612 throw new TranscriptionServiceException("Error reading audio track", e);
613 }
614
615 CloseableHttpClient httpClient = makeHttpClient();
616 String additionalParms = "";
617 if (callbackAlreadyRegistered) {
618 additionalParms = String.format("&user_token=%s&callback_url=%s&events=%s,%s", mpId, callbackUrl,
619 JobEvent.COMPLETED_WITH_RESULTS, JobEvent.FAILED);
620 }
621 if (!StringUtils.isEmpty(model)) {
622 additionalParms += String.format("&model=%s", model);
623 }
624
625 HttpClientContext context = HttpClientContext.create();
626 context.setCredentialsProvider(credentialsProvider);
627 context.setAuthCache(authCache);
628 CloseableHttpResponse response = null;
629 try {
630 HttpPost httpPost = new HttpPost(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS)
631 + String.format(
632 "?inactivity_timeout=-1×tamps=true&smart_formatting=true%s", additionalParms));
633 logger.debug("Url to invoke ibm watson service: {}", httpPost.getURI().toString());
634 httpPost.setHeader(HttpHeaders.CONTENT_TYPE, track.getMimeType().toString());
635 FileEntity fileEntity = new FileEntity(audioFile);
636 fileEntity.setChunked(true);
637 httpPost.setEntity(fileEntity);
638 response = httpClient.execute(httpPost, context);
639 int code = response.getStatusLine().getStatusCode();
640
641 switch (code) {
642 case HttpStatus.SC_CREATED:
643 logger.info("Recognitions job has been successfully created");
644
645 HttpEntity entity = response.getEntity();
646
647
648
649
650
651
652
653 String jsonString = EntityUtils.toString(response.getEntity());
654 JSONParser jsonParser = new JSONParser();
655 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
656 String jobId = (String) jsonObject.get("id");
657 String jobStatus = (String) jsonObject.get("status");
658 String jobUrl = (String) jsonObject.get("url");
659 logger.info("Transcription for mp {} has been submitted. Job id: {}, job status: {}, job url: {}", mpId,
660 jobId, jobStatus, jobUrl);
661
662 database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
663 track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
664 EntityUtils.consume(entity);
665 return;
666 case HttpStatus.SC_BAD_REQUEST:
667 logger.info("Invalid argument returned, status: {}", code);
668 break;
669 case HttpStatus.SC_SERVICE_UNAVAILABLE:
670 logger.info("Service unavailable returned, status: {}", code);
671 break;
672 default:
673 logger.info("Unknown return status: {}.", code);
674 break;
675 }
676 throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
677 } catch (Exception e) {
678 logger.warn("Exception when calling the recognitions endpoint", e);
679 throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
680 } finally {
681 try {
682 httpClient.close();
683 if (response != null) {
684 response.close();
685 }
686 } catch (IOException e) {
687 }
688 }
689 }
690
691
692
693
694
695
696
697
698
699
700
701 String getAndSaveJobResults(String jobId) throws TranscriptionServiceException {
702 CloseableHttpClient httpClient = makeHttpClient();
703
704 HttpClientContext context = HttpClientContext.create();
705 context.setCredentialsProvider(credentialsProvider);
706 context.setAuthCache(authCache);
707 CloseableHttpResponse response = null;
708 String mpId = "unknown";
709 try {
710 HttpGet httpGet = new HttpGet(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS, jobId));
711 response = httpClient.execute(httpGet, context);
712 int code = response.getStatusLine().getStatusCode();
713
714 switch (code) {
715 case HttpStatus.SC_OK:
716 HttpEntity entity = response.getEntity();
717
718
719 String jsonString = EntityUtils.toString(entity);
720 JSONParser jsonParser = new JSONParser();
721 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
722 String jobStatus = (String) jsonObject.get("status");
723 mpId = (String) jsonObject.get("user_token");
724
725 if (mpId == null) {
726 TranscriptionJobControl jc = database.findByJob(jobId);
727 if (jc != null) {
728 mpId = jc.getMediaPackageId();
729 }
730 }
731 logger.info("Recognitions job {} has been found, status {}", jobId, jobStatus);
732 EntityUtils.consume(entity);
733
734 if (jobStatus.indexOf(RecognitionJobStatus.COMPLETED) > -1 && jsonObject.get("results") != null) {
735 transcriptionDone(mpId, jsonObject);
736 }
737 return jobStatus;
738 case HttpStatus.SC_NOT_FOUND:
739 logger.info("Job not found: {}", jobId);
740 break;
741 case HttpStatus.SC_SERVICE_UNAVAILABLE:
742 logger.info("Service unavailable returned, status: {}", code);
743 break;
744 default:
745 logger.info("Unknown return status: {}.", code);
746 break;
747 }
748 throw new TranscriptionServiceException(
749 String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
750 mpId, jobId, code),
751 code);
752 } catch (TranscriptionServiceException e) {
753 throw e;
754 } catch (Exception e) {
755 logger.warn("Exception when calling the recognitions endpoint for media package {}, job id {}",
756 mpId, jobId, e);
757 throw new TranscriptionServiceException(String.format(
758 "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
759 } finally {
760 try {
761 httpClient.close();
762 if (response != null) {
763 response.close();
764 }
765 } catch (IOException e) {
766 }
767 }
768 }
769
770 private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
771 if (jsonObj.get("results") != null) {
772
773 workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json",
774 new ByteArrayInputStream(jsonObj.toJSONString().getBytes()));
775 }
776 }
777
778 @Override
779 public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
780 throws TranscriptionServiceException {
781 try {
782
783 if (jobId == null || "null".equals(jobId)) {
784 jobId = null;
785 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
786 if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
787 || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
788 jobId = jc.getTranscriptionJobId();
789 }
790 }
791 }
792
793 if (jobId == null) {
794 throw new TranscriptionServiceException(
795 "No completed or closed transcription job found in database for media package " + mpId);
796 }
797
798
799 URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".json");
800 try {
801 workspace.get(uri);
802 } catch (Exception e) {
803
804 getAndSaveJobResults(jobId);
805 }
806 MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
807 return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "ibm-watson-json"));
808 } catch (TranscriptionDatabaseException e) {
809 throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
810 }
811 }
812
813 protected CloseableHttpClient makeHttpClient() {
814 RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(CONNECTION_TIMEOUT)
815 .setSocketTimeout(SOCKET_TIMEOUT).setConnectionRequestTimeout(CONNECTION_TIMEOUT).build();
816 return HttpClients.custom().setDefaultRequestConfig(reqConfig)
817 .setRetryHandler(new DefaultHttpRequestRetryHandler(3, true)).build();
818 }
819
820 protected void retryOrError(String jobId, String mpId, String errorMsg) throws TranscriptionDatabaseException {
821 logger.warn(errorMsg);
822
823
824 TranscriptionJobControl jc = database.findByJob(jobId);
825 String trackId = jc.getTrackId();
826
827 int attempts = database
828 .findByMediaPackageTrackAndStatus(mpId, trackId, TranscriptionJobControl.Status.Error.name(),
829 TranscriptionJobControl.Status.InProgress.name(), TranscriptionJobControl.Status.Canceled.name())
830 .size();
831 if (attempts < maxAttempts) {
832
833 database.updateJobControl(jobId, TranscriptionJobControl.Status.Retry.name());
834 logger.info("Will retry transcription for media package {}, track {}", mpId, trackId);
835 } else {
836
837 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
838
839 logger.error("{} transcription attempts exceeded maximum of {} for media package {}, track {}.", attempts,
840 maxAttempts, mpId, trackId);
841 sendEmail("Transcription ERROR", String.format(errorMsg, mpId, jobId));
842 }
843 }
844
845 private void sendEmail(String subject, String body) {
846 if (toEmailAddress == null) {
847 logger.info("Skipping sending email notification. Message is {}.", body);
848 return;
849 }
850 try {
851 logger.debug("Sending e-mail notification to {}", toEmailAddress);
852 smtpService.send(toEmailAddress, String.format("%s (%s)", subject, clusterName), body);
853 logger.info("Sent e-mail notification to {}", toEmailAddress);
854 } catch (Exception e) {
855 logger.error("Could not send email: {}\n{}", subject, body, e);
856 }
857 }
858
859 public boolean isCallbackAlreadyRegistered() {
860 return callbackAlreadyRegistered;
861 }
862
863 @Reference
864 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
865 this.serviceRegistry = serviceRegistry;
866 }
867
868 @Reference
869 public void setSecurityService(SecurityService securityService) {
870 this.securityService = securityService;
871 }
872
873 @Reference
874 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
875 this.userDirectoryService = userDirectoryService;
876 }
877
878 @Reference
879 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
880 this.organizationDirectoryService = organizationDirectoryService;
881 }
882
883 @Reference
884 public void setSmtpService(SmtpService service) {
885 this.smtpService = service;
886 }
887
888 @Reference
889 public void setWorkspace(Workspace ws) {
890 this.workspace = ws;
891 }
892
893 @Reference
894 public void setWorkingFileRepository(WorkingFileRepository wfr) {
895 this.wfr = wfr;
896 }
897
898 @Reference
899 public void setDatabase(TranscriptionDatabase service) {
900 this.database = service;
901 }
902
903 @Reference
904 public void setAssetManager(AssetManager service) {
905 this.assetManager = service;
906 }
907
908 @Reference
909 public void setWorkflowService(WorkflowService service) {
910 this.workflowService = service;
911 }
912
913 @Override
914 protected ServiceRegistry getServiceRegistry() {
915 return serviceRegistry;
916 }
917
918 @Override
919 protected SecurityService getSecurityService() {
920 return securityService;
921 }
922
923 @Override
924 protected UserDirectoryService getUserDirectoryService() {
925 return userDirectoryService;
926 }
927
928 @Override
929 protected OrganizationDirectoryService getOrganizationDirectoryService() {
930 return organizationDirectoryService;
931 }
932
933
934 void setWfUtil(Workflows wfUtil) {
935 this.wfUtil = wfUtil;
936 }
937
938 class WorkflowDispatcher implements Runnable {
939
940
941
942
943
944
945 @Override
946 public void run() {
947 logger.debug("WorkflowDispatcher waking up...");
948
949 try {
950
951
952 long providerId;
953 TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
954 if (providerInfo != null) {
955 providerId = providerInfo.getId();
956 } else {
957 logger.warn("No provider entry for {}", PROVIDER);
958 return;
959 }
960
961 List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
962 TranscriptionJobControl.Status.TranscriptionComplete.name());
963
964 for (TranscriptionJobControl j : jobs) {
965
966
967 if (j.getProviderId() != providerId) {
968 continue;
969 }
970
971 String mpId = j.getMediaPackageId();
972 String jobId = j.getTranscriptionJobId();
973
974
975
976 if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
977
978
979 if (j.getDateCreated().getTime() + j.getTrackDuration() + completionCheckBuffer * 1000 < System
980 .currentTimeMillis()) {
981 try {
982 String jobStatus = getAndSaveJobResults(jobId);
983 if (RecognitionJobStatus.FAILED.equals(jobStatus)) {
984 retryOrError(jobId, mpId,
985 String.format("Transcription job failed for mpId %s, jobId %s", mpId, jobId));
986 continue;
987 } else if (RecognitionJobStatus.PROCESSING.equals(jobStatus)
988 || RecognitionJobStatus.WAITING.equals(jobStatus)) {
989
990 if (j.getDateCreated().getTime() + j.getTrackDuration()
991 + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
992
993 retryOrError(jobId, mpId, String.format(
994 "Transcription job was in waiting or processing state for too long "
995 + "(media package %s, job id %s)", mpId, jobId));
996 }
997
998 continue;
999 }
1000 } catch (TranscriptionServiceException e) {
1001 if (e.getCode() == 404) {
1002
1003 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1004
1005 sendEmail("Transcription ERROR",
1006 String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
1007 }
1008 continue;
1009 }
1010 } else {
1011 continue;
1012 }
1013 }
1014
1015
1016 try {
1017
1018 Map<String, String> params = new HashMap<String, String>();
1019 params.put("transcriptionJobId", jobId);
1020 String wfId = startWorkflow(mpId, workflowDefinitionId, params);
1021 if (wfId == null) {
1022 logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, watson job {}", mpId, jobId);
1023 continue;
1024 }
1025
1026 database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1027 logger.info("Attach transcription workflow {} scheduled for mp {}, watson job {}",
1028 wfId, mpId, jobId);
1029 } catch (Exception e) {
1030 logger.warn(
1031 "Attach transcription workflow could NOT be scheduled for media package {}, watson job {}, {}: {}",
1032 mpId, jobId, e.getClass().getName(), e.getMessage()
1033 );
1034 }
1035 }
1036
1037 if (maxAttempts > 1) {
1038
1039 jobs = database.findByStatus(TranscriptionJobControl.Status.Retry.name());
1040 HashMap<String, String> params = new HashMap<String, String>();
1041 for (TranscriptionJobControl j : jobs) {
1042 String mpId = j.getMediaPackageId();
1043 String wfId = startWorkflow(mpId, retryWfDefId, params);
1044 String jobId = j.getTranscriptionJobId();
1045 if (wfId == null) {
1046 logger.warn(
1047 "Retry transcription workflow could NOT be scheduled for mp {}, watson job {}. "
1048 + "Will try again next time.",
1049 mpId, jobId);
1050
1051 continue;
1052 }
1053 logger.info("Retry transcription workflow {} scheduled for mp {}.", wfId, mpId);
1054
1055 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
1056 }
1057 }
1058 } catch (TranscriptionDatabaseException e) {
1059 logger.warn("Could not read/update transcription job control database.", e);
1060 }
1061 }
1062 }
1063
1064 private String startWorkflow(String mpId, String wfDefId, Map<String, String> params) {
1065 DefaultOrganization defaultOrg = new DefaultOrganization();
1066 securityService.setOrganization(defaultOrg);
1067 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1068
1069
1070 Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
1071 if (snapshot.isEmpty()) {
1072
1073 logger.warn("Media package {} has not been archived yet.", mpId);
1074 return null;
1075 }
1076
1077 String org = snapshot.get().getOrganizationId();
1078 Organization organization = null;
1079 try {
1080 organization = organizationDirectoryService.getOrganization(org);
1081 if (organization == null) {
1082 logger.warn("Media package {} has an unknown organization {}.", mpId, org);
1083 return null;
1084 }
1085 } catch (NotFoundException e) {
1086 logger.warn("Organization {} not found for media package {}.", org, mpId);
1087 return null;
1088 }
1089 securityService.setOrganization(organization);
1090
1091 try {
1092 WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
1093 Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
1094 Set<String> mpIds = new HashSet<String>();
1095 mpIds.add(mpId);
1096 List<WorkflowInstance> wfList = workflows
1097 .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1098 return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
1099 } catch (NotFoundException | WorkflowDatabaseException e) {
1100 logger.warn("Could not get workflow definition: {}", wfDefId);
1101 }
1102
1103 return null;
1104 }
1105
1106
1107
1108
1109
1110
1111 public boolean isEnabled() {
1112 return enabled;
1113 }
1114
1115 class ResultsFileCleanup implements Runnable {
1116 @Override
1117 public void run() {
1118 logger.info("ResultsFileCleanup waking up...");
1119 try {
1120
1121 wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1122 } catch (IOException e) {
1123 logger.warn("Could not cleanup old transcript results files", e);
1124 }
1125 }
1126 }
1127
1128 }