IBMWatsonTranscriptionService.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
package org.opencastproject.transcription.ibmwatson;

import static org.opencastproject.systems.OpencastConstants.ADMIN_EMAIL_PROPERTY;

import org.opencastproject.assetmanager.api.AssetManager;
import org.opencastproject.assetmanager.api.fn.Enrichments;
import org.opencastproject.assetmanager.api.query.AQueryBuilder;
import org.opencastproject.assetmanager.api.query.AResult;
import org.opencastproject.assetmanager.util.Workflows;
import org.opencastproject.job.api.AbstractJobProducer;
import org.opencastproject.job.api.Job;
import org.opencastproject.kernel.mail.SmtpService;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementBuilder;
import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.security.api.DefaultOrganization;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.security.util.SecurityUtil;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.systems.OpencastConstants;
import org.opencastproject.transcription.api.TranscriptionService;
import org.opencastproject.transcription.api.TranscriptionServiceException;
import org.opencastproject.transcription.persistence.TranscriptionDatabase;
import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
import org.opencastproject.transcription.persistence.TranscriptionJobControl;
import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
import org.opencastproject.util.LoadUtil;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.OsgiUtil;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.util.data.Option;
import org.opencastproject.workflow.api.ConfiguredWorkflow;
import org.opencastproject.workflow.api.WorkflowDatabaseException;
import org.opencastproject.workflow.api.WorkflowDefinition;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowService;
import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
import org.opencastproject.workspace.api.Workspace;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component(
    immediate = true,
    service = { TranscriptionService.class,IBMWatsonTranscriptionService.class },
    property = {
        "service.description=IBM Watson Transcription Service",
        "provider=ibm.watson"
    }
)
public class IBMWatsonTranscriptionService extends AbstractJobProducer implements TranscriptionService {

  /** The logger */
  private static final Logger logger = LoggerFactory.getLogger(IBMWatsonTranscriptionService.class);

  /** Provider name stored with every job control record created by this service */
  private static final String PROVIDER = "IBM Watson";

  /** Job type handed to the job producer super class and used when creating jobs in the service registry */
  private static final String JOB_TYPE = "org.opencastproject.transcription.ibmwatson";

  // NOTE(review): presumably the collection the result files are saved in; its usage is not in this part of the file
  static final String TRANSCRIPT_COLLECTION = "transcripts";
  /** User name used for basic authentication when an api key is configured instead of user and password */
  static final String APIKEY = "apikey";
  private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
  private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute
  // Default workflow used to attach the transcription results to the media package
  public static final String DEFAULT_WF_DEF = "attach-watson-transcripts";
  private static final long DEFAULT_COMPLETION_BUFFER = 600; // in seconds, default is 10 minutes
  private static final long DEFAULT_DISPATCH_INTERVAL = 60; // in seconds, default is 1 minute
  private static final long DEFAULT_MAX_PROCESSING_TIME = 2 * 60 * 60; // in seconds, default is 2 hours
  /** Language reported by getLanguage() unless a model is configured */
  private static final String DEFAULT_LANGUAGE = "en";
  // Cleans up results files that are older than 7 days (which is how long the IBM Watson
  // speech-to-text service keeps the jobs by default)
  private static final int DEFAULT_CLEANUP_RESULTS_DAYS = 7;

  // Global configuration (custom.properties)
  /** Organization property holding the admin ui url; when set it is used as base of the callback url */
  public static final String ADMIN_URL_PROPERTY = "org.opencastproject.admin.ui.url";
  /** Context property the system account name is read from */
  private static final String DIGEST_USER_PROPERTY = "org.opencastproject.security.digest.user";

  // Cluster name
  private static final String CLUSTER_NAME_PROPERTY = "org.opencastproject.environment.name";
  /** Environment name read from CLUSTER_NAME_PROPERTY, empty when the property is not set */
  private String clusterName = "";

  /** The load introduced on the system by creating a transcription job */
  public static final float DEFAULT_START_TRANSCRIPTION_JOB_LOAD = 0.1f;

  /** The key to look for in the service configuration file to override the default job load */
  public static final String START_TRANSCRIPTION_JOB_LOAD_KEY = "job.load.start.transcription";

  /** The load used when creating a transcription job, configurable through START_TRANSCRIPTION_JOB_LOAD_KEY */
  private float jobLoad = DEFAULT_START_TRANSCRIPTION_JOB_LOAD;

  /**
   * The events we are interested in receiving notifications for. Both values are passed in the "events" request
   * parameter when a recognitions job is created with a registered callback.
   */
  public interface JobEvent {
    String COMPLETED_WITH_RESULTS = "recognitions.completed_with_results";
    String FAILED = "recognitions.failed";
  }

  /**
   * Values of the "status" field found in the json returned by the recognitions endpoints, e.g. "waiting" right
   * after a job has been created and "completed" once results are available.
   */
  public interface RecognitionJobStatus {
    String COMPLETED = "completed";
    String FAILED = "failed";
    String PROCESSING = "processing";
    String WAITING = "waiting";
  }

  /** Service dependencies */
  // Used to create the transcription jobs and to resolve the configured job load
  private ServiceRegistry serviceRegistry;
  // Provides the current organization when building the callback url
  private SecurityService securityService;
  private UserDirectoryService userDirectoryService;
  private OrganizationDirectoryService organizationDirectoryService;
  // Gives access to the audio track file that is sent to the service
  private Workspace workspace;
  // Persists the state of the transcription jobs (job control records)
  private TranscriptionDatabase database;
  private AssetManager assetManager;
  private WorkflowService workflowService;
  private WorkingFileRepository wfr;
  private SmtpService smtpService;

  // Only used by unit tests!
  private Workflows wfUtil;

  /** The operations this job producer handles; the constant name is stored as the operation of the created job. */
  private enum Operation {
    StartTranscription
  }

  // Default base url of the service, used when IBM_WATSON_SERVICE_URL_CONFIG is not configured
  private static final String IBM_WATSON_SERVICE_URL = "https://stream.watsonplatform.net/speech-to-text/api";
  // Api version, appended to the base url
  private static final String API_VERSION = "v1";
  // Endpoint names, appended to the versioned service url
  private static final String REGISTER_CALLBACK = "register_callback";
  private static final String RECOGNITIONS = "recognitions";
  // Path appended to the admin url (or the server url) to build the callback url
  private static final String CALLBACK_PATH = "/transcripts/watson/results";

  /** Service configuration options */
  public static final String ENABLED_CONFIG = "enabled";
  public static final String IBM_WATSON_SERVICE_URL_CONFIG = "ibm.watson.service.url";
  public static final String IBM_WATSON_USER_CONFIG = "ibm.watson.user";
  public static final String IBM_WATSON_PSW_CONFIG = "ibm.watson.password";
  public static final String IBM_WATSON_API_KEY_CONFIG = "ibm.watson.api.key";
  public static final String IBM_WATSON_MODEL_CONFIG = "ibm.watson.model";
  public static final String WORKFLOW_CONFIG = "workflow";
  public static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
  public static final String COMPLETION_CHECK_BUFFER_CONFIG = "completion.check.buffer";
  public static final String MAX_PROCESSING_TIME_CONFIG = "max.processing.time";
  public static final String NOTIFICATION_EMAIL_CONFIG = "notification.email";
  public static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
  public static final String MAX_ATTEMPTS_CONFIG = "max.attempts";
  // NOTE: the constant name is misspelled ("WORKLFOW"); it is public, so it is kept as is
  public static final String RETRY_WORKLFOW_CONFIG = "retry.workflow";

  /** Service configuration values */
  private boolean enabled = false; // Disabled by default
  private String watsonServiceUrl = UrlSupport.concat(IBM_WATSON_SERVICE_URL, API_VERSION);
  // Optional model; when empty no model parameter is sent with the recognitions request
  private String model;
  private String workflowDefinitionId = DEFAULT_WF_DEF;
  private long workflowDispatchInterval = DEFAULT_DISPATCH_INTERVAL; // in seconds
  private long completionCheckBuffer = DEFAULT_COMPLETION_BUFFER; // in seconds
  private long maxProcessingSeconds = DEFAULT_MAX_PROCESSING_TIME;
  // Notification email recipient; when null email notification is disabled
  private String toEmailAddress;
  private int cleanupResultDays = DEFAULT_CLEANUP_RESULTS_DAYS;
  // Taken from the part of the configured model that comes before the first "-"
  private String language = DEFAULT_LANGUAGE;
  // A single attempt, i.e. no retries, unless MAX_ATTEMPTS_CONFIG is configured
  private int maxAttempts = 1;
  private String retryWfDefId = null;

  // Read from the DIGEST_USER_PROPERTY context property
  private String systemAccount;
  // Read from the server url context property; base of the callback url when no admin url is set
  private String serverUrl;
  private String callbackUrl;
  // Set once the service answered the callback registration with 200 or 201
  private boolean callbackAlreadyRegistered = false;
  // Runs the workflow dispatcher and the results file cleanup
  private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
  // For preemptive basic authentication
  private AuthCache authCache;
  private CredentialsProvider credentialsProvider;

  /** Creates the service, registering it as producer of jobs of type JOB_TYPE. */
  public IBMWatsonTranscriptionService() {
    super(JOB_TYPE);
  }

  /**
   * Activates the service from its OSGi configuration.
   * <p>
   * When the {@value #ENABLED_CONFIG} option is missing or false the service stays disabled and nothing else is
   * configured. Otherwise this method reads the service url, the credentials (api key, or user and password), the
   * optional model, workflow, timing, cleanup, retry and notification settings, and schedules the workflow dispatcher
   * and the results file cleanup. Numeric options that cannot be parsed are reported with a warning and the current
   * (default) value is kept.
   *
   * @param cc
   *          the OSGi component context holding the service configuration
   * @throws IllegalArgumentException
   *           if {@code cc} is {@code null}
   * @throws RuntimeException
   *           if the configured Watson service url is not a valid URI
   */
  @Override
  @Activate
  public void activate(ComponentContext cc) {
    if (cc == null) {
      throw new IllegalArgumentException("Missing component context");
    }

    // Has this service been enabled? A missing option keeps the service disabled (the documented default)
    // instead of failing the activation when reading an empty Option.
    Option<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG);
    enabled = enabledOpt.isSome() && enabledOpt.get();
    if (!enabled) {
      logger.info("Service disabled. If you want to enable it, please update the service configuration.");
      return;
    }

    // Service url (optional)
    Option<String> urlOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_SERVICE_URL_CONFIG);
    if (urlOpt.isSome()) {
      watsonServiceUrl = UrlSupport.concat(urlOpt.get(), API_VERSION);
    }

    // Api key is checked first. If not entered, user and password are mandatory (to
    // support older instances of the STT service)
    String user; // user name or 'apikey'
    String psw; // user password or api key
    Option<String> keyOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_API_KEY_CONFIG);
    if (keyOpt.isSome()) {
      user = APIKEY;
      psw = keyOpt.get();
      logger.info("Using transcription service at {} with api key", watsonServiceUrl);
    } else {
      // User name (mandatory if api key is empty)
      user = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_USER_CONFIG);
      // Password (mandatory if api key is empty)
      psw = OsgiUtil.getComponentContextProperty(cc, IBM_WATSON_PSW_CONFIG);
      logger.info("Using transcription service at {} with username {}", watsonServiceUrl, user);
    }
    // We will use preemptive basic auth
    try {
      URI uri = new URI(watsonServiceUrl);
      HttpHost targetHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
      credentialsProvider = new BasicCredentialsProvider();
      credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, psw));
      authCache = new BasicAuthCache();
      authCache.put(targetHost, new BasicScheme());
    } catch (URISyntaxException e) {
      throw new RuntimeException("Watson STT service url is not valid: " + watsonServiceUrl, e);
    }

    // Language model to be used (optional)
    Option<String> modelOpt = OsgiUtil.getOptCfg(cc.getProperties(), IBM_WATSON_MODEL_CONFIG);
    if (modelOpt.isSome()) {
      model = modelOpt.get();
      language = StringUtils.substringBefore(model, "-");
      logger.info("Model is {}", model);
    } else {
      logger.info("Default model will be used");
    }

    // Workflow to execute when getting callback (optional, with default)
    Option<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
    if (wfOpt.isSome()) {
      workflowDefinitionId = wfOpt.get();
    }
    logger.info("Workflow definition is {}", workflowDefinitionId);

    // Interval to check for completed transcription jobs and start workflows to attach transcripts
    Option<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
    if (intervalOpt.isSome()) {
      try {
        long configuredInterval = Long.parseLong(intervalOpt.get());
        if (configuredInterval > 0) {
          workflowDispatchInterval = configuredInterval;
        } else {
          // The scheduler rejects a delay that is not positive, so keep the default
          logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
                  DISPATCH_WORKFLOW_INTERVAL_CONFIG, intervalOpt.get(), workflowDispatchInterval);
        }
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
                DISPATCH_WORKFLOW_INTERVAL_CONFIG, intervalOpt.get(), workflowDispatchInterval);
      }
    }
    logger.info("Workflow dispatch interval is {} seconds", workflowDispatchInterval);

    // How long to wait after a transcription is supposed to finish before starting checking
    Option<String> bufferOpt = OsgiUtil.getOptCfg(cc.getProperties(), COMPLETION_CHECK_BUFFER_CONFIG);
    if (bufferOpt.isSome()) {
      try {
        completionCheckBuffer = Long.parseLong(bufferOpt.get());
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
                COMPLETION_CHECK_BUFFER_CONFIG, bufferOpt.get(), completionCheckBuffer);
      }
    }
    logger.info("Completion check buffer is {} seconds", completionCheckBuffer);

    // How long to wait after a transcription is supposed to finish before marking the job as canceled in the db
    Option<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
    if (maxProcessingOpt.isSome()) {
      try {
        maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
                MAX_PROCESSING_TIME_CONFIG, maxProcessingOpt.get(), maxProcessingSeconds);
      }
    }
    logger.info("Maximum time a job is checked after it should have ended is {} seconds", maxProcessingSeconds);

    // How long to keep result files in the working file repository
    Option<String> cleanupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
    if (cleanupOpt.isSome()) {
      try {
        cleanupResultDays = Integer.parseInt(cleanupOpt.get());
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for {} : {}. Default used instead: {}",
                CLEANUP_RESULTS_DAYS_CONFIG, cleanupOpt.get(), cleanupResultDays);
      }
    }
    logger.info("Cleanup result files after {} days", cleanupResultDays);

    // Maximum number of retries if error (optional)
    Option<String> maxAttemptsOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_ATTEMPTS_CONFIG);
    if (maxAttemptsOpt.isSome()) {
      try {
        maxAttempts = Integer.parseInt(maxAttemptsOpt.get());
        retryWfDefId = OsgiUtil.getComponentContextProperty(cc, RETRY_WORKLFOW_CONFIG);
      } catch (NumberFormatException e) {
        // Use default
        logger.warn("Invalid configuration for {} : {}. Default used instead: no retries", MAX_ATTEMPTS_CONFIG,
                maxAttemptsOpt.get());
      }
    } else {
      logger.info("No retries in case of errors");
    }

    serverUrl = OsgiUtil.getContextProperty(cc, OpencastConstants.SERVER_URL_PROPERTY);
    systemAccount = OsgiUtil.getContextProperty(cc, DIGEST_USER_PROPERTY);

    jobLoad = LoadUtil.getConfiguredLoadValue(cc.getProperties(), START_TRANSCRIPTION_JOB_LOAD_KEY,
            DEFAULT_START_TRANSCRIPTION_JOB_LOAD, serviceRegistry);

    // Schedule the workflow dispatching, starting in 2 minutes
    scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchInterval,
            TimeUnit.SECONDS);

    // Schedule the cleanup of old results jobs from the collection in the wfr once a day
    scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);

    // Notification email passed in this service configuration?
    Option<String> optTo = OsgiUtil.getOptCfg(cc.getProperties(), NOTIFICATION_EMAIL_CONFIG);
    if (optTo.isSome()) {
      toEmailAddress = optTo.get();
    } else {
      // Use admin email informed in custom.properties
      optTo = OsgiUtil.getOptContextProperty(cc, ADMIN_EMAIL_PROPERTY);
      if (optTo.isSome()) {
        toEmailAddress = optTo.get();
      }
    }
    if (toEmailAddress != null) {
      logger.info("Notification email set to {}", toEmailAddress);
    } else {
      logger.warn("Email notification disabled");
    }

    Option<String> optCluster = OsgiUtil.getOptContextProperty(cc, CLUSTER_NAME_PROPERTY);
    if (optCluster.isSome()) {
      clusterName = optCluster.get();
    }
    logger.info("Environment name is {}", clusterName);

    logger.info("Activated!");
    // Cannot call registerCallback here because of the REST service dependency on this service
  }

  /**
   * Creates the job that will submit the given track for transcription.
   *
   * @param mpId
   *          the media package identifier, stored as first job argument
   * @param track
   *          the track to transcribe, stored as xml in the second job argument
   * @return the job created in the service registry
   * @throws TranscriptionServiceException
   *           if the service is disabled, the track cannot be serialized or the job cannot be created
   */
  @Override
  public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
    if (enabled) {
      try {
        String trackXml = MediaPackageElementParser.getAsXml(track);
        List<String> jobArguments = Arrays.asList(mpId, trackXml);
        return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(), jobArguments, jobLoad);
      } catch (MediaPackageException e) {
        throw new TranscriptionServiceException("Invalid track " + track.toString(), e);
      } catch (ServiceRegistryException e) {
        throw new TranscriptionServiceException("Unable to create a job", e);
      }
    }

    throw new TranscriptionServiceException(
            "This service is disabled. If you want to enable it, please update the service configuration.");
  }

  /**
   * Variant with additional arguments, which this provider does not support.
   *
   * @throws UnsupportedOperationException
   *           always
   */
  @Override
  public Job startTranscription(String mpId, Track track, String... args) {
    throw new UnsupportedOperationException("Not supported.");
  }

  /**
   * Handles a finished transcription: marks the job as complete in the database and saves the results, or hands the
   * job over to the retry/error handling when the results carry an error.
   *
   * @param mpId
   *          the media package identifier
   * @param obj
   *          the json object received from the service, cast to {@link JSONObject}
   * @throws TranscriptionServiceException
   *           if the results file cannot be saved or the job state cannot be updated
   */
  @Override
  public void transcriptionDone(String mpId, Object obj) throws TranscriptionServiceException {
    JSONObject payload = null;
    String jobId = null;
    try {
      payload = (JSONObject) obj;
      jobId = (String) payload.get("id");
      Object results = payload.get("results");

      // A job can be reported as completed although the transcription itself failed. In that case the first
      // entry of the results array carries an error, e.g.
      // curl --header "Content-Type: application/json" --request POST --data
      // '{"id":"ebeeb546-2e1a-11e9-941d-f349af2d6273",
      // "results":[{"error":"failed when posting audio to the STT service"}],
      // "event":"recognitions.completed_with_results",
      // "user_token":"66c6c9b0-b6a2-4c9a-92c8-55f953ab3d38",
      // "created":"2019-02-11T05:04:29.283Z"}' http://ADMIN/transcripts/watson/results
      if (results instanceof JSONArray) {
        JSONArray resultsArray = (JSONArray) results;
        if (!resultsArray.isEmpty()) {
          JSONObject firstResult = (JSONObject) resultsArray.get(0);
          String error = (String) firstResult.get("error");
          if (StringUtils.isNotEmpty(error)) {
            String message = String.format("Transcription completed with error for mpId %s, jobId %s: %s", mpId,
                    jobId, error);
            retryOrError(jobId, mpId, message);
            return;
          }
        }
      }

      logger.info("Transcription done for mpId {}, jobId {}", mpId, jobId);

      // Record the new state. An optimistic lock exception at this point is acceptable because the
      // workflow dispatcher may be doing the same thing
      database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());

      // Keep the results in the file system when they came with the notification
      if (results != null) {
        saveResults(jobId, payload);
      }
    } catch (TranscriptionDatabaseException e) {
      logger.warn("Error when updating state in database for mpId {}, jobId {}", mpId, jobId);
      throw new TranscriptionServiceException("Could not update transcription job control db", e);
    } catch (IOException e) {
      logger.warn("Could not save transcription results file for mpId {}, jobId {}: {}",
              mpId, jobId, payload == null ? "null" : payload.toJSONString());
      throw new TranscriptionServiceException("Could not save transcription results file", e);
    }
  }

  /**
   * Handles a failed transcription by passing the job to the retry/error handling.
   *
   * @param mpId
   *          the media package identifier
   * @param obj
   *          the json object received from the service, cast to {@link JSONObject}; its "id" is the job id
   * @throws TranscriptionServiceException
   *           if the job state cannot be updated
   */
  @Override
  public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
    String jobId = (String) ((JSONObject) obj).get("id");
    String errorMessage = String.format("Transcription error for media package %s, job id %s", mpId, jobId);
    try {
      retryOrError(jobId, mpId, errorMessage);
    } catch (TranscriptionDatabaseException e) {
      throw new TranscriptionServiceException("Error when updating job state.", e);
    }
  }

  /**
   * Returns the transcription language: the default language, or the part of the configured model name that comes
   * before the first "-" when a model is configured.
   *
   * @return the language
   */
  @Override
  public String getLanguage() {
    return language;
  }

  /**
   * Not implemented by this provider.
   *
   * @throws TranscriptionServiceException
   *           always
   */
  @Override
  public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
    throw new TranscriptionServiceException("Method not implemented");
  }

  /**
   * Executes a job created by this service. The only known operation creates the recognitions job for the media
   * package id (first argument) and the track (second argument, as xml).
   *
   * @param job
   *          the job to process
   * @return always an empty string
   * @throws Exception
   *           if the operation is unknown, the track cannot be parsed or the recognitions job cannot be created
   */
  @Override
  protected String process(Job job) throws Exception {
    String operation = job.getOperation();
    List<String> arguments = job.getArguments();

    // valueOf already rejects names that are not part of the enum
    Operation op = Operation.valueOf(operation);
    if (op != Operation.StartTranscription) {
      throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
    }

    String mpId = arguments.get(0);
    Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
    createRecognitionsJob(mpId, track);
    return "";
  }

  // Example URL is too long but needs to stay like this
  // CHECKSTYLE:OFF
  /**
   * Register the callback url with the Speech-to-text service. From:
   * https://cloud.ibm.com/apidocs/speech-to-text#register-a-callback
   *
   * curl -X POST -u "apikey:{apikey}"
   * "https://stream.watsonplatform.net/speech-to-text/api/v1/register_callback?callback_url=http://{user_callback_path}/job_results&user_secret=ThisIsMySecret"
   * Response looks like: { "status": "created", "url": "http://{user_callback_path}/results" }
   *
   * The callback url is built from the admin url of the current organization or, when that is not set, from the
   * server url. The registration is best effort: a status other than 200/201 or an exception during the call is
   * only logged, and the callback stays unregistered so that a later call tries again.
   */
  // CHECKSTYLE:ON
  void registerCallback() throws TranscriptionServiceException {
    if (callbackAlreadyRegistered) {
      return;
    }

    Organization org = securityService.getOrganization();
    String adminUrl = StringUtils.trimToNull(org.getProperties().get(ADMIN_URL_PROPERTY));
    if (adminUrl != null) {
      callbackUrl = adminUrl + CALLBACK_PATH;
    } else {
      callbackUrl = serverUrl + CALLBACK_PATH;
    }
    logger.info("Callback url is {}", callbackUrl);

    // Build the request before creating the client so that an invalid url cannot leave an open client behind
    HttpPost httpPost = new HttpPost(
            UrlSupport.concat(watsonServiceUrl, REGISTER_CALLBACK) + String.format("?callback_url=%s", callbackUrl));
    // Add AuthCache to the execution context for preemptive auth
    HttpClientContext context = HttpClientContext.create();
    context.setCredentialsProvider(credentialsProvider);
    context.setAuthCache(authCache);

    CloseableHttpClient httpClient = makeHttpClient();
    CloseableHttpResponse response = null;
    try {
      response = httpClient.execute(httpPost, context);
      int code = response.getStatusLine().getStatusCode();

      switch (code) {
        case HttpStatus.SC_OK: // 200
          logger.info("Callback url: {} had already already been registered", callbackUrl);
          callbackAlreadyRegistered = true;
          EntityUtils.consume(response.getEntity());
          break;
        case HttpStatus.SC_CREATED: // 201
          logger.info("Callback url: {} has been successfully registered", callbackUrl);
          callbackAlreadyRegistered = true;
          EntityUtils.consume(response.getEntity());
          break;
        case HttpStatus.SC_BAD_REQUEST: // 400
          logger.warn("Callback url {} could not be verified, status: {}", callbackUrl, code);
          break;
        case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
          logger.warn("Service unavailable when registering callback url {} status: {}", callbackUrl, code);
          break;
        default:
          logger.warn("Unknown status when registering callback url {}, status: {}", callbackUrl, code);
          break;
      }
    } catch (Exception e) {
      logger.warn("Exception when calling the the register callback endpoint", e);
    } finally {
      // Close the response before the client, each on its own, so that a failure to close one of them
      // cannot prevent the other from being closed
      if (response != null) {
        try {
          response.close();
        } catch (IOException e) {
          logger.debug("Could not close the response of the register callback request", e);
        }
      }
      try {
        httpClient.close();
      } catch (IOException e) {
        logger.debug("Could not close the http client used to register the callback", e);
      }
    }
  }

  // Example URL is too long but needs to stay like this
  // CHECKSTYLE:OFF
  /**
   * From: https://cloud.ibm.com/apidocs/speech-to-text#create-a-job:
   *
   * curl -X POST -u "apikey:{apikey}" --header "Content-Type: audio/flac" --data-binary @audio-file.flac
   * "https://stream.watsonplatform.net/speech-to-text/api/v1/recognitions?callback_url=http://{user_callback_path}/job_results&user_token=job25&timestamps=true"
   *
   * Response: { "id": "4bd734c0-e575-21f3-de03-f932aa0468a0", "status": "waiting", "url":
   * "http://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/4bd734c0-e575-21f3-de03-f932aa0468a0" }
   *
   * Sends the audio file of the track to the recognitions endpoint and, when the service answers with 201, stores a
   * job control record in state InProgress. The callback parameters are only sent when the callback url has been
   * registered, the model parameter only when a model is configured.
   *
   * @param mpId
   *          the media package identifier, sent as user token
   * @param track
   *          the audio track to transcribe
   * @throws TranscriptionServiceException
   *           if the audio file cannot be read, the service returns a status other than 201, or the request or the
   *           handling of its response fails
   */
  // CHECKSTYLE:ON
  void createRecognitionsJob(String mpId, Track track) throws TranscriptionServiceException {
    if (!callbackAlreadyRegistered) {
      registerCallback();
    }

    // Get audio track file
    File audioFile = null;
    try {
      audioFile = workspace.get(track.getURI());
    } catch (Exception e) {
      throw new TranscriptionServiceException("Error reading audio track", e);
    }

    String additionalParms = "";
    if (callbackAlreadyRegistered) {
      additionalParms = String.format("&user_token=%s&callback_url=%s&events=%s,%s", mpId, callbackUrl,
              JobEvent.COMPLETED_WITH_RESULTS, JobEvent.FAILED);
    }
    if (!StringUtils.isEmpty(model)) {
      additionalParms += String.format("&model=%s", model);
    }
    // Add AuthCache to the execution context for preemptive auth
    HttpClientContext context = HttpClientContext.create();
    context.setCredentialsProvider(credentialsProvider);
    context.setAuthCache(authCache);

    CloseableHttpClient httpClient = makeHttpClient();
    CloseableHttpResponse response = null;
    try {
      HttpPost httpPost = new HttpPost(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS)
              + String.format(
                      "?inactivity_timeout=-1&timestamps=true&smart_formatting=true%s", additionalParms));
      logger.debug("Url to invoke ibm watson service: {}", httpPost.getURI().toString());
      httpPost.setHeader(HttpHeaders.CONTENT_TYPE, track.getMimeType().toString());
      FileEntity fileEntity = new FileEntity(audioFile);
      fileEntity.setChunked(true);
      httpPost.setEntity(fileEntity);
      response = httpClient.execute(httpPost, context);
      int code = response.getStatusLine().getStatusCode();

      switch (code) {
        case HttpStatus.SC_CREATED: // 201
          logger.info("Recognitions job has been successfully created");

          HttpEntity entity = response.getEntity();
          // Response returned is a json object:
          // {
          // "id": "4bd734c0-e575-21f3-de03-f932aa0468a0",
          // "status": "waiting",
          // "url":
          // "http://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/4bd734c0-e575-21f3-de03-f932aa0468a0"
          // }
          String jsonString = EntityUtils.toString(entity);
          JSONParser jsonParser = new JSONParser();
          JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
          String jobId = (String) jsonObject.get("id");
          String jobStatus = (String) jsonObject.get("status");
          String jobUrl = (String) jsonObject.get("url");
          logger.info("Transcription for mp {} has been submitted. Job id: {}, job status: {}, job url: {}", mpId,
                  jobId, jobStatus, jobUrl);

          database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
                  track.getDuration() == null ? 0 : track.getDuration().longValue(), null, PROVIDER);
          EntityUtils.consume(entity);
          return;
        case HttpStatus.SC_BAD_REQUEST: // 400
          logger.info("Invalid argument returned, status: {}", code);
          break;
        case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
          logger.info("Service unavailable returned, status: {}", code);
          break;
        default:
          logger.info("Unknown return status: {}.", code);
          break;
      }
      throw new TranscriptionServiceException("Could not create recognition job. Status returned: " + code);
    } catch (TranscriptionServiceException e) {
      // Already describes the failure (e.g. the status returned), so pass it on instead of wrapping it
      // in a second, less specific exception
      throw e;
    } catch (Exception e) {
      logger.warn("Exception when calling the recognitions endpoint", e);
      throw new TranscriptionServiceException("Exception when calling the recognitions endpoint", e);
    } finally {
      // Close the response before the client, each on its own, so that a failure to close one of them
      // cannot prevent the other from being closed. Failures are only logged: the outcome of the request
      // has already been decided at this point.
      if (response != null) {
        try {
          response.close();
        } catch (IOException e) {
          logger.debug("Could not close the response of the recognitions request", e);
        }
      }
      try {
        httpClient.close();
      } catch (IOException e) {
        logger.debug("Could not close the http client used for the recognitions request", e);
      }
    }
  }

  /**
   * Asks the IBM Watson service for the state of a recognition job. If the job is completed and the response carries
   * results, the response is handed over to {@code transcriptionDone}.
   *
   * From: https://cloud.ibm.com/apidocs/speech-to-text#check-a-job:
   *
   * curl -X GET -u "apikey:{apikey}" "https://stream.watsonplatform.net/speech-to-text/api/v1/recognitions/{id}"
   *
   * Response: { "results": [ { "result_index": 0, "results": [ { "final": true, "alternatives": [ { "transcript":
   * "several tornadoes touch down as a line of severe thunderstorms swept through Colorado on Sunday ", "timestamps": [
   * [ "several", 1, 1.52 ], [ "tornadoes", 1.52, 2.15 ], . . . [ "Sunday", 5.74, 6.33 ] ], "confidence": 0.885 } ] } ]
   * } ], "created": "2016-08-17T19:11:04.298Z", "updated": "2016-08-17T19:11:16.003Z", "status": "completed" }
   *
   * @param jobId
   *          the id of the recognition job to check
   * @return the value of the "status" field of the response
   * @throws TranscriptionServiceException
   *           if the service answers with a status other than 200 (the http status is passed on to the exception), or
   *           if the call or the processing of the response fails
   */
  String getAndSaveJobResults(String jobId) throws TranscriptionServiceException {
    CloseableHttpClient httpClient = makeHttpClient();
    // Add AuthCache to the execution context for preemptive auth
    HttpClientContext context = HttpClientContext.create();
    context.setCredentialsProvider(credentialsProvider);
    context.setAuthCache(authCache);
    CloseableHttpResponse response = null;
    String mpId = "unknown";
    try {
      HttpGet httpGet = new HttpGet(UrlSupport.concat(watsonServiceUrl, RECOGNITIONS, jobId));
      response = httpClient.execute(httpGet, context);
      int code = response.getStatusLine().getStatusCode();

      switch (code) {
        case HttpStatus.SC_OK: // 200
          HttpEntity entity = response.getEntity();

          // Response returned is a json object described above
          String jsonString = EntityUtils.toString(entity);
          JSONParser jsonParser = new JSONParser();
          JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
          String jobStatus = (String) jsonObject.get("status");
          mpId = (String) jsonObject.get("user_token");
          // user_token doesn't come back if this is not in the context of a callback so get the mpId from the db
          if (mpId == null) {
            TranscriptionJobControl jc = database.findByJob(jobId);
            if (jc != null) {
              mpId = jc.getMediaPackageId();
            }
          }
          logger.info("Recognitions job {} has been found, status {}", jobId, jobStatus);
          EntityUtils.consume(entity);

          // A response without a "status" field fails here and is reported by the generic exception handler below
          if (jobStatus.indexOf(RecognitionJobStatus.COMPLETED) > -1 && jsonObject.get("results") != null) {
            transcriptionDone(mpId, jsonObject);
          }
          return jobStatus;
        case HttpStatus.SC_NOT_FOUND: // 404
          logger.info("Job not found: {}", jobId);
          break;
        case HttpStatus.SC_SERVICE_UNAVAILABLE: // 503
          logger.info("Service unavailable returned, status: {}", code);
          break;
        default:
          logger.info("Unknown return status: {}.", code);
          break;
      }
      // The http status is attached to the exception so that callers can tell e.g. a 404 apart
      throw new TranscriptionServiceException(
              String.format("Could not check recognition job for media package %s, job id %s. Status returned: %d",
                      mpId, jobId, code),
              code);
    } catch (TranscriptionServiceException e) {
      throw e;
    } catch (Exception e) {
      logger.warn("Exception when calling the recognitions endpoint for media package {}, job id {}",
              mpId, jobId, e);
      throw new TranscriptionServiceException(String.format(
              "Exception when calling the recognitions endpoint for media package %s, job id %s", mpId, jobId), e);
    } finally {
      // Release the response before the client and close each of them independently, so that a failure to close
      // one of them does not prevent the other from being closed.
      if (response != null) {
        try {
          response.close();
        } catch (IOException e) {
          logger.debug("Could not close the response of the recognitions call for job id {}", jobId, e);
        }
      }
      try {
        httpClient.close();
      } catch (IOException e) {
        logger.debug("Could not close the http client used for job id {}", jobId, e);
      }
    }
  }

  /**
   * Stores the json object returned by the service in the transcript collection of the workspace, using
   * {@code <jobId>.json} as file name. Nothing is stored if the object has no "results" entry.
   *
   * @param jobId
   *          the id of the transcription job, used to build the file name
   * @param jsonObj
   *          the json object to store
   * @throws IOException
   *           if the workspace cannot store the file
   */
  private void saveResults(String jobId, JSONObject jsonObj) throws IOException {
    if (jsonObj.get("results") == null) {
      return;
    }
    // Encode explicitly as UTF-8: the platform default charset could corrupt non-ASCII characters of the transcript
    byte[] json = jsonObj.toJSONString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    // Save the results into a collection
    workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".json", new ByteArrayInputStream(json));
  }

  /**
   * Builds a media package element that points to the stored transcription results of a job.
   *
   * If no job id is given ({@code null} or the string "null"), the jobs of the media package are searched for one that
   * is closed or has its transcription complete; the last match wins. If the results file cannot be obtained from the
   * workspace, the results are requested from the service before the element is built.
   *
   * @param mpId
   *          the media package id, used to look up the job when no job id is given
   * @param jobId
   *          the transcription job id, may be {@code null} or "null"
   * @param type
   *          the type of the element to create
   * @return the element referencing the results file in the transcript collection
   * @throws TranscriptionServiceException
   *           if no suitable job is found, the database lookup fails or the results cannot be fetched
   */
  @Override
  public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
          throws TranscriptionServiceException {
    try {
      final boolean jobIdMissing = jobId == null || "null".equals(jobId);
      if (jobIdMissing) {
        // Job id is unknown, so go through all jobs associated to that mpId and keep the last finished one
        final String closed = TranscriptionJobControl.Status.Closed.name();
        final String complete = TranscriptionJobControl.Status.TranscriptionComplete.name();
        String lastFinished = null;
        for (TranscriptionJobControl candidate : database.findByMediaPackage(mpId)) {
          if (closed.equals(candidate.getStatus()) || complete.equals(candidate.getStatus())) {
            lastFinished = candidate.getTranscriptionJobId();
          }
        }
        jobId = lastFinished;
      }

      if (jobId == null) {
        throw new TranscriptionServiceException(
                "No completed or closed transcription job found in database for media package " + mpId);
      }

      final String resultsFile = jobId + ".json";
      final URI resultsUri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, resultsFile);
      try {
        // Results already saved?
        workspace.get(resultsUri);
      } catch (Exception e) {
        // Not saved yet so call the ibm watson service to get the results
        getAndSaveJobResults(jobId);
      }
      return MediaPackageElementBuilderFactory.newInstance().newElementBuilder()
              .elementFromURI(resultsUri, type, new MediaPackageElementFlavor("captions", "ibm-watson-json"));
    } catch (TranscriptionDatabaseException e) {
      throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
    }
  }

  /**
   * Creates the http client used for the calls to the service.
   *
   * The client uses {@code CONNECTION_TIMEOUT} as connect and connection request timeout, {@code SOCKET_TIMEOUT} as
   * socket timeout, and a {@code DefaultHttpRequestRetryHandler} created with the arguments {@code (3, true)}.
   *
   * @return a new http client; closing it is up to the caller
   */
  protected CloseableHttpClient makeHttpClient() {
    final RequestConfig.Builder timeouts = RequestConfig.custom();
    timeouts.setConnectTimeout(CONNECTION_TIMEOUT);
    timeouts.setSocketTimeout(SOCKET_TIMEOUT);
    timeouts.setConnectionRequestTimeout(CONNECTION_TIMEOUT);
    final RequestConfig requestConfig = timeouts.build();

    final DefaultHttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(3, true);
    return HttpClients.custom()
            .setDefaultRequestConfig(requestConfig)
            .setRetryHandler(retryHandler)
            .build();
  }

  /**
   * Flags a failed transcription job for retry or, if the maximum number of attempts has been reached, as error. In
   * the error case a notification e-mail is sent.
   *
   * The number of attempts is the number of jobs of the same media package and track that are in state error, in
   * progress or canceled. If the job is not known to the database, only a warning is logged.
   *
   * @param jobId
   *          the id of the failed transcription job
   * @param mpId
   *          the media package id
   * @param errorMsg
   *          the message to log and to send by e-mail; it may contain up to two {@code %s} placeholders which are
   *          replaced by the media package id and the job id
   * @throws TranscriptionDatabaseException
   *           if one of the database calls fails
   */
  protected void retryOrError(String jobId, String mpId, String errorMsg) throws TranscriptionDatabaseException {
    logger.warn(errorMsg);

    TranscriptionJobControl jc = database.findByJob(jobId);
    if (jc == null) {
      // Without the job entry there is no track to count attempts for and nothing to update
      logger.warn("Transcription job {} of media package {} not found in database, can not flag it for retry or error",
              jobId, mpId);
      return;
    }
    String trackId = jc.getTrackId();
    // Current job is still in progress state
    int attempts = database
            .findByMediaPackageTrackAndStatus(mpId, trackId, TranscriptionJobControl.Status.Error.name(),
                    TranscriptionJobControl.Status.InProgress.name(), TranscriptionJobControl.Status.Canceled.name())
            .size();
    if (attempts < maxAttempts) {
      // Update state in database to retry
      database.updateJobControl(jobId, TranscriptionJobControl.Status.Retry.name());
      logger.info("Will retry transcription for media package {}, track {}", mpId, trackId);
    } else {
      // Update state in database to error
      database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
      // Send error notification email
      logger.error("{} transcription attempts exceeded maximum of {} for media package {}, track {}.", attempts,
              maxAttempts, mpId, trackId);
      String body;
      try {
        body = String.format(errorMsg, mpId, jobId);
      } catch (java.util.IllegalFormatException e) {
        // The message was already formatted by the caller and contains a literal '%': send it unchanged
        body = errorMsg;
      }
      sendEmail("Transcription ERROR", body);
    }
  }

  /**
   * Sends a notification e-mail to the configured address. The cluster name is appended to the subject in
   * parentheses. If no address is configured the message is only logged. Failures to send are logged and not
   * propagated.
   *
   * @param subject
   *          the subject of the e-mail
   * @param body
   *          the text of the e-mail
   */
  private void sendEmail(String subject, String body) {
    if (toEmailAddress != null) {
      try {
        logger.debug("Sending e-mail notification to {}", toEmailAddress);
        final String fullSubject = String.format("%s (%s)", subject, clusterName);
        smtpService.send(toEmailAddress, fullSubject, body);
        logger.info("Sent e-mail notification to {}", toEmailAddress);
      } catch (Exception e) {
        logger.error("Could not send email: {}\n{}", subject, body, e);
      }
    } else {
      logger.info("Skipping sending email notification. Message is {}.", body);
    }
  }

  /**
   * Returns the current value of the {@code callbackAlreadyRegistered} flag.
   *
   * @return the value of the flag; presumably true once the callback url has been registered with the service
   */
  public boolean isCallbackAlreadyRegistered() {
    return callbackAlreadyRegistered;
  }

  /**
   * Sets the service registry returned by {@link #getServiceRegistry()}.
   *
   * @param serviceRegistry
   *          the service registry
   */
  @Reference
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }

  /**
   * Sets the security service, used to set organization and user before workflows are started.
   *
   * @param securityService
   *          the security service
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }

  /**
   * Sets the user directory service returned by {@link #getUserDirectoryService()}.
   *
   * @param userDirectoryService
   *          the user directory service
   */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }

  /**
   * Sets the organization directory service, used to resolve the organization of a media package.
   *
   * @param organizationDirectoryService
   *          the organization directory service
   */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
    this.organizationDirectoryService = organizationDirectoryService;
  }

  /**
   * Sets the smtp service used to send notification e-mails.
   *
   * @param service
   *          the smtp service
   */
  @Reference
  public void setSmtpService(SmtpService service) {
    this.smtpService = service;
  }

  /**
   * Sets the workspace in whose transcript collection the result files are stored and looked up.
   *
   * @param ws
   *          the workspace
   */
  @Reference
  public void setWorkspace(Workspace ws) {
    this.workspace = ws;
  }

  /**
   * Sets the working file repository, used to clean up old result files of the transcript collection.
   *
   * @param wfr
   *          the working file repository
   */
  @Reference
  public void setWorkingFileRepository(WorkingFileRepository wfr) {
    this.wfr = wfr;
  }

  /**
   * Sets the transcription database that keeps track of the transcription jobs and their states.
   *
   * @param service
   *          the transcription database
   */
  @Reference
  public void setDatabase(TranscriptionDatabase service) {
    this.database = service;
  }

  /**
   * Sets the asset manager, used to find the latest snapshot of a media package when a workflow is started.
   *
   * @param service
   *          the asset manager
   */
  @Reference
  public void setAssetManager(AssetManager service) {
    this.assetManager = service;
  }

  /**
   * Sets the workflow service, used to look up workflow definitions and to start workflows.
   *
   * @param service
   *          the workflow service
   */
  @Reference
  public void setWorkflowService(WorkflowService service) {
    this.workflowService = service;
  }

  /**
   * Returns the service registry set through {@link #setServiceRegistry(ServiceRegistry)}.
   *
   * @return the service registry
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }

  /**
   * Returns the security service set through {@link #setSecurityService(SecurityService)}.
   *
   * @return the security service
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }

  /**
   * Returns the user directory service set through {@link #setUserDirectoryService(UserDirectoryService)}.
   *
   * @return the user directory service
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }

  /**
   * Returns the organization directory service set through
   * {@link #setOrganizationDirectoryService(OrganizationDirectoryService)}.
   *
   * @return the organization directory service
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }

  /**
   * Sets the workflow utility that is used to apply workflows instead of a newly created one. Only used by unit
   * tests!
   *
   * @param wfUtil
   *          the workflow utility to use when starting workflows
   */
  void setWfUtil(Workflows wfUtil) {
    this.wfUtil = wfUtil;
  }

  /**
   * Task that moves the transcription jobs of this provider forward.
   *
   * On each run it
   * <ul>
   * <li>checks jobs in progress that should already have finished (creation time plus track duration plus the
   * completion check buffer has passed) by asking the service for their results, and flags failed or overdue jobs for
   * retry or error,</li>
   * <li>starts the workflow that attaches the transcripts for jobs whose transcription is complete, and closes those
   * jobs,</li>
   * <li>if more than one attempt is allowed, starts the retry workflow for jobs flagged for retry.</li>
   * </ul>
   * Jobs that belong to other providers are skipped.
   */
  class WorkflowDispatcher implements Runnable {

    /**
     * {@inheritDoc}
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
      logger.debug("WorkflowDispatcher waking up...");

      try {
        // Find jobs that are in progress and jobs that had transcription complete i.e. got the callback

        long providerId;
        TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
        if (providerInfo != null) {
          providerId = providerInfo.getId();
        } else {
          logger.warn("No provider entry for {}", PROVIDER);
          return;
        }

        List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
                TranscriptionJobControl.Status.TranscriptionComplete.name());

        for (TranscriptionJobControl j : jobs) {

          // Don't process jobs for other services
          if (j.getProviderId() != providerId) {
            continue;
          }

          String mpId = j.getMediaPackageId();
          String jobId = j.getTranscriptionJobId();

          // If the job in progress, check if it should already have finished and we didn't get the callback for some
          // reason. This can happen if the admin server was offline when the callback came.
          if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
            // If job should already have been completed, try to get the results. Consider a buffer factor so that we
            // don't try it too early.
            if (j.getDateCreated().getTime() + j.getTrackDuration() + completionCheckBuffer * 1000 < System
                    .currentTimeMillis()) {
              try {
                String jobStatus = getAndSaveJobResults(jobId);
                if (RecognitionJobStatus.FAILED.equals(jobStatus)) {
                  retryOrError(jobId, mpId,
                          String.format("Transcription job failed for mpId %s, jobId %s", mpId, jobId));
                  continue;
                } else if (RecognitionJobStatus.PROCESSING.equals(jobStatus)
                        || RecognitionJobStatus.WAITING.equals(jobStatus)) {
                  // Job still waiting/running so check if it should have finished more than N seconds ago
                  if (j.getDateCreated().getTime() + j.getTrackDuration()
                          + (completionCheckBuffer + maxProcessingSeconds) * 1000 < System.currentTimeMillis()) {
                    // Processing for too long, mark job as error or retry and don't check anymore
                    retryOrError(jobId, mpId, String.format(
                            "Transcription job was in waiting or processing state for too long "
                                + "(media package %s, job id %s)", mpId, jobId));
                  }
                  // else job still running, not finished
                  continue;
                }
              } catch (TranscriptionServiceException e) {
                if (e.getCode() == 404) {
                  // Job not found there, update job state to canceled
                  database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
                  // Send notification email
                  sendEmail("Transcription ERROR",
                          String.format("Transcription job was not found (media package %s, job id %s).", mpId, jobId));
                }
                continue; // Skip this one, exception was already logged
              }
            } else {
              continue; // Not time to check yet
            }
          }

          // Jobs that get here either had state TranscriptionComplete or were in progress and the check above did
          // not report them as failed, waiting or processing.
          try {
            // Apply workflow to attach transcripts
            Map<String, String> params = new HashMap<>();
            params.put("transcriptionJobId", jobId);
            String wfId = startWorkflow(mpId, workflowDefinitionId, params);
            if (wfId == null) {
              logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, watson job {}", mpId, jobId);
              continue;
            }
            // Update state in the database
            database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
            logger.info("Attach transcription workflow {} scheduled for mp {}, watson job {}",
                    wfId, mpId, jobId);
          } catch (Exception e) {
            logger.warn(
                "Attach transcription workflow could NOT be scheduled for media package {}, watson job {}, {}: {}",
                mpId, jobId, e.getClass().getName(), e.getMessage()
            );
          }
        }

        if (maxAttempts > 1) {
          // Find jobs that need to be re-submitted
          jobs = database.findByStatus(TranscriptionJobControl.Status.Retry.name());
          HashMap<String, String> params = new HashMap<>();
          for (TranscriptionJobControl j : jobs) {
            // Don't process jobs for other services, same as for the jobs handled above
            if (j.getProviderId() != providerId) {
              continue;
            }

            String mpId = j.getMediaPackageId();
            String wfId = startWorkflow(mpId, retryWfDefId, params);
            String jobId = j.getTranscriptionJobId();
            if (wfId == null) {
              logger.warn(
                  "Retry transcription workflow could NOT be scheduled for mp {}, watson job {}. "
                      + "Will try again next time.",
                  mpId, jobId);
              // Will try again next time
              continue;
            }
            logger.info("Retry transcription workflow {} scheduled for mp {}.", wfId, mpId);
            // Retry was submitted, update previously failed job state to error
            database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
          }
        }
      } catch (TranscriptionDatabaseException e) {
        logger.warn("Could not read/update transcription job control database.", e);
      } catch (RuntimeException e) {
        // Don't let unexpected errors escape: if this task is run periodically by a scheduled executor, an exception
        // thrown here would suppress all further runs.
        logger.error("Unexpected error while dispatching transcription workflows.", e);
      }
    }
  }

  /**
   * Starts a workflow on the latest version of a media package.
   *
   * The media package is looked up in the asset manager as system user of the default organization; the workflow is
   * then applied with the security context switched to the organization the media package belongs to.
   *
   * @param mpId
   *          the id of the media package
   * @param wfDefId
   *          the id of the workflow definition to apply
   * @param params
   *          the configuration passed to the workflow
   * @return the id of the first workflow instance started, or {@code null} if the media package has not been archived
   *         yet, its organization is unknown, the workflow definition could not be obtained or no workflow was started
   */
  private String startWorkflow(String mpId, String wfDefId, Map<String, String> params) {
    DefaultOrganization defaultOrg = new DefaultOrganization();
    securityService.setOrganization(defaultOrg);
    securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));

    // Find the episode
    final AQueryBuilder q = assetManager.createQuery();
    final AResult r = q.select(q.snapshot()).where(q.mediaPackageId(mpId).and(q.version().isLatest())).run();
    if (r.getSize() == 0) {
      // Media package not archived yet.
      logger.warn("Media package {} has not been archived yet.", mpId);
      return null;
    }

    // The result is not empty at this point, so a first snapshot exists
    String org = Enrichments.enrich(r).getSnapshots().stream().findFirst().get().getOrganizationId();
    Organization organization = null;
    try {
      organization = organizationDirectoryService.getOrganization(org);
      if (organization == null) {
        logger.warn("Media package {} has an unknown organization {}.", mpId, org);
        return null;
      }
    } catch (NotFoundException e) {
      logger.warn("Organization {} not found for media package {}.", org, mpId);
      return null;
    }
    securityService.setOrganization(organization);

    try {
      WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(wfDefId);
      // Unit tests may provide their own workflow utility
      Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
      Set<String> mpIds = new HashSet<>();
      mpIds.add(mpId);
      List<WorkflowInstance> wfList = workflows
              .applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
      return wfList.isEmpty() ? null : Long.toString(wfList.get(0).getId());
    } catch (NotFoundException | WorkflowDatabaseException e) {
      // Pass the exception on to the logger so that the cause, e.g. a database problem, is not lost
      logger.warn("Could not get workflow definition: {}", wfDefId, e);
    }

    return null;
  }

  /**
   * Tells whether the transcription service is enabled. The service can be disabled via configuration, so this method
   * allows callers to verify that it is active before using it.
   *
   * @return true if service is enabled, false if service should be skipped
   */
  public boolean isEnabled() {
    return enabled;
  }

  /**
   * Task that removes old transcription results files from the transcript collection of the working file repository.
   */
  class ResultsFileCleanup implements Runnable {
    /**
     * Asks the working file repository to clean up the transcript collection, passing {@code cleanupResultDays} as the
     * age limit. A failure is logged and not propagated.
     */
    @Override
    public void run() {
      logger.info("ResultsFileCleanup waking up...");
      try {
        // Cleans up results files older than CLEANUP_RESULT_FILES_DAYS days
        wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
      } catch (IOException e) {
        logger.warn("Could not cleanup old transcript results files", e);
      }
    }
  }

}