View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.transcription.ibmwatson;
22  
23  import static org.opencastproject.systems.OpencastConstants.ADMIN_EMAIL_PROPERTY;
24  
25  import org.opencastproject.assetmanager.api.AssetManager;
26  import org.opencastproject.assetmanager.api.fn.Enrichments;
27  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
28  import org.opencastproject.assetmanager.api.query.AResult;
29  import org.opencastproject.assetmanager.util.Workflows;
30  import org.opencastproject.job.api.AbstractJobProducer;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.kernel.mail.SmtpService;
33  import org.opencastproject.mediapackage.MediaPackageElement;
34  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
35  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
36  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
37  import org.opencastproject.mediapackage.MediaPackageElementParser;
38  import org.opencastproject.mediapackage.MediaPackageException;
39  import org.opencastproject.mediapackage.Track;
40  import org.opencastproject.security.api.DefaultOrganization;
41  import org.opencastproject.security.api.Organization;
42  import org.opencastproject.security.api.OrganizationDirectoryService;
43  import org.opencastproject.security.api.SecurityService;
44  import org.opencastproject.security.api.UserDirectoryService;
45  import org.opencastproject.security.util.SecurityUtil;
46  import org.opencastproject.serviceregistry.api.ServiceRegistry;
47  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
48  import org.opencastproject.systems.OpencastConstants;
49  import org.opencastproject.transcription.api.TranscriptionService;
50  import org.opencastproject.transcription.api.TranscriptionServiceException;
51  import org.opencastproject.transcription.persistence.TranscriptionDatabase;
52  import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
53  import org.opencastproject.transcription.persistence.TranscriptionJobControl;
54  import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
55  import org.opencastproject.util.LoadUtil;
56  import org.opencastproject.util.NotFoundException;
57  import org.opencastproject.util.OsgiUtil;
58  import org.opencastproject.util.UrlSupport;
59  import org.opencastproject.util.data.Option;
60  import org.opencastproject.workflow.api.ConfiguredWorkflow;
61  import org.opencastproject.workflow.api.WorkflowDatabaseException;
62  import org.opencastproject.workflow.api.WorkflowDefinition;
63  import org.opencastproject.workflow.api.WorkflowInstance;
64  import org.opencastproject.workflow.api.WorkflowService;
65  import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
66  import org.opencastproject.workspace.api.Workspace;
67  
68  import org.apache.commons.lang3.StringUtils;
69  import org.apache.http.HttpEntity;
70  import org.apache.http.HttpHeaders;
71  import org.apache.http.HttpHost;
72  import org.apache.http.HttpStatus;
73  import org.apache.http.auth.AuthScope;
74  import org.apache.http.auth.UsernamePasswordCredentials;
75  import org.apache.http.client.AuthCache;
76  import org.apache.http.client.CredentialsProvider;
77  import org.apache.http.client.config.RequestConfig;
78  import org.apache.http.client.methods.CloseableHttpResponse;
79  import org.apache.http.client.methods.HttpGet;
80  import org.apache.http.client.methods.HttpPost;
81  import org.apache.http.client.protocol.HttpClientContext;
82  import org.apache.http.entity.FileEntity;
83  import org.apache.http.impl.auth.BasicScheme;
84  import org.apache.http.impl.client.BasicAuthCache;
85  import org.apache.http.impl.client.BasicCredentialsProvider;
86  import org.apache.http.impl.client.CloseableHttpClient;
87  import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
88  import org.apache.http.impl.client.HttpClients;
89  import org.apache.http.util.EntityUtils;
90  import org.json.simple.JSONArray;
91  import org.json.simple.JSONObject;
92  import org.json.simple.parser.JSONParser;
93  import org.osgi.service.component.ComponentContext;
94  import org.osgi.service.component.annotations.Activate;
95  import org.osgi.service.component.annotations.Component;
96  import org.osgi.service.component.annotations.Reference;
97  import org.slf4j.Logger;
98  import org.slf4j.LoggerFactory;
99  
100 import java.io.ByteArrayInputStream;
101 import java.io.File;
102 import java.io.IOException;
103 import java.net.URI;
104 import java.net.URISyntaxException;
105 import java.util.Arrays;
106 import java.util.HashMap;
107 import java.util.HashSet;
108 import java.util.List;
109 import java.util.Map;
110 import java.util.Set;
111 import java.util.concurrent.Executors;
112 import java.util.concurrent.ScheduledExecutorService;
113 import java.util.concurrent.TimeUnit;
114 
115 @Component(
116     immediate = true,
117     service = { TranscriptionService.class,IBMWatsonTranscriptionService.class },
118     property = {
119         "service.description=IBM Watson Transcription Service",
120         "provider=ibm.watson"
121     }
122 )
123 public class IBMWatsonTranscriptionService extends AbstractJobProducer implements TranscriptionService {
124 
  /** The logger */
  private static final Logger logger = LoggerFactory.getLogger(IBMWatsonTranscriptionService.class);

  /** Provider name stored with every transcription job control record created by this service */
  private static final String PROVIDER = "IBM Watson";

  /** Job type under which this producer is registered and its jobs are created */
  private static final String JOB_TYPE = "org.opencastproject.transcription.ibmwatson";

  // NOTE(review): not referenced in this part of the file; presumably the working file repository
  // collection where results files are stored — confirm against saveResults/ResultsFileCleanup
  static final String TRANSCRIPT_COLLECTION = "transcripts";
  // Basic-auth user name used when an api key (instead of user name/password) is configured
  static final String APIKEY = "apikey";
  // NOTE(review): presumably applied to the client built by makeHttpClient (not visible here) — confirm
  private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
  private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute
  // Default workflow used to attach the transcription results to the media package
  public static final String DEFAULT_WF_DEF = "attach-watson-transcripts";
  private static final long DEFAULT_COMPLETION_BUFFER = 600; // in seconds, default is 10 minutes
  private static final long DEFAULT_DISPATCH_INTERVAL = 60; // in seconds, default is 1 minute
  private static final long DEFAULT_MAX_PROCESSING_TIME = 2 * 60 * 60; // in seconds, default is 2 hours
  // Language reported when no model is configured (otherwise derived from the model name)
  private static final String DEFAULT_LANGUAGE = "en";
  // Cleans up results files that are older than 7 days (which is how long the IBM Watson
  // speech-to-text service keeps the jobs by default)
  private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;

  // Global configuration (custom.properties); the admin url is read from the organization properties
  public static final String ADMIN_URL_PROPERTY = "org.opencastproject.admin.ui.url";
  private static final String DIGEST_USER_PROPERTY = "org.opencastproject.security.digest.user";

  // Cluster (environment) name; stays empty when the context property is not set
  private static final String CLUSTER_NAME_PROPERTY = "org.opencastproject.environment.name";
  private String clusterName = "";

  /** The default load introduced on the system by creating a transcription job */
  public static final float DEFAULT_START_TRANSCRIPTION_JOB_LOAD = 0.1f;

  /** The key to look for in the service configuration file to override the default job load */
  public static final String START_TRANSCRIPTION_JOB_LOAD_KEY = "job.load.start.transcription";

  /** The load introduced on the system by creating a start transcription job (configurable) */
  private float jobLoad = DEFAULT_START_TRANSCRIPTION_JOB_LOAD;
162 
  // Watson recognitions job events this service subscribes to. Both are passed in the "events"
  // query parameter when a recognitions job is created with a registered callback url.
  public interface JobEvent {
    String COMPLETED_WITH_RESULTS = "recognitions.completed_with_results";
    String FAILED = "recognitions.failed";
  }
168 
  /**
   * Status values of a Watson recognitions job.
   * NOTE(review): not referenced in this part of the file; presumably compared against the status
   * returned when checking a job — confirm.
   */
  public interface RecognitionJobStatus {
    String COMPLETED = "completed";
    String FAILED = "failed";
    String PROCESSING = "processing";
    String WAITING = "waiting";
  }
175 
  /** Service dependencies */
  // NOTE(review): presumably injected through OSGi @Reference setters further down in the file — confirm
  private ServiceRegistry serviceRegistry;
  private SecurityService securityService;
  private UserDirectoryService userDirectoryService;
  private OrganizationDirectoryService organizationDirectoryService;
  private Workspace workspace;
  private TranscriptionDatabase database;
  private AssetManager assetManager;
  private WorkflowService workflowService;
  private WorkingFileRepository wfr;
  private SmtpService smtpService;

  // Only used by unit tests!
  private Workflows wfUtil;
190 
  /**
   * Operations handled by {@link #process(Job)}. The constant name is stored as the job's operation
   * when the job is created.
   */
  private enum Operation {
    StartTranscription
  }
194 
  // Default Watson speech-to-text base url; API_VERSION is appended to it, and the whole url can be
  // overridden with IBM_WATSON_SERVICE_URL_CONFIG
  private static final String IBM_WATSON_SERVICE_URL = "https://stream.watsonplatform.net/speech-to-text/api";
  private static final String API_VERSION = "v1";
  // Endpoint names appended to the service url
  private static final String REGISTER_CALLBACK = "register_callback";
  private static final String RECOGNITIONS = "recognitions";
  // Path appended to the admin url (or, if not set, the server url) to build the callback url
  private static final String CALLBACK_PATH = "/transcripts/watson/results";

  /** Service configuration options */
  public static final String ENABLED_CONFIG = "enabled";
  public static final String IBM_WATSON_SERVICE_URL_CONFIG = "ibm.watson.service.url";
  public static final String IBM_WATSON_USER_CONFIG = "ibm.watson.user";
  public static final String IBM_WATSON_PSW_CONFIG = "ibm.watson.password";
  public static final String IBM_WATSON_API_KEY_CONFIG = "ibm.watson.api.key";
  public static final String IBM_WATSON_MODEL_CONFIG = "ibm.watson.model";
  public static final String WORKFLOW_CONFIG = "workflow";
  public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
  public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
  public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
  public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
  public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
  public static final String MAX_ATTEMPTS_CONFIG = "max.attempts";
  // NOTE(review): the constant name is misspelled ("WORKLFOW"); it is public, so it is kept as-is
  public static final String RETRY_WORKLFOW_CONFIG = "retry.workflow";

  /** Service configuration values */
  private boolean enabled = false; // Disabled by default
  private String watsonServiceUrl = UrlSupport.concat(IBM_WATSON_SERVICE_URL, API_VERSION);
  // Watson model; when empty, no model parameter is sent and the service's default model is used
  private String model;
  private String workflowDefinitionId = DEFAULT_WF_DEF;
  private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
  private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
  private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
  // Notification recipient; null means email notification is disabled
  private String toEmailAddress;
  private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
  private String language = DEFAULT_LANGUAGE;
  // 1 (the default) means no retries
  private int maxAttempts = 1;
  private String retryWfDefId = null;

  private String systemAccount;
  private String serverUrl;
  // Set by registerCallback; only passed to Watson once the callback has been registered
  private String callbackUrl;
  private boolean callbackAlreadyRegistered = false;
  // Runs the workflow dispatcher and the results file cleanup scheduled in activate
  private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
  // For preemptive basic authentication
  private AuthCache authCache;
  private CredentialsProvider credentialsProvider;
239 
240   public IBMWatsonTranscriptionService() {
241     super(JOB_TYPE);
242   }
243 
244   @Override
245   @Activate
246   public void activate(ComponentContext cc) {
247     if (cc != null) {
248       // Has this service been enabled?
249       enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).get();
250 
251       if (enabled) {
252         // Service url (optional)
253         Option<String> urlOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_SERVICE_URL_CONFIG);
254         if (urlOpt.isSome()) {
255           watsonServiceUrl = UrlSupport.concat(urlOpt.get(), API_VERSION);
256         }
257 
258         // Api key is checked first. If not entered, user and password are mandatory (to
259         // support older instances of the STT service)
260         String user; // user name or 'apikey'
261         String psw; // user password or api key
262         Option<String> keyOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_API_KEY_CONFIG);
263         if (keyOpt.isSome()) {
264           user = APIKEY;
265           psw = keyOpt.get();
266           logger.info("Using transcription service at {} with api key", watsonServiceUrl);
267         } else {
268           // User name (mandatory if api key is empty)
269           user = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_USER_CONFIG);
270           // Password (mandatory if api key is empty)
271           psw = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_PSW_CONFIG);
272           logger.info("Using transcription service at {} with username {}", watsonServiceUrl, user);
273         }
274         // We will use preemptive basic auth
275         try {
276           URI uri = new URI(watsonServiceUrl);
277           HttpHost targetHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
278           credentialsProvider = new BasicCredentialsProvider();
279           credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, psw));
280           authCache = new BasicAuthCache();
281           authCache.put(targetHost, new BasicScheme());
282         } catch (URISyntaxException e) {
283           throw new RuntimeException("Watson STT service url is not valid: " + watsonServiceUrl, e);
284         }
285 
286         // Language model to be used (optional)
287         Option<String> modelOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_MODEL_CONFIG);
288         if (modelOpt.isSome()) {
289           model = modelOpt.get();
290           language = StringUtils.substringBefore(model, "-");
291           logger.info("Model is {}", model);
292         } else {
293           logger.info("Default model will be used");
294         }
295 
296         // Workflow to execute when getting callback (optional, with default)
297         Option<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
298         if (wfOpt.isSome()) {
299           workflowDefinitionId = wfOpt.get();
300         }
301         logger.info("Workflow definition is {}", workflowDefinitionId);
302         // Interval to check for completed transcription jobs and start workflows to attach transcripts
303         Option<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
304         if (intervalOpt.isSome()) {
305           try {
306             workflowDispatchInterval = Long.parseLong(intervalOpt.get());
307           } catch (NumberFormatException e) {
308             // Use default
309           }
310         }
311         logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
312         // How long to wait after a transcription is supposed to finish before starting checking
313         Option<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
314         if (bufferOpt.isSome()) {
315           try {
316             completionCheckBuffer = Long.parseLong(bufferOpt.get());
317           } catch (NumberFormatException e) {
318             // Use default
319             logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
320                     COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer);
321           }
322         }
323         logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
324         // How long to wait after a transcription is supposed to finish before marking the job as canceled in the db
325         Option<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
326         if (maxProcessingOpt.isSome()) {
327           try {
328             maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
329           } catch (NumberFormatException e) {
330             // Use default
331           }
332         }
333         logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
334         // How long to keep result files in the working file repository
335         Option<String> cleaupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
336         if (cleaupOpt.isSome()) {
337           try {
338             cleanupResultDays = Integer.parseInt(cleaupOpt.get());
339           } catch (NumberFormatException e) {
340             // Use default
341           }
342         }
343         logger.info("Cleanup result files after {} days", cleanupResultDays);
344 
345         // Maximum number of retries if error (optional)
346         Option<String> maxAttemptsOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_ATTEMPTS_CONFIG);
347         if (maxAttemptsOpt.isSome()) {
348           try {
349             maxAttempts = Integer.parseInt(maxAttemptsOpt.get());
350             retryWfDefId = OsgiUtil.getComponentContextProperty(cc, RETRY_WORKLFOW_CONFIG);
351           } catch (NumberFormatException e) {
352             // Use default
353             logger.warn("Invalid configuration for {} : {}. Default used instead: no retries", MAX_ATTEMPTS_CONFIG,
354                     maxAttemptsOpt.get());
355           }
356         } else {
357           logger.info("No retries in case of errors");
358         }
359 
360         serverUrl = OsgiUtil.getContextProperty(cc, OpencastConstants.SERVER_URL_PROPERTY);
361         systemAccount = OsgiUtil.getContextProperty(cc, DIGEST_USER_PROPERTY);
362 
363         jobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), START_TRANSCRIPTION_JOB_LOAD_KEY,
364                 DEFAULT_START_TRANSCRIPTION_JOB_LOAD, serviceRegistry);
365 
366         // Schedule the workflow dispatching, starting in 2 minutes
367         scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
368                 TimeUnit.SECONDS);
369 
370         // Schedule the cleanup of old results jobs from the collection in the wfr once a day
371         scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
372 
373         // Notification email passed in this service configuration?
374         Option<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
375         if (optTo.isSome()) {
376           toEmailAddress = optTo.get();
377         } else {
378           // Use admin email informed in custom.properties
379           optTo = OsgiUtil.getOptContextProperty(cc, ADMIN_EMAIL_PROPERTY);
380           if (optTo.isSome()) {
381             toEmailAddress = optTo.get();
382           }
383         }
384         if (toEmailAddress != null) {
385           logger.info("Notification email set to {}", toEmailAddress);
386         } else {
387           logger.warn("Email notification disabled");
388         }
389 
390         Option<String> optCluster = OsgiUtil.getOptContextProperty(cc, CLUSTER_NAME_PROPERTY);
391         if (optCluster.isSome()) {
392           clusterName = optCluster.get();
393         }
394         logger.info("Environment name is {}", clusterName);
395 
396         logger.info("Activated!");
397         // Cannot call registerCallback here because of the REST service dependency on this service
398       } else {
399         logger.info("Service disabled. If you want to enable it, please update the service configuration.");
400       }
401     } else {
402       throw new IllegalArgumentException("Missing component context");
403     }
404   }
405 
406   @Override
407   public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
408     if (!enabled) {
409       throw new TranscriptionServiceException(
410               "This service is disabled. If you want to enable it, please update the service configuration.");
411     }
412 
413     try {
414       return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(),
415               Arrays.asList(mpId, MediaPackageElementParser.getAsXml(track)), jobLoad);
416     } catch (ServiceRegistryException e) {
417       throw new TranscriptionServiceException("Unable to create a job", e);
418     } catch (MediaPackageException e) {
419       throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
420     }
421   }
422 
423   @Override
424   public Job startTranscription(String mpId, Track track, String... args) {
425     throw new UnsupportedOperationException("Not supported.");
426   }
427 
428   @Override
429   public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
430     JSONObject jsonObj = null;
431     String jobId = null;
432     try {
433       jsonObj = (JSONObject) obj;
434       jobId = (String) jsonObj.get("id");
435 
436       // Check for errors inside the results object. Sometimes we get a status completed, but
437       // the transcription failed e.g.
438       // curl --header "Content-Type: application/json" --request POST --data
439       // '{"id":"ebeeb546-2e1a-11e9-941d-f349af2d6273",
440       // "results":[{"error":"failed when posting audio to the STT service"}],
441       // "event":"recognitions.completed_with_results",
442       // "user_token":"66c6c9b0-b6a2-4c9a-92c8-55f953ab3d38",
443       // "created":"2019-02-11T05:04:29.283Z"}' http://ADMIN/transcripts/watson/results
444       if (jsonObj.get("results") instanceof JSONArray) {
445         JSONArray resultsArray = (JSONArray) jsonObj.get("results");
446         if (resultsArray != null && resultsArray.size() > 0) {
447           String error = (String) ((JSONObject) resultsArray.get(0)).get("error");
448           if (!StringUtils.isEmpty(error)) {
449             retryOrError(jobId, mpId,
450                 String.format("Transcription completed with error for mpId %s, jobId %s: %s", mpId, jobId, error));
451             return;
452           }
453         }
454       }
455 
456       logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
457 
458       // Update state in database
459       // If there's an optimistic lock exception here, it's ok because the workflow dispatcher
460       // may be doing the same thing
461       database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
462 
463       // Save results in file system if there
464       if (jsonObj.get("results") != null) {
465         saveResults(jobId, jsonObj);
466       }
467     } catch (IOException e) {
468       logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
469               mpId, jobId, jsonObj == null ? "null" : jsonObj.toJSONString());
470       throw new TranscriptionServiceException("Could not save transcription results file", e);
471     } catch (TranscriptionDatabaseException e) {
472       logger.warn("Error when updating state in database for mpId {}, jobId {}", mpId, jobId);
473       throw new TranscriptionServiceException("Could not update transcription job control db", e);
474     }
475   }
476 
477   @Override
478   public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
479     JSONObject jsonObj = (JSONObject) obj;
480     String jobId = (String) jsonObj.get("id");
481     try {
482       retryOrError(jobId, mpId, String.format("Transcription error for media package %s, job id %s", mpId, jobId));
483     } catch (TranscriptionDatabaseException e) {
484       throw new TranscriptionServiceException("Error when updating job state.", e);
485     }
486   }
487 
488   @Override
489   public String getLanguage() {
490     return language;
491   }
492 
493   @Override
494   public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
495     throw new TranscriptionServiceException("Method not implemented");
496   }
497 
498   @Override
499   protected String process(Job job) throws Exception {
500     Operation op = null;
501     String operation = job.getOperation();
502     List<String> arguments = job.getArguments();
503     String result = "";
504 
505     op = Operation.valueOf(operation);
506 
507     switch (op) {
508       case StartTranscription:
509         String mpId = arguments.get(0);
510         Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
511         createRecognitionsJob(mpId, track);
512         break;
513       default:
514         throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
515     }
516 
517     return result;
518   }
519 
520   // Example URL is too long but needs to stay like this
521   // CHECKSTYLE:OFF
522   /**
523    * Register the callback url with the Speech-to-text service. From:
524    * https://cloud.ibm.com/apidocs/speech-to-text#register-a-callback
525    *
526    * curl -X POST -u "apikey:{apikey}"
527    * "https://stream.watsonplatform.net/speech-to-text/api/v1/register_callback?callback_url=http://{user_callback_path}/job_results&user_secret=ThisIsMySecret"
528    * Response looks like: { "status": "created", "url": "http://{user_callback_path}/results" }
529    */
530   // CHECKSTYLE:ON
531   void registerCallback() throws TranscriptionServiceException {
532     if (callbackAlreadyRegistered) {
533       return;
534     }
535 
536     Organization org = securityService.getOrganization();
537     String adminUrl = StringUtils.trimToNull(org.getProperties().get(ADMIN_URL_PROPERTY));
538     if (adminUrl != null) {
539       callbackUrl = adminUrl + CALLBACK_PATH;
540     } else {
541       callbackUrl = serverUrl + CALLBACK_PATH;
542     }
543     logger.info("Callback url is {}", callbackUrl);
544 
545     CloseableHttpClient httpClient = makeHttpClient();
546     HttpPost httpPost = new HttpPost(
547             UrlSupport.concat(watsonServiceUrl, REGISTER_CALLBACK) + String.format("?callback_url=%s", callbackUrl));
548     // Add AuthCache to the execution context for preemptive auth
549     HttpClientContext context = HttpClientContext.create();
550     context.setCredentialsProvider(credentialsProvider);
551     context.setAuthCache(authCache);
552     CloseableHttpResponse response = null;
553 
554     try {
555       response = httpClient.execute(httpPost, context);
556       int code = response.getStatusLine().getStatusCode();
557 
558       switch (code) {
559         case HttpStatus.SC_OK: // 200
560           logger.info("Callback url: {} had already already been registered", callbackUrl);
561           callbackAlreadyRegistered = true;
562           EntityUtils.consume(response.getEntity());
563           break;
564         case HttpStatus.SC_CREATED: // 201
565           logger.info("Callback url: {} has been successfully registered", callbackUrl);
566           callbackAlreadyRegistered = true;
567           EntityUtils.consume(response.getEntity());
568           break;
569         case HttpStatus.SC_BAD_REQUEST: // 400
570           logger.warn("Callback url {} could not be verified, status: {}", callbackUrl, code);
571           break;
572         case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
573           logger.warn("Service unavailable when registering callback url {} status: {}", callbackUrl, code);
574           break;
575         default:
576           logger.warn("Unknown status when registering callback url {}, status: {}", callbackUrl, code);
577           break;
578       }
579     } catch (Exception e) {
580       logger.warn("Exception when calling the the register callback endpoint", e);
581     } finally {
582       try {
583         httpClient.close();
584         if (response != null) {
585           response.close();
586         }
587       } catch (IOException e) {
588       }
589     }
590   }
591 
592   // Example URL is too long but needs to stay like this
593   // CHECKSTYLE:OFF
594   /**
595    * From: https://cloud.ibm.com/apidocs/speech-to-text#create-a-job:
596    *
597    * curl -X POST -u "apikey:{apikey}" --header "Content-Type: audio/flac" --data-binary @audio-file.flac
598    * "https://stream.watsonplatform.net/speech-to-text/api/v1/recognitions?callback_url=http://{user_callback_path}/job_results&user_token=job25&timestamps=true"
599    *
600    * Response: { "id": "4bd734c0-e575-21f3-de03-f932aa0468a0", "status": "waiting", "url":
601    * "http://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/4bd734c0-e575-21f3-de03-f932aa0468a0" }
602    */
603   // CHECKSTYLE:ON
604   void createRecognitionsJob(String mpId, Track track) throws TranscriptionServiceException {
605     if (!callbackAlreadyRegistered) {
606       registerCallback();
607     }
608 
609     // Get audio track file
610     File audioFile = null;
611     try {
612       audioFile = workspace.get(track.getURI());
613     } catch (Exception e) {
614       throw new TranscriptionServiceException("Error reading audio track", e);
615     }
616 
617     CloseableHttpClient httpClient = makeHttpClient();
618     String additionalParms = "";
619     if (callbackAlreadyRegistered) {
620       additionalParms = String.format("&user_token=%s&callback_url=%s&events=%s,%s", mpId, callbackUrl,
621               JobEvent.COMPLETED_WITH_RESULTS, JobEvent.FAILED);
622     }
623     if (!StringUtils.isEmpty(model)) {
624       additionalParms += String.format("&model=%s", model);
625     }
626     // Add AuthCache to the execution context for preemptive auth
627     HttpClientContext context = HttpClientContext.create();
628     context.setCredentialsProvider(credentialsProvider);
629     context.setAuthCache(authCache);
630     CloseableHttpResponse response = null;
631     try {
632       HttpPost httpPost = new HttpPost(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS)
633               + String.format(
634                       "?inactivity_timeout=-1&timestamps=true&smart_formatting=true%s", additionalParms));
635       logger.debug("Url to invoke ibm watson service: {}", httpPost.getURI().toString());
636       httpPost.setHeader(HttpHeaders.CONTENT_TYPE, track.getMimeType().toString());
637       FileEntity fileEntity = new FileEntity(audioFile);
638       fileEntity.setChunked(true);
639       httpPost.setEntity(fileEntity);
640       response = httpClient.execute(httpPost, context);
641       int code = response.getStatusLine().getStatusCode();
642 
643       switch (code) {
644         case HttpStatus.SC_CREATED: // 201
645           logger.info("Recognitions job has been successfully created");
646 
647           HttpEntity entity = response.getEntity();
648           // Response returned is a json object:
649           // {
650           // "id": "4bd734c0-e575-21f3-de03-f932aa0468a0",
651           // "status": "waiting",
652           // "url":
653           // "http://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/4bd734c0-e575-21f3-de03-f932aa0468a0"
654           // }
655           String jsonString = EntityUtils.toString(response.getEntity());
656           JSONParser jsonParser = new JSONParser();
657           JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
658           String jobId = (String) jsonObject.get("id");
659           String jobStatus = (String) jsonObject.get("status");
660           String jobUrl = (String) jsonObject.get("url");
661           logger.info("Transcription for mp {} has been submitted. Job id: {}, job status: {}, job url: {}", mpId,
662                   jobId, jobStatus, jobUrl);
663 
664           database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
665                   track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
666           EntityUtils.consume(entity);
667           return;
668         case HttpStatus.SC_BAD_REQUEST: // 400
669           logger.info("Invalid argument returned, status: {}", code);
670           break;
671         case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
672           logger.info("Service unavailable returned, status: {}", code);
673           break;
674         default:
675           logger.info("Unknown return status: {}.", code);
676           break;
677       }
678       throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
679     } catch (Exception e) {
680       logger.warn("Exception when calling the recognitions endpoint", e);
681       throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
682     } finally {
683       try {
684         httpClient.close();
685         if (response != null) {
686           response.close();
687         }
688       } catch (IOException e) {
689       }
690     }
691   }
692 
693   /**
694    * From: https://cloud.ibm.com/apidocs/speech-to-text#check-a-job:
695    *
696    * curl -X GET -u "apikey:{apikey}" "https://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/{id}"
697    *
698    * Response: { "results": [ { "result_index": 0, "results": [ { "final": true, "alternatives": [ { "transcript":
699    * "several tornadoes touch down as a line of severe thunderstorms swept through Colorado on Sunday ", "timestamps": [
700    * [ "several", 1, 1.52 ], [ "tornadoes", 1.52, 2.15 ], . . . [ "Sunday", 5.74, 6.33 ] ], "confidence": 0.885 } ] } ]
701    * } ], "created": "2016-08-17T19:11:04.298Z", "updated": "2016-08-17T19:11:16.003Z", "status": "completed" }
702    */
703   String getAndSaveJobResults(String jobId) throws TranscriptionServiceException {
704     CloseableHttpClient httpClient = makeHttpClient();
705     // Add AuthCache to the execution context for preemptive auth
706     HttpClientContext context = HttpClientContext.create();
707     context.setCredentialsProvider(credentialsProvider);
708     context.setAuthCache(authCache);
709     CloseableHttpResponse response = null;
710     String mpId = "unknown";
711     try {
712       HttpGet httpGet = new HttpGet(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS, jobId));
713       response = httpClient.execute(httpGet, context);
714       int code = response.getStatusLine().getStatusCode();
715 
716       switch (code) {
717         case HttpStatus.SC_OK: // 200
718           HttpEntity entity = response.getEntity();
719 
720           // Response returned is a json object described above
721           String jsonString = EntityUtils.toString(entity);
722           JSONParser jsonParser = new JSONParser();
723           JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
724           String jobStatus = (String) jsonObject.get("status");
725           mpId = (String) jsonObject.get("user_token");
726           // user_token doesn't come back if this is not in the context of a callback so get the mpId from the db
727           if (mpId == null) {
728             TranscriptionJobControl jc = database.findByJob(jobId);
729             if (jc != null) {
730               mpId = jc.getMediaPackageId();
731             }
732           }
733           logger.info("Recognitions job {} has been found, status {}", jobId, jobStatus);
734           EntityUtils.consume(entity);
735 
736           if (jobStatus.indexOf(RecognitionJobStatus.COMPLETED) > -1 && jsonObject.get("results") != null) {
737             transcriptionDone(mpId, jsonObject);
738           }
739           return jobStatus;
740         case HttpStatus.SC_NOT_FOUND: // 404
741           logger.info("Job not found: {}", jobId);
742           break;
743         case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
744           logger.info("Service unavailable returned, status: {}", code);
745           break;
746         default:
747           logger.info("Unknown return status: {}.", code);
748           break;
749       }
750       throw new TranscriptionServiceException(
751               String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
752                       mpId, jobId, code),
753               code);
754     } catch (TranscriptionServiceException e) {
755       throw e;
756     } catch (Exception e) {
757       logger.warn("Exception when calling the recognitions endpoint for media package {}, job id {}",
758               mpId, jobId, e);
759       throw new TranscriptionServiceException(String.format(
760               "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
761     } finally {
762       try {
763         httpClient.close();
764         if (response != null) {
765           response.close();
766         }
767       } catch (IOException e) {
768       }
769     }
770   }
771 
772   private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
773     if (jsonObj.get("results") != null) {
774       // Save the results into a collection
775       workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json",
776               new ByteArrayInputStream(jsonObj.toJSONString().getBytes()));
777     }
778   }
779 
780   @Override
781   public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
782           throws TranscriptionServiceException {
783     try {
784       // If jobId is unknown, look for all jobs associated to that mpId
785       if (jobId == null || "null".equals(jobId)) {
786         jobId = null;
787         for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
788           if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
789                   || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
790             jobId = jc.getTranscriptionJobId();
791           }
792         }
793       }
794 
795       if (jobId == null) {
796         throw new TranscriptionServiceException(
797                 "No completed or closed transcription job found in database for media package " + mpId);
798       }
799 
800       // Results already saved?
801       URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".json");
802       try {
803         workspace.get(uri);
804       } catch (Exception e) {
805         // Not saved yet so call the ibm watson service to get the results
806         getAndSaveJobResults(jobId);
807       }
808       MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
809       return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "ibm-watson-json"));
810     } catch (TranscriptionDatabaseException e) {
811       throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
812     }
813   }
814 
815   protected CloseableHttpClient makeHttpClient() {
816     RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(CONNECTION_TIMEOUT)
817             .setSocketTimeout(SOCKET_TIMEOUT).setConnectionRequestTimeout(CONNECTION_TIMEOUT).build();
818     return HttpClients.custom().setDefaultRequestConfig(reqConfig)
819             .setRetryHandler(new DefaultHttpRequestRetryHandler(3, true)).build();
820   }
821 
822   protected void retryOrError(String jobId, String mpId, String errorMsg) throws TranscriptionDatabaseException {
823     logger.warn(errorMsg);
824 
825     // TranscriptionJobControl.Status status
826     TranscriptionJobControl jc = database.findByJob(jobId);
827     String trackId = jc.getTrackId();
828     // Current job is still in progress state
829     int attempts = database
830             .findByMediaPackageTrackAndStatus(mpId, trackId, TranscriptionJobControl.Status.Error.name(),
831                     TranscriptionJobControl.Status.InProgress.name(), TranscriptionJobControl.Status.Canceled.name())
832             .size();
833     if (attempts < maxAttempts) {
834       // Update state in database to retry
835       database.updateJobControl(jobId, TranscriptionJobControl.Status.Retry.name());
836       logger.info("Will retry transcription for media package {}, track {}", mpId, trackId);
837     } else {
838       // Update state in database to error
839       database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
840       // Send error notification email
841       logger.error("{} transcription attempts exceeded maximum of {} for media package {}, track {}.", attempts,
842               maxAttempts, mpId, trackId);
843       sendEmail("Transcription ERROR", String.format(errorMsg, mpId, jobId));
844     }
845   }
846 
847   private void sendEmail(String subject, String body) {
848     if (toEmailAddress == null) {
849       logger.info("Skipping sending email notification. Message is {}.", body);
850       return;
851     }
852     try {
853       logger.debug("Sending e-mail notification to {}", toEmailAddress);
854       smtpService.send(toEmailAddress, String.format("%s (%s)", subject, clusterName), body);
855       logger.info("Sent e-mail notification to {}", toEmailAddress);
856     } catch (Exception e) {
857       logger.error("Could not send email: {}\n{}", subject, body, e);
858     }
859   }
860 
861   public boolean isCallbackAlreadyRegistered() {
862     return callbackAlreadyRegistered;
863   }
864 
865   @Reference
866   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
867     this.serviceRegistry = serviceRegistry;
868   }
869 
870   @Reference
871   public void setSecurityService(SecurityService securityService) {
872     this.securityService = securityService;
873   }
874 
875   @Reference
876   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
877     this.userDirectoryService = userDirectoryService;
878   }
879 
880   @Reference
881   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
882     this.organizationDirectoryService = organizationDirectoryService;
883   }
884 
885   @Reference
886   public void setSmtpService(SmtpService service) {
887     this.smtpService = service;
888   }
889 
890   @Reference
891   public void setWorkspace(Workspace ws) {
892     this.workspace = ws;
893   }
894 
895   @Reference
896   public void setWorkingFileRepository(WorkingFileRepository wfr) {
897     this.wfr = wfr;
898   }
899 
900   @Reference
901   public void setDatabase(TranscriptionDatabase service) {
902     this.database = service;
903   }
904 
905   @Reference
906   public void setAssetManager(AssetManager service) {
907     this.assetManager = service;
908   }
909 
910   @Reference
911   public void setWorkflowService(WorkflowService service) {
912     this.workflowService = service;
913   }
914 
915   @Override
916   protected ServiceRegistry getServiceRegistry() {
917     return serviceRegistry;
918   }
919 
920   @Override
921   protected SecurityService getSecurityService() {
922     return securityService;
923   }
924 
925   @Override
926   protected UserDirectoryService getUserDirectoryService() {
927     return userDirectoryService;
928   }
929 
930   @Override
931   protected OrganizationDirectoryService getOrganizationDirectoryService() {
932     return organizationDirectoryService;
933   }
934 
935   // Only used by unit tests!
936   void setWfUtil(Workflows wfUtil) {
937     this.wfUtil = wfUtil;
938   }
939 
940   class WorkflowDispatcher implements Runnable {
941 
942     /**
943      * {@inheritDoc}
944      *
945      * @see java.lang.Thread#run()
946      */
947     @Override
948     public void run() {
949       logger.debug("WorkflowDispatcher waking up...");
950 
951       try {
952         // Find jobs that are in progress and jobs that had transcription complete i.e. got the callback
953 
954         long providerId;
955         TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
956         if (providerInfo != null) {
957           providerId = providerInfo.getId();
958         } else {
959           logger.warn("No provider entry for {}", PROVIDER);
960           return;
961         }
962 
963         List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
964                 TranscriptionJobControl.Status.TranscriptionComplete.name());
965 
966         for (TranscriptionJobControl j : jobs) {
967 
968           // Don't process jobs for other services
969           if (j.getProviderId() != providerId) {
970             continue;
971           }
972 
973           String mpId = j.getMediaPackageId();
974           String jobId = j.getTranscriptionJobId();
975 
976           // If the job in progress, check if it should already have finished and we didn't get the callback for some
977           // reason. This can happen if the admin server was offline when the callback came.
978           if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
979             // If job should already have been completed, try to get the results. Consider a buffer factor so that we
980             // don't try it too early.
981             if (j.getDateCreated().getTime() + j.getTrackDuration() + completionCheckBuffer * 1000 < System
982                     .currentTimeMillis()) {
983               try {
984                 String jobStatus = getAndSaveJobResults(jobId);
985                 if (RecognitionJobStatus.FAILED.equals(jobStatus)) {
986                   retryOrError(jobId, mpId,
987                           String.format("Transcription job failed for mpId %s, jobId %s", mpId, jobId));
988                   continue;
989                 } else if (RecognitionJobStatus.PROCESSING.equals(jobStatus)
990                         || RecognitionJobStatus.WAITING.equals(jobStatus)) {
991                   // Job still waiting/running so check if it should have finished more than N seconds ago
992                   if (j.getDateCreated().getTime() + j.getTrackDuration()
993                           + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
994                     // Processing for too long, mark job as error or retry and don't check anymore
995                     retryOrError(jobId, mpId, String.format(
996                             "Transcription job was in waiting or processing state for too long "
997                                 + "(media package %s, job id %s)", mpId, jobId));
998                   }
999                   // else job still running, not finished
1000                   continue;
1001                 }
1002               } catch (TranscriptionServiceException e) {
1003                 if (e.getCode() == 404) {
1004                   // Job not found there, update job state to canceled
1005                   database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1006                   // Send notification email
1007                   sendEmail("Transcription ERROR",
1008                           String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
1009                 }
1010                 continue; // Skip this one, exception was already logged
1011               }
1012             } else {
1013               continue; // Not time to check yet
1014             }
1015           }
1016 
1017           // Jobs that get here have state TranscriptionCompleted.
1018           try {
1019             // Apply workflow to attach transcripts
1020             Map<String, String> params = new HashMap<String, String>();
1021             params.put("transcriptionJobId", jobId);
1022             String wfId = startWorkflow(mpId, workflowDefinitionId, params);
1023             if (wfId == null) {
1024               logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, watson job {}", mpId, jobId);
1025               continue;
1026             }
1027             // Update state in the database
1028             database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1029             logger.info("Attach transcription workflow {} scheduled for mp {}, watson job {}",
1030                     wfId, mpId, jobId);
1031           } catch (Exception e) {
1032             logger.warn(
1033                 "Attach transcription workflow could NOT be scheduled for media package {}, watson job {}, {}: {}",
1034                 mpId, jobId, e.getClass().getName(), e.getMessage()
1035             );
1036           }
1037         }
1038 
1039         if (maxAttempts > 1) {
1040           // Find jobs that need to be re-submitted
1041           jobs = database.findByStatus(TranscriptionJobControl.Status.Retry.name());
1042           HashMap<String, String> params = new HashMap<String, String>();
1043           for (TranscriptionJobControl j : jobs) {
1044             String mpId = j.getMediaPackageId();
1045             String wfId = startWorkflow(mpId, retryWfDefId, params);
1046             String jobId = j.getTranscriptionJobId();
1047             if (wfId == null) {
1048               logger.warn(
1049                   "Retry transcription workflow could NOT be scheduled for mp {}, watson job {}. "
1050                       + "Will try again next time.",
1051                   mpId, jobId);
1052               // Will try again next time
1053               continue;
1054             }
1055             logger.info("Retry transcription workflow {} scheduled for mp {}.", wfId, mpId);
1056             // Retry was submitted, update previously failed job state to error
1057             database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
1058           }
1059         }
1060       } catch (TranscriptionDatabaseException e) {
1061         logger.warn("Could not read/update transcription job control database.", e);
1062       }
1063     }
1064   }
1065 
1066   private String startWorkflow(String mpId, String wfDefId, Map<String, String> params) {
1067     DefaultOrganization defaultOrg = new DefaultOrganization();
1068     securityService.setOrganization(defaultOrg);
1069     securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1070 
1071     // Find the episode
1072     final AQueryBuilder q = assetManager.createQuery();
1073     final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
1074     if (r.getSize() == 0) {
1075       // Media package not archived yet.
1076       logger.warn("Media package {} has not been archived yet.", mpId);
1077       return null;
1078     }
1079 
1080     String org = Enrichments.enrich(r).getSnapshots().stream().findFirst().get().getOrganizationId();
1081     Organization organization = null;
1082     try {
1083       organization = organizationDirectoryService.getOrganization(org);
1084       if (organization == null) {
1085         logger.warn("Media package {} has an unknown organization {}.", mpId, org);
1086         return null;
1087       }
1088     } catch (NotFoundException e) {
1089       logger.warn("Organization {} not found for media package {}.", org, mpId);
1090       return null;
1091     }
1092     securityService.setOrganization(organization);
1093 
1094     try {
1095       WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
1096       Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
1097       Set<String> mpIds = new HashSet<String>();
1098       mpIds.add(mpId);
1099       List<WorkflowInstance> wfList = workflows
1100               .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params)).toList();
1101       return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
1102     } catch (NotFoundException | WorkflowDatabaseException e) {
1103       logger.warn("Could not get workflow definition: {}", wfDefId);
1104     }
1105 
1106     return null;
1107   }
1108 
1109   /**
1110    * Allow transcription service to be disabled via config Utility to verify service is active
1111    *
1112    * @return true if service is enabled, false if service should be skipped
1113    */
1114   public boolean isEnabled() {
1115     return enabled;
1116   }
1117 
1118   class ResultsFileCleanup implements Runnable {
1119     @Override
1120     public void run() {
1121       logger.info("ResultsFileCleanup waking up...");
1122       try {
1123         // Cleans up results files older than CLEANUP_RESULT_FILES_DAYS days
1124         wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1125       } catch (IOException e) {
1126         logger.warn("Could not cleanup old transcript results files", e);
1127       }
1128     }
1129   }
1130 
1131 }