dazza-codes / aio-aws

Asyncio utils for AWS Services
Apache License 2.0

Add async app-server to submit/monitor jobs #5

Open dazza-codes opened 4 years ago

dazza-codes commented 4 years ago

Consider the pattern used by https://github.com/AdRoll/batchiepatchie to run a backend server for submitting and monitoring jobs. The current CLI scripting patterns tie up the user in a blocking session, with no option to continue other interactions while jobs run.
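A minimal sketch of such an app-server, using aiohttp (the routes, handlers, and in-memory job registry here are hypothetical illustrations, not part of aio-aws):

import asyncio
import uuid
from typing import Dict

from aiohttp import web

# In-memory job registry; a real server would track jobs in redis or a database.
JOBS: Dict[str, asyncio.Task] = {}

async def run_job(job_id: str) -> str:
    # placeholder for an async AWS Batch / Lambda submission
    await asyncio.sleep(5)
    return f"job {job_id} done"

async def submit(request: web.Request) -> web.Response:
    job_id = uuid.uuid4().hex
    JOBS[job_id] = asyncio.create_task(run_job(job_id))
    return web.json_response({"job_id": job_id}, status=202)

async def status(request: web.Request) -> web.Response:
    job_id = request.match_info["job_id"]
    task = JOBS.get(job_id)
    if task is None:
        raise web.HTTPNotFound()
    if task.done():
        return web.json_response({"status": "complete", "result": task.result()})
    return web.json_response({"status": "in_progress"})

app = web.Application()
app.add_routes([web.post("/jobs", submit), web.get("/jobs/{job_id}", status)])

if __name__ == "__main__":
    web.run_app(app, port=8080)

The client submits a job and gets a job_id back immediately, then polls the status route, so the terminal session is never blocked.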


dazza-codes commented 3 years ago

Consider using arq in a local Docker deployment with Redis:

# This is inspired by https://github.com/jaredvasquez/rq-docker
# - there is no explicit LICENSE on that project

# See this SO about sharing AWS credentials with a docker container
# https://stackoverflow.com/questions/36354423/which-is-the-best-way-to-pass-aws-credentials-to-docker-container

version: '3.7'
services:
  rq-server:
    image: redis:alpine
    ports:
      - 6379:6379

  #  # rq-dashboard does not work with arq
  #  # https://github.com/samuelcolvin/arq/issues/219
  #  rq-dashboard:
  #    image: aio-aws-arq
  #    depends_on:
  #      - "rq-server"
  #    command: rq-dashboard -H rq-server
  #    ports:
  #      - 9181:9181

  arq-worker:
    image: aio-aws-arq
    depends_on:
      - "rq-server"
    command: arq aio_aws.aio_aws_arq.WorkerSettings
    environment:
      - REDIS_DSN=redis://rq-server:6379
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
  #    deploy:
  #      replicas: 3
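With the AWS variables exported in the shell, docker-compose up starts the redis service and the worker; the compose file passes the credentials through to the worker container (see the SO link above). A quick connectivity check from the host, assuming the stack is up and port 6379 is published as above:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def main():
    # connect to the rq-server service published on localhost:6379
    redis = await create_pool(RedisSettings(host="localhost", port=6379))
    print(await redis.ping())

asyncio.run(main())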

import asyncio
import json
import logging
import os
from collections import deque
from random import random
from typing import Dict
from typing import List

from aio_aws.aio_aws_config import AioAWSConfig
from aio_aws.aio_aws_config import response_success
from aio_aws.aio_aws_lambda import AWSLambdaFunction

LOGGER = logging.getLogger(__name__)

try:
    from arq import create_pool
    from arq.connections import RedisSettings
    from arq.jobs import JobStatus

    LOGGER.debug("Optional arq module is available")

except ModuleNotFoundError as err:
    LOGGER.error("Optional arq package is not available")
    raise err

# See https://arq-docs.helpmanual.io/

# See queue names in docker-arq/docker-compose.yml
QUEUE_NAME = "aio-aws"

def get_redis_settings() -> RedisSettings:
    # e.g. REDIS_DSN=redis://rq-server:6379, as set in docker-compose.yml
    redis_dsn = os.getenv("REDIS_DSN")
    LOGGER.info("redis_dsn: %s", redis_dsn)
    if redis_dsn:
        return RedisSettings.from_dsn(redis_dsn)
    return RedisSettings()

async def startup(ctx):
    aws_region = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
    aio_config = AioAWSConfig(
        aws_region=aws_region,
        max_pool_connections=80,  # a large connection pool
        min_jitter=0.2,
        max_jitter=0.8,
        retries=1,
    )
    ctx["aio_config"] = aio_config

    ctx["lambda_func_name"] = "aioAwsQuery"

async def shutdown(ctx):
    # await ctx['session'].close()
    return

async def aio_aws_query(ctx: Dict) -> List[str]:

    lambda_func_name = ctx["lambda_func_name"]

    # Create a Lambda event payload
    event = {
        "process": "aio_aws_lambda",
        "query": "s3://bucket/query_key",
    }
    LOGGER.info("Submit query: %s", event)

    payload = json.dumps(event).encode()
    func = AWSLambdaFunction(name=lambda_func_name, payload=payload)

    await func.invoke()

    reports = []
    if response_success(func.response):
        # the lambda response is a list
        reports.extend(func.content)

    return reports

async def arq_task(tasks: List[str], wait: bool = False) -> List[str]:
    """Enqueue ARQ tasks and optionally wait for them to complete.

    :param tasks: names of ARQ task functions to enqueue
    :param wait: whether to poll the queued jobs until they complete
    :return: the gathered job results when waiting, otherwise an empty list
    """

    redis = await create_pool(get_redis_settings())

    jobs = deque()

    for task in tasks:
        # enqueue on the queue the worker listens to (see WorkerSettings)
        job = await redis.enqueue_job(task, _queue_name=QUEUE_NAME)
        if job:
            jobs.append(job)

    if not jobs:
        LOGGER.warning("Failed to enqueue any new jobs")

    reports = []

    if not wait:
        LOGGER.info("Not waiting, see rq-dashboard for jobs")
        return reports

    while jobs:
        # poll the jobs until they complete, with a small sleep between checks
        job = jobs.popleft()

        # arq job statuses: deferred, queued, in_progress, complete, not_found
        status = await job.status()
        LOGGER.info("Job %s: %s", job.job_id, status)

        if status in [JobStatus.deferred, JobStatus.queued, JobStatus.in_progress]:
            jobs.append(job)
            # use asyncio.sleep so polling does not block the event loop
            await asyncio.sleep(5 + random())
        elif status == JobStatus.not_found:
            LOGGER.warning("Discarding job not found %s", job.job_id)
        elif status == JobStatus.complete:
            result = await job.result(timeout=5)
            LOGGER.info("Report %s", result)
            reports.extend(result)
        else:
            LOGGER.warning("Unknown job status %s", status)

    return reports

# WorkerSettings defines the settings to use when creating the worker;
# it's used by the arq CLI, see options in the docs at
# https://arq-docs.helpmanual.io/#arq.worker.Worker
class WorkerSettings:
    functions = [aio_aws_query]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = get_redis_settings()
    queue_name = "aio-aws"