eprbell closed this issue 2 years ago.
I'll take a stab at this. When you say abstract, you're still leaving this plugin the ability to be subclassed, right? Because while CCXT does a good job generalizing a lot of exchanges, not everything can be 100% guaranteed. For example, trade history pagination might need to be changed on KuCoin, which is just a few extra attributes in the CCXT config, or the REST API versions might need to switch to get older data, etc.
So how I imagine this plugin would work is either as an abstract class or as the actual class. For example, we'd write an exchange-specific subclass of it that we know 100% will work, and we'd make a list saying these are the supported exchanges. Then we say that if your exchange doesn't have an exchange-specific CCXT implementation, the fallback is this class; however, there is no guarantee on our end that RP2-dali will work on that exchange, though it is quite possible it will.
That's great, thanks for taking on this issue! Yes, by abstract I mean that this class (let's call it AbstractCCXTPlugin) is not meant to be instantiated directly: it's only useful as a superclass. Some of its methods may be left unimplemented (but defined and just raising a NYI exception): they will be filled in the subclasses. The way I envision this is there will be one subclass of AbstractCCXTPlugin for each CCXT-based exchange we want to support. The subclass will have to fill in empty methods and it's also free to override superclass methods, if it wants to (perhaps because there is an exchange-specific implementation that is faster/better).
I don't think we should have only one CCXT plugin that supports everything, because as you say there are no guarantees it will work everywhere. Better to have a new subclass for every exchange, which can be developed, tested and supported individually.
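A minimal sketch of that arrangement, assuming hypothetical names (AbstractCCXTPlugin's methods, KucoinPlugin, GateioPlugin) that are illustrative rather than DaLI's actual API: the superclass defines every method, leaves the ones it can't implement generically raising NotImplementedError, and each supported exchange gets its own subclass that fills in the gaps and may override defaults.

```python
from typing import Any, Dict, List


class AbstractCCXTPlugin:
    """Only useful as a superclass: not meant to be instantiated directly."""

    def load(self) -> List[Dict[str, Any]]:
        # Shared driver: concrete subclasses are not expected to override it.
        return self._process_trades()

    def _get_exchange_id(self) -> str:
        # Defined but left unimplemented: each subclass must fill it in.
        raise NotImplementedError("Abstract method")

    def _process_trades(self) -> List[Dict[str, Any]]:
        # Generic, CCXT-only default (stubbed: a real version would call the exchange).
        return [{"exchange": self._get_exchange_id(), "source": "generic"}]


class KucoinPlugin(AbstractCCXTPlugin):
    def _get_exchange_id(self) -> str:
        return "kucoin"

    def _process_trades(self) -> List[Dict[str, Any]]:
        # Override with an exchange-specific version (e.g. custom pagination).
        return [{"exchange": self._get_exchange_id(), "source": "exchange-specific"}]


class GateioPlugin(AbstractCCXTPlugin):
    def _get_exchange_id(self) -> str:
        # Only fills in the missing method; the generic default is reused as-is.
        return "gateio"
```

Calling AbstractCCXTPlugin().load() directly raises NotImplementedError, which is what keeps the superclass from being used on its own.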
Also I wanted to ping @macanudo527 about this, because he's starting to work on a CCXT-based binance.com plugin (see #4), so I wanted to connect you guys since these two issues are overlapping (I know you already connected on #7).
OK, cool, gotcha. Yeah, what I mean is that this would serve as a superclass, but this class would also have each of its methods implemented, because in quite a few cases this "just might work" with no changes on a lot of exchanges. For example, my OHLCV downloader so far works on 5 exchanges that I have tested and uses purely CCXT, while for downloading trades I had to make a subclass for KuCoin.
Yep, that's great: if the abstract superclass has an implementation for every method, even better. I have a question on CCXT: do you know how they support exchange-specific transactions? Do they capture them within the generic CCXT API or do they have exchange-specific APIs?
Hmm, can you give an example of "exchange-specific transactions"? What I have seen is that the response data dictionary has generic attributes, and then they usually stuff all the exchange-specific data into a sub-dictionary. That sub-dictionary usually holds the same data as the generic response, if not more, but you retrieve it by the exchange-specific attribute names.
I was thinking of something like Coinbase inflation reward, Coinbase Earn (their reward program) or Binance dust trades. An exchange-specific dictionary in the response sounds good: that way subclasses are free to dig in there, if needed.
Ah, gotcha. Yeah, those should be listed; however, you would probably need to parse the dict response in your subclass and add extra logic to handle cases like that.
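A minimal sketch of how a subclass might dig into the raw payload, assuming CCXT's convention of keeping the original exchange response under the `info` key of each unified structure. The `bizType` field and its `"dust"` value are hypothetical placeholders, not any real exchange's schema.

```python
from typing import Any, Dict, List


def split_special_transactions(
    unified_records: List[Dict[str, Any]],
) -> Dict[str, List[Dict[str, Any]]]:
    """Separate exchange-specific records from ordinary ones (hypothetical example)."""
    result: Dict[str, List[Dict[str, Any]]] = {"special": [], "regular": []}
    for record in unified_records:
        # Generic fields (e.g. 'amount') are unified; the raw payload lives in 'info'.
        raw = record.get("info") or {}
        # 'bizType' == 'dust' is an invented marker standing in for whatever field
        # a real exchange uses to flag, e.g., dust conversions.
        if raw.get("bizType") == "dust":
            result["special"].append(record)
        else:
            result["regular"].append(record)
    return result
```

The superclass can keep handling the "regular" records generically, while the subclass decides what to do with the "special" ones.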
@jamesbaber1 I have forked the project and begun work on a Binance-specific plugin that makes use of CCXT. The idea is to start with that and extract what we need into an abstract class. It has been a little difficult, since we will have to cut out a lot of things. I don't know if you want to wait until I have something more fleshed out or not. I might be a little slow.
I think it's probably OK to make progress in parallel: since we're experimenting with CCXT, it's fine to try both things together and learn as much as possible. We can consolidate the work at the end.
@macanudo527 OK, gotcha. I'm in the US, so as of 8/2021 I lost API access to Binance, and I wrote a CSV parser for it instead. The CSV parser relies a lot on price history, because the way the Binance price history table is structured really sucks: the quote currency isn't listed in the transaction. I was able to implement CSV parsers fairly easily, with very little reliance on price history, for gateio, kucoin and huobi, though.
Currently the only REST implementation I have for fetching trades is for kucoin.
However, I have spent a bit of time on the OHLCV downloader (which doesn't require auth), and that is working for binance, gateio, kucoin, huobi, kraken and coinbasepro. So if you want to focus on something, maybe prioritize the trade downloader, since I feel that the OHLCV downloading and caching logic I have written is in a decent state. In any case, I won't be writing a trade downloader for Binance, since I can't test it.
Here is what the KuCoin trade downloader looks like, at least:
from datetime import datetime
import calendar
import logging
import math

import pytz

from rp2_compiler.base import BaseTool

logger = logging.getLogger(__package__)
logger.setLevel(level=logging.INFO)


class DownloadTrades(BaseTool):
    def __init__(self, exchange_id, config_file_path):
        super().__init__(config_file_path)
        self.exchange_id = exchange_id
        # KuCoin-specific option: route fetch_my_trades() to the historical-orders endpoint
        self.exchanges = self.get_exchanges_by_type(
            exchange_id,
            params={
                'options': {'fetchMyTradesMethod': 'private_get_hist_orders'}
            }
        )
        self.trades = []
        self.page_size = 500

    def fetch_all_trades_for_week(self, since):
        for exchange in self.exchanges:
            current_page = 1
            while True:
                # KuCoin paginates by page number within the requested time window
                trades = exchange.fetch_my_trades(
                    since=since,
                    params={
                        'currentPage': current_page,
                        'pageSize': self.page_size
                    })
                logger.info(f'Found {len(trades)} trades on page {current_page}')
                self.trades.extend(trades)
                # time.sleep(0.5)
                # Only a full page means there may be more trades on the next page
                if len(trades) < self.page_size:
                    break
                current_page += 1

    def fetch_all_trades_since(self, start_date):
        end_date = datetime.now(tz=pytz.utc)
        unix_current_time = calendar.timegm(end_date.utctimetuple())
        weeks_back = math.ceil((end_date - start_date).days / 7) + 1
        week_in_seconds = 60 * 60 * 24 * 7
        # Walk backwards one week at a time; CCXT expects 'since' in milliseconds
        for week_number in range(weeks_back):
            since = (unix_current_time - (week_number * week_in_seconds)) * 1000
            timestamp = self.to_time_string(since)
            logger.info(f'Fetching {self.exchange_id} trades around week {timestamp}')
            self.fetch_all_trades_for_week(since)

    def get_trades_for_year(self, year):
        self.fetch_all_trades_since(datetime(year=year, month=1, day=1, tzinfo=pytz.utc))


if __name__ == '__main__':
    trade_downloader = DownloadTrades('kucoin', config_file_path='config.yml')
    trade_downloader.get_trades_for_year(2021)
It's still a WIP and I need to clean it up, but this is what I did to at least get it working.
@jamesbaber1 Were you able to make progress on this abstract CCXT plugin? I'm about finished with the Binance.com plugin that makes use of CCXT. I was thinking of getting started on abstracting the CCXT components, and then using this abstract plugin for Binance.US.
If you are too busy, that's okay: I can get started on it and push it through, since I have some experience working with it. I just want to double-check that we are not doing double work.
Hey @macanudo527, sorry, I've been swamped lately. I will post a link to my repo when I am back in a few days so you can see the CCXT downloader I have for price data. What I posted above is the trade downloader with CCXT, but it has some special params to make the KuCoin pagination play nice; I think there will need to be some base pagination implementation for trades regardless. You can copy or repurpose it as you see fit.
OK, once the Binance.com CCXT-based plugin gets rolled through, I'm going to start work on abstracting out all the common elements that can be reused by all exchanges.
I'd like some input on the architecture of this.
It seems like every exchange has 4 sections to pull from:
CCXT has unified functions for the following:
However, each exchange has different pagination methods. And I just discovered that CCXT has 3 ways to do pagination. The pagination method will have to be declared in the inherited class along with exchange-specific variables needed to make each of them function.
And then, there are numerous implicit functions for each of the exchanges that allow us to pull information available on other endpoints outside of the unified API. For example, Binance.com has its dividend and mining endpoints that are accessed this way.
Note that CCXT does not have unified functions for gains and so they will have to be implemented with the implicit API.
The abstract class will wrap these unified functions in a function that takes the pagination method plus any exchange-specific parameters that are needed.
Implicit functions that are needed will still have to be written and have custom pagination.
I think I can mostly recycle the single transaction processing I built for Binance.com. It will need to be made a little more general-purpose, though.
Unfortunately, I don't think there is a way to just drop in a CCXT exchange class and have it work out of the box: we will still need to know the pagination method an exchange uses. However, we can assume it is time-based, since CCXT's documentation implies that is the most common.
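Since time-based pagination is the assumed default, here is a minimal sketch of a generic date-based loop, modelled on the pattern in the CCXT manual: fetch a page starting at `since`, then advance `since` just past the newest timestamp received. The function name and parameters are illustrative assumptions; `fetch` stands in for a bound CCXT method such as `exchange.fetch_my_trades`, whose positional order is (symbol, since, limit, params). It also assumes records come back sorted by ascending timestamp.

```python
from typing import Any, Callable, Dict, List, Optional

# Positional signature modelled on CCXT's unified fetch methods, e.g.
# fetch_my_trades(symbol=None, since=None, limit=None, params={}).
FetchFunction = Callable[..., List[Dict[str, Any]]]


def date_based_pagination(
    fetch: FetchFunction,
    since: int,
    now: int,
    limit: int,
    params: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
    """Collect all records from `since` to `now` (both in ms), one page at a time."""
    results: List[Dict[str, Any]] = []
    while since < now:
        page = fetch(None, since, limit, params or {})
        if not page:
            # No records after `since`: assume we've reached the end.
            break
        results.extend(page)
        # Move the cursor just past the newest record received.
        since = page[-1]["timestamp"] + 1
    return results
```

Exchanges that paginate by ID or page number need a different cursor and stopping rule, which is why the pagination method has to be declared per exchange.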
I think you're already pretty much on the right track. Here are some thoughts I had about this:
- the load() method is defined in the abstract class and it is similar to the one you already have in the binance.com plugin (perhaps slightly generalized, as you say). The goal is that subclasses don't have to override this method.
- pagination style can be captured with a protected abstract method (something like _get_pagination_style()): it raises NotImplementedError() in the abstract superclass, but it can be called in the superclass code;
- _process_deposits, _process_withdrawals, _process_trades can have the existing implementation (or a generalization of it);
- _process_gains will throw a NotImplementedError().
- each subclass can extend these methods by calling the super method and then adding its own extra code (or, in the case of _process_gains, it'll have to provide the full implementation).
- multi-thread-friendly code: the work inside _process_deposits, _process_withdrawals, _process_trades could be done in parallel (see the Coinbase pool as an example). This is not a blocker for MVP, but it's good to keep it on our radar.
BTW, just curious, did CCXT mention a reason why gains are not part of the uniform API?
A further clarification on the last bullet (multi-thread-friendly code): the signatures of _process_deposits, _process_withdrawals, _process_trades and _process_gains would have to be modified to return results, rather than modify passed-in lists. For example, the functions could return results in a class like this:
from typing import List, NamedTuple

class _ProcessAccountResult(NamedTuple):  # InTransaction etc. are DaLI's transaction classes
    in_transactions: List[InTransaction]
    out_transactions: List[OutTransaction]
    intra_transactions: List[IntraTransaction]
Each of these results gets joined after the threads have finished their processing (see example here). Even if we don't add multi-threading support right away it's good to make the signatures multi-thread-friendly.
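A minimal sketch of the join step, assuming plain strings as stand-ins for InTransaction / OutTransaction / IntraTransaction objects and a hypothetical `_process_deposit` worker. Each worker returns its own result instead of appending to shared lists, and the results are merged only after the pool has finished.

```python
from multiprocessing.pool import ThreadPool
from typing import List, NamedTuple


# Strings stand in for DaLI's InTransaction / OutTransaction / IntraTransaction objects.
class _ProcessAccountResult(NamedTuple):
    in_transactions: List[str]
    out_transactions: List[str]
    intra_transactions: List[str]


def _process_deposit(deposit_id: str) -> _ProcessAccountResult:
    # Hypothetical worker: returns its results instead of mutating passed-in lists.
    return _ProcessAccountResult([f"in:{deposit_id}"], [], [])


def join_results(results: List[_ProcessAccountResult]) -> _ProcessAccountResult:
    joined = _ProcessAccountResult([], [], [])
    for result in results:
        joined.in_transactions.extend(result.in_transactions)
        joined.out_transactions.extend(result.out_transactions)
        joined.intra_transactions.extend(result.intra_transactions)
    return joined


def process_all(deposit_ids: List[str], thread_count: int = 4) -> _ProcessAccountResult:
    with ThreadPool(thread_count) as pool:
        per_item = pool.map(_process_deposit, deposit_ids)
    # Join only after all threads have finished.
    return join_results(per_item)
```

Because no thread writes to a shared list, the same worker can run with one thread or many without changing its signature.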
> pagination style can be captured with a protected abstract method (something like _get_pagination_style()):
So, this function takes a global like _WITHDRAWALS and returns the pagination style plus limit and possibly exchange-specific parameter name(s). Maybe return it as a Dict?
> BTW, just curious, did CCXT mention a reason why gains are not part of the uniform API?
They didn't. I think this is because they are simply too swamped keeping up with everything and want to maintain the basics first. It appears there is just one very active developer on the project, and there are a lot of loose ends and unfinished PRs. I'd like to contribute to it if I have time, since it really needs some patching up here and there.
About multi-threading, I can definitely see the need to keep this in mind. However, I think most exchanges have an API rate limit that will prevent us from going very fast. The real potential is being able to multithread dali-rp2 so that you can pull from multiple exchanges at once. I'll build the abstract plugin with multithreading in mind, though, in case things change with exchanges.
> So, this function takes a global like _WITHDRAWALS and returns the pagination style plus limit and possibly exchange-specific parameter name(s). Maybe return it as a Dict?
I would have it return a standardized data structure containing all the pagination-related things you need to use the API of the given exchange. Something like:
from dataclasses import dataclass
from typing import Dict
@dataclass(frozen=True)
class _PaginationDetails:
    style: _PaginationStyle
    limit: float  # or int or whatever...
    exchange_specific: Dict[str, str]
> It appears there is just one very active developer on the project and there are a lot of loose ends / unfinished PRs.
Interesting: thanks for the details.
> The real potential is being able to multithread dali-rp2 so that you can pull from multiple exchanges at once.
I think both approaches are important and useful (inter-plugin and intra-plugin):
I think we should aim at doing both (I can look at inter-plugin multi-threading).
I forgot to ask: why does _get_pagination_style() need to be passed _WITHDRAWALS or anything else?
Each unified function could potentially have a different pagination style, limit, and exchange-specific params.
For example, Binance.com has different limits for each of these. And it uses date-based pagination for most of its functions, but it uses page-based (like Coinbase) pagination for mining records. Exchanges could potentially have a mishmash of pagination styles and limits. Lots of fun!
Oh, I see: then should we pass in the function itself as a parameter? The plugin could have a hash table mapping functions to pagination styles. Just spitballing...
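A minimal sketch of that hash-table idea, assuming a hypothetical plugin class and invented limits (not Binance's real values); `fetch_mining_records` is likewise a made-up name for an implicit-API method. Keying the table by method name lets the caller pass the function itself, because bound methods expose `__name__`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class PaginationDetails:
    style: str  # assumed labels: "date", "id" or "page"
    limit: int


class HypotheticalBinancePlugin:
    # Hypothetical table: CCXT method name -> pagination details for this exchange.
    _METHOD_2_PAGINATION_DETAILS: Dict[str, PaginationDetails] = {
        "fetch_deposits": PaginationDetails(style="date", limit=1000),
        "fetch_my_trades": PaginationDetails(style="date", limit=500),
        "fetch_mining_records": PaginationDetails(style="page", limit=200),
    }

    def get_pagination_details(self, function: Callable[..., Any]) -> PaginationDetails:
        # The function itself is the lookup key (via its name).
        return self._METHOD_2_PAGINATION_DETAILS[function.__name__]
```

With a client object exposing `fetch_deposits`, `plugin.get_pagination_details(client.fetch_deposits)` returns the date-based details; an unmapped method raises KeyError, which surfaces missing table entries early.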
Well, my original idea was to have functions that wrap the unified functions, something like this:
self.unified_trades(pagination=_TIME, limit=500, from_time=exchange_start)
The other idea would be to have functions for pagination (passing the function itself, not the result of calling it):
self.time_pagination(function=fetch_my_trades, limit=500, from_time=exchange_start)
And then put default calls in load():
self.unified_trades(_TIME, 100, exchange_start)
self.unified_deposits(_TIME, 100, exchange_start)
self.unified_withdrawals(_TIME, 100, exchange_start)
Inter-plugin multi-threading is now implemented: use the -t command line option to select the number of threads to use to run data loader plugins in parallel. Total runtime is now equivalent to the runtime of the slowest input plugin.
Nice! That is a great improvement. Thanks for the quick turn around.
Thanks! Let me know what your runtime improvement is, next time you upgrade.
I think I'm going with the second approach: pagination methods that take functions as parameters. They will be useful for the implicit API as well (I think). I'll start implementing it and see what comes up.
I would suggest a small class hierarchy for pagination details:
Each subclass would have all the style-specific data it needs. So each wrapper would receive an AbstractPaginationDetails parameter with all the correct data (it can even check at runtime that the received pagination detail instance is the right one, using isinstance()).
This would work with the first approach you indicated. E.g.:
date_based_pagination_style = DateBasedPagination(limit=500, from_time=exchange_start)
self.unified_trades(date_based_pagination_style)
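A minimal sketch of such a hierarchy, one subclass per CCXT pagination style. The class names follow the thread, but their fields and the free-standing `unified_trades` function are assumptions made for illustration. The wrapper uses `isinstance()` to reject details of the wrong style at runtime.

```python
from typing import Any, Dict, Optional


class AbstractPaginationDetails:
    """Base class: one subclass per pagination style (hypothetical fields)."""

    def __init__(self, limit: int, exchange_specific: Optional[Dict[str, Any]] = None) -> None:
        self.limit = limit
        self.exchange_specific = exchange_specific or {}


class DateBasedPaginationDetails(AbstractPaginationDetails):
    def __init__(self, limit: int, from_time: int,
                 exchange_specific: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(limit, exchange_specific)
        self.from_time = from_time  # start of the range, in ms


class IdBasedPaginationDetails(AbstractPaginationDetails):
    def __init__(self, limit: int, from_id: str,
                 exchange_specific: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(limit, exchange_specific)
        self.from_id = from_id


class PageBasedPaginationDetails(AbstractPaginationDetails):
    def __init__(self, limit: int, first_page: int = 1,
                 exchange_specific: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(limit, exchange_specific)
        self.first_page = first_page


def unified_trades(pagination_details: AbstractPaginationDetails) -> str:
    # The wrapper checks at runtime that it received the style it supports.
    if not isinstance(pagination_details, DateBasedPaginationDetails):
        raise TypeError(f"Unsupported pagination details: {type(pagination_details).__name__}")
    return f"fetching trades from {pagination_details.from_time}, {pagination_details.limit} at a time"
```

Style-specific data (from_time, from_id, first_page) lives only in the subclass that needs it, so a wrapper can't accidentally read a field that doesn't apply.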
Ah, okay, that seems like a slick implementation. I'll give that a try.
On second thought, I'm going to have to implement the pagination methods by themselves anyway, so why take an extra step?
Basically, in load():
def load(self) -> List[AbstractTransaction]:
    self.unified_trades(date_based_pagination_style)
    ...

def unified_trades(self, pagination_style: AbstractPaginationDetails) -> _ProcessAccountResult:
    if isinstance(pagination_style, DateBasedPaginationDetails):
        results = self.date_based_pagination(self.unified_trades_pull, pagination_style)
    ...

def date_based_pagination(self, function: Any, pagination_details: DateBasedPaginationDetails) -> _ProcessAccountResult:
    while date_condition:
        function(parameters_from_datebasedpaginationdetails)
        # pagination code goes here
It seems like a lot of extra work. Can't we just call `self.date_based_pagination(function, start, limit, exchange_specific_param)` in load(), or is that bad form?
You know what, let's actually spec it out to make sure we capture everything and we're on the same page. Here's an initial rough iteration of the spec. Let's go iteratively from here: feel free to fix, revise, comment, etc.
The way I thought about pagination details is that the user provides a table mapping client methods to pagination details and the superclass uses that table to provide the correct pagination to any given method (see example in the Wiki page inside _process_deposits()). On the other hand exchange-specific code that is only in the subclass can call client methods and pass them pagination details directly.
Let me know what you think.
I thought some more about this: perhaps the method_2_pagination_details dictionary idea is too convoluted. It may be cleaner / easier to just have multiple abstract methods like this:
def get_fetch_deposits_pagination_details(self) -> AbstractPaginationDetails:
    raise NotImplementedError("Abstract method")

def get_<some_other_method>_pagination_details(self) -> AbstractPaginationDetails:
    raise NotImplementedError("Abstract method")
...
These methods are defined and used in the superclass, but are implemented in the subclass. This approach seems less error prone and simpler to understand. Perhaps it's also similar to what you were suggesting?
Is there a way for me to edit the wiki?
Oh, right... changed some settings. Can you try again?
I guess my major hang-up is with the while loop. The while condition is different for every pagination method.
Date-based pagination needs:
while current_start < now():
ID/page-based pagination needs:
while len(results):
They can't be combined, because date-based pagination could return no records for a stretch of time in which no records exist. Well, I guess the condition could be:
while len(results) or (date_based and current_start < exchange.milliseconds())
> Oh, right... changed some settings. Can you try again?
Yes, now I can! Thanks!
I see. Can a top-level function have more than one pagination method in the abstract class? E.g. can _process_deposits() (or another of its peers) have multiple loops, each of which has a different pagination method, or could the pagination method be passed in to these top-level functions and used throughout their code?
Each unified method (top-level function) should have only one pagination method.
I modified the wiki to show what I think will work. I'll add in some of the other stuff I've been working on today (next 12-14 hours).
By top level function I meant one of the functions of the abstract class (that can be extended in the subclass): e.g. _process_deposits(), _process_withdrawals(), etc. So my question can be expressed better this way:
- do top-level function implementations in the abstract superclass call more than one CCXT unified function? Note that I only refer to the superclass code, not subclass
No, at the moment, there are only 3 unified functions and those correspond 1:1 with the top level functions we will call.
Having said that, there will be supplemental implicit API function calls (implemented in the subclass) that need to be made in order to pull all of the data that each of the top-level functions represents. Let me give an example from Binance.com:
Binance.com has two different endpoints to pull deposits: one for crypto deposits and the other for fiat deposits. The way CCXT handles this is that if you explicitly request a fiat code (one that is in its exchange-specific variable LEGAL_MONEY), it will use the Binance endpoint for fiat deposits to pull deposits of that specific fiat. This is too specific for our needs: we need something more general, because users could theoretically deposit many kinds of fiat.
So, the top-level function will call the unified function which will pull all crypto deposits (and no fiat deposits). And in the subclass, we will need to implement the implicit API to pull the fiat deposits.
Yes, that is what I was thinking too: unified CCXT functions get called in the superclass, implicit ones in the subclass.
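A minimal sketch of that split, assuming a stubbed client: the superclass only calls the unified `fetch_deposits()` (crypto deposits on Binance.com), while the subclass calls `super()` and then adds fiat deposits. `fetch_fiat_deposits_implicit` is a hypothetical stand-in for whichever implicit-API method the subclass would really use; it is not a real CCXT method name.

```python
from typing import Any, Dict, List


class AbstractCCXTInputPlugin:
    def __init__(self, client: Any) -> None:
        self.client = client  # a CCXT exchange object (stubbed in the test)

    def _process_deposits(self) -> List[Dict[str, Any]]:
        # Superclass: CCXT *unified* API only.
        return list(self.client.fetch_deposits())


class BinanceComPlugin(AbstractCCXTInputPlugin):
    def _process_deposits(self) -> List[Dict[str, Any]]:
        deposits = super()._process_deposits()
        # Subclass: exchange-specific *implicit* API call for fiat deposits.
        # `fetch_fiat_deposits_implicit` is hypothetical, not a real CCXT method.
        deposits.extend(self.client.fetch_fiat_deposits_implicit())
        return deposits
```

The subclass never re-implements the unified part; it only appends what the unified API can't reach.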
The reason I asked the question is that if in the superclass there is a 1:1 relationship between top-level functions and unified CCXT functions, then we could have the pagination details be passed in to the top level function as a parameter. And the pagination object could have a pagination-dependent method implementing the condition for the loop. Something along the lines of:
def evaluate_loop_condition(self, ...)
Would this solve the original problem you posed (how to represent different loop conditions, depending on the pagination details)?
To be more precise:
class AbstractPaginationDetails:
    def evaluate_loop_condition(self, ...) -> bool:
        ...

class AbstractCCXTInputPlugin:
    def _process_deposits(
        self,
        in_transactions: List[InTransaction],
        out_transactions: List[OutTransaction],
        intra_transactions: List[IntraTransaction],
        pagination_details: AbstractPaginationDetails,
    ) -> None:
        ...
        while pagination_details.evaluate_loop_condition(...):
            ...
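A minimal sketch of the evaluate_loop_condition idea, assuming hypothetical `request_params()` / `update()` helpers and invented request keys (`since`, `until`, `page`). The loop is identical for every style; only the pagination object decides when to stop. Note that the date-based version keeps going through empty time windows, which is exactly the case that a `len(results)` check alone gets wrong.

```python
from typing import Any, Callable, Dict, List, Optional

Page = List[Dict[str, Any]]


class AbstractPaginationDetails:
    def evaluate_loop_condition(self, last_page: Optional[Page]) -> bool:
        raise NotImplementedError("Abstract method")

    def request_params(self) -> Dict[str, Any]:
        raise NotImplementedError("Abstract method")

    def update(self, last_page: Page) -> None:
        raise NotImplementedError("Abstract method")


class DateBasedPaginationDetails(AbstractPaginationDetails):
    """Walks fixed time windows; an empty window does NOT end the loop."""

    def __init__(self, start: int, end: int, window: int) -> None:
        self.current_start, self.end, self.window = start, end, window

    def evaluate_loop_condition(self, last_page: Optional[Page]) -> bool:
        return self.current_start < self.end

    def request_params(self) -> Dict[str, Any]:
        return {"since": self.current_start, "until": self.current_start + self.window}

    def update(self, last_page: Page) -> None:
        self.current_start += self.window


class PageBasedPaginationDetails(AbstractPaginationDetails):
    """Requests page 1, 2, ... until an empty page comes back."""

    def __init__(self) -> None:
        self.page = 1

    def evaluate_loop_condition(self, last_page: Optional[Page]) -> bool:
        return last_page is None or len(last_page) > 0

    def request_params(self) -> Dict[str, Any]:
        return {"page": self.page}

    def update(self, last_page: Page) -> None:
        self.page += 1


def paginate(fetch: Callable[[Dict[str, Any]], Page], details: AbstractPaginationDetails) -> Page:
    # Same loop for every style: the pagination object knows when to stop.
    results: Page = []
    last_page: Optional[Page] = None
    while details.evaluate_loop_condition(last_page):
        last_page = fetch(details.request_params())
        results.extend(last_page)
        details.update(last_page)
    return results
```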
I updated the Wiki with my latest thinking: I'll get your feedback, integrate it, and then I'll clean up the doc and make a V2 version (which we can continue to brainstorm on until we're happy).
I updated it with my notes. I think we are really close. I've been plugging this in to the code I'm writing now.
Great! I cleaned it up a little bit (but preserved the old version for reference at the end of the file in case we need it). A few questions / comments:
self.get_process_deposits_pagination_details() at the beginning of _process_deposits().
I added multithreading support to the wiki (both interface and pseudocode).
Ok, thanks. Can I run two functions with a pool like so:
with ThreadPool(self.__thread_count) as pool:
    processing_result_list = pool.map(self._process_buy, trades)
    processing_result_list += pool.map(self._process_sell, trades)
I'm going to get trades working and tested as a proof of concept, submit a PR, and then flesh out the other standard functions.
I haven't seen it used this way. I would do:
with ThreadPool(self.__thread_count) as pool:
    processing_result_list = pool.map(self._process_buy, trades)
with ThreadPool(self.__thread_count) as pool:
    processing_result_list += pool.map(self._process_sell, trades)
Another option, if you want to get full parallelism for buys and sells, you could do this:
with ThreadPool(self.__thread_count) as pool:
    processing_result_list = pool.map(self._process_buy_and_sell, trades)
Here the _process_buy_and_sell() method would have to distinguish internally between a buy and a sell.
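A minimal sketch of the single-worker option, assuming trades are CCXT-style dicts whose unified `side` field is `"buy"` or `"sell"`, and that the worker returns an ("in" / "out", trade) pair; both the return shape and the free-standing function names are hypothetical.

```python
from multiprocessing.pool import ThreadPool
from typing import Any, Dict, List, Tuple


def _process_buy_and_sell(trade: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    # One worker handles both sides, branching on the unified 'side' field.
    if trade["side"] == "buy":
        return ("in", trade)
    if trade["side"] == "sell":
        return ("out", trade)
    raise ValueError(f"Unexpected trade side: {trade['side']}")


def process_trades(trades: List[Dict[str, Any]], thread_count: int = 4) -> List[Tuple[str, Dict[str, Any]]]:
    with ThreadPool(thread_count) as pool:
        # A single map call, so buys and sells are processed in parallel with each other.
        return pool.map(_process_buy_and_sell, trades)
```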
Python also has a more modern multithreading API called ThreadPoolExecutor, which may allow fancier options, but I haven't spent much time learning it yet (it's been on my todo list for a while).
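For comparison, a minimal sketch with `concurrent.futures.ThreadPoolExecutor`, using hypothetical `_process_buy` / `_process_sell` stand-ins that return strings. `Executor.map` submits all of its tasks up front, so two `map` calls on the same executor let buys and sells overlap without merging them into one worker.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


def _process_buy(trade: Dict[str, Any]) -> str:
    return f"buy:{trade['id']}"  # hypothetical stand-in


def _process_sell(trade: Dict[str, Any]) -> str:
    return f"sell:{trade['id']}"  # hypothetical stand-in


def process_trades_concurrently(trades: List[Dict[str, Any]], thread_count: int = 4) -> List[str]:
    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        # Both maps are scheduled on the same executor before either is consumed.
        buys = executor.map(_process_buy, trades)
        sells = executor.map(_process_sell, trades)
        # Results come back in input order for each map.
        return list(buys) + list(sells)
```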
Ok, I'll go with the simple option to start and stick with what we have.
One more question: I'm recreating the Binance.com plugin as one that inherits from the abstract CCXT plugin. Should I give it the same name (src/dali/plugin/input/binance_com.py), or will that cause major issues with git and merging?
Writing a DaLI data loader plugin has high impact on the functionality and usefulness of both DaLI and RP2. Data loader plugins are well-defined, encapsulated modules that translate native exchange REST API-based data into DaLI's standard format. As such, they are good first issues for newcomers to the project.
Before starting, please read the contributing guidelines.
Plugin development is described in the developer documentation, in particular read the Dali Internals and Plugin Development sections, which contain all the information needed to build a new data loader plugin.
CCXT is documented here: https://docs.ccxt.com/en/latest/manual.html. This would be an abstract plugin that is meant to be subclassed by concrete ones.
The Coinbase plugin can be used as an example.
Before submitting a PR for the new plugin, make sure to go through the DaLI Plugin Laundry List.