View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.liveschedule.message;
22  
23  import static org.opencastproject.scheduler.api.SchedulerService.WORKFLOW_CONFIG_PREFIX;
24  
25  import org.opencastproject.liveschedule.api.LiveScheduleService;
26  import org.opencastproject.message.broker.api.scheduler.SchedulerItem;
27  import org.opencastproject.message.broker.api.update.SchedulerUpdateHandler;
28  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
29  import org.opencastproject.scheduler.api.RecordingState;
30  import org.opencastproject.scheduler.api.SchedulerException;
31  import org.opencastproject.scheduler.api.SchedulerService;
32  import org.opencastproject.security.api.UnauthorizedException;
33  import org.opencastproject.util.NotFoundException;
34  
35  import org.apache.commons.lang3.BooleanUtils;
36  import org.osgi.service.component.ComponentContext;
37  import org.osgi.service.component.annotations.Activate;
38  import org.osgi.service.component.annotations.Component;
39  import org.osgi.service.component.annotations.Reference;
40  import org.osgi.service.component.annotations.ReferenceCardinality;
41  import org.osgi.service.component.annotations.ReferencePolicy;
42  import org.osgi.service.component.annotations.ReferencePolicyOption;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  import java.util.Dictionary;
47  import java.util.Map;
48  import java.util.Objects;
49  
50  @Component(
51      service = { UpdateHandler.class, SchedulerUpdateHandler.class },
52      property = {
53          "service.description=Scheduler Update Listener for Live Schedule Service"
54      }
55  )
56  public class SchedulerEventUpdateHandler extends UpdateHandler implements SchedulerUpdateHandler {
57  
58    private static final Logger logger = LoggerFactory.getLogger(SchedulerEventUpdateHandler.class);
59  
60    private static final String DESTINATION_SCHEDULER = "SCHEDULER.Liveschedule";
61    private static final String DELETE_ON_CAPTURE_ERROR = "live.deleteOnCapureError";
62  
63    protected volatile SchedulerService schedulerService;
64  
65    private boolean deleteOnCaptureError = true;
66  
67    /**
68     * OSGi callback on component activation.
69     *
70     * @param cc
71     *          the component context
72     */
73    @Activate
74    @Override
75    public void activate(ComponentContext cc) {
76      super.activate(cc);
77      Dictionary properties = cc.getProperties();
78      deleteOnCaptureError = BooleanUtils.toBoolean(Objects.toString(properties.get(DELETE_ON_CAPTURE_ERROR), "true"));
79    }
80  
81    public void execute(final String mpId, final SchedulerItem schedulerItem) {
82      try {
83        logger.debug("Scheduler message handler START for mp {} event type {} in thread {}", mpId,
84                schedulerItem.getType(), Thread.currentThread().getId());
85  
86        switch (schedulerItem.getType()) {
87          case UpdateCatalog:
88            if (isLive(mpId)) {
89              liveScheduleService.createOrUpdateLiveEvent(mpId, schedulerItem.getEvent());
90            }
91            break;
92          case UpdateAcl:
93            if (isLive(mpId)) {
94              liveScheduleService.updateLiveEventAcl(mpId, schedulerItem.getAcl());
95            }
96            break;
97          case UpdateProperties:
98            // Workflow properties may have been updated (publishLive configuration)
99            String publishLive = schedulerItem.getProperties().get(WORKFLOW_CONFIG_PREFIX.concat(PUBLISH_LIVE_PROPERTY));
100           if (publishLive == null) {
101             // Not specified so we do nothing. We don't want to delete if we got incomplete props.
102             return;
103           } else if (BooleanUtils.toBoolean(publishLive)) {
104             DublinCoreCatalog episodeDC = schedulerService.getDublinCore(mpId);
105             liveScheduleService.createOrUpdateLiveEvent(mpId, episodeDC);
106           } else {
107             liveScheduleService.deleteLiveEvent(mpId);
108           }
109           break;
110         case Delete:
111         case DeleteRecordingStatus:
112           // We can't check workflow config here to determine if the event is live because the
113           // event has already been deleted. The live scheduler service will do that.
114           liveScheduleService.deleteLiveEvent(mpId);
115           break;
116         case UpdateAgentId:
117         case UpdateStart:
118         case UpdateEnd:
119           if (isLive(mpId)) {
120             DublinCoreCatalog episodeDC = schedulerService.getDublinCore(mpId);
121             liveScheduleService.createOrUpdateLiveEvent(mpId, episodeDC);
122           }
123           break;
124         case UpdateRecordingStatus:
125           String state = schedulerItem.getRecordingState();
126           if (RecordingState.CAPTURE_FINISHED.equals(state) || RecordingState.UPLOADING.equals(state)
127                   || RecordingState.UPLOAD_ERROR.equals(state)
128                   || (RecordingState.CAPTURE_ERROR.equals(state) && deleteOnCaptureError)) {
129             if (isLive(mpId)) {
130               liveScheduleService.deleteLiveEvent(mpId);
131             }
132           }
133           break;
134         case UpdatePresenters:
135           break;
136         default:
137           throw new IllegalArgumentException("Unhandled type of SchedulerItem");
138       }
139     } catch (Exception e) {
140       logger.warn("Exception occurred for mp {}, event type {}", mpId, schedulerItem.getType(), e);
141     } finally {
142       logger.debug("Scheduler message handler END for mp {} event type {} in thread {}", mpId, schedulerItem.getType(),
143               Thread.currentThread().getId());
144     }
145   }
146 
147   protected boolean isLive(String mpId) {
148     try {
149       Map<String, String> config = schedulerService.getWorkflowConfig(mpId);
150       return BooleanUtils.toBoolean((String) config.get(PUBLISH_LIVE_PROPERTY));
151     } catch (UnauthorizedException | NotFoundException | SchedulerException e) {
152       logger.debug("Could not get workflow configuration for mp {}. This is probably ok.", mpId);
153       return false; // Assume non-live
154     }
155   }
156 
157   // === Set by OSGI begin
158   @Reference(
159       cardinality = ReferenceCardinality.OPTIONAL,
160       policy = ReferencePolicy.DYNAMIC,
161       policyOption = ReferencePolicyOption.GREEDY
162   )
163   public void setSchedulerService(SchedulerService service) {
164     this.schedulerService = service;
165   }
166 
167   public void unsetSchedulerService(SchedulerService service) {
168     if (this.schedulerService == service) {
169       this.schedulerService = null;
170     }
171   }
172 
173   @Reference
174   @Override
175   public void setLiveScheduleService(LiveScheduleService liveScheduleService) {
176     super.setLiveScheduleService(liveScheduleService);
177   }
178   // === Set by OSGI end
179 
180 }