bmoscon / cryptofeed

Cryptocurrency Exchange Websocket Data Feed Handler

Upsert functionality for kline stream to store only closed candles for many exchanges where candle closed boolean is not available #904

Open koradiyakaushal opened 1 year ago

koradiyakaushal commented 1 year ago

Is your feature request related to a problem? Please describe. For exchanges whose kline responses carry no is_closed flag (for example, KuCoin), the Postgres backend inserts a new row for every update to a candle, which leaves duplicate candles in the table. Using an upsert statement in place of the plain INSERT would instead update the low, high, close, and volume of the existing row.

Describe the solution you'd like. An upsert could solve this. I've made a rough implementation using custom columns, shown below:

CREATE TABLE IF NOT EXISTS ohlcv (
    id serial PRIMARY KEY,
    timestamp TIMESTAMP,
    receipt TIMESTAMP,
    exchange VARCHAR(32),
    symbol VARCHAR(32),
    datetime TIMESTAMP,
    stop TIMESTAMP,
    interval VARCHAR(4),
    open NUMERIC(64, 8),
    close NUMERIC(64, 8),
    high NUMERIC(64, 8),
    low NUMERIC(64, 8),
    volume NUMERIC(64, 8),
    UNIQUE (symbol, exchange, interval, datetime)
);

self.insert_statement = f"INSERT INTO {self.table} ({','.join(self.custom_columns.values())}) VALUES " if custom_columns else None
# The ON CONFLICT target has to match the table's unique constraint (symbol, exchange, interval, datetime).
self.upsert_statement = " ON CONFLICT (symbol, exchange, interval, datetime) DO UPDATE SET volume = excluded.volume, high = excluded.high, low = excluded.low, close = excluded.close;"

In the write_batch function, use it like this:

await self.conn.execute(self.insert_statement + args_str + self.upsert_statement)
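A parameterized variant of the same idea can be sketched as follows. This is not the cryptofeed implementation: `build_upsert`, the `COLUMNS`, `KEY`, and `UPDATED` lists, and this standalone `write_batch` are hypothetical names used for illustration, and `conn` is assumed to be an asyncpg-style connection whose `execute(query, *args)` accepts `$n` placeholders. The conflict target assumes the four-column unique constraint from the CREATE TABLE statement above.

```python
import asyncio

# Column layout assumed from the CREATE TABLE statement in this issue.
COLUMNS = ["timestamp", "receipt", "exchange", "symbol", "datetime", "stop",
           "interval", "open", "close", "high", "low", "volume"]
# Assumed to match the table's UNIQUE constraint.
KEY = ["symbol", "exchange", "interval", "datetime"]
# Fields that change while a candle is still open.
UPDATED = ["volume", "high", "low", "close"]


def build_upsert(table, rows):
    """Return (query, flat_args) for one multi-row, parameterized upsert."""
    width = len(COLUMNS)
    groups = []
    for i in range(len(rows)):
        # Row i uses placeholders $(i*width + 1) .. $((i+1)*width).
        placeholders = ",".join(f"${i * width + j + 1}" for j in range(width))
        groups.append(f"({placeholders})")
    updates = ", ".join(f"{c} = excluded.{c}" for c in UPDATED)
    query = (
        f"INSERT INTO {table} ({','.join(COLUMNS)}) VALUES {','.join(groups)} "
        f"ON CONFLICT ({','.join(KEY)}) DO UPDATE SET {updates};"
    )
    # Flatten the row dicts in the same order as the placeholders.
    args = [row[c] for row in rows for c in COLUMNS]
    return query, args


async def write_batch(conn, table, rows):
    """Upsert a batch of candle dicts; does nothing for an empty batch."""
    if not rows:
        return
    query, args = build_upsert(table, rows)
    await conn.execute(query, *args)
```

Passing values as arguments avoids formatting them into the SQL string. Like the string-concatenation version, this single statement still fails if two rows in `rows` share the same key.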

This upsert approach has a problem with batches: when one batch contains more than one update for the same candle, the statement fails with:

asyncpg.exceptions.CardinalityViolationError: ON CONFLICT DO UPDATE command cannot affect row a second time
HINT:  Ensure that no rows proposed for insertion within the same command have duplicate constrained values.

What is the standard way to solve this issue?

koradiyakaushal commented 1 year ago

I've found a solution and will share it in a few days. I used a DataFrame to remove duplicates by (candle_start_time, symbol), keeping the last row for each, and used the result as the final list.
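The keep-last deduplication described here can be sketched without pandas, using a plain dict in place of the DataFrame. `dedupe_keep_last` is a hypothetical helper, and the default key is an assumption: it uses the four columns of the table's unique constraint, not only candle start time and symbol.

```python
def dedupe_keep_last(rows, key=("symbol", "exchange", "interval", "datetime")):
    """Keep only the most recent row for each candle key.

    rows: iterable of dicts, oldest update first.
    Returns one dict per distinct key, holding the last row seen for it.
    """
    latest = {}
    for row in rows:
        k = tuple(row[c] for c in key)
        # A later update for the same candle overwrites the earlier one.
        latest[k] = row
    return list(latest.values())
```

With at most one row per key left in the batch, a single INSERT ... ON CONFLICT DO UPDATE no longer tries to touch the same row twice, which is the condition the CardinalityViolationError hint describes.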

PeetCrypto commented 1 year ago

@koradiyakaushal Does your solution work? I'm curious to see it.

koradiyakaushal commented 1 year ago

@PeetCrypto I will share the solution tomorrow, but here are a few pointers on how it works. I use it for the Postgres backend: instead of a batch insert, I use a batch upsert.

The one remaining problem, the error from the issue description above, is solved by using a DataFrame to drop rows that duplicate the same (candle open time, symbol) pair, and then running the batch upsert from that DataFrame.