AmberscriptTranscriptionService.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.transcription.amberscript;
import org.opencastproject.assetmanager.api.AssetManager;
import org.opencastproject.assetmanager.api.Snapshot;
import org.opencastproject.assetmanager.util.Workflows;
import org.opencastproject.job.api.AbstractJobProducer;
import org.opencastproject.job.api.Job;
import org.opencastproject.mediapackage.Catalog;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementBuilder;
import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageElements;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.metadata.dublincore.DublinCore;
import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
import org.opencastproject.metadata.dublincore.DublinCoreValue;
import org.opencastproject.metadata.dublincore.DublinCores;
import org.opencastproject.security.api.DefaultOrganization;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.security.util.SecurityUtil;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.systems.OpencastConstants;
import org.opencastproject.transcription.api.TranscriptionService;
import org.opencastproject.transcription.api.TranscriptionServiceException;
import org.opencastproject.transcription.persistence.TranscriptionDatabase;
import org.opencastproject.transcription.persistence.TranscriptionDatabaseException;
import org.opencastproject.transcription.persistence.TranscriptionJobControl;
import org.opencastproject.transcription.persistence.TranscriptionProviderControl;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.OsgiUtil;
import org.opencastproject.workflow.api.ConfiguredWorkflow;
import org.opencastproject.workflow.api.WorkflowDefinition;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowService;
import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
import org.opencastproject.workspace.api.Workspace;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component(
immediate = true,
service = { TranscriptionService.class,AmberscriptTranscriptionService.class },
property = {
"service.description=AmberScript Transcription Service",
"provider=amberscript"
}
)
public class AmberscriptTranscriptionService extends AbstractJobProducer implements TranscriptionService {
private static final Logger logger = LoggerFactory.getLogger(AmberscriptTranscriptionService.class);
private static final String JOB_TYPE = "org.opencastproject.transcription.amberscript";
public static final String SUBMISSION_COLLECTION = "amberscript-submission";
private static final String TRANSCRIPT_COLLECTION = "amberscript-transcripts";
private static final int CONNECTION_TIMEOUT = 60000; // ms, 1 minute
private static final int SOCKET_TIMEOUT = 60000; // ms, 1 minute
private static final String BASE_URL = "https://qs.amberscript.com";
private static final String STATUS_OPEN = "OPEN";
private static final String STATUS_DONE = "DONE";
private static final String STATUS_ERROR = "ERROR";
private static final String ERROR_NO_SPEECH = "No speech found";
private static final String PROVIDER = "amberscript";
private AssetManager assetManager;
private OrganizationDirectoryService organizationDirectoryService;
private ScheduledExecutorService scheduledExecutor;
private SecurityService securityService;
private ServiceRegistry serviceRegistry;
private TranscriptionDatabase database;
private UserDirectoryService userDirectoryService;
private WorkflowService workflowService;
private WorkingFileRepository wfr;
private Workspace workspace;
// Only used by unit tests
private Workflows wfUtil;
private enum Operation {
StartTranscription
}
// Enum for configuring which metadata field shall be used
// for determining the number of speakers of an event
private enum SpeakerMetadataField {
creator, contributor, both
}
// service configuration keys
private static final String ENABLED_CONFIG = "enabled";
private static final String CLIENT_KEY = "client.key";
private static final String LANGUAGE = "language";
private static final String LANGUAGE_FROM_DUBLINCORE = "language.from.dublincore";
private static final String LANGUAGE_CODE_MAP = "language.code.map";
private static final String AMBERSCRIPTJOBTYPE = "jobtype";
private static final String WORKFLOW_CONFIG = "workflow";
private static final String DISPATCH_WORKFLOW_INTERVAL_CONFIG = "workflow.dispatch.interval";
private static final String MAX_PROCESSING_TIME_CONFIG = "max.overdue.time";
private static final String CLEANUP_RESULTS_DAYS_CONFIG = "cleanup.results.days";
private static final String SPEAKER = "speaker";
private static final String SPEAKER_FROM_DUBLINCORE = "speaker.from.dublincore";
private static final String SPEAKER_METADATA_FIELD = "speaker.metadata.field";
private static final String TRANSCRIPTIONTYPE = "transcriptiontype";
private static final String GLOSSARY = "glossary";
private static final String TRANSCRIPTIONSTYLE = "cleanread";
private static final String TARGETLANGUAGE = "targetlanguage";
// service configuration default values
private boolean enabled = false;
private String clientKey;
private String language = "en";
/** determines if the transcription language should be taken from the dublincore */
private boolean languageFromDublinCore;
private String amberscriptJobType = "direct";
private String workflowDefinitionId = "amberscript-attach-transcripts";
private long workflowDispatchIntervalSeconds = 60;
private long maxProcessingSeconds = 8 * 24 * 60 * 60; // maximum runtime for jobType perfect is 8 days
private int cleanupResultDays = 7;
private int numberOfSpeakers = 1;
private boolean speakerFromDublinCore = true;
private SpeakerMetadataField speakerMetadataField = SpeakerMetadataField.creator;
private String transcriptionType = "transcription";
private String glossary = "";
private String transcriptionStyle = "cleanread";
private String targetLanguage = "";
/**
* Contains mappings from several possible ways of writing a language name/code to the
* corresponding amberscript language code
*/
private AmberscriptLangUtil amberscriptLangUtil;
private String systemAccount;
public AmberscriptTranscriptionService() {
super(JOB_TYPE);
}
@Activate
public void activate(ComponentContext cc) {
Optional<Boolean> enabledOpt = OsgiUtil.getOptCfgAsBoolean(cc.getProperties(), ENABLED_CONFIG);
if (enabledOpt.isPresent()) {
enabled = enabledOpt.get();
}
if (!enabled) {
logger.info("Amberscript Transcription Service disabled."
+ " If you want to enable it, please update the service configuration.");
return;
}
Optional<String> clientKeyOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLIENT_KEY);
if (clientKeyOpt.isPresent()) {
clientKey = clientKeyOpt.get();
} else {
logger.warn("API key was not set.");
return;
}
Optional<String> languageOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE);
if (languageOpt.isPresent()) {
language = languageOpt.get();
logger.info("Default language is set to '{}'.", language);
} else {
logger.info("Default language '{}' will be used.", language);
}
Optional<String> languageFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_FROM_DUBLINCORE);
if (languageFromDublinCoreOpt.isPresent()) {
try {
languageFromDublinCore = Boolean.parseBoolean(languageFromDublinCoreOpt.get());
} catch (Exception e) {
logger.warn("Configuration value for '{}' is invalid, defaulting to false.", LANGUAGE_FROM_DUBLINCORE);
}
}
logger.info("Configuration value for '{}' is set to '{}'.", LANGUAGE_FROM_DUBLINCORE, languageFromDublinCore);
amberscriptLangUtil = AmberscriptLangUtil.getInstance();
int customMapEntriesCount = 0;
Optional<String> langCodeMapOpt = OsgiUtil.getOptCfg(cc.getProperties(), LANGUAGE_CODE_MAP);
if (langCodeMapOpt.isPresent()) {
try {
String langCodeMapStr = langCodeMapOpt.get();
if (langCodeMapStr != null) {
for (String mapping : langCodeMapStr.split(",")) {
String[] mapEntries = mapping.split(":");
amberscriptLangUtil.addCustomMapping(mapEntries[0], mapEntries[1]);
customMapEntriesCount += 1;
}
}
} catch (Exception e) {
logger.warn("Configuration '{}' is invalid. Using just default mapping.", LANGUAGE_CODE_MAP);
}
}
logger.info("Language code map was set. Added '{}' additional entries.", customMapEntriesCount);
Optional<String> amberscriptJobTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), AMBERSCRIPTJOBTYPE);
if (amberscriptJobTypeOpt.isPresent()) {
amberscriptJobType = amberscriptJobTypeOpt.get();
logger.info("Default Amberscript job type is set to '{}'.", amberscriptJobType);
} else {
logger.info("Default Amberscript job type '{}' will be used.", amberscriptJobType);
}
Optional<String> wfOpt = OsgiUtil.getOptCfg(cc.getProperties(), WORKFLOW_CONFIG);
if (wfOpt.isPresent()) {
workflowDefinitionId = wfOpt.get();
logger.info("Workflow is set to '{}'.", workflowDefinitionId);
} else {
logger.info("Default workflow '{}' will be used.", workflowDefinitionId);
}
Optional<String> intervalOpt = OsgiUtil.getOptCfg(cc.getProperties(), DISPATCH_WORKFLOW_INTERVAL_CONFIG);
if (intervalOpt.isPresent()) {
try {
workflowDispatchIntervalSeconds = Long.parseLong(intervalOpt.get());
} catch (NumberFormatException e) {
logger.warn("Configured '{}' is invalid. Using default.", DISPATCH_WORKFLOW_INTERVAL_CONFIG);
}
}
logger.info("Workflow dispatch interval is {} seconds.", workflowDispatchIntervalSeconds);
Optional<String> maxProcessingOpt = OsgiUtil.getOptCfg(cc.getProperties(), MAX_PROCESSING_TIME_CONFIG);
if (maxProcessingOpt.isPresent()) {
try {
maxProcessingSeconds = Long.parseLong(maxProcessingOpt.get());
} catch (NumberFormatException e) {
logger.warn("Configured '{}' is invalid. Using default.", MAX_PROCESSING_TIME_CONFIG);
}
}
logger.info("Maximum processing time for transcription job is {} seconds.", maxProcessingSeconds);
Optional<String> cleanupOpt = OsgiUtil.getOptCfg(cc.getProperties(), CLEANUP_RESULTS_DAYS_CONFIG);
if (cleanupOpt.isPresent()) {
try {
cleanupResultDays = Integer.parseInt(cleanupOpt.get());
} catch (NumberFormatException e) {
logger.warn("Configured '{}' is invalid. Using default.", CLEANUP_RESULTS_DAYS_CONFIG);
}
}
logger.info("Cleanup result files after {} days.", cleanupResultDays);
Optional<String> speakerOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER);
if (speakerOpt.isPresent()) {
try {
numberOfSpeakers = Integer.parseInt(speakerOpt.get());
} catch (NumberFormatException e) {
logger.warn("Configured '{}' is invalid. Using default.", SPEAKER);
}
}
logger.info("Default number of speakers is set to '{}'.", numberOfSpeakers);
Optional<String> speakerFromDublinCoreOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_FROM_DUBLINCORE);
if (speakerFromDublinCoreOpt.isPresent()) {
try {
speakerFromDublinCore = Boolean.parseBoolean(speakerFromDublinCoreOpt.get());
} catch (Exception e) {
logger.warn("Configuration value for '{}' is invalid, defaulting to true.", SPEAKER_FROM_DUBLINCORE);
}
}
logger.info("Configuration value for '{}' is set to '{}'.", SPEAKER_FROM_DUBLINCORE, speakerFromDublinCore);
Optional<String> speakerMetadataFieldOpt = OsgiUtil.getOptCfg(cc.getProperties(), SPEAKER_METADATA_FIELD);
if (speakerMetadataFieldOpt.isPresent()) {
try {
speakerMetadataField = SpeakerMetadataField.valueOf(speakerMetadataFieldOpt.get());
} catch (IllegalArgumentException e) {
logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
speakerMetadataFieldOpt.get(), SPEAKER_METADATA_FIELD, speakerMetadataField);
}
}
logger.info("Default metadata field for calculating the amount of speakers is set to '{}'.", speakerMetadataField);
Optional<String> transcriptionTypeOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONTYPE);
if (transcriptionTypeOpt.isPresent()) {
if (List.of("transcription", "captions", "translatedSubtitles").contains(transcriptionType)) {
transcriptionType = transcriptionTypeOpt.get();
logger.info("Default transcription type is set to '{}'.", transcriptionType);
} else {
logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
transcriptionTypeOpt.get(), TRANSCRIPTIONTYPE, transcriptionType);
}
} else {
logger.info("Default transcription type '{}' will be used.", transcriptionType);
}
Optional<String> glossaryOpt = OsgiUtil.getOptCfg(cc.getProperties(), GLOSSARY);
if (glossaryOpt.isPresent()) {
glossary = glossaryOpt.get();
logger.info("Default glossary is set to '{}'.", glossary);
} else {
logger.info("No glossary will be used by default");
}
Optional<String> transcriptionStyleOpt = OsgiUtil.getOptCfg(cc.getProperties(), TRANSCRIPTIONSTYLE);
if (transcriptionStyleOpt.isPresent()) {
if (List.of("cleanread", "verbatim").contains(transcriptionStyle)) {
transcriptionStyle = transcriptionStyleOpt.get();
logger.info("Default transcription style is set to '{}'.", transcriptionStyle);
} else {
logger.warn("Value '{}' is invalid for configuration '{}'. Using default: '{}'.",
transcriptionStyleOpt.get(), TRANSCRIPTIONSTYLE, transcriptionStyle);
}
} else {
logger.info("Default transcription style '{}' will be used.", transcriptionStyle);
}
Optional<String> targetLanguageOpt = OsgiUtil.getOptCfg(cc.getProperties(), TARGETLANGUAGE);
if (targetLanguageOpt.isPresent()) {
targetLanguage = targetLanguageOpt.get();
logger.info("Default target language is set to '{}'.", targetLanguage);
} else {
logger.info("Transcriptions won't be translated");
}
systemAccount = OsgiUtil.getContextProperty(cc, OpencastConstants.DIGEST_USER_PROPERTY);
scheduledExecutor = Executors.newScheduledThreadPool(2);
scheduledExecutor.scheduleWithFixedDelay(new WorkflowDispatcher(), 120, workflowDispatchIntervalSeconds,
TimeUnit.SECONDS);
scheduledExecutor.scheduleWithFixedDelay(new ResultsFileCleanup(), 1, 1, TimeUnit.DAYS);
logger.info("Activated.");
}
@Deactivate
public void deactivate() {
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
}
}
@Override
public Job startTranscription(String mpId, Track track) throws TranscriptionServiceException {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public Job startTranscription(String mpId, Track track, String... args) throws TranscriptionServiceException {
if (!enabled) {
throw new TranscriptionServiceException("AmberScript Transcription Service disabled."
+ " If you want to enable it, please update the service configuration.");
}
String language = null;
if (languageFromDublinCore) {
for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
try (InputStream in = workspace.read(catalog.getURI())) {
DublinCoreCatalog dublinCatalog = DublinCores.read(in);
String dublinCoreLang = dublinCatalog.getFirst(DublinCore.PROPERTY_LANGUAGE);
if (dublinCoreLang != null) {
language = amberscriptLangUtil.getLanguageCodeOrNull(dublinCoreLang);
}
if (language != null) {
break;
}
} catch (IOException | NotFoundException e) {
logger.error(String.format("Unable to load dublin core catalog for event '%s'",
track.getMediaPackage().getIdentifier()), e);
}
}
}
if (language == null) {
if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
language = args[0];
} else {
language = getLanguage();
}
}
String jobType;
if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
jobType = args[1];
} else {
jobType = getAmberscriptJobType();
}
int numberOfSpeakers = 0;
if (speakerFromDublinCore) {
Set<String> speakers = new HashSet<>();
for (Catalog catalog : track.getMediaPackage().getCatalogs(MediaPackageElements.EPISODE)) {
try (InputStream in = workspace.read(catalog.getURI())) {
DublinCoreCatalog dublinCatalog = DublinCores.read(in);
if (speakerMetadataField.equals(SpeakerMetadataField.creator)
|| speakerMetadataField.equals(SpeakerMetadataField.both)) {
dublinCatalog.get(DublinCore.PROPERTY_CREATOR).stream()
.map(DublinCoreValue::getValue).forEach(speakers::add);
}
if (speakerMetadataField.equals(SpeakerMetadataField.contributor)
|| speakerMetadataField.equals(SpeakerMetadataField.both)) {
dublinCatalog.get(DublinCore.PROPERTY_CONTRIBUTOR).stream()
.map(DublinCoreValue::getValue).forEach(speakers::add);
}
} catch (IOException | NotFoundException e) {
logger.error("Unable to load dublin core catalog for event '{}'",
track.getMediaPackage().getIdentifier(), e);
}
}
if (speakers.size() >= 1) {
numberOfSpeakers = speakers.size();
}
}
if (numberOfSpeakers == 0) {
if (args.length > 2 && StringUtils.isNotBlank(args[2])) {
numberOfSpeakers = Integer.parseInt(args[2]);
} else {
numberOfSpeakers = getNumberOfSpeakers();
}
}
String transcriptionType;
if (args.length > 3 && StringUtils.isNotBlank(args[3])) {
transcriptionType = args[3];
} else {
transcriptionType = getTranscriptionType();
}
String glossary;
if (args.length > 4 && args[4] != null) {
glossary = args[4];
} else {
glossary = getGlossary();
}
String transcriptionStyle;
if (args.length > 5 && StringUtils.isNotBlank(args[5])) {
transcriptionStyle = args[5];
} else {
transcriptionStyle = getTranscriptionStyle();
}
String targetLanguage;
if (args.length > 6 && args[6] != null) {
targetLanguage = args[6];
} else {
targetLanguage = getTargetLanguage();
}
logger.info("New transcription job for mpId '{}' language '{}' job type '{}' speakers '{}' transcription type '{}'"
+ "glossary '{}'.", mpId, language, jobType, numberOfSpeakers, transcriptionType, glossary);
try {
return serviceRegistry.createJob(JOB_TYPE, Operation.StartTranscription.name(), Arrays.asList(
mpId, MediaPackageElementParser.getAsXml(track), language, jobType, Integer.toString(numberOfSpeakers),
transcriptionType, glossary, transcriptionStyle, targetLanguage));
} catch (ServiceRegistryException e) {
throw new TranscriptionServiceException("Unable to create a job", e);
} catch (MediaPackageException e) {
throw new TranscriptionServiceException("Invalid track '" + track.toString() + "'", e);
}
}
@Override
public void transcriptionDone(String mpId, Object results) { }
private void transcriptionDone(String mpId, String jobId) {
try {
logger.info("Transcription done for mpId '{}'.", mpId);
if (getAndSaveJobResult(jobId)) {
database.updateJobControl(jobId, TranscriptionJobControl.Status.TranscriptionComplete.name());
} else {
logger.debug("Unable to get and save the transcription result for mpId '{}'.", mpId);
}
} catch (IOException | TranscriptionServiceException e) {
logger.warn("Could not save transcription results file for mpId '{}': {}", mpId, e.toString());
} catch (TranscriptionDatabaseException e) {
logger.warn("Transcription results file were saved but state in db not updated for mpId '{}': ", mpId, e);
}
}
@Override
public void transcriptionError(String mpId, Object obj) throws TranscriptionServiceException {
JSONObject jsonObj = null;
String jobId = null;
try {
jsonObj = (JSONObject) obj;
jobId = (String) jsonObj.get("name");
// Update state in database
database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
TranscriptionJobControl jobControl = database.findByJob(jobId);
logger.warn(String.format("Error received for media package %s, job id %s",
jobControl.getMediaPackageId(), jobId));
// Send notification email
} catch (TranscriptionDatabaseException e) {
logger.warn("Transcription error. State in db could not be updated to error for mpId {}, jobId {}", mpId, jobId);
throw new TranscriptionServiceException("Could not update transcription job control db", e);
}
}
@Override
public Map<String, Object> getReturnValues(String mpId, String jobId) throws TranscriptionServiceException {
throw new TranscriptionServiceException("Method not implemented");
}
@Override
public String getLanguage() {
return language;
}
public String getAmberscriptJobType() {
return amberscriptJobType;
}
public int getNumberOfSpeakers() {
return numberOfSpeakers;
}
public String getTranscriptionType() {
return transcriptionType;
}
public String getGlossary() {
return glossary;
}
public String getTranscriptionStyle() {
return transcriptionStyle;
}
public String getTargetLanguage() {
return targetLanguage;
}
// Called by workflow
@Override
protected String process(Job job) throws Exception {
Operation op = null;
String operation = job.getOperation();
List<String> arguments = job.getArguments();
String result = "";
op = Operation.valueOf(operation);
switch (op) {
case StartTranscription:
String mpId = arguments.get(0);
Track track = (Track) MediaPackageElementParser.getFromXml(arguments.get(1));
String languageCode = arguments.get(2);
String jobType = arguments.get(3);
String numberOfSpeakers = arguments.get(4);
String transcriptionType = arguments.get(5);
String glossary = arguments.get(6);
String transcriptionStyle = arguments.get(7);
String targetLanguage = arguments.get(8);
createRecognitionsJob(mpId, track, languageCode, jobType, numberOfSpeakers, transcriptionType, glossary,
transcriptionStyle, targetLanguage);
break;
default:
throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
}
return result;
}
void createRecognitionsJob(String mpId, Track track, String languageCode, String jobType, String numberOfSpeakers,
String transcriptionType, String glossary, String transcriptionStyle, String targetLanguage)
throws TranscriptionServiceException {
// Timeout 3 hours (needs to include the time for the remote service to
// fetch the media URL before sending final response)
CloseableHttpClient httpClient = makeHttpClient(3 * 3600 * 1000);
CloseableHttpResponse response = null;
String submitUrl = BASE_URL + "/jobs/upload-media"
+ "?apiKey=" + clientKey
+ "&language=" + languageCode
+ "&jobType=" + jobType
+ "&numberOfSpeakers=" + numberOfSpeakers
+ "&transcriptionType=" + transcriptionType
+ "&transcriptionStyle=" + transcriptionStyle;
if (StringUtils.isNotBlank(glossary)) {
submitUrl += "&glossary=" + glossary;
}
if (StringUtils.isNotBlank(targetLanguage)) {
submitUrl += "&targetLanguage=" + targetLanguage;
}
try {
FileBody fileBody = new FileBody(workspace.get(track.getURI()), ContentType.DEFAULT_BINARY);
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
builder.addPart("file", fileBody);
HttpEntity multipartEntity = builder.build();
HttpPost httpPost = new HttpPost(submitUrl);
httpPost.setEntity(multipartEntity);
response = httpClient.execute(httpPost);
int code = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
String jsonString = EntityUtils.toString(response.getEntity());
JSONParser jsonParser = new JSONParser();
JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
logger.debug("Submitting new transcription job: {}" + System.lineSeparator()
+ "Response: {}", removePrivateInfo(submitUrl), jsonString);
JSONObject result = (JSONObject) jsonObject.get("jobStatus");
String jobId = (String) result.get("jobId");
switch (code) {
case HttpStatus.SC_OK: // 200
logger.info("mp {} has been submitted to AmberScript service with jobId {}.", mpId, jobId);
database.storeJobControl(mpId, track.getIdentifier(), jobId, TranscriptionJobControl.Status.InProgress.name(),
track.getDuration() == null ? 0 : track.getDuration().longValue(), new Date(), PROVIDER);
EntityUtils.consume(entity);
return;
default:
String error = (String) result.get("error");
String message = (String) result.get("message");
String msg = String.format("Unable to submit job: API returned %s - %s: %s", code, error, message);
logger.warn(msg);
throw new TranscriptionServiceException(msg);
}
} catch (Exception e) {
logger.warn("Exception when calling the captions endpoint", e);
throw new TranscriptionServiceException("Exception when calling the captions endpoint", e);
} finally {
try {
httpClient.close();
if (response != null) {
response.close();
}
} catch (IOException e) {
}
}
}
boolean checkJobResults(String jobId) throws TranscriptionServiceException {
String mpId = "unknown";
CloseableHttpClient httpClient = makeHttpClient();
CloseableHttpResponse response = null;
String checkUrl = BASE_URL + "/jobs/status?jobId=" + jobId + "&apiKey=" + clientKey;
try {
HttpGet httpGet = new HttpGet(checkUrl);
response = httpClient.execute(httpGet);
int code = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
String jsonString = EntityUtils.toString(entity);
EntityUtils.consume(entity);
logger.debug("AmberScript API call was '{}'." + System.lineSeparator() + "Response: {}",
removePrivateInfo(checkUrl), jsonString);
JSONParser jsonParser = new JSONParser();
JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonString);
switch (code) {
case HttpStatus.SC_OK:
JSONObject result = (JSONObject) jsonObject.get("jobStatus");
String status = (String) result.get("status");
switch (status) {
case STATUS_OPEN:
logger.debug("Captions job '{}' has not finished yet.", jobId);
return false;
case STATUS_ERROR:
var errorMsg = (String) result.get("errorMsg");
throw new TranscriptionServiceException(
String.format("Captions job '%s' failed: %s", jobId, errorMsg),
code,
ERROR_NO_SPEECH.equals(errorMsg));
case STATUS_DONE:
logger.info("Captions job '{}' has finished.", jobId);
TranscriptionJobControl jc = database.findByJob(jobId);
if (jc != null) {
mpId = jc.getMediaPackageId();
}
transcriptionDone(mpId, jobId);
return true;
default:
return false; // only here to obey checkstyle
}
default:
String error = (String) jsonObject.get("error");
String errorMessage = (String) jsonObject.get("errorMessage");
logger.warn("Error while checking status: {}."
+ System.lineSeparator() + "{}: {}", code, error, errorMessage);
throw new TranscriptionServiceException(
String.format("Captions job '%s' failed: Return Code %d", jobId, code), code);
}
} catch (TranscriptionDatabaseException | IOException | ParseException e) {
logger.warn("Error while checking status: {}", e.toString());
} finally {
try {
httpClient.close();
if (response != null) {
response.close();
}
} catch (IOException e) {
}
}
return false;
}
private boolean getAndSaveJobResult(String jobId) throws TranscriptionServiceException, IOException {
CloseableHttpClient httpClient = makeHttpClient();
CloseableHttpResponse response = null;
String transcriptUrl = BASE_URL + "/jobs/export?format=srt&jobId=" + jobId + "&apiKey=" + clientKey;
boolean done = false;
try {
HttpGet httpGet = new HttpGet(transcriptUrl);
response = httpClient.execute(httpGet);
int code = response.getStatusLine().getStatusCode();
logger.debug("AmberScript API {} http response {}", removePrivateInfo(transcriptUrl), code);
switch (code) {
case HttpStatus.SC_OK: // 200
HttpEntity entity = response.getEntity();
logger.info("Retrieved details for transcription with jobid: '{}'", jobId);
// Save the result subrip (srt) file into a collection
workspace.putInCollection(TRANSCRIPT_COLLECTION, jobId + ".srt", entity.getContent());
done = true;
break;
default:
logger.warn("Error retrieving details for transcription with jobid: '{}', return status: {}.", jobId, code);
break;
}
} catch (Exception e) {
throw new TranscriptionServiceException(String.format(
"Exception when calling the transcription service for jobid: %s", jobId), e);
} finally {
try {
httpClient.close();
if (response != null) {
response.close();
}
} catch (IOException e) {
}
}
return done;
}
@Override
// Called by the attach workflow operation
public MediaPackageElement getGeneratedTranscription(String mpId, String jobId, MediaPackageElement.Type type)
throws TranscriptionServiceException {
try {
// If jobId is unknown, look for all jobs associated to that mpId
if (jobId == null || "null".equals(jobId)) {
jobId = null;
for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
if (TranscriptionJobControl.Status.Closed.name().equals(jc.getStatus())
|| TranscriptionJobControl.Status.TranscriptionComplete.name().equals(jc.getStatus())) {
jobId = jc.getTranscriptionJobId();
}
}
}
if (jobId == null) {
throw new TranscriptionServiceException(
"No completed or closed transcription job found in database for media package " + mpId);
}
// Results already saved?
URI uri = workspace.getCollectionURI(TRANSCRIPT_COLLECTION, jobId + ".srt");
logger.info("Looking for transcript at URI: {}", uri);
try {
workspace.get(uri);
logger.info("Found captions at URI: {}", uri);
} catch (Exception e) {
logger.info("Results not saved: getting from service for jobId {}", jobId);
// Not saved yet so call the transcription service to get the results
checkJobResults(jobId);
}
MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
logger.debug("Returning MPE with results file URI: {}", uri);
return builder.elementFromURI(uri, type, new MediaPackageElementFlavor("captions", "srt"));
} catch (TranscriptionDatabaseException e) {
throw new TranscriptionServiceException("Job id not informed and could not find transcription", e);
}
}
/**
* Get mediapackage transcription status
*
* @param mpId, mediapackage id
* @return transcription status
* @throws TranscriptionServiceException
*/
public String getTranscriptionStatus(String mpId) throws TranscriptionServiceException {
try {
for (TranscriptionJobControl jc : database.findByMediaPackage(mpId)) {
return jc.getStatus();
}
} catch (TranscriptionDatabaseException e) {
throw new TranscriptionServiceException("Mediapackage id transcription status unknown", e);
}
return "Unknown";
}
/**
* Creates a closable http client with default configuration.
*
* @return closable http client
*/
protected CloseableHttpClient makeHttpClient() {
return makeHttpClient(SOCKET_TIMEOUT);
}
/**
* Creates a closable http client.
*
* @param socketTimeout http socket timeout value in milliseconds
*/
protected CloseableHttpClient makeHttpClient(int socketTimeout) {
RequestConfig reqConfig = RequestConfig.custom()
.setConnectTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
.setSocketTimeout(socketTimeout)
.setConnectionRequestTimeout(AmberscriptTranscriptionService.CONNECTION_TIMEOUT)
.build();
CloseableHttpClient httpClient = HttpClientBuilder.create()
.useSystemProperties()
.setDefaultRequestConfig(reqConfig)
.build();
return httpClient;
}
// Called when a transcription job has been submitted
protected void deleteStorageFile(String filename) {
try {
logger.debug("Removing {} from collection {}.", filename, SUBMISSION_COLLECTION);
wfr.deleteFromCollection(SUBMISSION_COLLECTION, filename, false);
} catch (IOException e) {
logger.warn("Unable to remove submission file {} from collection {}", filename, SUBMISSION_COLLECTION);
}
}
@Reference
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}
@Reference
public void setSecurityService(SecurityService securityService) {
this.securityService = securityService;
}
@Reference
public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
this.userDirectoryService = userDirectoryService;
}
@Reference
public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
this.organizationDirectoryService = organizationDirectoryService;
}
@Reference
public void setWorkspace(Workspace ws) {
this.workspace = ws;
}
@Reference
public void setWorkingFileRepository(WorkingFileRepository wfr) {
this.wfr = wfr;
}
@Reference
public void setDatabase(TranscriptionDatabase service) {
this.database = service;
}
@Reference
public void setAssetManager(AssetManager service) {
this.assetManager = service;
}
@Reference
public void setWorkflowService(WorkflowService service) {
this.workflowService = service;
}
@Override
protected ServiceRegistry getServiceRegistry() {
return serviceRegistry;
}
@Override
protected SecurityService getSecurityService() {
return securityService;
}
@Override
protected UserDirectoryService getUserDirectoryService() {
return userDirectoryService;
}
@Override
protected OrganizationDirectoryService getOrganizationDirectoryService() {
return organizationDirectoryService;
}
// Only used by unit tests!
void setWfUtil(Workflows wfUtil) {
this.wfUtil = wfUtil;
}
class WorkflowDispatcher implements Runnable {
/**
* {@inheritDoc}
*
* @see java.lang.Thread#run()
*/
@Override
public void run() {
logger.debug("WorkflowDispatcher waking up...");
try {
// Find jobs that are in progress and jobs that had transcription complete
long providerId;
TranscriptionProviderControl providerInfo = database.findIdByProvider(PROVIDER);
if (providerInfo != null) {
providerId = providerInfo.getId();
} else {
logger.debug("No jobs yet for provider {}.", PROVIDER);
return;
}
List<TranscriptionJobControl> jobs = database.findByStatus(TranscriptionJobControl.Status.InProgress.name(),
TranscriptionJobControl.Status.TranscriptionComplete.name());
for (TranscriptionJobControl j : jobs) {
// Don't process jobs for other services
if (j.getProviderId() != providerId) {
continue;
}
String mpId = j.getMediaPackageId();
String jobId = j.getTranscriptionJobId();
// If the job in progress, check if it should already have finished.
if (TranscriptionJobControl.Status.InProgress.name().equals(j.getStatus())) {
// If job should already have been completed, try to get the results.
if (j.getDateExpected().getTime() < System.currentTimeMillis()) {
try {
if (!checkJobResults(jobId)) {
// Job still running, not finished, so check if it should have finished more than N seconds ago
if (j.getDateExpected().getTime() + maxProcessingSeconds * 1000 < System.currentTimeMillis()) {
// Processing for too long, mark job as canceled and don't check anymore
database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
}
// else Job still running, not finished
continue;
}
} catch (TranscriptionServiceException e) {
try {
database.updateJobControl(jobId, TranscriptionJobControl.Status.Canceled.name());
continue;
} catch (TranscriptionDatabaseException ex) {
logger.warn("Could not cancel job '{}'.", jobId);
}
}
} else {
continue; // Not time to check yet
}
}
// Jobs that get here have state TranscriptionCompleted
try {
DefaultOrganization defaultOrg = new DefaultOrganization();
securityService.setOrganization(defaultOrg);
securityService.setUser(SecurityUtil.createSystemUser(systemAccount, defaultOrg));
// Find the episode
Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
if (snapshot.isEmpty()) {
logger.warn("Media package {} no longer exists in the asset manager. It was likely deleted. "
+ "Dropping the generated transcription.", mpId);
database.updateJobControl(jobId, TranscriptionJobControl.Status.Error.name());
continue;
}
String org = snapshot.get().getOrganizationId();
Organization organization = organizationDirectoryService.getOrganization(org);
if (organization == null) {
logger.warn("Media package {} has an unknown organization {}. Skipped.", mpId, org);
continue;
}
securityService.setOrganization(organization);
// Build workflow
Map<String, String> params = new HashMap<String, String>();
params.put("transcriptionJobId", jobId);
WorkflowDefinition wfDef = workflowService.getWorkflowDefinitionById(workflowDefinitionId);
// Apply workflow
// wfUtil is only used by unit tests
Workflows workflows = wfUtil != null ? wfUtil : new Workflows(assetManager, workflowService);
Set<String> mpIds = new HashSet<String>();
mpIds.add(mpId);
List<WorkflowInstance> wfList = workflows
.applyWorkflowToLatestVersion(mpIds, ConfiguredWorkflow.workflow(wfDef, params));
String wfId = wfList.size() > 0 ? Long.toString(wfList.get(0).getId()) : "Unknown";
// Update state in the database
database.updateJobControl(jobId, TranscriptionJobControl.Status.Closed.name());
logger.info("Attach transcription workflow {} scheduled for mp {}, transcription service job {}",
wfId, mpId, jobId);
} catch (Exception e) {
logger.warn("Attach transcription workflow could NOT be scheduled for mp {}, amberscript job {}, {}: {}",
mpId, jobId, e.getClass().getName(), e.getMessage());
}
}
} catch (TranscriptionDatabaseException e) {
logger.warn("Could not read transcription job control database: {}", e.getMessage());
}
}
}
class ResultsFileCleanup implements Runnable {
@Override
public void run() {
logger.info("ResultsFileCleanup waking up...");
try {
// Cleans up results files older than CLEANUP_RESULT_FILES_DAYS days
wfr.cleanupOldFilesFromCollection(TRANSCRIPT_COLLECTION, cleanupResultDays);
wfr.cleanupOldFilesFromCollection(SUBMISSION_COLLECTION, cleanupResultDays);
} catch (IOException e) {
logger.warn("Could not cleanup old submission and transcript results files", e);
}
}
}
private String removePrivateInfo(String unsafeString) {
String safeString = unsafeString.replace(clientKey, "__api-key-was-hidden__");
return safeString;
}
}