seoyeong200 / Book-data-Pipeline

A data pipeline for a service that recommends similar books based on book synopses. The pipeline is built in a Docker environment and uses Spark to process batch data.

Refactor data collection stage #2

Closed seoyeong200 closed 8 months ago

seoyeong200 commented 9 months ago
  1. Refactor the data collection stage
seoyeong200 commented 9 months ago

AWS Lambda blueprints

basic structure

import json

print('Loading function')

def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
    print("value1 = " + event['key1'])
    print("value2 = " + event['key2'])
    print("value3 = " + event['key3'])
    return event['key1']  # Echo back the first key value
    #raise Exception('Something went wrong')
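To try this blueprint without deploying it, the handler can be called directly with a hand-built event. This is a minimal sketch: the sample values are made up, and `None` is passed as the context because this handler never reads it.

```python
def lambda_handler(event, context):
    # Same logic as the blueprint above: log three values, echo the first one.
    print("value1 = " + event['key1'])
    print("value2 = " + event['key2'])
    print("value3 = " + event['key3'])
    return event['key1']


# Hypothetical test event; in the Lambda console this would be the JSON test payload.
sample_event = {"key1": "value1", "key2": "value2", "key3": "value3"}

result = lambda_handler(sample_event, None)
print(result)  # prints value1
```

A missing key raises `KeyError`, which Lambda would report as a failed invocation.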

create a microservice that interacts with a DynamoDB table

simple backend (reads from and writes to DynamoDB) + RESTful API endpoint (Amazon API Gateway)

import boto3
import json

print('Loading function')
dynamo = boto3.client('dynamodb')

def respond(err, res=None):
    return {
        'statusCode': '400' if err else '200',
        # Python 3 exceptions have no .message attribute; str(err) gives the text
        'body': str(err) if err else json.dumps(res),
        'headers': {
            'Content-Type': 'application/json',
        },
    }

def lambda_handler(event, context):
    '''Demonstrates a simple HTTP endpoint using API Gateway. You have full access to the request and response payload, including headers and status code.

    To scan a DynamoDB table, make a GET request with the TableName as a query string parameter. 
    To put, update, or delete an item, make a POST, PUT, or DELETE request respectively, passing in the payload to the DynamoDB API as a JSON body.
    '''
    #print("Received event: " + json.dumps(event, indent=2))

    operations = {
        'DELETE': lambda dynamo, x: dynamo.delete_item(**x),
        'GET': lambda dynamo, x: dynamo.scan(**x),
        'POST': lambda dynamo, x: dynamo.put_item(**x),
        'PUT': lambda dynamo, x: dynamo.update_item(**x),
    }

    operation = event['httpMethod']
    if operation in operations:
        payload = event['queryStringParameters'] if operation == 'GET' else json.loads(event['body'])
        return respond(None, operations[operation](dynamo, payload))
    else:
        return respond(ValueError('Unsupported method "{}"'.format(operation)))
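The handler above only makes sense once you see the events it receives. The sketch below assumes API Gateway's Lambda proxy integration, which supplies `httpMethod`, `queryStringParameters`, and `body`. The table name `books`, the sample item, and the `FakeDynamo` class are all hypothetical; `FakeDynamo` stands in for `boto3.client('dynamodb')` so the routing can be tried without AWS credentials.

```python
import json


class FakeDynamo:
    """Hypothetical stand-in for the DynamoDB client; echoes calls instead of hitting AWS."""

    def scan(self, **kwargs):
        return {"called": "scan", "args": kwargs}

    def put_item(self, **kwargs):
        return {"called": "put_item", "args": kwargs}


def dispatch(dynamo, event):
    # Same routing idea as the blueprint, trimmed to two methods.
    operations = {
        "GET": lambda client, x: client.scan(**x),
        "POST": lambda client, x: client.put_item(**x),
    }
    method = event["httpMethod"]
    if method not in operations:
        raise ValueError('Unsupported method "{}"'.format(method))
    if method == "GET":
        payload = event["queryStringParameters"]
    else:
        payload = json.loads(event["body"])
    return operations[method](dynamo, payload)


# GET /?TableName=books  ->  scan(TableName="books")
get_event = {
    "httpMethod": "GET",
    "queryStringParameters": {"TableName": "books"},
    "body": None,
}

# POST with a JSON body in DynamoDB's attribute-value format  ->  put_item(...)
post_event = {
    "httpMethod": "POST",
    "queryStringParameters": None,
    "body": json.dumps({"TableName": "books", "Item": {"title": {"S": "Demian"}}}),
}

get_result = dispatch(FakeDynamo(), get_event)
post_result = dispatch(FakeDynamo(), post_event)
print(get_result["called"], post_result["called"])  # prints scan put_item
```

Because the request payload is passed straight through as keyword arguments, the caller controls which table is touched; that is convenient for a demo but worth restricting before exposing the endpoint.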

batch job

submits an AWS Batch job and returns the jobId

import json
import boto3

print('Loading function')

batch = boto3.client('batch')

def lambda_handler(event, context):
    # Log the received event
    print("Received event: " + json.dumps(event, indent=2))
    # Get parameters for the SubmitJob call
    # http://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html
    jobName = event['jobName']
    jobQueue = event['jobQueue']
    jobDefinition = event['jobDefinition']
    # containerOverrides and parameters are optional; default to empty dicts
    containerOverrides = event.get('containerOverrides') or {}
    parameters = event.get('parameters') or {}

    try:
        # Submit a Batch Job
        response = batch.submit_job(jobQueue=jobQueue, jobName=jobName, jobDefinition=jobDefinition,
                                    containerOverrides=containerOverrides, parameters=parameters)
        # Log response from AWS Batch
        print("Response: " + json.dumps(response, indent=2))
        # Return the jobId
        jobId = response['jobId']
        return {
            'jobId': jobId
        }
    except Exception as e:
        print(e)
        message = 'Error submitting Batch Job'
        print(message)
        raise Exception(message)

returns the current status of an AWS Batch job

import json
import boto3

print('Loading function')

batch = boto3.client('batch')

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2)) # Log the received event
    jobId = event['jobId'] # Get jobId from the event

    try:
        # Call DescribeJobs
        response = batch.describe_jobs(jobs=[jobId])
        # Log response from AWS Batch
        print("Response: " + json.dumps(response, indent=2))
        # Return the jobStatus
        jobStatus = response['jobs'][0]['status']
        return jobStatus
    except Exception as e:
        print(e)
        message = 'Error getting Batch Job status'
        print(message)
        raise Exception(message)
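The two Batch blueprints pair naturally: submit once, then poll the status until the job reaches a terminal state (`SUCCEEDED` or `FAILED`). The sketch below shows one way to do that. `wait_for_job`, its parameters, and the job id are hypothetical names, not part of the blueprints; the status source is injected so the loop can be tried offline with canned values.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def wait_for_job(job_id, get_status, poll_seconds=30, max_polls=20, sleep=time.sleep):
    """Poll get_status(job_id) until the job finishes or max_polls is reached."""
    status = None
    for attempt in range(max_polls):
        status = get_status(job_id)
        if status in TERMINAL_STATES:
            return status
        if attempt < max_polls - 1:
            sleep(poll_seconds)
    raise TimeoutError("job {} still {} after {} polls".format(job_id, status, max_polls))


def batch_status(job_id):
    """Real lookup, same call as the blueprint. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the offline demo below runs without it
    batch = boto3.client("batch")
    return batch.describe_jobs(jobs=[job_id])["jobs"][0]["status"]


# Offline demo: canned statuses instead of AWS, and a no-op sleep.
fake_statuses = iter(["RUNNABLE", "RUNNING", "SUCCEEDED"])
final = wait_for_job(
    "hypothetical-job-id",
    lambda _job_id: next(fake_statuses),
    sleep=lambda seconds: None,
)
print(final)  # prints SUCCEEDED
```

Against AWS the call would be `wait_for_job(job_id, batch_status)`. If the loop itself runs inside Lambda, the total polling time has to fit within the function's timeout.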
seoyeong200 commented 9 months ago

Selenium in Lambda

available options

  1. on-premise (literally my own laptop, i.e. the local environment)

  2. EC2 - kinda on-premise but in the cloud environment

  3. ECS - Elastic Container Service.

    • independence
    • docker file
    • advantage of running it on AWS? integration with other services
  4. Lambda

    • monthly free tier
    • max execution time = 15 min
    • need to split the whole scraping task into subtasks and run them in parallel
      • or fall back to #3
    • the Lambda Python runtime gives you the Python standard library (plus boto3) by default,
    • or you can build your own package https://docs.aws.amazon.com/lambda/latest/dg/python-package.html
    • or use Lambda Layers
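Splitting the scraping task for Lambda could look roughly like the sketch below: cut the URL list into batches small enough to finish inside the 15-minute limit, then start one worker invocation per batch. The function name passed to `fan_out`, the payload shape, and the example URLs are hypothetical. `invoke` with `InvocationType="Event"` is the boto3 call for asynchronous invocation.

```python
import json


def chunk(items, size):
    """Split items into consecutive lists of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be >= 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


def fan_out(urls, batch_size, function_name, lambda_client=None):
    """Start one asynchronous worker Lambda per batch of URLs; return the batch count."""
    if lambda_client is None:
        import boto3  # imported lazily so chunk() can be tried without AWS set up
        lambda_client = boto3.client("lambda")
    batches = chunk(urls, batch_size)
    for index, batch in enumerate(batches):
        payload = {"batch": index, "urls": batch}  # hypothetical payload shape
        lambda_client.invoke(
            FunctionName=function_name,
            InvocationType="Event",  # async: do not wait for the worker to finish
            Payload=json.dumps(payload).encode("utf-8"),
        )
    return len(batches)


# Hypothetical list of pages to scrape.
urls = ["https://example.com/books?page={}".format(n) for n in range(1, 8)]
batches = chunk(urls, 3)
print([len(b) for b in batches])  # prints [3, 3, 1]
```

The batch size is the tuning knob: it should be small enough that one worker, including browser start-up for Selenium, finishes well before the timeout.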

what is needed

seoyeong200 commented 8 months ago