Point72 / csp

csp is a high performance reactive stream processing library, written in C++ and Python
https://github.com/Point72/csp/wiki
Apache License 2.0
160 stars 28 forks source link

`CSV` adapter difficult to use and not very flexible #259

Open timkpaine opened 1 month ago

timkpaine commented 1 month ago

Discovered during the hackathon: the current CSV adapter is not great. It was difficult to map multiple columns as datetimes, it basically presupposes a "symbol column", and it doesn't allow for returning just, e.g., a dict of the values in the row. Here is a naive alternative built for the hackathon to read Citibike historical CSV data:

import csv as pycsv
from datetime import datetime

from csp import ts
from csp.impl.pulladapter import PullInputAdapter
from csp.impl.wiring import py_pull_adapter_def

class CSVAdapterImpl(PullInputAdapter):
    """Pull adapter that replays the rows of a CSV file as dicts.

    Each row is read with ``csv.DictReader``; the columns named in
    ``datetime_columns`` are converted from strings to ``datetime`` objects.
    The first entry of ``datetime_columns`` supplies the timestamp returned
    alongside each row.

    Args:
        filename: Path of the CSV file to read.
        datetime_columns: Names of the columns to parse as datetimes; must
            contain at least one name.

    Raises:
        ValueError: If ``datetime_columns`` is empty or ``None``.
    """

    # The only timestamp layout this adapter understands.
    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, filename: str, datetime_columns: list = None):
        if not datetime_columns:
            raise ValueError("Must provide at least one datetime column")
        self._filename = filename
        self._datetime_columns = datetime_columns
        self._file = None
        self._csv_reader = None
        self._first_row = None
        super().__init__()

    def _parse_datetime(self, text):
        """Parse one timestamp string using the adapter's fixed format."""
        return datetime.strptime(text, self._DATETIME_FORMAT)

    def _convert_row(self, row):
        """Convert the datetime columns of *row* in place; return (time, row)."""
        for column in self._datetime_columns:
            row[column] = self._parse_datetime(row[column])
        return row[self._datetime_columns[0]], row

    def _close_file(self):
        """Drop the reader and close the underlying file, if one is open."""
        self._csv_reader = None
        if self._file is not None:
            self._file.close()
            self._file = None

    def start(self, starttime, endtime):
        """Open the file and buffer the first row at or after *starttime*.

        Rows whose timestamp is earlier than ``starttime`` are skipped. If no
        row qualifies, nothing is buffered and ``next()`` returns ``None``.
        ``endtime`` is only forwarded to the base class.
        """
        super().start(starttime, endtime)
        # Guard against a repeated start() leaking the previous handle.
        self._close_file()
        self._first_row = None
        # newline="" is what the csv module expects so that newlines embedded
        # in quoted fields are handled correctly.
        self._file = open(self._filename, "r", newline="")
        self._csv_reader = pycsv.DictReader(self._file)

        time_column = self._datetime_columns[0]
        # Fast forward to the first record inside the requested window.
        for row in self._csv_reader:
            if self._parse_datetime(row[time_column]) < starttime:
                continue
            _, self._first_row = self._convert_row(row)
            break

    def stop(self):
        """Release the reader and close the file opened by ``start()``."""
        self._close_file()

    def next(self):
        """Return the next ``(time, row)`` pair, or ``None`` when exhausted.

        The row buffered by ``start()`` is returned first; previously it was
        computed and then discarded, dropping the first in-range record.
        """
        if self._first_row is not None:
            row, self._first_row = self._first_row, None
            return row[self._datetime_columns[0]], row

        # Not started yet, or already stopped: there is nothing to read.
        if self._csv_reader is None:
            return None

        try:
            row = next(self._csv_reader)
        except StopIteration:
            return None
        return self._convert_row(row)

# Graph-facing adapter definition built from CSVAdapterImpl via
# py_pull_adapter_def. It is declared with ts[dict] and accepts the keyword
# arguments `filename` (str) and `datetime_columns` (list), matching
# CSVAdapterImpl.__init__.
# NOTE(review): presumably ts[dict] is the output type, i.e. each tick is one
# CSV row as a dict -- confirm against the py_pull_adapter_def documentation.
CSVAdapter = py_pull_adapter_def(
    "CSVAdapter", CSVAdapterImpl, ts[dict], filename=str, datetime_columns=list
)
robambalu commented 1 month ago

CSVReader should be completely re-implemented with an efficient C++ implementation.