Closed sbenthall closed 2 years ago
Hi @sbenthall , Nicholas here!
I was just thinking about this, and I wanted to write some quick thoughts on this and I think this would be a good place for them (If its not please let me know).
First, the more I think about it the more I think its a good solution.
As I see it we would need to communicate two types of information. One is the closing price in the ammps market (stock market) the other is the desired target position for the agents to take during the next day. So really we just need to pass two (maybe three as we want a buy and a sell target) numbers between the two systems.
I suggest that we implement a RPC server in the oversight code which the ammps broker agent(s) can then connect to (I may be wrong here).
I've made a very crude sketch of this.
As far as I understand the RPC calls are blocking so if we set it up so the AMMPS agent sends a RPC call with the closing price which returns a buy and sell target (a tuple or similar), the AMMPS code will automatically wait while the HARK agents are doing their sim.
Does this make sense to you? Best regards Nicholas
Thanks @mesalas This is a perfect place for this.
This design looks good to me!
The one thing I'm not clear on yet is what an RPC server is. I'm familiar with server/client architectures in the context of messages being sent over a networking protocol like HTTP. But for the sake of speed, I expect these messages to be sent within a local machine. Does the distinction between server and client still hold in that context?
I'm sure this is just something I need to research more in the documentation. But I wanted to bring up the question, since the main remaining design issue seems to be: (a) is there some necessary asymmetry in RPC (i.e. between server and client), and (b) if there is that asymmetry, how should it be delegated between SHARKFin and AMMPS.
OK, I should have refreshed my memory on the RPC documentation before replying, but now I have. I see that RPC does involve a client/server distinction as you say.
This raises the question: If RPC, then should SHARKFin or AMMPS be the server?
You are quite possibly correct that SHARKFin should be the server. But I wonder what your reasoning is.
Just so we don't prematurely commit to that design, here are some other thoughts:
One issue that has come up in this meeting is that AMMPS requires that each thread is deterministic up to its random seed.
So, if there is an agent that communicates with a remote service (such as SHARKFin), there will need to be a syncing between the services that maintains this determinism.
A few thoughts:
1) It looks like there may be pros and cons to the use of gRPC and RabbitMQ. RabbitMQ looks like it's more flexible (support multiple paradigms besides RPC, such as PubSub), while gRPC is more scalable. RabbitMQ is ~7 years older, as a project. Either would likely be just fine for our use case.
2) At the last Monday meeting, we decided that the AMMPS simulation would interact with SHARKFin through an overall interface that sends out recent prices and collects broker demand (and possibly, in the future, credit) information. Then AMMPS will send the broker demand data to a particular trading agent. This actually aligns very well with SHARKFin's current architecture, in which the Market takes demand info and returns price changes.
3) We can make progress on the SHARKFin side by enabling our Market architecture with RPC. Currently, we have an AbstractMarket
class which is instantiated by various subclasses, one of which is a very simple MockMarket
used for testing. We can prepare for the AMMPS integration by developing:
RPCMarketClient
class, which subclasses AbstractMarket
and shares its interface with the rest of HARK, but which wraps an RPC call to ...RPCMarketServer
class, subclassed for now to an RPCMockMarketServer
which can be used for testing.The AbstractMarket
class is used like this:
trade()
Broker will call market.run_market()
with a random seed, a buy order, and a sell ordermarket.daily_rate_of_return()
[Assuming for now that SHARKFin is mainly responsible for a client that requests from a possibly external server.]
ClientRPCMarket
subclasses AbstractMarket
run_market()
method that fits the type signature of the abstract classdaily_rate_of_return
But what it does that's special is that run_market()
makes an RPC call to a different class, MockServerRPCMarket
(for now), which takes the seed and buy/sell order data and returns a new closing price.
This closing price is then used to calculate the daily rate of return (this also on the ClientRPCMarket class, which then gets passed back to the Broker. Voila!
I have pushed a basic network architecture to the networking
branch of my fork. With RabbitMQ, there isn't really a client or a server, both SHARKFin and AMMPS will have to act as both clients and servers, sending and receiving messages between each other. Luckily, RabbitMQ has an "exchange" abstraction that allows for two separate streams of data, one for the seed/buy limit/sell limit, and another for the prices.
This is a basic overview of the necessary functions that AMMPS would need to add to communicate with SHARKFin. It's not a lot of extra code, but the continuous event loop caused by the start_consuming
function means that all the necessary computations would need to be handled by callback functions.
import json
import pika
def send_price(addr, exch_name, rkey, body):
channel.basic_publish(exch_name, rkey, body)
def callback(ch, method, properties, body):
print('callback triggered')
data = json.loads(body)
seed = data['seed']
bl = data['bl']
sl = data['sl']
print(f'seed: {seed}, bl: {bl}, sl: {sl}')
# send closing price
send_price('localhost', 'market', 'prices_queue', '10.5')
con_addr = 'localhost'
connection = pika.BlockingConnection(pika.ConnectionParameters(con_addr))
channel = connection.channel()
channel.exchange_declare('market')
params_queue = channel.queue_declare('params_queue')
prices_queue = channel.queue_declare('prices_queue')
channel.queue_bind('params_queue', 'market')
channel.queue_bind('prices_queue', 'market')
channel.basic_consume('params_queue', callback)
channel.start_consuming()
If we don't want to use a callback-oriented paradigm for both AMMPS and SHARKFin, there is also a way to consume messages from a queue individually. However, there would be issues with timing to make sure the necessary messages are present. If we can design the code around callbacks, the order of message passing can be easily confirmed.
https://pika.readthedocs.io/en/stable/examples/blocking_basic_get.html
Another example of consuming messages without callbacks: https://pika.readthedocs.io/en/stable/examples/blocking_consumer_generator.html
https://github.com/sbenthall/SHARKFin/issues/50#issuecomment-1016068888 Looks super.
I will try to implement that in C# at some point this week, so the agents can trigger the call on the market close. It doesn't look too complicated (famous last words)
This is an introduction to the same type of rabbitMQ client I used in SHARKFin but for C++: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
There should be one exchange declared called 'market'. 'params_queue' and 'prices_queue' should both be declared and bound to 'market'. AMMPS should consume seeds, buy limits, and sell limits in JSON format from 'params_queue' and should send prices into 'prices_queue'
Hi. Using the tutorials, I have managed to get a simple C# class talking to a simple python program using gRPC. The python program listens for calls, while the C# program makes calls. The C# program sends a close price (a dictionary serialized as json), really just a random double, and the python program receives the message, deserializes and responds with two targets (again as a serialized dict). When the response is received the C# program deserializes.
the gif shows the C# code making 10 calls:
Next up is wrapping it an a C# class and making it conform the the format that nick is using.
One point: i think we want to send the data back and forth using the same queue. That way AMMPS is blocked while HARK is running (I think).
Talk to you later Nicholas
Based on some tests i ran we need too queues with the current architecture to ensure blocking, otherwise sometimes the sending client will also consume the message it just sent. I’m working on implementing the queue retrieval as a single function instead of conforming to a callback loop, and it seems like we will still have to use two queues, but I’ll try to see if blocking behavior can be ensured with just one. I’ll paste some updated code here later this week. The first link seb sent in this thread is what I’m basing the new code on.
On Jan 24, 2022, at 8:09 AM, Nicholas @.***> wrote:
Hi. Using the tutorials, I have managed to get a simple C# class talking to a simple python program using gRPC. The python program listens for calls, while the C# program makes calls. The C# program sends a close price (a dictionary serialized as json), really just a random double, and the python program receives the message, deserializes and responds with two targets (again as a serialized dict). When the response is received the C# program deserializes.
the gif shows the C# code making 10 calls:
Next up is wrapping it an a C# class and making it conform the the format that nick is using. One point: i think we want to send the data back and forth using the same queue. That way AMMPS is blocked while HARK is running (I think).
Talk to you later Nicholas
— Reply to this email directly, view it on GitHub, or unsubscribe. Triage notifications on the go with GitHub Mobile for iOS or Android. You are receiving this because you were assigned.
Actually, I figured out a way to only use 1 queue.
price receiving code:
class ClientRPCMarket(AbstractMarket):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def run_market(self, seed=0, buy_sell=(0, 0)):
data = {
'seed': seed,
'bl': buy_sell[0],
'sl': buy_sell[1]
}
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=json.dumps(data))
while self.response is None:
self.connection.process_data_events()
return float(self.response)
def get_simulation_price():
return
def daily_rate_of_return():
return
price sending code:
import json
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def on_request(ch, method, props, body):
data = json.loads(body)
print(f'seed: {data["seed"]}, bl: {data["bl"]}, sl: {data["sl"]}')
response = data['seed'] + data['bl'] + data['sl']
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print("Awaiting RPC requests")
channel.start_consuming()
With the message-passing details figured out, we need to give some attention to the specifics of how the RPC Market class deals with the data it receives. With the old PNL simulations, each simulation creates a log file of transactions, which is used by the PNL Market class to get the latest simulation price. Should this logging be done on the SHARKFin end (which will only allow us to access the final price sent by AMMPS), or does AMMPS create a transaction log we can access? If there isn't a specific logging system in AMMPS yet, I can provide the specifics of the PNL simulation output to match.
For now, I'm just saving the latest price as a mutable attribute to the class. To update the price, run_market
must be called.
An observation: @ mesalas says he's using gRPC, and @nicksawhney says he's using RabbitMQ. That is mismatched.
@nicksawhney Currently, the (closing) price history of the simulation is being stored in the FinanceModel (the class the computes expectations of the risky asset)
https://github.com/sbenthall/SHARKFin/blob/master/HARK/hark_portfolio_agents.py#L385-L389
I think it's fine for the SHARKFin side to handle just the closing price for now. AMMPS can handle the intraday price logging.
An observation: @ mesalas says he's using gRPC, and @nicksawhney says he's using RabbitMQ. That is mismatched.
Dont mind me, im rambling. Im using RabbitMQ. same as nick
@nicksawhney I have been looking implementing the other side to the ClientRPCMarket in AMMPS. As far as i understand, what you are suggesting is AMMPS implement a RPC "server" that waits for SHARKFin to call it. Once a call is received the market simulates a day and returns the closing price in the response.
Im wondering if its possible to reverse the pattern? In which case the broker agent in AMMPS will call SHARKFin with a closing price, triggering an iteration of the macro model and receiving a sell and buy volume in response. This pattern would be straight forward to implement with the agents. It sound easy when im writing it, but I realize that you probably have a similar issue. As i dont know SHARKFin im feeling that im only seeing half of the problem, so i think it would be a good idea to have a quick meeting and see if we can find a good solution.
One alternative I can envision, would be to have a dedicated "server" that both sides can call.
The problem of waiting for data from the other is an issue for both sides. The way to think about it is both sharkfin and ammps are clients to a message passing rabbitmq server. This means both of us will have to deal with waiting for messages from the rabbitmq server. One way to do this is with the callback-function structure i posted earlier in these comments, the other is to have a function with an infinite while loop that waits, and another function that’s called to send back data (which you can see in a different example above). You can choose either of those for AMMPs but in order to receive parameters from sharkfin one of those methods must be used.
On Feb 2, 2022, at 2:12 PM, Nicholas @.***> wrote:
@nicksawhney I have been looking implementing the other side to the ClientRPCMarket in AMMPS. As far as i understand, what you are suggesting is AMMPS implement a RPC "server" that waits for SHARKFin to call it. Once a call is received the market simulates a day and returns the closing price in the response.
Im wondering if its possible to reverse the pattern? In which case the broker agent in AMMPS will call SHARKFin with a closing price, triggering an iteration of the macro model and receiving a sell and buy volume in response. This pattern would be straight forward to implement with the agents. It sound easy when im writing it, but I realize that you probably have a similar issue. As i dont know SHARKFin im feeling that im only seeing half of the problem, so i think it would be a good idea to have a quick meeting and see if we can find a good solution.
One alternative I can envision, would be to have a dedicated "server" that both sides can call.
— Reply to this email directly, view it on GitHub, or unsubscribe. Triage notifications on the go with GitHub Mobile for iOS or Android. You are receiving this because you were mentioned.
@nicksawhney Thanks for the reply. I just wanted to check, but I was suspecting that it wouldn't be as easy as just reversing the pattern. Its going to add a bit more complexity, as I will have to make sure "communication" class running in a separate thread dosent try to return a value until it actually has a price to return.
@nicksawhney I got a prototype class in C# that works with your python code. Before we can run anything i need to implement it in AMMPS and look at some best practices for threading in C#, which im not particularly familiar with.
for reference im just going to paste my code here
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Threading;
namespace Send
{
class RpcClient
{
private IConnection _connection;
private IModel _channel;
private string _replyQueueName;
private EventingBasicConsumer _consumer;
private IBasicProperties props;
//public Dictionary<int,Dictionary<string, dynamic>> Data = new Dictionary<int, Dictionary<string, dynamic>>();
public int RequestCounter = 0;
public RpcClient (Dictionary<int, Dictionary<string, dynamic>> data )
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
_channel.BasicQos(0, 1, false);
_consumer = new EventingBasicConsumer(_channel);
_channel.BasicConsume(queue: "rpc_queue",
autoAck: false, consumer: _consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
// Put this in a separate method for clarity
_consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = _channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
// Parse Call
var messageString = Encoding.UTF8.GetString(body);
var messageDict = JsonSerializer.Deserialize<Dictionary<string, Int32>>(messageString);
// Add new targets to data
data.Add(RequestCounter,
new Dictionary<string, dynamic>()
{{"BuyTarget", messageDict["bl"]}, {"SellTarget", messageDict["sl"]}, {"ClosingPrice", 0}});
var responseString = Reply(data);
var responseBytes = Encoding.UTF8.GetBytes(responseString);
Console.WriteLine("sending closing price");
_channel.BasicPublish( exchange:"", routingKey:props.ReplyTo, basicProperties: replyProps, body: responseBytes);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
RequestCounter += 1;
};
}
public String Reply(Dictionary<int,Dictionary<string, dynamic>> data)
{
int i = 0;
while ( data[RequestCounter]["ClosingPrice"] == 0) // && i < 10)
{
Thread.Sleep(10000); //wait
i += 1;
}
var responseString = JsonSerializer.Serialize<Double>((double)data[RequestCounter]["ClosingPrice"]);
return responseString;
}
public void Close()
{
_connection.Close();
}
}
public class Rpc
{
public static void Main()
{
var Data = new Dictionary<int, Dictionary<string, dynamic>>();
RpcClient rpcClient;
var RPCThread = new Thread(() =>
{
rpcClient = new RpcClient(Data);
});
RPCThread.Start();
var random = new Random();
for (int dayNo = 0; dayNo <= 10; dayNo++)
{
Console.WriteLine("");
Console.WriteLine("Waiting for targets for day no {0}", dayNo);
while (!Data.TryGetValue(dayNo, out var dailyTargets))
{
Thread.Sleep(5000);
}
var dataForDay = Data[dayNo];
Console.WriteLine("Got data for day no {0}: buyTarget : {1} and sellTarget : {2}",dayNo, dataForDay["BuyTarget"],dataForDay["SellTarget"] );
var closingPrice = random.NextDouble() * 100.0;
Console.WriteLine("Closing price for day no {0} is {1}", dayNo, closingPrice);
Data[dayNo]["ClosingPrice"] = closingPrice;
}
}
}
}
Resolved with #53 . We can open new issues to discuss simulation-specific implementation details of the new market class when need arises.
It looks like we will be using RPC for the interaction between SHARKFin and AMMPS.
https://www.rabbitmq.com/tutorials/tutorial-six-python.html
So we will need a Market class that interacts with a remote application through RPC