View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.transcription.googlespeech;
22  
23  import org.opencastproject.assetmanager.api.AssetManager;
24  import org.opencastproject.assetmanager.api.fn.Enrichments;
25  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
26  import org.opencastproject.assetmanager.api.query.AResult;
27  import org.opencastproject.assetmanager.util.Workflows;
28  import org.opencastproject.job.api.AbstractJobProducer;
29  import org.opencastproject.job.api.Job;
30  import org.opencastproject.kernel.mail.SmtpService;
31  import org.opencastproject.mediapackage.MediaPackageElement;
32  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35  import org.opencastproject.mediapackage.MediaPackageElementParser;
36  import org.opencastproject.mediapackage.MediaPackageException;
37  import org.opencastproject.mediapackage.Track;
38  import org.opencastproject.security.api.DefaultOrganization;
39  import org.opencastproject.security.api.Organization;
40  import org.opencastproject.security.api.OrganizationDirectoryService;
41  import org.opencastproject.security.api.SecurityService;
42  import org.opencastproject.security.api.UserDirectoryService;
43  import org.opencastproject.security.util.SecurityUtil;
44  import org.opencastproject.serviceregistry.api.ServiceRegistry;
45  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
46  import org.opencastproject.systems.OpencastConstants;
47  import org.opencastproject.transcription.api.TranscriptionService;
48  import org.opencastproject.transcription.api.TranscriptionServiceException;
49  import org.opencastproject.transcription.persistence.TranscriptionDatabase;
50  import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
51  import org.opencastproject.transcription.persistence.TranscriptionJobControl;
52  import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
53  import org.opencastproject.util.NotFoundException;
54  import org.opencastproject.util.OsgiUtil;
55  import org.opencastproject.util.UrlSupport;
56  import org.opencastproject.util.data.Option;
57  import org.opencastproject.workflow.api.ConfiguredWorkflow;
58  import org.opencastproject.workflow.api.WorkflowDatabaseException;
59  import org.opencastproject.workflow.api.WorkflowDefinition;
60  import org.opencastproject.workflow.api.WorkflowInstance;
61  import org.opencastproject.workflow.api.WorkflowService;
62  import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
63  import org.opencastproject.workspace.api.Workspace;
64  
65  import org.apache.commons.io.FilenameUtils;
66  import org.apache.commons.lang3.StringUtils;
67  import org.apache.http.HttpEntity;
68  import org.apache.http.HttpHeaders;
69  import org.apache.http.HttpStatus;
70  import org.apache.http.client.config.RequestConfig;
71  import org.apache.http.client.methods.CloseableHttpResponse;
72  import org.apache.http.client.methods.HttpGet;
73  import org.apache.http.client.methods.HttpPost;
74  import org.apache.http.entity.StringEntity;
75  import org.apache.http.impl.client.CloseableHttpClient;
76  import org.apache.http.impl.client.HttpClients;
77  import org.apache.http.util.EntityUtils;
78  import org.json.simple.JSONArray;
79  import org.json.simple.JSONObject;
80  import org.json.simple.parser.JSONParser;
81  import org.osgi.service.component.ComponentContext;
82  import org.osgi.service.component.annotations.Activate;
83  import org.osgi.service.component.annotations.Component;
84  import org.osgi.service.component.annotations.Reference;
85  import org.slf4j.Logger;
86  import org.slf4j.LoggerFactory;
87  
88  import java.io.ByteArrayInputStream;
89  import java.io.File;
90  import java.io.IOException;
91  import java.net.URI;
92  import java.util.Arrays;
93  import java.util.HashMap;
94  import java.util.HashSet;
95  import java.util.List;
96  import java.util.Map;
97  import java.util.Set;
98  import java.util.concurrent.Executors;
99  import java.util.concurrent.ScheduledExecutorService;
100 import java.util.concurrent.TimeUnit;
101 
102 @Component(
103     immediate = true,
104     service = { TranscriptionService.class,GoogleSpeechTranscriptionService.class },
105     property = {
106         "service.description=Google Speech Transcription Service",
107         "provider=google.speech"
108     }
109 )
110 public class GoogleSpeechTranscriptionService extends AbstractJobProducer implements TranscriptionService {
111 
112   /**
113    * The logger
114    */
115   private static final Logger logger = LoggerFactory.getLogger(GoogleSpeechTranscriptionService.class);
116 
117   private static final String JOB_TYPE = "org.opencastproject.transcription.googlespeech";
118 
119   static final String TRANSCRIPT_COLLECTION = "transcripts";
120   static final String TRANSCRIPTION_ERROR = "Transcription ERROR";
121   static final String TRANSCRIPTION_JOB_ID_KEY = "transcriptionJobId";
122   static final String ACCESS_TOKEN_NAME = "access_token";
123   static final String ACCESS_TOKEN_EXPIRY_NAME = "expires_in";
124   private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
125   private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute
126   private static final int ACCESS_TOKEN_MINIMUM_TIME = 60000; // ms , 1 minute
127   // Default workflow to attach transcription results to mediapackage
128   public static final String DEFAULT_WF_DEF = "google-speech-attach-transcripts";
129   private static final long DEFAULT_COMPLETION_BUFFER = 300; // in seconds, default is 5 minutes
130   private static final long DEFAULT_DISPATCH_INTERVAL = 60; // in seconds, default is 1 minute
131   private static final long DEFAULT_MAX_PROCESSING_TIME = 5 * 60 * 60; // in seconds, default is 5 hours
132   // Cleans up results files that are older than 7 days
133   private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;
134   private static final boolean DEFAULT_PROFANITY_FILTER = false;
135   private static final String DEFAULT_LANGUAGE = "en-US";
136   private static final boolean DEFAULT_ENABLE_PUNCTUATION = false;
137   private static final String DEFAULT_MODEL = "default";
138   private static final String GOOGLE_SPEECH_URL = "https://speech.googleapis.com/v1";
139   private static final String GOOGLE_AUTH2_URL = "https://www.googleapis.com/oauth2/v4/token";
140   private static final String REQUEST_METHOD = "speech:longrunningrecognize";
141   private static final String RESULT_PATH = "operations";
142   private static final String INVALID_TOKEN = "-1";
143   private static final String PROVIDER = "Google Speech";
144   private static final String DEFAULT_ENCODING = "flac";
145 
146   // Cluster name
147   private String clusterName = "";
148 
149   /**
150    * Service dependencies
151    */
152   private ServiceRegistry serviceRegistry;
153   private SecurityService securityService;
154   private UserDirectoryService userDirectoryService;
155   private OrganizationDirectoryService organizationDirectoryService;
156   private Workspace workspace;
157   private TranscriptionDatabase database;
158   private AssetManager assetManager;
159   private WorkflowService workflowService;
160   private WorkingFileRepository wfr;
161   private SmtpService smtpService;
162 
163   // Only used by unit tests!
164   private Workflows wfUtil;
165 
166   private enum Operation {
167     StartTranscription
168   }
169 
170   /**
171    * Service configuration options
172    */
173   public static final String ENABLED_CONFIG = "enabled";
174   public static final String GOOGLE_SPEECH_LANGUAGE = "google.speech.language";
175   public static final String PROFANITY_FILTER = "google.speech.profanity.filter";
176   public static final String ENABLE_PUNCTUATION = "google.speech.transcription.punctuation";
177   public static final String TRANSCRIPTION_MODEL = "google.speech.transcription.model";
178   public static final String WORKFLOW_CONFIG = "workflow";
179   public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
180   public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
181   public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
182   public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
183   public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
184   public static final String GOOGLE_CLOUD_CLIENT_ID = "google.cloud.client.id";
185   public static final String GOOGLE_CLOUD_CLIENT_SECRET = "google.cloud.client.secret";
186   public static final String GOOGLE_CLOUD_REFRESH_TOKEN = "google.cloud.refresh.token";
187   public static final String GOOGLE_CLOUD_BUCKET = "google.cloud.storage.bucket";
188   public static final String GOOGLE_CLOUD_TOKEN_ENDPOINT_URL = "google.cloud.token.endpoint.url";
189   public static final String ENCODING_EXTENSION = "encoding.extension";
190 
191   /**
192    * Service configuration values
193    */
194   private boolean enabled = false; // Disabled by default
195   private boolean profanityFilter = DEFAULT_PROFANITY_FILTER;
196   private boolean enablePunctuation = DEFAULT_ENABLE_PUNCTUATION;
197   private String model = DEFAULT_MODEL;
198   private String defaultLanguage = DEFAULT_LANGUAGE;
199   private String defaultEncoding = DEFAULT_ENCODING;
200   private String workflowDefinitionId = DEFAULT_WF_DEF;
201   private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
202   private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
203   private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
204   private String toEmailAddress;
205   private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
206   private String clientId;
207   private String clientSecret;
208   private String clientToken;
209   private String accessToken = INVALID_TOKEN;
210   private String tokenEndpoint = GOOGLE_AUTH2_URL;
211   private String storageBucket;
212   private long tokenExpiryTime = 0;
213   private String systemAccount;
214   private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
215 
216   public GoogleSpeechTranscriptionService() {
217     super(JOB_TYPE);
218   }
219 
220   @Activate
221   public void activate(ComponentContext cc) {
222     // Has this service been enabled?
223     enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).get();
224     if (!enabled) {
225       logger.info("Service disabled. If you want to enable it, please update the service configuration.");
226       return;
227     }
228     // Mandatory API access properties
229     clientId = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_ID);
230     clientSecret = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_SECRET);
231     clientToken = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_REFRESH_TOKEN);
232     storageBucket = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_BUCKET);
233 
234     // access token endpoint
235     Option<String> tokenOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_CLOUD_TOKEN_ENDPOINT_URL);
236     if (tokenOpt.isSome()) {
237       tokenEndpoint = tokenOpt.get();
238       logger.info("Access token endpoint is set to {}", tokenEndpoint);
239     } else {
240       logger.info("Default access token endpoint will be used");
241     }
242 
243     // profanity filter to use
244     Option<String> profanityOpt = OsgiUtil.getOptCfg(cc.getProperties(), PROFANITY_FILTER);
245     if (profanityOpt.isSome()) {
246       profanityFilter = Boolean.parseBoolean(profanityOpt.get());
247       logger.info("Profanity filter is set to {}", profanityFilter);
248     } else {
249       logger.info("Default profanity filter will be used");
250     }
251     // Language model to be used
252     Option<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_SPEECH_LANGUAGE);
253     if (languageOpt.isSome()) {
254       defaultLanguage = languageOpt.get();
255       logger.info("Language used is {}", defaultLanguage);
256     } else {
257       logger.info("Default language will be used");
258     }
259     // Enable punctuation or not
260     Option<String> punctuationOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENABLE_PUNCTUATION);
261     if (punctuationOpt.isSome()) {
262       enablePunctuation = Boolean.parseBoolean(punctuationOpt.get());
263       logger.info("Enable punctuation is set to {}", enablePunctuation);
264     } else {
265       logger.info("Default punctuation setting will be used");
266     }
267     // Transription model to be used
268     Option<String> transModel = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTION_MODEL);
269     if (transModel.isSome()) {
270       model = transModel.get();
271       logger.info("Transcription model used is {}", model);
272     } else {
273       logger.info("Default Transcription model will be used");
274     }
275     // Encoding to be used
276     Option<String> encodingOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENCODING_EXTENSION);
277     if (encodingOpt.isSome()) {
278       defaultEncoding = encodingOpt.get();
279       logger.info("Encoding used is {}", defaultEncoding);
280     } else {
281       logger.info("Default encoding will be used");
282     }
283 
284     // Workflow to execute when getting callback (optional, with default)
285     Option<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
286     if (wfOpt.isSome()) {
287       workflowDefinitionId = wfOpt.get();
288     }
289     logger.info("Workflow definition is {}", workflowDefinitionId);
290     // Interval to check for completed transcription jobs and start workflows to attach transcripts
291     Option<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
292     if (intervalOpt.isSome()) {
293       try {
294         workflowDispatchInterval = Long.parseLong(intervalOpt.get());
295       } catch (NumberFormatException e) {
296         // Use default
297         logger.warn("Invalid configuration for Workflow dispatch interval. Default used instead: {}",
298             workflowDispatchInterval);
299       }
300     }
301     logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
302     // How long to wait after a transcription is supposed to finish before starting checking
303     Option<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
304     if (bufferOpt.isSome()) {
305       try {
306         completionCheckBuffer = Long.parseLong(bufferOpt.get());
307       } catch (NumberFormatException e) {
308         // Use default
309         logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
310                 new Object[]{COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer});
311       }
312     }
313     logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
314     // How long to wait after a transcription is supposed to finish before marking the job as canceled in the db
315     Option<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
316     if (maxProcessingOpt.isSome()) {
317       try {
318         maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
319       } catch (NumberFormatException e) {
320         // Use default
321         logger.warn("Invalid configuration for maximum processing time. Default used instead: {}",
322             maxProcessingSeconds);
323       }
324     }
325     logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
326     // How long to keep result files in the working file repository
327     Option<String> cleaupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
328     if (cleaupOpt.isSome()) {
329       try {
330         cleanupResultDays = Integer.parseInt(cleaupOpt.get());
331       } catch (NumberFormatException e) {
332         // Use default
333         logger.warn("Invalid configuration for clean up days. Default used instead: {}", cleanupResultDays);
334       }
335     }
336     logger.info("Cleanup result files after {} days", cleanupResultDays);
337 
338     systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
339 
340     // Schedule the workflow dispatching, starting in 2 minutes
341     scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
342             TimeUnit.SECONDS);
343 
344     // Schedule the cleanup of old results jobs from the collection in the wfr once a day
345     scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
346 
347     // Notification email passed in this service configuration?
348     Option<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
349     if (optTo.isSome()) {
350       toEmailAddress = optTo.get();
351     } else {
352       // Use admin email informed in custom.properties
353       optTo = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ADMIN_EMAIL_PROPERTY);
354       if (optTo.isSome()) {
355         toEmailAddress = optTo.get();
356       }
357     }
358     if (toEmailAddress != null) {
359       logger.info("Notification email set to {}", toEmailAddress);
360     } else {
361       logger.warn("Email notification disabled");
362     }
363 
364     Option<String> optCluster = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ENVIRONMENT_NAME_PROPERTY);
365     if (optCluster.isSome()) {
366       clusterName = optCluster.get();
367     }
368     logger.info("Environment name is {}", clusterName);
369 
370     logger.info("Activated!");
371   }
372 
373   @Override
374   public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
375     if (!enabled) {
376       throw new TranscriptionServiceException(
377               "This service is disabled. If you want to enable it, please update the service configuration.");
378     }
379 
380     if (args.length == 0) {
381       throw new IllegalArgumentException("Additional language argument is required.");
382     }
383 
384     try {
385       return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(),
386               Arrays.asList(mpId, MediaPackageElementParser.getAsXml(track), args[0]));
387     } catch (ServiceRegistryException e) {
388       throw new TranscriptionServiceException("Unable to create a job", e);
389     } catch (MediaPackageException e) {
390       throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
391     }
392   }
393 
394   @Override
395   public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
396     throw new UnsupportedOperationException("Not supported.");
397   }
398 
399   @Override
400   public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
401     JSONObject jsonObj = null;
402     String jobId = null;
403     String token = INVALID_TOKEN;
404     try {
405       token = getRefreshAccessToken();
406     } catch (IOException ex) {
407       logger.error("Unable to create access token, error: {}", ex.toString());
408     }
409     if (token.equals(INVALID_TOKEN)) {
410       throw new TranscriptionServiceException("Invalid access token");
411     }
412     try {
413       jsonObj = (JSONObject) obj;
414       jobId = (String) jsonObj.get("name");
415       logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
416       JSONArray resultsArray = getTranscriptionResult(jsonObj);
417 
418       // Update state in database
419       // If there's an optimistic lock exception here, it's ok because the workflow dispatcher
420       // may be doing the same thing
421       database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
422 
423       // Delete audio file from Google storage
424       deleteStorageFile(mpId, token);
425 
426       // Save results in file system if they exist
427       if (resultsArray != null) {
428         saveResults(jobId, jsonObj);
429       }
430     } catch (IOException e) {
431       if (jsonObj == null) {
432         logger.warn("Could not save transcription results file for mpId {}, jobId {}: null",
433                 mpId, jobId);
434       } else {
435         logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
436                 mpId, jobId, jsonObj.toJSONString());
437       }
438       throw new TranscriptionServiceException("Could not save transcription results file", e);
439     } catch (TranscriptionDatabaseException e) {
440       logger.warn("Transcription results file were saved but state in db not updated for mpId {}, jobId {}", mpId,
441               jobId);
442       throw new TranscriptionServiceException("Could not update transcription job control db", e);
443     }
444   }
445 
446   @Override
447   public String getLanguage() {
448     return defaultLanguage;
449   }
450 
451   @Override
452   public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
453     throw new TranscriptionServiceException("Method not implemented");
454   }
455 
456   @Override
457   public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
458     JSONObject jsonObj = null;
459     String jobId = null;
460     try {
461       jsonObj = (JSONObject) obj;
462       jobId = (String) jsonObj.get("name");
463       // Update state in database
464       database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
465       TranscriptionJobControl jobControl = database.findByJob(jobId);
466       logger.warn("Error received for media package {}, job id {}",
467               jobControl.getMediaPackageId(), jobId);
468       // Send notification email
469       sendEmail(TRANSCRIPTION_ERROR,
470               String.format("There was a transcription error for for media package %s, job id %s.",
471                       jobControl.getMediaPackageId(), jobId));
472     } catch (TranscriptionDatabaseException e) {
473       logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
474       throw new TranscriptionServiceException("Could not update transcription job control db", e);
475     }
476   }
477 
478   @Override
479   protected String process(Job job) throws Exception {
480     Operation op = null;
481     String operation = job.getOperation();
482     List<String> arguments = job.getArguments();
483     String result = "";
484     op = Operation.valueOf(operation);
485     switch (op) {
486       case StartTranscription:
487         String mpId = arguments.get(0);
488         Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
489         String languageCode = arguments.get(2);
490         createRecognitionsJob(mpId, track, languageCode);
491         break;
492       default:
493         throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
494     }
495     return result;
496   }
497 
498   /**
499    * Asynchronous Requests and Responses call to Google Speech API
500    * https://cloud.google.com/speech-to-text/docs/basics
501    */
502   void createRecognitionsJob(String mpId, Track track, String languageCode)
503           throws TranscriptionServiceException, IOException {
504     // Use default defaultlanguage if not set by workflow
505     if (StringUtils.isBlank(languageCode)) {
506       languageCode = defaultLanguage;
507     }
508     String audioUrl;
509     audioUrl = uploadAudioFileToGoogleStorage(mpId, track);
510     CloseableHttpClient httpClient = makeHttpClient();
511     CloseableHttpResponse response = null;
512     String token = getRefreshAccessToken();
513     if (token.equals(INVALID_TOKEN) || audioUrl == null) {
514       throw new TranscriptionServiceException("Could not create recognition job. Audio file or access token invalid");
515     }
516 
517     // Create json for configuration and audio file 
518     JSONObject configValues = new JSONObject();
519     JSONObject audioValues = new JSONObject();
520     JSONObject container = new JSONObject();
521     configValues.put("languageCode", languageCode);
522     configValues.put("enableWordTimeOffsets", true);
523     configValues.put("profanityFilter", profanityFilter);
524     configValues.put("enableAutomaticPunctuation", enablePunctuation);
525     configValues.put("model", model);
526     audioValues.put("uri", audioUrl);
527     container.put("config", configValues);
528     container.put("audio", audioValues);
529 
530     try {
531       HttpPost httpPost = new HttpPost(UrlSupport.concat(GOOGLE_SPEECH_URL, REQUEST_METHOD));
532       logger.debug("Url to invoke Google speech service: {}", httpPost.getURI().toString());
533       StringEntity params = new StringEntity(container.toJSONString());
534       httpPost.addHeader("Authorization", "Bearer " + token); // add the authorization header to the request;
535       httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8");
536       httpPost.setEntity(params);
537       response = httpClient.execute(httpPost);
538       int code = response.getStatusLine().getStatusCode();
539       HttpEntity entity = response.getEntity();
540       String jsonString = EntityUtils.toString(response.getEntity());
541       JSONParser jsonParser = new JSONParser();
542       JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
543 
544       switch (code) {
545         case HttpStatus.SC_OK: // 200
546           logger.info("Recognitions job has been successfully created");
547 
548           /**
549            * Response returned is a json object: { "name":
550            * "7612202767953098924", "metadata": { "@type":
551            * "type.googleapis.com/google.cloud.speech.v1.LongRunningRecognizeMetadata",
552            * "progressPercent": 90, "startTime": "2017-07-20T16:36:55.033650Z",
553            * "lastUpdateTime": "2017-07-20T16:37:17.158630Z" } }
554            */
555           String jobId = (String) jsonObject.get("name");
556           logger.info(
557                   "Transcription for mp {} has been submitted. Job id: {}", mpId,
558                   jobId);
559 
560           database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
561                   track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
562           EntityUtils.consume(entity);
563           return;
564         default:
565           JSONObject errorObj = (JSONObject) jsonObject.get("error");
566           logger.warn("Invalid argument returned, status: {} with message: {}", code, (String) errorObj.get("message"));
567           break;
568       }
569       throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
570     } catch (Exception e) {
571       logger.warn("Exception when calling the recognitions endpoint", e);
572       throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
573     } finally {
574       try {
575         httpClient.close();
576         if (response != null) {
577           response.close();
578         }
579       } catch (IOException e) {
580       }
581     }
582   }
583 
584   /**
585    * Get transcription job result: GET /v1/operations/{name}
586    *
587    * "response": { "@type":
588    * "type.googleapis.com/google.cloud.speech.v1.LongRunningRecognizeResponse",
589    * "results": [ { "alternatives": [ { "transcript": "Four score and
590    * twenty...", "confidence": 0.97186122, "words": [ { "startTime": "1.300s",
591    * "endTime": "1.400s", "word": "Four" }, { "startTime": "1.400s", "endTime":
592    * "1.600s", "word": "score" }, { "startTime": "1.600s", "endTime": "1.600s",
593    * "word": "and" }, { "startTime": "1.600s", "endTime": "1.900s", "word":
594    * "twenty" }, ] } ] }
595    */
596   boolean getAndSaveJobResults(String jobId) throws TranscriptionServiceException, IOException {
597     CloseableHttpClient httpClient = makeHttpClient();
598     CloseableHttpResponse response = null;
599     String mpId = "unknown";
600     JSONArray resultsArray = null;
601     String token = getRefreshAccessToken();
602     if (token.equals(INVALID_TOKEN)) {
603       return false;
604     }
605     try {
606       HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
607       logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
608       // add the authorization header to the request;
609       httpGet.addHeader("Authorization", "Bearer " + token);
610       response = httpClient.execute(httpGet);
611       int code = response.getStatusLine().getStatusCode();
612 
613       switch (code) {
614         case HttpStatus.SC_OK: // 200
615           HttpEntity entity = response.getEntity();
616           // Response returned is a json object described above
617           String jsonString = EntityUtils.toString(entity);
618           JSONParser jsonParser = new JSONParser();
619           JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
620           Boolean jobDone = (Boolean) jsonObject.get("done");
621           TranscriptionJobControl jc = database.findByJob(jobId);
622           if (jc != null) {
623             mpId = jc.getMediaPackageId();
624           }
625           if (jobDone) {
626             resultsArray = getTranscriptionResult(jsonObject);
627           }
628           logger.info("Recognitions job {} has been found, completed status {}", jobId, jobDone.toString());
629           EntityUtils.consume(entity);
630 
631           if (jobDone && resultsArray != null) {
632             transcriptionDone(mpId, jsonObject);
633             return true;
634           }
635           return false;
636         case HttpStatus.SC_NOT_FOUND: // 404
637           logger.warn("Job not found: {}", jobId);
638           break;
639         case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
640           logger.warn("Service unavailable returned, status: {}", code);
641           break;
642         default:
643           logger.warn("Error return status: {}.", code);
644           break;
645       }
646       throw new TranscriptionServiceException(
647               String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
648                       mpId, jobId, code), code);
649     } catch (TranscriptionServiceException e) {
650       throw e;
651     } catch (Exception e) {
652       if (hasTranscriptionRequestExpired(jobId)) {
653         // Cancel the job and inform admin
654         cancelTranscription(jobId, "Google Transcription job canceled due to errors");
655         logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
656       }
657       String msg = String.format("Exception when calling the recognitions endpoint for media package %s, job id %s",
658               mpId, jobId);
659       logger.warn(msg, e);
660       throw new TranscriptionServiceException(String.format(
661               "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
662     } finally {
663       try {
664         httpClient.close();
665         if (response != null) {
666           response.close();
667         }
668       } catch (IOException e) {
669       }
670     }
671   }
672 
673   /**
674    * Get transcription result: GET /v1/operations/{name} Method mainly used by
675    * the REST endpoint
676    *
677    * @param jobId
678    * @return job details
679    * @throws org.opencastproject.transcription.api.TranscriptionServiceException
680    * @throws java.io.IOException
681    */
682   public String getTranscriptionResults(String jobId)
683           throws TranscriptionServiceException, IOException {
684     CloseableHttpClient httpClient = makeHttpClient();
685     CloseableHttpResponse response = null;
686     String token = getRefreshAccessToken();
687     if (token.equals(INVALID_TOKEN)) {
688       logger.warn("Invalid access token");
689       return "No results found";
690     }
691     try {
692       HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
693       logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
694       // add the authorization header to the request;
695       httpGet.addHeader("Authorization", "Bearer " + token);
696       response = httpClient.execute(httpGet);
697       int code = response.getStatusLine().getStatusCode();
698 
699       switch (code) {
700         case HttpStatus.SC_OK: // 200
701           HttpEntity entity = response.getEntity();
702           logger.info("Retrieved details for transcription with job id: '{}'", jobId);
703           return EntityUtils.toString(entity);
704         default:
705           logger.warn("Error retrieving details for transcription with job id: '{}', return status: {}.", jobId, code);
706           break;
707       }
708     } catch (Exception e) {
709       logger.warn("Exception when calling the transcription service for job id: {}", jobId, e);
710       throw new TranscriptionServiceException(String.format(
711               "Exception when calling the transcription service for job id: %s", jobId), e);
712     } finally {
713       try {
714         httpClient.close();
715         if (response != null) {
716           response.close();
717         }
718       } catch (IOException e) {
719       }
720     }
721     return "No results found";
722   }
723 
724   private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
725     JSONArray resultsArray = getTranscriptionResult(jsonObj);
726     if (resultsArray != null) {
727       // Save the results into a collection
728       workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json",
729               new ByteArrayInputStream(jsonObj.toJSONString().getBytes()));
730     }
731   }
732 
733   @Override
734   public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
735           throws TranscriptionServiceException {
736     try {
737       // If jobId is unknown, look for all jobs associated to that mpId
738       if (jobId == null || "null".equals(jobId)) {
739         jobId = null;
740         for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
741           if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
742                   || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
743             jobId = jc.getTranscriptionJobId();
744           }
745         }
746       }
747 
748       if (jobId == null) {
749         throw new TranscriptionServiceException(
750                 "No completed or closed transcription job found in database for media package " + mpId);
751       }
752 
753       // Results already saved?
754       URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".json");
755       try {
756         workspace.get(uri);
757       } catch (Exception e) {
758         try {
759           // Not saved yet so call the google speech service to get the results
760           getAndSaveJobResults(jobId);
761         } catch (IOException ex) {
762           logger.error("Unable to retrieve transcription job, error: {}", ex.toString());
763         }
764       }
765       MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
766       return builder.elementFromURI(uri, type,
767           new MediaPackageElementFlavor("captions", "google-speech-json"));
768     } catch (TranscriptionDatabaseException e) {
769       throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
770     }
771   }
772 
773   /**
774    * Get mediapackage transcription status
775    *
776    * @param mpId, mediapackage id
777    * @return transcription status
778    * @throws TranscriptionServiceException
779    */
780   public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
781     try {
782       for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
783         return jc.getStatus();
784       }
785     } catch (TranscriptionDatabaseException e) {
786       throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
787     }
788     return "Unknown";
789   }
790 
791   protected CloseableHttpClient makeHttpClient() throws IOException {
792     RequestConfig reqConfig = RequestConfig.custom()
793             .setConnectTimeout(CONNECTION_TIMEOUT)
794             .setSocketTimeout(SOCKET_TIMEOUT)
795             .setConnectionRequestTimeout(CONNECTION_TIMEOUT)
796             .build();
797     return HttpClients.custom().setDefaultRequestConfig(reqConfig).build();
798   }
799 
800   protected String refreshAccessToken(String clientId, String clientSecret, String refreshToken)
801           throws TranscriptionServiceException, IOException {
802     CloseableHttpClient httpClient = makeHttpClient();
803     CloseableHttpResponse response = null;
804 
805     try {
806       HttpPost httpPost = new HttpPost(tokenEndpoint + String.format(
807               "?client_id=%s&client_secret=%s&refresh_token=%s&grant_type=refresh_token",
808               clientId, clientSecret, refreshToken));
809       httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
810       response = httpClient.execute(httpPost);
811       int code = response.getStatusLine().getStatusCode();
812       String jsonString = EntityUtils.toString(response.getEntity());
813       JSONParser jsonParser = new JSONParser();
814       JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
815       switch (code) {
816         case HttpStatus.SC_OK: // 200
817           accessToken = (String) jsonObject.get(ACCESS_TOKEN_NAME);
818           long duration = (long) jsonObject.get(ACCESS_TOKEN_EXPIRY_NAME); // Duration in second
819           tokenExpiryTime = (System.currentTimeMillis() + (duration * 1000)); // time in millisecond
820           if (!INVALID_TOKEN.equals(accessToken)) {
821             logger.info("Google Cloud Service access token created");
822             return accessToken;
823           }
824           throw new TranscriptionServiceException(
825               String.format("Created token is invalid. Status returned: %d", code), code);
826         case HttpStatus.SC_BAD_REQUEST: // 400
827         case HttpStatus.SC_UNAUTHORIZED: // 401
828           String error = (String) jsonObject.get("error");
829           String errorDetails = (String) jsonObject.get("error_description");
830           logger.warn("Invalid argument returned, status: {}", code);
831           logger.warn("Unable to refresh Google Cloud Service token, error: {}, error details: {}",
832               error, errorDetails);
833           break;
834         default:
835           logger.warn("Invalid argument returned, status: {}", code);
836       }
837       throw new TranscriptionServiceException(
838               String.format("Could not create Google access token. Status returned: %d", code), code);
839     } catch (TranscriptionServiceException e) {
840       throw e;
841     } catch (Exception e) {
842       logger.warn("Unable to generate access token for Google Cloud Services");
843       return INVALID_TOKEN;
844     } finally {
845       try {
846         httpClient.close();
847         if (response != null) {
848           response.close();
849         }
850       } catch (IOException e) {
851       }
852     }
853   }
854 
855   protected String getRefreshAccessToken() throws TranscriptionServiceException, IOException {
856     // Check that token hasn't expired
857     if ((!INVALID_TOKEN.equals(accessToken))
858         && (System.currentTimeMillis() < (tokenExpiryTime - ACCESS_TOKEN_MINIMUM_TIME))) {
859       return accessToken;
860     }
861     return refreshAccessToken(clientId, clientSecret, clientToken);
862   }
863 
864   protected String uploadAudioFileToGoogleStorage(String mpId, Track track)
865           throws TranscriptionServiceException, IOException {
866     File audioFile;
867     String audioUrl = null;
868     String fileExtension;
869     int audioResponse;
870     CloseableHttpClient httpClientStorage = makeHttpClient();
871     GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
872     try {
873       audioFile = workspace.get(track.getURI());
874       fileExtension = FilenameUtils.getExtension(audioFile.getName());
875       long fileSize = audioFile.length();
876       String contentType = track.getMimeType().toString();
877       String token = getRefreshAccessToken();
878       // Upload file to google cloud storage
879       audioResponse = storage.startUpload(httpClientStorage, storageBucket, mpId, fileExtension,
880               audioFile, String.valueOf(fileSize), contentType, token);
881       if (audioResponse == HttpStatus.SC_OK) {
882         audioUrl = String.format("gs://%s/%s.%s", storageBucket, mpId, fileExtension);
883         return audioUrl;
884       }
885       logger.error("Error when uploading audio to Google Storage, error code: {}", audioResponse);
886       return audioUrl;
887     } catch (Exception e) {
888       throw new TranscriptionServiceException("Error reading audio track", e);
889     }
890   }
891 
892   private JSONArray getTranscriptionResult(JSONObject jsonObj) {
893     JSONObject responseObj = (JSONObject) jsonObj.get("response");
894     JSONArray resultsArray = (JSONArray) responseObj.get("results");
895     return resultsArray;
896   }
897 
898   protected void deleteStorageFile(String mpId, String token) throws IOException {
899     CloseableHttpClient httpClientDel = makeHttpClient();
900     GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
901     storage.deleteGoogleStorageFile(httpClientDel, storageBucket, mpId + "." + defaultEncoding, token);
902   }
903 
904   private void sendEmail(String subject, String body) {
905     if (toEmailAddress == null) {
906       logger.info("Skipping sending email notification. Message is {}.", body);
907       return;
908     }
909     try {
910       logger.debug("Sending e-mail notification to {}", toEmailAddress);
911       smtpService.send(toEmailAddress, String.format("%s (%s)", subject, clusterName), body);
912       logger.info("Sent e-mail notification to {}", toEmailAddress);
913     } catch (Exception e) {
914       logger.error("Could not send email: {}\n{}", subject, body, e);
915     }
916   }
917 
918   private void cancelTranscription(String jobId, String message) {
919     try {
920       database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
921       String mpId = database.findByJob(jobId).getMediaPackageId();
922       try {
923         // Delete file stored on Google storage
924         String token = getRefreshAccessToken();
925         deleteStorageFile(mpId, token);
926       } catch (Exception ex) {
927         logger.warn(String.format("could not delete file %s.%s from Google cloud storage", mpId, defaultEncoding), ex);
928       } finally {
929         // Send notification email
930         sendEmail("Transcription ERROR", String.format("%s(media package %s, job id %s).", message, mpId, jobId));
931       }
932     } catch (Exception e) {
933       logger.error(String.format("ERROR while deleting transcription job: %s", jobId), e);
934     }
935   }
936 
937   private boolean hasTranscriptionRequestExpired(String jobId) {
938     try {
939       // set a time limit based on video duration and maximum processing time
940       if (database.findByJob(jobId).getDateCreated().getTime() + database.findByJob(jobId).getTrackDuration()
941               + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
942         return true;
943       }
944     } catch (Exception e) {
945       logger.error(String.format("ERROR while calculating transcription request expiration for job: %s", jobId), e);
946       // to avoid perpetual non-expired state, transcription is set as expired
947       return true;
948     }
949     return false;
950   }
951 
952   private long getRemainingTranscriptionExpireTimeInMin(String jobId) {
953     try {
954       long expiredTime = (database.findByJob(jobId).getDateCreated().getTime()
955           + database.findByJob(jobId).getTrackDuration()
956           + (completionCheckBuffer + maxProcessingSeconds) * 1000)
957           - (System.currentTimeMillis());
958       // Transcription has expired
959       if (expiredTime < 0) {
960         expiredTime = 0;
961       }
962       return TimeUnit.MILLISECONDS.toMinutes(expiredTime);
963     } catch (Exception e) {
964       logger.error("Unable to calculate remaining transcription expired time for transcription job {}", jobId);
965     }
966     return 0;
967   }
968 
/**
 * Stores the service registry that {@code getServiceRegistry()} hands out.
 *
 * @param serviceRegistry
 *          the service registry to use
 */
@Reference
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
  this.serviceRegistry = serviceRegistry;
}
973 
/**
 * Stores the security service that {@code getSecurityService()} hands out.
 *
 * @param securityService
 *          the security service to use
 */
@Reference
public void setSecurityService(SecurityService securityService) {
  this.securityService = securityService;
}
978 
/**
 * Stores the user directory service that {@code getUserDirectoryService()} hands out.
 *
 * @param userDirectoryService
 *          the user directory service to use
 */
@Reference
public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
  this.userDirectoryService = userDirectoryService;
}
983 
/**
 * Stores the organization directory service that {@code getOrganizationDirectoryService()}
 * hands out.
 *
 * @param organizationDirectoryService
 *          the organization directory service to use
 */
@Reference
public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
  this.organizationDirectoryService = organizationDirectoryService;
}
988 
/**
 * Stores the SMTP service in the {@code smtpService} field.
 *
 * @param service
 *          the SMTP service to use
 */
@Reference
public void setSmtpService(SmtpService service) {
  this.smtpService = service;
}
993 
/**
 * Stores the workspace in the {@code workspace} field.
 *
 * @param ws
 *          the workspace to use
 */
@Reference
public void setWorkspace(Workspace ws) {
  this.workspace = ws;
}
998 
/**
 * Stores the working file repository in the {@code wfr} field.
 *
 * @param wfr
 *          the working file repository to use
 */
@Reference
public void setWorkingFileRepository(WorkingFileRepository wfr) {
  this.wfr = wfr;
}
1003 
/**
 * Stores the transcription job database in the {@code database} field.
 *
 * @param service
 *          the transcription database to use
 */
@Reference
public void setDatabase(TranscriptionDatabase service) {
  this.database = service;
}
1008 
/**
 * Stores the asset manager in the {@code assetManager} field.
 *
 * @param service
 *          the asset manager to use
 */
@Reference
public void setAssetManager(AssetManager service) {
  this.assetManager = service;
}
1013 
/**
 * Stores the workflow service in the {@code workflowService} field.
 *
 * @param service
 *          the workflow service to use
 */
@Reference
public void setWorkflowService(WorkflowService service) {
  this.workflowService = service;
}
1018 
/**
 * Returns the service registry stored in the {@code serviceRegistry} field.
 *
 * @return the service registry, as set through {@code setServiceRegistry}
 */
@Override
protected ServiceRegistry getServiceRegistry() {
  return serviceRegistry;
}
1023 
/**
 * Returns the security service stored in the {@code securityService} field.
 *
 * @return the security service, as set through {@code setSecurityService}
 */
@Override
protected SecurityService getSecurityService() {
  return securityService;
}
1028 
/**
 * Returns the user directory service stored in the {@code userDirectoryService} field.
 *
 * @return the user directory service, as set through {@code setUserDirectoryService}
 */
@Override
protected UserDirectoryService getUserDirectoryService() {
  return userDirectoryService;
}
1033 
/**
 * Returns the organization directory service stored in the
 * {@code organizationDirectoryService} field.
 *
 * @return the organization directory service, as set through
 *         {@code setOrganizationDirectoryService}
 */
@Override
protected OrganizationDirectoryService getOrganizationDirectoryService() {
  return organizationDirectoryService;
}
1038 
/**
 * Replaces the workflow helper stored in the {@code wfUtil} field. Only used by unit tests!
 *
 * @param wfUtil
 *          the workflow helper to use
 */
void setWfUtil(Workflows wfUtil) {
  this.wfUtil = wfUtil;
}
1043 
1044   class WorkflowDispatcher implements Runnable {
1045 
1046     /**
1047      * {@inheritDoc}
1048      *
1049      * @see java.lang.Thread#run()
1050      */
1051     @Override
1052     public void run() {
1053       logger.debug("WorkflowDispatcher waking up...");
1054 
1055       try {
1056         // Find jobs that are in progress and jobs that had transcription complete
1057 
1058         long providerId;
1059         TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
1060         if (providerInfo != null) {
1061           providerId = providerInfo.getId();
1062         } else {
1063           logger.debug("No jobs yet for provider {}", PROVIDER);
1064           return;
1065         }
1066 
1067         List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
1068                 TranscriptionJobControl.Status.TranscriptionComplete.name());
1069         for (TranscriptionJobControl j : jobs) {
1070 
1071           // Don't process jobs for other services
1072           if (j.getProviderId() != providerId) {
1073             continue;
1074           }
1075 
1076           String mpId = j.getMediaPackageId();
1077           String jobId = j.getTranscriptionJobId();
1078 
1079           // If the job in progress, check if it should already have finished.
1080           if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
1081             // If job should already have been completed, try to get the results. Consider a buffer factor so that we
1082             // don't try it too early. Results normally should be ready 1/3 of the time of the track duration.
1083             // The completionCheckBuffer can be used to delay results check.
1084             if (j.getDateCreated().getTime() + (j.getTrackDuration() / 3) + completionCheckBuffer * 1000 < System
1085                     .currentTimeMillis()) {
1086               try {
1087                 if (!getAndSaveJobResults(jobId)) {
1088                   // Job still running, not finished, so check if it should have finished more than N seconds ago
1089                   if (hasTranscriptionRequestExpired(jobId)) {
1090                     // Processing for too long, mark job as cancelled and don't check anymore
1091                     database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1092                     // Delete file stored on Google storage
1093                     String token = getRefreshAccessToken();
1094                     deleteStorageFile(mpId, token);
1095                     // Send notification email
1096                     sendEmail(TRANSCRIPTION_ERROR, String.format(
1097                         "Transcription job was in processing state for too long and was marked "
1098                             + "as cancelled (media package %s, job id %s).",
1099                         mpId, jobId));
1100                   }
1101                   // else Job still running, not finished
1102                   continue;
1103                 }
1104               } catch (TranscriptionServiceException e) {
1105                 if (e.getCode() == 404) {
1106                   // Job not found there, update job state to canceled
1107                   database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1108                   // Send notification email
1109                   sendEmail(TRANSCRIPTION_ERROR,
1110                           String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
1111                 }
1112                 continue; // Skip this one, exception was already logged
1113               } catch (IOException ex) {
1114                 logger.error("Transcription job not found, error: {}", ex.toString());
1115               }
1116             } else {
1117               continue; // Not time to check yet
1118             }
1119           }
1120 
1121           // Jobs that get here have state TranscriptionCompleted or had an IOException]
1122           try {
1123 
1124             // Apply workflow to attach transcripts
1125             Map<String, String> params = new HashMap<String, String>();
1126             params.put(TRANSCRIPTION_JOB_ID_KEY, jobId);
1127             String wfId = startWorkflow(mpId, workflowDefinitionId, jobId, params);
1128             if (wfId == null) {
1129               logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}",
1130                   mpId, jobId);
1131               continue;
1132             }
1133             // Update state in the database
1134             database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1135             logger.info("Attach transcription workflow {} scheduled for mp {}, google speech job {}",
1136                     wfId, mpId, jobId);
1137           } catch (Exception e) {
1138             logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}, {}: {}",
1139                     mpId, jobId, e.getClass().getName(), e.getMessage());
1140           }
1141         }
1142       } catch (TranscriptionDatabaseException e) {
1143         logger.warn("Could not read transcription job control database: {}", e.getMessage());
1144       }
1145     }
1146   }
1147 
1148   private String startWorkflow(String mpId, String wfDefId, String jobId, Map<String, String> params) {
1149     DefaultOrganization defaultOrg = new DefaultOrganization();
1150     securityService.setOrganization(defaultOrg);
1151     securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1152 
1153     // Find the episode
1154     final AQueryBuilder q = assetManager.createQuery();
1155     final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
1156     if (r.getSize() == 0) {
1157       if (!hasTranscriptionRequestExpired(jobId)) {
1158         // Media package not archived but still within completion time? Skip until next time.
1159         logger.warn("Media package {} has not been archived yet or has been deleted. Will keep trying for {} "
1160             + "more minutes before cancelling transcription job {}.",
1161             mpId, getRemainingTranscriptionExpireTimeInMin(jobId), jobId);
1162       } else {
1163         // Close transcription job and email admin
1164         cancelTranscription(jobId, " Google Transcription job canceled, archived media package not found");
1165         logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
1166       }
1167       return null;
1168     }
1169 
1170     String org = Enrichments.enrich(r).getSnapshots().stream().findFirst().get().getOrganizationId();
1171     Organization organization = null;
1172     try {
1173       organization = organizationDirectoryService.getOrganization(org);
1174       if (organization == null) {
1175         logger.warn("Media package {} has an unknown organization {}.", mpId, org);
1176         return null;
1177       }
1178     } catch (NotFoundException e) {
1179       logger.warn("Organization {} not found for media package {}.", org, mpId);
1180       return null;
1181     }
1182     securityService.setOrganization(organization);
1183 
1184     try {
1185       WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
1186       Workflows workflows;
1187       if (wfUtil != null) {
1188         workflows = wfUtil;
1189       } else {
1190         workflows = new Workflows(assetManager, workflowService);
1191       }
1192       Set<String> mpIds = new HashSet<String>();
1193       mpIds.add(mpId);
1194       List<WorkflowInstance> wfList = workflows
1195               .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1196       return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
1197     } catch (NotFoundException | WorkflowDatabaseException e) {
1198       logger.warn("Could not get workflow definition: {}", wfDefId);
1199     }
1200 
1201     return null;
1202   }
1203 
1204   class ResultsFileCleanup implements Runnable {
1205 
1206     @Override
1207     public void run() {
1208       logger.info("ResultsFileCleanup waking up...");
1209       try {
1210         // Cleans up results files older than CLEANUP_RESULT_FILES_DAYS days
1211         wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1212       } catch (IOException e) {
1213         logger.warn("Could not cleanup old transcript results files", e);
1214       }
1215     }
1216   }
1217 
1218 }