wj-turner / OmegaFin

This open-source project is designed for gathering, processing, and analyzing financial data.

App Architecture #8

Open wj-turner opened 1 year ago

wj-turner commented 1 year ago

Data Ingestion Module: responsible for fetching raw financial data from various sources.
Data Cleaning & Transformation Module: handles the process of cleaning the raw data.
Data Storage Module: ensures cleaned data is stored efficiently.
Data Export Module: facilitates the export of cleaned data to various formats.
Data Visualization Module: offers visualization capabilities for the cleaned data.
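
As a rough sketch of how these module boundaries could map to code, the interfaces below are purely illustrative; every name in them is hypothetical and not part of the repository:

```python
# Illustrative module interfaces only -- all names are hypothetical.
from typing import Any, Iterable, Protocol

# A single raw or cleaned data point (e.g., a tick or an OHLCV bar).
Record = dict[str, Any]

class DataIngestion(Protocol):
    """Fetches raw financial data from a source (exchange API, FIX feed, file, ...)."""
    def fetch(self, symbol: str) -> Iterable[Record]: ...

class DataCleaning(Protocol):
    """Cleans and transforms raw records (deduplication, type coercion, gap filling)."""
    def clean(self, records: Iterable[Record]) -> Iterable[Record]: ...

class DataStorage(Protocol):
    """Persists cleaned records efficiently (e.g., into a time-series database)."""
    def store(self, records: Iterable[Record]) -> None: ...

class DataExport(Protocol):
    """Exports cleaned data to external formats (CSV, Parquet, JSON, ...)."""
    def export(self, symbol: str, fmt: str) -> bytes: ...

class DataVisualization(Protocol):
    """Produces charts or summaries over the cleaned data."""
    def plot(self, symbol: str) -> None: ...
```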

wj-turner commented 1 year ago
/forex-trading-app
   /modules
      /data-collection
         /api (FastAPI routes for this module)
         /models
         /services
            /kafka
            /quickfix
            /kafka-connect
      /data-cleaning
         /api (FastAPI routes for this module)
         /models
         /services
            /airflow
            /redis
      /data-storage
         /api (FastAPI routes for this module)
         /models
         /services
            /timescaledb

   /shared-libraries
      /common-models
      /utility-functions

   /frontend
      /web-app
      /admin-dashboard

   /docker
      /kafka
      /redis
      /airflow
      /timescaledb
      ...

   /core
      /config (For global configurations, .env handling, etc.)
      /db (Database connection setups, ORM configurations)
      /middleware (Any middlewares that the FastAPI app might use globally)

   /docs

   /scripts
      /build
      /deploy
      /test

   docker-compose.yml
   README.md
   main.py (Your FastAPI app's entry point)
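
For the main.py entry point, a minimal sketch of the wiring might look like the following. The routers are defined inline here so the sketch stands alone; in the layout above they would live under each module's /api directory (those import paths are assumptions, not existing code):

```python
# main.py sketch: one FastAPI app composed from per-module routers.
# Routers are defined inline to keep the sketch runnable; real ones would be
# imported from modules/<name>/api/ (hypothetical paths mirroring the tree above).
from fastapi import APIRouter, FastAPI

collection_router = APIRouter(tags=["data-collection"])
cleaning_router = APIRouter(tags=["data-cleaning"])
storage_router = APIRouter(tags=["data-storage"])

@collection_router.get("/status")
def collection_status() -> dict:
    return {"module": "data-collection", "status": "ok"}

app = FastAPI(title="OmegaFin")
app.include_router(collection_router, prefix="/collection")
app.include_router(cleaning_router, prefix="/cleaning")
app.include_router(storage_router, prefix="/storage")

@app.get("/health")
def health() -> dict:
    """Simple liveness check for the whole app."""
    return {"status": "ok"}
```
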
wj-turner commented 1 year ago

1. Data Collection:

Historical Data:

Live Data:

2. Buffering & Intermediate Processing:

3. Data Cleaning and Transformation:

4. Storage:

5. Monitoring, Scheduling, and Orchestration:

Summary:

  1. Data Collection:

    • Apache NiFi or Kafka Connect for extracting data from sources and pushing to Kafka.
    • Kafka as the initial messaging queue for ingested data.
  2. Buffering & Intermediate Storage:

    • Kafka to buffer incoming data.
    • Redis for fast-access storage and potential reprocessing (a consumer sketch follows this list).
  3. Data Cleaning & Transformation:

    • Apache Airflow to orchestrate the ETL tasks.
    • Pandas for data cleaning and transformations.
  4. Long-Term Storage:

    • TimescaleDB for storing the cleaned and structured data.
  5. Monitoring, Scheduling, & Orchestration:

    • Apache Airflow for scheduling, error alerts, and visual monitoring.
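
As a rough sketch of the buffering step in item 2, a Kafka consumer that mirrors incoming data into Redis could look like this. It assumes the kafka-python and redis client packages, a topic named raw_ticks, and per-symbol Redis lists; none of those choices are fixed by this issue:

```python
# Sketch: buffer Kafka messages into Redis for fast re-access and reprocessing.
# Assumes `pip install kafka-python redis`; topic and key names are placeholders.
import json

import redis
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "raw_ticks",                               # hypothetical topic fed by NiFi/Kafka Connect
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    auto_offset_reset="earliest",
    group_id="redis-buffer",
)
cache = redis.Redis(host="localhost", port=6379, decode_responses=True)

for message in consumer:
    tick = message.value
    # Keep a capped, per-symbol list of recent records for reprocessing.
    key = f"ticks:{tick.get('symbol', 'UNKNOWN')}"
    cache.lpush(key, json.dumps(tick))
    cache.ltrim(key, 0, 9_999)                 # retain only the newest 10k entries
```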

Implementation Tips:

  1. Start by setting up your data sources and ensuring they can be read by NiFi or Kafka Connect.
  2. Get Kafka running and ensure data flows correctly from your sources into Kafka.
  3. Set up Redis and the Kafka consumers to populate it.
  4. Install and configure Airflow. Begin by creating simple DAGs and gradually integrate Pandas-based cleaning tasks.
  5. Finally, integrate TimescaleDB and ensure cleaned data is stored appropriately.
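
To make tips 4 and 5 concrete, a minimal Airflow DAG along these lines could clean a raw extract with Pandas and append it into TimescaleDB. File paths, table names, the connection string, and the SQLAlchemy/psycopg2 dependencies are all assumptions for illustration:

```python
# Sketch of an Airflow DAG: Pandas cleaning -> TimescaleDB load.
# Paths, table names, and connection details are placeholders.
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from sqlalchemy import create_engine

RAW_PATH = "/data/raw/ticks.csv"            # hypothetical raw extract
CLEAN_PATH = "/data/clean/ticks.parquet"    # intermediate cleaned file
DB_URI = "postgresql+psycopg2://omegafin:secret@timescaledb:5432/market"

def clean_ticks() -> None:
    """Drop duplicates/NaNs and normalise types before loading."""
    df = pd.read_csv(RAW_PATH, parse_dates=["timestamp"])
    df = df.drop_duplicates().dropna(subset=["price", "volume"])
    df.to_parquet(CLEAN_PATH, index=False)

def load_to_timescale() -> None:
    """Append the cleaned frame into a (pre-created) hypertable."""
    df = pd.read_parquet(CLEAN_PATH)
    engine = create_engine(DB_URI)
    df.to_sql("ticks", engine, if_exists="append", index=False)

with DAG(
    dag_id="clean_and_store_ticks",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    clean = PythonOperator(task_id="clean_ticks", python_callable=clean_ticks)
    load = PythonOperator(task_id="load_to_timescale", python_callable=load_to_timescale)
    clean >> load
```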

Remember to have a development or staging environment to test out configurations and data flows before deploying to a production environment. This will help ensure data accuracy and system robustness.

wj-turner commented 1 year ago

my_project/
│
├── api/
│   ├── spec/                  # API specifications (Swagger/OpenAPI or RAML files)
│   │   └── v1.yaml            # Versioned API spec
│   ├── mock/                  # Mock server scripts and configurations
│   └── src/                   # FastAPI application code
│       ├── main.py            # Application entry point
│       ├── routers/           # Endpoint routes
│       ├── models/            # Pydantic models for request and response validation
│       ├── services/          # Business logic (e.g., fetching from TimescaleDB)
│       └── dependencies/      # Dependency utilities for FastAPI (e.g., DB session)
│
├── connectors/
│   ├── kafka_connect/
│   └── quickfix/              # QuickFIX related configurations and scripts
│
├── processing/
│   ├── airflow_dags/          # Apache Airflow DAGs
│   ├── scripts/               # Data cleaning/transformation scripts (utilizing Pandas)
│   └── tests/                 # Tests for your processing scripts
│
├── db/
│   ├── migrations/            # DB migration scripts
│   ├── models/                # ORM models (if you're using an ORM with TimescaleDB)
│   └── init_scripts/          # Initialization scripts for TimescaleDB
│
├── redis/
│   └── config/                # Redis configuration files
│
├── docker/
│   ├── db/                    # Docker setup for TimescaleDB
│   ├── kafka/                 # Docker setup for Kafka
│   ├── redis/                 # Docker setup for Redis
│   ├── airflow/               # Docker setup for Apache Airflow
│   ├── api/                   # Docker setup for FastAPI application
│   └── docker-compose.yml     # Orchestrate all containers
│
├── .gitignore
├── README.md
└── ...
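
To illustrate how the api/src/models and api/src/routers pieces above could fit together, here is a small sketch; the model fields, route path, and the service it would normally call are invented for illustration:

```python
# Sketch combining a Pydantic model (api/src/models) and a router (api/src/routers).
# Field names, paths, and the service call are hypothetical.
from datetime import datetime

from fastapi import APIRouter
from pydantic import BaseModel

class Tick(BaseModel):
    """Response model for a single quote (would live under api/src/models/)."""
    symbol: str
    timestamp: datetime
    bid: float
    ask: float

router = APIRouter(prefix="/ticks", tags=["ticks"])  # would live under api/src/routers/

@router.get("/{symbol}", response_model=list[Tick])
def recent_ticks(symbol: str, limit: int = 100) -> list[Tick]:
    # In the full layout this would call into api/src/services/ to query TimescaleDB;
    # the empty list keeps the sketch self-contained.
    return []
```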

wj-turner commented 1 year ago

my_project/
├── api/                              # RESTful API and related components
│   ├── spec/                         # API specifications and documentation
│   │   └── v1.yaml                   # Versioned API specification, typically in OpenAPI format
│   ├── mock/                         # Mock servers and test responses for API endpoints
│   ├── src/                          # Source code for the API
│   │   ├── main.py                   # Main entry point for the FastAPI application
│   │   ├── routers/                  # Endpoint routing configurations
│   │   ├── models/                   # Pydantic models for data validation
│   │   ├── services/                 # Business logic for endpoint actions
│   │   └── dependencies/             # FastAPI dependency utilities
│   └── tests/                        # Unit tests for the API
│       ├── unit/
│       └── integration/
├── connectors/                       # Code and configs related to external connectors
│   ├── kafka_connect/
│   │   ├── config/                   # Kafka Connect configuration files
│   │   └── logs/                     # Logs related to Kafka Connect
│   ├── quickfix/
│   │   ├── config/                   # Configuration for QuickFIX
│   │   ├── sessions/                 # Session-related data for QuickFIX
│   │   └── logs/                     # Logs related to QuickFIX operations
│   └── fxcm_api/
│       ├── config/                   # Configuration files for FXCM API integration
│       └── logs/                     # Logs related to FXCM API operations
├── modules/
│   ├── processing/
│   │   ├── feature_engineering/
│   │   │   ├── scripts/              # Scripts for feature extraction and engineering
│   │   │   └── tests/                # Tests for the feature engineering scripts
│   │   │       ├── unit/             # Unit tests
│   │   │       └── integration/      # Integration tests
│   │   ├── statistics/
│   │   │   ├── scripts/              # Scripts for statistical analysis
│   │   │   └── tests/
│   │   │       ├── unit/
│   │   │       └── integration/
│   │   ├── data_cleaning/
│   │   │   ├── scripts/              # Scripts specifically for cleaning and pre-processing
│   │   │   └── tests/
│   │   │       ├── unit/
│   │   │       └── integration/
│   │   └── airflow_dags/
│   │       ├── dags/                 # Definitions of the workflows for Airflow
│   │       └── logs/                 # Logs related to Airflow operations
│   ├── algorithms/
│   │   ├── backtesting/
│   │   │   ├── strategies/           # Trading strategies to be backtested
│   │   │   ├── data/                 # Historical data for backtesting
│   │   │   └── results/              # Results and performance metrics from backtesting
│   │   ├── live/
│   │   │   ├── strategies/           # Live trading strategies
│   │   │   └── logs/                 # Logs related to live trading operations
│   │   ├── utils/                    # Utility scripts and tools for trading
│   │   └── tests/
│   │       ├── unit/
│   │       └── integration/
│   ├── visualizations/
│   │   ├── charts/
│   │   │   ├── scripts/              # Scripts to generate charts
│   │   │   ├── assets/               # Static assets (e.g., images, styles) for charts
│   │   │   └── tests/
│   │   │       ├── unit/
│   │   │       └── integration/
│   │   ├── graphs/
│   │   │   ├── scripts/              # Scripts to generate graphs
│   │   │   ├── assets/
│   │   │   └── tests/
│   │   │       ├── unit/
│   │   │       └── integration/
│   │   └── reports/                  # Generated analysis reports and summaries
│   └── frontend/
│       ├── components/               # Reusable UI components
│       ├── views/                    # Page views and layouts
│       ├── assets/                   # Static assets for the frontend
│       └── tests/                    # Frontend specific tests
│           ├── unit/                 # Unit tests for components
│           └── e2e/                  # End-to-end tests (e.g., using tools like Cypress)
├── db/
│   ├── migrations/                   # Database migration scripts
│   ├── models/                       # ORM models for your database
│   ├── init_scripts/                 # Initialization scripts for setting up the database
│   └── logs/                         # Logs related to database operations
├── redis/
│   ├── config/                       # Redis configuration files
│   └── logs/                         # Logs related to Redis operations
├── docker/
│   ├── db/
│   ├── kafka/
│   ├── redis/
│   ├── airflow/
│   ├── api/
│   └── docker-compose.yml
├── docs/                             # Documentation for the entire project
├── .gitignore
├── README.md
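
Relating to the db/models and db/init_scripts directories above, a sketch of an ORM model plus TimescaleDB hypertable initialisation might look like this. The table, columns, and connection string are placeholders; create_hypertable is the standard TimescaleDB function, but the surrounding script is illustrative only:

```python
# Sketch: SQLAlchemy model (db/models) and hypertable init (db/init_scripts).
# Connection string, table, and columns are placeholders.
from sqlalchemy import Column, DateTime, Float, String, create_engine, text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Tick(Base):
    __tablename__ = "ticks"
    symbol = Column(String, primary_key=True)
    timestamp = Column(DateTime(timezone=True), primary_key=True)
    price = Column(Float)
    volume = Column(Float)

def init_db(uri: str = "postgresql+psycopg2://omegafin:secret@timescaledb:5432/market") -> None:
    """Create the table and convert it into a TimescaleDB hypertable."""
    engine = create_engine(uri)
    Base.metadata.create_all(engine)
    with engine.begin() as conn:
        conn.execute(
            text("SELECT create_hypertable('ticks', 'timestamp', if_not_exists => TRUE);")
        )

if __name__ == "__main__":
    init_db()
```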