1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25 import static org.opencastproject.util.ReadinessIndicator.ARTIFACT;
26
27 import org.opencastproject.security.api.Organization;
28 import org.opencastproject.security.api.OrganizationDirectoryListener;
29 import org.opencastproject.security.api.OrganizationDirectoryService;
30 import org.opencastproject.security.api.User;
31 import org.opencastproject.util.ReadinessIndicator;
32 import org.opencastproject.workflow.api.WorkflowDefinition;
33 import org.opencastproject.workflow.api.WorkflowIdentifier;
34 import org.opencastproject.workflow.api.WorkflowStateMapping;
35 import org.opencastproject.workflow.api.XmlWorkflowParser;
36 import org.opencastproject.workflow.api.YamlWorkflowParser;
37
38 import org.apache.felix.fileinstall.ArtifactInstaller;
39 import org.osgi.framework.BundleContext;
40 import org.osgi.service.component.annotations.Activate;
41 import org.osgi.service.component.annotations.Component;
42 import org.osgi.service.component.annotations.Reference;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import java.io.File;
47 import java.io.FileInputStream;
48 import java.io.InputStream;
49 import java.util.ArrayList;
50 import java.util.Dictionary;
51 import java.util.HashMap;
52 import java.util.Hashtable;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.Set;
56 import java.util.stream.Stream;
57
58
59
60
61
62 @Component(
63 property = {
64 "service.description=Workflow Definition Scanner"
65 },
66 immediate = true,
67 service = { ArtifactInstaller.class, WorkflowDefinitionScanner.class }
68 )
69 public class WorkflowDefinitionScanner implements ArtifactInstaller, OrganizationDirectoryListener {
70 private static final Logger logger = LoggerFactory.getLogger(WorkflowDefinitionScanner.class);
71
72
73 protected Map<WorkflowIdentifier, WorkflowDefinition> installedWorkflows = new HashMap<>();
74
75
76 protected Map<String, Set<WorkflowStateMapping>> workflowStateMappings = new HashMap<>();
77
78
79 protected Map<File, WorkflowIdentifier> artifactIds = new HashMap<>();
80
81
82 protected final List<File> artifactsWithError = new ArrayList<>();
83
84
85 private BundleContext bundleCtx = null;
86
87
88 private boolean isWFSinitialized = false;
89
90 private OrganizationDirectoryService organizationDirectoryService;
91
92 private WorkflowFilenameFilter workflowFilenameFilter;
93
94 @Reference
95 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
96 this.organizationDirectoryService = organizationDirectoryService;
97 }
98
99
100
101
102
103
104
105
106
107 @Activate
108 void activate(BundleContext ctx) {
109 this.bundleCtx = ctx;
110 organizationDirectoryService.addOrganizationDirectoryListener(this);
111 this.workflowFilenameFilter = new WorkflowFilenameFilter("workflows", ".*\\.(xml|yaml|yml)$");
112 }
113
114
115
116
117
118
119 public void install(File artifact) {
120 synchronized (artifactsWithError) {
121 WorkflowDefinition def = parseWorkflowDefinitionFile(artifact);
122 if (def == null) {
123 logger.warn("Unable to install workflow from '{}'", artifact.getName());
124 artifactsWithError.add(artifact);
125 } else {
126 installWorkflowDefinition(artifact, def);
127 }
128
129
130 String[] filesInDirectory = artifact.getParentFile().list(workflowFilenameFilter);
131 if (filesInDirectory == null) {
132 throw new RuntimeException("error retrieving files from directory \"" + artifact.getParentFile() + "\"");
133 }
134
135
136 if ((filesInDirectory.length - artifactsWithError.size()) == artifactIds.size() && !isWFSinitialized) {
137 logger.info("{} Workflow definitions loaded, activating Workflow service", filesInDirectory.length - artifactsWithError.size());
138 Dictionary<String, String> properties = new Hashtable<>();
139 properties.put(ARTIFACT, "workflowdefinition");
140 logger.debug("Indicating readiness of workflow definitions");
141 bundleCtx.registerService(ReadinessIndicator.class.getName(), new ReadinessIndicator(), properties);
142 isWFSinitialized = true;
143 }
144 }
145 }
146
147 private void installWorkflowDefinition(File artifact, WorkflowDefinition def) {
148 synchronized (artifactsWithError) {
149
150 final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(def.getId(), def.getOrganization());
151 for (Map.Entry<File, WorkflowIdentifier> fileWithIdentifier : artifactIds.entrySet()) {
152 if (fileWithIdentifier.getValue().equals(workflowIdentifier) && !fileWithIdentifier.getKey().equals(artifact)) {
153 logger.warn("Workflow with identifier '{}' already registered in file '{}', ignoring", workflowIdentifier,
154 fileWithIdentifier.getKey());
155 artifactsWithError.add(artifact);
156 return;
157 }
158 }
159
160 logger.debug("Installing workflow from file '{}'", artifact.getName());
161 artifactsWithError.remove(artifact);
162 artifactIds.put(artifact, workflowIdentifier);
163 putWorkflowDefinition(workflowIdentifier, def);
164 workflowStateMappings.put(def.getId(), def.getStateMappings());
165
166 logger.info("Workflow definition '{}' from file '{}' installed", workflowIdentifier, artifact.getName());
167 }
168 }
169
170
171
172
173
174
175 public void uninstall(File artifact) {
176
177 WorkflowIdentifier identifier = artifactIds.remove(artifact);
178 if (identifier != null) {
179 removeWorkflowDefinition(identifier);
180 logger.info("Uninstalling workflow definition '{}' from file '{}'", identifier, artifact.getName());
181 }
182 }
183
184
185
186
187
188
189 public void update(File artifact) {
190 WorkflowDefinition def = parseWorkflowDefinitionFile(artifact);
191
192 if (def != null) {
193 uninstall(artifact);
194 installWorkflowDefinition(artifact, def);
195 }
196 }
197
198 private boolean organizationExists(final String organization) {
199 return organizationDirectoryService.getOrganizations().stream().anyMatch(org -> org.getId().equals(organization));
200 }
201
202 @Override
203 public void organizationRegistered(Organization organization) {
204 synchronized (artifactsWithError) {
205 logger.info("New organization '{}' registered: check for previously failed workflow definitions", organization.getId());
206 ArrayList<File> artifactsWithErrorCopy = new ArrayList<>(artifactsWithError);
207 for (File artifact : artifactsWithErrorCopy) {
208 WorkflowDefinition def = parseWorkflowDefinitionFile(artifact);
209 if (def != null && organization.getId().equals(def.getOrganization())) {
210 installWorkflowDefinition(artifact, def);
211 }
212 }
213 }
214 }
215
216 @Override
217 public void organizationUnregistered(Organization organization) {
218
219 }
220
221 @Override
222 public void organizationUpdated(Organization organization) {
223
224 }
225
226
227
228
229
230
231
232
233 public WorkflowDefinition parseWorkflowDefinitionFile(File artifact) {
234 try (InputStream stream = new FileInputStream(artifact)) {
235 WorkflowDefinition def;
236 if (artifact.getName().endsWith(".yml") || artifact.getName().endsWith(".yaml")) {
237 def = YamlWorkflowParser.parseWorkflowDefinition(stream);
238 } else {
239 def = XmlWorkflowParser.parseWorkflowDefinition(stream);
240 }
241 if (def.getOperations().size() == 0)
242 logger.warn("Workflow '{}' has no operations", def.getId());
243 if (def.getOrganization() != null && !organizationExists(def.getOrganization())) {
244 throw new RuntimeException("invalid organization '" + def.getOrganization() + "'");
245 }
246 return def;
247 } catch (Exception e) {
248 logger.warn("Unable to parse workflow from file '{}', {}", artifact.getName(), e.getMessage());
249 return null;
250 }
251 }
252
253 private boolean userCanAccessWorkflow(final User user, final WorkflowIdentifier wfi) {
254 final WorkflowDefinition wd = installedWorkflows.get(wfi);
255 return userCanAccessWorkflowDefinition(user, wd);
256 }
257
258 private boolean userCanAccessWorkflowDefinition(final User user, final WorkflowDefinition wd) {
259 return wd.getRoles().isEmpty() || user.hasRole(GLOBAL_ADMIN_ROLE) || wd.getRoles().stream()
260 .anyMatch(user::hasRole);
261 }
262
263
264
265
266
267
268
269
270
271
272 public Stream<WorkflowDefinition> getAvailableWorkflowDefinitions(final Organization organization, final User user) {
273 return installedWorkflows.keySet().stream()
274 .filter(wfi -> wfi.getOrganization() == null || wfi.getOrganization().equals(organization.getId()))
275 .filter(wfi -> userCanAccessWorkflow(user, wfi))
276 .map(WorkflowIdentifier::getId)
277 .distinct()
278 .map(identifier -> getWorkflowDefinition(user, new WorkflowIdentifier(identifier, organization.getId())));
279 }
280
281
282
283
284
285
286
287
288
289
290
291 public WorkflowDefinition getWorkflowDefinition(final User user, final WorkflowIdentifier workflowIdentifier) {
292 final WorkflowDefinition result = installedWorkflows.get(workflowIdentifier);
293 if (result != null && userCanAccessWorkflowDefinition(user, result)) {
294 return result;
295 }
296 return installedWorkflows.get(new WorkflowIdentifier(workflowIdentifier.getId(), null));
297 }
298
299
300
301
302
303
304
305
306
307 public void putWorkflowDefinition(WorkflowIdentifier identifier, WorkflowDefinition wfd) {
308 installedWorkflows.put(identifier, wfd);
309 }
310
311
312
313
314
315
316
317
318 public WorkflowDefinition removeWorkflowDefinition(WorkflowIdentifier identifier) {
319 return installedWorkflows.remove(identifier);
320 }
321
322
323
324
325
326
327 public boolean canHandle(File artifact) {
328 return workflowFilenameFilter.accept(artifact.getParentFile(),artifact.getName());
329 }
330 }