1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.capture.admin.impl;
23
24 import static org.apache.commons.lang3.StringUtils.isBlank;
25 import static org.opencastproject.capture.admin.api.AgentState.KNOWN_STATES;
26 import static org.opencastproject.capture.admin.api.AgentState.UNKNOWN;
27 import static org.opencastproject.db.Queries.namedQuery;
28 import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
29
30 import org.opencastproject.capture.admin.api.Agent;
31 import org.opencastproject.capture.admin.api.AgentState;
32 import org.opencastproject.capture.admin.api.CaptureAgentStateService;
33 import org.opencastproject.db.DBSession;
34 import org.opencastproject.db.DBSessionFactory;
35 import org.opencastproject.security.api.Organization;
36 import org.opencastproject.security.api.Role;
37 import org.opencastproject.security.api.SecurityConstants;
38 import org.opencastproject.security.api.SecurityService;
39 import org.opencastproject.security.api.User;
40 import org.opencastproject.util.NotFoundException;
41 import org.opencastproject.util.data.Tuple3;
42 import org.opencastproject.util.function.ThrowingFunction;
43
44 import com.google.common.cache.CacheBuilder;
45 import com.google.common.cache.CacheLoader;
46 import com.google.common.cache.LoadingCache;
47 import com.google.common.cache.RemovalCause;
48 import com.google.common.cache.RemovalListener;
49 import com.google.common.cache.RemovalNotification;
50
51 import org.apache.commons.lang3.StringUtils;
52 import org.apache.commons.lang3.tuple.Pair;
53 import org.osgi.service.cm.ConfigurationException;
54 import org.osgi.service.cm.ManagedServiceFactory;
55 import org.osgi.service.component.ComponentContext;
56 import org.osgi.service.component.annotations.Activate;
57 import org.osgi.service.component.annotations.Component;
58 import org.osgi.service.component.annotations.Deactivate;
59 import org.osgi.service.component.annotations.Reference;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 import java.util.Arrays;
64 import java.util.Dictionary;
65 import java.util.Iterator;
66 import java.util.LinkedHashSet;
67 import java.util.List;
68 import java.util.Map;
69 import java.util.Optional;
70 import java.util.Properties;
71 import java.util.Set;
72 import java.util.TreeMap;
73 import java.util.concurrent.ConcurrentHashMap;
74 import java.util.concurrent.TimeUnit;
75
76 import javax.persistence.EntityManager;
77 import javax.persistence.EntityManagerFactory;
78 import javax.persistence.NoResultException;
79 import javax.persistence.RollbackException;
80 import javax.persistence.TypedQuery;
81
82
83
84
85 @Component(
86 property = {
87 "service.description=Capture-Admin Service",
88 "service.pid=org.opencastproject.capture.agent"
89 },
90 immediate = true,
91 service = { CaptureAgentStateService.class , ManagedServiceFactory.class }
92 )
93 public class CaptureAgentStateServiceImpl implements CaptureAgentStateService, ManagedServiceFactory {
94
95 private static final Logger logger = LoggerFactory.getLogger(CaptureAgentStateServiceImpl.class);
96
97
98 public static final String PERSISTENCE_UNIT = "org.opencastproject.capture.admin.impl.CaptureAgentStateServiceImpl";
99
100
101 private static final String DELIMITER = ";==;";
102
103
104 protected EntityManagerFactory emf = null;
105
106 protected DBSessionFactory dbSessionFactory;
107
108 protected DBSession db;
109
110
111 protected SecurityService securityService;
112
113
114 protected Map<String, String> pidMap = new ConcurrentHashMap<>();
115
116
117 private LoadingCache<String, Object> agentCache = null;
118
119
120 public static final String CAPTURE_AGENT_TIMEOUT_KEY = "org.opencastproject.capture.admin.timeout";
121
122
123 protected Object nullToken = new Object();
124
125
126 @Reference(target = "(osgi.unit.name=org.opencastproject.capture.admin.impl.CaptureAgentStateServiceImpl)")
127 void setEntityManagerFactory(EntityManagerFactory emf) {
128 this.emf = emf;
129 }
130
131 @Reference
132 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
133 this.dbSessionFactory = dbSessionFactory;
134 }
135
136
137
138
139
140 @Reference
141 public void setSecurityService(SecurityService securityService) {
142 this.securityService = securityService;
143 }
144
/**
 * Creates the service instance. Only logs the start; the database session and the agent cache are set up
 * later in {@code activate}.
 */
public CaptureAgentStateServiceImpl() {
  logger.info("CaptureAgentStateServiceImpl starting.");
}
148
149 @Activate
150 public void activate(ComponentContext cc) {
151 db = dbSessionFactory.createSession(emf);
152
153
154 int timeoutInMinutes = 120;
155
156 Optional<String> timeout = getOptContextProperty(cc, CAPTURE_AGENT_TIMEOUT_KEY);
157
158 if (timeout.isPresent()) {
159 try {
160 timeoutInMinutes = Integer.parseInt(timeout.get());
161 } catch (NumberFormatException e) {
162 logger.warn("Invalid configuration for capture agent status timeout (minutes) ({}={})",
163 CAPTURE_AGENT_TIMEOUT_KEY, timeout.get());
164 }
165 }
166
167 setupAgentCache(timeoutInMinutes, TimeUnit.MINUTES);
168 logger.info("Capture agent status timeout is {} minutes", timeoutInMinutes);
169 }
170
171 @Deactivate
172 public void deactivate() {
173 agentCache.invalidateAll();
174 db.close();
175 }
176
177
178
179
180
181
182 @Override
183 public Agent getAgent(String name) throws NotFoundException {
184 String org = securityService.getOrganization().getId();
185 Agent agent = getAgent(name, org);
186 return updateCachedLastHeardFrom(agent, org);
187 }
188
189
190
191
192
193
194
195
196
197
198 protected AgentImpl getAgent(String name, String org) throws NotFoundException {
199 return db.execChecked(getAgentEntityQuery(name, org));
200 }
201
202
203
204
205
206
207
208
209
210
211 protected ThrowingFunction<EntityManager, AgentImpl, NotFoundException> getAgentEntityQuery(String name,
212 String organization) {
213 return em -> {
214 try {
215 TypedQuery<AgentImpl> q = em.createNamedQuery("Agent.get", AgentImpl.class);
216 q.setParameter("id", name);
217 q.setParameter("org", organization);
218 return q.getSingleResult();
219 } catch (NoResultException e) {
220 throw new NotFoundException(e);
221 }
222 };
223 }
224
225
226
227
228
229
230
231
232
233
234 protected Agent updateCachedLastHeardFrom(Agent agent, String org) {
235 String agentKey = agent.getName().concat(DELIMITER).concat(org);
236 Tuple3<String, Properties, Long> cachedAgent = (Tuple3) agentCache.getUnchecked(agentKey);
237 if (cachedAgent != null) {
238 agent.setLastHeardFrom(cachedAgent.getC());
239 }
240 return agent;
241 }
242
243
244
245
246
247
248 @Override
249 public String getAgentState(String agentName) throws NotFoundException {
250 String orgId = securityService.getOrganization().getId();
251 Tuple3<String, Properties, Long> agent = getAgentFromCache(agentName, orgId);
252 return agent.getA();
253 }
254
255
256
257
258
259
260
261 @Override
262 public boolean setAgentState(String agentName, String state) {
263 if (StringUtils.isBlank(agentName)) {
264 throw new IllegalArgumentException("Unable to set agent state, agent name is blank or null.");
265 }
266 if (StringUtils.isBlank(state)) {
267 throw new IllegalArgumentException("Unable to set agent state, state is blank or null.");
268 }
269 if (!KNOWN_STATES.contains(state)) {
270 throw new IllegalArgumentException("Can not set agent to an invalid state: ".concat(state));
271 }
272
273 logger.debug("Agent '{}' state set to '{}'", agentName, state);
274 AgentImpl agent;
275 String orgId = securityService.getOrganization().getId();
276 try {
277
278 if (!updateAgentInCache(agentName, state, orgId)) {
279 return false;
280 }
281
282 agent = (AgentImpl) getAgent(agentName);
283
284
285 logger.debug("Setting Agent {} to state {}.", agentName, state);
286 agent.setState(state);
287 if (!AgentState.UNKNOWN.equals(state)) {
288 agent.setLastHeardFrom(System.currentTimeMillis());
289 }
290 } catch (NotFoundException e) {
291
292 logger.debug("Creating Agent {} with state {}.", agentName, state);
293 agent = new AgentImpl(agentName, orgId, state, "", new Properties());
294 }
295 updateAgentInDatabase(agent);
296 return true;
297 }
298
299
300
301
302
303
304
305
306
307
308
309
310
311 private boolean updateAgentInCache(String agentName, String state, String orgId) {
312 return updateAgentInCache(agentName, state, orgId, null);
313 }
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329 private boolean updateAgentInCache(String agentName, String state, String orgId, Properties configuration) {
330 try {
331 String agentState = getAgentFromCache(agentName, orgId).getA();
332 Properties config = getAgentConfiguration(agentName);
333 if (configuration != null) {
334 config = configuration;
335 }
336 if (!AgentState.UNKNOWN.equals(state)) {
337 agentCache.put(agentName.concat(DELIMITER).concat(orgId),
338 Tuple3.tuple3(state, config, Long.valueOf(System.currentTimeMillis())));
339 } else {
340
341
342 agentCache.put(agentName.concat(DELIMITER).concat(orgId),
343 Tuple3.tuple3(state, config, getAgentFromCache(agentName, orgId).getC()));
344 }
345 if (agentState.equals(state)) {
346 return false;
347 }
348 return true;
349 } catch (NotFoundException e) {
350 agentCache.put(agentName.concat(DELIMITER).concat(orgId),
351 Tuple3.tuple3(state, configuration, Long.valueOf(System.currentTimeMillis())));
352 return true;
353 }
354 }
355
356
357
358
359
360
361 @Override
362 public boolean setAgentUrl(String agentName, String agentUrl) throws NotFoundException {
363 Agent agent = getAgent(agentName);
364 if (agent.getUrl().equals(agentUrl)) {
365 return false;
366 }
367 agent.setUrl(agentUrl);
368 updateAgentInDatabase((AgentImpl) agent);
369 return true;
370 }
371
372
373
374
375
376
377 @Override
378 public void removeAgent(String agentName) throws NotFoundException {
379 deleteAgentFromDatabase(agentName);
380 }
381
382
383
384
385
386
387 @Override
388 public Map<String, Agent> getKnownAgents() {
389 agentCache.cleanUp();
390 User user = securityService.getUser();
391 Organization org = securityService.getOrganization();
392
393 String orgAdmin = org.getAdminRole();
394 Set<Role> roles = user.getRoles();
395
396 List<AgentImpl> agents = db.exec(namedQuery.findAll(
397 "Agent.byOrganization",
398 AgentImpl.class,
399 Pair.of("org", securityService.getOrganization().getId())
400 ));
401
402
403 if (!user.hasRole(SecurityConstants.GLOBAL_ADMIN_ROLE) && !user.hasRole(orgAdmin)) {
404 for (Iterator<AgentImpl> iter = agents.iterator(); iter.hasNext();) {
405 AgentImpl agent = iter.next();
406 Set<String> schedulerRoles = agent.getSchedulerRoles();
407
408
409 if (schedulerRoles == null || schedulerRoles.isEmpty()) {
410 continue;
411 }
412 boolean hasSchedulerRole = false;
413 for (Role role : roles) {
414 if (schedulerRoles.contains(role.getName())) {
415 hasSchedulerRole = true;
416 break;
417 }
418 }
419 if (!hasSchedulerRole) {
420 iter.remove();
421 }
422 }
423 }
424
425
426 Map<String, Agent> map = new TreeMap<>();
427 for (AgentImpl agent : agents) {
428 map.put(agent.getName(), updateCachedLastHeardFrom(agent, org.getId()));
429 }
430 return map;
431 }
432
433
434
435
436
437
438 @Override
439 public Properties getAgentCapabilities(String agentName) throws NotFoundException {
440 return getAgent(agentName).getCapabilities();
441 }
442
443
444
445
446
447
448 @Override
449 public Properties getAgentConfiguration(String agentName) throws NotFoundException {
450 String orgId = securityService.getOrganization().getId();
451 Tuple3<String, Properties, Long> agent = getAgentFromCache(agentName, orgId);
452 return agent.getB();
453 }
454
455 @SuppressWarnings("unchecked")
456 private Tuple3<String, Properties, Long> getAgentFromCache(String agentName, String orgId) throws NotFoundException {
457 Object agent = agentCache.getUnchecked(agentName.concat(DELIMITER).concat(orgId));
458 if (agent == nullToken) {
459 throw new NotFoundException();
460 } else {
461 return (Tuple3<String, Properties, Long>) agent;
462 }
463 }
464
465
466
467
468
469
470 @Override
471 public boolean setAgentConfiguration(String agentName, Properties configuration) {
472 if (StringUtils.isBlank(agentName)) {
473 throw new IllegalArgumentException("Unable to set agent state, agent name is blank or null.");
474 }
475
476 String orgId = securityService.getOrganization().getId();
477 AgentImpl agent;
478 try {
479 Properties agentConfig = getAgentFromCache(agentName, orgId).getB();
480 if (agentConfig.equals(configuration)) {
481 agentCache.put(agentName.concat(DELIMITER).concat(orgId),
482 Tuple3.tuple3(getAgentState(agentName), agentConfig, Long.valueOf(System.currentTimeMillis())));
483 return false;
484 }
485
486 agent = (AgentImpl) getAgent(agentName);
487 logger.debug("Setting Agent {}'s capabilities", agentName);
488 agent.setConfiguration(configuration);
489 } catch (NotFoundException e) {
490
491 logger.debug("Creating Agent {} with state {}.", agentName, UNKNOWN);
492 agent = new AgentImpl(agentName, orgId, UNKNOWN, "", configuration);
493 }
494
495 updateAgentInDatabase(agent);
496 return true;
497 }
498
499
500
501
502
503
504
505 protected void updateAgentInDatabase(AgentImpl agent) {
506 updateAgentInDatabase(agent, true, 10);
507 }
508
509
510
511
512
513
514
515
516
517
518
519 private void updateAgentInDatabase(AgentImpl agent, boolean updateFromCache, int retries) {
520 try {
521 db.execTx(retries, em -> {
522
523 Long cachedLastHeardFrom = -1L;
524
525 if (updateFromCache) {
526 try {
527 cachedLastHeardFrom = getAgentFromCache(agent.getName(), agent.getOrganization()).getC();
528 } catch (NotFoundException e) {
529
530 }
531 }
532
533 try {
534 AgentImpl existing = getAgentEntityQuery(agent.getName(), agent.getOrganization()).apply(em);
535 existing.setConfiguration(agent.getConfiguration());
536 if (!AgentState.UNKNOWN.equals(agent.getState())) {
537 existing.setLastHeardFrom(Math.max(cachedLastHeardFrom, agent.getLastHeardFrom()));
538 }
539 existing.setState(agent.getState());
540 existing.setSchedulerRoles(agent.getSchedulerRoles());
541 existing.setUrl(agent.getUrl());
542 em.merge(existing);
543 } catch (NotFoundException e) {
544 em.persist(agent);
545 }
546 });
547
548 if (updateFromCache) {
549 updateAgentInCache(agent.getName(), agent.getState(), agent.getOrganization(), agent.getConfiguration());
550 }
551 } catch (RollbackException e) {
552 throw new RollbackException("Maximum number of retries exceeded", e);
553 }
554 }
555
556
557
558
559
560
561
562 private void deleteAgentFromDatabase(String agentName) throws NotFoundException {
563 try {
564 String org = securityService.getOrganization().getId();
565 db.execTxChecked(em -> {
566 Agent existing = getAgentEntityQuery(agentName, org).apply(em);
567 if (existing == null) {
568 throw new NotFoundException();
569 }
570 em.remove(existing);
571 });
572 agentCache.invalidate(agentName.concat(DELIMITER).concat(org));
573 } catch (RollbackException e) {
574 logger.warn("Unable to commit to DB in deleteAgent.");
575 }
576 }
577
578
579
580
581
582
583
584
585 @Override
586 public String getName() {
587 return "org.opencastproject.capture.agent";
588 }
589
590 protected void setupAgentCache(int count, TimeUnit unit) {
591
592 RemovalListener<String, Object> removalListener = new RemovalListener<String, Object>() {
593 private Set<String> ignoredStates = new LinkedHashSet<>(Arrays.asList(AgentState.UNKNOWN, AgentState.OFFLINE));
594 @Override
595 public void onRemoval(RemovalNotification<String, Object> removal) {
596 if (RemovalCause.EXPIRED.equals(removal.getCause())) {
597 String org = securityService.getOrganization().getId();
598 try {
599 String agentName = removal.getKey().split(DELIMITER)[0];
600 AgentImpl agent = getAgent(agentName, org);
601 if (!ignoredStates.contains(agent.getState())) {
602 agent.setState(AgentState.OFFLINE);
603 updateAgentInDatabase(agent, false, 2);
604 }
605 } catch (NotFoundException e) {
606
607
608 }
609 }
610 }
611 };
612 agentCache = CacheBuilder.newBuilder().expireAfterWrite(count, unit).removalListener(removalListener)
613 .build(new CacheLoader<String, Object>() {
614 @Override
615 public Object load(String id) {
616 String[] key = id.split(DELIMITER);
617 AgentImpl agent;
618 try {
619 agent = getAgent(key[0], key[1]);
620 } catch (NotFoundException e) {
621 return nullToken;
622 }
623 return Tuple3.tuple3(agent.getState(), agent.getConfiguration(), agent.getLastHeardFrom());
624 }
625 });
626 }
627
628
629
630
631
632
633 @Override
634 public void updated(String pid, Dictionary<String, ?> properties) throws ConfigurationException {
635
636 String nameConfig = (String) properties.get("id");
637 if (isBlank(nameConfig)) {
638 throw new ConfigurationException("id", "must be specified");
639 }
640
641 nameConfig = nameConfig.trim();
642
643 String urlConfig = (String) properties.get("url");
644 if (isBlank(urlConfig)) {
645 throw new ConfigurationException("url", "must be specified");
646 }
647 urlConfig = urlConfig.trim();
648
649 String orgConfig = (String) properties.get("organization");
650 if (isBlank(orgConfig)) {
651 throw new ConfigurationException("organization", "must be specified");
652 }
653 orgConfig = orgConfig.trim();
654
655 String schedulerRolesConfig = (String) properties.get("schedulerRoles");
656 if (isBlank(schedulerRolesConfig)) {
657 throw new ConfigurationException("schedulerRoles", "must be specified");
658 }
659 String[] schedulerRoles = schedulerRolesConfig.trim().split(",");
660
661
662 if (!pidMap.containsKey(pid)) {
663 pidMap.put(pid, nameConfig);
664 }
665
666 AgentImpl agent;
667 try {
668 agent = getAgent(nameConfig, orgConfig);
669 agent.setUrl(urlConfig);
670 agent.setState(UNKNOWN);
671 } catch (NotFoundException e) {
672 agent = new AgentImpl(nameConfig, orgConfig, UNKNOWN, urlConfig, new Properties());
673 }
674
675 for (String role : schedulerRoles) {
676 agent.schedulerRoles.add(role.trim());
677 }
678
679
680 logger.info("Roles '{}' may schedule '{}'", schedulerRolesConfig, agent.name);
681 updateAgentInDatabase(agent);
682 }
683
684
685
686
687
688
689 @Override
690 public void deleted(String pid) {
691 String agentId = pidMap.remove(pid);
692 if (agentId == null) {
693 logger.warn("{} was not a managed capture agent pid", pid);
694 } else {
695 try {
696 deleteAgentFromDatabase(agentId);
697 } catch (NotFoundException e) {
698 logger.warn("Unable to delete capture agent '{}'", agentId);
699 }
700 }
701 }
702 }