1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.capture.admin.impl;
23
24 import static org.apache.commons.lang3.StringUtils.isBlank;
25 import static org.opencastproject.capture.admin.api.AgentState.KNOWN_STATES;
26 import static org.opencastproject.capture.admin.api.AgentState.UNKNOWN;
27 import static org.opencastproject.db.Queries.namedQuery;
28 import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
29
30 import org.opencastproject.capture.admin.api.Agent;
31 import org.opencastproject.capture.admin.api.AgentState;
32 import org.opencastproject.capture.admin.api.CaptureAgentStateService;
33 import org.opencastproject.db.DBSession;
34 import org.opencastproject.db.DBSessionFactory;
35 import org.opencastproject.security.api.Organization;
36 import org.opencastproject.security.api.Role;
37 import org.opencastproject.security.api.SecurityConstants;
38 import org.opencastproject.security.api.SecurityService;
39 import org.opencastproject.security.api.User;
40 import org.opencastproject.util.NotFoundException;
41 import org.opencastproject.util.data.Tuple3;
42 import org.opencastproject.util.function.ThrowingFunction;
43
44 import com.google.common.cache.CacheBuilder;
45 import com.google.common.cache.CacheLoader;
46 import com.google.common.cache.LoadingCache;
47 import com.google.common.cache.RemovalCause;
48 import com.google.common.cache.RemovalListener;
49 import com.google.common.cache.RemovalNotification;
50
51 import org.apache.commons.lang3.StringUtils;
52 import org.apache.commons.lang3.tuple.Pair;
53 import org.osgi.service.cm.ConfigurationException;
54 import org.osgi.service.cm.ManagedServiceFactory;
55 import org.osgi.service.component.ComponentContext;
56 import org.osgi.service.component.annotations.Activate;
57 import org.osgi.service.component.annotations.Component;
58 import org.osgi.service.component.annotations.Deactivate;
59 import org.osgi.service.component.annotations.Reference;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 import java.util.Arrays;
64 import java.util.Dictionary;
65 import java.util.Iterator;
66 import java.util.LinkedHashSet;
67 import java.util.List;
68 import java.util.Map;
69 import java.util.Optional;
70 import java.util.Properties;
71 import java.util.Set;
72 import java.util.TreeMap;
73 import java.util.concurrent.ConcurrentHashMap;
74 import java.util.concurrent.TimeUnit;
75
76 import javax.persistence.EntityManager;
77 import javax.persistence.EntityManagerFactory;
78 import javax.persistence.NoResultException;
79 import javax.persistence.RollbackException;
80 import javax.persistence.TypedQuery;
81
82
83
84
85 @Component(
86 property = {
87 "service.description=Capture-Admin Service",
88 "service.pid=org.opencastproject.capture.agent"
89 },
90 immediate = true,
91 service = { CaptureAgentStateService.class , ManagedServiceFactory.class }
92 )
93 public class CaptureAgentStateServiceImpl implements CaptureAgentStateService, ManagedServiceFactory {
94
/** Logger for this class. */
private static final Logger logger = LoggerFactory.getLogger(CaptureAgentStateServiceImpl.class);

/** Persistence unit name; same value as the {@code osgi.unit.name} filter used when the entity manager factory is injected. */
public static final String PERSISTENCE_UNIT = "org.opencastproject.capture.admin.impl.CaptureAgentStateServiceImpl";

/** Separator placed between agent name and organization id when building agent cache keys. */
private static final String DELIMITER = ";==;";

/** Entity manager factory, injected through {@link #setEntityManagerFactory(EntityManagerFactory)}. */
protected EntityManagerFactory emf = null;

/** Factory used in {@link #activate(ComponentContext)} to create the database session. */
protected DBSessionFactory dbSessionFactory;

/** Database session; created on activation and closed on deactivation. */
protected DBSession db;

/** Security service used to look up the current user and organization. */
protected SecurityService securityService;

/** Maps a configuration pid to the agent name configured under it (filled in {@code updated}, consumed in {@code deleted}). */
protected Map<String, String> pidMap = new ConcurrentHashMap<>();

/**
 * Agent cache keyed by {@code name + DELIMITER + organization}. Values are either a tuple of
 * (state, configuration, last-heard-from) or {@link #nullToken}; built in {@code setupAgentCache}.
 */
private LoadingCache<String, Object> agentCache = null;

/** Component context property holding the agent cache timeout, interpreted as minutes. */
public static final String CAPTURE_AGENT_TIMEOUT_KEY = "org.opencastproject.capture.admin.timeout";

/** Sentinel the cache loader returns when an agent is not found in the database. */
protected Object nullToken = new Object();
124
125
/**
 * Stores the entity manager factory supplied through the {@code @Reference} binding
 * (filtered on this service's persistence unit name).
 *
 * @param emf
 *          the entity manager factory to use when the database session is created
 */
@Reference(target = "(osgi.unit.name=org.opencastproject.capture.admin.impl.CaptureAgentStateServiceImpl)")
void setEntityManagerFactory(EntityManagerFactory emf) {
  this.emf = emf;
}
130
/**
 * Stores the database session factory supplied through the {@code @Reference} binding.
 *
 * @param dbSessionFactory
 *          the factory later used in {@code activate} to create the database session
 */
@Reference
public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
  this.dbSessionFactory = dbSessionFactory;
}
135
136
137
138
139
/**
 * Stores the security service supplied through the {@code @Reference} binding.
 *
 * @param securityService
 *          the service used to resolve the current user and organization
 */
@Reference
public void setSecurityService(SecurityService securityService) {
  this.securityService = securityService;
}
144
/** Creates the service instance; only logs that the service is starting. */
public CaptureAgentStateServiceImpl() {
  logger.info("CaptureAgentStateServiceImpl starting.");
}
148
149 @Activate
150 public void activate(ComponentContext cc) {
151 db = dbSessionFactory.createSession(emf);
152
153
154 int timeoutInMinutes = 120;
155
156 Optional<String> timeout = getOptContextProperty(cc, CAPTURE_AGENT_TIMEOUT_KEY);
157
158 if (timeout.isPresent()) {
159 try {
160 timeoutInMinutes = Integer.parseInt(timeout.get());
161 } catch (NumberFormatException e) {
162 logger.warn("Invalid configuration for capture agent status timeout (minutes) ({}={})",
163 CAPTURE_AGENT_TIMEOUT_KEY, timeout.get());
164 }
165 }
166
167 setupAgentCache(timeoutInMinutes, TimeUnit.MINUTES);
168 logger.info("Capture agent status timeout is {} minutes", timeoutInMinutes);
169 }
170
/** Deactivates the component: drops all cached agent entries, then closes the database session. */
@Deactivate
public void deactivate() {
  agentCache.invalidateAll();
  db.close();
}
176
177
178
179
180
181
182 @Override
183 public Agent getAgent(String name) throws NotFoundException {
184 String org = securityService.getOrganization().getId();
185 Agent agent = getAgent(name, org);
186 return updateCachedLastHeardFrom(agent, org);
187 }
188
189
190
191
192
193
194
195
196
197
198 protected AgentImpl getAgent(String name, String org) throws NotFoundException {
199 return db.execChecked(getAgentEntityQuery(name, org));
200 }
201
202
203
204
205
206
207
208
209
210
211 protected ThrowingFunction<EntityManager, AgentImpl, NotFoundException> getAgentEntityQuery(String name, String organization) {
212 return em -> {
213 try {
214 TypedQuery<AgentImpl> q = em.createNamedQuery("Agent.get", AgentImpl.class);
215 q.setParameter("id", name);
216 q.setParameter("org", organization);
217 return q.getSingleResult();
218 } catch (NoResultException e) {
219 throw new NotFoundException(e);
220 }
221 };
222 }
223
224
225
226
227
228
229
230
231
232
233 protected Agent updateCachedLastHeardFrom(Agent agent, String org) {
234 String agentKey = agent.getName().concat(DELIMITER).concat(org);
235 Tuple3<String, Properties, Long> cachedAgent = (Tuple3) agentCache.getUnchecked(agentKey);
236 if (cachedAgent != null) {
237 agent.setLastHeardFrom(cachedAgent.getC());
238 }
239 return agent;
240 }
241
242
243
244
245
246
247 @Override
248 public String getAgentState(String agentName) throws NotFoundException {
249 String orgId = securityService.getOrganization().getId();
250 Tuple3<String, Properties, Long> agent = getAgentFromCache(agentName, orgId);
251 return agent.getA();
252 }
253
254
255
256
257
258
259
260 @Override
261 public boolean setAgentState(String agentName, String state) {
262 if (StringUtils.isBlank(agentName))
263 throw new IllegalArgumentException("Unable to set agent state, agent name is blank or null.");
264 if (StringUtils.isBlank(state))
265 throw new IllegalArgumentException("Unable to set agent state, state is blank or null.");
266 if (!KNOWN_STATES.contains(state))
267 throw new IllegalArgumentException("Can not set agent to an invalid state: ".concat(state));
268
269 logger.debug("Agent '{}' state set to '{}'", agentName, state);
270 AgentImpl agent;
271 String orgId = securityService.getOrganization().getId();
272 try {
273
274 if (!updateAgentInCache(agentName, state, orgId)) {
275 return false;
276 }
277
278 agent = (AgentImpl) getAgent(agentName);
279
280
281 logger.debug("Setting Agent {} to state {}.", agentName, state);
282 agent.setState(state);
283 if (!AgentState.UNKNOWN.equals(state)) {
284 agent.setLastHeardFrom(System.currentTimeMillis());
285 }
286 } catch (NotFoundException e) {
287
288 logger.debug("Creating Agent {} with state {}.", agentName, state);
289 agent = new AgentImpl(agentName, orgId, state, "", new Properties());
290 }
291 updateAgentInDatabase(agent);
292 return true;
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306
/**
 * Updates the cached state of an agent without supplying a configuration; delegates to the
 * four-argument variant with a {@code null} configuration.
 *
 * @param agentName
 *          the agent name
 * @param state
 *          the new state
 * @param orgId
 *          the organization id
 * @return the result of the four-argument variant
 */
private boolean updateAgentInCache(String agentName, String state, String orgId) {
  return updateAgentInCache(agentName, state, orgId, null);
}
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325 private boolean updateAgentInCache(String agentName, String state, String orgId, Properties configuration) {
326 try {
327 String agentState = getAgentFromCache(agentName, orgId).getA();
328 Properties config = getAgentConfiguration(agentName);
329 if (configuration != null) {
330 config = configuration;
331 }
332 if (!AgentState.UNKNOWN.equals(state)) {
333 agentCache.put(agentName.concat(DELIMITER).concat(orgId),
334 Tuple3.tuple3(state, config, Long.valueOf(System.currentTimeMillis())));
335 } else {
336
337
338 agentCache.put(agentName.concat(DELIMITER).concat(orgId),
339 Tuple3.tuple3(state, config, getAgentFromCache(agentName, orgId).getC()));
340 }
341 if (agentState.equals(state)) {
342 return false;
343 }
344 return true;
345 } catch (NotFoundException e) {
346 agentCache.put(agentName.concat(DELIMITER).concat(orgId),
347 Tuple3.tuple3(state, configuration, Long.valueOf(System.currentTimeMillis())));
348 return true;
349 }
350 }
351
352
353
354
355
356
357 @Override
358 public boolean setAgentUrl(String agentName, String agentUrl) throws NotFoundException {
359 Agent agent = getAgent(agentName);
360 if (agent.getUrl().equals(agentUrl))
361 return false;
362 agent.setUrl(agentUrl);
363 updateAgentInDatabase((AgentImpl) agent);
364 return true;
365 }
366
367
368
369
370
371
/**
 * Removes an agent by delegating to {@code deleteAgentFromDatabase}.
 *
 * @param agentName
 *          the agent name
 * @throws NotFoundException
 *           if the delegate reports the agent as not found
 */
@Override
public void removeAgent(String agentName) throws NotFoundException {
  deleteAgentFromDatabase(agentName);
}
376
377
378
379
380
381
382 @Override
383 public Map<String, Agent> getKnownAgents() {
384 agentCache.cleanUp();
385 User user = securityService.getUser();
386 Organization org = securityService.getOrganization();
387
388 String orgAdmin = org.getAdminRole();
389 Set<Role> roles = user.getRoles();
390
391 List<AgentImpl> agents = db.exec(namedQuery.findAll(
392 "Agent.byOrganization",
393 AgentImpl.class,
394 Pair.of("org", securityService.getOrganization().getId())
395 ));
396
397
398 if (!user.hasRole(SecurityConstants.GLOBAL_ADMIN_ROLE) && !user.hasRole(orgAdmin)) {
399 for (Iterator<AgentImpl> iter = agents.iterator(); iter.hasNext();) {
400 AgentImpl agent = iter.next();
401 Set<String> schedulerRoles = agent.getSchedulerRoles();
402
403
404 if (schedulerRoles == null || schedulerRoles.isEmpty()) {
405 continue;
406 }
407 boolean hasSchedulerRole = false;
408 for (Role role : roles) {
409 if (schedulerRoles.contains(role.getName())) {
410 hasSchedulerRole = true;
411 break;
412 }
413 }
414 if (!hasSchedulerRole) {
415 iter.remove();
416 }
417 }
418 }
419
420
421 Map<String, Agent> map = new TreeMap<>();
422 for (AgentImpl agent : agents) {
423 map.put(agent.getName(), updateCachedLastHeardFrom(agent, org.getId()));
424 }
425 return map;
426 }
427
428
429
430
431
432
433 @Override
434 public Properties getAgentCapabilities(String agentName) throws NotFoundException {
435 return getAgent(agentName).getCapabilities();
436 }
437
438
439
440
441
442
443 @Override
444 public Properties getAgentConfiguration(String agentName) throws NotFoundException {
445 String orgId = securityService.getOrganization().getId();
446 Tuple3<String, Properties, Long> agent = getAgentFromCache(agentName, orgId);
447 return agent.getB();
448 }
449
450 @SuppressWarnings("unchecked")
451 private Tuple3<String, Properties, Long> getAgentFromCache(String agentName, String orgId) throws NotFoundException {
452 Object agent = agentCache.getUnchecked(agentName.concat(DELIMITER).concat(orgId));
453 if (agent == nullToken) {
454 throw new NotFoundException();
455 } else {
456 return (Tuple3<String, Properties, Long>) agent;
457 }
458 }
459
460
461
462
463
464
465 @Override
466 public boolean setAgentConfiguration(String agentName, Properties configuration) {
467 if (StringUtils.isBlank(agentName))
468 throw new IllegalArgumentException("Unable to set agent state, agent name is blank or null.");
469
470 String orgId = securityService.getOrganization().getId();
471 AgentImpl agent;
472 try {
473 Properties agentConfig = getAgentFromCache(agentName, orgId).getB();
474 if (agentConfig.equals(configuration)) {
475 agentCache.put(agentName.concat(DELIMITER).concat(orgId),
476 Tuple3.tuple3(getAgentState(agentName), agentConfig, Long.valueOf(System.currentTimeMillis())));
477 return false;
478 }
479
480 agent = (AgentImpl) getAgent(agentName);
481 logger.debug("Setting Agent {}'s capabilities", agentName);
482 agent.setConfiguration(configuration);
483 } catch (NotFoundException e) {
484
485 logger.debug("Creating Agent {} with state {}.", agentName, UNKNOWN);
486 agent = new AgentImpl(agentName, orgId, UNKNOWN, "", configuration);
487 }
488
489 updateAgentInDatabase(agent);
490 return true;
491 }
492
493
494
495
496
497
498
/**
 * Writes an agent to the database and updates the cache afterwards; delegates to the
 * three-argument variant with {@code updateFromCache = true} and 10 retries.
 *
 * @param agent
 *          the agent to store
 */
protected void updateAgentInDatabase(AgentImpl agent) {
  updateAgentInDatabase(agent, true, 10);
}
502
503
504
505
506
507
508
509
510
511
512 private void updateAgentInDatabase(AgentImpl agent, boolean updateFromCache, int retries) {
513 try {
514 db.execTx(retries, em -> {
515
516 Long cachedLastHeardFrom = -1L;
517
518 if (updateFromCache) {
519 try {
520 cachedLastHeardFrom = getAgentFromCache(agent.getName(), agent.getOrganization()).getC();
521 } catch (NotFoundException e) {
522
523 }
524 }
525
526 try {
527 AgentImpl existing = getAgentEntityQuery(agent.getName(), agent.getOrganization()).apply(em);
528 existing.setConfiguration(agent.getConfiguration());
529 if (!AgentState.UNKNOWN.equals(agent.getState())) {
530 existing.setLastHeardFrom(Math.max(cachedLastHeardFrom, agent.getLastHeardFrom()));
531 }
532 existing.setState(agent.getState());
533 existing.setSchedulerRoles(agent.getSchedulerRoles());
534 existing.setUrl(agent.getUrl());
535 em.merge(existing);
536 } catch (NotFoundException e) {
537 em.persist(agent);
538 }
539 });
540
541 if (updateFromCache) {
542 updateAgentInCache(agent.getName(), agent.getState(), agent.getOrganization(), agent.getConfiguration());
543 }
544 } catch (RollbackException e) {
545 throw new RollbackException("Maximum number of retries exceeded", e);
546 }
547 }
548
549
550
551
552
553
554
555 private void deleteAgentFromDatabase(String agentName) throws NotFoundException {
556 try {
557 String org = securityService.getOrganization().getId();
558 db.execTxChecked(em -> {
559 Agent existing = getAgentEntityQuery(agentName, org).apply(em);
560 if (existing == null)
561 throw new NotFoundException();
562 em.remove(existing);
563 });
564 agentCache.invalidate(agentName.concat(DELIMITER).concat(org));
565 } catch (RollbackException e) {
566 logger.warn("Unable to commit to DB in deleteAgent.");
567 }
568 }
569
570
571
572
573
574
575
576
/**
 * Returns the name of this factory; the value is the same string as the {@code service.pid}
 * declared on the component.
 *
 * @return the factory name
 */
@Override
public String getName() {
  return "org.opencastproject.capture.agent";
}
581
582 protected void setupAgentCache(int count, TimeUnit unit) {
583
584 RemovalListener<String, Object> removalListener = new RemovalListener<String, Object>() {
585 private Set<String> ignoredStates = new LinkedHashSet<>(Arrays.asList(AgentState.UNKNOWN, AgentState.OFFLINE));
586 @Override
587 public void onRemoval(RemovalNotification<String, Object> removal) {
588 if (RemovalCause.EXPIRED.equals(removal.getCause())) {
589 String org = securityService.getOrganization().getId();
590 try {
591 String agentName = removal.getKey().split(DELIMITER)[0];
592 AgentImpl agent = getAgent(agentName, org);
593 if (!ignoredStates.contains(agent.getState())) {
594 agent.setState(AgentState.OFFLINE);
595 updateAgentInDatabase(agent, false, 2);
596 }
597 } catch (NotFoundException e) {
598
599
600 }
601 }
602 }
603 };
604 agentCache = CacheBuilder.newBuilder().expireAfterWrite(count, unit).removalListener(removalListener).build(new CacheLoader<String, Object>() {
605 @Override
606 public Object load(String id) {
607 String[] key = id.split(DELIMITER);
608 AgentImpl agent;
609 try {
610 agent = getAgent(key[0], key[1]);
611 } catch (NotFoundException e) {
612 return nullToken;
613 }
614 return Tuple3.tuple3(agent.getState(), agent.getConfiguration(), agent.getLastHeardFrom());
615 }
616 });
617 }
618
619
620
621
622
623
624 @Override
625 public void updated(String pid, Dictionary<String, ?> properties) throws ConfigurationException {
626
627 String nameConfig = (String) properties.get("id");
628 if (isBlank(nameConfig))
629 throw new ConfigurationException("id", "must be specified");
630
631 nameConfig = nameConfig.trim();
632
633 String urlConfig = (String) properties.get("url");
634 if (isBlank(urlConfig))
635 throw new ConfigurationException("url", "must be specified");
636 urlConfig = urlConfig.trim();
637
638 String orgConfig = (String) properties.get("organization");
639 if (isBlank(orgConfig))
640 throw new ConfigurationException("organization", "must be specified");
641 orgConfig = orgConfig.trim();
642
643 String schedulerRolesConfig = (String) properties.get("schedulerRoles");
644 if (isBlank(schedulerRolesConfig))
645 throw new ConfigurationException("schedulerRoles", "must be specified");
646 String[] schedulerRoles = schedulerRolesConfig.trim().split(",");
647
648
649 if (!pidMap.containsKey(pid)) {
650 pidMap.put(pid, nameConfig);
651 }
652
653 AgentImpl agent;
654 try {
655 agent = getAgent(nameConfig, orgConfig);
656 agent.setUrl(urlConfig);
657 agent.setState(UNKNOWN);
658 } catch (NotFoundException e) {
659 agent = new AgentImpl(nameConfig, orgConfig, UNKNOWN, urlConfig, new Properties());
660 }
661
662 for (String role : schedulerRoles) {
663 agent.schedulerRoles.add(role.trim());
664 }
665
666
667 logger.info("Roles '{}' may schedule '{}'", schedulerRolesConfig, agent.name);
668 updateAgentInDatabase(agent);
669 }
670
671
672
673
674
675
676 @Override
677 public void deleted(String pid) {
678 String agentId = pidMap.remove(pid);
679 if (agentId == null) {
680 logger.warn("{} was not a managed capture agent pid", pid);
681 } else {
682 try {
683 deleteAgentFromDatabase(agentId);
684 } catch (NotFoundException e) {
685 logger.warn("Unable to delete capture agent '{}'", agentId);
686 }
687 }
688 }
689 }