1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.liveschedule.message;
22
23 import static org.opencastproject.scheduler.api.SchedulerService.WORKFLOW_CONFIG_PREFIX;
24
25 import org.opencastproject.liveschedule.api.LiveScheduleService;
26 import org.opencastproject.message.broker.api.scheduler.SchedulerItem;
27 import org.opencastproject.message.broker.api.update.SchedulerUpdateHandler;
28 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
29 import org.opencastproject.scheduler.api.RecordingState;
30 import org.opencastproject.scheduler.api.SchedulerException;
31 import org.opencastproject.scheduler.api.SchedulerService;
32 import org.opencastproject.security.api.UnauthorizedException;
33 import org.opencastproject.util.NotFoundException;
34
35 import org.apache.commons.lang3.BooleanUtils;
36 import org.osgi.service.component.ComponentContext;
37 import org.osgi.service.component.annotations.Activate;
38 import org.osgi.service.component.annotations.Component;
39 import org.osgi.service.component.annotations.Reference;
40 import org.osgi.service.component.annotations.ReferenceCardinality;
41 import org.osgi.service.component.annotations.ReferencePolicy;
42 import org.osgi.service.component.annotations.ReferencePolicyOption;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import java.util.Dictionary;
47 import java.util.Map;
48 import java.util.Objects;
49
50 @Component(
51 service = { UpdateHandler.class, SchedulerUpdateHandler.class },
52 property = {
53 "service.description=Scheduler Update Listener for Live Schedule Service"
54 }
55 )
56 public class SchedulerEventUpdateHandler extends UpdateHandler implements SchedulerUpdateHandler {
57
58 private static final Logger logger = LoggerFactory.getLogger(SchedulerEventUpdateHandler.class);
59
60 private static final String DESTINATION_SCHEDULER = "SCHEDULER.Liveschedule";
61 private static final String DELETE_ON_CAPTURE_ERROR = "live.deleteOnCapureError";
62
63 protected volatile SchedulerService schedulerService;
64
65 private boolean deleteOnCaptureError = true;
66
67
68
69
70
71
72
73 @Activate
74 @Override
75 public void activate(ComponentContext cc) {
76 super.activate(cc);
77 Dictionary properties = cc.getProperties();
78 deleteOnCaptureError = BooleanUtils.toBoolean(Objects.toString(properties.get(DELETE_ON_CAPTURE_ERROR), "true"));
79 }
80
81 public void execute(final String mpId, final SchedulerItem schedulerItem) {
82 try {
83 logger.debug("Scheduler message handler START for mp {} event type {} in thread {}", mpId,
84 schedulerItem.getType(), Thread.currentThread().getId());
85
86 switch (schedulerItem.getType()) {
87 case UpdateCatalog:
88 if (isLive(mpId)) {
89 liveScheduleService.createOrUpdateLiveEvent(mpId, schedulerItem.getEvent());
90 }
91 break;
92 case UpdateAcl:
93 if (isLive(mpId)) {
94 liveScheduleService.updateLiveEventAcl(mpId, schedulerItem.getAcl());
95 }
96 break;
97 case UpdateProperties:
98
99 String publishLive = schedulerItem.getProperties().get(WORKFLOW_CONFIG_PREFIX.concat(PUBLISH_LIVE_PROPERTY));
100 if (publishLive == null) {
101
102 return;
103 } else if (BooleanUtils.toBoolean(publishLive)) {
104 DublinCoreCatalog episodeDC = schedulerService.getDublinCore(mpId);
105 liveScheduleService.createOrUpdateLiveEvent(mpId, episodeDC);
106 } else {
107 liveScheduleService.deleteLiveEvent(mpId);
108 }
109 break;
110 case Delete:
111 case DeleteRecordingStatus:
112
113
114 liveScheduleService.deleteLiveEvent(mpId);
115 break;
116 case UpdateAgentId:
117 case UpdateStart:
118 case UpdateEnd:
119 if (isLive(mpId)) {
120 DublinCoreCatalog episodeDC = schedulerService.getDublinCore(mpId);
121 liveScheduleService.createOrUpdateLiveEvent(mpId, episodeDC);
122 }
123 break;
124 case UpdateRecordingStatus:
125 String state = schedulerItem.getRecordingState();
126 if (RecordingState.CAPTURE_FINISHED.equals(state) || RecordingState.UPLOADING.equals(state)
127 || RecordingState.UPLOAD_ERROR.equals(state)
128 || (RecordingState.CAPTURE_ERROR.equals(state) && deleteOnCaptureError)) {
129 if (isLive(mpId)) {
130 liveScheduleService.deleteLiveEvent(mpId);
131 }
132 }
133 break;
134 case UpdatePresenters:
135 break;
136 default:
137 throw new IllegalArgumentException("Unhandled type of SchedulerItem");
138 }
139 } catch (Exception e) {
140 logger.warn("Exception occurred for mp {}, event type {}", mpId, schedulerItem.getType(), e);
141 } finally {
142 logger.debug("Scheduler message handler END for mp {} event type {} in thread {}", mpId, schedulerItem.getType(),
143 Thread.currentThread().getId());
144 }
145 }
146
147 protected boolean isLive(String mpId) {
148 try {
149 Map<String, String> config = schedulerService.getWorkflowConfig(mpId);
150 return BooleanUtils.toBoolean((String) config.get(PUBLISH_LIVE_PROPERTY));
151 } catch (UnauthorizedException | NotFoundException | SchedulerException e) {
152 logger.debug("Could not get workflow configuration for mp {}. This is probably ok.", mpId);
153 return false;
154 }
155 }
156
157
158 @Reference(
159 cardinality = ReferenceCardinality.OPTIONAL,
160 policy = ReferencePolicy.DYNAMIC,
161 policyOption = ReferencePolicyOption.GREEDY
162 )
163 public void setSchedulerService(SchedulerService service) {
164 this.schedulerService = service;
165 }
166
167 public void unsetSchedulerService(SchedulerService service) {
168 if (this.schedulerService == service) {
169 this.schedulerService = null;
170 }
171 }
172
173 @Reference
174 @Override
175 public void setLiveScheduleService(LiveScheduleService liveScheduleService) {
176 super.setLiveScheduleService(liveScheduleService);
177 }
178
179
180 }