Closed homeyjd closed 5 years ago
Here's a simple first version (untested). I added signal handling for config reloads and graceful (and fast) shutdown in a way that doesn't interrupt in-flight API calls.
Thoughts?
EDIT 1: Updated the script so it minimally works
#!/usr/bin/env python
import json, logging, os, signal, time
import botocore.session
# Service-side errors from botocore clients surface as ClientError
from botocore.exceptions import ClientError

logging.basicConfig()
logger = logging.getLogger('lifecycle_poller')

ecsClient = None
sqsClient = None
asgClient = None
isRunning = True

CONFIG = {
    # region, cluster, queueUrl are loaded from worker.json
    'waitTime': 25,
    'maxWaitInSeconds': 60
}
def initConfig():
    global CONFIG
    try:
        conf = json.load(open(os.path.dirname(os.path.abspath(__file__)) + '/worker.json'))
        if conf:
            for k in conf:
                if conf[k]:
                    CONFIG[k] = conf[k]
    except IOError as err:
        logger.warning("Failed to read worker.json: %s" % str(err))
    logger.info('CONFIG: %s' % json.dumps(CONFIG))
    session = botocore.session.get_session()
    kwargs = {}
    if 'region' in CONFIG:
        kwargs['region_name'] = CONFIG['region']
    global ecsClient, sqsClient, asgClient
    ecsClient = session.create_client('ecs', **kwargs)
    sqsClient = session.create_client('sqs', **kwargs)
    asgClient = session.create_client('autoscaling', **kwargs)
def getContainerInstanceId(ec2InstanceId):
    # botocore responses are plain dicts, so index by key rather than attribute
    res = ecsClient.list_container_instances(
        cluster=CONFIG['cluster'],
        filter=('ec2InstanceId == %s' % ec2InstanceId))
    if len(res['containerInstanceArns']) < 1:
        return None
    return res['containerInstanceArns'][0].split("/")[-1]
def drainAndWait(containerInstanceId):
    ecsClient.update_container_instances_state(
        cluster=CONFIG['cluster'],
        containerInstances=[containerInstanceId],
        status="DRAINING")
    startTime = time.time()
    # Wait for half the interval before the first check
    time.sleep(CONFIG['waitTime'] / 2)
    while isRunning and time.time() < startTime + CONFIG['maxWaitInSeconds']:
        res = ecsClient.list_tasks(
            cluster=CONFIG['cluster'],
            containerInstance=containerInstanceId)
        cnt = len(res['taskArns'])
        if cnt == 0:
            logger.info('[drainAndWait: %s] is now idle' % containerInstanceId)
            return
        logger.info('[drainAndWait: %s] has %i tasks' % (containerInstanceId, cnt))
        yield  # delegate to caller to extend the message visibility
        time.sleep(CONFIG['waitTime'])
    logger.warning('[drainAndWait: %s] TIMEOUT, wait time elapsed!' % containerInstanceId)
def completeLifecycleAction(token, hook, asg):
    # botocore expects the Auto Scaling API's CamelCase parameter names
    asgClient.complete_lifecycle_action(
        LifecycleHookName=hook,
        AutoScalingGroupName=asg,
        LifecycleActionToken=token,
        LifecycleActionResult='CONTINUE')
def handleMessage(message):
    """Process an individual message. A `yield` informs the caller to extend message visibility."""
    body = json.loads(message['Body'])
    if body.get('Event') == 'autoscaling:TEST_NOTIFICATION':
        logger.info('Message: TEST_NOTIFICATION')
        return  # Do not process
    if body.get('LifecycleTransition') != 'autoscaling:EC2_INSTANCE_TERMINATING':
        logger.error('Message: unsupported lifecycle transition: %s' % body.get('LifecycleTransition'))
        return  # Do not process
    # TEST_NOTIFICATION messages carry no EC2InstanceId, so read it only after the checks above
    ec2InstanceId = body['EC2InstanceId']
    logger.info('Message: lifecycle transition for EC2 instance %s' % ec2InstanceId)
    containerInstanceId = getContainerInstanceId(ec2InstanceId)
    if not containerInstanceId:
        logger.info('no container instance found for EC2 instance %s' % ec2InstanceId)
        return  # Not relevant to this cluster
    logger.info('lifecycle transition for container instance %s' % containerInstanceId)
    # Wait for all containers to drain off the instance
    for _ in drainAndWait(containerInstanceId):
        yield  # allow caller to extend the message
    try:
        completeLifecycleAction(body['LifecycleActionToken'], body['LifecycleHookName'], body['AutoScalingGroupName'])
        logger.info('Lifecycle action completed')
    except ClientError as err:
        # e.g. a ValidationError from ASG when the lifecycle action token has expired
        logger.warning('Lifecycle action failed: %s' % str(err))
def listen():
    """Start main listener loop."""
    def signalHandler(signum, frame):
        global isRunning
        isRunning = False
        logger.info('caught signal: %i' % signum)
    signal.signal(signal.SIGINT, signalHandler)
    signal.signal(signal.SIGTERM, signalHandler)
    # SIGHUP reloads the config; handlers receive (signum, frame), so wrap initConfig
    signal.signal(signal.SIGHUP, lambda signum, frame: initConfig())
    initConfig()
    while isRunning:
        response = sqsClient.receive_message(QueueUrl=CONFIG['queueUrl'], WaitTimeSeconds=20)
        try:
            for message in response.get('Messages', []):
                for _ in handleMessage(message):
                    # Need more time!
                    sqsClient.change_message_visibility(
                        QueueUrl=CONFIG['queueUrl'],
                        ReceiptHandle=message['ReceiptHandle'],
                        VisibilityTimeout=30)
                # Done with this message; remove it so it is not redelivered
                sqsClient.delete_message(
                    QueueUrl=CONFIG['queueUrl'],
                    ReceiptHandle=message['ReceiptHandle'])
        except Exception as e:
            logger.error('Failed to process message: ' + str(e))
        time.sleep(1)  # layman's rate-limit

if __name__ == '__main__':
    listen()
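For anyone trying this out, here's a hypothetical smoke test showing the message shape the script expects and how listen() steps the handleMessage() generator. Every identifier below is fabricated for illustration, and it assumes a valid worker.json plus AWS credentials so initConfig() can build the clients:

# Hypothetical smoke test -- drives handleMessage() with a fabricated lifecycle
# message the same way listen() does. All IDs and tokens below are made up.
sampleMessage = {
    'ReceiptHandle': 'fake-receipt-handle',
    'Body': json.dumps({
        'LifecycleTransition': 'autoscaling:EC2_INSTANCE_TERMINATING',
        'EC2InstanceId': 'i-0123456789abcdef0',
        'LifecycleActionToken': 'fake-token',
        'LifecycleHookName': 'my-terminate-hook',
        'AutoScalingGroupName': 'my-asg',
    }),
}

initConfig()  # creates the ECS/SQS/ASG clients
for _ in handleMessage(sampleMessage):
    print('poller asked to extend message visibility')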
@andreaswittig is working on a Lambda-based implementation at the moment (not related to the Ruby issue; we started on this earlier to move the logic off the EC2 instance).
See #293.
@michaelwittig Thank you @andreaswittig for sharing the Lambda impl! It certainly makes for a cleaner launch in EC2 cf-init.
I've submitted a couple of PRs against the Lambda impl to fix a couple of JS issues (#294) and make debugging easier (#295). Totally up to you guys, but it seems like #296 would be the most consistent path given how your other templates are structured.
TemplateID: ecs/cluster
Region: any
Long-time lurker/user -- thank you for the great work in upgrading to Amzn Linux 2!
We were recently hit with this bug: https://github.com/aws/aws-sdk-ruby/issues/1994. I saw @michaelwittig was involved in that discussion -- thank you for staying on top of it. The bug prevented our ASG from spinning up new instances, essentially "locking" us into our existing configuration, which is a particularly egregious problem. Thankfully nothing went wrong over the weekend!
I'm considering ways to mitigate this by design.
The problem of an unknown dependency list is only truly mitigated by baking your own pre-init'd AMI -- I acknowledge that. But sometimes aligning dependencies with existing toolchains can shrink the dependency list to the point where it's reasonable to assume a managed service will always contain a stable copy.
The current AMI ships with Python 2.7.14, which is required by none other than the base amazon-linux-extras package and the pre-installed aws-cli toolkit -- see the sketch below for what that buys us.
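For illustration, a hypothetical sanity check along these lines (the region string is made up) shows that the stock interpreter can already build AWS service clients through botocore, which comes along as a dependency of the aws-cli:

# Hypothetical sanity check -- not part of any template. The stock Python on
# the AMI should import botocore (pulled in by the pre-installed aws-cli) and
# construct a service client without installing anything extra.
import botocore
import botocore.session

session = botocore.session.get_session()
ecs = session.create_client('ecs', region_name='us-east-1')  # region is illustrative
print('botocore %s is available; ECS client: %s' % (botocore.__version__, ecs))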
Using a Python-based poller would leverage that pre-installed code, remove the Ruby requirement entirely, and possibly be more stable. As a side benefit, it might also lighten the instance init process.
Have you considered a Python-based rewrite of the Lifecycle Poller?