WorkflowDefinitionScanner.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.workflow.impl;

import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
import static org.opencastproject.util.ReadinessIndicator.ARTIFACT;

import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryListener;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.User;
import org.opencastproject.util.ReadinessIndicator;
import org.opencastproject.workflow.api.WorkflowDefinition;
import org.opencastproject.workflow.api.WorkflowIdentifier;
import org.opencastproject.workflow.api.WorkflowStateMapping;
import org.opencastproject.workflow.api.XmlWorkflowParser;
import org.opencastproject.workflow.api.YamlWorkflowParser;

import org.apache.felix.fileinstall.ArtifactInstaller;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

/**
 * Loads, unloads, and reloads {@link WorkflowDefinition}s from "*workflow.xml" files in any of fileinstall's watch
 * directories.
 */
@Component(
    property = {
      "service.description=Workflow Definition Scanner"
    },
    immediate = true,
    service = { ArtifactInstaller.class, WorkflowDefinitionScanner.class }
)
public class WorkflowDefinitionScanner implements ArtifactInstaller, OrganizationDirectoryListener {
  private static final Logger logger = LoggerFactory.getLogger(WorkflowDefinitionScanner.class);

  /** An internal collection of workflows that we have installed */
  protected Map<WorkflowIdentifier, WorkflowDefinition> installedWorkflows = new HashMap<>();

  /** All workflow state mappings which are configured for the workflow defintions */
  protected Map<String, Set<WorkflowStateMapping>> workflowStateMappings = new HashMap<>();

  /** An internal collection of artifact id, bind the workflow definition files and their id */
  protected Map<File, WorkflowIdentifier> artifactIds = new HashMap<>();

  /** List of artifact parsed with error */
  protected final List<File> artifactsWithError = new ArrayList<>();

  /** OSGi bundle context */
  private BundleContext bundleCtx = null;

  /** Tag to define if the workflows definition has already been loaded */
  private boolean isWFSinitialized = false;

  private OrganizationDirectoryService organizationDirectoryService;

  private WorkflowFilenameFilter workflowFilenameFilter;

  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
    this.organizationDirectoryService = organizationDirectoryService;
  }

  /**
   * OSGi callback on component activation. private boolean initialized = true;
   *
   * /** OSGi callback on component activation.
   *
   * @param ctx
   *          the bundle context
   */
  @Activate
  void activate(BundleContext ctx) {
    this.bundleCtx = ctx;
    organizationDirectoryService.addOrganizationDirectoryListener(this);
    this.workflowFilenameFilter = new WorkflowFilenameFilter("workflows", ".*\\.(xml|yaml|yml)$");
  }

  /**
   * {@inheritDoc}
   *
   * @see org.apache.felix.fileinstall.ArtifactInstaller#install(java.io.File)
   */
  public void install(File artifact) {
    synchronized (artifactsWithError) {
      WorkflowDefinition def = parseWorkflowDefinitionFile(artifact);
      if (def == null) {
        logger.warn("Unable to install workflow from '{}'", artifact.getName());
        artifactsWithError.add(artifact);
      } else {
        installWorkflowDefinition(artifact, def);
      }

      // Determine the number of available profiles
      String[] filesInDirectory = artifact.getParentFile().list(workflowFilenameFilter);
      if (filesInDirectory == null) {
        throw new RuntimeException("error retrieving files from directory \"" + artifact.getParentFile() + "\"");
      }

      // Once all profiles have been loaded, announce readiness
      if ((filesInDirectory.length - artifactsWithError.size()) == artifactIds.size() && !isWFSinitialized) {
        logger.info("{} Workflow definitions loaded, activating Workflow service",
            filesInDirectory.length - artifactsWithError.size());
        Dictionary<String, String> properties = new Hashtable<>();
        properties.put(ARTIFACT, "workflowdefinition");
        logger.debug("Indicating readiness of workflow definitions");
        bundleCtx.registerService(ReadinessIndicator.class.getName(), new ReadinessIndicator(), properties);
        isWFSinitialized = true;
      }
    }
  }

  private void installWorkflowDefinition(File artifact, WorkflowDefinition def) {
    synchronized (artifactsWithError) {
      // Is there a workflow with the exact same ID, but a different file name? Then ignore.
      final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(def.getId(), def.getOrganization());
      for (Map.Entry<File, WorkflowIdentifier> fileWithIdentifier : artifactIds.entrySet()) {
        if (fileWithIdentifier.getValue().equals(workflowIdentifier) && !fileWithIdentifier.getKey().equals(artifact)) {
          logger.warn("Workflow with identifier '{}' already registered in file '{}', ignoring", workflowIdentifier,
              fileWithIdentifier.getKey());
          artifactsWithError.add(artifact);
          return;
        }
      }

      logger.debug("Installing workflow from file '{}'", artifact.getName());
      artifactsWithError.remove(artifact);
      artifactIds.put(artifact, workflowIdentifier);
      putWorkflowDefinition(workflowIdentifier, def);
      workflowStateMappings.put(def.getId(), def.getStateMappings());

      logger.info("Workflow definition '{}' from file '{}' installed", workflowIdentifier, artifact.getName());
    }
  }

  /**
   * {@inheritDoc}
   *
   * @see org.apache.felix.fileinstall.ArtifactInstaller#uninstall(java.io.File)
   */
  public void uninstall(File artifact) {
    // Since the artifact is gone, we can't open it to read its ID. So we look in the local map.
    WorkflowIdentifier identifier = artifactIds.remove(artifact);
    if (identifier != null) {
      removeWorkflowDefinition(identifier);
      logger.info("Uninstalling workflow definition '{}' from file '{}'", identifier, artifact.getName());
    }
  }

  /**
   * {@inheritDoc}
   *
   * @see org.apache.felix.fileinstall.ArtifactInstaller#update(java.io.File)
   */
  public void update(File artifact) {
    WorkflowDefinition def = parseWorkflowDefinitionFile(artifact);

    if (def != null) {
      uninstall(artifact);
      installWorkflowDefinition(artifact, def);
    }
  }

  private boolean organizationExists(final String organization) {
    return organizationDirectoryService.getOrganizations().stream().anyMatch(org -> org.getId().equals(organization));
  }

  @Override
  public void organizationRegistered(Organization organization) {
    synchronized (artifactsWithError) {
      logger.info("New organization '{}' registered: check for previously failed workflow definitions",
          organization.getId());
      ArrayList<File> artifactsWithErrorCopy = new ArrayList<>(artifactsWithError);
      for (File artifact : artifactsWithErrorCopy) {
        WorkflowDefinition def = parseWorkflowDefinitionFile(artifact);
        if (def != null && organization.getId().equals(def.getOrganization())) {
          installWorkflowDefinition(artifact, def);
        }
      }
    }
  }

  @Override
  public void organizationUnregistered(Organization organization) {
    // ignore
  }

  @Override
  public void organizationUpdated(Organization organization) {
    // ignore
  }

  /**
   * Parse the given workflow definition file and return the related workflow definition
   *
   * @param artifact
   *          The workflow definition file to parse
   * @return the workflow definition if the given contained a valid one, or null if the file can not be parsed.
   */
  public WorkflowDefinition parseWorkflowDefinitionFile(File artifact) {
    try (InputStream stream = new FileInputStream(artifact)) {
      WorkflowDefinition def;
      if (artifact.getName().endsWith(".yml") || artifact.getName().endsWith(".yaml")) {
        def = YamlWorkflowParser.parseWorkflowDefinition(stream);
      } else {
        def = XmlWorkflowParser.parseWorkflowDefinition(stream);
      }
      if (def.getOperations().size() == 0) {
        logger.warn("Workflow '{}' has no operations", def.getId());
      }
      if (def.getOrganization() != null && !organizationExists(def.getOrganization())) {
        throw new RuntimeException("invalid organization '" + def.getOrganization() + "'");
      }
      return def;
    } catch (Exception e) {
      logger.warn("Unable to parse workflow from file '{}', {}", artifact.getName(), e.getMessage());
      return null;
    }
  }

  private boolean userCanAccessWorkflow(final User user, final WorkflowIdentifier wfi) {
    final WorkflowDefinition wd = installedWorkflows.get(wfi);
    return userCanAccessWorkflowDefinition(user, wd);
  }

  private boolean userCanAccessWorkflowDefinition(final User user, final WorkflowDefinition wd) {
    return wd.getRoles().isEmpty() || user.hasRole(GLOBAL_ADMIN_ROLE) || wd.getRoles().stream()
            .anyMatch(user::hasRole);
  }

  /**
   * Return available workflow definitions
   *
   * This method finds workflows that are either globally defined or have the correct organization/roles
   * set.
   * @param organization The organization to check for (must not be <code>null</code>)
   * @param user The user to check for (must not be <code>null</code>)
   * @return A stream of available organizations
   */
  public Stream<WorkflowDefinition> getAvailableWorkflowDefinitions(final Organization organization, final User user) {
    return installedWorkflows.keySet().stream()
            .filter(wfi -> wfi.getOrganization() == null || wfi.getOrganization().equals(organization.getId()))
            .filter(wfi -> userCanAccessWorkflow(user, wfi))
            .map(WorkflowIdentifier::getId)
            .distinct()
            .map(identifier -> getWorkflowDefinition(user, new WorkflowIdentifier(identifier, organization.getId())));
  }

  /**
   * Return the workflow definition for a given workflow identifier
   *
   * This method tries to get the workflow using the exact identifier and falls back to the global workflow (without
   * the organization) if that fails.
   *
   * @param user The user to check for
   * @param workflowIdentifier The workflow identifier
   * @return Either <code>null</code> if no workflow is found for this identifier, or the workflow definition.
   */
  public WorkflowDefinition getWorkflowDefinition(final User user, final WorkflowIdentifier workflowIdentifier) {
    final WorkflowDefinition result = installedWorkflows.get(workflowIdentifier);
    if (result != null && userCanAccessWorkflowDefinition(user, result)) {
      return result;
    }
    return installedWorkflows.get(new WorkflowIdentifier(workflowIdentifier.getId(), null));
  }

  /**
   * Add the given workflow definition to the installed workflow definition id.
   *
   * @param identifier
   *          the identifier of the workflow definition to add
   * @param wfd
   *          the workflow definition
   */
  public void putWorkflowDefinition(WorkflowIdentifier identifier, WorkflowDefinition wfd) {
    installedWorkflows.put(identifier, wfd);
  }

  /**
   * Remove the workflow definition with the given id from the installed definition list.
   *
   * @param identifier
   *          the workflow definition identifier
   * @return the removed workflow definition
   */
  public WorkflowDefinition removeWorkflowDefinition(WorkflowIdentifier identifier) {
    return installedWorkflows.remove(identifier);
  }

  /**
   * {@inheritDoc}
   *
   * @see org.apache.felix.fileinstall.ArtifactListener#canHandle(java.io.File)
   */
  public boolean canHandle(File artifact) {
    return workflowFilenameFilter.accept(artifact.getParentFile(),artifact.getName());
  }
}