1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.transcription.ibmwatson;
22
23 import static org.opencastproject.systems.OpencastConstants.ADMIN_EMAIL_PROPERTY;
24
25 import org.opencastproject.assetmanager.api.AssetManager;
26 import org.opencastproject.assetmanager.api.fn.Enrichments;
27 import org.opencastproject.assetmanager.api.query.AQueryBuilder;
28 import org.opencastproject.assetmanager.api.query.AResult;
29 import org.opencastproject.assetmanager.util.Workflows;
30 import org.opencastproject.job.api.AbstractJobProducer;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.kernel.mail.SmtpService;
33 import org.opencastproject.mediapackage.MediaPackageElement;
34 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
35 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
36 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
37 import org.opencastproject.mediapackage.MediaPackageElementParser;
38 import org.opencastproject.mediapackage.MediaPackageException;
39 import org.opencastproject.mediapackage.Track;
40 import org.opencastproject.security.api.DefaultOrganization;
41 import org.opencastproject.security.api.Organization;
42 import org.opencastproject.security.api.OrganizationDirectoryService;
43 import org.opencastproject.security.api.SecurityService;
44 import org.opencastproject.security.api.UserDirectoryService;
45 import org.opencastproject.security.util.SecurityUtil;
46 import org.opencastproject.serviceregistry.api.ServiceRegistry;
47 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
48 import org.opencastproject.systems.OpencastConstants;
49 import org.opencastproject.transcription.api.TranscriptionService;
50 import org.opencastproject.transcription.api.TranscriptionServiceException;
51 import org.opencastproject.transcription.persistence.TranscriptionDatabase;
52 import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
53 import org.opencastproject.transcription.persistence.TranscriptionJobControl;
54 import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
55 import org.opencastproject.util.LoadUtil;
56 import org.opencastproject.util.NotFoundException;
57 import org.opencastproject.util.OsgiUtil;
58 import org.opencastproject.util.UrlSupport;
59 import org.opencastproject.util.data.Option;
60 import org.opencastproject.workflow.api.ConfiguredWorkflow;
61 import org.opencastproject.workflow.api.WorkflowDatabaseException;
62 import org.opencastproject.workflow.api.WorkflowDefinition;
63 import org.opencastproject.workflow.api.WorkflowInstance;
64 import org.opencastproject.workflow.api.WorkflowService;
65 import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
66 import org.opencastproject.workspace.api.Workspace;
67
68 import org.apache.commons.lang3.StringUtils;
69 import org.apache.http.HttpEntity;
70 import org.apache.http.HttpHeaders;
71 import org.apache.http.HttpHost;
72 import org.apache.http.HttpStatus;
73 import org.apache.http.auth.AuthScope;
74 import org.apache.http.auth.UsernamePasswordCredentials;
75 import org.apache.http.client.AuthCache;
76 import org.apache.http.client.CredentialsProvider;
77 import org.apache.http.client.config.RequestConfig;
78 import org.apache.http.client.methods.CloseableHttpResponse;
79 import org.apache.http.client.methods.HttpGet;
80 import org.apache.http.client.methods.HttpPost;
81 import org.apache.http.client.protocol.HttpClientContext;
82 import org.apache.http.entity.FileEntity;
83 import org.apache.http.impl.auth.BasicScheme;
84 import org.apache.http.impl.client.BasicAuthCache;
85 import org.apache.http.impl.client.BasicCredentialsProvider;
86 import org.apache.http.impl.client.CloseableHttpClient;
87 import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
88 import org.apache.http.impl.client.HttpClients;
89 import org.apache.http.util.EntityUtils;
90 import org.json.simple.JSONArray;
91 import org.json.simple.JSONObject;
92 import org.json.simple.parser.JSONParser;
93 import org.osgi.service.component.ComponentContext;
94 import org.osgi.service.component.annotations.Activate;
95 import org.osgi.service.component.annotations.Component;
96 import org.osgi.service.component.annotations.Reference;
97 import org.slf4j.Logger;
98 import org.slf4j.LoggerFactory;
99
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
114
115 @Component(
116 immediate = true,
117 service = { TranscriptionService.class,IBMWatsonTranscriptionService.class },
118 property = {
119 "service.description=IBM Watson Transcription Service",
120 "provider=ibm.watson"
121 }
122 )
123 public class IBMWatsonTranscriptionService extends AbstractJobProducer implements TranscriptionService {
124
125
/** Logger shared by this service and its inner classes. */
private static final Logger logger = LoggerFactory.getLogger(IBMWatsonTranscriptionService.class);

/** Provider name stored with each job control record and used to look up the provider entry. */
private static final String PROVIDER = "IBM Watson";

/** Job type passed to the AbstractJobProducer constructor and used when creating jobs. */
private static final String JOB_TYPE = "org.opencastproject.transcription.ibmwatson";

/** Workspace collection in which result files are stored, named {@code jobId + ".json"}. */
static final String TRANSCRIPT_COLLECTION = "transcripts";
/** User name used for the credentials when an api key is configured instead of user/password. */
static final String APIKEY = "apikey";
/** Connect and connection-request timeout handed to the http client's RequestConfig. */
private static final int CONNECTION_TIMEOUT = 60000;
/** Socket timeout handed to the http client's RequestConfig. */
private static final int SOCKET_TIMEOUT = 60000;

/** Default id of the workflow definition used when none is configured. */
public static final String DEFAULT_WF_DEF = "attach-watson-transcripts";
/** Default completion check buffer, in seconds. */
private static final long DEFAULT_COMPLETION_BUFFER = 600;
/** Default delay between two runs of the workflow dispatcher, in seconds. */
private static final long DEFAULT_DISPATCH_INTERVAL = 60;
/** Default maximum processing time, in seconds (2 hours). */
private static final long DEFAULT_MAX_PROCESSING_TIME = 2 * 60 * 60;
/** Language reported by {@link #getLanguage()} when no model is configured. */
private static final String DEFAULT_LANGUAGE = "en";

/** Default number of days after which result files are cleaned up. */
private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;

/** Organization property holding the admin ui url; preferred over the server url for the callback. */
public static final String ADMIN_URL_PROPERTY = "org.opencastproject.admin.ui.url";
/** Context property read into {@code systemAccount} during activation. */
private static final String DIGEST_USER_PROPERTY = "org.opencastproject.security.digest.user";

/** Context property holding the environment (cluster) name appended to notification subjects. */
private static final String CLUSTER_NAME_PROPERTY = "org.opencastproject.environment.name";
private String clusterName = "";

/** Default load of a start-transcription job. */
public static final float DEFAULT_START_TRANSCRIPTION_JOB_LOAD = 0.1f;

/** Configuration key overriding {@link #DEFAULT_START_TRANSCRIPTION_JOB_LOAD}. */
public static final String START_TRANSCRIPTION_JOB_LOAD_KEY = "job.load.start.transcription";

/** Load assigned to jobs created by {@code startTranscription}; set during activation. */
private float jobLoad = DEFAULT_START_TRANSCRIPTION_JOB_LOAD;
162
163
/** Event names requested in the {@code events} query parameter when a recognitions job is created. */
public interface JobEvent {
  String COMPLETED_WITH_RESULTS = "recognitions.completed_with_results";
  String FAILED = "recognitions.failed";
}
168
/** Status values of a recognitions job as found in the {@code status} field of the job JSON. */
public interface RecognitionJobStatus {
  String COMPLETED = "completed";
  String FAILED = "failed";
  String PROCESSING = "processing";
  String WAITING = "waiting";
}
175
176
// Collaborating services, injected through the @Reference setters of this class.
private ServiceRegistry serviceRegistry;
private SecurityService securityService;
private UserDirectoryService userDirectoryService;
private OrganizationDirectoryService organizationDirectoryService;
private Workspace workspace;
private TranscriptionDatabase database;
private AssetManager assetManager;
private WorkflowService workflowService;
private WorkingFileRepository wfr;
private SmtpService smtpService;

// Workflow helper; only assigned through the package-private setWfUtil in the visible code.
private Workflows wfUtil;
190
/** Operations this job producer can process; the constant name is used as the job's operation string. */
private enum Operation {
  StartTranscription
}
194
// Default service endpoint and the path segments appended to it.
private static final String IBM_WATSON_SERVICE_URL = "https://stream.watsonplatform.net/speech-to-text/api";
private static final String API_VERSION = "v1";
private static final String REGISTER_CALLBACK = "register_callback";
private static final String RECOGNITIONS = "recognitions";
/** Path appended to the admin or server url to build the callback url. */
private static final String CALLBACK_PATH = "/transcripts/watson/results";

// Keys of the service configuration read during activation.
public static final String ENABLED_CONFIG = "enabled";
public static final String IBM_WATSON_SERVICE_URL_CONFIG = "ibm.watson.service.url";
public static final String IBM_WATSON_USER_CONFIG = "ibm.watson.user";
public static final String IBM_WATSON_PSW_CONFIG = "ibm.watson.password";
public static final String IBM_WATSON_API_KEY_CONFIG = "ibm.watson.api.key";
public static final String IBM_WATSON_MODEL_CONFIG = "ibm.watson.model";
public static final String WORKFLOW_CONFIG = "workflow";
public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
public static final String MAX_ATTEMPTS_CONFIG = "max.attempts";
// NOTE: the identifier is misspelled ("WORKLFOW"); it is public, so it is left unchanged.
public static final String RETRY_WORKLFOW_CONFIG = "retry.workflow";

// Effective configuration; the initial values below are the defaults used when a key is absent.
private boolean enabled = false;
private String watsonServiceUrl = UrlSupport.concat(IBM_WATSON_SERVICE_URL, API_VERSION);
private String model;
private String workflowDefinitionId = DEFAULT_WF_DEF;
private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
private String toEmailAddress;
private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
private String language = DEFAULT_LANGUAGE;
private int maxAttempts = 1;
private String retryWfDefId = null;

private String systemAccount;
private String serverUrl;
// Built in registerCallback() from the admin url (or server url) plus CALLBACK_PATH.
private String callbackUrl;
// Set to true once the register_callback endpoint answered 200 or 201.
private boolean callbackAlreadyRegistered = false;
// Runs the workflow dispatcher and the results file cleanup scheduled during activation.
private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);

// Credentials and auth cache set up during activation and attached to each request's http context.
private AuthCache authCache;
private CredentialsProvider credentialsProvider;
239
/** Creates the service, passing {@link #JOB_TYPE} to the AbstractJobProducer constructor. */
public IBMWatsonTranscriptionService() {
  super(JOB_TYPE);
}
243
244 @Override
245 @Activate
246 public void activate(ComponentContext cc) {
247 if (cc != null) {
248
249 enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).get();
250
251 if (enabled) {
252
253 Option<String> urlOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_SERVICE_URL_CONFIG);
254 if (urlOpt.isSome()) {
255 watsonServiceUrl = UrlSupport.concat(urlOpt.get(), API_VERSION);
256 }
257
258
259
260 String user;
261 String psw;
262 Option<String> keyOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_API_KEY_CONFIG);
263 if (keyOpt.isSome()) {
264 user = APIKEY;
265 psw = keyOpt.get();
266 logger.info("Using transcription service at {} with api key", watsonServiceUrl);
267 } else {
268
269 user = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_USER_CONFIG);
270
271 psw = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_PSW_CONFIG);
272 logger.info("Using transcription service at {} with username {}", watsonServiceUrl, user);
273 }
274
275 try {
276 URI uri = new URI(watsonServiceUrl);
277 HttpHost targetHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
278 credentialsProvider = new BasicCredentialsProvider();
279 credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, psw));
280 authCache = new BasicAuthCache();
281 authCache.put(targetHost, new BasicScheme());
282 } catch (URISyntaxException e) {
283 throw new RuntimeException("Watson STT service url is not valid: " + watsonServiceUrl, e);
284 }
285
286
287 Option<String> modelOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_MODEL_CONFIG);
288 if (modelOpt.isSome()) {
289 model = modelOpt.get();
290 language = StringUtils.substringBefore(model, "-");
291 logger.info("Model is {}", model);
292 } else {
293 logger.info("Default model will be used");
294 }
295
296
297 Option<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
298 if (wfOpt.isSome()) {
299 workflowDefinitionId = wfOpt.get();
300 }
301 logger.info("Workflow definition is {}", workflowDefinitionId);
302
303 Option<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
304 if (intervalOpt.isSome()) {
305 try {
306 workflowDispatchInterval = Long.parseLong(intervalOpt.get());
307 } catch (NumberFormatException e) {
308
309 }
310 }
311 logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
312
313 Option<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
314 if (bufferOpt.isSome()) {
315 try {
316 completionCheckBuffer = Long.parseLong(bufferOpt.get());
317 } catch (NumberFormatException e) {
318
319 logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
320 COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer);
321 }
322 }
323 logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
324
325 Option<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
326 if (maxProcessingOpt.isSome()) {
327 try {
328 maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
329 } catch (NumberFormatException e) {
330
331 }
332 }
333 logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
334
335 Option<String> cleaupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
336 if (cleaupOpt.isSome()) {
337 try {
338 cleanupResultDays = Integer.parseInt(cleaupOpt.get());
339 } catch (NumberFormatException e) {
340
341 }
342 }
343 logger.info("Cleanup result files after {} days", cleanupResultDays);
344
345
346 Option<String> maxAttemptsOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_ATTEMPTS_CONFIG);
347 if (maxAttemptsOpt.isSome()) {
348 try {
349 maxAttempts = Integer.parseInt(maxAttemptsOpt.get());
350 retryWfDefId = OsgiUtil.getComponentContextProperty(cc, RETRY_WORKLFOW_CONFIG);
351 } catch (NumberFormatException e) {
352
353 logger.warn("Invalid configuration for {} : {}. Default used instead: no retries", MAX_ATTEMPTS_CONFIG,
354 maxAttemptsOpt.get());
355 }
356 } else {
357 logger.info("No retries in case of errors");
358 }
359
360 serverUrl = OsgiUtil.getContextProperty(cc, OpencastConstants.SERVER_URL_PROPERTY);
361 systemAccount = OsgiUtil.getContextProperty(cc, DIGEST_USER_PROPERTY);
362
363 jobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), START_TRANSCRIPTION_JOB_LOAD_KEY,
364 DEFAULT_START_TRANSCRIPTION_JOB_LOAD, serviceRegistry);
365
366
367 scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
368 TimeUnit.SECONDS);
369
370
371 scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
372
373
374 Option<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
375 if (optTo.isSome()) {
376 toEmailAddress = optTo.get();
377 } else {
378
379 optTo = OsgiUtil.getOptContextProperty(cc, ADMIN_EMAIL_PROPERTY);
380 if (optTo.isSome()) {
381 toEmailAddress = optTo.get();
382 }
383 }
384 if (toEmailAddress != null) {
385 logger.info("Notification email set to {}", toEmailAddress);
386 } else {
387 logger.warn("Email notification disabled");
388 }
389
390 Option<String> optCluster = OsgiUtil.getOptContextProperty(cc, CLUSTER_NAME_PROPERTY);
391 if (optCluster.isSome()) {
392 clusterName = optCluster.get();
393 }
394 logger.info("Environment name is {}", clusterName);
395
396 logger.info("Activated!");
397
398 } else {
399 logger.info("Service disabled. If you want to enable it, please update the service configuration.");
400 }
401 } else {
402 throw new IllegalArgumentException("Missing component context");
403 }
404 }
405
406 @Override
407 public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
408 if (!enabled) {
409 throw new TranscriptionServiceException(
410 "This service is disabled. If you want to enable it, please update the service configuration.");
411 }
412
413 try {
414 return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(),
415 Arrays.asList(mpId, MediaPackageElementParser.getAsXml(track)), jobLoad);
416 } catch (ServiceRegistryException e) {
417 throw new TranscriptionServiceException("Unable to create a job", e);
418 } catch (MediaPackageException e) {
419 throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
420 }
421 }
422
/**
 * Variant with additional arguments; not supported by this provider.
 *
 * @throws UnsupportedOperationException
 *           always
 */
@Override
public Job startTranscription(String mpId, Track track, String... args) {
  throw new UnsupportedOperationException("Not supported.");
}
427
428 @Override
429 public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
430 JSONObject jsonObj = null;
431 String jobId = null;
432 try {
433 jsonObj = (JSONObject) obj;
434 jobId = (String) jsonObj.get("id");
435
436
437
438
439
440
441
442
443
444 if (jsonObj.get("results") instanceof JSONArray) {
445 JSONArray resultsArray = (JSONArray) jsonObj.get("results");
446 if (resultsArray != null && resultsArray.size() > 0) {
447 String error = (String) ((JSONObject) resultsArray.get(0)).get("error");
448 if (!StringUtils.isEmpty(error)) {
449 retryOrError(jobId, mpId,
450 String.format("Transcription completed with error for mpId %s, jobId %s: %s", mpId, jobId, error));
451 return;
452 }
453 }
454 }
455
456 logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
457
458
459
460
461 database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
462
463
464 if (jsonObj.get("results") != null) {
465 saveResults(jobId, jsonObj);
466 }
467 } catch (IOException e) {
468 logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
469 mpId, jobId, jsonObj == null ? "null" : jsonObj.toJSONString());
470 throw new TranscriptionServiceException("Could not save transcription results file", e);
471 } catch (TranscriptionDatabaseException e) {
472 logger.warn("Error when updating state in database for mpId {}, jobId {}", mpId, jobId);
473 throw new TranscriptionServiceException("Could not update transcription job control db", e);
474 }
475 }
476
477 @Override
478 public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
479 JSONObject jsonObj = (JSONObject) obj;
480 String jobId = (String) jsonObj.get("id");
481 try {
482 retryOrError(jobId, mpId, String.format("Transcription error for media package %s, job id %s", mpId, jobId));
483 } catch (TranscriptionDatabaseException e) {
484 throw new TranscriptionServiceException("Error when updating job state.", e);
485 }
486 }
487
/**
 * Returns the language in use: the default, or the prefix of the configured model name.
 *
 * @return the language code
 */
@Override
public String getLanguage() {
  return language;
}
492
/**
 * Not implemented by this provider.
 *
 * @throws TranscriptionServiceException
 *           always
 */
@Override
public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
  throw new TranscriptionServiceException("Method not implemented");
}
497
498 @Override
499 protected String process(Job job) throws Exception {
500 Operation op = null;
501 String operation = job.getOperation();
502 List<String> arguments = job.getArguments();
503 String result = "";
504
505 op = Operation.valueOf(operation);
506
507 switch (op) {
508 case StartTranscription:
509 String mpId = arguments.get(0);
510 Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
511 createRecognitionsJob(mpId, track);
512 break;
513 default:
514 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
515 }
516
517 return result;
518 }
519
520
521
522
523
524
525
526
527
528
529
530
531 void registerCallback() throws TranscriptionServiceException {
532 if (callbackAlreadyRegistered) {
533 return;
534 }
535
536 Organization org = securityService.getOrganization();
537 String adminUrl = StringUtils.trimToNull(org.getProperties().get(ADMIN_URL_PROPERTY));
538 if (adminUrl != null) {
539 callbackUrl = adminUrl + CALLBACK_PATH;
540 } else {
541 callbackUrl = serverUrl + CALLBACK_PATH;
542 }
543 logger.info("Callback url is {}", callbackUrl);
544
545 CloseableHttpClient httpClient = makeHttpClient();
546 HttpPost httpPost = new HttpPost(
547 UrlSupport.concat(watsonServiceUrl, REGISTER_CALLBACK) + String.format("?callback_url=%s", callbackUrl));
548
549 HttpClientContext context = HttpClientContext.create();
550 context.setCredentialsProvider(credentialsProvider);
551 context.setAuthCache(authCache);
552 CloseableHttpResponse response = null;
553
554 try {
555 response = httpClient.execute(httpPost, context);
556 int code = response.getStatusLine().getStatusCode();
557
558 switch (code) {
559 case HttpStatus.SC_OK:
560 logger.info("Callback url: {} had already already been registered", callbackUrl);
561 callbackAlreadyRegistered = true;
562 EntityUtils.consume(response.getEntity());
563 break;
564 case HttpStatus.SC_CREATED:
565 logger.info("Callback url: {} has been successfully registered", callbackUrl);
566 callbackAlreadyRegistered = true;
567 EntityUtils.consume(response.getEntity());
568 break;
569 case HttpStatus.SC_BAD_REQUEST:
570 logger.warn("Callback url {} could not be verified, status: {}", callbackUrl, code);
571 break;
572 case HttpStatus.SC_SERVICE_UNAVAILABLE:
573 logger.warn("Service unavailable when registering callback url {} status: {}", callbackUrl, code);
574 break;
575 default:
576 logger.warn("Unknown status when registering callback url {}, status: {}", callbackUrl, code);
577 break;
578 }
579 } catch (Exception e) {
580 logger.warn("Exception when calling the the register callback endpoint", e);
581 } finally {
582 try {
583 httpClient.close();
584 if (response != null) {
585 response.close();
586 }
587 } catch (IOException e) {
588 }
589 }
590 }
591
592
593
594
595
596
597
598
599
600
601
602
603
604 void createRecognitionsJob(String mpId, Track track) throws TranscriptionServiceException {
605 if (!callbackAlreadyRegistered) {
606 registerCallback();
607 }
608
609
610 File audioFile = null;
611 try {
612 audioFile = workspace.get(track.getURI());
613 } catch (Exception e) {
614 throw new TranscriptionServiceException("Error reading audio track", e);
615 }
616
617 CloseableHttpClient httpClient = makeHttpClient();
618 String additionalParms = "";
619 if (callbackAlreadyRegistered) {
620 additionalParms = String.format("&user_token=%s&callback_url=%s&events=%s,%s", mpId, callbackUrl,
621 JobEvent.COMPLETED_WITH_RESULTS, JobEvent.FAILED);
622 }
623 if (!StringUtils.isEmpty(model)) {
624 additionalParms += String.format("&model=%s", model);
625 }
626
627 HttpClientContext context = HttpClientContext.create();
628 context.setCredentialsProvider(credentialsProvider);
629 context.setAuthCache(authCache);
630 CloseableHttpResponse response = null;
631 try {
632 HttpPost httpPost = new HttpPost(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS)
633 + String.format(
634 "?inactivity_timeout=-1×tamps=true&smart_formatting=true%s", additionalParms));
635 logger.debug("Url to invoke ibm watson service: {}", httpPost.getURI().toString());
636 httpPost.setHeader(HttpHeaders.CONTENT_TYPE, track.getMimeType().toString());
637 FileEntity fileEntity = new FileEntity(audioFile);
638 fileEntity.setChunked(true);
639 httpPost.setEntity(fileEntity);
640 response = httpClient.execute(httpPost, context);
641 int code = response.getStatusLine().getStatusCode();
642
643 switch (code) {
644 case HttpStatus.SC_CREATED:
645 logger.info("Recognitions job has been successfully created");
646
647 HttpEntity entity = response.getEntity();
648
649
650
651
652
653
654
655 String jsonString = EntityUtils.toString(response.getEntity());
656 JSONParser jsonParser = new JSONParser();
657 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
658 String jobId = (String) jsonObject.get("id");
659 String jobStatus = (String) jsonObject.get("status");
660 String jobUrl = (String) jsonObject.get("url");
661 logger.info("Transcription for mp {} has been submitted. Job id: {}, job status: {}, job url: {}", mpId,
662 jobId, jobStatus, jobUrl);
663
664 database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
665 track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
666 EntityUtils.consume(entity);
667 return;
668 case HttpStatus.SC_BAD_REQUEST:
669 logger.info("Invalid argument returned, status: {}", code);
670 break;
671 case HttpStatus.SC_SERVICE_UNAVAILABLE:
672 logger.info("Service unavailable returned, status: {}", code);
673 break;
674 default:
675 logger.info("Unknown return status: {}.", code);
676 break;
677 }
678 throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
679 } catch (Exception e) {
680 logger.warn("Exception when calling the recognitions endpoint", e);
681 throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
682 } finally {
683 try {
684 httpClient.close();
685 if (response != null) {
686 response.close();
687 }
688 } catch (IOException e) {
689 }
690 }
691 }
692
693
694
695
696
697
698
699
700
701
702
703 String getAndSaveJobResults(String jobId) throws TranscriptionServiceException {
704 CloseableHttpClient httpClient = makeHttpClient();
705
706 HttpClientContext context = HttpClientContext.create();
707 context.setCredentialsProvider(credentialsProvider);
708 context.setAuthCache(authCache);
709 CloseableHttpResponse response = null;
710 String mpId = "unknown";
711 try {
712 HttpGet httpGet = new HttpGet(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS, jobId));
713 response = httpClient.execute(httpGet, context);
714 int code = response.getStatusLine().getStatusCode();
715
716 switch (code) {
717 case HttpStatus.SC_OK:
718 HttpEntity entity = response.getEntity();
719
720
721 String jsonString = EntityUtils.toString(entity);
722 JSONParser jsonParser = new JSONParser();
723 JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
724 String jobStatus = (String) jsonObject.get("status");
725 mpId = (String) jsonObject.get("user_token");
726
727 if (mpId == null) {
728 TranscriptionJobControl jc = database.findByJob(jobId);
729 if (jc != null) {
730 mpId = jc.getMediaPackageId();
731 }
732 }
733 logger.info("Recognitions job {} has been found, status {}", jobId, jobStatus);
734 EntityUtils.consume(entity);
735
736 if (jobStatus.indexOf(RecognitionJobStatus.COMPLETED) > -1 && jsonObject.get("results") != null) {
737 transcriptionDone(mpId, jsonObject);
738 }
739 return jobStatus;
740 case HttpStatus.SC_NOT_FOUND:
741 logger.info("Job not found: {}", jobId);
742 break;
743 case HttpStatus.SC_SERVICE_UNAVAILABLE:
744 logger.info("Service unavailable returned, status: {}", code);
745 break;
746 default:
747 logger.info("Unknown return status: {}.", code);
748 break;
749 }
750 throw new TranscriptionServiceException(
751 String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
752 mpId, jobId, code),
753 code);
754 } catch (TranscriptionServiceException e) {
755 throw e;
756 } catch (Exception e) {
757 logger.warn("Exception when calling the recognitions endpoint for media package {}, job id {}",
758 mpId, jobId, e);
759 throw new TranscriptionServiceException(String.format(
760 "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
761 } finally {
762 try {
763 httpClient.close();
764 if (response != null) {
765 response.close();
766 }
767 } catch (IOException e) {
768 }
769 }
770 }
771
772 private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
773 if (jsonObj.get("results") != null) {
774
775 workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json",
776 new ByteArrayInputStream(jsonObj.toJSONString().getBytes()));
777 }
778 }
779
780 @Override
781 public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
782 throws TranscriptionServiceException {
783 try {
784
785 if (jobId == null || "null".equals(jobId)) {
786 jobId = null;
787 for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
788 if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
789 || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
790 jobId = jc.getTranscriptionJobId();
791 }
792 }
793 }
794
795 if (jobId == null) {
796 throw new TranscriptionServiceException(
797 "No completed or closed transcription job found in database for media package " + mpId);
798 }
799
800
801 URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".json");
802 try {
803 workspace.get(uri);
804 } catch (Exception e) {
805
806 getAndSaveJobResults(jobId);
807 }
808 MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
809 return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "ibm-watson-json"));
810 } catch (TranscriptionDatabaseException e) {
811 throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
812 }
813 }
814
815 protected CloseableHttpClient makeHttpClient() {
816 RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(CONNECTION_TIMEOUT)
817 .setSocketTimeout(SOCKET_TIMEOUT).setConnectionRequestTimeout(CONNECTION_TIMEOUT).build();
818 return HttpClients.custom().setDefaultRequestConfig(reqConfig)
819 .setRetryHandler(new DefaultHttpRequestRetryHandler(3, true)).build();
820 }
821
822 protected void retryOrError(String jobId, String mpId, String errorMsg) throws TranscriptionDatabaseException {
823 logger.warn(errorMsg);
824
825
826 TranscriptionJobControl jc = database.findByJob(jobId);
827 String trackId = jc.getTrackId();
828
829 int attempts = database
830 .findByMediaPackageTrackAndStatus(mpId, trackId, TranscriptionJobControl.Status.Error.name(),
831 TranscriptionJobControl.Status.InProgress.name(), TranscriptionJobControl.Status.Canceled.name())
832 .size();
833 if (attempts < maxAttempts) {
834
835 database.updateJobControl(jobId, TranscriptionJobControl.Status.Retry.name());
836 logger.info("Will retry transcription for media package {}, track {}", mpId, trackId);
837 } else {
838
839 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
840
841 logger.error("{} transcription attempts exceeded maximum of {} for media package {}, track {}.", attempts,
842 maxAttempts, mpId, trackId);
843 sendEmail("Transcription ERROR", String.format(errorMsg, mpId, jobId));
844 }
845 }
846
847 private void sendEmail(String subject, String body) {
848 if (toEmailAddress == null) {
849 logger.info("Skipping sending email notification. Message is {}.", body);
850 return;
851 }
852 try {
853 logger.debug("Sending e-mail notification to {}", toEmailAddress);
854 smtpService.send(toEmailAddress, String.format("%s (%s)", subject, clusterName), body);
855 logger.info("Sent e-mail notification to {}", toEmailAddress);
856 } catch (Exception e) {
857 logger.error("Could not send email: {}\n{}", subject, body, e);
858 }
859 }
860
/**
 * Tells whether the callback url has been registered with the service.
 *
 * @return true once the register callback endpoint answered 200 or 201
 */
public boolean isCallbackAlreadyRegistered() {
  return callbackAlreadyRegistered;
}
864
/**
 * OSGi reference setter for the service registry.
 *
 * @param serviceRegistry
 *          the service registry
 */
@Reference
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
  this.serviceRegistry = serviceRegistry;
}
869
/**
 * OSGi reference setter for the security service.
 *
 * @param securityService
 *          the security service
 */
@Reference
public void setSecurityService(SecurityService securityService) {
  this.securityService = securityService;
}
874
/**
 * OSGi reference setter for the user directory service.
 *
 * @param userDirectoryService
 *          the user directory service
 */
@Reference
public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
  this.userDirectoryService = userDirectoryService;
}
879
/**
 * OSGi reference setter for the organization directory service.
 *
 * @param organizationDirectoryService
 *          the organization directory service
 */
@Reference
public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
  this.organizationDirectoryService = organizationDirectoryService;
}
884
885 @Reference
886 public void setSmtpService(SmtpService service) {
887 this.smtpService = service;
888 }
889
890 @Reference
891 public void setWorkspace(Workspace ws) {
892 this.workspace = ws;
893 }
894
895 @Reference
896 public void setWorkingFileRepository(WorkingFileRepository wfr) {
897 this.wfr = wfr;
898 }
899
900 @Reference
901 public void setDatabase(TranscriptionDatabase service) {
902 this.database = service;
903 }
904
905 @Reference
906 public void setAssetManager(AssetManager service) {
907 this.assetManager = service;
908 }
909
910 @Reference
911 public void setWorkflowService(WorkflowService service) {
912 this.workflowService = service;
913 }
914
915 @Override
916 protected ServiceRegistry getServiceRegistry() {
917 return serviceRegistry;
918 }
919
920 @Override
921 protected SecurityService getSecurityService() {
922 return securityService;
923 }
924
925 @Override
926 protected UserDirectoryService getUserDirectoryService() {
927 return userDirectoryService;
928 }
929
930 @Override
931 protected OrganizationDirectoryService getOrganizationDirectoryService() {
932 return organizationDirectoryService;
933 }
934
935
936 void setWfUtil(Workflows wfUtil) {
937 this.wfUtil = wfUtil;
938 }
939
940 class WorkflowDispatcher implements Runnable {
941
942
943
944
945
946
947 @Override
948 public void run() {
949 logger.debug("WorkflowDispatcher waking up...");
950
951 try {
952
953
954 long providerId;
955 TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
956 if (providerInfo != null) {
957 providerId = providerInfo.getId();
958 } else {
959 logger.warn("No provider entry for {}", PROVIDER);
960 return;
961 }
962
963 List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
964 TranscriptionJobControl.Status.TranscriptionComplete.name());
965
966 for (TranscriptionJobControl j : jobs) {
967
968
969 if (j.getProviderId() != providerId) {
970 continue;
971 }
972
973 String mpId = j.getMediaPackageId();
974 String jobId = j.getTranscriptionJobId();
975
976
977
978 if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
979
980
981 if (j.getDateCreated().getTime() + j.getTrackDuration() + completionCheckBuffer * 1000 < System
982 .currentTimeMillis()) {
983 try {
984 String jobStatus = getAndSaveJobResults(jobId);
985 if (RecognitionJobStatus.FAILED.equals(jobStatus)) {
986 retryOrError(jobId, mpId,
987 String.format("Transcription job failed for mpId %s, jobId %s", mpId, jobId));
988 continue;
989 } else if (RecognitionJobStatus.PROCESSING.equals(jobStatus)
990 || RecognitionJobStatus.WAITING.equals(jobStatus)) {
991
992 if (j.getDateCreated().getTime() + j.getTrackDuration()
993 + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
994
995 retryOrError(jobId, mpId, String.format(
996 "Transcription job was in waiting or processing state for too long "
997 + "(media package %s, job id %s)", mpId, jobId));
998 }
999
1000 continue;
1001 }
1002 } catch (TranscriptionServiceException e) {
1003 if (e.getCode() == 404) {
1004
1005 database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1006
1007 sendEmail("Transcription ERROR",
1008 String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
1009 }
1010 continue;
1011 }
1012 } else {
1013 continue;
1014 }
1015 }
1016
1017
1018 try {
1019
1020 Map<String, String> params = new HashMap<String, String>();
1021 params.put("transcriptionJobId", jobId);
1022 String wfId = startWorkflow(mpId, workflowDefinitionId, params);
1023 if (wfId == null) {
1024 logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, watson job {}", mpId, jobId);
1025 continue;
1026 }
1027
1028 database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1029 logger.info("Attach transcription workflow {} scheduled for mp {}, watson job {}",
1030 wfId, mpId, jobId);
1031 } catch (Exception e) {
1032 logger.warn(
1033 "Attach transcription workflow could NOT be scheduled for media package {}, watson job {}, {}: {}",
1034 mpId, jobId, e.getClass().getName(), e.getMessage()
1035 );
1036 }
1037 }
1038
1039 if (maxAttempts > 1) {
1040
1041 jobs = database.findByStatus(TranscriptionJobControl.Status.Retry.name());
1042 HashMap<String, String> params = new HashMap<String, String>();
1043 for (TranscriptionJobControl j : jobs) {
1044 String mpId = j.getMediaPackageId();
1045 String wfId = startWorkflow(mpId, retryWfDefId, params);
1046 String jobId = j.getTranscriptionJobId();
1047 if (wfId == null) {
1048 logger.warn(
1049 "Retry transcription workflow could NOT be scheduled for mp {}, watson job {}. "
1050 + "Will try again next time.",
1051 mpId, jobId);
1052
1053 continue;
1054 }
1055 logger.info("Retry transcription workflow {} scheduled for mp {}.", wfId, mpId);
1056
1057 database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
1058 }
1059 }
1060 } catch (TranscriptionDatabaseException e) {
1061 logger.warn("Could not read/update transcription job control database.", e);
1062 }
1063 }
1064 }
1065
1066 private String startWorkflow(String mpId, String wfDefId, Map<String, String> params) {
1067 DefaultOrganization defaultOrg = new DefaultOrganization();
1068 securityService.setOrganization(defaultOrg);
1069 securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1070
1071
1072 final AQueryBuilder q = assetManager.createQuery();
1073 final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
1074 if (r.getSize() == 0) {
1075
1076 logger.warn("Media package {} has not been archived yet.", mpId);
1077 return null;
1078 }
1079
1080 String org = Enrichments.enrich(r).getSnapshots().stream().findFirst().get().getOrganizationId();
1081 Organization organization = null;
1082 try {
1083 organization = organizationDirectoryService.getOrganization(org);
1084 if (organization == null) {
1085 logger.warn("Media package {} has an unknown organization {}.", mpId, org);
1086 return null;
1087 }
1088 } catch (NotFoundException e) {
1089 logger.warn("Organization {} not found for media package {}.", org, mpId);
1090 return null;
1091 }
1092 securityService.setOrganization(organization);
1093
1094 try {
1095 WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
1096 Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
1097 Set<String> mpIds = new HashSet<String>();
1098 mpIds.add(mpId);
1099 List<WorkflowInstance> wfList = workflows
1100 .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params)).toList();
1101 return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
1102 } catch (NotFoundException | WorkflowDatabaseException e) {
1103 logger.warn("Could not get workflow definition: {}", wfDefId);
1104 }
1105
1106 return null;
1107 }
1108
1109
1110
1111
1112
1113
1114 public boolean isEnabled() {
1115 return enabled;
1116 }
1117
1118 class ResultsFileCleanup implements Runnable {
1119 @Override
1120 public void run() {
1121 logger.info("ResultsFileCleanup waking up...");
1122 try {
1123
1124 wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1125 } catch (IOException e) {
1126 logger.warn("Could not cleanup old transcript results files", e);
1127 }
1128 }
1129 }
1130
1131 }