View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.terminationstate.aws;
22  
23  import org.opencastproject.serviceregistry.api.ServiceRegistry;
24  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
25  import org.opencastproject.terminationstate.api.AbstractJobTerminationStateService;
26  import org.opencastproject.terminationstate.api.TerminationStateService;
27  import org.opencastproject.util.NotFoundException;
28  import org.opencastproject.util.OsgiUtil;
29  import org.opencastproject.util.data.Option;
30  
31  import com.amazonaws.AmazonClientException;
32  import com.amazonaws.AmazonServiceException;
33  import com.amazonaws.auth.AWSCredentialsProvider;
34  import com.amazonaws.auth.AWSStaticCredentialsProvider;
35  import com.amazonaws.auth.BasicAWSCredentials;
36  import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
37  import com.amazonaws.services.autoscaling.AmazonAutoScaling;
38  import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder;
39  import com.amazonaws.services.autoscaling.model.AutoScalingGroup;
40  import com.amazonaws.services.autoscaling.model.AutoScalingInstanceDetails;
41  import com.amazonaws.services.autoscaling.model.CompleteLifecycleActionRequest;
42  import com.amazonaws.services.autoscaling.model.DescribeAutoScalingGroupsRequest;
43  import com.amazonaws.services.autoscaling.model.DescribeAutoScalingGroupsResult;
44  import com.amazonaws.services.autoscaling.model.DescribeAutoScalingInstancesRequest;
45  import com.amazonaws.services.autoscaling.model.DescribeAutoScalingInstancesResult;
46  import com.amazonaws.services.autoscaling.model.DescribeLifecycleHooksRequest;
47  import com.amazonaws.services.autoscaling.model.DescribeLifecycleHooksResult;
48  import com.amazonaws.services.autoscaling.model.LifecycleHook;
49  import com.amazonaws.services.autoscaling.model.LifecycleState;
50  import com.amazonaws.services.autoscaling.model.RecordLifecycleActionHeartbeatRequest;
51  import com.amazonaws.util.EC2MetadataUtils;
52  
53  import org.osgi.service.cm.ConfigurationException;
54  import org.osgi.service.component.ComponentContext;
55  import org.osgi.service.component.annotations.Activate;
56  import org.osgi.service.component.annotations.Component;
57  import org.osgi.service.component.annotations.Deactivate;
58  import org.osgi.service.component.annotations.Reference;
59  import org.quartz.Job;
60  import org.quartz.JobDetail;
61  import org.quartz.JobExecutionContext;
62  import org.quartz.JobExecutionException;
63  import org.quartz.Scheduler;
64  import org.quartz.SchedulerException;
65  import org.quartz.Trigger;
66  import org.quartz.TriggerUtils;
67  import org.quartz.impl.StdSchedulerFactory;
68  import org.slf4j.Logger;
69  import org.slf4j.LoggerFactory;
70  
71  import java.util.Dictionary;
72  import java.util.List;
73  
74  @Component(
75      immediate = true,
76      service = TerminationStateService.class,
77      property = {
78          "service.description=Termination State Service: AWS Auto Scaling",
79          "service.pid=org.opencastproject.terminationstate.aws.AutoScalingTerminationStateService",
80          "vendor.name=aws",
81          "vendor.service=autoscaling"
82      }
83  )
84  public final class AutoScalingTerminationStateService extends AbstractJobTerminationStateService {
85    private static final Logger logger = LoggerFactory.getLogger(AutoScalingTerminationStateService.class);
86  
87    // AWS String Constants
88    private static final String AUTOSCALING_INSTANCE_TERMINATING = "autoscaling:EC2_INSTANCE_TERMINATING";
89  
90    public static final String CONFIG_ENABLE = "enable";
91    public static final String CONFIG_LIFECYCLE_POLLING_ENABLE = "lifecycle.polling.enable";
92    public static final String CONFIG_LIFECYCLE_POLLING_PERIOD = "lifecycle.polling.period";
93    public static final String CONFIG_LIFECYCLE_HEARTBEAT_PERIOD = "lifecycle.heartbeat.period";
94    public static final String CONFIG_AWS_ACCESS_KEY_ID = "access.id";
95    public static final String CONFIG_AWS_SECRET_ACCESS_KEY = "access.secret";
96  
97    private static final boolean DEFAULT_ENABLE = false;
98    private static final boolean DEFAULT_LIFECYCLE_POLLING_ENABLE = true;
99    private static final int DEFAULT_LIFECYCLE_POLLING_PERIOD = 300; //secs
100   private static final int DEFAULT_LIFECYCLE_HEARTBEAT_PERIOD = 300; // secs
101 
102   protected static final String SCHEDULE_GROUP = AbstractJobTerminationStateService.class.getSimpleName();
103   protected static final String SCHEDULE_LIFECYCLE_POLLING_JOB = "PollLifeCycle";
104   protected static final String SCHEDULE_LIFECYCLE_HEARTBEAT_JOB = "PollTerminationState";
105   protected static final String SCHEDULE_LIFECYCLE_POLLING_TRIGGER = "TriggerPollLifeCycle";
106   protected static final String SCHEDULE_LIFECYCLE_HEARTBEAT_TRIGGER = "TriggerHeartbeat";
107   protected static final String SCHEDULE_JOB_PARAM_PARENT = "parent";
108 
109   private String instanceId;
110   private AWSCredentialsProvider credentials;
111   private AmazonAutoScaling autoScaling;
112   private AutoScalingGroup autoScalingGroup;
113   private LifecycleHook lifeCycleHook;
114 
115   private Scheduler scheduler;
116 
117   // This service must be explicitly enabled
118   private boolean enabled = DEFAULT_ENABLE;
119   private boolean lifecyclePolling = DEFAULT_LIFECYCLE_POLLING_ENABLE;
120   private int lifecyclePollingPeriod = DEFAULT_LIFECYCLE_POLLING_PERIOD;
121   private int lifecycleHeartbeatPeriod = DEFAULT_LIFECYCLE_HEARTBEAT_PERIOD;
122   private Option<String> accessKeyIdOpt = Option.none();
123   private Option<String> accessKeySecretOpt = Option.none();
124 
125   @Activate
126   protected void activate(ComponentContext componentContext) {
127     try {
128       configure(componentContext.getProperties());
129     } catch (ConfigurationException e) {
130       logger.error("Unable to read configuration, using defaults", e);
131     }
132 
133     if (!enabled) {
134       logger.info("Service is disabled by configuration");
135       return;
136     }
137 
138     // make sure host is not in maintenance due to previous termination handling
139     try {
140       String host = getServiceRegistry().getRegistryHostname();
141       getServiceRegistry().setMaintenanceStatus(host, false);
142     } catch (ServiceRegistryException | NotFoundException e) {
143       logger.error("Cannot take this host out of maintenance", e);
144     }
145 
146     if (accessKeyIdOpt.isNone() && accessKeySecretOpt.isNone()) {
147       credentials = new DefaultAWSCredentialsProviderChain();
148     } else {
149       credentials = new AWSStaticCredentialsProvider(
150               new BasicAWSCredentials(accessKeyIdOpt.get(), accessKeySecretOpt.get()));
151     }
152 
153     instanceId = EC2MetadataUtils.getInstanceId();
154     logger.debug("Instance Id is {}", instanceId);
155 
156     if (instanceId == null) {
157       logger.error("Unable to contact AWS metadata endpoint, Is this node running in AWS EC2?");
158       return;
159     }
160 
161     try {
162       autoScaling = AmazonAutoScalingClientBuilder.standard()
163               .withRegion(EC2MetadataUtils.getEC2InstanceRegion())
164               .withCredentials(credentials).build();
165       logger.debug("Created AutoScalingClient {}", autoScaling.toString());
166 
167       String autoScalingGroupName = getAutoScalingGroupName();
168       logger.debug("Auto scaling group name : {}", autoScalingGroupName);
169 
170       if (autoScalingGroupName == null) {
171         logger.error("AWS Instance {} is not part of an auto scaling group. Polling will be disabled", instanceId);
172         stop();
173         return;
174       }
175 
176       autoScalingGroup = getAutoScalingGroup(autoScalingGroupName);
177 
178       if (autoScalingGroup == null) {
179         logger.error("Unable to get Auto Scaling Group {}. Polling will be disabled", autoScalingGroupName);
180         stop();
181         return;
182       }
183 
184       lifeCycleHook = getLifecycleHook(autoScalingGroupName);
185 
186       if (lifeCycleHook == null) {
187         logger.error("Auto scaling group {} does not have a termination stage hook. Polling will be disabled",
188                 autoScalingGroupName);
189         stop();
190         return;
191       } else if (lifecycleHeartbeatPeriod > lifeCycleHook.getHeartbeatTimeout()) {
192         logger.warn(
193             "Lifecycle Heartbeat Period {} is greater than LifecycleHook's HeartbeatTimeout {}, "
194                 + "see https://docs.aws.amazon.com/autoscaling/ec2/userguide/lifecycle-hooks.html",
195             lifecycleHeartbeatPeriod,
196             lifeCycleHook.getHeartbeatTimeout()
197         );
198       }
199     } catch (AmazonServiceException e) {
200       logger.error("EC2 Autoscaling returned an error", e);
201       stop();
202       return;
203     } catch (AmazonClientException e) {
204       logger.error("AWS client can't communicate with EC2 Autoscaling", e);
205       stop();
206       return;
207     }
208 
209     try {
210       scheduler = new StdSchedulerFactory().getScheduler();
211     } catch (SchedulerException e) {
212       logger.error("Cannot create quartz scheduler", e);
213     }
214 
215     if (lifecyclePolling && lifecyclePollingPeriod > 0) {
216       startPollingLifeCycleHook();
217     }
218   }
219 
220   private String getAutoScalingGroupName() {
221     DescribeAutoScalingInstancesRequest request = new DescribeAutoScalingInstancesRequest().withInstanceIds(instanceId);
222     DescribeAutoScalingInstancesResult result = autoScaling.describeAutoScalingInstances(request);
223     List<AutoScalingInstanceDetails> instances = result.getAutoScalingInstances();
224     logger.debug("Found {} autoscaling instances", instances.size());
225 
226     if (!instances.isEmpty()) {
227       AutoScalingInstanceDetails autoScalingInstance = instances.get(0);
228       return autoScalingInstance.getAutoScalingGroupName();
229     }
230     return null;
231   }
232 
233   private AutoScalingGroup getAutoScalingGroup(String autoScalingGroupName) {
234     DescribeAutoScalingGroupsRequest request = new DescribeAutoScalingGroupsRequest()
235             .withAutoScalingGroupNames(autoScalingGroupName);
236     DescribeAutoScalingGroupsResult result = autoScaling.describeAutoScalingGroups(request);
237 
238     List<AutoScalingGroup> groups = result.getAutoScalingGroups();
239 
240     if (!groups.isEmpty()) {
241       AutoScalingGroup group = groups.get(0);
242       return group;
243     }
244 
245     return null;
246   }
247 
248   private LifecycleHook getLifecycleHook(String autoScalingGroupName) {
249     DescribeLifecycleHooksRequest request = new DescribeLifecycleHooksRequest()
250             .withAutoScalingGroupName(autoScalingGroupName);
251     DescribeLifecycleHooksResult result = autoScaling.describeLifecycleHooks(request);
252 
253     for (LifecycleHook hook : result.getLifecycleHooks()) {
254       if (AUTOSCALING_INSTANCE_TERMINATING.equalsIgnoreCase(hook.getLifecycleTransition())) {
255         return hook;
256       }
257     }
258 
259     return null;
260   }
261 
262   protected void configure(Dictionary config) throws ConfigurationException {
263     this.enabled = OsgiUtil.getOptCfgAsBoolean(config, CONFIG_ENABLE).getOrElse(DEFAULT_ENABLE);
264     this.lifecyclePolling = OsgiUtil.getOptCfgAsBoolean(config, CONFIG_LIFECYCLE_POLLING_ENABLE)
265         .getOrElse(DEFAULT_LIFECYCLE_POLLING_ENABLE);
266     this.lifecyclePollingPeriod = OsgiUtil.getOptCfgAsInt(config, CONFIG_LIFECYCLE_POLLING_PERIOD)
267         .getOrElse(DEFAULT_LIFECYCLE_POLLING_PERIOD);
268     this.lifecycleHeartbeatPeriod = OsgiUtil.getOptCfgAsInt(config, CONFIG_LIFECYCLE_HEARTBEAT_PERIOD)
269         .getOrElse(DEFAULT_LIFECYCLE_HEARTBEAT_PERIOD);
270     this.accessKeyIdOpt = OsgiUtil.getOptCfg(config, CONFIG_AWS_ACCESS_KEY_ID);
271     this.accessKeySecretOpt = OsgiUtil.getOptCfg(config, CONFIG_AWS_SECRET_ACCESS_KEY);
272   }
273 
274   @Override
275   public void setState(TerminationState state) {
276     if (enabled && autoScaling != null) {
277       super.setState(state);
278 
279       if (getState() != TerminationState.NONE) {
280         // As this might also be called via Endpoint terminate polling if required
281         if (lifecyclePolling) {
282           stopPollingLifeCycleHook();
283         }
284 
285         // stop accepting new jobs
286         try {
287           String host = getServiceRegistry().getRegistryHostname();
288           getServiceRegistry().setMaintenanceStatus(host, true);
289         } catch (ServiceRegistryException | NotFoundException e) {
290           logger.error("Cannot put this host into maintenance", e);
291         }
292         startPollingTerminationState();
293       }
294     }
295   }
296 
297   protected void startPollingLifeCycleHook() {
298     try {
299       // create and set the job. To actually run it call schedule(..)
300       final JobDetail job = new JobDetail(SCHEDULE_GROUP, SCHEDULE_LIFECYCLE_POLLING_JOB, CheckLifeCycleState.class);
301       job.getJobDataMap().put(SCHEDULE_JOB_PARAM_PARENT, this);
302       final Trigger trigger = TriggerUtils.makeSecondlyTrigger(lifecyclePollingPeriod);
303       trigger.setGroup(SCHEDULE_GROUP);
304       trigger.setName(SCHEDULE_LIFECYCLE_POLLING_TRIGGER);
305       scheduler.scheduleJob(job, trigger);
306       scheduler.start();
307       logger.info("Started polling for Lifecycle state change");
308     } catch (org.quartz.SchedulerException e) {
309       throw new RuntimeException(e);
310     }
311   }
312 
313   protected void stopPollingLifeCycleHook() {
314     try {
315       scheduler.deleteJob(SCHEDULE_GROUP, SCHEDULE_LIFECYCLE_POLLING_JOB);
316     } catch (SchedulerException e) {
317       // ignore
318     }
319   }
320 
321   public static class CheckLifeCycleState implements Job {
322 
323     @Override
324     public void execute(JobExecutionContext context) throws JobExecutionException {
325       AutoScalingTerminationStateService parent
326           = (AutoScalingTerminationStateService) context.getJobDetail().getJobDataMap().get(SCHEDULE_JOB_PARAM_PARENT);
327       if (parent.autoScaling != null) {
328         DescribeAutoScalingInstancesRequest request
329             = new DescribeAutoScalingInstancesRequest().withInstanceIds(parent.instanceId);
330         DescribeAutoScalingInstancesResult result = parent.autoScaling.describeAutoScalingInstances(request);
331         List<AutoScalingInstanceDetails> instances = result.getAutoScalingInstances();
332 
333         if (!instances.isEmpty()) {
334           AutoScalingInstanceDetails autoScalingInstance = instances.get(0);
335 
336           if (LifecycleState.TerminatingWait.toString().equalsIgnoreCase(autoScalingInstance.getLifecycleState())) {
337             logger.info("Lifecycle state changed to Terminating:Wait");
338             parent.stopPollingLifeCycleHook();
339             parent.setState(TerminationState.WAIT);
340           } else {
341             logger.debug("Lifecycle state is {}", autoScalingInstance.getLifecycleState());
342           }
343         }
344       }
345     }
346   }
347 
348   protected void startPollingTerminationState() {
349     try {
350       // create and set the job. To actually run it call schedule(..)
351       final JobDetail job = new JobDetail(
352           SCHEDULE_GROUP, SCHEDULE_LIFECYCLE_HEARTBEAT_JOB, CheckTerminationState.class);
353       job.getJobDataMap().put(SCHEDULE_JOB_PARAM_PARENT, this);
354       final Trigger trigger = TriggerUtils.makeSecondlyTrigger(lifecycleHeartbeatPeriod);
355       trigger.setGroup(SCHEDULE_GROUP);
356       trigger.setName(SCHEDULE_LIFECYCLE_HEARTBEAT_TRIGGER);
357       scheduler.scheduleJob(job, trigger);
358       scheduler.start();
359       logger.info("Started emitting heartbeat until jobs are complete");
360     } catch (org.quartz.SchedulerException e) {
361       throw new RuntimeException(e);
362     }
363   }
364 
365   protected void stopPollingTerminationState() {
366     try {
367       scheduler.deleteJob(SCHEDULE_GROUP, SCHEDULE_LIFECYCLE_HEARTBEAT_JOB);
368     } catch (SchedulerException e) {
369       // ignore
370     }
371   }
372 
373   public static class CheckTerminationState implements Job {
374 
375     @Override
376     public void execute(JobExecutionContext context) throws JobExecutionException {
377       AutoScalingTerminationStateService parent
378           = (AutoScalingTerminationStateService) context.getJobDetail().getJobDataMap().get(SCHEDULE_JOB_PARAM_PARENT);
379 
380       if (parent.readyToTerminate()) {
381         // signal AWS node is ready to terminate
382         logger.debug("No jobs running, trying to complete Lifecycle action");
383         if (parent.autoScaling != null) {
384           CompleteLifecycleActionRequest request = new CompleteLifecycleActionRequest()
385                   .withLifecycleActionResult("CONTINUE")
386                   .withAutoScalingGroupName(parent.autoScalingGroup.getAutoScalingGroupName())
387                   .withLifecycleHookName(parent.lifeCycleHook.getLifecycleHookName())
388                   .withInstanceId(parent.instanceId);
389           parent.autoScaling.completeLifecycleAction(request);
390           logger.info("No jobs running, sent complete Lifecycle action");
391         }
392 
393         // stop monitoring
394         parent.stopPollingTerminationState();
395       } else if (parent.getState() == TerminationState.WAIT) {
396         // emit heart beat
397         logger.debug("Jobs still running, trying to send Lifecycle heartbeat");
398         if (parent.autoScaling != null) {
399           RecordLifecycleActionHeartbeatRequest request = new RecordLifecycleActionHeartbeatRequest()
400                   .withAutoScalingGroupName(parent.autoScalingGroup.getAutoScalingGroupName())
401                   .withLifecycleHookName(parent.lifeCycleHook.getLifecycleHookName())
402                   .withInstanceId(parent.instanceId);
403           parent.autoScaling.recordLifecycleActionHeartbeat(request);
404           logger.info("Jobs still running, sent Lifecycle heartbeat");
405         }
406       }
407     }
408   }
409 
410   /**
411    * Stop scheduled jobs and free resources
412    */
413   private void stop() {
414     lifecyclePolling = false;
415     if (autoScaling != null) {
416       autoScaling.shutdown();
417       autoScaling = null;
418     }
419 
420     try {
421       if (scheduler != null) {
422         this.scheduler.shutdown();
423       }
424     } catch (SchedulerException e) {
425       logger.error("Failed to stop scheduler", e);
426     }
427   }
428 
429   /**
430    * OSGI deactivate callback
431    */
432   @Deactivate
433   public void deactivate() {
434     stop();
435   }
436 
437   /** Methods below are used by test class */
438 
439   protected void setAutoScaling(AmazonAutoScaling autoScaling) {
440     this.autoScaling = autoScaling;
441   }
442 
443   protected void setAutoScalingGroup(AutoScalingGroup autoScalingGroup) {
444     this.autoScalingGroup = autoScalingGroup;
445   }
446 
447   protected void setLifecycleHook(LifecycleHook lifecycleHook) {
448     this.lifeCycleHook = lifecycleHook;
449   }
450 
451   protected void setScheduler(Scheduler scheduler) {
452     this.scheduler = scheduler;
453   }
454 
455   @Reference
456   @Override
457   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
458     super.setServiceRegistry(serviceRegistry);
459   }
460 
461 }