LoveofSportsLLC / NFL

NFL + AI
https://loveoffootball.io/
MIT License

Implement ETL (Extract, Transform, Load) Processes: #65

Open zepor opened 1 month ago

zepor commented 1 month ago
    ○ Time: 1 week
    ○ Tools Required: Azure Data Factory
    ○ Steps:
        1. Design ETL processes to extract data from data providers.
        2. Transform the data into suitable formats for analysis.
        3. Load the data into Azure SQL Database.
        4. Store ETL service principal credentials in GitHub Secrets.
            □ Secrets Needed: ETL_SERVICE_PRINCIPAL_ID, ETL_SERVICE_PRINCIPAL_SECRET
        § Existing Pipelines: Review and update existing pipelines to align with updated SportsRadar API documentation.
    ○ Documentation:
        § ETL process diagrams and data flow charts.
        § Example ETL scripts and configurations.
    ○ Major Milestone: ETL processes implemented and data flowing into the warehouse.
    ○ GitHub Issue:

Implement ETL Processes

Description: Implement ETL processes tailored to NFL data. Tasks:

codeautopilot[bot] commented 1 month ago

Potential solution

The task involves implementing ETL (Extract, Transform, Load) processes using Azure Data Factory to handle NFL data. The solution requires designing ETL scripts to extract data from data providers, transform it into suitable formats, and load it into an Azure SQL Database. Additionally, ETL service principal credentials need to be securely stored in GitHub Secrets.
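The issue asks for Azure Data Factory, while the scripts below run the extract/transform/load steps directly in Python. If an ADF pipeline ends up orchestrating those steps, one way to trigger it with the service principal stored in GitHub Secrets is sketched below; this is illustrative only, assumes the `azure-identity` and `azure-mgmt-datafactory` packages, and uses placeholder resource names.

```python
# Hypothetical sketch: trigger an existing Azure Data Factory pipeline with the
# ETL service principal. Subscription, resource group, factory, and pipeline
# names are placeholders, not values from this repository.
import os

from azure.identity import ClientSecretCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["ETL_SERVICE_PRINCIPAL_ID"],
    client_secret=os.environ["ETL_SERVICE_PRINCIPAL_SECRET"],
)
adf_client = DataFactoryManagementClient(credential, os.environ["AZURE_SUBSCRIPTION_ID"])

run = adf_client.pipelines.create_run(
    resource_group_name="nfl-data-rg",    # placeholder
    factory_name="nfl-data-factory",      # placeholder
    pipeline_name="nfl_etl_pipeline",     # placeholder
    parameters={},
)
status = adf_client.pipeline_runs.get("nfl-data-rg", "nfl-data-factory", run.run_id).status
print(f"Pipeline run {run.run_id}: {status}")
```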

How to implement

File: backend-container/src/etl/__init__.py

This file initializes the ETL module, imports necessary components, and defines a simple ETL pipeline function.

from .extract import extract_data
from .transformations import transform_data
from .load import load_data
from .config import Config
from .credentials import get_service_principal_credentials

config = Config()
credentials = get_service_principal_credentials()

def run_etl_pipeline():
    provider = config.DATA_PROVIDERS['sports_radar']
    # extract_data (see extract.py below) reads the API key from this provider dict,
    # so pass it rather than the Azure service principal credential
    raw_data = extract_data(provider['base_url'], provider)
    transformed_data = transform_data(raw_data)
    load_data(transformed_data, config.AZURE_SQL_DATABASE)

File: backend-container/src/etl/config.py

This file defines configuration settings for the ETL processes, including database connection settings, API endpoints, and ETL process settings.

import os

class Config:
    AZURE_SQL_DATABASE = {
        'server': os.getenv('AZURE_SQL_SERVER', 'your_server.database.windows.net'),
        'database': os.getenv('AZURE_SQL_DATABASE', 'your_database'),
        'username': os.getenv('AZURE_SQL_USERNAME', 'your_username'),
        'password': os.getenv('AZURE_SQL_PASSWORD', 'your_password'),
        'driver': '{ODBC Driver 17 for SQL Server}'
    }

    DATA_PROVIDERS = {
        'sports_radar': {
            'base_url': os.getenv('SPORTS_RADAR_BASE_URL', 'https://api.sportradar.us'),
            'api_key': os.getenv('SPORTS_RADAR_API_KEY', 'your_api_key')
        }
    }

    ETL_SETTINGS = {
        'batch_size': int(os.getenv('ETL_BATCH_SIZE', 1000)),
        'retry_attempts': int(os.getenv('ETL_RETRY_ATTEMPTS', 3)),
        'retry_delay': int(os.getenv('ETL_RETRY_DELAY', 5))
    }

    ETL_SERVICE_PRINCIPAL_ID = os.getenv('ETL_SERVICE_PRINCIPAL_ID', 'your_service_principal_id')
    ETL_SERVICE_PRINCIPAL_SECRET = os.getenv('ETL_SERVICE_PRINCIPAL_SECRET', 'your_service_principal_secret')
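One gap worth noting: ETL_SETTINGS is defined here but never referenced by the scripts that follow. A minimal sketch of how retry_attempts and retry_delay could wrap the extraction call; the helper name and its placement inside the etl package are assumptions, not part of the proposal above.

```python
import time

from .config import Config
from .extract import extract_data

def extract_with_retry(api_url, provider):
    """Illustrative helper: apply the Config.ETL_SETTINGS retry policy around extraction."""
    settings = Config.ETL_SETTINGS
    last_error = None
    for attempt in range(settings['retry_attempts']):
        try:
            return extract_data(api_url, provider)
        except Exception as exc:  # e.g. requests.HTTPError raised by raise_for_status()
            last_error = exc
            time.sleep(settings['retry_delay'])
    raise last_error
```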

File: backend-container/src/etl/etl_process.py

This file implements the ETL process, encapsulating extraction, transformation, and loading steps.

import logging
from .extract import extract_data
from .transformations import transform_data
from .load import load_data
from .config import Config
from .credentials import get_service_principal_credentials

class ETLProcess:
    def __init__(self):
        self.config = Config()
        self.credentials = get_service_principal_credentials()
        self.logger = logging.getLogger('ETLProcess')
        logging.basicConfig(level=logging.INFO)

    def run(self):
        try:
            self.logger.info("Starting ETL process")
            provider = self.config.DATA_PROVIDERS['sports_radar']
            # extract_data reads the API key from the provider dict (see extract.py)
            raw_data = extract_data(provider['base_url'], provider)
            transformed_data = transform_data(raw_data)
            load_data(transformed_data, self.config.AZURE_SQL_DATABASE)
            self.logger.info("ETL process completed successfully")
        except Exception as e:
            self.logger.error(f"ETL process failed: {e}")
            raise

if __name__ == "__main__":
    etl_process = ETLProcess()
    etl_process.run()

File: backend-container/src/etl/pipelines.py

This file defines and updates ETL pipelines to align with the updated SportsRadar API documentation.

import logging
from .extract import extract_data
from .transformations import transform_data
from .load import load_data
from .config import Config
from .credentials import get_service_principal_credentials

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def etl_pipeline():
    try:
        logger.info("Starting data extraction...")
        provider = Config.DATA_PROVIDERS['sports_radar']
        # extract_data expects the provider dict (with its API key), not the Azure credential
        raw_data = extract_data(provider['base_url'], provider)
        logger.info("Data extraction completed.")

        logger.info("Starting data transformation...")
        transformed_data = transform_data(raw_data)
        logger.info("Data transformation completed.")

        logger.info("Starting data loading...")
        load_data(transformed_data, Config.AZURE_SQL_DATABASE)
        logger.info("Data loading completed.")
    except Exception as e:
        logger.error(f"ETL pipeline failed: {e}")
        raise

if __name__ == "__main__":
    etl_pipeline()
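To keep this pipeline aligned with the SportsRadar documentation as it changes, the endpoints it pulls could be declared as data rather than hard-coded. A sketch with placeholder path templates; these are not real SportsRadar routes and would need to be taken from the current API docs.

```python
from .config import Config
from .extract import extract_data

# Placeholder path templates -- fill in from the current SportsRadar NFL API documentation.
NFL_ENDPOINTS = {
    'schedule': '/placeholder/nfl/{season}/schedule',
    'team_roster': '/placeholder/nfl/teams/{team_id}/roster',
}

def extract_endpoint(name, **params):
    """Resolve a named endpoint against the configured base URL and extract it."""
    provider = Config.DATA_PROVIDERS['sports_radar']
    path = NFL_ENDPOINTS[name].format(**params)
    return extract_data(f"{provider['base_url']}{path}", provider)
```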

File: backend-container/src/etl/extract.py

This file implements the data extraction logic.

import requests
import pandas as pd
from .config import Config

def extract_data(api_url, credentials):
    headers = {
        'Authorization': f'Bearer {credentials["api_key"]}'
    }
    response = requests.get(api_url, headers=headers)
    if response.status_code == 200:
        data = response.json()
        return pd.DataFrame(data)
    else:
        response.raise_for_status()

def extract_nfl_data():
    nfl_endpoint = f"{Config.DATA_PROVIDERS['sports_radar']['base_url']}/nfl"
    return extract_data(nfl_endpoint, Config.DATA_PROVIDERS['sports_radar'])
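SportsRadar responses are typically nested JSON, so `pd.DataFrame(data)` may leave dictionaries inside columns. If that turns out to be the case, `pandas.json_normalize` can flatten the records; a sketch, where the `games` key is only an assumption about the payload shape.

```python
import pandas as pd

def payload_to_frame(payload, record_key='games'):
    """Flatten a nested API payload into a flat DataFrame.

    'games' is an assumed name for the list of records in the response;
    adjust record_key to match the actual SportsRadar payload.
    """
    records = payload.get(record_key, []) if isinstance(payload, dict) else payload
    return pd.json_normalize(records)
```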

File: backend-container/src/etl/credentials.py

This file retrieves and uses ETL service principal credentials from GitHub Secrets.

import os
from azure.identity import ClientSecretCredential
from azure.keyvault.secrets import SecretClient

def get_github_secret(secret_name):
    secret_value = os.getenv(secret_name)
    if not secret_value:
        raise ValueError(f"GitHub Secret {secret_name} not found in environment variables.")
    return secret_value

def get_service_principal_credentials():
    client_id = get_github_secret('ETL_SERVICE_PRINCIPAL_ID')
    client_secret = get_github_secret('ETL_SERVICE_PRINCIPAL_SECRET')
    tenant_id = get_github_secret('AZURE_TENANT_ID')

    credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret
    )
    return credential

def get_secret_from_key_vault(vault_url, secret_name):
    credential = get_service_principal_credentials()
    client = SecretClient(vault_url=vault_url, credential=credential)
    secret = client.get_secret(secret_name)
    return secret.value

if __name__ == "__main__":
    try:
        key_vault_url = "https://<your-key-vault-name>.vault.azure.net/"
        secret_name = "your-secret-name"
        secret_value = get_secret_from_key_vault(key_vault_url, secret_name)
        print(f"Retrieved secret: {secret_value}")
    except Exception as e:
        print(f"Error retrieving secret: {e}")

File: backend-container/src/etl/load.py

This file implements the data loading logic.

import pyodbc
import logging
from .config import Config

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def load_data(data, db_config):
    conn = None  # defined up front so the finally block can check it even if connect() fails
    try:
        conn = pyodbc.connect(
            f"DRIVER={{ODBC Driver 17 for SQL Server}};"
            f"SERVER={db_config['server']};"
            f"DATABASE={db_config['database']};"
            f"UID={db_config['username']};"
            f"PWD={db_config['password']}"
        )
        cursor = conn.cursor()
        for index, row in data.iterrows():
            cursor.execute(
                "INSERT INTO your_table_name (column1, column2, column3) VALUES (?, ?, ?)",
                row['column1'], row['column2'], row['column3']
            )
        conn.commit()
        logger.info("Data loaded successfully into Azure SQL Database.")
    except Exception as e:
        logger.error(f"Error loading data into Azure SQL Database: {e}")
        raise
    finally:
        if conn:
            conn.close()
            logger.info("Database connection closed.")

if __name__ == "__main__":
    transformed_data = ...  # Replace with actual transformed data
    load_data(transformed_data, Config.AZURE_SQL_DATABASE)
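Row-by-row inserts will be slow for full NFL datasets; pyodbc's `executemany` with `fast_executemany` enabled is one way to batch the load using `ETL_SETTINGS['batch_size']`. A sketch that keeps the same placeholder table and column names as above; not part of the proposal itself.

```python
import pyodbc

from .config import Config

def load_data_batched(data, db_config):
    """Illustrative batched insert using the placeholder table/column names from load_data."""
    conn = pyodbc.connect(
        f"DRIVER={db_config['driver']};SERVER={db_config['server']};"
        f"DATABASE={db_config['database']};UID={db_config['username']};PWD={db_config['password']}"
    )
    try:
        cursor = conn.cursor()
        cursor.fast_executemany = True  # send each batch of parameter sets in one round trip
        rows = list(data[['column1', 'column2', 'column3']].itertuples(index=False, name=None))
        batch_size = Config.ETL_SETTINGS['batch_size']
        for start in range(0, len(rows), batch_size):
            cursor.executemany(
                "INSERT INTO your_table_name (column1, column2, column3) VALUES (?, ?, ?)",
                rows[start:start + batch_size],
            )
        conn.commit()
    finally:
        conn.close()
```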

File: backend-container/src/etl/transformations.py

This file implements the data transformation logic.

import pandas as pd

def clean_data(df):
    df.ffill(inplace=True)  # forward-fill missing values; fillna(method='ffill') is deprecated in newer pandas
    df['date'] = pd.to_datetime(df['date'])
    df.drop_duplicates(inplace=True)
    return df

def enrich_data(df):
    df['new_field'] = df['existing_field'] * 2
    return df

def transform_data(raw_data):
    cleaned_data = clean_data(raw_data)
    enriched_data = enrich_data(cleaned_data)
    return enriched_data

if __name__ == "__main__":
    raw_data = pd.DataFrame({
        'date': ['2021-01-01', '2021-01-02', None],
        'existing_field': [10, 20, 30]
    })
    transformed_data = transform_data(raw_data)
    print(transformed_data)

File: backend-container/src/etl/documentation.md

This file provides comprehensive documentation for the ETL processes.

# ETL Process Documentation

## 1. ETL Process Diagrams

### Overview Diagram
![ETL Overview](images/etl_overview.png)

### Detailed Diagrams
#### Extraction
![Extraction Process](images/extraction_process.png)

#### Transformation
![Transformation Process](images/transformation_process.png)

#### Loading
![Loading Process](images/loading_process.png)

## 2. Data Flow Charts

### Extraction Flow
```mermaid
graph TD;
    A[Data Provider] --> B[Extract Data];
    B --> C[Raw Data Storage];
```

### Transformation Flow
```mermaid
graph TD;
    C[Raw Data Storage] --> D[Transform Data];
    D --> E[Transformed Data Storage];
```

### Loading Flow
```mermaid
graph TD;
    E[Transformed Data Storage] --> F[Load Data];
    F --> G[Azure SQL Database];
```

## 3. Example ETL Scripts

### Extraction Script
```python
# extract.py
import requests

def extract_data(api_url):
    response = requests.get(api_url)
    data = response.json()
    return data
```

### Transformation Script
```python
# transformations.py
def transform_data(raw_data):
    transformed_data = [process_record(record) for record in raw_data]
    return transformed_data

def process_record(record):
    # Transformation logic here
    return transformed_record
```

### Loading Script
```python
# load.py
import pyodbc

def load_data(transformed_data, connection_string):
    conn = pyodbc.connect(connection_string)
    cursor = conn.cursor()
    for record in transformed_data:
        cursor.execute("INSERT INTO table_name (columns) VALUES (values)", record)
    conn.commit()
    conn.close()
```

## 4. Configurations

### ETL Configuration Settings
- **API_URL**: URL of the data provider API.
- **DB_CONNECTION_STRING**: Connection string for Azure SQL Database.
- **RAW_DATA_PATH**: Path to store raw data.
- **TRANSFORMED_DATA_PATH**: Path to store transformed data.

## 5. Credentials Management

### Storing Credentials

Store the following secrets in GitHub Secrets:
- `ETL_SERVICE_PRINCIPAL_ID`
- `ETL_SERVICE_PRINCIPAL_SECRET`

### Retrieving Credentials
```python
# credentials.py
import os

ETL_SERVICE_PRINCIPAL_ID = os.getenv('ETL_SERVICE_PRINCIPAL_ID')
ETL_SERVICE_PRINCIPAL_SECRET = os.getenv('ETL_SERVICE_PRINCIPAL_SECRET')
```

This documentation provides a clear and comprehensive guide for implementing and understanding the ETL processes.

[Click here to create a Pull Request with the proposed solution](https://api.codeautopilot.com/create-pr?task=29600&hash=f66e0ecd902ff3055ab21f701e91ee0982dc29def9aa17e2f769672468ad5f2a)

**Files used for this task:**

<details>
  <summary>Changes on backend-container/src/etl/__init__.py</summary>

  To initialize the ETL module, you need to create an `__init__.py` file in the `backend-container/src/etl/` directory. This file will serve as the initializer for the ETL module, allowing you to import and use the various components of the ETL process (extraction, transformation, loading, etc.) in a structured manner.

Here's a practical proposal for the `__init__.py` file:

1. **Import Necessary Modules**: Import the essential modules that will be used across the ETL process.
2. **Define Initialization Logic**: If there are any global variables or configurations that need to be set up when the module is initialized, define them here.

### Implementation

```python
# backend-container/src/etl/__init__.py

# Import necessary modules for ETL process
from .extract import extract_data
from .transformations import transform_data
from .load import load_data
from .config import ETLConfig
from .credentials import get_service_principal_credentials

# Initialize ETL configuration
config = ETLConfig()

# Retrieve ETL service principal credentials
credentials = get_service_principal_credentials()

# Define a simple ETL pipeline function for demonstration
def run_etl_pipeline():
    # Extract data
    raw_data = extract_data(config, credentials)

    # Transform data
    transformed_data = transform_data(raw_data)

    # Load data
    load_data(transformed_data, config, credentials)

# If needed, you can add more initialization logic here
```

### Explanation

This setup ensures that the ETL module is properly initialized and ready to be used in other parts of the application. If there are additional initialization steps or global settings required, they can be added to this file.

</details>

<details>
  <summary>Changes on backend-container/src/etl/config.py</summary>

To define the configuration settings for the ETL processes in the `config.py` file, you need to consider the following aspects:

1. **Database Connection Settings**: Information required to connect to the Azure SQL Database.
2. **API Endpoints and Credentials**: URLs and credentials for data providers.
3. **ETL Process Settings**: Any specific settings that control the behavior of the ETL processes.
4. **Secrets Management**: Integration with GitHub Secrets for secure storage of sensitive information.

Here's a concrete proposal for the `config.py` file:

```python
import os

class Config:
    # Database connection settings
    AZURE_SQL_DATABASE = {
        'server': os.getenv('AZURE_SQL_SERVER', 'your_server.database.windows.net'),
        'database': os.getenv('AZURE_SQL_DATABASE', 'your_database'),
        'username': os.getenv('AZURE_SQL_USERNAME', 'your_username'),
        'password': os.getenv('AZURE_SQL_PASSWORD', 'your_password'),
        'driver': '{ODBC Driver 17 for SQL Server}'
    }

    # API endpoints and credentials
    DATA_PROVIDERS = {
        'sports_radar': {
            'base_url': os.getenv('SPORTS_RADAR_BASE_URL', 'https://api.sportradar.us'),
            'api_key': os.getenv('SPORTS_RADAR_API_KEY', 'your_api_key')
        }
    }

    # ETL process settings
    ETL_SETTINGS = {
        'batch_size': int(os.getenv('ETL_BATCH_SIZE', 1000)),
        'retry_attempts': int(os.getenv('ETL_RETRY_ATTEMPTS', 3)),
        'retry_delay': int(os.getenv('ETL_RETRY_DELAY', 5))  # in seconds
    }

    # GitHub Secrets for ETL service principal credentials
    ETL_SERVICE_PRINCIPAL_ID = os.getenv('ETL_SERVICE_PRINCIPAL_ID', 'your_service_principal_id')
    ETL_SERVICE_PRINCIPAL_SECRET = os.getenv('ETL_SERVICE_PRINCIPAL_SECRET', 'your_service_principal_secret')

# Example usage
if __name__ == "__main__":
    config = Config()
    print("Database Server:", config.AZURE_SQL_DATABASE['server'])
    print("Sports Radar API Key:", config.DATA_PROVIDERS['sports_radar']['api_key'])
```

### Explanation:
1. **Database Connection Settings**: These settings include the server, database name, username, password, and driver required to connect to the Azure SQL Database. These values are fetched from environment variables for security reasons.
2. **API Endpoints and Credentials**: This section includes the base URL and API key for the SportsRadar API. These are also fetched from environment variables.
3. **ETL Process Settings**: These settings control the behavior of the ETL processes, such as batch size, retry attempts, and retry delay.
4. **Secrets Management**: The service principal credentials are fetched from environment variables, which should be stored securely in GitHub Secrets.

### Next Steps:
1. **Environment Variables**: Ensure that the necessary environment variables are set up in your deployment environment or local development environment.
2. **GitHub Secrets**: Store sensitive information like `ETL_SERVICE_PRINCIPAL_ID` and `ETL_SERVICE_PRINCIPAL_SECRET` in GitHub Secrets.
3. **Integration**: Use this configuration in your ETL scripts to access the necessary settings and credentials.

This setup will provide a robust and secure way to manage configuration settings for your ETL processes.
</details>

<details>
  <summary>Changes on backend-container/src/etl/etl_process.py</summary>

To implement the ETL scripts for data extraction, transformation, and loading in the `etl_process.py` file, follow these steps:

1. **Define the ETL Process Structure**:
   - Create a class or functions to encapsulate the ETL process.
   - Ensure the process includes steps for extraction, transformation, and loading.
2. **Import Necessary Modules**:
   - Import the modules for extraction, transformation, and loading.
   - Import configuration settings and credentials.
3. **Implement the ETL Workflow**:
   - Define the sequence of operations for the ETL process.
   - Handle exceptions and logging for better traceability.

Here is a concrete implementation proposal for the `etl_process.py` file:

### backend-container/src/etl/etl_process.py
```python
import logging
from .extract import extract_data
from .transformations import transform_data
from .load import load_data
from .config import ETLConfig
from .credentials import get_service_principal_credentials

class ETLProcess:
    def __init__(self):
        self.config = ETLConfig()
        self.credentials = get_service_principal_credentials()
        self.logger = logging.getLogger('ETLProcess')
        logging.basicConfig(level=logging.INFO)

    def run(self):
        try:
            self.logger.info("Starting ETL process")

            # Step 1: Extract
            self.logger.info("Extracting data")
            raw_data = extract_data(self.config, self.credentials)

            # Step 2: Transform
            self.logger.info("Transforming data")
            transformed_data = transform_data(raw_data)

            # Step 3: Load
            self.logger.info("Loading data")
            load_data(transformed_data, self.config, self.credentials)

            self.logger.info("ETL process completed successfully")
        except Exception as e:
            self.logger.error(f"ETL process failed: {e}")
            raise

if __name__ == "__main__":
    etl_process = ETLProcess()
    etl_process.run()
```

### Explanation:
1. **Imports**:
   - Import necessary modules for extraction, transformation, loading, configuration, and credentials.
2. **ETLProcess Class**:
   - Initialize the class with configuration and credentials.
   - Set up logging for the ETL process.
3. **run Method**:
   - Log the start of the ETL process.
   - Extract data using the `extract_data` function.
   - Transform data using the `transform_data` function.
   - Load data using the `load_data` function.
   - Log the successful completion of the ETL process.
   - Handle exceptions and log errors.

This implementation provides a clear and structured approach to the ETL process, ensuring that each step is logged and any errors are properly handled.
</details>

<details>
  <summary>Changes on backend-container/src/etl/pipelines.py</summary>

To address the task for the file `backend-container/src/etl/pipelines.py`, you need to create and update ETL pipelines to align with the updated SportsRadar API documentation. Since this is a new file, you will need to implement the necessary logic from scratch. Here is a concrete plan to achieve this:

### Plan for `backend-container/src/etl/pipelines.py`

1. **Import Necessary Modules**:
   - Import modules for extraction, transformation, and loading.
   - Import configuration settings and credentials.
2. **Define Pipeline Functions**:
   - Create functions for each step of the ETL process: extraction, transformation, and loading.
   - Ensure these functions are modular and reusable.
3. **Update Pipelines to Align with SportsRadar API**:
   - Review the updated SportsRadar API documentation.
   - Update the extraction logic to fetch data according to the new API endpoints and data structures.
   - Adjust transformation logic to handle any changes in the data format.
   - Ensure the loading logic correctly inserts the transformed data into the Azure SQL Database.
4. **Error Handling and Logging**:
   - Implement error handling to manage API errors, data transformation issues, and database connectivity problems.
   - Add logging to track the ETL process and facilitate debugging.
5. **Testing**:
   - Write test cases to validate the ETL pipeline functions.
   - Ensure the pipeline works end-to-end with sample data.

### Example Implementation

Here is a basic example to get you started:

```python
import logging
from .extract import extract_data
from .transformations import transform_data
from .load import load_data
from .config import Config
from .credentials import get_service_principal_credentials

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def etl_pipeline():
    try:
        # Step 1: Extract data
        logger.info("Starting data extraction...")
        raw_data = extract_data(Config.SPORTSRADAR_API_ENDPOINT, get_service_principal_credentials())
        logger.info("Data extraction completed.")

        # Step 2: Transform data
        logger.info("Starting data transformation...")
        transformed_data = transform_data(raw_data)
        logger.info("Data transformation completed.")

        # Step 3: Load data
        logger.info("Starting data loading...")
        load_data(transformed_data, Config.AZURE_SQL_CONNECTION_STRING)
        logger.info("Data loading completed.")
    except Exception as e:
        logger.error(f"ETL pipeline failed: {e}")
        raise

if __name__ == "__main__":
    etl_pipeline()
```

### Steps to Implement

1. **Extract Data**:
   - Implement the `extract_data` function in `extract.py` to fetch data from the SportsRadar API.
2. **Transform Data**:
   - Implement the `transform_data` function in `transformations.py` to process and format the raw data.
3. **Load Data**:
   - Implement the `load_data` function in `load.py` to insert the transformed data into the Azure SQL Database.
4. **Configuration and Credentials**:
   - Ensure `Config` in `config.py` contains necessary settings like API endpoints and database connection strings.
   - Implement `get_service_principal_credentials` in `credentials.py` to fetch credentials from GitHub Secrets.
5. **Testing and Documentation**:
   - Write unit tests for each function.
   - Document the ETL process in `documentation.md`.

By following this plan, you will create a robust ETL pipeline that aligns with the updated SportsRadar API documentation and ensures data is correctly extracted, transformed, and loaded into your data warehouse.
</details>

<details>
  <summary>Changes on backend-container/src/etl/extract.py</summary>

To implement the data extraction logic in the `backend-container/src/etl/extract.py` file, follow these steps:

1. **Import Necessary Libraries**: Import libraries that will help in making HTTP requests and handling data, such as `requests` and `pandas`.
2. **Define Configuration Settings**: Import configuration settings from `config.py` to get API endpoints, authentication details, and other necessary parameters.
3. **Create Extraction Functions**: Implement functions to extract data from the data providers. These functions should handle API requests, error checking, and data parsing.
4. **Handle Authentication**: If the API requires authentication, ensure that the necessary headers or tokens are included in the requests.
5. **Return Extracted Data**: Ensure that the functions return the extracted data in a format suitable for further transformation.

Here is a concrete implementation plan for the `extract.py` file:

### Step-by-Step Implementation

1. **Import Libraries**:
```python
import requests
import pandas as pd
from .config import API_ENDPOINT, API_KEY
```

2. **Define Extraction Function**:
```python
def extract_data(endpoint):
    """
    Extract data from the given API endpoint.

    Args:
        endpoint (str): The API endpoint to extract data from.

    Returns:
        pd.DataFrame: Extracted data in a DataFrame format.
    """
    headers = {
        'Authorization': f'Bearer {API_KEY}'
    }
    response = requests.get(endpoint, headers=headers)
    if response.status_code == 200:
        data = response.json()
        return pd.DataFrame(data)
    else:
        response.raise_for_status()
```

3. **Example Usage**:
```python
def extract_nfl_data():
    """
    Extract NFL data from the configured API endpoint.

    Returns:
        pd.DataFrame: Extracted NFL data.
    """
    nfl_endpoint = f"{API_ENDPOINT}/nfl"
    return extract_data(nfl_endpoint)
```

### Full Implementation
```python
import requests
import pandas as pd
from .config import API_ENDPOINT, API_KEY

def extract_data(endpoint):
    """
    Extract data from the given API endpoint.

    Args:
        endpoint (str): The API endpoint to extract data from.

    Returns:
        pd.DataFrame: Extracted data in a DataFrame format.
    """
    headers = {
        'Authorization': f'Bearer {API_KEY}'
    }
    response = requests.get(endpoint, headers=headers)
    if response.status_code == 200:
        data = response.json()
        return pd.DataFrame(data)
    else:
        response.raise_for_status()

def extract_nfl_data():
    """
    Extract NFL data from the configured API endpoint.

    Returns:
        pd.DataFrame: Extracted NFL data.
    """
    nfl_endpoint = f"{API_ENDPOINT}/nfl"
    return extract_data(nfl_endpoint)
```

### Explanation
- **Imports**: The necessary libraries (`requests` for HTTP requests and `pandas` for data handling) are imported. Configuration settings (`API_ENDPOINT` and `API_KEY`) are imported from `config.py`.
- **extract_data Function**: This function takes an API endpoint as an argument, makes a GET request to the endpoint, and returns the data as a pandas DataFrame. It includes error handling to raise an exception if the request fails.
- **extract_nfl_data Function**: This function is a specific implementation for extracting NFL data using the `extract_data` function.

This implementation provides a robust foundation for extracting data from various endpoints, which can be extended as needed for other data providers.
</details>

<details>
  <summary>Changes on backend-container/src/etl/credentials.py</summary>

To implement the logic for retrieving and using ETL service principal credentials from GitHub Secrets in the `backend-container/src/etl/credentials.py` file, follow these steps:

1. **Install Required Libraries**:
   - Ensure you have the `requests` library installed to interact with GitHub Secrets.
   - Ensure you have the `azure-identity` and `azure-keyvault-secrets` libraries installed to manage Azure credentials.
2. **Retrieve GitHub Secrets**:
   - Use GitHub Actions to set up the secrets and make them available to your script.
3. **Use Azure SDK to Authenticate**:
   - Use the Azure SDK to authenticate using the service principal credentials retrieved from GitHub Secrets.

Here is a concrete implementation plan for the `credentials.py` file:

### Step-by-Step Implementation

1. **Install Required Libraries**:
```sh
pip install requests azure-identity azure-keyvault-secrets
```

2. **Implement the `credentials.py` File**:
```python
import os
from azure.identity import ClientSecretCredential
from azure.keyvault.secrets import SecretClient

def get_github_secret(secret_name):
    """
    Retrieve a secret from GitHub Actions environment variables.
    """
    secret_value = os.getenv(secret_name)
    if not secret_value:
        raise ValueError(f"GitHub Secret {secret_name} not found in environment variables.")
    return secret_value

def get_azure_credentials():
    """
    Retrieve Azure credentials using service principal details stored in GitHub Secrets.
    """
    client_id = get_github_secret('ETL_SERVICE_PRINCIPAL_ID')
    client_secret = get_github_secret('ETL_SERVICE_PRINCIPAL_SECRET')
    tenant_id = get_github_secret('AZURE_TENANT_ID')  # Ensure this secret is also set in GitHub Secrets

    credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret
    )
    return credential

def get_secret_from_key_vault(vault_url, secret_name):
    """
    Retrieve a secret from Azure Key Vault.
    """
    credential = get_azure_credentials()
    client = SecretClient(vault_url=vault_url, credential=credential)
    secret = client.get_secret(secret_name)
    return secret.value

# Example usage:
if __name__ == "__main__":
    try:
        # Replace with your Key Vault URL and secret name
        key_vault_url = "https://<your-key-vault-name>.vault.azure.net/"
        secret_name = "your-secret-name"
        secret_value = get_secret_from_key_vault(key_vault_url, secret_name)
        print(f"Retrieved secret: {secret_value}")
    except Exception as e:
        print(f"Error retrieving secret: {e}")
```

### Explanation
1. **Environment Variables**:
   - `get_github_secret(secret_name)`: Retrieves secrets from the environment variables set by GitHub Actions.
2. **Azure Credentials**:
   - `get_azure_credentials()`: Uses the service principal credentials to authenticate with Azure.
3. **Key Vault Secret Retrieval**:
   - `get_secret_from_key_vault(vault_url, secret_name)`: Retrieves a secret from Azure Key Vault using the authenticated credentials.

### GitHub Actions Configuration

Ensure that the following secrets are set in your GitHub repository settings:
- `ETL_SERVICE_PRINCIPAL_ID`
- `ETL_SERVICE_PRINCIPAL_SECRET`
- `AZURE_TENANT_ID`

### Conclusion

This implementation will allow your ETL processes to securely retrieve and use service principal credentials stored in GitHub Secrets, facilitating secure access to Azure resources.
</details>

<details>
  <summary>Changes on backend-container/src/etl/load.py</summary>

To implement the data loading logic in the `backend-container/src/etl/load.py` file, follow these steps:

1. **Import Necessary Libraries**:
   - Import libraries for database connection and handling, such as `pyodbc` or `sqlalchemy` for Azure SQL Database.
   - Import any other necessary libraries for logging and configuration.
2. **Define Configuration Settings**:
   - Ensure that the configuration settings for the database connection are defined and accessible. These settings might be stored in a separate configuration file (`config.py`).
3. **Implement the Load Function**:
   - Create a function that connects to the Azure SQL Database.
   - Implement logic to load the transformed data into the database.
   - Ensure proper error handling and logging.
4. **Close Database Connection**:
   - Ensure that the database connection is properly closed after the data is loaded.

Here is a concrete implementation proposal for the `load.py` file:

```python
import pyodbc
import logging
from config import DATABASE_CONFIG

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def load_data_to_azure_sql(data):
    """
    Load transformed data into Azure SQL Database.

    Parameters:
    data (DataFrame): Transformed data to be loaded into the database.
    """
    try:
        # Establish a database connection
        conn = pyodbc.connect(
            f"DRIVER={{ODBC Driver 17 for SQL Server}};"
            f"SERVER={DATABASE_CONFIG['server']};"
            f"DATABASE={DATABASE_CONFIG['database']};"
            f"UID={DATABASE_CONFIG['username']};"
            f"PWD={DATABASE_CONFIG['password']}"
        )
        cursor = conn.cursor()

        # Example: Insert data into a table
        for index, row in data.iterrows():
            cursor.execute(
                "INSERT INTO your_table_name (column1, column2, column3) VALUES (?, ?, ?)",
                row['column1'], row['column2'], row['column3']
            )

        # Commit the transaction
        conn.commit()
        logger.info("Data loaded successfully into Azure SQL Database.")
    except Exception as e:
        logger.error(f"Error loading data into Azure SQL Database: {e}")
    finally:
        # Close the database connection
        if conn:
            conn.close()
            logger.info("Database connection closed.")

# Example usage
if __name__ == "__main__":
    # Assuming `transformed_data` is a DataFrame containing the transformed data
    transformed_data = ...  # Replace with actual transformed data
    load_data_to_azure_sql(transformed_data)
```

### Explanation:
1. **Import Libraries**: The script imports `pyodbc` for database connection and `logging` for logging purposes.
2. **Configuration Settings**: The database configuration settings are imported from a `config.py` file.
3. **Load Function**: The `load_data_to_azure_sql` function connects to the Azure SQL Database, inserts the transformed data into a specified table, and handles any errors that occur during the process.
4. **Close Connection**: The database connection is closed in the `finally` block to ensure it is always closed, even if an error occurs.

This implementation provides a basic structure for loading data into an Azure SQL Database. You may need to adjust the table name, column names, and data types based on your specific requirements.
</details>

<details>
  <summary>Changes on backend-container/src/etl/transformations.py</summary>

To implement the data transformation logic in the `backend-container/src/etl/transformations.py` file, follow these steps:

1. **Define the Transformation Functions**: Create functions that will handle the transformation of raw data into a format suitable for analysis and loading into the Azure SQL Database.
2. **Implement Data Cleaning**: Include steps to clean the data, such as handling missing values, correcting data types, and removing duplicates.
3. **Implement Data Enrichment**: Add any necessary enrichment steps, such as calculating new fields or merging data from multiple sources.
4. **Unit Testing**: Ensure that each transformation function is tested to verify its correctness.

Here is a concrete implementation plan for the `transformations.py` file:

### Step-by-Step Implementation

1. **Define the Transformation Functions**:
   - Create a function for each transformation step.
   - Use Python's pandas library for data manipulation.
2. **Implement Data Cleaning**:
   - Handle missing values.
   - Correct data types.
   - Remove duplicates.
3. **Implement Data Enrichment**:
   - Calculate new fields.
   - Merge data from multiple sources if necessary.
4. **Unit Testing**:
   - Write unit tests for each transformation function.

### Example Code

```python
import pandas as pd

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean the raw data.
    - Handle missing values.
    - Correct data types.
    - Remove duplicates.
    """
    # Example: Fill missing values
    df.fillna(method='ffill', inplace=True)

    # Example: Correct data types
    df['date'] = pd.to_datetime(df['date'])

    # Example: Remove duplicates
    df.drop_duplicates(inplace=True)

    return df

def enrich_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Enrich the data.
    - Calculate new fields.
    - Merge data from multiple sources if necessary.
    """
    # Example: Calculate a new field
    df['new_field'] = df['existing_field'] * 2

    return df

def transform_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """
    Transform the raw data into a suitable format for analysis.
    """
    # Step 1: Clean the data
    cleaned_data = clean_data(raw_data)

    # Step 2: Enrich the data
    enriched_data = enrich_data(cleaned_data)

    return enriched_data

# Example usage
if __name__ == "__main__":
    # Load raw data (this would be replaced with actual data loading logic)
    raw_data = pd.DataFrame({
        'date': ['2021-01-01', '2021-01-02', None],
        'existing_field': [10, 20, 30]
    })

    # Transform the data
    transformed_data = transform_data(raw_data)

    # Print the transformed data
    print(transformed_data)
```

### Explanation
1. **clean_data**: This function handles missing values, corrects data types, and removes duplicates.
2. **enrich_data**: This function calculates new fields and merges data if necessary.
3. **transform_data**: This function orchestrates the transformation process by calling `clean_data` and `enrich_data`.

### Unit Testing

To ensure the correctness of the transformation logic, write unit tests using a testing framework like `pytest`.

```python
import pytest
import pandas as pd
from transformations import clean_data, enrich_data, transform_data

def test_clean_data():
    raw_data = pd.DataFrame({
        'date': ['2021-01-01', '2021-01-02', None],
        'existing_field': [10, 20, 30]
    })
    cleaned_data = clean_data(raw_data)
    assert cleaned_data.isnull().sum().sum() == 0
    assert cleaned_data.duplicated().sum() == 0

def test_enrich_data():
    raw_data = pd.DataFrame({
        'existing_field': [10, 20, 30]
    })
    enriched_data = enrich_data(raw_data)
    assert 'new_field' in enriched_data.columns
    assert (enriched_data['new_field'] == raw_data['existing_field'] * 2).all()

def test_transform_data():
    raw_data = pd.DataFrame({
        'date': ['2021-01-01', '2021-01-02', None],
        'existing_field': [10, 20, 30]
    })
    transformed_data = transform_data(raw_data)
    assert transformed_data.isnull().sum().sum() == 0
    assert 'new_field' in transformed_data.columns

if __name__ == "__main__":
    pytest.main()
```

This plan and example code should help you implement the data transformation logic in the `transformations.py` file effectively.
</details>

<details>
  <summary>Changes on backend-container/src/etl/documentation.md</summary>

To address the task for the `backend-container/src/etl/documentation.md` file, you need to create comprehensive documentation for the ETL processes. This documentation should include:

1. **ETL Process Diagrams**: Visual representations of the ETL workflow, showing how data moves from extraction to transformation and finally to loading.
2. **Data Flow Charts**: Detailed charts that illustrate the flow of data through different stages of the ETL process.
3. **Example ETL Scripts**: Sample scripts that demonstrate how to implement the ETL processes.
4. **Configurations**: Information on configuration settings required for the ETL processes.
5. **Credentials Management**: Instructions on how to store and retrieve ETL service principal credentials from GitHub Secrets.

Here is a concrete proposal for the content of `documentation.md`:

### ETL Process Documentation

#### 1. ETL Process Diagrams
- **Overview Diagram**: A high-level diagram showing the overall ETL process.
- **Detailed Diagrams**: Diagrams for each stage (Extract, Transform, Load) showing the specific steps involved.

#### 2. Data Flow Charts
- **Extraction Flow**: Chart showing how data is extracted from data providers.
- **Transformation Flow**: Chart showing how data is transformed into suitable formats for analysis.
- **Loading Flow**: Chart showing how data is loaded into the Azure SQL Database.

#### 3. Example ETL Scripts
- **Extraction Script**: Example script for extracting data from a data provider.
- **Transformation Script**: Example script for transforming data.
- **Loading Script**: Example script for loading data into Azure SQL Database.

#### 4. Configurations
- **ETL Configuration Settings**: Detailed explanation of configuration settings required for the ETL processes, including file paths, database connections, and other necessary parameters.

#### 5. Credentials Management
- **Storing Credentials**: Instructions on how to store ETL service principal credentials (`ETL_SERVICE_PRINCIPAL_ID`, `ETL_SERVICE_PRINCIPAL_SECRET`) in GitHub Secrets.
- **Retrieving Credentials**: Example code snippets showing how to retrieve and use these credentials in your ETL scripts.

### Example Content for `documentation.md`

````markdown
# ETL Process Documentation

## 1. ETL Process Diagrams

### Overview Diagram
![ETL Overview](images/etl_overview.png)

### Detailed Diagrams
#### Extraction
![Extraction Process](images/extraction_process.png)

#### Transformation
![Transformation Process](images/transformation_process.png)

#### Loading
![Loading Process](images/loading_process.png)

## 2. Data Flow Charts

### Extraction Flow
```mermaid
graph TD;
    A[Data Provider] --> B[Extract Data];
    B --> C[Raw Data Storage];
```

### Transformation Flow
```mermaid
graph TD;
    C[Raw Data Storage] --> D[Transform Data];
    D --> E[Transformed Data Storage];
```

### Loading Flow
```mermaid
graph TD;
    E[Transformed Data Storage] --> F[Load Data];
    F --> G[Azure SQL Database];
```

## 3. Example ETL Scripts

### Extraction Script
```python
# extract.py
import requests

def extract_data(api_url):
    response = requests.get(api_url)
    data = response.json()
    return data
```

### Transformation Script
```python
# transformations.py
def transform_data(raw_data):
    transformed_data = [process_record(record) for record in raw_data]
    return transformed_data

def process_record(record):
    # Transformation logic here
    return transformed_record
```

### Loading Script
```python
# load.py
import pyodbc

def load_data(transformed_data, connection_string):
    conn = pyodbc.connect(connection_string)
    cursor = conn.cursor()
    for record in transformed_data:
        cursor.execute("INSERT INTO table_name (columns) VALUES (values)", record)
    conn.commit()
    conn.close()
```

## 4. Configurations

### ETL Configuration Settings
- **API_URL**: URL of the data provider API.
- **DB_CONNECTION_STRING**: Connection string for Azure SQL Database.
- **RAW_DATA_PATH**: Path to store raw data.
- **TRANSFORMED_DATA_PATH**: Path to store transformed data.

## 5. Credentials Management

### Storing Credentials
Store the following secrets in GitHub Secrets:
- `ETL_SERVICE_PRINCIPAL_ID`
- `ETL_SERVICE_PRINCIPAL_SECRET`

### Retrieving Credentials
```python
# credentials.py
import os

ETL_SERVICE_PRINCIPAL_ID = os.getenv('ETL_SERVICE_PRINCIPAL_ID')
ETL_SERVICE_PRINCIPAL_SECRET = os.getenv('ETL_SERVICE_PRINCIPAL_SECRET')
```
````

This documentation will provide a clear and comprehensive guide for implementing and understanding the ETL processes.
</details>