View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.transcription.googlespeech;
22  
23  import org.opencastproject.assetmanager.api.AssetManager;
24  import org.opencastproject.assetmanager.api.Snapshot;
25  import org.opencastproject.assetmanager.util.Workflows;
26  import org.opencastproject.job.api.AbstractJobProducer;
27  import org.opencastproject.job.api.Job;
28  import org.opencastproject.kernel.mail.SmtpService;
29  import org.opencastproject.mediapackage.MediaPackageElement;
30  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
31  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
32  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
33  import org.opencastproject.mediapackage.MediaPackageElementParser;
34  import org.opencastproject.mediapackage.MediaPackageException;
35  import org.opencastproject.mediapackage.Track;
36  import org.opencastproject.security.api.DefaultOrganization;
37  import org.opencastproject.security.api.Organization;
38  import org.opencastproject.security.api.OrganizationDirectoryService;
39  import org.opencastproject.security.api.SecurityService;
40  import org.opencastproject.security.api.UserDirectoryService;
41  import org.opencastproject.security.util.SecurityUtil;
42  import org.opencastproject.serviceregistry.api.ServiceRegistry;
43  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
44  import org.opencastproject.systems.OpencastConstants;
45  import org.opencastproject.transcription.api.TranscriptionService;
46  import org.opencastproject.transcription.api.TranscriptionServiceException;
47  import org.opencastproject.transcription.persistence.TranscriptionDatabase;
48  import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
49  import org.opencastproject.transcription.persistence.TranscriptionJobControl;
50  import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
51  import org.opencastproject.util.NotFoundException;
52  import org.opencastproject.util.OsgiUtil;
53  import org.opencastproject.util.UrlSupport;
54  import org.opencastproject.workflow.api.ConfiguredWorkflow;
55  import org.opencastproject.workflow.api.WorkflowDatabaseException;
56  import org.opencastproject.workflow.api.WorkflowDefinition;
57  import org.opencastproject.workflow.api.WorkflowInstance;
58  import org.opencastproject.workflow.api.WorkflowService;
59  import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
60  import org.opencastproject.workspace.api.Workspace;
61  
62  import org.apache.commons.io.FilenameUtils;
63  import org.apache.commons.lang3.StringUtils;
64  import org.apache.http.HttpEntity;
65  import org.apache.http.HttpHeaders;
66  import org.apache.http.HttpStatus;
67  import org.apache.http.client.config.RequestConfig;
68  import org.apache.http.client.methods.CloseableHttpResponse;
69  import org.apache.http.client.methods.HttpGet;
70  import org.apache.http.client.methods.HttpPost;
71  import org.apache.http.entity.StringEntity;
72  import org.apache.http.impl.client.CloseableHttpClient;
73  import org.apache.http.impl.client.HttpClients;
74  import org.apache.http.util.EntityUtils;
75  import org.json.simple.JSONArray;
76  import org.json.simple.JSONObject;
77  import org.json.simple.parser.JSONParser;
78  import org.osgi.service.component.ComponentContext;
79  import org.osgi.service.component.annotations.Activate;
80  import org.osgi.service.component.annotations.Component;
81  import org.osgi.service.component.annotations.Reference;
82  import org.slf4j.Logger;
83  import org.slf4j.LoggerFactory;
84  
85  import java.io.ByteArrayInputStream;
86  import java.io.File;
87  import java.io.IOException;
88  import java.net.URI;
89  import java.util.Arrays;
90  import java.util.HashMap;
91  import java.util.HashSet;
92  import java.util.List;
93  import java.util.Map;
94  import java.util.Optional;
95  import java.util.Set;
96  import java.util.concurrent.Executors;
97  import java.util.concurrent.ScheduledExecutorService;
98  import java.util.concurrent.TimeUnit;
99  
100 @Component(
101     immediate = true,
102     service = { TranscriptionService.class,GoogleSpeechTranscriptionService.class },
103     property = {
104         "service.description=Google Speech Transcription Service",
105         "provider=google.speech"
106     }
107 )
108 public class GoogleSpeechTranscriptionService extends AbstractJobProducer implements TranscriptionService {
109 
  /**
   * The logger
   */
  private static final Logger logger = LoggerFactory.getLogger(GoogleSpeechTranscriptionService.class);

  // Job type handed to the AbstractJobProducer constructor and used when creating jobs in the service registry
  private static final String JOB_TYPE = "org.opencastproject.transcription.googlespeech";

  // NOTE(review): presumably the working file repository collection that holds saved results (the
  // ResultsFileCleanup scheduled in activate() cleans a collection in the wfr) — confirm against saveResults
  static final String TRANSCRIPT_COLLECTION = "transcripts";
  // Subject of the notification email sent when a transcription error is reported
  static final String TRANSCRIPTION_ERROR = "Transcription ERROR";
  // NOTE(review): not referenced in this part of the file; presumably a workflow/property key — confirm
  static final String TRANSCRIPTION_JOB_ID_KEY = "transcriptionJobId";
  // Presumably the field names of the OAuth token response (token value and lifetime) — confirm
  static final String ACCESS_TOKEN_NAME = "access_token";
  static final String ACCESS_TOKEN_EXPIRY_NAME = "expires_in";
  // HTTP timeouts, presumably applied by makeHttpClient() — confirm
  private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
  private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute
  // NOTE(review): presumably the minimum remaining token validity before a refresh is forced — confirm
  private static final int ACCESS_TOKEN_MINIMUM_TIME = 60000; // ms , 1 minute
  // Default workflow to attach transcription results to mediapackage (overridable with the "workflow" key)
  public static final String DEFAULT_WF_DEF = "google-speech-attach-transcripts";
  // Default wait after a transcription is supposed to finish before its completion is checked
  private static final long DEFAULT_COMPLETION_BUFFER = 300; // in seconds, default is 5 minutes
  // Default delay between runs of the workflow dispatcher
  private static final long DEFAULT_DISPATCH_INTERVAL = 60; // in seconds, default is 1 minute
  // Default time a job is still checked after it should have ended, before it is marked as canceled
  private static final long DEFAULT_MAX_PROCESSING_TIME = 5 * 60 * 60; // in seconds, default is 5 hours
  // Cleans up results files that are older than 7 days
  private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;
  // Defaults for the recognition request configuration; each can be overridden in the service configuration
  private static final boolean DEFAULT_PROFANITY_FILTER = false;
  private static final String DEFAULT_LANGUAGE = "en-US";
  private static final boolean DEFAULT_ENABLE_PUNCTUATION = false;
  private static final String DEFAULT_MODEL = "default";
  // Speech API base URL: REQUEST_METHOD is appended to submit jobs, RESULT_PATH plus the job id to poll them
  private static final String GOOGLE_SPEECH_URL = "https://speech.googleapis.com/v1";
  // Default OAuth2 token endpoint (overridable with google.cloud.token.endpoint.url)
  private static final String GOOGLE_AUTH2_URL = "https://www.googleapis.com/oauth2/v4/token";
  private static final String REQUEST_METHOD = "speech:longrunningrecognize";
  private static final String RESULT_PATH = "operations";
  // Sentinel meaning "no usable access token"; compared against the result of getRefreshAccessToken()
  private static final String INVALID_TOKEN = "-1";
  // Provider name stored with every transcription job control record
  private static final String PROVIDER = "Google Speech";
  // Default audio encoding extension (overridable with encoding.extension)
  private static final String DEFAULT_ENCODING = "flac";
143 
  // Cluster name: the environment name property read in activate(); stays empty when not configured
  private String clusterName = "";

  /**
   * Service dependencies
   * NOTE(review): presumably injected through OSGi reference setters further down in the file — confirm.
   */
  private ServiceRegistry serviceRegistry;
  private SecurityService securityService;
  private UserDirectoryService userDirectoryService;
  private OrganizationDirectoryService organizationDirectoryService;
  private Workspace workspace;
  // Stores and updates the transcription job control records (job id, media package, status)
  private TranscriptionDatabase database;
  private AssetManager assetManager;
  private WorkflowService workflowService;
  // Working file repository
  private WorkingFileRepository wfr;
  private SmtpService smtpService;

  // Only used by unit tests!
  private Workflows wfUtil;
163 
  // Operations handled by this job producer. The constant name is stored as the job operation in
  // startTranscription() and turned back into the enum with valueOf() in process().
  private enum Operation {
    StartTranscription
  }
167 
  /**
   * Service configuration options
   * (keys read from the component configuration in activate())
   */
  public static final String ENABLED_CONFIG = "enabled";
  public static final String GOOGLE_SPEECH_LANGUAGE = "google.speech.language";
  public static final String PROFANITY_FILTER = "google.speech.profanity.filter";
  public static final String ENABLE_PUNCTUATION = "google.speech.transcription.punctuation";
  public static final String TRANSCRIPTION_MODEL = "google.speech.transcription.model";
  public static final String WORKFLOW_CONFIG = "workflow";
  public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
  public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
  public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
  // When absent, the admin email from the context properties is used instead
  public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
  public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
  // Mandatory Google Cloud API access properties
  public static final String GOOGLE_CLOUD_CLIENT_ID = "google.cloud.client.id";
  public static final String GOOGLE_CLOUD_CLIENT_SECRET = "google.cloud.client.secret";
  public static final String GOOGLE_CLOUD_REFRESH_TOKEN = "google.cloud.refresh.token";
  public static final String GOOGLE_CLOUD_BUCKET = "google.cloud.storage.bucket";
  // Optional override of the OAuth2 token endpoint
  public static final String GOOGLE_CLOUD_TOKEN_ENDPOINT_URL = "google.cloud.token.endpoint.url";
  public static final String ENCODING_EXTENSION = "encoding.extension";

  /**
   * Service configuration values
   */
  private boolean enabled = false; // Disabled by default
  private boolean profanityFilter = DEFAULT_PROFANITY_FILTER;
  private boolean enablePunctuation = DEFAULT_ENABLE_PUNCTUATION;
  private String model = DEFAULT_MODEL;
  private String defaultLanguage = DEFAULT_LANGUAGE;
  private String defaultEncoding = DEFAULT_ENCODING;
  private String workflowDefinitionId = DEFAULT_WF_DEF;
  private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
  private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
  private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
  // Stays null when no address is configured; activate() then logs that email notification is disabled
  private String toEmailAddress;
  private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
  private String clientId;
  private String clientSecret;
  // OAuth refresh token read from google.cloud.refresh.token
  private String clientToken;
  // NOTE(review): presumably the cached access token and its expiry, maintained by getRefreshAccessToken()
  // (not shown here) — confirm the unit of tokenExpiryTime
  private String accessToken = INVALID_TOKEN;
  private String tokenEndpoint = GOOGLE_AUTH2_URL;
  private String storageBucket;
  private long tokenExpiryTime = 0;
  // Digest user read from the context properties in activate()
  private String systemAccount;
  // Runs the WorkflowDispatcher and ResultsFileCleanup tasks scheduled in activate()
  private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
213 
214   public GoogleSpeechTranscriptionService() {
215     super(JOB_TYPE);
216   }
217 
218   @Activate
219   public void activate(ComponentContext cc) {
220     // Has this service been enabled?
221     enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).get();
222     if (!enabled) {
223       logger.info("Service disabled. If you want to enable it, please update the service configuration.");
224       return;
225     }
226     // Mandatory API access properties
227     clientId = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_ID);
228     clientSecret = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_SECRET);
229     clientToken = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_REFRESH_TOKEN);
230     storageBucket = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_BUCKET);
231 
232     // access token endpoint
233     Optional<String> tokenOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_CLOUD_TOKEN_ENDPOINT_URL);
234     if (tokenOpt.isPresent()) {
235       tokenEndpoint = tokenOpt.get();
236       logger.info("Access token endpoint is set to {}", tokenEndpoint);
237     } else {
238       logger.info("Default access token endpoint will be used");
239     }
240 
241     // profanity filter to use
242     Optional<String> profanityOpt = OsgiUtil.getOptCfg(cc.getProperties(), PROFANITY_FILTER);
243     if (profanityOpt.isPresent()) {
244       profanityFilter = Boolean.parseBoolean(profanityOpt.get());
245       logger.info("Profanity filter is set to {}", profanityFilter);
246     } else {
247       logger.info("Default profanity filter will be used");
248     }
249     // Language model to be used
250     Optional<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_SPEECH_LANGUAGE);
251     if (languageOpt.isPresent()) {
252       defaultLanguage = languageOpt.get();
253       logger.info("Language used is {}", defaultLanguage);
254     } else {
255       logger.info("Default language will be used");
256     }
257     // Enable punctuation or not
258     Optional<String> punctuationOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENABLE_PUNCTUATION);
259     if (punctuationOpt.isPresent()) {
260       enablePunctuation = Boolean.parseBoolean(punctuationOpt.get());
261       logger.info("Enable punctuation is set to {}", enablePunctuation);
262     } else {
263       logger.info("Default punctuation setting will be used");
264     }
265     // Transription model to be used
266     Optional<String> transModel = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTION_MODEL);
267     if (transModel.isPresent()) {
268       model = transModel.get();
269       logger.info("Transcription model used is {}", model);
270     } else {
271       logger.info("Default Transcription model will be used");
272     }
273     // Encoding to be used
274     Optional<String> encodingOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENCODING_EXTENSION);
275     if (encodingOpt.isPresent()) {
276       defaultEncoding = encodingOpt.get();
277       logger.info("Encoding used is {}", defaultEncoding);
278     } else {
279       logger.info("Default encoding will be used");
280     }
281 
282     // Workflow to execute when getting callback (optional, with default)
283     Optional<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
284     if (wfOpt.isPresent()) {
285       workflowDefinitionId = wfOpt.get();
286     }
287     logger.info("Workflow definition is {}", workflowDefinitionId);
288     // Interval to check for completed transcription jobs and start workflows to attach transcripts
289     Optional<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
290     if (intervalOpt.isPresent()) {
291       try {
292         workflowDispatchInterval = Long.parseLong(intervalOpt.get());
293       } catch (NumberFormatException e) {
294         // Use default
295         logger.warn("Invalid configuration for Workflow dispatch interval. Default used instead: {}",
296             workflowDispatchInterval);
297       }
298     }
299     logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
300     // How long to wait after a transcription is supposed to finish before starting checking
301     Optional<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
302     if (bufferOpt.isPresent()) {
303       try {
304         completionCheckBuffer = Long.parseLong(bufferOpt.get());
305       } catch (NumberFormatException e) {
306         // Use default
307         logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
308                 new Object[]{COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer});
309       }
310     }
311     logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
312     // How long to wait after a transcription is supposed to finish before marking the job as canceled in the db
313     Optional<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
314     if (maxProcessingOpt.isPresent()) {
315       try {
316         maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
317       } catch (NumberFormatException e) {
318         // Use default
319         logger.warn("Invalid configuration for maximum processing time. Default used instead: {}",
320             maxProcessingSeconds);
321       }
322     }
323     logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
324     // How long to keep result files in the working file repository
325     Optional<String> cleaupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
326     if (cleaupOpt.isPresent()) {
327       try {
328         cleanupResultDays = Integer.parseInt(cleaupOpt.get());
329       } catch (NumberFormatException e) {
330         // Use default
331         logger.warn("Invalid configuration for clean up days. Default used instead: {}", cleanupResultDays);
332       }
333     }
334     logger.info("Cleanup result files after {} days", cleanupResultDays);
335 
336     systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
337 
338     // Schedule the workflow dispatching, starting in 2 minutes
339     scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
340             TimeUnit.SECONDS);
341 
342     // Schedule the cleanup of old results jobs from the collection in the wfr once a day
343     scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
344 
345     // Notification email passed in this service configuration?
346     Optional<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
347     if (optTo.isPresent()) {
348       toEmailAddress = optTo.get();
349     } else {
350       // Use admin email informed in custom.properties
351       optTo = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ADMIN_EMAIL_PROPERTY);
352       if (optTo.isPresent()) {
353         toEmailAddress = optTo.get();
354       }
355     }
356     if (toEmailAddress != null) {
357       logger.info("Notification email set to {}", toEmailAddress);
358     } else {
359       logger.warn("Email notification disabled");
360     }
361 
362     Optional<String> optCluster = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ENVIRONMENT_NAME_PROPERTY);
363     if (optCluster.isPresent()) {
364       clusterName = optCluster.get();
365     }
366     logger.info("Environment name is {}", clusterName);
367 
368     logger.info("Activated!");
369   }
370 
371   @Override
372   public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
373     if (!enabled) {
374       throw new TranscriptionServiceException(
375               "This service is disabled. If you want to enable it, please update the service configuration.");
376     }
377 
378     if (args.length == 0) {
379       throw new IllegalArgumentException("Additional language argument is required.");
380     }
381 
382     try {
383       return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(),
384               Arrays.asList(mpId, MediaPackageElementParser.getAsXml(track), args[0]));
385     } catch (ServiceRegistryException e) {
386       throw new TranscriptionServiceException("Unable to create a job", e);
387     } catch (MediaPackageException e) {
388       throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
389     }
390   }
391 
392   @Override
393   public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
394     throw new UnsupportedOperationException("Not supported.");
395   }
396 
397   @Override
398   public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
399     JSONObject jsonObj = null;
400     String jobId = null;
401     String token = INVALID_TOKEN;
402     try {
403       token = getRefreshAccessToken();
404     } catch (IOException ex) {
405       logger.error("Unable to create access token, error: {}", ex.toString());
406     }
407     if (token.equals(INVALID_TOKEN)) {
408       throw new TranscriptionServiceException("Invalid access token");
409     }
410     try {
411       jsonObj = (JSONObject) obj;
412       jobId = (String) jsonObj.get("name");
413       logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
414       JSONArray resultsArray = getTranscriptionResult(jsonObj);
415 
416       // Update state in database
417       // If there's an optimistic lock exception here, it's ok because the workflow dispatcher
418       // may be doing the same thing
419       database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
420 
421       // Delete audio file from Google storage
422       deleteStorageFile(mpId, token);
423 
424       // Save results in file system if they exist
425       if (resultsArray != null) {
426         saveResults(jobId, jsonObj);
427       }
428     } catch (IOException e) {
429       if (jsonObj == null) {
430         logger.warn("Could not save transcription results file for mpId {}, jobId {}: null",
431                 mpId, jobId);
432       } else {
433         logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
434                 mpId, jobId, jsonObj.toJSONString());
435       }
436       throw new TranscriptionServiceException("Could not save transcription results file", e);
437     } catch (TranscriptionDatabaseException e) {
438       logger.warn("Transcription results file were saved but state in db not updated for mpId {}, jobId {}", mpId,
439               jobId);
440       throw new TranscriptionServiceException("Could not update transcription job control db", e);
441     }
442   }
443 
444   @Override
445   public String getLanguage() {
446     return defaultLanguage;
447   }
448 
449   @Override
450   public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
451     throw new TranscriptionServiceException("Method not implemented");
452   }
453 
454   @Override
455   public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
456     JSONObject jsonObj = null;
457     String jobId = null;
458     try {
459       jsonObj = (JSONObject) obj;
460       jobId = (String) jsonObj.get("name");
461       // Update state in database
462       database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
463       TranscriptionJobControl jobControl = database.findByJob(jobId);
464       logger.warn("Error received for media package {}, job id {}",
465               jobControl.getMediaPackageId(), jobId);
466       // Send notification email
467       sendEmail(TRANSCRIPTION_ERROR,
468               String.format("There was a transcription error for for media package %s, job id %s.",
469                       jobControl.getMediaPackageId(), jobId));
470     } catch (TranscriptionDatabaseException e) {
471       logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
472       throw new TranscriptionServiceException("Could not update transcription job control db", e);
473     }
474   }
475 
476   @Override
477   protected String process(Job job) throws Exception {
478     Operation op = null;
479     String operation = job.getOperation();
480     List<String> arguments = job.getArguments();
481     String result = "";
482     op = Operation.valueOf(operation);
483     switch (op) {
484       case StartTranscription:
485         String mpId = arguments.get(0);
486         Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
487         String languageCode = arguments.get(2);
488         createRecognitionsJob(mpId, track, languageCode);
489         break;
490       default:
491         throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
492     }
493     return result;
494   }
495 
496   /**
497    * Asynchronous Requests and Responses call to Google Speech API
498    * https://cloud.google.com/speech-to-text/docs/basics
499    */
500   void createRecognitionsJob(String mpId, Track track, String languageCode)
501           throws TranscriptionServiceException, IOException {
502     // Use default defaultlanguage if not set by workflow
503     if (StringUtils.isBlank(languageCode)) {
504       languageCode = defaultLanguage;
505     }
506     String audioUrl;
507     audioUrl = uploadAudioFileToGoogleStorage(mpId, track);
508     CloseableHttpClient httpClient = makeHttpClient();
509     CloseableHttpResponse response = null;
510     String token = getRefreshAccessToken();
511     if (token.equals(INVALID_TOKEN) || audioUrl == null) {
512       throw new TranscriptionServiceException("Could not create recognition job. Audio file or access token invalid");
513     }
514 
515     // Create json for configuration and audio file 
516     JSONObject configValues = new JSONObject();
517     JSONObject audioValues = new JSONObject();
518     JSONObject container = new JSONObject();
519     configValues.put("languageCode", languageCode);
520     configValues.put("enableWordTimeOffsets", true);
521     configValues.put("profanityFilter", profanityFilter);
522     configValues.put("enableAutomaticPunctuation", enablePunctuation);
523     configValues.put("model", model);
524     audioValues.put("uri", audioUrl);
525     container.put("config", configValues);
526     container.put("audio", audioValues);
527 
528     try {
529       HttpPost httpPost = new HttpPost(UrlSupport.concat(GOOGLE_SPEECH_URL, REQUEST_METHOD));
530       logger.debug("Url to invoke Google speech service: {}", httpPost.getURI().toString());
531       StringEntity params = new StringEntity(container.toJSONString());
532       httpPost.addHeader("Authorization", "Bearer " + token); // add the authorization header to the request;
533       httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8");
534       httpPost.setEntity(params);
535       response = httpClient.execute(httpPost);
536       int code = response.getStatusLine().getStatusCode();
537       HttpEntity entity = response.getEntity();
538       String jsonString = EntityUtils.toString(response.getEntity());
539       JSONParser jsonParser = new JSONParser();
540       JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
541 
542       switch (code) {
543         case HttpStatus.SC_OK: // 200
544           logger.info("Recognitions job has been successfully created");
545 
546           /**
547            * Response returned is a json object: { "name":
548            * "7612202767953098924", "metadata": { "@type":
549            * "type.googleapis.com/google.cloud.speech.v1.LongRunningRecognizeMetadata",
550            * "progressPercent": 90, "startTime": "2017-07-20T16:36:55.033650Z",
551            * "lastUpdateTime": "2017-07-20T16:37:17.158630Z" } }
552            */
553           String jobId = (String) jsonObject.get("name");
554           logger.info(
555                   "Transcription for mp {} has been submitted. Job id: {}", mpId,
556                   jobId);
557 
558           database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
559                   track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
560           EntityUtils.consume(entity);
561           return;
562         default:
563           JSONObject errorObj = (JSONObject) jsonObject.get("error");
564           logger.warn("Invalid argument returned, status: {} with message: {}", code, (String) errorObj.get("message"));
565           break;
566       }
567       throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
568     } catch (Exception e) {
569       logger.warn("Exception when calling the recognitions endpoint", e);
570       throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
571     } finally {
572       try {
573         httpClient.close();
574         if (response != null) {
575           response.close();
576         }
577       } catch (IOException e) {
578       }
579     }
580   }
581 
582   /**
583    * Get transcription job result: GET /v1/operations/{name}
584    *
585    * "response": { "@type":
586    * "type.googleapis.com/google.cloud.speech.v1.LongRunningRecognizeResponse",
587    * "results": [ { "alternatives": [ { "transcript": "Four score and
588    * twenty...", "confidence": 0.97186122, "words": [ { "startTime": "1.300s",
589    * "endTime": "1.400s", "word": "Four" }, { "startTime": "1.400s", "endTime":
590    * "1.600s", "word": "score" }, { "startTime": "1.600s", "endTime": "1.600s",
591    * "word": "and" }, { "startTime": "1.600s", "endTime": "1.900s", "word":
592    * "twenty" }, ] } ] }
593    */
594   boolean getAndSaveJobResults(String jobId) throws TranscriptionServiceException, IOException {
595     CloseableHttpClient httpClient = makeHttpClient();
596     CloseableHttpResponse response = null;
597     String mpId = "unknown";
598     JSONArray resultsArray = null;
599     String token = getRefreshAccessToken();
600     if (token.equals(INVALID_TOKEN)) {
601       return false;
602     }
603     try {
604       HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
605       logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
606       // add the authorization header to the request;
607       httpGet.addHeader("Authorization", "Bearer " + token);
608       response = httpClient.execute(httpGet);
609       int code = response.getStatusLine().getStatusCode();
610 
611       switch (code) {
612         case HttpStatus.SC_OK: // 200
613           HttpEntity entity = response.getEntity();
614           // Response returned is a json object described above
615           String jsonString = EntityUtils.toString(entity);
616           JSONParser jsonParser = new JSONParser();
617           JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
618           Boolean jobDone = (Boolean) jsonObject.get("done");
619           TranscriptionJobControl jc = database.findByJob(jobId);
620           if (jc != null) {
621             mpId = jc.getMediaPackageId();
622           }
623           if (jobDone) {
624             resultsArray = getTranscriptionResult(jsonObject);
625           }
626           logger.info("Recognitions job {} has been found, completed status {}", jobId, jobDone.toString());
627           EntityUtils.consume(entity);
628 
629           if (jobDone && resultsArray != null) {
630             transcriptionDone(mpId, jsonObject);
631             return true;
632           }
633           return false;
634         case HttpStatus.SC_NOT_FOUND: // 404
635           logger.warn("Job not found: {}", jobId);
636           break;
637         case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
638           logger.warn("Service unavailable returned, status: {}", code);
639           break;
640         default:
641           logger.warn("Error return status: {}.", code);
642           break;
643       }
644       throw new TranscriptionServiceException(
645               String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
646                       mpId, jobId, code), code);
647     } catch (TranscriptionServiceException e) {
648       throw e;
649     } catch (Exception e) {
650       if (hasTranscriptionRequestExpired(jobId)) {
651         // Cancel the job and inform admin
652         cancelTranscription(jobId, "Google Transcription job canceled due to errors");
653         logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
654       }
655       String msg = String.format("Exception when calling the recognitions endpoint for media package %s, job id %s",
656               mpId, jobId);
657       logger.warn(msg, e);
658       throw new TranscriptionServiceException(String.format(
659               "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
660     } finally {
661       try {
662         httpClient.close();
663         if (response != null) {
664           response.close();
665         }
666       } catch (IOException e) {
667       }
668     }
669   }
670 
671   /**
672    * Get transcription result: GET /v1/operations/{name} Method mainly used by
673    * the REST endpoint
674    *
675    * @param jobId
676    * @return job details
677    * @throws org.opencastproject.transcription.api.TranscriptionServiceException
678    * @throws java.io.IOException
679    */
680   public String getTranscriptionResults(String jobId)
681           throws TranscriptionServiceException, IOException {
682     CloseableHttpClient httpClient = makeHttpClient();
683     CloseableHttpResponse response = null;
684     String token = getRefreshAccessToken();
685     if (token.equals(INVALID_TOKEN)) {
686       logger.warn("Invalid access token");
687       return "No results found";
688     }
689     try {
690       HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
691       logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
692       // add the authorization header to the request;
693       httpGet.addHeader("Authorization", "Bearer " + token);
694       response = httpClient.execute(httpGet);
695       int code = response.getStatusLine().getStatusCode();
696 
697       switch (code) {
698         case HttpStatus.SC_OK: // 200
699           HttpEntity entity = response.getEntity();
700           logger.info("Retrieved details for transcription with job id: '{}'", jobId);
701           return EntityUtils.toString(entity);
702         default:
703           logger.warn("Error retrieving details for transcription with job id: '{}', return status: {}.", jobId, code);
704           break;
705       }
706     } catch (Exception e) {
707       logger.warn("Exception when calling the transcription service for job id: {}", jobId, e);
708       throw new TranscriptionServiceException(String.format(
709               "Exception when calling the transcription service for job id: %s", jobId), e);
710     } finally {
711       try {
712         httpClient.close();
713         if (response != null) {
714           response.close();
715         }
716       } catch (IOException e) {
717       }
718     }
719     return "No results found";
720   }
721 
722   private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
723     JSONArray resultsArray = getTranscriptionResult(jsonObj);
724     if (resultsArray != null) {
725       // Save the results into a collection
726       workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json",
727               new ByteArrayInputStream(jsonObj.toJSONString().getBytes()));
728     }
729   }
730 
731   @Override
732   public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
733           throws TranscriptionServiceException {
734     try {
735       // If jobId is unknown, look for all jobs associated to that mpId
736       if (jobId == null || "null".equals(jobId)) {
737         jobId = null;
738         for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
739           if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
740                   || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
741             jobId = jc.getTranscriptionJobId();
742           }
743         }
744       }
745 
746       if (jobId == null) {
747         throw new TranscriptionServiceException(
748                 "No completed or closed transcription job found in database for media package " + mpId);
749       }
750 
751       // Results already saved?
752       URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".json");
753       try {
754         workspace.get(uri);
755       } catch (Exception e) {
756         try {
757           // Not saved yet so call the google speech service to get the results
758           getAndSaveJobResults(jobId);
759         } catch (IOException ex) {
760           logger.error("Unable to retrieve transcription job, error: {}", ex.toString());
761         }
762       }
763       MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
764       return builder.elementFromURI(uri, type,
765           new MediaPackageElementFlavor("captions", "google-speech-json"));
766     } catch (TranscriptionDatabaseException e) {
767       throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
768     }
769   }
770 
771   /**
772    * Get mediapackage transcription status
773    *
774    * @param mpId, mediapackage id
775    * @return transcription status
776    * @throws TranscriptionServiceException
777    */
778   public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
779     try {
780       for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
781         return jc.getStatus();
782       }
783     } catch (TranscriptionDatabaseException e) {
784       throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
785     }
786     return "Unknown";
787   }
788 
789   protected CloseableHttpClient makeHttpClient() throws IOException {
790     RequestConfig reqConfig = RequestConfig.custom()
791             .setConnectTimeout(CONNECTION_TIMEOUT)
792             .setSocketTimeout(SOCKET_TIMEOUT)
793             .setConnectionRequestTimeout(CONNECTION_TIMEOUT)
794             .build();
795     return HttpClients.custom().setDefaultRequestConfig(reqConfig).build();
796   }
797 
798   protected String refreshAccessToken(String clientId, String clientSecret, String refreshToken)
799           throws TranscriptionServiceException, IOException {
800     CloseableHttpClient httpClient = makeHttpClient();
801     CloseableHttpResponse response = null;
802 
803     try {
804       HttpPost httpPost = new HttpPost(tokenEndpoint + String.format(
805               "?client_id=%s&client_secret=%s&refresh_token=%s&grant_type=refresh_token",
806               clientId, clientSecret, refreshToken));
807       httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
808       response = httpClient.execute(httpPost);
809       int code = response.getStatusLine().getStatusCode();
810       String jsonString = EntityUtils.toString(response.getEntity());
811       JSONParser jsonParser = new JSONParser();
812       JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
813       switch (code) {
814         case HttpStatus.SC_OK: // 200
815           accessToken = (String) jsonObject.get(ACCESS_TOKEN_NAME);
816           long duration = (long) jsonObject.get(ACCESS_TOKEN_EXPIRY_NAME); // Duration in second
817           tokenExpiryTime = (System.currentTimeMillis() + (duration * 1000)); // time in millisecond
818           if (!INVALID_TOKEN.equals(accessToken)) {
819             logger.info("Google Cloud Service access token created");
820             return accessToken;
821           }
822           throw new TranscriptionServiceException(
823               String.format("Created token is invalid. Status returned: %d", code), code);
824         case HttpStatus.SC_BAD_REQUEST: // 400
825         case HttpStatus.SC_UNAUTHORIZED: // 401
826           String error = (String) jsonObject.get("error");
827           String errorDetails = (String) jsonObject.get("error_description");
828           logger.warn("Invalid argument returned, status: {}", code);
829           logger.warn("Unable to refresh Google Cloud Service token, error: {}, error details: {}",
830               error, errorDetails);
831           break;
832         default:
833           logger.warn("Invalid argument returned, status: {}", code);
834       }
835       throw new TranscriptionServiceException(
836               String.format("Could not create Google access token. Status returned: %d", code), code);
837     } catch (TranscriptionServiceException e) {
838       throw e;
839     } catch (Exception e) {
840       logger.warn("Unable to generate access token for Google Cloud Services");
841       return INVALID_TOKEN;
842     } finally {
843       try {
844         httpClient.close();
845         if (response != null) {
846           response.close();
847         }
848       } catch (IOException e) {
849       }
850     }
851   }
852 
853   protected String getRefreshAccessToken() throws TranscriptionServiceException, IOException {
854     // Check that token hasn't expired
855     if ((!INVALID_TOKEN.equals(accessToken))
856         && (System.currentTimeMillis() < (tokenExpiryTime - ACCESS_TOKEN_MINIMUM_TIME))) {
857       return accessToken;
858     }
859     return refreshAccessToken(clientId, clientSecret, clientToken);
860   }
861 
862   protected String uploadAudioFileToGoogleStorage(String mpId, Track track)
863           throws TranscriptionServiceException, IOException {
864     File audioFile;
865     String audioUrl = null;
866     String fileExtension;
867     int audioResponse;
868     CloseableHttpClient httpClientStorage = makeHttpClient();
869     GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
870     try {
871       audioFile = workspace.get(track.getURI());
872       fileExtension = FilenameUtils.getExtension(audioFile.getName());
873       long fileSize = audioFile.length();
874       String contentType = track.getMimeType().toString();
875       String token = getRefreshAccessToken();
876       // Upload file to google cloud storage
877       audioResponse = storage.startUpload(httpClientStorage, storageBucket, mpId, fileExtension,
878               audioFile, String.valueOf(fileSize), contentType, token);
879       if (audioResponse == HttpStatus.SC_OK) {
880         audioUrl = String.format("gs://%s/%s.%s", storageBucket, mpId, fileExtension);
881         return audioUrl;
882       }
883       logger.error("Error when uploading audio to Google Storage, error code: {}", audioResponse);
884       return audioUrl;
885     } catch (Exception e) {
886       throw new TranscriptionServiceException("Error reading audio track", e);
887     }
888   }
889 
890   private JSONArray getTranscriptionResult(JSONObject jsonObj) {
891     JSONObject responseObj = (JSONObject) jsonObj.get("response");
892     JSONArray resultsArray = (JSONArray) responseObj.get("results");
893     return resultsArray;
894   }
895 
896   protected void deleteStorageFile(String mpId, String token) throws IOException {
897     CloseableHttpClient httpClientDel = makeHttpClient();
898     GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
899     storage.deleteGoogleStorageFile(httpClientDel, storageBucket, mpId + "." + defaultEncoding, token);
900   }
901 
902   private void sendEmail(String subject, String body) {
903     if (toEmailAddress == null) {
904       logger.info("Skipping sending email notification. Message is {}.", body);
905       return;
906     }
907     try {
908       logger.debug("Sending e-mail notification to {}", toEmailAddress);
909       smtpService.send(toEmailAddress, String.format("%s (%s)", subject, clusterName), body);
910       logger.info("Sent e-mail notification to {}", toEmailAddress);
911     } catch (Exception e) {
912       logger.error("Could not send email: {}\n{}", subject, body, e);
913     }
914   }
915 
916   private void cancelTranscription(String jobId, String message) {
917     try {
918       database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
919       String mpId = database.findByJob(jobId).getMediaPackageId();
920       try {
921         // Delete file stored on Google storage
922         String token = getRefreshAccessToken();
923         deleteStorageFile(mpId, token);
924       } catch (Exception ex) {
925         logger.warn(String.format("could not delete file %s.%s from Google cloud storage", mpId, defaultEncoding), ex);
926       } finally {
927         // Send notification email
928         sendEmail("Transcription ERROR", String.format("%s(media package %s, job id %s).", message, mpId, jobId));
929       }
930     } catch (Exception e) {
931       logger.error(String.format("ERROR while deleting transcription job: %s", jobId), e);
932     }
933   }
934 
935   private boolean hasTranscriptionRequestExpired(String jobId) {
936     try {
937       // set a time limit based on video duration and maximum processing time
938       if (database.findByJob(jobId).getDateCreated().getTime() + database.findByJob(jobId).getTrackDuration()
939               + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
940         return true;
941       }
942     } catch (Exception e) {
943       logger.error(String.format("ERROR while calculating transcription request expiration for job: %s", jobId), e);
944       // to avoid perpetual non-expired state, transcription is set as expired
945       return true;
946     }
947     return false;
948   }
949 
950   private long getRemainingTranscriptionExpireTimeInMin(String jobId) {
951     try {
952       long expiredTime = (database.findByJob(jobId).getDateCreated().getTime()
953           + database.findByJob(jobId).getTrackDuration()
954           + (completionCheckBuffer + maxProcessingSeconds) * 1000)
955           - (System.currentTimeMillis());
956       // Transcription has expired
957       if (expiredTime < 0) {
958         expiredTime = 0;
959       }
960       return TimeUnit.MILLISECONDS.toMinutes(expiredTime);
961     } catch (Exception e) {
962       logger.error("Unable to calculate remaining transcription expired time for transcription job {}", jobId);
963     }
964     return 0;
965   }
966 
967   @Reference
968   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
969     this.serviceRegistry = serviceRegistry;
970   }
971 
972   @Reference
973   public void setSecurityService(SecurityService securityService) {
974     this.securityService = securityService;
975   }
976 
977   @Reference
978   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
979     this.userDirectoryService = userDirectoryService;
980   }
981 
982   @Reference
983   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
984     this.organizationDirectoryService = organizationDirectoryService;
985   }
986 
987   @Reference
988   public void setSmtpService(SmtpService service) {
989     this.smtpService = service;
990   }
991 
992   @Reference
993   public void setWorkspace(Workspace ws) {
994     this.workspace = ws;
995   }
996 
997   @Reference
998   public void setWorkingFileRepository(WorkingFileRepository wfr) {
999     this.wfr = wfr;
1000   }
1001 
1002   @Reference
1003   public void setDatabase(TranscriptionDatabase service) {
1004     this.database = service;
1005   }
1006 
1007   @Reference
1008   public void setAssetManager(AssetManager service) {
1009     this.assetManager = service;
1010   }
1011 
1012   @Reference
1013   public void setWorkflowService(WorkflowService service) {
1014     this.workflowService = service;
1015   }
1016 
1017   @Override
1018   protected ServiceRegistry getServiceRegistry() {
1019     return serviceRegistry;
1020   }
1021 
1022   @Override
1023   protected SecurityService getSecurityService() {
1024     return securityService;
1025   }
1026 
1027   @Override
1028   protected UserDirectoryService getUserDirectoryService() {
1029     return userDirectoryService;
1030   }
1031 
1032   @Override
1033   protected OrganizationDirectoryService getOrganizationDirectoryService() {
1034     return organizationDirectoryService;
1035   }
1036 
1037   // Only used by unit tests!
1038   void setWfUtil(Workflows wfUtil) {
1039     this.wfUtil = wfUtil;
1040   }
1041 
1042   class WorkflowDispatcher implements Runnable {
1043 
1044     /**
1045      * {@inheritDoc}
1046      *
1047      * @see java.lang.Thread#run()
1048      */
1049     @Override
1050     public void run() {
1051       logger.debug("WorkflowDispatcher waking up...");
1052 
1053       try {
1054         // Find jobs that are in progress and jobs that had transcription complete
1055 
1056         long providerId;
1057         TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
1058         if (providerInfo != null) {
1059           providerId = providerInfo.getId();
1060         } else {
1061           logger.debug("No jobs yet for provider {}", PROVIDER);
1062           return;
1063         }
1064 
1065         List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
1066                 TranscriptionJobControl.Status.TranscriptionComplete.name());
1067         for (TranscriptionJobControl j : jobs) {
1068 
1069           // Don't process jobs for other services
1070           if (j.getProviderId() != providerId) {
1071             continue;
1072           }
1073 
1074           String mpId = j.getMediaPackageId();
1075           String jobId = j.getTranscriptionJobId();
1076 
1077           // If the job in progress, check if it should already have finished.
1078           if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
1079             // If job should already have been completed, try to get the results. Consider a buffer factor so that we
1080             // don't try it too early. Results normally should be ready 1/3 of the time of the track duration.
1081             // The completionCheckBuffer can be used to delay results check.
1082             if (j.getDateCreated().getTime() + (j.getTrackDuration() / 3) + completionCheckBuffer * 1000 < System
1083                     .currentTimeMillis()) {
1084               try {
1085                 if (!getAndSaveJobResults(jobId)) {
1086                   // Job still running, not finished, so check if it should have finished more than N seconds ago
1087                   if (hasTranscriptionRequestExpired(jobId)) {
1088                     // Processing for too long, mark job as cancelled and don't check anymore
1089                     database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1090                     // Delete file stored on Google storage
1091                     String token = getRefreshAccessToken();
1092                     deleteStorageFile(mpId, token);
1093                     // Send notification email
1094                     sendEmail(TRANSCRIPTION_ERROR, String.format(
1095                         "Transcription job was in processing state for too long and was marked "
1096                             + "as cancelled (media package %s, job id %s).",
1097                         mpId, jobId));
1098                   }
1099                   // else Job still running, not finished
1100                   continue;
1101                 }
1102               } catch (TranscriptionServiceException e) {
1103                 if (e.getCode() == 404) {
1104                   // Job not found there, update job state to canceled
1105                   database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
1106                   // Send notification email
1107                   sendEmail(TRANSCRIPTION_ERROR,
1108                           String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
1109                 }
1110                 continue; // Skip this one, exception was already logged
1111               } catch (IOException ex) {
1112                 logger.error("Transcription job not found, error: {}", ex.toString());
1113               }
1114             } else {
1115               continue; // Not time to check yet
1116             }
1117           }
1118 
1119           // Jobs that get here have state TranscriptionCompleted or had an IOException]
1120           try {
1121 
1122             // Apply workflow to attach transcripts
1123             Map<String, String> params = new HashMap<String, String>();
1124             params.put(TRANSCRIPTION_JOB_ID_KEY, jobId);
1125             String wfId = startWorkflow(mpId, workflowDefinitionId, jobId, params);
1126             if (wfId == null) {
1127               logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}",
1128                   mpId, jobId);
1129               continue;
1130             }
1131             // Update state in the database
1132             database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
1133             logger.info("Attach transcription workflow {} scheduled for mp {}, google speech job {}",
1134                     wfId, mpId, jobId);
1135           } catch (Exception e) {
1136             logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}, {}: {}",
1137                     mpId, jobId, e.getClass().getName(), e.getMessage());
1138           }
1139         }
1140       } catch (TranscriptionDatabaseException e) {
1141         logger.warn("Could not read transcription job control database: {}", e.getMessage());
1142       }
1143     }
1144   }
1145 
1146   private String startWorkflow(String mpId, String wfDefId, String jobId, Map<String, String> params) {
1147     DefaultOrganization defaultOrg = new DefaultOrganization();
1148     securityService.setOrganization(defaultOrg);
1149     securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
1150 
1151     // Find the episode
1152     Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
1153     if (snapshot.isEmpty()) {
1154       if (!hasTranscriptionRequestExpired(jobId)) {
1155         // Media package not archived but still within completion time? Skip until next time.
1156         logger.warn("Media package {} has not been archived yet or has been deleted. Will keep trying for {} "
1157             + "more minutes before cancelling transcription job {}.",
1158             mpId, getRemainingTranscriptionExpireTimeInMin(jobId), jobId);
1159       } else {
1160         // Close transcription job and email admin
1161         cancelTranscription(jobId, " Google Transcription job canceled, archived media package not found");
1162         logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
1163       }
1164       return null;
1165     }
1166 
1167     String org = snapshot.get().getOrganizationId();
1168     Organization organization = null;
1169     try {
1170       organization = organizationDirectoryService.getOrganization(org);
1171       if (organization == null) {
1172         logger.warn("Media package {} has an unknown organization {}.", mpId, org);
1173         return null;
1174       }
1175     } catch (NotFoundException e) {
1176       logger.warn("Organization {} not found for media package {}.", org, mpId);
1177       return null;
1178     }
1179     securityService.setOrganization(organization);
1180 
1181     try {
1182       WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
1183       Workflows workflows;
1184       if (wfUtil != null) {
1185         workflows = wfUtil;
1186       } else {
1187         workflows = new Workflows(assetManager, workflowService);
1188       }
1189       Set<String> mpIds = new HashSet<String>();
1190       mpIds.add(mpId);
1191       List<WorkflowInstance> wfList = workflows
1192               .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
1193       return wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : null;
1194     } catch (NotFoundException | WorkflowDatabaseException e) {
1195       logger.warn("Could not get workflow definition: {}", wfDefId);
1196     }
1197 
1198     return null;
1199   }
1200 
1201   class ResultsFileCleanup implements Runnable {
1202 
1203     @Override
1204     public void run() {
1205       logger.info("ResultsFileCleanup waking up...");
1206       try {
1207         // Cleans up results files older than CLEANUP_RESULT_FILES_DAYS days
1208         wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
1209       } catch (IOException e) {
1210         logger.warn("Could not cleanup old transcript results files", e);
1211       }
1212     }
1213   }
1214 
1215 }