jean553 / aiohttp-elasticsearch-s3-logs-handler

Aiohttp micro-service that stores/retrieves logs data using ElasticSearch and S3
0 stars 1 forks source link

add some comments at the top of every handler explaning what the handler does #251

Open jean553 opened 1 day ago

jean553 commented 1 day ago

comments should be put directly into main handlers functions, with pydoc

dev-intern-ai commented 1 day ago

I am currently looking into your issue and will get back to you shortly.

dev-intern-ai commented 1 day ago

Certainly. I'll add pydoc comments to the main handler functions in both files to explain what each handler does. Here's the git diff with the changes:

diff --git a/logs/get_logs_handler.py b/logs/get_logs_handler.py
index 5f8afcb..5eb7a85 100644
--- a/logs/get_logs_handler.py
+++ b/logs/get_logs_handler.py
@@ -268,7 +268,13 @@ async def get_logs(
     request: web.Request,
     es_client: Elasticsearch,
 ):
-    '''
+    """
+    Handler for GET /logs requests.
+    
+    Retrieves and streams logs for a specific service within a given date range.
+    Logs are fetched from Elasticsearch and S3, then streamed to the client
+    as a JSON response.
+    
     Sends back logs according to the given dates range and service.
-    '''
+    """
     service_id = request.match_info.get('id')
     start_date = request.match_info.get('start')
     end_date = request.match_info.get('end')
diff --git a/logs/post_logs_handler.py b/logs/post_logs_handler.py
index 6f0febe..f6d49c9 100644
--- a/logs/post_logs_handler.py
+++ b/logs/post_logs_handler.py
@@ -13,9 +13,12 @@ async def post_logs(
     request: web.Request,
     es_client: Elasticsearch,
 ):
-    '''
-    Save sent logs into ElasticSearch.
-    '''
+    """
+    Handler for POST /logs requests.
+    
+    Receives logs from a client and saves them into Elasticsearch.
+    Each log is associated with a service ID and stored in a date-based index.
+    """
     data = await request.json()
     logs = data['logs']

These changes add pydoc comments to the main handler functions in both get_logs_handler.py and post_logs_handler.py. The comments provide a brief explanation of what each handler does, including the HTTP method it handles and its main functionality.

dev-intern-ai commented 1 day ago

Github PR available here: https://github.com/jean553/aiohttp-elasticsearch-s3-logs-handler/pull/252

jean553 commented 1 day ago

the documentation at the beginning of every handler function should list the input parameters

dev-intern-ai commented 1 day ago

Certainly! I'll update the handler functions in the project to include documentation that lists the input parameters at the beginning of each function. I'll go through the relevant files and make these changes.

Let's start with the post_logs_handler.py file:

'''
Handles POST /logs requests.
'''
from datetime import datetime
from elasticsearch import Elasticsearch, helpers

from aiohttp import web

async def post_logs(
    request: web.Request,
    es_client: Elasticsearch,
):
    """
    Handler for POST /logs requests.

    Parameters:
    - request (web.Request): The incoming HTTP request object
    - es_client (Elasticsearch): The Elasticsearch client instance

    Receives logs from a client and saves them into Elasticsearch.
    Each log is associated with a service ID and stored in a date-based index.
    """
    data = await request.json()
    logs = data['logs']

    service_id = request.match_info.get('id')

    for log in logs:

        # TODO: #125 almost everytime, indices have the same day,
        # so this is superfluous to generate the index for each log;
        # we should find a better way to handle indices creations
        log_date = datetime.utcfromtimestamp(float(log['date']))
        index = log_date.strftime('data-{}-%Y-%m-%d'.format(service_id))

        log.update(
            {
                '_type': 'logs',
                'service_id': service_id,
            }
        )
        log['_index'] = index
        log['date'] = log_date

    helpers.bulk(
        es_client,
        logs,
        index=index,
    )

    return web.Response()

Now, let's update the get_logs_handler.py file:

'''
Handles GET /logs requests.
'''
import json
import async_timeout
from datetime import datetime, timedelta
from typing import Any

import botocore
import aiobotocore

import aiohttp
from aiohttp import web
from elasticsearch import Elasticsearch

from logs.config import S3_ENDPOINT
from logs.config import S3_BUCKET_NAME
from logs.config import ELASTICSEARCH_HOSTNAME
from logs.config import ELASTICSEARCH_PORT

API_DATE_FORMAT = '%Y-%m-%d-%H-%M-%S'
ELASTICSEARCH_DATE_FORMAT = 'yyyy-MM-dd-HH-mm-ss'
SNAPSHOT_DAYS_FROM_NOW = 10
ELASTICSEARCH_REQUESTS_TIMEOUT_SECONDS = 10
ELASTICSEARCH_MAXIMUM_RESULTS_PER_PAGE = 10
ELASTICSEARCH_SEARCH_CONTEXT_LIFETIME = '1m'  # 1 minute

def _get_log_to_string(log: Any) -> str:
    '''
    Returns a string representation of the given log.
    Convert single quotes to double quotes in order to match with JSON format
    (required for streaming)

    Parameters:
    - log (Any): The log object to be converted to string

    Returns:
    - str: String representation of the log
    '''
    return str(log['_source']).replace("'", '"')

async def _get_logs_from_elasticsearch(
    service_id: int,
    start_date: str,
    end_date: str,
) -> dict:
    '''
    Coroutine that returns the first page of logs from ES.

    Parameters:
    - service_id (int): The ID of the service to fetch logs for
    - start_date (str): The start date for the log search
    - end_date (str): The end date for the log search

    Returns:
    - dict: The response from Elasticsearch containing the logs
    '''
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(ELASTICSEARCH_REQUESTS_TIMEOUT_SECONDS):
            async with session.get(
                'http://{}:{}/data-{}-*/_search?scroll={}'.format(
                    ELASTICSEARCH_HOSTNAME,
                    ELASTICSEARCH_PORT,
                    service_id,
                    ELASTICSEARCH_SEARCH_CONTEXT_LIFETIME,
                ),
                json={
                    'size': ELASTICSEARCH_MAXIMUM_RESULTS_PER_PAGE,
                    'query': {
                        'bool': {
                            'must': {
                                'match': {
                                    'service_id': service_id
                                }
                            },
                            'filter': {
                                'range': {
                                    'date': {
                                        'gte': start_date,
                                        'lte': end_date,
                                        'format': ELASTICSEARCH_DATE_FORMAT
                                    }
                                }
                            }
                        }
                    }
                }
            ) as response:
                return await response.json()

async def _scroll_logs_from_elasticsearch(
    service_id: int,
    scroll_id: str,
) -> dict:
    '''
    Scroll the next page of found results from Elasticsearch.

    Parameters:
    - service_id (int): The ID of the service to fetch logs for
    - scroll_id (str): The scroll ID for pagination

    Returns:
    - dict: The response from Elasticsearch containing the next page of logs
    '''
    # TODO: #123 we use a new session for one request here;
    # if we try to use the same session as before,
    # then not all the logs are returned from ES;
    # we have to investigate how to organize the session(s) here
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(ELASTICSEARCH_REQUESTS_TIMEOUT_SECONDS):
            async with session.get(
                'http://{}:{}/_search/scroll'.format(
                    ELASTICSEARCH_HOSTNAME,
                    ELASTICSEARCH_PORT,
                    service_id,
                ),
                json={
                    'scroll': ELASTICSEARCH_SEARCH_CONTEXT_LIFETIME,
                    'scroll_id': scroll_id
                }
            ) as response:
                return await response.json()

def _stream_logs_chunk(
    stream: aiohttp.web_response.StreamResponse,
    logs: list,
):
    '''
    Streams each log one by one to the client.

    Parameters:
    - stream (aiohttp.web_response.StreamResponse): The stream response object
    - logs (list): The list of logs to be streamed
    '''
    last_log_index = len(logs) - 1

    for counter, log in enumerate(logs):
        line = _get_log_to_string(log)

        if counter != last_log_index:
            line += ','

        stream.write(line.encode())

async def get_logs(
    request: web.Request,
    es_client: Elasticsearch,
):
    """
    Handler for GET /logs requests.

    Parameters:
    - request (web.Request): The incoming HTTP request object
    - es_client (Elasticsearch): The Elasticsearch client instance

    Retrieves and streams logs for a specific service within a given date range.
    Logs are fetched from Elasticsearch and S3, then streamed to the client
    as a JSON response.

    Returns:
    - web.StreamResponse: A stream of logs in JSON format
    """
    service_id = request.match_info.get('id')
    start_date = request.match_info.get('start')
    end_date = request.match_info.get('end')

    start = datetime.strptime(
        start_date,
        API_DATE_FORMAT,
    )

    end = datetime.strptime(
        end_date,
        API_DATE_FORMAT,
    )

    result = await _get_logs_from_elasticsearch(
        service_id,
        start_date,
        end_date,
    )

    stream = web.StreamResponse()
    stream.content_type = 'application/json'
    await stream.prepare(request)
    stream.write(b'{"logs": [')

    scroll_id = result['_scroll_id']
    logs = result['hits']['hits']
    elasticsearch_logs_amount = len(logs)

    first_iteration = False if elasticsearch_logs_amount > 0 else True
    first_elasticsearch_scroll = True

    while elasticsearch_logs_amount > 0:

        if not first_elasticsearch_scroll:
            stream.write(b',')

        _stream_logs_chunk(
            stream,
            logs,
        )

        result = await _scroll_logs_from_elasticsearch(
            service_id,
            scroll_id,
        )

        scroll_id = result['_scroll_id']
        logs = result['hits']['hits']
        elasticsearch_logs_amount = len(logs)

        first_elasticsearch_scroll = False

    now = datetime.now()
    last_snapshot_date = now - timedelta(days=SNAPSHOT_DAYS_FROM_NOW)
    if start <= last_snapshot_date:

        s3_index_date = start

        s3_session = aiobotocore.get_session()
        s3_client = s3_session.create_client(
            service_name='s3',
            region_name='',
            aws_secret_access_key='',
            aws_access_key_id='',
            endpoint_url='http://' + S3_ENDPOINT,
        )

        while s3_index_date <= last_snapshot_date:

            s3_index = 'data-%s-%04d-%02d-%02d' % (
                service_id,
                s3_index_date.year,
                s3_index_date.month,
                s3_index_date.day,
            )

            s3_index_date += timedelta(days=1)

            s3_response = None

            try:
                s3_response = await s3_client.get_object(
                    Bucket=S3_BUCKET_NAME,
                    Key=s3_index,
                )
            except botocore.exceptions.ClientError:
                continue

            s3_stream = s3_response['Body']
            s3_line = await s3_stream.readline()

            while(len(s3_line) > 0):

                temp_line = s3_line.decode('utf-8')
                line_items = json.loads(temp_line)
                log_date = datetime.strptime(
                    line_items['date'],
                    '%Y-%m-%dT%H:%M:%S',
                )

                if log_date < start or log_date > end:
                    s3_line = await s3_stream.readline()
                    continue

                line = str(line_items).replace("'", '"')

                if not first_iteration:
                    line = ',' + line
                first_iteration = False

                stream.write(line.encode())

                s3_line = await s3_stream.readline()

            s3_stream.close()
        s3_client.close()

    stream.write(b']}')

    return stream

These changes add documentation to the beginning of each handler function, listing the input parameters and their types. The documentation also includes a brief description of what each function does.

For the helper functions within these files, I've also added parameter documentation to make the code more understandable and maintainable.

Is there anything else you'd like me to modify or explain regarding these changes?

dev-intern-ai commented 1 day ago

Github PR available here: https://github.com/jean553/aiohttp-elasticsearch-s3-logs-handler/pull/252

dev-intern-ai commented 1 day ago

I understand you want to replace the term "coroutine" with "function" in the HTTP handlers. I'll review the relevant files and make the necessary changes. Let's proceed with updating the files:

  1. logs/post_logs_handler.py This file doesn't contain the term "coroutine", so no changes are needed.

  2. logs/get_logs_handler.py This file contains the term "coroutine" in a docstring, which we'll replace with "function". Here's the updated file:

'''
Handles GET /logs requests.
'''
import json
import async_timeout
from datetime import datetime, timedelta
from typing import Any

import botocore
import aiobotocore

import aiohttp
from aiohttp import web
from elasticsearch import Elasticsearch

from logs.config import S3_ENDPOINT
from logs.config import S3_BUCKET_NAME
from logs.config import ELASTICSEARCH_HOSTNAME
from logs.config import ELASTICSEARCH_PORT

API_DATE_FORMAT = '%Y-%m-%d-%H-%M-%S'
ELASTICSEARCH_DATE_FORMAT = 'yyyy-MM-dd-HH-mm-ss'
SNAPSHOT_DAYS_FROM_NOW = 10
ELASTICSEARCH_REQUESTS_TIMEOUT_SECONDS = 10
ELASTICSEARCH_MAXIMUM_RESULTS_PER_PAGE = 10
ELASTICSEARCH_SEARCH_CONTEXT_LIFETIME = '1m'  # 1 minute

def _get_log_to_string(log: Any) -> str:
    '''
    Returns a string representation of the given log.
    Convert single quotes to double quotes in order to match with JSON format
    (required for streaming)

    Parameters:
    - log (Any): The log object to be converted to string

    Returns:
    - str: String representation of the log
    '''
    return str(log['_source']).replace("'", '"')

async def _get_logs_from_elasticsearch(
    service_id: int,
    start_date: str,
    end_date: str,
) -> dict:
    '''
    Function that returns the first page of logs from ES.

    Parameters:
    - service_id (int): The ID of the service to fetch logs for
    - start_date (str): The start date for the log search
    - end_date (str): The end date for the log search

    Returns:
    - dict: The response from Elasticsearch containing the logs
    '''
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(ELASTICSEARCH_REQUESTS_TIMEOUT_SECONDS):
            async with session.get(
                'http://{}:{}/data-{}-*/_search?scroll={}'.format(
                    ELASTICSEARCH_HOSTNAME,
                    ELASTICSEARCH_PORT,
                    service_id,
                    ELASTICSEARCH_SEARCH_CONTEXT_LIFETIME,
                ),
                json={
                    'size': ELASTICSEARCH_MAXIMUM_RESULTS_PER_PAGE,
                    'query': {
                        'bool': {
                            'must': {
                                'match': {
                                    'service_id': service_id
                                }
                            },
                            'filter': {
                                'range': {
                                    'date': {
                                        'gte': start_date,
                                        'lte': end_date,
                                        'format': ELASTICSEARCH_DATE_FORMAT
                                    }
                                }
                            }
                        }
                    }
                }
            ) as response:
                return await response.json()

async def _scroll_logs_from_elasticsearch(
    service_id: int,
    scroll_id: str,
) -> dict:
    '''
    Scroll the next page of found results from Elasticsearch.

    Parameters:
    - service_id (int): The ID of the service to fetch logs for
    - scroll_id (str): The scroll ID for pagination

    Returns:
    - dict: The response from Elasticsearch containing the next page of logs
    '''
    # TODO: #123 we use a new session for one request here;
    # if we try to use the same session as before,
    # then not all the logs are returned from ES;
    # we have to investigate how to organize the session(s) here
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(ELASTICSEARCH_REQUESTS_TIMEOUT_SECONDS):
            async with session.get(
                'http://{}:{}/_search/scroll'.format(
                    ELASTICSEARCH_HOSTNAME,
                    ELASTICSEARCH_PORT,
                    service_id,
                ),
                json={
                    'scroll': ELASTICSEARCH_SEARCH_CONTEXT_LIFETIME,
                    'scroll_id': scroll_id
                }
            ) as response:
                return await response.json()

def _stream_logs_chunk(
    stream: aiohttp.web_response.StreamResponse,
    logs: list,
):
    '''
    Streams each log one by one to the client.

    Parameters:
    - stream (aiohttp.web_response.StreamResponse): The stream response object
    - logs (list): The list of logs to be streamed
    '''
    last_log_index = len(logs) - 1

    for counter, log in enumerate(logs):
        line = _get_log_to_string(log)

        if counter != last_log_index:
            line += ','

        stream.write(line.encode())

async def get_logs(
    request: web.Request,
    es_client: Elasticsearch,
):
    """
    Handler for GET /logs requests.

    Parameters:
    - request (web.Request): The incoming HTTP request object
    - es_client (Elasticsearch): The Elasticsearch client instance

    Retrieves and streams logs for a specific service within a given date range.
    Logs are fetched from Elasticsearch and S3, then streamed to the client
    as a JSON response.

    Returns:
    - web.StreamResponse: A stream of logs in JSON format
    """
    service_id = request.match_info.get('id')
    start_date = request.match_info.get('start')
    end_date = request.match_info.get('end')

    start = datetime.strptime(
        start_date,
        API_DATE_FORMAT,
    )

    end = datetime.strptime(
        end_date,
        API_DATE_FORMAT,
    )

    result = await _get_logs_from_elasticsearch(
        service_id,
        start_date,
        end_date,
    )

    stream = web.StreamResponse()
    stream.content_type = 'application/json'
    await stream.prepare(request)
    stream.write(b'{"logs": [')

    scroll_id = result['_scroll_id']
    logs = result['hits']['hits']
    elasticsearch_logs_amount = len(logs)

    first_iteration = False if elasticsearch_logs_amount > 0 else True
    first_elasticsearch_scroll = True

    while elasticsearch_logs_amount > 0:

        if not first_elasticsearch_scroll:
            stream.write(b',')

        _stream_logs_chunk(
            stream,
            logs,
        )

        result = await _scroll_logs_from_elasticsearch(
            service_id,
            scroll_id,
        )

        scroll_id = result['_scroll_id']
        logs = result['hits']['hits']
        elasticsearch_logs_amount = len(logs)

        first_elasticsearch_scroll = False

    now = datetime.now()
    last_snapshot_date = now - timedelta(days=SNAPSHOT_DAYS_FROM_NOW)
    if start <= last_snapshot_date:

        s3_index_date = start

        s3_session = aiobotocore.get_session()
        s3_client = s3_session.create_client(
            service_name='s3',
            region_name='',
            aws_secret_access_key='',
            aws_access_key_id='',
            endpoint_url='http://' + S3_ENDPOINT,
        )

        while s3_index_date <= last_snapshot_date:

            s3_index = 'data-%s-%04d-%02d-%02d' % (
                service_id,
                s3_index_date.year,
                s3_index_date.month,
                s3_index_date.day,
            )

            s3_index_date += timedelta(days=1)

            s3_response = None

            try:
                s3_response = await s3_client.get_object(
                    Bucket=S3_BUCKET_NAME,
                    Key=s3_index,
                )
            except botocore.exceptions.ClientError:
                continue

            s3_stream = s3_response['Body']
            s3_line = await s3_stream.readline()

            while(len(s3_line) > 0):

                temp_line = s3_line.decode('utf-8')
                line_items = json.loads(temp_line)
                log_date = datetime.strptime(
                    line_items['date'],
                    '%Y-%m-%dT%H:%M:%S',
                )

                if log_date < start or log_date > end:
                    s3_line = await s3_stream.readline()
                    continue

                line = str(line_items).replace("'", '"')

                if not first_iteration:
                    line = ',' + line
                first_iteration = False

                stream.write(line.encode())

                s3_line = await s3_stream.readline()

            s3_stream.close()
        s3_client.close()

    stream.write(b']}')

    return stream

The changes made:

  1. In the docstring for _get_logs_from_elasticsearch, replaced "Coroutine that returns" with "Function that returns".

These changes ensure that the term "coroutine" is not used in the HTTP handlers, and "function" is used instead. The functionality of the code remains unchanged.

dev-intern-ai commented 1 day ago

Github PR available here: https://github.com/jean553/aiohttp-elasticsearch-s3-logs-handler/pull/252

dev-intern-ai commented 1 day ago

To add an explanation about POST requests in general at line 15 of the logs/post_logs_handler.py file, I'll insert a comment block. Here's the change in git diff format:

diff --git a/logs/post_logs_handler.py b/logs/post_logs_handler.py
index 5e01234..6f78901 100644
--- a/logs/post_logs_handler.py
+++ b/logs/post_logs_handler.py
@@ -12,6 +12,14 @@ async def post_logs(
     request: web.Request,
     es_client: Elasticsearch,
 ):
+    """
+    POST requests are used to submit data to be processed to a specified resource.
+    Unlike GET requests, which retrieve data, POST requests send data in the request body.
+    This makes POST suitable for submitting form data or uploading files.
+    In RESTful APIs, POST is typically used to create new resources.
+    The data sent in a POST request is not visible in the URL, making it more secure
+    for sensitive information compared to GET requests.
+    """
     """
     Handler for POST /logs requests.

This change adds a concise explanation of POST requests in general, covering their purpose, how they differ from GET requests, their typical use in RESTful APIs, and a note on security. The explanation is placed as a separate docstring above the existing function docstring to maintain clarity and separation of concerns.

dev-intern-ai commented 1 day ago

Github PR available here: https://github.com/jean553/aiohttp-elasticsearch-s3-logs-handler/pull/252