dowdt opened this issue 7 months ago
Got it. Working on it
Hello,
Have added the size calculator and CSV dumper --> https://github.com/Openmesh-Network/core/pull/2
A few questions:
Msg from channel : {"type":"subscriptions","channels":[{"name":"ticker","product_ids":["BTC-USD"],"account_ids":null}]}
Msg from channel : {"type":"ticker","sequence":79028420699,"product_id":"BTC-USD","price":"66471.85","open_24h":"64543.99","volume_24h":"10364.07405470","low_24h":"64362.87","high_24h":"66884.95","volume_30d":"439558.76237041","best_bid":"66470.34","best_bid_size":"0.10000000","best_ask":"66473.75","best_ask_size":"0.00002575","side":"buy","time":"2024-04-22T19:55:02.630186Z","trade_id":635058890,"last_size":"0.01722"}
--> Have written an unmarshal function either way: if we plan on something like exporting whole datasheets, we can use this function, and even if we are using the data internally, we can unmarshal as and when required.
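For reference, a minimal sketch of what such an unmarshal could look like for the ticker payload above (the field set is trimmed and the struct name is illustrative; the actual struct in the PR may differ):

package main

import (
	"encoding/json"
	"fmt"
)

// TickerMessage covers a subset of the fields seen in the sample ticker payload.
type TickerMessage struct {
	Type      string `json:"type"`
	Sequence  int64  `json:"sequence"`
	ProductID string `json:"product_id"`
	Price     string `json:"price"`
	BestBid   string `json:"best_bid"`
	BestAsk   string `json:"best_ask"`
	Side      string `json:"side"`
	Time      string `json:"time"`
	TradeID   int64  `json:"trade_id"`
	LastSize  string `json:"last_size"`
}

func main() {
	raw := []byte(`{"type":"ticker","sequence":79028420699,"product_id":"BTC-USD","price":"66471.85","best_bid":"66470.34","best_ask":"66473.75","side":"buy","time":"2024-04-22T19:55:02.630186Z","trade_id":635058890,"last_size":"0.01722"}`)

	var t TickerMessage
	if err := json.Unmarshal(raw, &t); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	fmt.Println(t.ProductID, t.Price, t.Side)
}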
Have added busiest-time-window calculations for sources and (source+topic) pairs --> https://github.com/Openmesh-Network/core/pull/2#issuecomment-2071384792
Also, there is a library I came across, gojay (https://github.com/francoispqt/gojay?tab=readme-ov-file#gojay), which is apparently a fast encoder for JSON data and is made for IO stream decoding, where we'll need to directly unmarshal to structs.
Lmk your thoughts on using it in the codebase.
Also, just a confirmation: the program panics if there is a websocket or any other exception. Was there any specific reason for this? @dowdt
i.e.
func defaultJoinCEX(ctx context.Context, source Source, topic string) (chan []byte, <-chan error, error) {
	ws, resp, err := websocket.Dial(ctx, source.ApiURL, &websocket.DialOptions{
		Subprotocols: []string{"phoenix"},
	})
	msgChannel := make(chan []byte)
	errChannel := make(chan error, 1)
	if err != nil {
		// fmt.Println(resp)
		// panic(err) --> This was uncommented; removed so the caller can handle the failure.
		// Added below change (resp can be nil if the dial fails before a handshake response).
		if resp != nil && resp.StatusCode != 201 {
			fmt.Println("Received websocket exception:", resp.StatusCode, "for:", source.ApiURL, ". Please check the subscription (451) or try later (500)")
		}
		return msgChannel, errChannel, fmt.Errorf("websocket dial failed: %w", err)
	}
Would prefer returning empty channels and an error such that the caller can just stop processing from that endpoint and retry later. Lmk what you think and will make the changes.
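For illustration, a hedged sketch of what the caller side could look like with that change: reconnect with a simple backoff instead of panicking. The function name, backoff values, and retry policy here are assumptions, reusing the Source type and defaultJoinCEX from the snippet above (imports: context, time).

func consumeWithRetry(ctx context.Context, source Source, topic string) {
	backoff := time.Second
	for {
		msgs, errs, err := defaultJoinCEX(ctx, source, topic)
		if err != nil {
			// Endpoint unreachable: stop processing it for now and retry later.
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff):
			}
			if backoff < time.Minute {
				backoff *= 2
			}
			continue
		}
		backoff = time.Second

	consume:
		for {
			select {
			case <-ctx.Done():
				return
			case msg := <-msgs:
				_ = msg // hand off to the size calculator / CSV dumper
			case <-errs:
				break consume // connection dropped mid-stream; dial again
			}
		}
	}
}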
Also, there is a library I came across, gojay (https://github.com/francoispqt/gojay?tab=readme-ov-file#gojay), which is apparently a fast encoder for JSON data and is made for IO stream decoding, where we'll need to directly unmarshal to structs.
That library looks alright, but I'd rather use: https://github.com/buger/jsonparser
It looks simpler and has been maintained more recently. Plus it's got more stars for whatever that's worth.
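For example, pulling a single field out of a raw message with jsonparser looks roughly like this (payload taken from the ticker sample earlier in the thread; the fields extracted are just examples):

package main

import (
	"fmt"

	"github.com/buger/jsonparser"
)

func main() {
	msg := []byte(`{"type":"ticker","product_id":"BTC-USD","price":"66471.85","time":"2024-04-22T19:55:02.630186Z"}`)

	// Extract individual fields without unmarshalling the whole payload.
	price, err := jsonparser.GetString(msg, "price")
	if err != nil {
		fmt.Println("price not found:", err)
		return
	}
	product, _ := jsonparser.GetString(msg, "product_id")
	fmt.Println(product, price) // BTC-USD 66471.85
}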
Also, just a confirmation: the program panics if there is a websocket or any other exception. Was there any specific reason for this? @dowdt
This is really just to catch bugs early on. People won't care about an error message, but will react if the program crashes. I'd say it's fine to remove it at this point. But we will have to add a sanity check test in the future.
Got it. W.r.t. data compression and storage: I went through the architecture doc, Universal Data Collector. Focusing on the "Raw Data Filtering" aspect, the stream inflow and storage of data could be processed in the following way: @dowdt
Compression:
i. Accumulate the stream of []byte data for a given time period --> for every source-symbol pair.
ii. Marshal the []byte to BSON as we accumulate, for storage (since we currently plan on storing the data as is).
--> Also unmarshal immediately from []byte to a struct/string using a JSON parser for any immediate operations, such as processing and sending the data to topics through a Kafka producer. (Can look into serverless here --> OpenFaaS)
iii. Append each BSON blob to a binary file, delimited by the length of the BSON.
BSON file format: { len(data1) data1 len(data2) data2 .... }
--> why?: it allows quick access to individual records while reading back.
iv. Compress the whole file for storage using lz4 or zstd.
Decompression:
i. Decompress into the binary.
ii. Read each BSON value from the binary (separated by its length), unmarshal each blob to a struct or string, and perform operations.
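As a rough sketch of the length-delimited format above (the uint32 little-endian length prefix and the JSON stand-ins for BSON blobs are assumptions, not a decided spec):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// appendBlob writes len(blob) as a uint32 prefix followed by the blob itself.
func appendBlob(w io.Writer, blob []byte) error {
	if err := binary.Write(w, binary.LittleEndian, uint32(len(blob))); err != nil {
		return err
	}
	_, err := w.Write(blob)
	return err
}

// readBlobs walks the stream and returns each length-delimited blob.
func readBlobs(r io.Reader) ([][]byte, error) {
	var blobs [][]byte
	for {
		var n uint32
		if err := binary.Read(r, binary.LittleEndian, &n); err == io.EOF {
			return blobs, nil
		} else if err != nil {
			return nil, err
		}
		blob := make([]byte, n)
		if _, err := io.ReadFull(r, blob); err != nil {
			return nil, err
		}
		blobs = append(blobs, blob)
	}
}

func main() {
	var buf bytes.Buffer
	appendBlob(&buf, []byte(`{"type":"ticker","price":"66471.85"}`)) // stand-in for a BSON blob
	appendBlob(&buf, []byte(`{"type":"ticker","price":"66473.75"}`))
	blobs, _ := readBlobs(&buf)
	fmt.Println(len(blobs), "blobs recovered")
}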
Let me know what you think or if I'm missing anything design-wise. Thanks.
I went through the architecture doc, Universal Data Collector.
Those docs are outdated, sorry about that. We changed the design a while ago; we plan to store all our data across multiple machines using IPFS (similar to the resource pool demo). All of the Kafka and Postgres stuff was scrapped because it complicates the system too much and adds a lot of overhead.
What I'm more interested in for this issue is measuring the impact of different compression techniques on storage. Specifically, by what factor can these streams of data be compressed without writing a manual parser for each source? The best way to work that out is to measure it with tests.
For the scope of this issue, compression at the message level (meaning as soon as we get a message) is more than enough.
To test how compression works at the "chunk" (fixed-size blob) level, use a 4 KB buffer of the data with no fancy formatting.
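For illustration, a minimal sketch of measuring chunk-level compression with zstd (github.com/klauspost/compress/zstd is one possible library; the library choice, the 4 KB chunk, and the repeated sample payload are assumptions, not real collector data; lz4 would follow the same pattern with its own encoder):

package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Build a ~4 KB chunk from repeated ticker-like messages.
	sample := []byte(`{"type":"ticker","product_id":"BTC-USD","price":"66471.85","best_bid":"66470.34","best_ask":"66473.75"}`)
	var chunk bytes.Buffer
	for chunk.Len() < 4*1024 {
		chunk.Write(sample)
	}

	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}
	defer enc.Close()

	// Compress the chunk in one shot and report the ratio.
	compressed := enc.EncodeAll(chunk.Bytes(), nil)
	fmt.Printf("raw: %d bytes, zstd: %d bytes, ratio: %.2fx\n",
		chunk.Len(), len(compressed), float64(chunk.Len())/float64(len(compressed)))
}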
Alright. I ran a few tests today for two compression techniques, lz4 and zstd, at different file sizes as a whole (I tried at the message level, but observed no significant difference in sizes; it was not worth the processing time taken to compress each message). Below are the benchmark scores for each at the file level: @dowdt
As seen, zstd compression provides higher ratios.
Running tests now with 4096-byte (4 KB) chunk compression. Will check and update the variation.
Edit: Tested with one source for a short duration, hence the file sizes are small. Will run a longer test, probably tonight.
Update:
Tested with a 4 KB chunk size and non-filtered data (everything unmarshalled into a generic JSON-type interface). Benchmark results:
Haven't tested medium and large file sizes, but I doubt lz4 would outperform zstd in those either. The only downside of zstd is that compression & decompression are slightly slower than lz4. If we're okay with that tradeoff, then zstd would be a much better choice.
One more thing to highlight here: the conversion to BSON itself takes less space with larger files. But of course, I will need to run a few more tests on different file sizes and different formats of JSON data to be completely sure.
I'd just ask: are you compressing each "chunk" or the whole file? We want the chunks to be accessed independently, so they'd have to be compressed individually.
Could you check:
Compare this to the buffer without compression in the results.
Thanks in advance mate, you've gone above and beyond.
I was adding all the messages (each in BSON format) to a large buffer (100 MB), and after collecting data up to a certain point, compressing the buffer in 4 KB chunks and writing them to file as they're compressed. But I see your point: this method is memory-consuming and only valid for small tests. Plus it splits the data up across memory, so padding the buffer makes sense.
Have made the change and run a few tests. Benchmarks:
UPDATE: @dowdt
A few things:
A buffer size of 4 KB is too small for the messages, so I have kept a standard of 8 KB across most sources, except for dYdX, whose average message size is 16 KB (UPDATE: some messages are even larger than 16 KB; the safest bet is to keep a 32/64 KB buffer size across all sources). Will decide, based on long-term test results, how large the constant buffer size should be.
Even in this scenario, for smaller files, zstd outperforms lz4. The size of the BSON file increases since we are adding padding. UPDATE: The size change between JSON and BSON is arbitrary and hard to measure with precision, since the padding may range anywhere from a few bytes to more than 14-15 KB in a given buffer space of 16 KB.
Have raised another PR, separate from the busy-window calculation, for the compression and marshalling aspects: https://github.com/Openmesh-Network/core/pull/3
In the same PR, have added a sanity checker in the collector module; it runs for ~25 seconds at program init to make sure endpoints are live and messages are being received.
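For reference, a rough sketch of that kind of init-time sanity check (the function name, the joinSource callback, and the overall shape are placeholders, not the actual code in the PR; assumes the collector's Source type with ApiURL and imports context, fmt, time):

// sanityCheck waits, within a shared ~25-second budget, for at least one
// message from every source before declaring the endpoints live.
func sanityCheck(ctx context.Context, sources []Source, joinSource func(context.Context, Source) (<-chan []byte, error)) error {
	checkCtx, cancel := context.WithTimeout(ctx, 25*time.Second)
	defer cancel()

	for _, src := range sources {
		msgs, err := joinSource(checkCtx, src)
		if err != nil {
			return fmt.Errorf("sanity check: could not join %s: %w", src.ApiURL, err)
		}
		select {
		case <-checkCtx.Done():
			return fmt.Errorf("sanity check: timed out waiting for %s", src.ApiURL)
		case <-msgs:
			// Received at least one message: endpoint is live.
		}
	}
	return nil
}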
PS: No problem! I'm loving the work :)
We need to measure the storage requirements of each source+symbol in our system.
A "source" is an API endpoint that gives us data we're interested in (ie. public binance websocket server).
The task is to design and implement a system to automatically go through all of the sources in the sources array (Global variable in collector/sources.go) and measure the size of incoming data over a certain period.
It should:
Ideas + advice:
Bonus:
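For illustration, a hedged sketch of the measurement loop described above: subscribe to every source and count incoming bytes over a fixed window. The Source type, the joinSource callback, and the function name are placeholders for whatever lives in collector/sources.go (imports: context, sync, time).

// measureSizes returns total bytes received per source ApiURL over the window.
func measureSizes(ctx context.Context, sources []Source, joinSource func(context.Context, Source) (<-chan []byte, error), window time.Duration) map[string]int64 {
	totals := make(map[string]int64)
	var mu sync.Mutex
	var wg sync.WaitGroup

	runCtx, cancel := context.WithTimeout(ctx, window)
	defer cancel()

	for _, src := range sources {
		wg.Add(1)
		go func(src Source) {
			defer wg.Done()
			msgs, err := joinSource(runCtx, src)
			if err != nil {
				return // endpoint unreachable; could be logged separately
			}
			for {
				select {
				case <-runCtx.Done():
					return
				case msg := <-msgs:
					mu.Lock()
					totals[src.ApiURL] += int64(len(msg))
					mu.Unlock()
				}
			}
		}(src)
	}
	wg.Wait()
	return totals
}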