michaelgrosner / tribeca

A high frequency, market making cryptocurrency trading platform in node.js
Other
3.99k stars 949 forks source link

Bitfinex WebSocket API Support #70

Open josephfrazier opened 8 years ago

josephfrazier commented 8 years ago

Bitfinex has added a WebSocket API (documentation), but tribeca currently doesn't support it. Would you be interested in reviewing/merging pull requests that add WebSocket support? I haven't yet familiarized myself with the code, but I thought I'd go ahead and ask before digging too far into it. Thanks!

michaelgrosner commented 8 years ago

Hey @josephfrazier, I do have some sample code I've started:

import ws = require('ws');
import Utils = require("./utils");
import moment = require("moment");
import Models = require("../common/models");
import _ = require("lodash");
import Interfaces = require("./interfaces");
import Messages = require("./messages");
import Persister = require("./persister");
import Config = require("./config");
import QuotingParameters = require("./quoting-parameters");

const Deque = require("collections/deque");

export type BitfinexMarketTrade = [number, string, string, number, number, number, number];

export interface IBitfinexSocket {
    subscribe<T>(channel: string, 
                 pair: string, 
                 snapshot: (msgs: Models.Timestamped<T[]>) => void, 
                 update: (msg: Models.Timestamped<T>) => void);
}

export function createBitfinexSocket(config: Config.IConfigProvider, timeProvider : Utils.ITimeProvider) : IBitfinexSocket {
    const url = config.GetString("BitfinexWebsocketUrl");
    if (url === null || url === "NULL") {
        return {subscribe: () => {}}; 
    }
    else {
        return new BitfinexWebsocket(config, timeProvider);
    }
}

class BitfinexWebsocket implements IBitfinexSocket {
    private _ws : ws = null;
    private _log = Utils.log("bitfinex:websocket");
    private _snapshotHandlers : {[key: string]: [string, string, (msgs: Models.Timestamped<any[]>) => void, (msg: Models.Timestamped<any>) => void]} = {};
    private _channelIdsToChannels : {[channelId: string]: string} = {};

    constructor(
            private _config: Config.IConfigProvider,
            private _timeProvider : Utils.ITimeProvider) {
        this.createSocket();
    }

    private createSocket = () => {
        this.discardSocket();
        this._log.info("creating bitfinex websocket");
        this._ws = new ws(this._config.GetString("BitfinexWebsocketUrl"));   // "wss://api2.bitfinex.com:3000/ws"
        this._ws.on("open", this.onOpen);
        this._ws.on("message", this.onMessage);
        this._ws.on("close", this.onClose);
        this._ws.on("error", this.onError);
    };

    private discardSocket = () => {
        if (this._ws === null) return;
        this._log.info("discarding bitfinex websocket");

        try {
            this._ws.close();
        }
        finally {
            this._ws = null;
        }
    };

    private onOpen = () => {
        this._log.info("connected to bitfinex.");
        for (let v of _.values(this._snapshotHandlers)) {
            this.sendSubscriptionRequest(v[0], v[1]);
        }
    }

    private onClose = (code, msg) => {
        this._log.warn("socket closed to bitfinex, attempting to reconnect in 15 seconds.", {code: code, msg: msg});
        this.delayedReconnect();
    };

    private onError = (err) => {
        this._log.error("error to bitfinex!", err);
        this.delayedReconnect();
    };

    private delayedReconnect = () => {
        this.discardSocket();        
        this._timeProvider.setTimeout(this.createSocket, moment.duration(15, 'seconds'));
    };

    private onMessage = (rawData) => { 
        try {
            this.onMessageUnsafe(rawData);
        }
        catch (e) {
            this._log.error("Unhandled exception handling Bitfinex data", rawData, e);
        }
    };

    private onMessageUnsafe = (rawData) => {
        const t = this._timeProvider.utcNow();
        const parsed = JSON.parse(rawData);
        if (parsed['event'] === "subscribed") {
            const key = `${parsed['channel']}:${parsed['pair']}`;
            this._channelIdsToChannels[parsed['chanId']] = key;
            this._log.info("subscription response", parsed);
        }
        else {
            const channelId = parsed[0];

            const channelKey = this._channelIdsToChannels[channelId];
            if (typeof channelKey === 'undefined') {
                if (parsed['event'] === 'info')
                    this._log.info("info from bitfinex", parsed);
                else
                    this._log.warn("received update for unknown channel", parsed);
                return;
            }

            const handlers = this._snapshotHandlers[channelKey];
            if (typeof handlers === 'undefined') {
                this._log.warn("received update without handlers", parsed);
                return;
            }

            if (_.isArray(parsed[1])) {
                // snapshot
                const handler = handlers[2];
                if (handler === null) return;

                handler(new Models.Timestamped(parsed[1], t));
            }
            else {
                // incremental
                if (parsed[1] === "hb") return;

                const handler = handlers[3];
                if (handler === null) return;

                handler(new Models.Timestamped(parsed, t));
            }
        }
    };

    public subscribe = <T>(channel: string, 
                           pair: string, 
                           snapshot: (msgs: Models.Timestamped<T[]>) => void, 
                           update: (msg: Models.Timestamped<T>) => void) => {
        const key = `${channel}:${pair}`;
        if (typeof this._snapshotHandlers[key] !== 'undefined') {
            throw new Error(`Already subscribed for channel=${channel}, pair=${pair}`);
        }
        this._snapshotHandlers[key] = [channel, pair, snapshot, update];
        if (this._ws.readyState === ws.OPEN) {
            this.sendSubscriptionRequest(channel, pair);
        }
    }

    private sendSubscriptionRequest = (channel: string, pair: string) => {
        const data = {
            "event": "subscribe",
            "channel": channel,
            "pair": pair
        };
        this._log.info("sending subscription request", data);
        this._ws.send(JSON.stringify(data));
    };
}

This just writes to their websocket in a generic way, filling in the subscriptions for market data, positions, and order entry should be straightforward from their docs.

josephfrazier commented 8 years ago

Wow, that helps a lot! I'll see what I can do with it and submit a pull request if I get anywhere.

michaelgrosner commented 8 years ago

@josephfrazier Awesome, let me know if you have any questions.

gacelita commented 7 years ago

@josephfrazier @michaelgrosner Hi, what's the state on this? :) Can help if needed

josephfrazier commented 7 years ago

I... never actually started on this :/ Hope you weren't waiting long before asking about it.

gacelita commented 7 years ago

Shame on you! :) I just stumbled upon this bot, so I waited 0 seconds, no worries

thekenshow commented 7 years ago

Taking at run at this despite being a Node/JS/TS noob, and this is a progress report with some hurdles I think I've overcome.

Created src/service/gateways/bitfinex-api.ts with the sample code above, and edited tribeca/service/tribeca.json to remove the REST URL and add the WSS URL:

"BitfinexWebsocketUrl": "wss://api2.bitfinex.com:3000/ws"

Next, ran grunt compile to generate the JS, but the import paths that being with "./" all failed. E.g.,

import Utils = require("./utils");

Assuming there aren't supposed to be new, separate versions of these, I edited the paths to point to the existing files where I found them. E.g,.

import Utils = require("../utils");

Ran grunt compile again and got the following:

src/service/gateways/bitfinex-api.ts(35,26): error TS2339: Property 'log' does not exist on type 'typeof "/var/www/tribeca/src/service/utils"'.

>> 1 non-emit-preventing type warning  
>> Error: tsc return code: 2
Warning: Task "ts:service" failed. Use --force to continue.

I confirmed that there is no log property in the utils file. Compared with coinbase-api.ts, and found the following there:

import log from "../logging";

Added this to bitfinex-api.ts, and modified the offending line to:

private _log = log("bitfinex:websocket");

Ran grunt compile again, looks like success this time:

Running "ts:service" (ts) task
Compiling...
Using tsc v2.4.1

TypeScript compilation complete: 7.62s for 2 TypeScript files.

Running "ts:admin" (ts) task
Compiling...
No file changes were detected. Skipping Compile

TypeScript compilation complete: 0.00s for 0 TypeScript files.

Running "copy:main" (copy) task
Created 3 directories, copied 16 files

Running "browserify:dist" (browserify) task
>> Bundle tribeca/service/admin/js/admin/bundle.min.js created.

Confirmed expected new files had been generated:

tribeca/service/gateways/bitfinex-api.js
tribeca/tribeca/service/gateways/bitfinex-api.js.map

Loaded the Web UI and unless the old tribeca.json data is being cached, it's connected via Websocket and update Market data pane.

Not sure about next steps, but would love to see this happen and happy to work with anyone else who's interested...JoelSanchez, @michaelgrosner? I get the sense from my Bitfinex REST testing earlier that those connections will get the their a** slowly but surely kicked by faster bots :-)

thekenshow commented 7 years ago

Update: looks like the connection was cached yesterday, because this morning I'm disconnected. Instead, I see {{ pair_name }} and similar values in the upper left where the BTC/USD button and exchange details were:

screen shot 2017-08-13 at 8 39 39 am

michaelgrosner commented 7 years ago

@thekenshow you shouldn't have to modify the paths like that. grunt compile should be smart enough regardless of which subdirectory you are running in.

I'd love to see some code, did you push a branch?

thekenshow commented 7 years ago

@michaelgrosner Thanks for jumping in (and congrats on the new baby!). I created https://github.com/thekenshow/tribeca with a new branch bf_wss that contains the only file I've added so far (src/service/gateways/bitfinex-api.ts.) I haven't had a chance to look any further than described above.

arrrgi commented 7 years ago

Would be great to see this implemented @michaelgrosner, HitBTC has been a little flaky recently and I'm not a big fan of their security (or lack thereof). From a trading cap, Bitfinex moves a fair bit more volume so an update to support their WebSocket based API would be great appreciated.

I'm a complete ES6 / Typescript noob otherwise I'd contribute with a PR :)

ntenko07 commented 6 years ago

hello @michaelgrosner . I am new in the javascript and i'm very interested by tribeca. when i try to install i'm stuck in this step: 6. "Create a tribeca.json file based off the provided sample-dev-tribeca.json or sample-prod-tribeca.json files and save it in the current directory. Modify the config keys (see configuration section) and point the instance towards the running mongoDB instance." thank you for your reply.

frankely commented 5 years ago

Any progress on this?