Closed dfelton closed 4 years ago
Will continue to monitor further...
May still have an issue here... now the WebSocket has incurred the same exception. However evaluating the stack trace and error message it does not appear that it was specifically due to just the Rest FillMonitor beat it to the record...
object(stdClass)#1 (5) {
["time"]=> string(19) "2019-10-23 15:36:18"
["message"]=> string(446) "Trade record '619' is not healthy for 'Kobens\Gemini\TradeRepeater\DataResource\BuyPlaced'. Current State:
["id"]=> string(3) "619"
["is_enabled"]=> string(1) "1"
["status"]=> string(8) "BUY_SENT"
["symbol"]=> string(6) "btcusd"
["buy_client_order_id"]=> string(32) "repeater_619_buy_1571862978.5662"
["buy_order_id"]=> NULL
["buy_amount"]=> string(3) "0.2"
["buy_price"]=> string(7) "8047.50"
["sell_client_order_id"]=> NULL
["sell_order_id"]=> NULL
["sell_amount"]=> string(6) "0.1975"
["sell_price"]=> string(7) "8246.13"
["note"]=> NULL
["meta"]=> NULL
["updated_at"]=> string(19) "2019-10-23 15:36:18"
["code"]=> int(0)
["class"]=> string(61) "Kobens\Gemini\Exception\TradeRepeater\UnhealthyStateException"
["trace"]=> string(4906) "
#0 src/TradeRepeater/DataResource/BuyPlaced.php(21): Kobens\Gemini\TradeRepeater\DataResource\AbstractDataResource->getHealthyRecord(619)
#1 src/Command/Command/TradeRepeater/FillMonitor.php(133): Kobens\Gemini\TradeRepeater\DataResource\BuyPlaced->setNextState(619)
#2 src/Command/Command/TradeRepeater/FillMonitor.php(97): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->processMessage(Array, Object(Symfony\Component\Console\Output\ConsoleOutput))
#3 [internal function]: Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->Kobens\Gemini\Command\Command\TradeRepeater\{closure}('a', NULL)
#4 vendor/amphp/amp/lib/Coroutine.php(105): Generator->send('[{"type":"fill"...')
#5 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, '[{"type":"fill"...')
#6 vendor/amphp/amp/lib/Coroutine.php(110): Amp\Coroutine->resolve('[{"type":"fill"...')
#7 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, NULL)
#8 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(NULL)
#9 vendor/amphp/byte-stream/lib/IteratorStream.php(57): Amp\Deferred->resolve()
#10 vendor/amphp/amp/lib/Success.php(36): Amp\ByteStream\IteratorStream->Amp\ByteStream\{closure}(NULL, false)
#11 vendor/amphp/amp/lib/Internal/Placeholder.php(125): Amp\Success->onResolve(Object(Closure))
#12 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(Object(Amp\Success))
#13 vendor/amphp/amp/lib/Internal/Producer.php(177): Amp\Deferred->resolve(Object(Amp\Success))
#14 vendor/amphp/amp/lib/Emitter.php(57): class@anonymous->complete()
#15 vendor/amphp/websocket/src/Rfc6455Client.php(360): Amp\Emitter->complete()
#16 vendor/amphp/websocket/src/Rfc6455Client.php(947): Amp\Websocket\Rfc6455Client->onData(1, '[{"type":"fill"...', true)
#17 [internal function]: Amp\Websocket\Rfc6455Client->parser()
#18 vendor/amphp/websocket/src/Rfc6455Client.php(281): Generator->send('\x81~\x02<[{"type":"f...')
#19 [internal function]: Amp\Websocket\Rfc6455Client->read()
#20 vendor/amphp/amp/lib/Coroutine.php(105): Generator->send('\x81~\x02<[{"type":"f...')
#21 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, '\x81~\x02<[{"type":"f...')
#22 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve('\x81~\x02<[{"type":"f...')
#23 vendor/amphp/byte-stream/lib/ResourceInputStream.php(99): Amp\Deferred->resolve('\x81~\x02<[{"type":"f...')
#24 vendor/amphp/amp/lib/Loop/NativeDriver.php(198): Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}('q', Resource id #341, NULL)
#25 vendor/amphp/amp/lib/Loop/NativeDriver.php(97): Amp\Loop\NativeDriver->selectStreams(Array, Array, 0.934)
#26 vendor/amphp/amp/lib/Loop/Driver.php(134): Amp\Loop\NativeDriver->dispatch(true)
#27 vendor/amphp/amp/lib/Loop/Driver.php(72): Amp\Loop\Driver->tick()
#28 vendor/amphp/amp/lib/Loop.php(84): Amp\Loop\Driver->run()
#29 src/Command/Command/TradeRepeater/FillMonitor.php(69): Amp\Loop::run(Object(Closure))
#30 vendor/symfony/console/Command/Command.php(255): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#31 vendor/symfony/console/Application.php(932): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#32 vendor/symfony/console/Application.php(274): Symfony\Component\Console\Application->doRunCommand(Object(Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#33 vendor/symfony/console/Application.php(150): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#34 bin/gemini(100): Symfony\Component\Console\Application->run()
#35 {main}"
}
select * from trade_repeater where id = 619 \G
*************************** 1. row ***************************
id: 619
is_enabled: 1
status: BUY_PLACED
symbol: btcusd
buy_client_order_id: repeater_619_buy_1571862978.5662
buy_order_id: 307679057
buy_amount: 0.2
buy_price: 8047.50
sell_client_order_id: NULL
sell_order_id: NULL
sell_amount: 0.1975
sell_price: 8246.13
note: NULL
meta: {"buy_price":"8047.50"}
updated_at: 2019-10-23 15:36:18
This seems to indicate that
Some additional notes:
socket_sequence
this very well could be possible that it received a message on the SELL_FILLED event prior to the BUY_FILLED? (after the Rest FillMonitor by chance beat it to the record)
Follow up:
buy_client_order_id
and sell_client_order_id
values at appropriate locations given clear introduction of race conditions here.Additional Information: Had the WebSocket Command logged the (expected) buy_client_order_id
it would have been very helpful here... It would prove that it was trying to update an order older than the record currently in the database.
Another scenario could simply by that due to heavy trading activity occurring by the secondary sandbox account sloshing buy / sell orders back and forth, that the WebSocket FillMonitor, actually beat the Buyer Command in an attempt to mark the order as BUY_FILLED prior to the Buyer Command moving the (very same) order from BUY_SENT to BUY_FILLED.
Gut feeling at the moment is the WebSocket FillMonitor beat the Buyer command...
Additional Stack trace analyze:
Exception: Exception
Code: 0
Message: {"type":"fill","order_id":"307817647","client_order_id":"repeater_217_buy_1571883656.6989","api_session":"account-UHry8e80EgE7V0jyL4dW","symbol":"btcusd","side":"buy","order_type":"exchange limit","timestamp":"1571883674","timestampms":1571883674356,"is_live":false,"is_cancelled":false,"is_hidden":false,"avg_execution_price":"7042.50","executed_amount":"0.2","remaining_amount":"0","original_amount":"0.2","price":"7042.50","fill":{"trade_id":"307817958","liquidity":"Maker","price":"7042.50","amount":"0.2","fee":"1.4085","fee_currency":"USD"},"socket_sequence":2699}
Strace:
#0 src/Command/Command/TradeRepeater/FillMonitor.php(92): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->processMessage(Array, Object(Symfony\Component\Console\Output\ConsoleOutput))
#1 [internal function]: Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->Kobens\Gemini\Command\Command\TradeRepeater\{closure}('a', NULL)
#2 vendor/amphp/amp/lib/Coroutine.php(105): Generator->send('[{"type":"fill"...')
#3 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, '[{"type":"fill"...')
#4 vendor/amphp/amp/lib/Coroutine.php(110): Amp\Coroutine->resolve('[{"type":"fill"...')
#5 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, NULL)
#6 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(NULL)
#7 vendor/amphp/byte-stream/lib/IteratorStream.php(57): Amp\Deferred->resolve()
#8 vendor/amphp/amp/lib/Success.php(36): Amp\ByteStream\IteratorStream->Amp\ByteStream\{closure}(NULL, false)
#9 vendor/amphp/amp/lib/Internal/Placeholder.php(125): Amp\Success->onResolve(Object(Closure))
#10 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(Object(Amp\Success))
#11 vendor/amphp/amp/lib/Internal/Producer.php(177): Amp\Deferred->resolve(Object(Amp\Success))
#12 vendor/amphp/amp/lib/Emitter.php(57): class@anonymous->complete()
#13 vendor/amphp/websocket/src/Rfc6455Client.php(360): Amp\Emitter->complete()
#14 vendor/amphp/websocket/src/Rfc6455Client.php(947): Amp\Websocket\Rfc6455Client->onData(1, '[{"type":"fill"...', true)
#15 [internal function]: Amp\Websocket\Rfc6455Client->parser()
#16 vendor/amphp/websocket/src/Rfc6455Client.php(281): Generator->send(' limit","timest...')
#17 [internal function]: Amp\Websocket\Rfc6455Client->read()
#18 vendor/amphp/amp/lib/Coroutine.php(105): Generator->send(' limit","timest...')
#19 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, ' limit","timest...')
#20 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(' limit","timest...')
#21 vendor/amphp/byte-stream/lib/ResourceInputStream.php(107): Amp\Deferred->resolve(' limit","timest...')
#22 vendor/amphp/amp/lib/Loop/Driver.php(116): Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}('fz', ' limit","timest...')
#23 vendor/amphp/amp/lib/Loop/Driver.php(72): Amp\Loop\Driver->tick()
#24 vendor/amphp/amp/lib/Loop.php(84): Amp\Loop\Driver->run()
#25 src/Command/Command/TradeRepeater/FillMonitor.php(70): Amp\Loop::run(Object(Closure))
#26 vendor/symfony/console/Command/Command.php(255): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#27 vendor/symfony/console/Application.php(932): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#28 vendor/symfony/console/Application.php(274): Symfony\Component\Console\Application->doRunCommand(Object(Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#29 vendor/symfony/console/Application.php(150): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#30 bin/gemini(100): Symfony\Component\Console\Application->run()
#31 {main}
Previous Exception:
Exception: Kobens\Gemini\Exception\TradeRepeater\UnhealthyStateException
Code: 0
Message: Trade record '217' is not healthy for 'Kobens\Gemini\TradeRepeater\DataResource\BuyPlaced'. Current State: {"id":"217","is_enabled":"1","status":"BUY_FILLED","symbol":"btcusd","buy_client_order_id":"repeater_217_buy_1571883656.6989","buy_order_id":"307817647","buy_amount":"0.2","buy_price":"7042.50","sell_client_order_id":null,"sell_order_id":null,"sell_amount":"0.1975","sell_price":"7216.00","note":null,"meta":"{\"buy_price\":\"7042.50\"}","updated_at":"2019-10-23 21:21:14"}
Strace:
#0 src/TradeRepeater/DataResource/BuyPlaced.php(21): Kobens\Gemini\TradeRepeater\DataResource\AbstractDataResource->getHealthyRecord(217)
#1 src/Command/Command/TradeRepeater/FillMonitor.php(137): Kobens\Gemini\TradeRepeater\DataResource\BuyPlaced->setNextState(217)
#2 src/Command/Command/TradeRepeater/FillMonitor.php(104): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->_processMessage(Array, Object(Symfony\Component\Console\Output\ConsoleOutput))
#3 src/Command/Command/TradeRepeater/FillMonitor.php(92): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->processMessage(Array, Object(Symfony\Component\Console\Output\ConsoleOutput))
#4 [internal function]: Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->Kobens\Gemini\Command\Command\TradeRepeater\{closure}('a', NULL)
#5 vendor/amphp/amp/lib/Coroutine.php(105): Generator->send('[{"type":"fill"...')
#6 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, '[{"type":"fill"...')
#7 vendor/amphp/amp/lib/Coroutine.php(110): Amp\Coroutine->resolve('[{"type":"fill"...')
#8 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, NULL)
#9 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(NULL)
#10 vendor/amphp/byte-stream/lib/IteratorStream.php(57): Amp\Deferred->resolve()
#11 vendor/amphp/amp/lib/Success.php(36): Amp\ByteStream\IteratorStream->Amp\ByteStream\{closure}(NULL, false)
#12 vendor/amphp/amp/lib/Internal/Placeholder.php(125): Amp\Success->onResolve(Object(Closure))
#13 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(Object(Amp\Success))
#14 vendor/amphp/amp/lib/Internal/Producer.php(177): Amp\Deferred->resolve(Object(Amp\Success))
#15 vendor/amphp/amp/lib/Emitter.php(57): class@anonymous->complete()
#16 vendor/amphp/websocket/src/Rfc6455Client.php(360): Amp\Emitter->complete()
#17 vendor/amphp/websocket/src/Rfc6455Client.php(947): Amp\Websocket\Rfc6455Client->onData(1, '[{"type":"fill"...', true)
#18 [internal function]: Amp\Websocket\Rfc6455Client->parser()
#19 vendor/amphp/websocket/src/Rfc6455Client.php(281): Generator->send(' limit","timest...')
#20 [internal function]: Amp\Websocket\Rfc6455Client->read()
#21 vendor/amphp/amp/lib/Coroutine.php(105): Generator->send(' limit","timest...')
#22 vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\Coroutine->Amp\{closure}(NULL, ' limit","timest...')
#23 vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(' limit","timest...')
#24 vendor/amphp/byte-stream/lib/ResourceInputStream.php(107): Amp\Deferred->resolve(' limit","timest...')
#25 vendor/amphp/amp/lib/Loop/Driver.php(116): Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}('fz', ' limit","timest...')
#26 vendor/amphp/amp/lib/Loop/Driver.php(72): Amp\Loop\Driver->tick()
#27 vendor/amphp/amp/lib/Loop.php(84): Amp\Loop\Driver->run()
#28 src/Command/Command/TradeRepeater/FillMonitor.php(70): Amp\Loop::run(Object(Closure))
#29 vendor/symfony/console/Command/Command.php(255): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#30 vendor/symfony/console/Application.php(932): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#31 vendor/symfony/console/Application.php(274): Symfony\Component\Console\Application->doRunCommand(Object(Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#32 vendor/symfony/console/Application.php(150): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#33 bin/gemini(100): Symfony\Component\Console\Application->run()
#34 {main}
Exception: Kobens\Gemini\Exception\TradeRepeater\UnhealthyStateException
Code: 0
Message: Trade record '218' is not healthy for 'Kobens\Gemini\TradeRepeater\DataResource\BuyPlaced'. Current State: {"id":"218","is_enabled":"1","status":"BUY_FILLED","symbol":"btcusd","buy_client_order_id":"repeater_218_buy_1571883657.3085","buy_order_id":"307817668","buy_amount":"0.2","buy_price":"7045.00","sell_client_order_id":null,"sell_order_id":null,"sell_amount":"0.1975","sell_price":"7218.57","note":null,"meta":"{\"buy_price\":\"7045.00\"}","updated_at":"2019-10-23 21:21:14"}
Strace:
#0 src/TradeRepeater/DataResource/BuyPlaced.php(21): Kobens\Gemini\TradeRepeater\DataResource\AbstractDataResource->getHealthyRecord(218)
#1 src/Command/Command/TradeRepeater/FillMonitor/Rest.php(89): Kobens\Gemini\TradeRepeater\DataResource\BuyPlaced->setNextState(218)
#2 src/Command/Command/TradeRepeater/FillMonitor/Rest.php(62): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor\Rest->mainLoop(Object(Symfony\Component\Console\Output\ConsoleOutput))
#3 vendor/symfony/console/Command/Command.php(255): Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor\Rest->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#4 vendor/symfony/console/Application.php(932): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#5 vendor/symfony/console/Application.php(274): Symfony\Component\Console\Application->doRunCommand(Object(Kobens\Gemini\Command\Command\TradeRepeater\FillMonitor\Rest), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#6 vendor/symfony/console/Application.php(150): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#7 bin/gemini(100): Symfony\Component\Console\Application->run()
#8 {main}
In this stack trace, we can see both the WebSocket FillMonitor as well as the Rest FillMonitor both incurred an UnhealthyStateException nearly at the same time. The Rest FillMonitor beat the WebSocket FillMonitor to record 218. Whereas the WebSocket FillMonitor beat the Rest FillMonitor to record 217. Both Monitors each then engaged in shutting down the application.
Since the WebSocket FillMonitor (currently) only checks for application shutdown during a heartbeat message, it makes sense that it was allowed to process another message prior to shutting itself down.
At the end of the day we can come to a few simple conclusions here:
The FillMonitor classes are both attempting to process the same type of records, and always could potentially be in a race condition with each other unless we revise the logic of the Rest based FillMonitor, further delaying the records it attempts to process in order to ensure the WebSocket FillMonitor always is allowed to process records first.
It doesn't really matter which FillMonitor class moves the record along to the next state. The Rest based class's main intention is to catch orders that the WebSocket based class may have missed during a timeframe of disconnect.
If a Record is in an UnhealthyState for either FillMonitor, it should leave it alone (as it already does, but also initiates a shutdown of the application). But there is no reason either FillMonitor could not continue to move churn along and further process the next "healthy" record it encounters.
\in_array()
, followed by fetching of the order from the exchange, followed by re-fetching of the record in the ->setNextState()
method. Meaning although it just verified the record was healthy, the WebSocket then snuck in and updated the record immediately after that and before it's update.SELECT FOR UDPATE
would address this. Something we should probably introduce across numerous areas of our TradeRepeater commands.TL;DR:
UnhealthyRecordException
, leaving the said record aloneSELECT
to SELECT FOR UPDATE
across all TradeRepeater's ->setNextState()
methods.
->getRecord()
method of AbstractDataResource
After commit 1e0f26edd0eea995efce4fa9a915b3dc57453593, running the application overnight with large sloshing around of orders yielded no application shutdown.
Closing issue as resolved.
The
...\TradeRepeater\FillMonitor\Rest
class is frequently running intoUnhealthyStateException
errors. Suspicion is that the Websocket based Fill Monitor and Rest based fill monitor are running into a race condition with each other due to the lag involved in the Rest based monitor.