Closed WolfieTS closed 3 years ago
So I am not sure if the API has changed, but this is what I have from the original project:
package net.cryptodirect.cryptodash.exchange;
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.java_websocket.drafts.Draft_17;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * WebSocket client for the Coinbase (GDAX) feed at {@code wss://ws-feed.gdax.com}.
 *
 * <p>Incoming {@code "match"} messages are converted to {@link Trade} objects and handed to the
 * {@link LiveTradesConsumer} registered for the message's trade pair via
 * {@link #streamLiveTrades(TradePair, LiveTradesConsumer)}. All other known message types are
 * ignored, except {@code "heartbeat"} (answered with a heartbeat message) and {@code "error"}
 * (rethrown as an exception).
 *
 * @author Michael Ennen
 */
public class CoinbaseWebSocketClient extends ExchangeWebSocketClient {
    private static final Logger logger = LoggerFactory.getLogger(CoinbaseWebSocketClient.class);

    /** Trade pairs this client reports as supported by {@link #supportsStreamingTrades(TradePair)}. */
    private final Set<TradePair> tradePairs;

    /** Trade pairs for which a subscribe request has been sent. */
    final ConcurrentLinkedQueue<TradePair> subscribedChannels;

    /**
     * Creates a client for the given trade pairs.
     *
     * @param tradePairs the trade pairs for which streaming is reported as supported
     */
    CoinbaseWebSocketClient(Set<TradePair> tradePairs) {
        super("wss://ws-feed.gdax.com", new Draft_17());
        this.tradePairs = tradePairs;
        subscribedChannels = new ConcurrentLinkedQueue<>();
    }

    /**
     * Dispatches a single feed message according to its {@code "type"} field.
     *
     * @param message the raw JSON text received from the feed
     * @throws IllegalArgumentException if the feed reports an {@code "error"} message
     * @throws IllegalStateException if the message type is not one this client knows about
     */
    @Override
    public void onMessage(String message) {
        Json messageJson = Json.read(message);
        if (messageJson.has("event") && messageJson.at("event").asString().equalsIgnoreCase("info")) {
            connectionEstablished.setValue(true);
        }

        // Without a "type" there is nothing to dispatch on; reading it blindly would fail.
        if (!messageJson.has("type")) {
            return;
        }

        final String type = messageJson.at("type").asString();
        switch (type) {
            case "heartbeat":
                send(Json.object("type", "heartbeat", "on", "false").toString());
                break;
            case "received":
            case "open":
            case "done":
            case "change":
                // Order-book life-cycle messages are not used by this client.
                break;
            case "match":
                handleMatch(messageJson);
                break;
            case "error":
                throw new IllegalArgumentException("Error on Gdax websocket client: " +
                        messageJson.at("message").asString());
            default:
                throw new IllegalStateException("Unhandled message type on Gdax websocket client: " +
                        type);
        }
    }

    /**
     * Converts a {@code "match"} message into a {@link Trade} and delivers it to the consumer
     * registered for its trade pair, if any. The trade pair is only parsed here (not for every
     * message) because other message types need not carry a {@code "product_id"}.
     */
    private void handleMatch(Json messageJson) {
        if (!messageJson.has("product_id")) {
            logger.warn("coinbase websocket client: match message without product_id ignored");
            return;
        }

        final TradePair tradePair;
        try {
            tradePair = parseTradePair(messageJson);
        } catch (CurrencyNotFoundException exception) {
            logger.error("coinbase websocket client: could not initialize trade pair: {}",
                    messageJson.at("product_id").asString(), exception);
            // Never continue with a null trade pair: a null key lookup fails on null-hostile maps.
            return;
        }

        // Single lookup so a concurrent stopStreamLiveTrades cannot slip in between a
        // containsKey check and the subsequent get.
        final LiveTradesConsumer consumer = liveTradeConsumers.get(tradePair);
        if (consumer == null) {
            return;
        }

        final Side side = messageJson.has("side")
                ? Side.getSide(messageJson.at("side").asString())
                : null;
        final Trade newTrade = new Trade(tradePair,
                DefaultMoney.of(messageJson.at("price").asBigDecimal(), tradePair.getCounterCurrency()),
                DefaultMoney.of(messageJson.at("size").asBigDecimal(), tradePair.getBaseCurrency()),
                side, messageJson.at("trade_id").asLong(),
                Instant.from(ISO_INSTANT.parse(messageJson.at("time").asString())));
        consumer.acceptTrades(Collections.singletonList(newTrade));
        recentTrades.addTrade(tradePair, newTrade);
    }

    /**
     * Builds the trade pair named by the message's {@code "product_id"} (for example
     * {@code "BTC-USD"}).
     *
     * <p>The counter currency is treated as fiat when the base is {@code BTC} or the counter is
     * {@code USD}; otherwise both sides are treated as crypto currencies (e.g. {@code "ETH-BTC"}).
     *
     * @throws CurrencyNotFoundException propagated from {@code TradePair.parse}
     * @throws IllegalArgumentException if the product id contains no {@code '-'} separator
     */
    private TradePair parseTradePair(Json messageJson) throws CurrencyNotFoundException {
        final String productId = messageJson.at("product_id").asString();
        final String[] products = productId.split("-");
        if (products.length < 2) {
            throw new IllegalArgumentException("Malformed product id on Gdax websocket client: " +
                    productId);
        }

        final boolean fiatCounter = products[0].equalsIgnoreCase("BTC")
                || products[1].equalsIgnoreCase("usd");
        if (fiatCounter) {
            return TradePair.parse(productId, "-", Pair.of(CryptoCurrency.class, FiatCurrency.class));
        }
        return TradePair.parse(productId, "-", Pair.of(CryptoCurrency.class, CryptoCurrency.class));
    }

    /**
     * Sends a subscribe request for {@code tradePair} and registers {@code liveTradesConsumer}
     * to receive its trades.
     */
    @Override
    public void streamLiveTrades(TradePair tradePair, LiveTradesConsumer liveTradesConsumer) {
        subscribedChannels.add(tradePair);
        send(Json.object("type", "subscribe", "product_id", tradePair.toString('-')).toString());
        liveTradeConsumers.put(tradePair, liveTradesConsumer);
    }

    /**
     * Unregisters the consumer for {@code tradePair}. No unsubscribe request is sent; further
     * match messages for the pair are simply dropped.
     */
    @Override
    public void stopStreamLiveTrades(TradePair tradePair) {
        liveTradeConsumers.remove(tradePair);
    }

    /** Returns whether {@code tradePair} is one of the pairs this client was constructed with. */
    @Override
    public boolean supportsStreamingTrades(TradePair tradePair) {
        return tradePairs.contains(tradePair);
    }

    /** No action is taken when the connection closes. */
    @Override
    public void onClose(int code, String reason, boolean remote) {
    }

    /** No action is taken when the connection opens. */
    @Override
    public void onOpen(ServerHandshake serverHandshake) {
    }
}
Note that it cannot be used as-is, mostly because it uses a custom JSON library, but it should be pretty straightforward to convert it to GSON, Jackson, etc. I tried to remove all the extra functionality when I ripped it out of the original project just now, so there may be some undefined symbols and references to other functionality, but I hope it helps.
This is helpful, thanks. What concrete implementation of Set is the constructor expecting?
On Mon, May 3, 2021, 5:50 PM Michael Ennen @.***> wrote:
So I am not sure if the API has changed, but this is what I have from the original project:
package net.cryptodirect.cryptodash.exchange; import static java.time.format.DateTimeFormatter.ISO_INSTANT; import java.math.BigDecimal;import java.time.Instant;import java.util.Collections;import java.util.HashMap;import java.util.Locale;import java.util.Map;import java.util.Set;import java.util.UUID;import java.util.concurrent.ConcurrentLinkedQueue; import org.java_websocket.drafts.Draft_17;import org.java_websocket.handshake.ServerHandshake;import org.slf4j.Logger;import org.slf4j.LoggerFactory; /* @author Michael Ennen */public class CoinbaseWebSocketClient extends ExchangeWebSocketClient { private final Set
tradePairs; final ConcurrentLinkedQueue subscribedChannels; private static final Logger logger = LoggerFactory.getLogger(CoinbaseWebSocketClient.class); CoinbaseWebSocketClient(Set<TradePair> tradePairs) { super("wss://ws-feed.gdax.com", new Draft_17()); this.tradePairs = tradePairs; subscribedChannels = new ConcurrentLinkedQueue<>(); } @Override public void onMessage(String message) { Json messageJson = Json.read(message); if (messageJson.has("event") && messageJson.at("event").asString().equalsIgnoreCase("info")) { connectionEstablished.setValue(true); } TradePair tradePair = null; try { tradePair = parseTradePair(messageJson); } catch (CurrencyNotFoundException exception) { logger.error("coinbase websocket client: could not initialize trade pair: " + messageJson.at("product_id").asString(), exception); } Side side = messageJson.has("side") ? Side.getSide(messageJson.at("side").asString()) : null; UUID orderUuid = messageJson.has("order_id") ? UUID.fromString(messageJson.at("order_id").asString()) : null; switch (messageJson.at("type").asString()) { case "heartbeat": send(Json.object("type", "heartbeat", "on", "false").toString()); break; case "received": break; case "open": break; case "done": break; case "match": if (liveTradeConsumers.containsKey(tradePair)) { Trade newTrade = new Trade(tradePair, DefaultMoney.of(messageJson.at("price").asBigDecimal(), tradePair.getCounterCurrency()), DefaultMoney.of(messageJson.at("size").asBigDecimal(), tradePair.getBaseCurrency()), side, messageJson.at("trade_id").asLong(), Instant.from(ISO_INSTANT.parse(messageJson.at("time").asString()))); liveTradeConsumers.get(tradePair).acceptTrades(Collections.singletonList(newTrade)); recentTrades.addTrade(tradePair, newTrade); } break; case "change": break; case "error": throw new IllegalArgumentException("Error on Gdax websocket client: " + messageJson.at("message").asString()); default: throw new IllegalStateException("Unhandled message type on Gdax websocket client: " + 
messageJson.at("type").asString()); } } private TradePair parseTradePair(Json messageJson) throws CurrencyNotFoundException { final String productId = messageJson.at("product_id").asString(); final String[] products = productId.split("-"); TradePair tradePair; if (products[0].equalsIgnoreCase("BTC")) { tradePair = TradePair.parse(productId, "-", Pair.of(CryptoCurrency.class, FiatCurrency.class)); } else { // products[0] == "ETH" if (products[1].equalsIgnoreCase("usd")) { tradePair = TradePair.parse(productId, "-", Pair.of(CryptoCurrency.class, FiatCurrency.class)); } else { // productId == "ETH-BTC" tradePair = TradePair.parse(productId, "-", Pair.of(CryptoCurrency.class, CryptoCurrency.class)); } } return tradePair; } @Override public void streamLiveTrades(TradePair tradePair, LiveTradesConsumer liveTradesConsumer) { subscribedChannels.add(tradePair); send(Json.object("type", "subscribe", "product_id", tradePair.toString('-')).toString()); liveTradeConsumers.put(tradePair, liveTradesConsumer); } @Override public void stopStreamLiveTrades(TradePair tradePair) { liveTradeConsumers.remove(tradePair); } @Override public boolean supportsStreamingTrades(TradePair tradePair) { return tradePairs.contains(tradePair); } @Override public void onClose(int code, String reason, boolean remote) { } @Override public void onOpen(ServerHandshake serverHandshake) { }
}
Note that it will not be able to be used as-is - mostly because it uses a custom JSON library. But it should be pretty straight-forward to convert it to GSON, Jackson, etc. I tried to remove all the extra functionality when I ripped it out of the original project just now, so there may be some undefined things and references to other functionality but I hope it will help.
— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/brcolow/candlefx/issues/1#issuecomment-831525092, or unsubscribe https://github.com/notifications/unsubscribe-auth/AOSD2CKBNJG4GTXEB7ZAV6LTL4EALANCNFSM44BFMJSA .
I use `HashSet`, but any `Set` implementation should work. I used `Set` at the time to avoid duplicates.
Hi! Fantastic work. This is exactly what I've been looking for. You said that if it helps just one person then your efforts will have been worth it. Maybe I'm that person! I was wondering if you had an example of how to instantiate the WebSockets client correctly? Thanks!!!