Closed httran13 closed 3 years ago
Adding a sample log run here for the unsub/sub scenarios. I executed the JSON array below (cmd, service, and symbols) in sequential order and allowed some time for stream data to arrive.
Find my comments marked with =======. In the first 2 scenarios, I wanted to know TD's response if I unsubscribe from a service that hasn't been subscribed; the rest are just sub then unsub.
I noticed that once I unsub from a service with the same symbol, I no longer receive data from that service for that symbol. It's not proven here, but if you have a set of subscribed symbols and you unsub another set of symbols, it acts as a set difference.
The client > and client < lines are the streaming logs that show raw data from the stream; look at these lines for the series of events.
I left my application logs in; the application pipes all of the data to my SQS services.
[
  {
    "cmd": "subs",
    "service": "chart_equity",
    "symbols": ["SPY"]
  },
  {
    "cmd": "unsubs",
    "service": "chart_equity",
    "symbols": ["SPY"]
  },
  {
    "cmd": "subs",
    "service": "chart_futures",
    "symbols": ["/ES"]
  },
  {
    "cmd": "unsubs",
    "service": "chart_futures",
    "symbols": ["/ES"]
  },
  {
    "cmd": "subs",
    "service": "level_one_equity",
    "symbols": ["SPY"]
  },
  {
    "cmd": "unsubs",
    "service": "level_one_equity",
    "symbols": ["SPY"]
  },
  {
    "cmd": "subs",
    "service": "level_one_option",
    "symbols": ["NET_102221C55"]
  },
  {
    "cmd": "unsubs",
    "service": "level_one_option",
    "symbols": ["NET_102221C55"]
  },
  {
    "cmd": "subs",
    "service": "level_one_futures",
    "symbols": ["/ES"]
  },
  {
    "cmd": "unsubs",
    "service": "level_one_futures",
    "symbols": ["/ES"]
  }
]
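As a rough model of the set-difference behaviour described above, the command list can be replayed against a per-service set of symbols. This is only a sketch: the `replay` function is hypothetical, the symbols `QQQ` and `AAPL` are illustrative, and two points are assumptions that the logs in this thread do not demonstrate: that a subs replaces the service's symbol list, and that unsubscribing a symbol that was never subscribed is silently ignored.

```python
from typing import Dict, Iterable, Set


def replay(commands: Iterable[dict]) -> Dict[str, Set[str]]:
    """Apply subs/unsubs commands in order; return the symbols left per service."""
    active: Dict[str, Set[str]] = {}
    for command in commands:
        service = command["service"]
        symbols = set(command["symbols"])
        if command["cmd"] == "subs":
            # Assumption: a subs replaces the service's current symbol list.
            active[service] = symbols
        elif command["cmd"] == "unsubs":
            # Set difference; symbols that were never subscribed are ignored
            # (assumption, the server's real reply is not modelled here).
            active[service] = active.get(service, set()) - symbols
        else:
            raise ValueError(f"unknown cmd: {command['cmd']!r}")
    return active


commands = [
    {"cmd": "subs", "service": "chart_equity", "symbols": ["SPY", "QQQ"]},
    {"cmd": "unsubs", "service": "chart_equity", "symbols": ["SPY", "AAPL"]},
]
remaining = replay(commands)
print(remaining)  # {'chart_equity': {'QQQ'}}
```

Replaying the ten commands from the array above through this model leaves every service with an empty set, which matches the observation that no further data arrives after each unsub.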
And for the logs, starting with start of app ...
2021-10-08 11:48:01,632 tdstreamer.py:81 INFO Returning token from store
client - state = CONNECTING
client - event = connection_made(<asyncio.sslproto._SSLProtocolTransport object at 0x7fbea5dbd880>)
client - event = data_received(<212 bytes>)
client - state = OPEN
client > Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"requests": [{"service": "ADMIN", "requestid": "0", "command": "LOGIN", "account": "<REDACTED>", "source": "<REDACTED>", "parameters": {"credential": "userid=<REDACTED>&token=<REDACTED>&company=AMER&segment=AMER&cddomain=<REDACTED>&usergroup=ACCT&accesslevel=ACCT&authorized=Y&timestamp=1633711682000&appid=<REDACTED>&acl=<REDACTED>", "token": "<REDACTED>", "version": "1.0"}}]}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<115 bytes>)
client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"response":[{"service":"ADMIN","requestid":"0","command":"LOGIN","timestamp":1633711682418,"content":{"code":0,"msg":"04-1"}}]}', rsv1=False, rsv2=False, rsv3=False)
client > Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"requests": [{"service": "ADMIN", "requestid": "1", "command": "QOS", "account": "<REDACTED>", "source": "<REDACTED>", "parameters": {"qoslevel": "0"}}]}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<55 bytes>)
client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"response":[{"service":"ADMIN","requestid":"1","command":"QOS","timestamp":1633711682446,"content":{"code":0,"msg":"QoS command succeeded. Set qoslevel=0"}}]}', rsv1=False, rsv2=False, rsv3=False)
client > Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"requests": [{"service": "ACCT_ACTIVITY", "requestid": "2", "command": "SUBS", "account": "<REDACTED>", "source": "<REDACTED>", "parameters": {"keys": "<REDACTED>", "fields": "0,1,2,3"}}]}', rsv1=False, rsv2=False, rsv3=False)
2021-10-08 11:48:02,647 sqs_client.py:217 INFO Got queue 'streamer-stream-submitter' with URL=https://us-east-2.queue.amazonaws.com/<REDACTED>/streamer-stream-submitter
client - event = data_received(<34 bytes>)
client - event = data_received(<41 bytes>)
client - event = data_received(<139 bytes>)
client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"notify":[{"heartbeat":"1633711682517"}]}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"response":[{"service":"ACCT_ACTIVITY","requestid":"2","command":"SUBS","timestamp":1633711682517,"content":{"code":0,"msg":"SUBS command succeeded"}}]}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"data":[{"service":"ACCT_ACTIVITY", "timestamp":1633711682530,"command":"SUBS","content":[{"seq":0,"key":"<REDACTED>","1":"","2":"SUBSCRIBED","3":""}]}]}', rsv1=False, rsv2=False, rsv3=False)
2021-10-08 11:48:08,315 sqs_client.py:167 INFO Deleted message: 46434fcb-7f0b-41d2-a849-e1eebe6fd653
2021-10-08 11:48:08,364 sqs_client.py:217 INFO Got queue 'streamer-td-acct-activity' with URL=https://us-east-2.queue.amazonaws.com/<REDACTED>/streamer-td-acct-activity
2021-10-08 11:48:08,416 sqs_client.py:72 INFO Message sent: f94774cf-23c1-4840-a0fb-abed3500495b: {"seq": 0, "key": "<REDACTED>", "ACCOUNT": "", "MESSAGE_TYPE": "SUBSCRIBED", "MESSAGE_DATA": ""}
client - event = data_received(<14 bytes>)
client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"notify":[{"heartbeat":"1633711693479"}]}', rsv1=False, rsv2=False, rsv3=False)
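When reading the client < lines, it can help to sort the payloads by their top-level key. The frames in this log use three: "response", "notify", and "data". The sketch below assumes those are the only kinds of interest; `classify` is a hypothetical helper, not part of any library.

```python
import json
from typing import List, Tuple


def classify(payload: bytes) -> Tuple[str, List[dict]]:
    """Return the top-level message kind and its list of entries."""
    message = json.loads(payload)
    # Kinds observed in the log above; other kinds are treated as errors here.
    for kind in ("response", "notify", "data"):
        if kind in message:
            return kind, message[kind]
    raise ValueError(f"unrecognised message keys: {sorted(message)}")


# Payload copied from a heartbeat frame in the log above.
kind, entries = classify(b'{"notify":[{"heartbeat":"1633711682517"}]}')
print(kind, entries)  # notify [{'heartbeat': '1633711682517'}]
```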
So it looks like SUBS commands may require the fields parameter. I tried sending a CHART_FUTURES SUBS command without fields and it failed. We aren't doing this anywhere in the code; I just wanted to see the behavior.
@alexgolec
2021-10-24 17:25:40,515 protocol.py:990 DEBUG client > Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"requests": [{"service": "CHART_FUTURES", "requestid": "2", "command": "SUBS", "account": "<REDACTED>", "source": "<REDACTED>", "parameters": {"keys": "/ES"}}]}', rsv1=False, rsv2=False, rsv3=False)
2021-10-24 17:25:40,544 protocol.py:1344 DEBUG client - event = data_received(<55 bytes>)
2021-10-24 17:25:40,544 protocol.py:977 DEBUG client < Frame(fin=True, opcode=<Opcode.TEXT: 1>, data=b'{"response":[{"service":"CHART_FUTURES","requestid":"2","command":"SUBS","timestamp":1635114340610,"content":{"code":22,"msg":"SUBS command failed"}}]}', rsv1=False, rsv2=False, rsv3=False)
2021-10-24 17:25:40,545 tdstreamer.py:326 ERROR unexpected response code: 22, msg is 'SUBS command failed'
Traceback (most recent call last):
  File "stonks/app/tdstreamer.py", line 323, in main
    await asyncio.gather(consumer.stream(), consumer.read_message())
  File "stonks/app/tdstreamer.py", line 254, in read_message
    jsonmsg["symbols"],
  File "stonks/app/tdstreamer.py", line 235, in _request_to_stream
    await method_dict[cmd](symbols)
  File "/Users/huytran/.pyenv/versions/3.7.9/envs/stonks/lib/python3.7/site-packages/tda/streaming.py", line 677, in chart_futures_subs
    symbols, 'CHART_FUTURES', 'SUBS')
  File "/Users/huytran/.pyenv/versions/3.7.9/envs/stonks/lib/python3.7/site-packages/tda/streaming.py", line 332, in _service_op
    await self._await_response(request_id, service, command)
  File "/Users/huytran/.pyenv/versions/3.7.9/envs/stonks/lib/python3.7/site-packages/tda/streaming.py", line 306, in _await_response
    resp['response'][0]['content']['msg']))
tda.streaming.UnexpectedResponseCode: unexpected response code: 22, msg is 'SUBS command failed'
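For comparison, here is a minimal sketch of a SUBS payload that does carry fields, shaped like the ACCT_ACTIVITY request earlier in the log. `build_subs_request` is a hypothetical helper and not how tda-api builds its requests; the field numbers 0, 1, 2 are placeholders, and joining several keys with commas is an assumption (the log only shows fields joined that way). Whether the server accepts these particular fields for CHART_FUTURES is not shown in this thread.

```python
import json
from typing import Sequence


def build_subs_request(service: str, request_id: int, symbols: Sequence[str],
                       fields: Sequence[int], account: str, source: str) -> str:
    """Build a SUBS request body, refusing to omit the fields parameter."""
    if not fields:
        # The log above shows code 22 when fields is missing.
        raise ValueError("fields must be non-empty for a SUBS request")
    request = {
        "service": service,
        "requestid": str(request_id),
        "command": "SUBS",
        "account": account,
        "source": source,
        "parameters": {
            "keys": ",".join(symbols),  # assumption: comma-joined keys
            "fields": ",".join(str(field) for field in fields),
        },
    }
    return json.dumps({"requests": [request]})


body = build_subs_request("CHART_FUTURES", 2, ["/ES"], [0, 1, 2],
                          account="<REDACTED>", source="<REDACTED>")
print(body)
```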
Also note the failing docs generation test.
This PR adds an asyncio.Lock to streaming.py.
This lock achieves 2 goals:
1. Prevent websocket.recv() from being called from another coroutine while one is already awaiting it, so the lock is held when retrieving a response from the stream: https://websockets.readthedocs.io/en/stable/topics/design.html#concurrency
2. Ensure that, when making a request, sending the request and awaiting its response occur sequentially, without another coroutine interleaving.
With the lock in place, we can run concurrent coroutines from our main program without running into the RuntimeError raised by websockets.
I've also updated the timesales.py streaming example into a concurrent program that randomly makes requests while an infinite loop handles messages.
I've added unit tests for each unsub command and did not need to change any existing tests.
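The locking idea described in this PR can be illustrated in isolation. The sketch below is not the code in streaming.py: `FakeSocket` and `LockedClient` are hypothetical stand-ins, and the fake socket simply echoes each message and raises RuntimeError on a concurrent recv(), mimicking the behaviour the lock is meant to avoid.

```python
import asyncio


class FakeSocket:
    """Stand-in for a websocket: echoes sends, rejects concurrent recv()."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._receiving = False

    async def send(self, message):
        await asyncio.sleep(0)  # yield so other coroutines get a chance to run
        await self._queue.put(f"reply:{message}")

    async def recv(self):
        if self._receiving:
            raise RuntimeError("recv() called while another coroutine is waiting")
        self._receiving = True
        try:
            await asyncio.sleep(0)
            return await self._queue.get()
        finally:
            self._receiving = False


class LockedClient:
    def __init__(self, socket):
        self._socket = socket
        self._lock = asyncio.Lock()

    async def request(self, message):
        # Hold the lock across send and recv so that no other coroutine can
        # call recv() in between, and each reply goes to its own request.
        async with self._lock:
            await self._socket.send(message)
            return await self._socket.recv()


async def main():
    client = LockedClient(FakeSocket())
    return await asyncio.gather(*(client.request(f"req{i}") for i in range(5)))


replies = asyncio.run(main())
print(replies)
```

Five requests issued concurrently each receive their own reply, and the fake socket never sees two overlapping recv() calls.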