GoogleSpeechTranscriptionService.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
package org.opencastproject.transcription.googlespeech;

import org.opencastproject.assetmanager.api.AssetManager;
import org.opencastproject.assetmanager.api.Snapshot;
import org.opencastproject.assetmanager.util.Workflows;
import org.opencastproject.job.api.AbstractJobProducer;
import org.opencastproject.job.api.Job;
import org.opencastproject.kernel.mail.SmtpService;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementBuilder;
import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.security.api.DefaultOrganization;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.security.util.SecurityUtil;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.systems.OpencastConstants;
import org.opencastproject.transcription.api.TranscriptionService;
import org.opencastproject.transcription.api.TranscriptionServiceException;
import org.opencastproject.transcription.persistence.TranscriptionDatabase;
import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
import org.opencastproject.transcription.persistence.TranscriptionJobControl;
import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.OsgiUtil;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.workflow.api.ConfiguredWorkflow;
import org.opencastproject.workflow.api.WorkflowDatabaseException;
import org.opencastproject.workflow.api.WorkflowDefinition;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowService;
import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
import org.opencastproject.workspace.api.Workspace;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component(
    immediate = true,
    service = { TranscriptionService.class,GoogleSpeechTranscriptionService.class },
    property = {
        "service.description=Google Speech Transcription Service",
        "provider=google.speech"
    }
)
public class GoogleSpeechTranscriptionService extends AbstractJobProducer implements TranscriptionService {

  /**
   * The logger
   */
  private static final Logger logger = LoggerFactory.getLogger(GoogleSpeechTranscriptionService.class);

  // Job type registered with the job producer and used when creating jobs in the service registry
  private static final String JOB_TYPE = "org.opencastproject.transcription.googlespeech";

  // Workspace collection in which result files are stored as "<jobId>.json"
  static final String TRANSCRIPT_COLLECTION = "transcripts";
  // Passed to sendEmail() when a transcription error is reported (presumably the email subject)
  static final String TRANSCRIPTION_ERROR = "Transcription ERROR";
  static final String TRANSCRIPTION_JOB_ID_KEY = "transcriptionJobId";
  // NOTE(review): presumably field names read from the OAuth token response; used outside this view
  static final String ACCESS_TOKEN_NAME = "access_token";
  static final String ACCESS_TOKEN_EXPIRY_NAME = "expires_in";
  // NOTE(review): timeouts presumably applied by makeHttpClient(), which is not visible here
  private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
  private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute
  private static final int ACCESS_TOKEN_MINIMUM_TIME = 60000; // ms , 1 minute
  // Default workflow to attach transcription results to mediapackage
  public static final String DEFAULT_WF_DEF = "google-speech-attach-transcripts";
  private static final long DEFAULT_COMPLETION_BUFFER = 300; // in seconds, default is 5 minutes
  private static final long DEFAULT_DISPATCH_INTERVAL = 60; // in seconds, default is 1 minute
  private static final long DEFAULT_MAX_PROCESSING_TIME = 5 * 60 * 60; // in seconds, default is 5 hours
  // Default age, in days, after which result files are cleaned up
  private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;
  private static final boolean DEFAULT_PROFANITY_FILTER = false;
  private static final String DEFAULT_LANGUAGE = "en-US";
  private static final boolean DEFAULT_ENABLE_PUNCTUATION = false;
  private static final String DEFAULT_MODEL = "default";
  // Base URL of the Google Speech REST API; REQUEST_METHOD and RESULT_PATH are appended to it
  private static final String GOOGLE_SPEECH_URL = "https://speech.googleapis.com/v1";
  // Default OAuth2 token endpoint, overridable through GOOGLE_CLOUD_TOKEN_ENDPOINT_URL
  private static final String GOOGLE_AUTH2_URL = "https://www.googleapis.com/oauth2/v4/token";
  private static final String REQUEST_METHOD = "speech:longrunningrecognize";
  private static final String RESULT_PATH = "operations";
  // Sentinel token value meaning "no valid access token available"
  private static final String INVALID_TOKEN = "-1";
  // Provider name stored with each job control entry
  private static final String PROVIDER = "Google Speech";
  private static final String DEFAULT_ENCODING = "flac";

  // Cluster name
  private String clusterName = "";

  /**
   * Service dependencies
   */
  private ServiceRegistry serviceRegistry;
  private SecurityService securityService;
  private UserDirectoryService userDirectoryService;
  private OrganizationDirectoryService organizationDirectoryService;
  private Workspace workspace;
  private TranscriptionDatabase database;
  private AssetManager assetManager;
  private WorkflowService workflowService;
  private WorkingFileRepository wfr;
  private SmtpService smtpService;

  // Only used by unit tests!
  private Workflows wfUtil;

  // Job operations handled by process(). The constant's name() is stored as the job operation and parsed back
  // with valueOf(), so renaming a constant breaks jobs that were already created.
  private enum Operation {
    StartTranscription
  }

  /**
   * Service configuration options
   */
  public static final String ENABLED_CONFIG = "enabled";
  public static final String GOOGLE_SPEECH_LANGUAGE = "google.speech.language";
  public static final String PROFANITY_FILTER = "google.speech.profanity.filter";
  public static final String ENABLE_PUNCTUATION = "google.speech.transcription.punctuation";
  public static final String TRANSCRIPTION_MODEL = "google.speech.transcription.model";
  public static final String WORKFLOW_CONFIG = "workflow";
  public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
  public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
  public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
  public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
  public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
  public static final String GOOGLE_CLOUD_CLIENT_ID = "google.cloud.client.id";
  public static final String GOOGLE_CLOUD_CLIENT_SECRET = "google.cloud.client.secret";
  public static final String GOOGLE_CLOUD_REFRESH_TOKEN = "google.cloud.refresh.token";
  public static final String GOOGLE_CLOUD_BUCKET = "google.cloud.storage.bucket";
  public static final String GOOGLE_CLOUD_TOKEN_ENDPOINT_URL = "google.cloud.token.endpoint.url";
  public static final String ENCODING_EXTENSION = "encoding.extension";

  /**
   * Service configuration values
   */
  private boolean enabled = false; // Disabled by default
  private boolean profanityFilter = DEFAULT_PROFANITY_FILTER;
  private boolean enablePunctuation = DEFAULT_ENABLE_PUNCTUATION;
  private String model = DEFAULT_MODEL;
  private String defaultLanguage = DEFAULT_LANGUAGE;
  private String defaultEncoding = DEFAULT_ENCODING;
  private String workflowDefinitionId = DEFAULT_WF_DEF;
  private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
  private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER;
  private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
  private String toEmailAddress;
  private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
  private String clientId;
  private String clientSecret;
  private String clientToken;
  // Current access token; starts out as the INVALID_TOKEN sentinel
  private String accessToken = INVALID_TOKEN;
  private String tokenEndpoint = GOOGLE_AUTH2_URL;
  private String storageBucket;
  // NOTE(review): presumably the expiry of accessToken; maintained outside this view
  private long tokenExpiryTime = 0;
  private String systemAccount;
  // Runs the workflow dispatcher and result file clean-up tasks scheduled in activate()
  private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);

  /**
   * Creates the service and registers it as the producer for the Google Speech job type.
   */
  public GoogleSpeechTranscriptionService() {
    super(GoogleSpeechTranscriptionService.JOB_TYPE);
  }

  /**
   * Reads the service configuration. If the service is enabled, this also schedules the periodic workflow
   * dispatcher and the results clean-up tasks.
   *
   * <p>When the service is disabled, or the {@code enabled} key is absent, nothing else is read or scheduled.
   *
   * @param cc the OSGi component context carrying the service configuration
   */
  @Activate
  public void activate(ComponentContext cc) {
    // Has this service been enabled? A missing key keeps the service disabled instead of failing activation.
    enabled = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG).orElse(false);
    if (!enabled) {
      logger.info("Service disabled. If you want to enable it, please update the service configuration.");
      return;
    }
    // Mandatory API access properties
    clientId = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_ID);
    clientSecret = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_CLIENT_SECRET);
    clientToken = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_REFRESH_TOKEN);
    storageBucket = OsgiUtil.getComponentContextProperty(cc, GOOGLE_CLOUD_BUCKET);

    // access token endpoint
    Optional<String> tokenOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_CLOUD_TOKEN_ENDPOINT_URL);
    if (tokenOpt.isPresent()) {
      tokenEndpoint = tokenOpt.get();
      logger.info("Access token endpoint is set to {}", tokenEndpoint);
    } else {
      logger.info("Default access token endpoint will be used");
    }

    // profanity filter to use
    Optional<String> profanityOpt = OsgiUtil.getOptCfg(cc.getProperties(), PROFANITY_FILTER);
    if (profanityOpt.isPresent()) {
      profanityFilter = Boolean.parseBoolean(profanityOpt.get());
      logger.info("Profanity filter is set to {}", profanityFilter);
    } else {
      logger.info("Default profanity filter will be used");
    }
    // Language model to be used
    Optional<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), GOOGLE_SPEECH_LANGUAGE);
    if (languageOpt.isPresent()) {
      defaultLanguage = languageOpt.get();
      logger.info("Language used is {}", defaultLanguage);
    } else {
      logger.info("Default language will be used");
    }
    // Enable punctuation or not
    Optional<String> punctuationOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENABLE_PUNCTUATION);
    if (punctuationOpt.isPresent()) {
      enablePunctuation = Boolean.parseBoolean(punctuationOpt.get());
      logger.info("Enable punctuation is set to {}", enablePunctuation);
    } else {
      logger.info("Default punctuation setting will be used");
    }
    // Transcription model to be used
    Optional<String> transModel = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTION_MODEL);
    if (transModel.isPresent()) {
      model = transModel.get();
      logger.info("Transcription model used is {}", model);
    } else {
      logger.info("Default Transcription model will be used");
    }
    // Encoding to be used
    Optional<String> encodingOpt = OsgiUtil.getOptCfg(cc.getProperties(), ENCODING_EXTENSION);
    if (encodingOpt.isPresent()) {
      defaultEncoding = encodingOpt.get();
      logger.info("Encoding used is {}", defaultEncoding);
    } else {
      logger.info("Default encoding will be used");
    }

    // Workflow to execute when getting callback (optional, with default)
    Optional<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
    if (wfOpt.isPresent()) {
      workflowDefinitionId = wfOpt.get();
    }
    logger.info("Workflow definition is {}", workflowDefinitionId);
    // Interval to check for completed transcription jobs and start workflows to attach transcripts
    Optional<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
    if (intervalOpt.isPresent()) {
      try {
        workflowDispatchInterval = Long.parseLong(intervalOpt.get());
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for Workflow dispatch interval. Default used instead: {}",
            workflowDispatchInterval);
      }
    }
    // scheduleWithFixedDelay() rejects a non-positive delay with an IllegalArgumentException, which would abort
    // activation, so fall back to the default interval instead
    if (workflowDispatchInterval <= 0) {
      logger.warn("Invalid configuration for Workflow dispatch interval: {}. Default used instead: {}",
          workflowDispatchInterval, DEFAULT_DISPATCH_INTERVAL);
      workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL;
    }
    logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);
    // How long to wait after a transcription is supposed to finish before starting checking
    Optional<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
    if (bufferOpt.isPresent()) {
      try {
        completionCheckBuffer = Long.parseLong(bufferOpt.get());
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
                COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer);
      }
    }
    logger.info("Completion check buffer is {} seconds", completionCheckBuffer);
    // How long to wait after a transcription is supposed to finish before marking the job as canceled in the db
    Optional<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
    if (maxProcessingOpt.isPresent()) {
      try {
        maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for maximum processing time. Default used instead: {}",
            maxProcessingSeconds);
      }
    }
    logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);
    // How long to keep result files in the working file repository
    Optional<String> cleanupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
    if (cleanupOpt.isPresent()) {
      try {
        cleanupResultDays = Integer.parseInt(cleanupOpt.get());
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for clean up days. Default used instead: {}", cleanupResultDays);
      }
    }
    logger.info("Cleanup result files after {} days", cleanupResultDays);

    systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);

    // Schedule the workflow dispatching, starting in 2 minutes
    scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
            TimeUnit.SECONDS);

    // Schedule the cleanup of old results jobs from the collection in the wfr once a day
    scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);

    // Notification email passed in this service configuration?
    Optional<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
    if (optTo.isPresent()) {
      toEmailAddress = optTo.get();
    } else {
      // Use admin email informed in custom.properties
      optTo = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ADMIN_EMAIL_PROPERTY);
      if (optTo.isPresent()) {
        toEmailAddress = optTo.get();
      }
    }
    if (toEmailAddress != null) {
      logger.info("Notification email set to {}", toEmailAddress);
    } else {
      logger.warn("Email notification disabled");
    }

    Optional<String> optCluster = OsgiUtil.getOptContextProperty(cc, OpencastConstants.ENVIRONMENT_NAME_PROPERTY);
    if (optCluster.isPresent()) {
      clusterName = optCluster.get();
    }
    logger.info("Environment name is {}", clusterName);

    logger.info("Activated!");
  }

  /**
   * Creates a {@code StartTranscription} job for the given track.
   *
   * @param mpId the media package id
   * @param track the track to transcribe
   * @param args additional arguments; the first one is the language code and must be present
   * @return the job created in the service registry
   * @throws TranscriptionServiceException if the service is disabled, the job cannot be created or the track
   *     cannot be serialized
   * @throws IllegalArgumentException if no language argument is supplied
   */
  @Override
  public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
    if (!enabled) {
      throw new TranscriptionServiceException(
              "This service is disabled. If you want to enable it, please update the service configuration.");
    }
    if (args.length < 1) {
      throw new IllegalArgumentException("Additional language argument is required.");
    }

    final String language = args[0];
    try {
      final String trackXml = MediaPackageElementParser.getAsXml(track);
      final List<String> jobArguments = Arrays.asList(mpId, trackXml, language);
      return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(), jobArguments);
    } catch (MediaPackageException mpe) {
      throw new TranscriptionServiceException("Invalid track " + track.toString(), mpe);
    } catch (ServiceRegistryException sre) {
      throw new TranscriptionServiceException("Unable to create a job", sre);
    }
  }

  /**
   * Not supported by this provider; a language argument is required, so use the varargs overload instead.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
    final String reason = "Not supported.";
    throw new UnsupportedOperationException(reason);
  }

  /**
   * Handles a finished Google Speech operation. It marks the job control entry as {@code TranscriptionComplete},
   * deletes the uploaded audio from Google storage, and saves the operation JSON into the transcripts collection
   * when it contains results.
   *
   * @param mpId the media package id the transcription belongs to
   * @param obj the Google operation, expected to be a {@link JSONObject}
   * @throws TranscriptionServiceException if no valid access token is available, the results cannot be saved, or
   *     the job control state cannot be updated
   */
  @Override
  public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
    JSONObject jsonObj = null;
    String jobId = null;
    String token = INVALID_TOKEN;
    try {
      token = getRefreshAccessToken();
    } catch (IOException ex) {
      logger.error("Unable to create access token", ex);
    }
    // A null token is treated like the invalid sentinel instead of failing later with a NullPointerException
    if (token == null || INVALID_TOKEN.equals(token)) {
      throw new TranscriptionServiceException("Invalid access token");
    }
    try {
      jsonObj = (JSONObject) obj;
      jobId = (String) jsonObj.get("name");
      logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);
      JSONArray resultsArray = getTranscriptionResult(jsonObj);

      // Update state in database
      // If there's an optimistic lock exception here, it's ok because the workflow dispatcher
      // may be doing the same thing
      database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());

      // Delete audio file from Google storage
      deleteStorageFile(mpId, token);

      // Save results in file system if they exist
      if (resultsArray != null) {
        saveResults(jobId, jsonObj);
      }
    } catch (IOException e) {
      if (jsonObj == null) {
        logger.warn("Could not save transcription results file for mpId {}, jobId {}: null",
                mpId, jobId);
      } else {
        logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
                mpId, jobId, jsonObj.toJSONString());
      }
      throw new TranscriptionServiceException("Could not save transcription results file", e);
    } catch (TranscriptionDatabaseException e) {
      // The db update runs before the storage clean-up and the results save, so neither of them happened
      logger.warn("State in db could not be updated to transcription complete for mpId {}, jobId {}; "
              + "transcription results file was not saved", mpId, jobId);
      throw new TranscriptionServiceException("Could not update transcription job control db", e);
    }
  }

  /**
   * Returns the language used when a transcription request does not specify one.
   *
   * @return the configured default language code
   */
  @Override
  public String getLanguage() {
    return this.defaultLanguage;
  }

  /**
   * Not implemented for this provider.
   *
   * @throws TranscriptionServiceException always
   */
  @Override
  public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
    final String reason = "Method not implemented";
    throw new TranscriptionServiceException(reason);
  }

  /**
   * Records a failed Google Speech operation. It sets the job control entry to {@code Error} and sends a
   * notification email.
   *
   * @param mpId the media package id reported by the caller; used when no job control entry is found
   * @param obj the Google operation, expected to be a {@link JSONObject}
   * @throws TranscriptionServiceException if the job control state cannot be updated
   */
  @Override
  public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
    JSONObject jsonObj = null;
    String jobId = null;
    try {
      jsonObj = (JSONObject) obj;
      jobId = (String) jsonObj.get("name");
      // Update state in database
      database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
      // findByJob may return null; fall back to the caller-supplied id instead of throwing a NullPointerException
      TranscriptionJobControl jobControl = database.findByJob(jobId);
      String mediaPackageId = jobControl != null ? jobControl.getMediaPackageId() : mpId;
      logger.warn("Error received for media package {}, job id {}", mediaPackageId, jobId);
      // Send notification email
      sendEmail(TRANSCRIPTION_ERROR,
              String.format("There was a transcription error for media package %s, job id %s.",
                      mediaPackageId, jobId));
    } catch (TranscriptionDatabaseException e) {
      logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
      throw new TranscriptionServiceException("Could not update transcription job control db", e);
    }
  }

  /**
   * Executes a job created by this service.
   *
   * <p>{@code StartTranscription} jobs carry the media package id, the track serialized as XML and the language
   * code, in that order.
   *
   * @param job the job to run
   * @return always the empty string
   * @throws Exception if the operation is unknown, the track cannot be parsed or submission fails
   */
  @Override
  protected String process(Job job) throws Exception {
    final String operationName = job.getOperation();
    final List<String> jobArgs = job.getArguments();
    final Operation requested = Operation.valueOf(operationName);
    if (requested != Operation.StartTranscription) {
      throw new IllegalStateException("Don't know how to handle operation '" + operationName + "'");
    }
    final String mediaPackageId = jobArgs.get(0);
    final Track sourceTrack = (Track) MediaPackageElementParser.getFromXml(jobArgs.get(1));
    final String language = jobArgs.get(2);
    createRecognitionsJob(mediaPackageId, sourceTrack, language);
    return "";
  }

  /**
   * Asynchronous Requests and Responses call to Google Speech API
   * https://cloud.google.com/speech-to-text/docs/basics
   *
   * <p>Uploads the track audio to Google storage and submits a {@code speech:longrunningrecognize} request for it.
   * On HTTP 200 it stores an {@code InProgress} job control entry keyed by the returned operation name.
   *
   * @param mpId the media package id
   * @param track the track whose audio is transcribed
   * @param languageCode language code such as {@code en-US}; the configured default is used when blank
   * @throws TranscriptionServiceException if the uploaded audio URL or the access token is invalid, Google rejects
   *     the request, or the request fails
   * @throws IOException propagated from helper calls such as access token retrieval
   */
  void createRecognitionsJob(String mpId, Track track, String languageCode)
          throws TranscriptionServiceException, IOException {
    // Use the configured default language if not set by the workflow
    if (StringUtils.isBlank(languageCode)) {
      languageCode = defaultLanguage;
    }
    String audioUrl = uploadAudioFileToGoogleStorage(mpId, track);
    String token = getRefreshAccessToken();
    // Validate before creating the HTTP client so an early failure cannot leak it
    if (token == null || INVALID_TOKEN.equals(token) || audioUrl == null) {
      throw new TranscriptionServiceException("Could not create recognition job. Audio file or access token invalid");
    }

    // Create json for configuration and audio file
    JSONObject configValues = new JSONObject();
    JSONObject audioValues = new JSONObject();
    JSONObject container = new JSONObject();
    configValues.put("languageCode", languageCode);
    configValues.put("enableWordTimeOffsets", true);
    configValues.put("profanityFilter", profanityFilter);
    configValues.put("enableAutomaticPunctuation", enablePunctuation);
    configValues.put("model", model);
    audioValues.put("uri", audioUrl);
    container.put("config", configValues);
    container.put("audio", audioValues);

    CloseableHttpClient httpClient = makeHttpClient();
    CloseableHttpResponse response = null;
    try {
      HttpPost httpPost = new HttpPost(UrlSupport.concat(GOOGLE_SPEECH_URL, REQUEST_METHOD));
      logger.debug("Url to invoke Google speech service: {}", httpPost.getURI().toString());
      // Encode the body as UTF-8 to match the charset declared in the Content-Type header
      // (StringEntity(String) would use ISO-8859-1)
      StringEntity params = new StringEntity(container.toJSONString(), StandardCharsets.UTF_8);
      httpPost.addHeader("Authorization", "Bearer " + token); // add the authorization header to the request;
      httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8");
      httpPost.setEntity(params);
      response = httpClient.execute(httpPost);
      int code = response.getStatusLine().getStatusCode();
      HttpEntity entity = response.getEntity();
      String jsonString = EntityUtils.toString(entity);
      JSONParser jsonParser = new JSONParser();
      JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);

      if (code == HttpStatus.SC_OK) {
        logger.info("Recognitions job has been successfully created");

        /**
         * Response returned is a json object: { "name":
         * "7612202767953098924", "metadata": { "@type":
         * "type.googleapis.com/google.cloud.speech.v1.LongRunningRecognizeMetadata",
         * "progressPercent": 90, "startTime": "2017-07-20T16:36:55.033650Z",
         * "lastUpdateTime": "2017-07-20T16:37:17.158630Z" } }
         */
        String jobId = (String) jsonObject.get("name");
        logger.info("Transcription for mp {} has been submitted. Job id: {}", mpId, jobId);

        database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
                track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
        EntityUtils.consume(entity);
        return;
      }

      // Error bodies are not guaranteed to contain an "error" object; avoid a NullPointerException here
      JSONObject errorObj = (JSONObject) jsonObject.get("error");
      Object errorMessage = errorObj == null ? null : errorObj.get("message");
      logger.warn("Invalid argument returned, status: {} with message: {}", code, errorMessage);
      throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
    } catch (TranscriptionServiceException e) {
      // Already describes the failure; do not wrap it again below
      throw e;
    } catch (Exception e) {
      logger.warn("Exception when calling the recognitions endpoint", e);
      throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
    } finally {
      // Release the response before the client that owns its connection; close failures are not actionable here
      if (response != null) {
        try {
          response.close();
        } catch (IOException e) {
          logger.debug("Unable to close the recognitions response", e);
        }
      }
      try {
        httpClient.close();
      } catch (IOException e) {
        logger.debug("Unable to close the HTTP client", e);
      }
    }
  }

  /**
   * Get transcription job result: GET /v1/operations/{name}
   *
   * <p>Polls the Google operation {@code jobId}. When it is done and carries results,
   * {@link #transcriptionDone(String, Object)} is called to persist them.
   *
   * "response": { "@type":
   * "type.googleapis.com/google.cloud.speech.v1.LongRunningRecognizeResponse",
   * "results": [ { "alternatives": [ { "transcript": "Four score and
   * twenty...", "confidence": 0.97186122, "words": [ { "startTime": "1.300s",
   * "endTime": "1.400s", "word": "Four" }, { "startTime": "1.400s", "endTime":
   * "1.600s", "word": "score" }, { "startTime": "1.600s", "endTime": "1.600s",
   * "word": "and" }, { "startTime": "1.600s", "endTime": "1.900s", "word":
   * "twenty" }, ] } ] }
   *
   * @param jobId the Google operation name
   * @return {@code true} if results were found and handed to transcriptionDone; {@code false} if the job is still
   *     running, has no results, or no valid access token is available
   * @throws TranscriptionServiceException if the operation cannot be checked (non-200 status or request failure)
   * @throws IOException propagated from access token retrieval
   */
  boolean getAndSaveJobResults(String jobId) throws TranscriptionServiceException, IOException {
    String mpId = "unknown";
    String token = getRefreshAccessToken();
    // Check the token before creating the HTTP client so the early return cannot leak it
    if (token == null || INVALID_TOKEN.equals(token)) {
      return false;
    }
    CloseableHttpClient httpClient = makeHttpClient();
    CloseableHttpResponse response = null;
    try {
      HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
      logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
      // add the authorization header to the request;
      httpGet.addHeader("Authorization", "Bearer " + token);
      response = httpClient.execute(httpGet);
      int code = response.getStatusLine().getStatusCode();

      switch (code) {
        case HttpStatus.SC_OK: // 200
          HttpEntity entity = response.getEntity();
          // Response returned is a json object described above
          String jsonString = EntityUtils.toString(entity);
          JSONParser jsonParser = new JSONParser();
          JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
          // An in-progress operation omits "done" entirely, so a missing value means "not done".
          // Unboxing it directly would throw a NullPointerException on every poll of a running job.
          boolean jobDone = Boolean.TRUE.equals(jsonObject.get("done"));
          TranscriptionJobControl jc = database.findByJob(jobId);
          if (jc != null) {
            mpId = jc.getMediaPackageId();
          }
          JSONArray resultsArray = jobDone ? getTranscriptionResult(jsonObject) : null;
          logger.info("Recognitions job {} has been found, completed status {}", jobId, jobDone);
          EntityUtils.consume(entity);

          if (resultsArray != null) {
            transcriptionDone(mpId, jsonObject);
            return true;
          }
          return false;
        case HttpStatus.SC_NOT_FOUND: // 404
          logger.warn("Job not found: {}", jobId);
          break;
        case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
          logger.warn("Service unavailable returned, status: {}", code);
          break;
        default:
          logger.warn("Error return status: {}.", code);
          break;
      }
      throw new TranscriptionServiceException(
              String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
                      mpId, jobId, code), code);
    } catch (TranscriptionServiceException e) {
      throw e;
    } catch (Exception e) {
      if (hasTranscriptionRequestExpired(jobId)) {
        // Cancel the job and inform admin
        cancelTranscription(jobId, "Google Transcription job canceled due to errors");
        logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
      }
      String msg = String.format("Exception when calling the recognitions endpoint for media package %s, job id %s",
              mpId, jobId);
      logger.warn(msg, e);
      throw new TranscriptionServiceException(msg, e);
    } finally {
      // Release the response before the client that owns its connection; close failures are not actionable here
      if (response != null) {
        try {
          response.close();
        } catch (IOException e) {
          logger.debug("Unable to close the recognitions response", e);
        }
      }
      try {
        httpClient.close();
      } catch (IOException e) {
        logger.debug("Unable to close the HTTP client", e);
      }
    }
  }

  /**
   * Get transcription result: GET /v1/operations/{name} Method mainly used by
   * the REST endpoint
   *
   * @param jobId the Google operation name
   * @return the raw operation JSON on HTTP 200; otherwise, including when no valid access token is available,
   *     the text {@code "No results found"}
   * @throws org.opencastproject.transcription.api.TranscriptionServiceException if the request fails
   * @throws java.io.IOException propagated from access token retrieval
   */
  public String getTranscriptionResults(String jobId)
          throws TranscriptionServiceException, IOException {
    String token = getRefreshAccessToken();
    // Check the token before creating the HTTP client so the early return cannot leak it
    if (token == null || INVALID_TOKEN.equals(token)) {
      logger.warn("Invalid access token");
      return "No results found";
    }
    CloseableHttpClient httpClient = makeHttpClient();
    CloseableHttpResponse response = null;
    try {
      HttpGet httpGet = new HttpGet(UrlSupport.concat(GOOGLE_SPEECH_URL, RESULT_PATH, jobId));
      logger.debug("Url to invoke Google speech service: {}", httpGet.getURI().toString());
      // add the authorization header to the request;
      httpGet.addHeader("Authorization", "Bearer " + token);
      response = httpClient.execute(httpGet);
      int code = response.getStatusLine().getStatusCode();

      if (code == HttpStatus.SC_OK) {
        HttpEntity entity = response.getEntity();
        logger.info("Retrieved details for transcription with job id: '{}'", jobId);
        return EntityUtils.toString(entity);
      }
      logger.warn("Error retrieving details for transcription with job id: '{}', return status: {}.", jobId, code);
    } catch (Exception e) {
      logger.warn("Exception when calling the transcription service for job id: {}", jobId, e);
      throw new TranscriptionServiceException(String.format(
              "Exception when calling the transcription service for job id: %s", jobId), e);
    } finally {
      // Release the response before the client that owns its connection; close failures are not actionable here
      if (response != null) {
        try {
          response.close();
        } catch (IOException e) {
          logger.debug("Unable to close the transcription response", e);
        }
      }
      try {
        httpClient.close();
      } catch (IOException e) {
        logger.debug("Unable to close the HTTP client", e);
      }
    }
    return "No results found";
  }

  /**
   * Stores the complete operation JSON in the transcript collection as {@code <jobId>.json}, but only when
   * {@link #getTranscriptionResult(JSONObject)} finds a {@code results} array in it.
   *
   * @param jobId
   *          transcription job id, used as the file name
   * @param jsonObj
   *          operation JSON returned by Google
   * @throws IOException
   *           if the workspace cannot store the file
   */
  private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
    if (getTranscriptionResult(jsonObj) == null) {
      return;
    }
    // Encode explicitly: String.getBytes() without a charset depends on the platform default and may mangle
    // non-ASCII transcript text that json-simple emits unescaped.
    byte[] content = jsonObj.toJSONString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    // Save the results into a collection
    workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json", new ByteArrayInputStream(content));
  }

  /**
   * Returns a media package element pointing at the stored Google results JSON of a transcription job. If the
   * file is not in the workspace yet, an attempt is made to fetch and store it first.
   *
   * @param mpId
   *          media package id, used to look up a job when {@code jobId} is absent
   * @param jobId
   *          transcription job id; {@code null} or the text {@code "null"} selects a finished job from the database
   * @param type
   *          element type of the returned element
   * @return element referencing {@code <jobId>.json} in the transcript collection, flavored captions/google-speech-json
   * @throws TranscriptionServiceException
   *           if no closed or completed job exists for the media package, or the database lookup fails
   */
  @Override
  public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
          throws TranscriptionServiceException {
    try {
      String effectiveJobId = jobId;
      boolean jobIdMissing = effectiveJobId == null || "null".equals(effectiveJobId);
      if (jobIdMissing) {
        effectiveJobId = null;
        for (TranscriptionJobControl control : database.findByMediaPackage(mpId)) {
          String status = control.getStatus();
          boolean finished = TranscriptionJobControl.Status.Closed.name().equals(status)
                  || TranscriptionJobControl.Status.TranscriptionComplete.name().equals(status);
          if (finished) {
            // Later matches overwrite earlier ones: the last finished job in database order wins
            effectiveJobId = control.getTranscriptionJobId();
          }
        }
        if (effectiveJobId == null) {
          throw new TranscriptionServiceException(
                  "No completed or closed transcription job found in database for media package " + mpId);
        }
      }

      URI resultsUri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, effectiveJobId + ".json");
      try {
        // Probe the workspace; a failure means the results have not been saved yet
        workspace.get(resultsUri);
      } catch (Exception notInWorkspace) {
        try {
          getAndSaveJobResults(effectiveJobId);
        } catch (IOException ex) {
          logger.error("Unable to retrieve transcription job, error: {}", ex.toString());
        }
      }
      MediaPackageElementFlavor flavor = new MediaPackageElementFlavor("captions", "google-speech-json");
      return MediaPackageElementBuilderFactory.newInstance().newElementBuilder()
              .elementFromURI(resultsUri, type, flavor);
    } catch (TranscriptionDatabaseException e) {
      throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
    }
  }

  /**
   * Reports the status of the first transcription job the database returns for a media package.
   *
   * @param mpId
   *          media package id
   * @return status of that job, or {@code "Unknown"} when the media package has no transcription job
   * @throws TranscriptionServiceException
   *           if the database lookup fails
   */
  public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
    Iterable<TranscriptionJobControl> controls;
    try {
      controls = database.findByMediaPackage(mpId);
    } catch (TranscriptionDatabaseException e) {
      throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
    }
    for (TranscriptionJobControl control : controls) {
      // Only the first entry is considered
      return control.getStatus();
    }
    return "Unknown";
  }

  /**
   * Builds a new HTTP client whose requests use the configured connect, socket and connection-request timeouts.
   * The caller owns the returned client and is responsible for closing it.
   *
   * @return a fresh client
   * @throws IOException
   *           declared for subclasses/tests that override this factory
   */
  protected CloseableHttpClient makeHttpClient() throws IOException {
    RequestConfig.Builder timeouts = RequestConfig.custom();
    timeouts.setConnectTimeout(CONNECTION_TIMEOUT);
    timeouts.setSocketTimeout(SOCKET_TIMEOUT);
    timeouts.setConnectionRequestTimeout(CONNECTION_TIMEOUT);
    RequestConfig defaults = timeouts.build();
    return HttpClients.custom().setDefaultRequestConfig(defaults).build();
  }

  /**
   * Requests a new access token from {@code tokenEndpoint} using an OAuth refresh token. On success, the token
   * and its expiry time are cached in {@code accessToken} / {@code tokenExpiryTime}.
   *
   * @param clientId
   *          OAuth client id
   * @param clientSecret
   *          OAuth client secret
   * @param refreshToken
   *          OAuth refresh token
   * @return the new access token, or {@code INVALID_TOKEN} if the request could not be performed or its response
   *         could not be parsed
   * @throws TranscriptionServiceException
   *           if the endpoint answered but did not yield a usable token
   * @throws IOException
   *           if the HTTP client cannot be created
   */
  protected String refreshAccessToken(String clientId, String clientSecret, String refreshToken)
          throws TranscriptionServiceException, IOException {
    CloseableHttpClient httpClient = makeHttpClient();
    CloseableHttpResponse response = null;

    try {
      // Values are form-encoded so reserved characters in credentials (e.g. '+', '&', '%') survive the trip.
      // NOTE(review): OAuth 2.0 (RFC 6749, section 6) places these parameters in the form-encoded request body,
      // which would also keep the secret out of URL-based logs.
      HttpPost httpPost = new HttpPost(tokenEndpoint + String.format(
              "?client_id=%s&client_secret=%s&refresh_token=%s&grant_type=refresh_token",
              encodeTokenParameter(clientId), encodeTokenParameter(clientSecret),
              encodeTokenParameter(refreshToken)));
      httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded");
      response = httpClient.execute(httpPost);
      int code = response.getStatusLine().getStatusCode();
      String jsonString = EntityUtils.toString(response.getEntity());
      JSONParser jsonParser = new JSONParser();
      JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
      switch (code) {
        case HttpStatus.SC_OK: // 200
          String newToken = (String) jsonObject.get(ACCESS_TOKEN_NAME);
          long duration = (long) jsonObject.get(ACCESS_TOKEN_EXPIRY_NAME); // Duration in seconds
          if (newToken == null || INVALID_TOKEN.equals(newToken)) {
            // Never cache a missing token: getRefreshAccessToken() would otherwise keep handing out null
            accessToken = INVALID_TOKEN;
            throw new TranscriptionServiceException(
                String.format("Created token is invalid. Status returned: %d", code), code);
          }
          // Fields are only updated once both token and expiry have been read successfully
          accessToken = newToken;
          tokenExpiryTime = System.currentTimeMillis() + duration * 1000L; // time in milliseconds
          logger.info("Google Cloud Service access token created");
          return accessToken;
        case HttpStatus.SC_BAD_REQUEST: // 400
        case HttpStatus.SC_UNAUTHORIZED: // 401
          String error = (String) jsonObject.get("error");
          String errorDetails = (String) jsonObject.get("error_description");
          logger.warn("Invalid argument returned, status: {}", code);
          logger.warn("Unable to refresh Google Cloud Service token, error: {}, error details: {}",
              error, errorDetails);
          break;
        default:
          logger.warn("Invalid argument returned, status: {}", code);
      }
      throw new TranscriptionServiceException(
              String.format("Could not create Google access token. Status returned: %d", code), code);
    } catch (TranscriptionServiceException e) {
      throw e;
    } catch (Exception e) {
      // The exception itself is deliberately not logged: its message may contain the request URI, which
      // carries the client secret and refresh token.
      logger.warn("Unable to generate access token for Google Cloud Services");
      return INVALID_TOKEN;
    } finally {
      // Release the response before its client; each close is attempted independently
      if (response != null) {
        try {
          response.close();
        } catch (IOException e) {
          logger.debug("Could not close token endpoint response", e);
        }
      }
      try {
        httpClient.close();
      } catch (IOException e) {
        logger.debug("Could not close HTTP client used for the token request", e);
      }
    }
  }

  /**
   * Form-encodes (UTF-8) one token-request parameter value. {@code null} becomes the text "null", which is what
   * plain string formatting produced before.
   */
  private static String encodeTokenParameter(String value) {
    return java.net.URLEncoder.encode(String.valueOf(value), java.nio.charset.StandardCharsets.UTF_8);
  }

  /**
   * Returns the cached access token while it is valid and not within {@code ACCESS_TOKEN_MINIMUM_TIME} of its
   * expiry time. Otherwise, requests a fresh token with the configured client credentials.
   *
   * @return an access token (possibly {@code INVALID_TOKEN} if refreshing failed)
   * @throws TranscriptionServiceException
   *           propagated from {@link #refreshAccessToken(String, String, String)}
   * @throws IOException
   *           propagated from {@link #refreshAccessToken(String, String, String)}
   */
  protected String getRefreshAccessToken() throws TranscriptionServiceException, IOException {
    boolean cachedTokenUsable = !INVALID_TOKEN.equals(accessToken)
            && System.currentTimeMillis() < tokenExpiryTime - ACCESS_TOKEN_MINIMUM_TIME;
    if (!cachedTokenUsable) {
      return refreshAccessToken(clientId, clientSecret, clientToken);
    }
    return accessToken;
  }

  /**
   * Uploads the workspace file of {@code track} to the configured Google Cloud Storage bucket. The object is
   * named {@code <mpId>.<extension of the workspace file>}.
   *
   * @param mpId
   *          media package id, used as the object name
   * @param track
   *          track whose file is uploaded
   * @return the {@code gs://} URL of the uploaded object, or {@code null} if the upload did not return HTTP 200
   * @throws TranscriptionServiceException
   *           if reading the track, obtaining an access token or uploading fails
   * @throws IOException
   *           if the HTTP client cannot be created
   */
  protected String uploadAudioFileToGoogleStorage(String mpId, Track track)
          throws TranscriptionServiceException, IOException {
    CloseableHttpClient httpClientStorage = makeHttpClient();
    GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
    try {
      File audioFile = workspace.get(track.getURI());
      String fileExtension = FilenameUtils.getExtension(audioFile.getName());
      long fileSize = audioFile.length();
      String contentType = track.getMimeType().toString();
      String token = getRefreshAccessToken();
      // Upload file to google cloud storage
      int audioResponse = storage.startUpload(httpClientStorage, storageBucket, mpId, fileExtension,
              audioFile, String.valueOf(fileSize), contentType, token);
      if (audioResponse == HttpStatus.SC_OK) {
        return String.format("gs://%s/%s.%s", storageBucket, mpId, fileExtension);
      }
      logger.error("Error when uploading audio to Google Storage, error code: {}", audioResponse);
      return null;
    } catch (Exception e) {
      // Note: token failures are wrapped with this message as well
      throw new TranscriptionServiceException("Error reading audio track", e);
    } finally {
      // The client is created here, so it is released here. The storage helper may already have closed it;
      // a second close should be harmless — TODO confirm against GoogleSpeechTranscriptionServiceStorage.
      try {
        httpClientStorage.close();
      } catch (IOException e) {
        logger.debug("Could not close Google Storage upload client for media package {}", mpId, e);
      }
    }
  }

  /**
   * Extracts {@code response.results} from an operation JSON object.
   *
   * @param jsonObj
   *          operation JSON returned by Google
   * @return the results array, or {@code null} if {@code response} has no {@code results} entry; throws a
   *         NullPointerException when {@code response} itself is absent
   */
  private JSONArray getTranscriptionResult(JSONObject jsonObj) {
    return (JSONArray) ((JSONObject) jsonObj.get("response")).get("results");
  }

  /**
   * Deletes the object {@code <mpId>.<defaultEncoding>} from the configured Google Cloud Storage bucket.
   *
   * <p>NOTE(review): {@link #uploadAudioFileToGoogleStorage(String, Track)} names the object after the extension
   * of the actual workspace file. This method therefore only removes the uploaded object when that extension
   * equals {@code defaultEncoding} — confirm this holds for all tracks sent to Google.
   *
   * @param mpId
   *          media package id, used as the object name
   * @param token
   *          access token for Google Cloud Storage
   * @throws IOException
   *           if the HTTP client cannot be created or the storage helper reports an I/O failure
   */
  protected void deleteStorageFile(String mpId, String token) throws IOException {
    CloseableHttpClient httpClientDel = makeHttpClient();
    try {
      GoogleSpeechTranscriptionServiceStorage storage = new GoogleSpeechTranscriptionServiceStorage();
      storage.deleteGoogleStorageFile(httpClientDel, storageBucket, mpId + "." + defaultEncoding, token);
    } finally {
      // The client is created here, so it is released here. The storage helper may already have closed it;
      // a second close should be harmless — TODO confirm against GoogleSpeechTranscriptionServiceStorage.
      try {
        httpClientDel.close();
      } catch (IOException e) {
        logger.debug("Could not close Google Storage delete client for media package {}", mpId, e);
      }
    }
  }

  /**
   * Sends a notification e-mail to {@code toEmailAddress}. The cluster name is appended to the subject. When no
   * recipient is configured, the body is only logged. Sending failures are logged and never propagated.
   *
   * @param subject
   *          subject line, before the cluster name is appended
   * @param body
   *          message text
   */
  private void sendEmail(String subject, String body) {
    if (toEmailAddress != null) {
      try {
        logger.debug("Sending e-mail notification to {}", toEmailAddress);
        String fullSubject = String.format("%s (%s)", subject, clusterName);
        smtpService.send(toEmailAddress, fullSubject, body);
        logger.info("Sent e-mail notification to {}", toEmailAddress);
      } catch (Exception sendFailure) {
        logger.error("Could not send email: {}\n{}", subject, body, sendFailure);
      }
      return;
    }
    logger.info("Skipping sending email notification. Message is {}.", body);
  }

  /**
   * Marks a transcription job as canceled, then tries to delete its audio file from Google storage, then sends
   * an error notification. If updating or looking up the job fails, the error is logged and nothing else happens.
   * A failure to delete the file does not prevent the e-mail.
   *
   * @param jobId
   *          transcription job id
   * @param message
   *          text placed in front of the media package / job id in the notification body
   */
  private void cancelTranscription(String jobId, String message) {
    String mpId;
    try {
      database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
      mpId = database.findByJob(jobId).getMediaPackageId();
    } catch (Exception e) {
      logger.error(String.format("ERROR while deleting transcription job: %s", jobId), e);
      return;
    }
    try {
      // Best effort: remove the uploaded audio from Google storage
      deleteStorageFile(mpId, getRefreshAccessToken());
    } catch (Exception ex) {
      logger.warn(String.format("could not delete file %s.%s from Google cloud storage", mpId, defaultEncoding), ex);
    } finally {
      sendEmail("Transcription ERROR", String.format("%s(media package %s, job id %s).", message, mpId, jobId));
    }
  }

  /**
   * Checks whether a transcription job has passed its deadline. The deadline is creation time + track duration +
   * {@code completionCheckBuffer} + {@code maxProcessingSeconds}.
   *
   * @param jobId
   *          transcription job id
   * @return {@code true} if the deadline has passed, or if it cannot be computed (so jobs never stay pending
   *         forever)
   */
  private boolean hasTranscriptionRequestExpired(String jobId) {
    try {
      // Load the job once instead of querying the database for every field
      TranscriptionJobControl job = database.findByJob(jobId);
      // Track duration is added to epoch milliseconds, so it is presumably in milliseconds as well.
      // 1000L keeps the seconds-to-millis conversion in long arithmetic.
      long deadline = job.getDateCreated().getTime() + job.getTrackDuration()
              + (completionCheckBuffer + maxProcessingSeconds) * 1000L;
      return deadline < System.currentTimeMillis();
    } catch (Exception e) {
      logger.error(String.format("ERROR while calculating transcription request expiration for job: %s", jobId), e);
      // to avoid perpetual non-expired state, transcription is set as expired
      return true;
    }
  }

  /**
   * Computes how many whole minutes remain until a transcription job reaches its deadline. The deadline is
   * creation time + track duration + {@code completionCheckBuffer} + {@code maxProcessingSeconds}.
   *
   * @param jobId
   *          transcription job id
   * @return remaining minutes, truncated; 0 if the deadline has passed or cannot be computed
   */
  private long getRemainingTranscriptionExpireTimeInMin(String jobId) {
    try {
      // Load the job once instead of querying the database for every field
      TranscriptionJobControl job = database.findByJob(jobId);
      long remainingMillis = job.getDateCreated().getTime() + job.getTrackDuration()
              + (completionCheckBuffer + maxProcessingSeconds) * 1000L
              - System.currentTimeMillis();
      // Transcription has expired when remainingMillis is negative
      return TimeUnit.MILLISECONDS.toMinutes(Math.max(0L, remainingMillis));
    } catch (Exception e) {
      logger.error("Unable to calculate remaining transcription expired time for transcription job {}", jobId, e);
    }
    return 0;
  }

  /** Injects the {@link ServiceRegistry} (OSGi reference). */
  @Reference
  public void setServiceRegistry(ServiceRegistry registry) {
    this.serviceRegistry = registry;
  }

  /** Injects the {@link SecurityService} (OSGi reference). */
  @Reference
  public void setSecurityService(SecurityService security) {
    this.securityService = security;
  }

  /** Injects the {@link UserDirectoryService} (OSGi reference). */
  @Reference
  public void setUserDirectoryService(UserDirectoryService users) {
    this.userDirectoryService = users;
  }

  /** Injects the {@link OrganizationDirectoryService} (OSGi reference). */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizations) {
    this.organizationDirectoryService = organizations;
  }

  /** Injects the {@link SmtpService} used for notification e-mails (OSGi reference). */
  @Reference
  public void setSmtpService(SmtpService smtp) {
    this.smtpService = smtp;
  }

  /** Injects the {@link Workspace} (OSGi reference). */
  @Reference
  public void setWorkspace(Workspace workspaceService) {
    this.workspace = workspaceService;
  }

  /** Injects the {@link WorkingFileRepository} (OSGi reference). */
  @Reference
  public void setWorkingFileRepository(WorkingFileRepository repository) {
    this.wfr = repository;
  }

  /** Injects the {@link TranscriptionDatabase} holding job control records (OSGi reference). */
  @Reference
  public void setDatabase(TranscriptionDatabase transcriptionDatabase) {
    this.database = transcriptionDatabase;
  }

  /** Injects the {@link AssetManager} (OSGi reference). */
  @Reference
  public void setAssetManager(AssetManager assets) {
    this.assetManager = assets;
  }

  /** Injects the {@link WorkflowService} (OSGi reference). */
  @Reference
  public void setWorkflowService(WorkflowService workflows) {
    this.workflowService = workflows;
  }

  /** {@inheritDoc} Returns the injected service registry. */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return this.serviceRegistry;
  }

  /** {@inheritDoc} Returns the injected security service. */
  @Override
  protected SecurityService getSecurityService() {
    return this.securityService;
  }

  /** {@inheritDoc} Returns the injected user directory service. */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return this.userDirectoryService;
  }

  /** {@inheritDoc} Returns the injected organization directory service. */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return this.organizationDirectoryService;
  }

  /**
   * Overrides the {@link Workflows} helper that {@code startWorkflow} would otherwise create per call.
   * Only used by unit tests!
   */
  void setWfUtil(Workflows workflowsHelper) {
    this.wfUtil = workflowsHelper;
  }

  class WorkflowDispatcher implements Runnable {

    /**
     * {@inheritDoc}
     *
     * @see java.lang.Thread#run()
     */
    @Override
    public void run() {
      logger.debug("WorkflowDispatcher waking up...");

      try {
        // Find jobs that are in progress and jobs that had transcription complete

        long providerId;
        TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
        if (providerInfo != null) {
          providerId = providerInfo.getId();
        } else {
          logger.debug("No jobs yet for provider {}", PROVIDER);
          return;
        }

        List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
                TranscriptionJobControl.Status.TranscriptionComplete.name());
        for (TranscriptionJobControl j : jobs) {

          // Don't process jobs for other services
          if (j.getProviderId() != providerId) {
            continue;
          }

          String mpId = j.getMediaPackageId();
          String jobId = j.getTranscriptionJobId();

          // If the job in progress, check if it should already have finished.
          if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
            // If job should already have been completed, try to get the results. Consider a buffer factor so that we
            // don't try it too early. Results normally should be ready 1/3 of the time of the track duration.
            // The completionCheckBuffer can be used to delay results check.
            if (j.getDateCreated().getTime() + (j.getTrackDuration() / 3) + completionCheckBuffer * 1000 < System
                    .currentTimeMillis()) {
              try {
                if (!getAndSaveJobResults(jobId)) {
                  // Job still running, not finished, so check if it should have finished more than N seconds ago
                  if (hasTranscriptionRequestExpired(jobId)) {
                    // Processing for too long, mark job as cancelled and don't check anymore
                    database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
                    // Delete file stored on Google storage
                    String token = getRefreshAccessToken();
                    deleteStorageFile(mpId, token);
                    // Send notification email
                    sendEmail(TRANSCRIPTION_ERROR, String.format(
                        "Transcription job was in processing state for too long and was marked "
                            + "as cancelled (media package %s, job id %s).",
                        mpId, jobId));
                  }
                  // else Job still running, not finished
                  continue;
                }
              } catch (TranscriptionServiceException e) {
                if (e.getCode() == 404) {
                  // Job not found there, update job state to canceled
                  database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
                  // Send notification email
                  sendEmail(TRANSCRIPTION_ERROR,
                          String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
                }
                continue; // Skip this one, exception was already logged
              } catch (IOException ex) {
                logger.error("Transcription job not found, error: {}", ex.toString());
              }
            } else {
              continue; // Not time to check yet
            }
          }

          // Jobs that get here have state TranscriptionCompleted or had an IOException]
          try {

            // Apply workflow to attach transcripts
            Map<String, String> params = new HashMap<String, String>();
            params.put(TRANSCRIPTION_JOB_ID_KEY, jobId);
            String wfId = startWorkflow(mpId, workflowDefinitionId, jobId, params);
            if (wfId == null) {
              logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}",
                  mpId, jobId);
              continue;
            }
            // Update state in the database
            database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
            logger.info("Attach transcription workflow {} scheduled for mp {}, google speech job {}",
                    wfId, mpId, jobId);
          } catch (Exception e) {
            logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, google speech job {}, {}: {}",
                    mpId, jobId, e.getClass().getName(), e.getMessage());
          }
        }
      } catch (TranscriptionDatabaseException e) {
        logger.warn("Could not read transcription job control database: {}", e.getMessage());
      }
    }
  }

  /**
   * Applies workflow definition {@code wfDefId} to the latest archived version of a media package. The calling
   * thread's security context is set to the system user and then to the media package's organization.
   *
   * <p>If the media package has no snapshot, the transcription job is canceled once its deadline has passed;
   * before that, it is left for a later attempt.
   *
   * @param mpId
   *          media package id
   * @param wfDefId
   *          workflow definition id
   * @param jobId
   *          transcription job id, used for expiry checks and cancellation
   * @param params
   *          workflow configuration parameters
   * @return id of the first started workflow instance, or {@code null} if none was started
   */
  private String startWorkflow(String mpId, String wfDefId, String jobId, Map<String, String> params) {
    DefaultOrganization defaultOrg = new DefaultOrganization();
    securityService.setOrganization(defaultOrg);
    securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));

    // Find the episode
    Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
    if (snapshot.isEmpty()) {
      if (!hasTranscriptionRequestExpired(jobId)) {
        // Media package not archived but still within completion time? Skip until next time.
        logger.warn("Media package {} has not been archived yet or has been deleted. Will keep trying for {} "
            + "more minutes before cancelling transcription job {}.",
            mpId, getRemainingTranscriptionExpireTimeInMin(jobId), jobId);
      } else {
        // Close transcription job and email admin
        cancelTranscription(jobId, " Google Transcription job canceled, archived media package not found");
        logger.info("Google Transcription job {} has been canceled. Email notification sent", jobId);
      }
      return null;
    }

    String org = snapshot.get().getOrganizationId();
    Organization organization;
    try {
      organization = organizationDirectoryService.getOrganization(org);
    } catch (NotFoundException e) {
      logger.warn("Organization {} not found for media package {}.", org, mpId);
      return null;
    }
    if (organization == null) {
      logger.warn("Media package {} has an unknown organization {}.", mpId, org);
      return null;
    }
    securityService.setOrganization(organization);

    try {
      WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
      Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
      Set<String> mpIds = new HashSet<String>();
      mpIds.add(mpId);
      List<WorkflowInstance> wfList = workflows
              .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
      return wfList.isEmpty() ? null : Long.toString(wfList.get(0).getId());
    } catch (NotFoundException | WorkflowDatabaseException e) {
      // Keep the cause so database failures can be told apart from a missing definition
      logger.warn("Could not get workflow definition: {}", wfDefId, e);
      return null;
    }
  }

  /**
   * Periodic task that asks the working file repository to remove files older than {@code cleanupResultDays}
   * days from the transcript collection. Failures are logged and do not propagate.
   */
  class ResultsFileCleanup implements Runnable {

    @Override
    public void run() {
      logger.info("ResultsFileCleanup waking up...");
      try {
        wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
      } catch (IOException cleanupFailure) {
        logger.warn("Could not cleanup old transcript results files", cleanupFailure);
      }
    }
  }

}