View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.transcription.ibmwatson;
22  
23  import static org.opencastproject.systems.OpencastConstants.ADMIN_EMAIL_PROPERTY;
24  
25  import org.opencastproject.assetmanager.api.AssetManager;
26  import org.opencastproject.assetmanager.api.Snapshot;
27  import org.opencastproject.assetmanager.util.Workflows;
28  import org.opencastproject.job.api.AbstractJobProducer;
29  import org.opencastproject.job.api.Job;
30  import org.opencastproject.kernel.mail.SmtpService;
31  import org.opencastproject.mediapackage.MediaPackageElement;
32  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35  import org.opencastproject.mediapackage.MediaPackageElementParser;
36  import org.opencastproject.mediapackage.MediaPackageException;
37  import org.opencastproject.mediapackage.Track;
38  import org.opencastproject.security.api.DefaultOrganization;
39  import org.opencastproject.security.api.Organization;
40  import org.opencastproject.security.api.OrganizationDirectoryService;
41  import org.opencastproject.security.api.SecurityService;
42  import org.opencastproject.security.api.UserDirectoryService;
43  import org.opencastproject.security.util.SecurityUtil;
44  import org.opencastproject.serviceregistry.api.ServiceRegistry;
45  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
46  import org.opencastproject.systems.OpencastConstants;
47  import org.opencastproject.transcription.api.TranscriptionService;
48  import org.opencastproject.transcription.api.TranscriptionServiceException;
49  import org.opencastproject.transcription.persistence.TranscriptionDatabase;
50  import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
51  import org.opencastproject.transcription.persistence.TranscriptionJobControl;
52  import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
53  import org.opencastproject.util.LoadUtil;
54  import org.opencastproject.util.NotFoundException;
55  import org.opencastproject.util.OsgiUtil;
56  import org.opencastproject.util.UrlSupport;
57  import org.opencastproject.workflow.api.ConfiguredWorkflow;
58  import org.opencastproject.workflow.api.WorkflowDatabaseException;
59  import org.opencastproject.workflow.api.WorkflowDefinition;
60  import org.opencastproject.workflow.api.WorkflowInstance;
61  import org.opencastproject.workflow.api.WorkflowService;
62  import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
63  import org.opencastproject.workspace.api.Workspace;
64  
65  import org.apache.commons.lang3.StringUtils;
66  import org.apache.http.HttpEntity;
67  import org.apache.http.HttpHeaders;
68  import org.apache.http.HttpHost;
69  import org.apache.http.HttpStatus;
70  import org.apache.http.auth.AuthScope;
71  import org.apache.http.auth.UsernamePasswordCredentials;
72  import org.apache.http.client.AuthCache;
73  import org.apache.http.client.CredentialsProvider;
74  import org.apache.http.client.config.RequestConfig;
75  import org.apache.http.client.methods.CloseableHttpResponse;
76  import org.apache.http.client.methods.HttpGet;
77  import org.apache.http.client.methods.HttpPost;
78  import org.apache.http.client.protocol.HttpClientContext;
79  import org.apache.http.entity.FileEntity;
80  import org.apache.http.impl.auth.BasicScheme;
81  import org.apache.http.impl.client.BasicAuthCache;
82  import org.apache.http.impl.client.BasicCredentialsProvider;
83  import org.apache.http.impl.client.CloseableHttpClient;
84  import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
85  import org.apache.http.impl.client.HttpClients;
86  import org.apache.http.util.EntityUtils;
87  import org.json.simple.JSONArray;
88  import org.json.simple.JSONObject;
89  import org.json.simple.parser.JSONParser;
90  import org.osgi.service.component.ComponentContext;
91  import org.osgi.service.component.annotations.Activate;
92  import org.osgi.service.component.annotations.Component;
93  import org.osgi.service.component.annotations.Reference;
94  import org.slf4j.Logger;
95  import org.slf4j.LoggerFactory;
96  
97  import java.io.ByteArrayInputStream;
98  import java.io.File;
99  import java.io.IOException;
100 import java.net.URI;
101 import java.net.URISyntaxException;
102 import java.util.Arrays;
103 import java.util.HashMap;
104 import java.util.HashSet;
105 import java.util.List;
106 import java.util.Map;
107 import java.util.Optional;
108 import java.util.Set;
109 import java.util.concurrent.Executors;
110 import java.util.concurrent.ScheduledExecutorService;
111 import java.util.concurrent.TimeUnit;
112 
113 @Component(
114     immediate = true,
115     service = { TranscriptionService.class,IBMWatsonTranscriptionService.class },
116     property = {
117         "service.description=IBM Watson Transcription Service",
118         "provider=ibm.watson"
119     }
120 )
121 public class IBMWatsonTranscriptionService extends AbstractJobProducer implements TranscriptionService {
122 
123   /** The logger */
124   private static final Logger logger = LoggerFactory.getLogger(IBMWatsonTranscriptionService.class);
125 
126   private static final String PROVIDER = "IBM Watson";
127 
128   private static final String JOB_TYPE = "org.opencastproject.transcription.ibmwatson";
129 
130   static final String TRANSCRIPT_COLLECTION = "transcripts";
131   static final String APIKEY = "apikey";
132   private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
133   private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute
134   // Default wf to attach transcription results to mp
135   public static final String DEFAULT_WF_DEF = "attach-watson-transcripts";
136   private static final long DEFAULT_COMPLETION_BUFFER = 600; // in seconds, default is 10 minutes
137   private static final long DEFAULT_DISPATCH_INTERVAL = 60; // in seconds, default is 1 minute
138   private static final long DEFAULT_MAX_PROCESSING_TIME = 2 * 60 * 60; // in seconds, default is 2 hours
139   private static final String DEFAULT_LANGUAGE = "en";
140   // Cleans up results files that are older than 7 days (which is how long the IBM watson
141   // speech-to-text-service keeps the jobs by default)
142   private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;
143 
144   // Global configuration (custom.properties)
145   public static final String ADMIN_URL_PROPERTY = "org.opencastproject.admin.ui.url";
146   private static final String DIGEST_USER_PROPERTY = "org.opencastproject.security.digest.user";
147 
148   // Cluster name
149   private static final String CLUSTER_NAME_PROPERTY = "org.opencastproject.environment.name";
150   private String clusterName = "";
151 
152   /** The load introduced on the system by creating a transcription job */
153   public static final float DEFAULT_START_TRANSCRIPTION_JOB_LOAD = 0.1f;
154 
155   /** The key to look for in the service configuration file to override the default job load */
156   public static final String START_TRANSCRIPTION_JOB_LOAD_KEY = "job.load.start.transcription";
157 
158   /** The load introduced on the system by creating a caption job */
159   private float jobLoad = DEFAULT_START_TRANSCRIPTION_JOB_LOAD;
160 
161   // The events we are interested in receiving notifications
162   public interface JobEvent {
163     String COMPLETED_WITH_RESULTS = "recognitions.completed_with_results";
164     String FAILED = "recognitions.failed";
165   }
166 
167   public interface RecognitionJobStatus {
168     String COMPLETED = "completed";
169     String FAILED = "failed";
170     String PROCESSING = "processing";
171     String WAITING = "waiting";
172   }
173 
174   /** Service dependencies */
175   private ServiceRegistry serviceRegistry;
176   private SecurityService securityService;
177   private UserDirectoryService userDirectoryService;
178   private OrganizationDirectoryService organizationDirectoryService;
179   private Workspace workspace;
180   private TranscriptionDatabase database;
181   private AssetManager assetManager;
182   private WorkflowService workflowService;
183   private WorkingFileRepository wfr;
184   private SmtpService smtpService;
185 
186   // Only used by unit tests!
187   private Workflows wfUtil;
188 
189   private enum Operation {
190     StartTranscription
191   }
192 
193   private static final String IBM_WATSON_SERVICE_URL = "https://stream.watsonplatform.net/speech-to-text/api";
194   private static final String API_VERSION = "v1";
195   private static final String REGISTER_CALLBACK = "register_callback";
196   private static final String RECOGNITIONS = "recognitions";
197   private static final String CALLBACK_PATH = "/transcripts/watson/results";
198 
199   /** Service configuration options */
200   public static final String ENABLED_CONFIG = "enabled";
201   public static final String IBM_WATSON_SERVICE_URL_CONFIG = "ibm.watson.service.url";
202   public static final String IBM_WATSON_USER_CONFIG = "ibm.watson.user";
203   public static final String IBM_WATSON_PSW_CONFIG = "ibm.watson.password";
204   public static final String IBM_WATSON_API_KEY_CONFIG = "ibm.watson.api.key";
205   public static final String IBM_WATSON_MODEL_CONFIG = "ibm.watson.model";
206   public static final String WORKFLOW_CONFIG = "workflow";
207   public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
208   public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
209   public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
210   public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
211   public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
212   public static final String MAX_ATTEMPTS_CONFIG = "max.attempts";
213   public static final String RETRY_WORKLFOW_CONFIG = "retry.workflow";
214 
215   /** Service configuration values */
216   private boolean enabled = false; // Disabled by default
217   private String watsonServiceUrl = UrlSupport.concat(IBM_WATSON_SERVICE_URL, API_VERSION);
218   private String model;
219   private String workflowDefinitionId = DEFAULT_WF_DEF;
220   private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
221   private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
222   private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
223   private String toEmailAddress;
224   private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
225   private String language = DEFAULT_LANGUAGE;
226   private int maxAttempts = 1;
227   private String retryWfDefId = null;
228 
229   private String systemAccount;
230   private String serverUrl;
231   private String callbackUrl;
232   private boolean callbackAlreadyRegistered = false;
233   private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
234   // For preemptive basic authentication
235   private AuthCache authCache;
236   private CredentialsProvider credentialsProvider;
237 
238   public IBMWatsonTranscriptionService() {
239     super(JOB_TYPE);
240   }
241 
242   @Override
243   @Activate
244   public void activate(ComponentContext cc) {
245     if (cc != null) {
246       // Has this service been enabled?
247       enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).get();
248 
249       if (enabled) {
250         // Service url (optional)
251         Optional<String> urlOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_SERVICE_URL_CONFIG);
252         if (urlOpt.isPresent()) {
253           watsonServiceUrl = UrlSupport.concat(urlOpt.get(), API_VERSION);
254         }
255 
256         // Api key is checked first. If not entered, user and password are mandatory (to
257         // support older instances of the STT service)
258         String user; // user name or 'apikey'
259         String psw; // user password or api key
260         Optional<String> keyOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_API_KEY_CONFIG);
261         if (keyOpt.isPresent()) {
262           user = APIKEY;
263           psw = keyOpt.get();
264           logger.info("Using transcription service at {} with api key", watsonServiceUrl);
265         } else {
266           // User name (mandatory if api key is empty)
267           user = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_USER_CONFIG);
268           // Password (mandatory if api key is empty)
269           psw = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_PSW_CONFIG);
270           logger.info("Using transcription service at {} with username {}", watsonServiceUrl, user);
271         }
272         // We will use preemptive basic auth
273         try {
274           URI uri = new URI(watsonServiceUrl);
275           HttpHost targetHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
276           credentialsProvider = new BasicCredentialsProvider();
277           credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, psw));
278           authCache = new BasicAuthCache();
279           authCache.put(targetHost, new BasicScheme());
280         } catch (URISyntaxException e) {
281           throw new RuntimeException("Watson STT service url is not valid: " + watsonServiceUrl, e);
282         }
283 
284         // Language model to be used (optional)
285         Optional<String> modelOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_MODEL_CONFIG);
286         if (modelOpt.isPresent()) {
287           model = modelOpt.get();
288           language = StringUtils.substringBefore(model, "-");
289           logger.info("Model is {}", model);
290         } else {
291           logger.info("Default model will be used");
292         }
293 
294         // Workflow to execute when getting callback (optional, with default)
295         Optional<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
296         if (wfOpt.isPresent()) {
297           workflowDefinitionId = wfOpt.get();
298         }
299         logger.info("Workflow definition is {}", workflowDefinitionId);
300         // Interval to check for completed transcription jobs and start workflows to attach transcripts
301         Optional<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
302         if (intervalOpt.isPresent()) {
303           try {
304             workflowDispatchInterval = Long.parseLong(intervalOpt.get());
305           } catch (NumberFormatException e) {
306             // Use default
307           }
308         }
309         logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
310         // How long to wait after a transcription is supposed to finish before starting checking
311         Optional<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
312         if (bufferOpt.isPresent()) {
313           try {
314             completionCheckBuffer = Long.parseLong(bufferOpt.get());
315           } catch (NumberFormatException e) {
316             // Use default
317             logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
318                     COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer);
319           }
320         }
321         logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
322         // How long to wait after a transcription is supposed to finish before marking the job as canceled in the db
323         Optional<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
324         if (maxProcessingOpt.isPresent()) {
325           try {
326             maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
327           } catch (NumberFormatException e) {
328             // Use default
329           }
330         }
331         logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
332         // How long to keep result files in the working file repository
333         Optional<String> cleaupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
334         if (cleaupOpt.isPresent()) {
335           try {
336             cleanupResultDays = Integer.parseInt(cleaupOpt.get());
337           } catch (NumberFormatException e) {
338             // Use default
339           }
340         }
341         logger.info("Cleanup result files after {} days", cleanupResultDays);
342 
343         // Maximum number of retries if error (optional)
344         Optional<String> maxAttemptsOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_ATTEMPTS_CONFIG);
345         if (maxAttemptsOpt.isPresent()) {
346           try {
347             maxAttempts = Integer.parseInt(maxAttemptsOpt.get());
348             retryWfDefId = OsgiUtil.getComponentContextProperty(cc, RETRY_WORKLFOW_CONFIG);
349           } catch (NumberFormatException e) {
350             // Use default
351             logger.warn("Invalid configuration for {} : {}. Default used instead: no retries", MAX_ATTEMPTS_CONFIG,
352                     maxAttemptsOpt.get());
353           }
354         } else {
355           logger.info("No retries in case of errors");
356         }
357 
358         serverUrl = OsgiUtil.getContextProperty(cc, OpencastConstants.SERVER_URL_PROPERTY);
359         systemAccount = OsgiUtil.getContextProperty(cc, DIGEST_USER_PROPERTY);
360 
361         jobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), START_TRANSCRIPTION_JOB_LOAD_KEY,
362                 DEFAULT_START_TRANSCRIPTION_JOB_LOAD, serviceRegistry);
363 
364         // Schedule the workflow dispatching, starting in 2 minutes
365         scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
366                 TimeUnit.SECONDS);
367 
368         // Schedule the cleanup of old results jobs from the collection in the wfr once a day
369         scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
370 
371         // Notification email passed in this service configuration?
372         Optional<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
373         if (optTo.isPresent()) {
374           toEmailAddress = optTo.get();
375         } else {
376           // Use admin email informed in custom.properties
377           optTo = OsgiUtil.getOptContextProperty(cc, ADMIN_EMAIL_PROPERTY);
378           if (optTo.isPresent()) {
379             toEmailAddress = optTo.get();
380           }
381         }
382         if (toEmailAddress != null) {
383           logger.info("Notification email set to {}", toEmailAddress);
384         } else {
385           logger.warn("Email notification disabled");
386         }
387 
388         Optional<String> optCluster = OsgiUtil.getOptContextProperty(cc, CLUSTER_NAME_PROPERTY);
389         if (optCluster.isPresent()) {
390           clusterName = optCluster.get();
391         }
392         logger.info("Environment name is {}", clusterName);
393 
394         logger.info("Activated!");
395         // Cannot call registerCallback here because of the REST service dependency on this service
396       } else {
397         logger.info("Service disabled. If you want to enable it, please update the service configuration.");
398       }
399     } else {
400       throw new IllegalArgumentException("Missing component context");
401     }
402   }
403 
404   @Override
405   public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
406     if (!enabled) {
407       throw new TranscriptionServiceException(
408               "This service is disabled. If you want to enable it, please update the service configuration.");
409     }
410 
411     try {
412       return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(),
413               Arrays.asList(mpId, MediaPackageElementParser.getAsXml(track)), jobLoad);
414     } catch (ServiceRegistryException e) {
415       throw new TranscriptionServiceException("Unable to create a job", e);
416     } catch (MediaPackageException e) {
417       throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
418     }
419   }
420 
421   @Override
422   public Job startTranscription(String mpId, Track track, String... args) {
423     throw new UnsupportedOperationException("Not supported.");
424   }
425 
426   @Override
427   public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
428     JSONObject jsonObj = null;
429     String jobId = null;
430     try {
431       jsonObj = (JSONObject) obj;
432       jobId = (String) jsonObj.get("id");
433 
434       // Check for errors inside the results object. Sometimes we get a status completed, but
435       // the transcription failed e.g.
436       // curl --header "Content-Type: application/json" --request POST --data
437       // '{"id":"ebeeb546-2e1a-11e9-941d-f349af2d6273",
438       // "results":[{"error":"failed when posting audio to the STT service"}],
439       // "event":"recognitions.completed_with_results",
440       // "user_token":"66c6c9b0-b6a2-4c9a-92c8-55f953ab3d38",
441       // "created":"2019-02-11T05:04:29.283Z"}' http://ADMIN/transcripts/watson/results
442       if (jsonObj.get("results") instanceof JSONArray) {
443         JSONArray resultsArray = (JSONArray) jsonObj.get("results");
444         if (resultsArray != null && resultsArray.size() > 0) {
445           String error = (String) ((JSONObject) resultsArray.get(0)).get("error");
446           if (!StringUtils.isEmpty(error)) {
447             retryOrError(jobId, mpId,
448                 String.format("Transcription completed with error for mpId %s, jobId %s: %s", mpId, jobId, error));
449             return;
450           }
451         }
452       }
453 
454       logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
455 
456       // Update state in database
457       // If there's an optimistic lock exception here, it's ok because the workflow dispatcher
458       // may be doing the same thing
459       database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
460 
461       // Save results in file system if there
462       if (jsonObj.get("results") != null) {
463         saveResults(jobId, jsonObj);
464       }
465     } catch (IOException e) {
466       logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
467               mpId, jobId, jsonObj == null ? "null" : jsonObj.toJSONString());
468       throw new TranscriptionServiceException("Could not save transcription results file", e);
469     } catch (TranscriptionDatabaseException e) {
470       logger.warn("Error when updating state in database for mpId {}, jobId {}", mpId, jobId);
471       throw new TranscriptionServiceException("Could not update transcription job control db", e);
472     }
473   }
474 
475   @Override
476   public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
477     JSONObject jsonObj = (JSONObject) obj;
478     String jobId = (String) jsonObj.get("id");
479     try {
480       retryOrError(jobId, mpId, String.format("Transcription error for media package %s, job id %s", mpId, jobId));
481     } catch (TranscriptionDatabaseException e) {
482       throw new TranscriptionServiceException("Error when updating job state.", e);
483     }
484   }
485 
486   @Override
487   public String getLanguage() {
488     return language;
489   }
490 
491   @Override
492   public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
493     throw new TranscriptionServiceException("Method not implemented");
494   }
495 
496   @Override
497   protected String process(Job job) throws Exception {
498     Operation op = null;
499     String operation = job.getOperation();
500     List<String> arguments = job.getArguments();
501     String result = "";
502 
503     op = Operation.valueOf(operation);
504 
505     switch (op) {
506       case StartTranscription:
507         String mpId = arguments.get(0);
508         Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
509         createRecognitionsJob(mpId, track);
510         break;
511       default:
512         throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
513     }
514 
515     return result;
516   }
517 
518   // Example URL is too long but needs to stay like this
519   // CHECKSTYLE:OFF
520   /**
521    * Register the callback url with the Speech-to-text service. From:
522    * https://cloud.ibm.com/apidocs/speech-to-text#register-a-callback
523    *
524    * curl -X POST -u "apikey:{apikey}"
525    * "https://stream.watsonplatform.net/speech-to-text/api/v1/register_callback?callback_url=http://{user_callback_path}/job_results&user_secret=ThisIsMySecret"
526    * Response looks like: { "status": "created", "url": "http://{user_callback_path}/results" }
527    */
528   // CHECKSTYLE:ON
529   void registerCallback() throws TranscriptionServiceException {
530     if (callbackAlreadyRegistered) {
531       return;
532     }
533 
534     Organization org = securityService.getOrganization();
535     String adminUrl = StringUtils.trimToNull(org.getProperties().get(ADMIN_URL_PROPERTY));
536     if (adminUrl != null) {
537       callbackUrl = adminUrl + CALLBACK_PATH;
538     } else {
539       callbackUrl = serverUrl + CALLBACK_PATH;
540     }
541     logger.info("Callback url is {}", callbackUrl);
542 
543     CloseableHttpClient httpClient = makeHttpClient();
544     HttpPost httpPost = new HttpPost(
545             UrlSupport.concat(watsonServiceUrl, REGISTER_CALLBACK) + String.format("?callback_url=%s", callbackUrl));
546     // Add AuthCache to the execution context for preemptive auth
547     HttpClientContext context = HttpClientContext.create();
548     context.setCredentialsProvider(credentialsProvider);
549     context.setAuthCache(authCache);
550     CloseableHttpResponse response = null;
551 
552     try {
553       response = httpClient.execute(httpPost, context);
554       int code = response.getStatusLine().getStatusCode();
555 
556       switch (code) {
557         case HttpStatus.SC_OK: // 200
558           logger.info("Callback url: {} had already already been registered", callbackUrl);
559           callbackAlreadyRegistered = true;
560           EntityUtils.consume(response.getEntity());
561           break;
562         case HttpStatus.SC_CREATED: // 201
563           logger.info("Callback url: {} has been successfully registered", callbackUrl);
564           callbackAlreadyRegistered = true;
565           EntityUtils.consume(response.getEntity());
566           break;
567         case HttpStatus.SC_BAD_REQUEST: // 400
568           logger.warn("Callback url {} could not be verified, status: {}", callbackUrl, code);
569           break;
570         case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
571           logger.warn("Service unavailable when registering callback url {} status: {}", callbackUrl, code);
572           break;
573         default:
574           logger.warn("Unknown status when registering callback url {}, status: {}", callbackUrl, code);
575           break;
576       }
577     } catch (Exception e) {
578       logger.warn("Exception when calling the the register callback endpoint", e);
579     } finally {
580       try {
581         httpClient.close();
582         if (response != null) {
583           response.close();
584         }
585       } catch (IOException e) {
586       }
587     }
588   }
589 
590   // Example URL is too long but needs to stay like this
591   // CHECKSTYLE:OFF
592   /**
593    * From: https://cloud.ibm.com/apidocs/speech-to-text#create-a-job:
594    *
595    * curl -X POST -u "apikey:{apikey}" --header "Content-Type: audio/flac" --data-binary @audio-file.flac
596    * "https://stream.watsonplatform.net/speech-to-text/api/v1/recognitions?callback_url=http://{user_callback_path}/job_results&user_token=job25&timestamps=true"
597    *
598    * Response: { "id": "4bd734c0-e575-21f3-de03-f932aa0468a0", "status": "waiting", "url":
599    * "http://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/4bd734c0-e575-21f3-de03-f932aa0468a0" }
600    */
601   // CHECKSTYLE:ON
602   void createRecognitionsJob(String mpId, Track track) throws TranscriptionServiceException {
603     if (!callbackAlreadyRegistered) {
604       registerCallback();
605     }
606 
607     // Get audio track file
608     File audioFile = null;
609     try {
610       audioFile = workspace.get(track.getURI());
611     } catch (Exception e) {
612       throw new TranscriptionServiceException("Error reading audio track", e);
613     }
614 
615     CloseableHttpClient httpClient = makeHttpClient();
616     String additionalParms = "";
617     if (callbackAlreadyRegistered) {
618       additionalParms = String.format("&user_token=%s&callback_url=%s&events=%s,%s", mpId, callbackUrl,
619               JobEvent.COMPLETED_WITH_RESULTS, JobEvent.FAILED);
620     }
621     if (!StringUtils.isEmpty(model)) {
622       additionalParms += String.format("&model=%s", model);
623     }
624     // Add AuthCache to the execution context for preemptive auth
625     HttpClientContext context = HttpClientContext.create();
626     context.setCredentialsProvider(credentialsProvider);
627     context.setAuthCache(authCache);
628     CloseableHttpResponse response = null;
629     try {
630       HttpPost httpPost = new HttpPost(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS)
631               + String.format(
632                       "?inactivity_timeout=-1&timestamps=true&smart_formatting=true%s", additionalParms));
633       logger.debug("Url to invoke ibm watson service: {}", httpPost.getURI().toString());
634       httpPost.setHeader(HttpHeaders.CONTENT_TYPE, track.getMimeType().toString());
635       FileEntity fileEntity = new FileEntity(audioFile);
636       fileEntity.setChunked(true);
637       httpPost.setEntity(fileEntity);
638       response = httpClient.execute(httpPost, context);
639       int code = response.getStatusLine().getStatusCode();
640 
641       switch (code) {
642         case HttpStatus.SC_CREATED: // 201
643           logger.info("Recognitions job has been successfully created");
644 
645           HttpEntity entity = response.getEntity();
646           // Response returned is a json object:
647           // {
648           // "id": "4bd734c0-e575-21f3-de03-f932aa0468a0",
649           // "status": "waiting",
650           // "url":
651           // "http://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/4bd734c0-e575-21f3-de03-f932aa0468a0"
652           // }
653           String jsonString = EntityUtils.toString(response.getEntity());
654           JSONParser jsonParser = new JSONParser();
655           JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
656           String jobId = (String) jsonObject.get("id");
657           String jobStatus = (String) jsonObject.get("status");
658           String jobUrl = (String) jsonObject.get("url");
659           logger.info("Transcription for mp {} has been submitted. Job id: {}, job status: {}, job url: {}", mpId,
660                   jobId, jobStatus, jobUrl);
661 
662           database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
663                   track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
664           EntityUtils.consume(entity);
665           return;
666         case HttpStatus.SC_BAD_REQUEST: // 400
667           logger.info("Invalid argument returned, status: {}", code);
668           break;
669         case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
670           logger.info("Service unavailable returned, status: {}", code);
671           break;
672         default:
673           logger.info("Unknown return status: {}.", code);
674           break;
675       }
676       throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
677     } catch (Exception e) {
678       logger.warn("Exception when calling the recognitions endpoint", e);
679       throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
680     } finally {
681       try {
682         httpClient.close();
683         if (response != null) {
684           response.close();
685         }
686       } catch (IOException e) {
687       }
688     }
689   }
690 
691   /**
692    * From: https://cloud.ibm.com/apidocs/speech-to-text#check-a-job:
693    *
694    * curl -X GET -u "apikey:{apikey}" "https://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/{id}"
695    *
696    * Response: { "results": [ { "result_index": 0, "results": [ { "final": true, "alternatives": [ { "transcript":
697    * "several tornadoes touch down as a line of severe thunderstorms swept through Colorado on Sunday ", "timestamps": [
698    * [ "several", 1, 1.52 ], [ "tornadoes", 1.52, 2.15 ], . . . [ "Sunday", 5.74, 6.33 ] ], "confidence": 0.885 } ] } ]
699    * } ], "created": "2016-08-17T19:11:04.298Z", "updated": "2016-08-17T19:11:16.003Z", "status": "completed" }
700    */
701   String getAndSaveJobResults(String jobId) throws TranscriptionServiceException {
702     CloseableHttpClient httpClient = makeHttpClient();
703     // Add AuthCache to the execution context for preemptive auth
704     HttpClientContext context = HttpClientContext.create();
705     context.setCredentialsProvider(credentialsProvider);
706     context.setAuthCache(authCache);
707     CloseableHttpResponse response = null;
708     String mpId = "unknown";
709     try {
710       HttpGet httpGet = new HttpGet(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS, jobId));
711       response = httpClient.execute(httpGet, context);
712       int code = response.getStatusLine().getStatusCode();
713 
714       switch (code) {
715         case HttpStatus.SC_OK: // 200
716           HttpEntity entity = response.getEntity();
717 
718           // Response returned is a json object described above
719           String jsonString = EntityUtils.toString(entity);
720           JSONParser jsonParser = new JSONParser();
721           JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
722           String jobStatus = (String) jsonObject.get("status");
723           mpId = (String) jsonObject.get("user_token");
724           // user_token doesn't come back if this is not in the context of a callback so get the mpId from the db
725           if (mpId == null) {
726             TranscriptionJobControl jc = database.findByJob(jobId);
727             if (jc != null) {
728               mpId = jc.getMediaPackageId();
729             }
730           }
731           logger.info("Recognitions job {} has been found, status {}", jobId, jobStatus);
732           EntityUtils.consume(entity);
733 
734           if (jobStatus.indexOf(RecognitionJobStatus.COMPLETED) > -1 && jsonObject.get("results") != null) {
735             transcriptionDone(mpId, jsonObject);
736           }
737           return jobStatus;
738         case HttpStatus.SC_NOT_FOUND: // 404
739           logger.info("Job not found: {}", jobId);
740           break;
741         case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
742           logger.info("Service unavailable returned, status: {}", code);
743           break;
744         default:
745           logger.info("Unknown return status: {}.", code);
746           break;
747       }
748       throw new TranscriptionServiceException(
749               String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
750                       mpId, jobId, code),
751               code);
752     } catch (TranscriptionServiceException e) {
753       throw e;
754     } catch (Exception e) {
755       logger.warn("Exception when calling the recognitions endpoint for media package {}, job id {}",
756               mpId, jobId, e);
757       throw new TranscriptionServiceException(String.format(
758               "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
759     } finally {
760       try {
761         httpClient.close();
762         if (response != null) {
763           response.close();
764         }
765       } catch (IOException e) {
766       }
767     }
768   }
769 
770   private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
771     if (jsonObj.get("results") != null) {
772       // Save the results into a collection
773       workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json",
774               new ByteArrayInputStream(jsonObj.toJSONString().getBytes()));
775     }
776   }
777 
778   @Override
779   public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
780           throws TranscriptionServiceException {
781     try {
782       // If jobId is unknown, look for all jobs associated to that mpId
783       if (jobId == null || "null".equals(jobId)) {
784         jobId = null;
785         for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
786           if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
787                   || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
788             jobId = jc.getTranscriptionJobId();
789           }
790         }
791       }
792 
793       if (jobId == null) {
794         throw new TranscriptionServiceException(
795                 "No completed or closed transcription job found in database for media package " + mpId);
796       }
797 
798       // Results already saved?
799       URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".json");
800       try {
801         workspace.get(uri);
802       } catch (Exception e) {
803         // Not saved yet so call the ibm watson service to get the results
804         getAndSaveJobResults(jobId);
805       }
806       MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
807       return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "ibm-watson-json"));
808     } catch (TranscriptionDatabaseException e) {
809       throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
810     }
811   }
812 
813   protected CloseableHttpClient makeHttpClient() {
814     RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(CONNECTION_TIMEOUT)
815             .setSocketTimeout(SOCKET_TIMEOUT).setConnectionRequestTimeout(CONNECTION_TIMEOUT).build();
816     return HttpClients.custom().setDefaultRequestConfig(reqConfig)
817             .setRetryHandler(new DefaultHttpRequestRetryHandler(3, true)).build();
818   }
819 
820   protected void retryOrError(String jobId, String mpId, String errorMsg) throws TranscriptionDatabaseException {
821     logger.warn(errorMsg);
822 
823     // TranscriptionJobControl.Status status
824     TranscriptionJobControl jc = database.findByJob(jobId);
825     String trackId = jc.getTrackId();
826     // Current job is still in progress state
827     int attempts = database
828             .findByMediaPackageTrackAndStatus(mpId, trackId, TranscriptionJobControl.Status.Error.name(),
829                     TranscriptionJobControl.Status.InProgress.name(), TranscriptionJobControl.Status.Canceled.name())
830             .size();
831     if (attempts < maxAttempts) {
832       // Update state in database to retry
833       database.updateJobControl(jobId, TranscriptionJobControl.Status.Retry.name());
834       logger.info("Will retry transcription for media package {}, track {}", mpId, trackId);
835     } else {
836       // Update state in database to error
837       database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
838       // Send error notification email
839       logger.error("{} transcription attempts exceeded maximum of {} for media package {}, track {}.", attempts,
840               maxAttempts, mpId, trackId);
841       sendEmail("Transcription ERROR", String.format(errorMsg, mpId, jobId));
842     }
843   }
844 
845   private void sendEmail(String subject, String body) {
846     if (toEmailAddress == null) {
847       logger.info("Skipping sending email notification. Message is {}.", body);
848       return;
849     }
850     try {
851       logger.debug("Sending e-mail notification to {}", toEmailAddress);
852       smtpService.send(toEmailAddress, String.format("%s (%s)", subject, clusterName), body);
853       logger.info("Sent e-mail notification to {}", toEmailAddress);
854     } catch (Exception e) {
855       logger.error("Could not send email: {}\n{}", subject, body, e);
856     }
857   }
858 
  /**
   * Returns the {@code callbackAlreadyRegistered} flag. When the flag is set, the request that creates a recognitions
   * job carries the {@code user_token}, {@code callback_url} and {@code events} parameters.
   *
   * @return the value of the {@code callbackAlreadyRegistered} flag
   */
  public boolean isCallbackAlreadyRegistered() {
    return callbackAlreadyRegistered;
  }
862 
  /**
   * Sets the service registry that is returned by {@link #getServiceRegistry()}.
   *
   * @param serviceRegistry
   *          the service registry
   */
  @Reference
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }
867 
  /**
   * Sets the security service. It is used to set the organization and the user before a workflow is started.
   *
   * @param securityService
   *          the security service
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }
872 
  /**
   * Sets the user directory service that is returned by {@link #getUserDirectoryService()}.
   *
   * @param userDirectoryService
   *          the user directory service
   */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }
877 
  /**
   * Sets the organization directory service. It is used to look up the organization of a media package before a
   * workflow is started.
   *
   * @param organizationDirectoryService
   *          the organization directory service
   */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
    this.organizationDirectoryService = organizationDirectoryService;
  }
882 
  /**
   * Sets the smtp service used to send the notification e-mails.
   *
   * @param service
   *          the smtp service
   */
  @Reference
  public void setSmtpService(SmtpService service) {
    this.smtpService = service;
  }
887 
  /**
   * Sets the workspace. The transcription results are stored in and read from one of its collections.
   *
   * @param ws
   *          the workspace
   */
  @Reference
  public void setWorkspace(Workspace ws) {
    this.workspace = ws;
  }
892 
  /**
   * Sets the working file repository. It is used to clean up old transcription result files.
   *
   * @param wfr
   *          the working file repository
   */
  @Reference
  public void setWorkingFileRepository(WorkingFileRepository wfr) {
    this.wfr = wfr;
  }
897 
  /**
   * Sets the transcription database that holds the job control entries.
   *
   * @param service
   *          the transcription database
   */
  @Reference
  public void setDatabase(TranscriptionDatabase service) {
    this.database = service;
  }
902 
  /**
   * Sets the asset manager. It is used to find the latest snapshot of a media package before a workflow is started.
   *
   * @param service
   *          the asset manager
   */
  @Reference
  public void setAssetManager(AssetManager service) {
    this.assetManager = service;
  }
907 
  /**
   * Sets the workflow service. It is used to look up workflow definitions and to start workflows.
   *
   * @param service
   *          the workflow service
   */
  @Reference
  public void setWorkflowService(WorkflowService service) {
    this.workflowService = service;
  }
912 
  /**
   * Returns the service registry set through {@link #setServiceRegistry(ServiceRegistry)}.
   *
   * @return the service registry
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }
917 
  /**
   * Returns the security service set through {@link #setSecurityService(SecurityService)}.
   *
   * @return the security service
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }
922 
  /**
   * Returns the user directory service set through {@link #setUserDirectoryService(UserDirectoryService)}.
   *
   * @return the user directory service
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }
927 
  /**
   * Returns the organization directory service set through
   * {@link #setOrganizationDirectoryService(OrganizationDirectoryService)}.
   *
   * @return the organization directory service
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }
932 
  /**
   * Sets the {@link Workflows} helper used to start workflows. If none is set, a new helper is created from the asset
   * manager and the workflow service each time a workflow is started.
   *
   * Only used by unit tests!
   *
   * @param wfUtil
   *          the helper to use
   */
  // Only used by unit tests!
  void setWfUtil(Workflows wfUtil) {
    this.wfUtil = wfUtil;
  }
937 
938   class WorkflowDispatcher implements Runnable {
939 
940     /**
941      * {@inheritDoc}
942      *
943      * @see java.lang.Thread#run()
944      */
945     @Override
946     public void run() {
947       logger.debug("WorkflowDispatcher waking up...");
948 
949       try {
950         // Find jobs that are in progress and jobs that had transcription complete i.e. got the callback
951 
952         long providerId;
953         TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
954         if (providerInfo != null) {
955           providerId = providerInfo.getId();
956         } else {
957           logger.warn("No provider entry for {}", PROVIDER);
958           return;
959         }
960 
961         List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
962                 TranscriptionJobControl.Status.TranscriptionComplete.name());
963 
964         for (TranscriptionJobControl j : jobs) {
965 
966           // Don't process jobs for other services
967           if (j.getProviderId() != providerId) {
968             continue;
969           }
970 
971           String mpId = j.getMediaPackageId();
972           String jobId = j.getTranscriptionJobId();
973 
974           // If the job in progress, check if it should already have finished and we didn't get the callback for some
975           // reason. This can happen if the admin server was offline when the callback came.
976           if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
977             // If job should already have been completed, try to get the results. Consider a buffer factor so that we
978             // don't try it too early.
979             if (j.getDateCreated().getTime() + j.getTrackDuration() + completionCheckBuffer * 1000 < System
980                     .currentTimeMillis()) {
981               try {
982                 String jobStatus = getAndSaveJobResults(jobId);
983                 if (RecognitionJobStatus.FAILED.equals(jobStatus)) {
984                   retryOrError(jobId, mpId,
985                           String.format("Transcription job failed for mpId %s, jobId %s", mpId, jobId));
986                   continue;
987                 } else if (RecognitionJobStatus.PROCESSING.equals(jobStatus)
988                         || RecognitionJobStatus.WAITING.equals(jobStatus)) {
989                   // Job still waiting/running so check if it should have finished more than N seconds ago
990                   if (j.getDateCreated().getTime() + j.getTrackDuration()
991                           + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
992                     // Processing for too long, mark job as error or retry and don't check anymore
993                     retryOrError(jobId, mpId, String.format(
994                             "Transcription job was in waiting or processing state for too long "
995                                 + "(media package %s, job id %s)", mpId, jobId));
996                   }
997                   // else job still running, not finished
998                   continue;
999                 }
1000               } catch (TranscriptionServiceException e) {
1001                 if (e.getCode() == 404) {
1002                   // Job not found there, update job state to canceled
1003                   database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1004                   // Send notification email
1005                   sendEmail("Transcription ERROR",
1006                           String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
1007                 }
1008                 continue; // Skip this one, exception was already logged
1009               }
1010             } else {
1011               continue; // Not time to check yet
1012             }
1013           }
1014 
1015           // Jobs that get here have state TranscriptionCompleted.
1016           try {
1017             // Apply workflow to attach transcripts
1018             Map<String, String> params = new HashMap<String, String>();
1019             params.put("transcriptionJobId", jobId);
1020             String wfId = startWorkflow(mpId, workflowDefinitionId, params);
1021             if (wfId == null) {
1022               logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, watson job {}", mpId, jobId);
1023               continue;
1024             }
1025             // Update state in the database
1026             database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1027             logger.info("Attach transcription workflow {} scheduled for mp {}, watson job {}",
1028                     wfId, mpId, jobId);
1029           } catch (Exception e) {
1030             logger.warn(
1031                 "Attach transcription workflow could NOT be scheduled for media package {}, watson job {}, {}: {}",
1032                 mpId, jobId, e.getClass().getName(), e.getMessage()
1033             );
1034           }
1035         }
1036 
1037         if (maxAttempts > 1) {
1038           // Find jobs that need to be re-submitted
1039           jobs = database.findByStatus(TranscriptionJobControl.Status.Retry.name());
1040           HashMap<String, String> params = new HashMap<String, String>();
1041           for (TranscriptionJobControl j : jobs) {
1042             String mpId = j.getMediaPackageId();
1043             String wfId = startWorkflow(mpId, retryWfDefId, params);
1044             String jobId = j.getTranscriptionJobId();
1045             if (wfId == null) {
1046               logger.warn(
1047                   "Retry transcription workflow could NOT be scheduled for mp {}, watson job {}. "
1048                       + "Will try again next time.",
1049                   mpId, jobId);
1050               // Will try again next time
1051               continue;
1052             }
1053             logger.info("Retry transcription workflow {} scheduled for mp {}.", wfId, mpId);
1054             // Retry was submitted, update previously failed job state to error
1055             database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
1056           }
1057         }
1058       } catch (TranscriptionDatabaseException e) {
1059         logger.warn("Could not read/update transcription job control database.", e);
1060       }
1061     }
1062   }
1063 
1064   private String startWorkflow(String mpId, String wfDefId, Map<String, String> params) {
1065     DefaultOrganization defaultOrg = new DefaultOrganization();
1066     securityService.setOrganization(defaultOrg);
1067     securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1068 
1069     // Find the episode
1070     Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
1071     if (snapshot.isEmpty()) {
1072       // Media package not archived yet.
1073       logger.warn("Media package {} has not been archived yet.", mpId);
1074       return null;
1075     }
1076 
1077     String org = snapshot.get().getOrganizationId();
1078     Organization organization = null;
1079     try {
1080       organization = organizationDirectoryService.getOrganization(org);
1081       if (organization == null) {
1082         logger.warn("Media package {} has an unknown organization {}.", mpId, org);
1083         return null;
1084       }
1085     } catch (NotFoundException e) {
1086       logger.warn("Organization {} not found for media package {}.", org, mpId);
1087       return null;
1088     }
1089     securityService.setOrganization(organization);
1090 
1091     try {
1092       WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
1093       Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
1094       Set<String> mpIds = new HashSet<String>();
1095       mpIds.add(mpId);
1096       List<WorkflowInstance> wfList = workflows
1097               .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1098       return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
1099     } catch (NotFoundException | WorkflowDatabaseException e) {
1100       logger.warn("Could not get workflow definition: {}", wfDefId);
1101     }
1102 
1103     return null;
1104   }
1105 
  /**
   * Tells whether the transcription service is enabled. The service can be disabled via configuration; this method
   * lets callers verify that it is active.
   *
   * @return true if service is enabled, false if service should be skipped
   */
  public boolean isEnabled() {
    return enabled;
  }
1114 
  /**
   * Task that removes old transcription result files from the transcript collection of the working file repository.
   */
  class ResultsFileCleanup implements Runnable {
    /**
     * Asks the working file repository to clean up the transcript collection, passing {@code cleanupResultDays} as
     * the age limit. An {@link IOException} is logged and not propagated.
     */
    @Override
    public void run() {
      logger.info("ResultsFileCleanup waking up...");
      try {
        // Cleans up results files older than cleanupResultDays days
        wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
      } catch (IOException e) {
        logger.warn("Could not cleanup old transcript results files", e);
      }
    }
  }
1127 
1128 }