TimelordUK / jspurefix

native typescript FIX engine
MIT License

Logging in without resetting sequence numbers triggers Resend Request loop #15

Closed NiklasZ closed 3 years ago

NiklasZ commented 3 years ago

[Problem A]

Hi again,

I was testing some message recovery using jspurefix and ran into an issue when I disable the sequence reset flag (141=N) on Logon. The library always requests a BeginSeqNo of 0 (7=0) in the Resend Request, which the counterparty rejects, as it uses 1-indexing for its messages (so no message 0 exists). Here is the log:

2021-02-26T18:29:58.652Z [initiator] info: create session
2021-02-26T18:29:58.652Z [initiator] info: connecting ...
2021-02-26T18:29:58.653Z [redactedAKR UAT client:TcpInitiator] info: connecting with timeout 22
2021-02-26T18:29:58.653Z [redactedAKR UAT client:TcpInitiator] info: tryConnect re.da.ct.ed:1337
2021-02-26T18:29:58.755Z [redactedAKR UAT client:TcpInitiator] info: net.createConnection cb, resolving
2021-02-26T18:29:58.756Z [initiator] info: ... connected, run session
[1614364198758] INFO  (@niklas/fix-adapter): sent 8=FIX.4.2|9=000074|35=A|49=redactedB|56=redactedA|34=1|57=|52=20210226-18:29:58.757|98=0|108=30|141=N|10=146|
    service: "FixAdapter"
[1614364198853] INFO  (@niklas/fix-adapter): received 8=FIX.4.2|9=000074|35=A|34=000077|43=N|52=20210226-18:29:58.804|49=redactedA|56=redactedB|98=0|108=30|10=175|
    service: "FixAdapter"
2021-02-26T18:29:59.410Z [redactedAKR UAT client:FixSession] warn: sending resend last received 0 seqNo 77
[1614364199410] INFO  (@niklas/fix-adapter): sent 8=FIX.4.2|9=000066|35=2|49=redactedB|56=redactedA|34=2|57=|52=20210226-18:29:59.410|7=0|16=77|10=245|
    service: "FixAdapter"
2021-02-26T18:29:59.411Z [redactedAKR UAT client:FixSession] info: message 'A' failed checkSeqNo.
[1614364199506] INFO  (@niklas/fix-adapter): received 8=FIX.4.2|9=000097|35=3|34=000078|43=N|52=20210226-18:29:59.458|49=redactedA|56=redactedB|45=2|58=Invalid value in field # 7|10=209|
    service: "FixAdapter"
2021-02-26T18:30:00.056Z [redactedAKR UAT client:FixSession] warn: sending resend last received 0 seqNo 78
[1614364200056] INFO  (@niklas/fix-adapter): sent 8=FIX.4.2|9=000066|35=2|49=redactedB|56=redactedA|34=3|57=|52=20210226-18:30:00.056|7=0|16=78|10=231|
    service: "FixAdapter"
2021-02-26T18:30:00.057Z [redactedAKR UAT client:FixSession] info: message '3' failed checkSeqNo.
[1614364200151] INFO  (@niklas/fix-adapter): received 8=FIX.4.2|9=000097|35=3|34=000079|43=N|52=20210226-18:30:00.103|49=redactedA|56=redactedB|45=3|58=Invalid value in field # 7|10=176|
    service: "FixAdapter"
2021-02-26T18:30:00.673Z [redactedAKR UAT client:FixSession] warn: sending resend last received 0 seqNo 79
[1614364200673] INFO  (@niklas/fix-adapter): sent 8=FIX.4.2|9=000066|35=2|49=redactedB|56=redactedA|34=4|57=|52=20210226-18:30:00.673|7=0|16=79|10=238|
    service: "FixAdapter"

and this is my configuration:

export const myConfig = {
  application: {
    reconnectSeconds: 10,
    type: 'initiator',
    name: 'My client',
    tcp: {
      host: 're.da.ct.ed',
      port: 1337,
      tls: undefined as any, // non-production messages are not encrypted
    },
    protocol: 'ascii',
    dictionary: 'repo42',
  },
  Username: '',
  Password: '',
  EncryptMethod: 0,
  ResetSeqNumFlag: true,
  HeartBtInt: 30,
  SenderCompId: 'redactedB',
  SenderSubID: '',
  TargetCompID: 'redactedA',
  TargetSubID: '',
  BeginString: 'FIX.4.2',
  Name: 'Testing stuff',
  BodyLengthChars: 6,
} as const;

Not sure if there is an agreed upon convention whether message sequences start with 0 or 1, but I thought I should mention it.

TimelordUK commented 3 years ago

Thanks for raising this issue. It does seem like our side is wrong; I will look at a fix.


NiklasZ commented 3 years ago

No worries. Actually I just figured out I could bypass the problem by doing this in my constructor:

    this.sessionState.lastPeerMsgSeqNum = 1;

[Problem B]

But now I think I'm running into another problem, which is that the session layer seems to think that following a SequenceReset with a NewSeqNo of 4 (36=4), the next received message should have sequence 5 (34=000005), but the counterparty actually sends one with 34=000004. Upon that, the session eventually terminates, logging warn: terminate as seqDelta (0) < 0 lastSeq = 4 seqNo = 4.

Here's the full log:

2021-03-01T13:39:52.377Z [initiator] info: create session
2021-03-01T13:39:52.378Z [initiator] info: connecting ...
2021-03-01T13:39:52.378Z [redactedAKR UAT client:TcpInitiator] info: connecting with timeout 22
2021-03-01T13:39:52.378Z [redactedAKR UAT client:TcpInitiator] info: tryConnect re.da.ct.ed:1337
2021-03-01T13:39:52.478Z [redactedAKR UAT client:TcpInitiator] info: net.createConnection cb, resolving
2021-03-01T13:39:52.480Z [initiator] info: ... connected, run session
[1614605992482] INFO  (@niklas/fix-adapter): sent 8=FIX.4.2|9=000074|35=A|49=redactedB|56=redactedA|34=1|57=|52=20210301-13:39:52.481|98=0|108=30|141=N|10=124|
    service: "FixAdapter"
[1614605992582] INFO  (@niklas/fix-adapter): received 8=FIX.4.2|9=000074|35=A|34=000074|43=N|52=20210301-13:39:52.531|49=redactedA|56=redactedB|98=0|108=30|10=153|
    service: "FixAdapter"
2021-03-01T13:39:52.587Z [redactedAKR UAT client:FixSession] warn: sending resend last received 1 seqNo 74
[1614605992587] INFO  (@niklas/fix-adapter): sent 8=FIX.4.2|9=000066|35=2|49=redactedB|56=redactedA|34=2|57=|52=20210301-13:39:52.587|7=1|16=74|10=241|
    service: "FixAdapter"
2021-03-01T13:39:52.588Z [redactedAKR UAT client:FixSession] info: message 'A' failed checkSeqNo.
[1614605992685] INFO  (@niklas/fix-adapter): received 8=FIX.4.2|9=000073|35=4|34=000001|43=Y|52=20210301-13:39:52.635|49=redactedA|56=redactedB|123=Y|36=4|10=128|
    service: "FixAdapter"
2021-03-01T13:39:52.685Z [redactedAKR UAT client:FixSession] info: peer sends '4' sequence reset. newSeqNo = 4
[1614605992686] INFO  (@niklas/fix-adapter): received 8=FIX.4.2|9=000353|35=8|34=000004|43=Y|122=20210301-12:43:35.540|52=20210301-13:39:52.635|49=redactedA|56=redactedB|11=a7fb19fa-8c6d-413b-bda9-763e14408c6b-0|17=18065.1614602615.1|150=0|20=0|39=0|55=AAPL|38=100|44=124|32=0|31=0.00|14=0|151=100|6=0|54=1|37=000009a5.00004691.603c7978.0001|1=U901192|167=CS|60=20210301-12:43:35.540|40=2|109=redactedB|59=6|15=USD|126=20210301-22:59:59|10=110|
    service: "FixAdapter"
2021-03-01T13:39:52.687Z [redactedAKR UAT client:FixSession] warn: terminate as seqDelta (0) < 0 lastSeq = 4 seqNo = 4
2021-03-01T13:39:52.687Z [redactedAKR UAT client:FixSession] info: stop: kill transport
[1614605992688] INFO  (@niklas/fix-adapter): stopped adapter
    service: "FixAdapter"
2021-03-01T13:39:52.688Z [redactedAKR UAT client:FixSession] info: message '8' failed checkSeqNo.
[1614605992689] INFO  (@niklas/fix-adapter): received 8=FIX.4.2|9=000350|35=8|34=000005|43=Y|122=20210301-12:43:35.541|52=20210301-13:39:52.635|49=redactedA|56=redactedB|11=a7fb19fa-8c6d-413b-bda9-763e14408c6b-0|17=00002b45.603c759c.01.01|150=1|20=0|39=1|55=AAPL|38=100|44=124|32=1|30=XNGS|31=123.55|14=1|151=99|6=123.55|54=1|37=000009a5.00004691.603c7978.0001|1=U901192|167=CS|60=20210301-12:43:35.541|40=2|109=redactedB|59=6|15=USD|10=221|
    service: "FixAdapter"
...
(a few more messages arrive and then the layer stops).

TimelordUK commented 3 years ago

Sorry, I'm not in a position to look right now, but looking at the code, can you actually debug? Looking at the ASCII transmitter I see:

export class AsciiMsgTransmitter extends MsgTransmitter {
  public msgSeqNum: number = 1
  public time: Date

// and later

const hdr: ILooseObject = factory.header(msgType, this.msgSeqNum++,this.time || new Date())

Clearly this code is not working?

NiklasZ commented 3 years ago

I think you're looking at the sending sequence number:

(this.transport.transmitter as AsciiMsgTransmitter).msgSeqNum

not the receiving one:

this.sessionState.lastPeerMsgSeqNum;

But sure, I'm happy to do some debugging :+1:

TimelordUK commented 3 years ago

ah yes you are correct indeed

Maybe I need to add a debug event that fires or prints; I see there is a tostring method on that state. If we initiate login with seq=1 and they respond with seq 1, are they then sending additional messages, or are we requesting them?

I will have to look carefully at your trace to see exactly where the break is happening. From memory, we compare the last received seq number in this state object to the one sent in their next message, so it would appear we have either dropped messages or are not updating the received seq number for some reason?

NiklasZ commented 3 years ago

(I've labelled the two logs [Problem A] and [Problem B] respectively as they are related problems and for a bit of clarity).

Regarding [Problem A], the order of events from my understanding is:

  1. Client sends a Logon (client seq 1): 8=FIX.4.2|9=000074|35=A|49=redactedB|56=redactedA|34=1|57=|52=20210226-18:29:58.757|98=0|108=30|141=N|10=146|
  2. Server responds with a Logon (server seq 77): 8=FIX.4.2|9=000074|35=A|34=000077|43=N|52=20210226-18:29:58.804|49=redactedA|56=redactedB|98=0|108=30|10=175|
  3. Client session layer notices this, logging warn: sending resend last received 0 seqNo 77 and sends request for messages 0-77. 8=FIX.4.2|9=000066|35=2|49=redactedB|56=redactedA|34=2|57=|52=20210226-18:29:59.410|7=0|16=77|10=245|
  4. Server rejects the ResendRequest, citing Invalid value in field # 7: 8=FIX.4.2|9=000097|35=3|34=000078|43=N|52=20210226-18:29:59.458|49=redactedA|56=redactedB|45=2|58=Invalid value in field # 7|10=209|

Now as for why the jspurefix session layer requests messages 0-77 instead of 1-77 that's pretty straightforward. Looking at https://github.com/TimelordUK/jspurefix/blob/master/src/transport/fix-session-state.ts#L33, it's clear the peer sequence number initialises at 0. The resend request is then generated in checkSeqNo(), before lastPeerMsgSeqNum is set to anything new (https://github.com/TimelordUK/jspurefix/blob/master/src/transport/ascii/ascii-session.ts#L41-L49).

I think this behaviour is by and large correct. More of a question whether there should be a special case to not use BeginSeqNo=0 in resend requests or not.
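The special case raised above can be sketched as a small helper. This is only an illustration: `resendRange` and `ResendRange` are hypothetical names, not part of jspurefix, and the sketch assumes the engine keeps using the last received peer sequence number as the start of the range, as the log suggests.

```typescript
// Hypothetical helper (not jspurefix API): range for a ResendRequest (35=2).
interface ResendRange {
  beginSeqNo: number; // tag 7
  endSeqNo: number;   // tag 16
}

function resendRange(lastPeerMsgSeqNum: number, receivedSeqNo: number): ResendRange {
  // FIX sequence numbers start at 1, so never ask for message 0,
  // even when nothing has been received yet (lastPeerMsgSeqNum === 0).
  const beginSeqNo = Math.max(1, lastPeerMsgSeqNum);
  return { beginSeqNo, endSeqNo: receivedSeqNo };
}
```

With the values from the [Problem A] log, `resendRange(0, 77)` would ask for 7=1 and 16=77 instead of 7=0.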

NiklasZ commented 3 years ago

Regarding [Problem B] the order of messages is:

  1. Client sends a Logon (client seq 1): 8=FIX.4.2|9=000074|35=A|49=redactedB|56=redactedA|34=1|57=|52=20210301-13:39:52.481|98=0|108=30|141=N|10=124|
  2. Server responds with Logon (server seq 74) 8=FIX.4.2|9=000074|35=A|34=000074|43=N|52=20210301-13:39:52.531|49=redactedA|56=redactedB|98=0|108=30|10=153|
  3. Client session layer notices this, logging: warn: sending resend last received 1 seqNo 74 (it's 1, because I manually set the peer number to 1 in the constructor. See https://github.com/TimelordUK/jspurefix/issues/15#issuecomment-787967065) and requests 1-74. 8=FIX.4.2|9=000066|35=2|49=redactedB|56=redactedA|34=2|57=|52=20210301-13:39:52.587|7=1|16=74|10=241|
  4. The server responds with a sequence reset, telling the client that the next sequence number to expect is 4. 8=FIX.4.2|9=000073|35=4|34=000001|43=Y|52=20210301-13:39:52.635|49=redactedA|56=redactedB|123=Y|36=4|10=128|
  5. The server sends an Execution Report with sequence 4: 8=FIX.4.2|9=000353|35=8|34=000004|43=Y|122=20210301-12:43:35.540|52=20210301-13:39:52.635|49=redactedA|56=redactedB|11=a7fb19fa-8c6d-413b-bda9-763e14408c6b-0|17=18065.1614602615.1|150=0|20=0|39=0|55=AAPL|38=100|44=124|32=0|31=0.00|14=0|151=100|6=0|54=1|37=000009a5.00004691.603c7978.0001|1=U901192|167=CS|60=20210301-12:43:35.540|40=2|109=redactedB|59=6|15=USD|126=20210301-22:59:59|10=110|
  6. The client's session layer thinks something is wrong because the last sequence number is the same as the current one: terminate as seqDelta (0) < 0 lastSeq = 4 seqNo = 4.

The problem here is in https://github.com/TimelordUK/jspurefix/blob/master/src/transport/ascii/ascii-session.ts#L162-L167, where the session layer seems to think that the seqNo should be the last peer sequence number, whereas it seems it should be the next sequence number. Effectively:

        this.sessionState.lastPeerMsgSeqNum = newSeqNo - 1

I'm basing this off the rather confusingly worded FIX 4.2 documentation and a much less confusing visual example from some company.

Anyways, I actually need to get this working for myself relatively soon, so I'll make a fork. If you think I'm talking sense I'll make a PR later.
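To make the off-by-one in [Problem B] concrete, here is a minimal, self-contained sketch of the bookkeeping. `PeerSeqTracker` is a hypothetical class written for illustration, not the jspurefix session code; it only assumes that NewSeqNo (36) names the next sequence number the peer will send.

```typescript
type SeqCheck = 'ok' | 'gap' | 'too-low';

// Hypothetical tracker for the peer's sequence numbers (not jspurefix API).
class PeerSeqTracker {
  // Sequence number of the last message accepted from the peer.
  lastPeerMsgSeqNum = 0;

  // SequenceReset (35=4): NewSeqNo (36) is the NEXT number to expect,
  // so the "last received" value has to be one less.
  onSequenceReset(newSeqNo: number): void {
    this.lastPeerMsgSeqNum = newSeqNo - 1;
  }

  // Accepts a message only if it is exactly one above the last accepted one.
  check(seqNo: number): SeqCheck {
    const delta = seqNo - this.lastPeerMsgSeqNum;
    if (delta === 1) {
      this.lastPeerMsgSeqNum = seqNo;
      return 'ok';
    }
    return delta > 1 ? 'gap' : 'too-low';
  }
}
```

After `onSequenceReset(4)`, the Execution Report with 34=000004 passes the check. Storing 4 as the last received number instead would give a delta of 0, which matches the termination seen in the log.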

TimelordUK commented 3 years ago

Out of interest, what happens if you simply take the seq number they give on login and assume that is the starting point? I.e. in their login, do they set the reset seq number flag? I assume they do not. So we would need a little change to accommodate both cases.

I.e. rather than send a resend request on login, we accept their seq number (a special case for login) and we then check that the next message is 1 more than this.

NiklasZ commented 3 years ago

Sadly I can't let the server dictate the received sequence number for me as it may have to resend Execution Reports that occurred while the connection was out. Also the server tracks my client's sent sequence number and will start sending rejections if the number is too low.

I've done some work to persist received and sent messages and restore the client with the correct sequence numbers, but that does not seem to be enough. The server will also send ResendRequests of its own, so I need to also support resending messages with gap-fills and everything. It's gonna take me a while to build and test both the changes to jspurefix (see https://github.com/TimelordUK/jspurefix/compare/master...NiklasZ:1.1.1-fixup) and my client to make sure this works.

TimelordUK commented 3 years ago

But can you not at least log in from your side with reset sequence number? That is to say, you log on and set ResetSeqNumFlag.

It has been quite a while since I dug deep into the code, so I will have to refresh myself. But the point is that you should be able to at least tell the server you intend to start from sequence 1 by virtue of this flag in the logon message. I believe you set this in the description object.


TimelordUK commented 3 years ago

As an aside, most commercial vendors will use application-specific messages for a resend rather than relying on sequence numbers. Of course I do not know whom you are connected to, but do they not have an API where you can, for example, request an execution report? Generally speaking, on OMSs I have worked on in the past, the client connects and issues such API messages and is then sent or replayed execution messages, and you may be able, for example, to specify a from-time going back over the past N hours.

I am assuming no such API messages exist in your case, i.e. the ability to send a request for the latest report on a given order etc.?


NiklasZ commented 3 years ago

So maybe for a bit more context, I need my client to pass the counterparty's certification test, which means covering about 90% of the order and execution flow and not much else. The one test that is giving me trouble is this one:

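[image: image] https://user-images.githubusercontent.com/7795825/109808005-e5be7d80-7c26-11eb-8ed3-fa655cb07782.png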

Where they kill the connection and expect my client to log in again, explicitly setting ResetSeqNumFlag to N (141=N) so the two parties can synchronise and resend lost messages.

To do this, I have to store sent and received messages and resend sent ones on demand. Not much of a way around it (sadly no API).

TimelordUK commented 3 years ago

I see

thanks for clarification.

We can add, for example, a read from the FIX log locally on the client pretty easily if you wish, i.e. an option in the session to say something along the lines of: we are re-connecting, will not be resetting sequence numbers, and will go through the FIX log to find the last message sent and received and set the sequence numbers so they resume from that point.

Of course that assumes the FIX log is actually correct and up to date

it would be a sort of restore from fix log if your side has been restarted

These are just ideas that spring to mind. I do appreciate you have hit a clear hole in the logic within the engine that needs to be addressed.

I can try and add something like above within next day or so if you think it will help - I do not have a clear way of testing of course I would have to build a simulation of this test
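As a rough illustration of the "restore from FIX log" idea, the sketch below scans log lines in the format shown earlier in this thread and picks out the last sent and received MsgSeqNum (tag 34). The function names and the decision to skip PossDup (43=Y) replays are assumptions made for this example; nothing here is an existing jspurefix option.

```typescript
interface RestoredSeqNums {
  lastSent: number;
  lastReceived: number;
}

// Extracts MsgSeqNum (tag 34) from a '|'-delimited FIX string, if present.
function msgSeqNum(fix: string): number | undefined {
  const match = /(?:^|\|)34=(\d+)\|/.exec(fix);
  return match ? parseInt(match[1], 10) : undefined;
}

// Hypothetical helper: walks log lines such as
//   "... INFO (...): sent 8=FIX.4.2|...|34=2|..."
// and remembers the most recent sequence number in each direction.
function restoreSeqNums(logLines: string[]): RestoredSeqNums {
  const restored: RestoredSeqNums = { lastSent: 0, lastReceived: 0 };
  for (const line of logLines) {
    const m = /\b(sent|received) (8=FIX.*)$/.exec(line);
    if (!m) continue; // not a FIX message line
    const fix = m[2];
    // Assumption: replayed messages (43=Y) carry old numbers, so ignore them.
    if (/\|43=Y\|/.test(fix)) continue;
    const seq = msgSeqNum(fix);
    if (seq === undefined) continue;
    if (m[1] === 'sent') restored.lastSent = seq;
    else restored.lastReceived = seq;
  }
  return restored;
}
```

A restarted client could then seed its outgoing counter with `lastSent + 1` and its last peer sequence number with `lastReceived`. As noted above, this is only as reliable as the log itself.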


NiklasZ commented 3 years ago

Hmm that would be great actually! Right now I more or less override Session methods to log messages to a DB, which is pretty makeshift:

 protected send(msgType: string, obj: ILooseObject) {
    // NOTE, the send() operation increments the sequence number, so we need to get it first.
    const sequenceNumber = (this.transport.transmitter as AsciiMsgTransmitter).msgSeqNum;

    super.send(msgType, obj);

    // NOTE: cannot await this promise as making this overridden method async breaks jspurefix.
    this.messageStore
      .writeSentMessage({
        sequenceNumber,
        messageType: msgType as MsgType,
        timestamp: new Date(),
        message: obj,
      })
      .catch((error) => this.logger.error({ error, obj }, 'Could not save sent FIX message'));
  }
  protected onMsg(msgType: string, view: MsgView): void {
    const sequenceNumber: number = view.getTyped(MsgTag.MsgSeqNum);
    const obj = view.toObject();

    // NOTE: cannot await this promise as making this overridden method async breaks jspurefix.
    this.messageStore
      .writeReceivedMessage({
        sequenceNumber,
        messageType: msgType as MsgType,
        timestamp: new Date(),
        message: obj,
      })
      .catch((error) => this.logger.error({ error, obj }, 'Could not save received FIX message'));

    super.onMsg(msgType, view);
    // TODO #4486 remove once resending logic is tested.
    this.logger.debug(`Last received sequence ${this.getLastReceivedMsgSeqNumber()}`);
  }

If you choose to keep an internal log, it would be good to expose it in a way that allows easily persisting it to a DB. Because even if the test only simulates their server crashing, in principle the same is possible for my client.

NiklasZ commented 3 years ago

Oh and I guess restoring it from the DB (or at least the sequence numbers) when starting the client up.

TimelordUK commented 3 years ago

I am looking at adding recovery - am starting to realise this is not a simple task by any means. It is unlikely I will have anything of any substance for at least a week I am afraid. However I will endeavour to add something that supports replaying missing messages.

NiklasZ commented 3 years ago

No I don't think it's trivial either and please do not rush it. What I have right now in my fork should work for the test. Appreciate the help though 😃

NiklasZ commented 3 years ago

Hey there, just wanted to let you know I managed to get through the certification test. Some stuff I needed jspurefix to do to get it working was:

  1. Allow resending of messages marked with PossDupFlag, using a previous MsgSeqNum from the message log (keeping it in a DB).
  2. Correct some logic in the sequence number checking to also increment the lastPeerMsgSeqNum when we receive TestRequest or ResendRequest.
  3. Correct the SequenceReset handler to expect the next message's sequence number as given in the reset message (otherwise it enters an eternal ResendRequest loop).
  4. Added a handler onSequenceReset(...) to resend messages when the counterparty asks for them (method name is a bit of a misnomer, it should probably be onResendRequest).

Most of that's visible in https://github.com/TimelordUK/jspurefix/compare/master...NiklasZ:1.1.1-fixup. Regarding the message resending logic, I have permission to share what I wrote to do it outside of the library:

protected async onSequenceReset(view: MsgView) {
    const BeginSeqNo: number = view.getTyped(MsgTag.BeginSeqNo);
    let EndSeqNo: number = view.getTyped(MsgTag.EndSeqNo);

    // Guard against too high ending sequence number
    const lastSentMessageSequence =
      (await this.messageStore.getLastSentMessage())?.sequenceNumber || 0;
    if (EndSeqNo > lastSentMessageSequence || lastSentMessageSequence === 0) {
      const rejection = this.config.factory.reject(
        MsgType.Reject,
        view.getTyped(MsgTag.MsgSeqNum),
        `Requested EndSeqNo 16=${EndSeqNo} is too high. Highest known sequence is ${lastSentMessageSequence}`,
        SessionRejectReason.ValueIsIncorrect
      );
      this.send(MsgType.Reject, rejection);
      return;
    }

    // An ending sequence of 0 means until the latest message.
    if (EndSeqNo === 0) {
      EndSeqNo = lastSentMessageSequence;
    }
    const messages = await this.messageStore.getSentMessagesInSequenceRange(BeginSeqNo, EndSeqNo);
    this.logger.info(`Found ${messages.length} to resend`);

    const relevantMessages = calculateMessagesToResend(
      messages,
      this.config.factory,
      BeginSeqNo,
      EndSeqNo
    );
    this.logger.info(`Reduced to ${relevantMessages.length}, skipping no longer relevant messages`);

    // Mark every message as a possible duplicate, so the receiving system can handle them properly.
    const withSessionDetails = relevantMessages.map((m) => ({
      ...m,
      message: { ...m.message, StandardHeader: { PossDupFlag: true, MsgSeqNum: m.sequenceNumber } },
    }));

    withSessionDetails.forEach((message) => this.send(message.messageType, message.message));
    this.logger.info('Finished resending');
  }

Attached is also the implementation of calculateMessagesToResend(...) + tests.

helpers.zip

Hope this helps. I'm also happy to implement/test anything on the library as needed.
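The attached implementation of calculateMessagesToResend(...) is not reproduced in this thread. Purely as an illustration of the general gap-fill technique, the sketch below replays application messages and collapses runs of session-level or missing messages into SequenceReset-GapFill steps. `planResend`, `ResendStep` and the choice of which message types count as session-level are assumptions for this example, not the attached code.

```typescript
interface StoredMessage {
  sequenceNumber: number;
  messageType: string; // tag 35
}

type ResendStep =
  | { kind: 'resend'; sequenceNumber: number }
  | { kind: 'gapFill'; sequenceNumber: number; newSeqNo: number };

// Assumption: Heartbeat, TestRequest, ResendRequest, SequenceReset, Logout
// and Logon are never replayed and are covered by a gap fill instead.
const ADMIN_TYPES = new Set(['0', '1', '2', '4', '5', 'A']);

function planResend(messages: StoredMessage[], beginSeqNo: number, endSeqNo: number): ResendStep[] {
  const bySeq = new Map<number, StoredMessage>();
  for (const m of messages) bySeq.set(m.sequenceNumber, m);

  const steps: ResendStep[] = [];
  let gapStart: number | undefined; // first seq of the current run to skip
  for (let seq = beginSeqNo; seq <= endSeqNo; seq++) {
    const m = bySeq.get(seq);
    const replay = m !== undefined && !ADMIN_TYPES.has(m.messageType);
    if (replay) {
      if (gapStart !== undefined) {
        // Close the run: "skip ahead, the next number to expect is seq".
        steps.push({ kind: 'gapFill', sequenceNumber: gapStart, newSeqNo: seq });
        gapStart = undefined;
      }
      steps.push({ kind: 'resend', sequenceNumber: seq });
    } else if (gapStart === undefined) {
      gapStart = seq;
    }
  }
  if (gapStart !== undefined) {
    steps.push({ kind: 'gapFill', sequenceNumber: gapStart, newSeqNo: endSeqNo + 1 });
  }
  return steps;
}
```

A `gapFill` step would correspond to a 35=4 message with 123=Y and 36 set to `newSeqNo`, much like the server's own reply in the [Problem B] log (34=000001, 36=4).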

TimelordUK commented 3 years ago

actually this looks fantastic, I very much appreciate sticking with the library and getting this over the line.

I will take a look at merging this in, looks a pretty neat solution

I will look at other changes on the branch - have you got some sort of in memory store to keep the messages?

also how do you cater for them dropping the connection and you having to re-connect?

thanks so much and congrats on passing certification

TimelordUK commented 3 years ago

one more thing, can i merge these changes over? i.e. apart from that is your branch much different. The fact you got this to pass with the vendor means it already has field exposure so I am happy to pull into master.

NiklasZ commented 3 years ago

No problem and regarding your questions:

> I will look at other changes on the branch - have you got some sort of in memory store to keep the messages?

Yes it's pretty much just a class that sits on top of MongoDB. I reckon though that you'll want to go for an in-memory equivalent (probably with some configurable size limit). In any case, this is it:

fix-message-store.ts

```typescript
import { Collection, Db } from 'mongodb';
// MsgType and ILooseObject come from jspurefix; FixMessageStoreDbCollection is
// defined elsewhere in the project (not shown).

export interface IFixMessage {
  sequenceNumber: number;
  messageType: MsgType;
  timestamp: Date;
  message: ILooseObject;
}

// TODO #4163 add DB version + migration support
export class FixMessageStore {
  private readonly receivedMessages: Collection<IFixMessage>;
  private readonly sentMessages: Collection<IFixMessage>;
  private indexed = false;

  constructor(database: Db) {
    this.receivedMessages = database.collection<IFixMessage>(
      FixMessageStoreDbCollection.RECEIVED_MESSAGES
    );
    this.sentMessages = database.collection<IFixMessage>(
      FixMessageStoreDbCollection.SENT_MESSAGES
    );
  }

  // TODO #4163 these read methods are probably unnecessary
  public async readReceivedMessage(sequenceNumber: number): Promise<IFixMessage | null> {
    return this.receivedMessages.findOne({ sequenceNumber });
  }

  public async writeReceivedMessage(message: IFixMessage): Promise<void> {
    await this.checkIndices();
    await this.receivedMessages.insertOne(message);
  }

  public async readSentMessage(sequenceNumber: number): Promise<IFixMessage | null> {
    return this.sentMessages.findOne({ sequenceNumber });
  }

  public async writeSentMessage(message: IFixMessage): Promise<void> {
    await this.checkIndices();
    await this.sentMessages.insertOne(message);
  }

  public async getLastReceivedMessage(): Promise<IFixMessage | null> {
    return this.receivedMessages.findOne({}, { sort: { timestamp: -1 } });
  }

  public async getLastSentMessage(): Promise<IFixMessage | null> {
    return this.sentMessages.findOne({}, { sort: { timestamp: -1 } });
  }

  public async getSentMessagesInSequenceRange(
    startSequence: number,
    endSequence: number
  ): Promise<IFixMessage[]> {
    // Sequence numbers can repeat across sessions, so the range is resolved
    // via the timestamps of the most recent matching start and end messages.
    const startMessage = await this.sentMessages.findOne(
      { sequenceNumber: startSequence },
      { sort: { timestamp: -1 } }
    );
    const endMessage = await this.sentMessages.findOne(
      { sequenceNumber: endSequence },
      { sort: { timestamp: -1 } }
    );
    return this.sentMessages
      .find({
        timestamp: { $gte: startMessage?.timestamp, $lte: endMessage?.timestamp },
      })
      .sort({ timestamp: 1 })
      .toArray();
  }

  // Creates the indices once, lazily, on the first write.
  private async checkIndices(): Promise<void> {
    if (!this.indexed) {
      await this.sentMessages.createIndex({ timestamp: -1, sequenceNumber: 1 }, { unique: false });
      await this.receivedMessages.createIndex(
        { timestamp: -1, sequenceNumber: 1 },
        { unique: false }
      );
      this.indexed = true;
    }
  }
}
```

> also how do you cater for them dropping the connection and you having to re-connect?

Presently not very elegantly and it's something I'm looking to improve. I rely on jspurefix stopping the connection after a few heartbeats time out and then exit the process. As the process runs in the depths of a cloud infrastructure this is "good enough" as the docker container it's in will be restarted so it reconnects that way.

basic-shutdown-logic

```typescript
const fixInitiator = initiator(fixConfig, () => adapter);

const disconnectFix = async () => {
  adapter.done();
  await fixInitiator;
};

const shutdown = once(async () => {
  logger.info('Shutting down...');
  await Promise.all([mongoClient.close(), mba.disconnect(), disconnectFix()]);
  process.exit();
});

// This ensures the service always shuts down when the adapter stops for whatever reason.
await fixInitiator.then(shutdown);
```

> one more thing, can i merge these changes over?

Please wait a bit. My code is currently in review at work and I still would like to make some refinements to it first (I've figured out how to independently trigger connection breakdowns vs. my counterparty so I can retest any changes I make reasonably well). Once it's ready I'll open a PR.

NiklasZ commented 3 years ago

And here it is: https://github.com/TimelordUK/jspurefix/pull/16.
Gonna leave the implementation of an in-memory message store for later, as I need to think more on what it should look like. Ideally, I'd still like a DB-based one for backups and maybe to allow async message handlers, because presently it's a bit risky.
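For reference, one possible shape for the in-memory store with a configurable size limit mentioned earlier in the thread is sketched below. This is a hypothetical design, not code from the PR or from jspurefix; the class and method names simply mirror the MongoDB-backed store above, and `message` is typed loosely to keep the example self-contained.

```typescript
interface StoredFixMessage {
  sequenceNumber: number;
  messageType: string;
  message: unknown;
}

// Hypothetical bounded store: keeps at most maxSize sent messages in memory.
class InMemoryMessageStore {
  private readonly sent = new Map<number, StoredFixMessage>();
  private last: StoredFixMessage | undefined;

  constructor(private readonly maxSize: number = 1000) {}

  writeSentMessage(m: StoredFixMessage): void {
    // Delete first so a rewritten sequence number moves to the end of the Map.
    this.sent.delete(m.sequenceNumber);
    this.sent.set(m.sequenceNumber, m);
    this.last = m;
    if (this.sent.size > this.maxSize) {
      // A Map iterates in insertion order, so the first key is the oldest write.
      const oldest = this.sent.keys().next().value;
      if (oldest !== undefined) this.sent.delete(oldest);
    }
  }

  getLastSentMessage(): StoredFixMessage | undefined {
    return this.last;
  }

  // Returns the stored messages with begin <= sequenceNumber <= end, in order.
  getSentMessagesInSequenceRange(begin: number, end: number): StoredFixMessage[] {
    return Array.from(this.sent.values())
      .filter((m) => m.sequenceNumber >= begin && m.sequenceNumber <= end)
      .sort((a, b) => a.sequenceNumber - b.sequenceNumber);
  }
}
```

Being synchronous, a store like this avoids the un-awaited promises in the overridden send() and onMsg() shown earlier, at the cost of losing everything on restart, which is why a DB-backed variant would still be useful for backups.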