quickfixgo / quickfix

The Go FIX Protocol Library :rocket:
https://www.quickfixgo.org/
Other
731 stars 287 forks source link

Support batching outgoing messages #576

Open Yu-Xie opened 1 year ago

Yu-Xie commented 1 year ago

Prototype for https://github.com/quickfixgo/quickfix/issues/555

Yu-Xie commented 1 year ago

Posting some load testing results using local Postgres:

Output:

Spent 78.082917ms seconds to send 100 messages with 10 batch size and 10 batches
Spent 14.086666ms seconds to send 100 messages with 50 batch size and 2 batches
Spent 61.315208ms seconds to send 500 messages with 50 batch size and 10 batches
Spent 35.222625ms seconds to send 500 messages with 100 batch size and 5 batches
Spent 64.8035ms seconds to send 1000 messages with 100 batch size and 10 batches

Note that this is much faster compared to the existing implementation (https://github.com/quickfixgo/quickfix/issues/555), which takes 1.5 SECONDS to prepare 1000 messages for sending.

Source code:

package quickfix

import (
    "fmt"
    "strconv"
    "sync/atomic"
    "testing"
    "time"

    "github.com/quickfixgo/quickfix/config"
    "github.com/stretchr/testify/require"

    _ "github.com/lib/pq"
)

// TestBatchSendMessage is a manual load test for batched sending. It starts an
// initiator backed by the SQL message store (Postgres on 127.0.0.1:5432) and
// times several batchSend runs with different batch sizes and message counts.
//
// The timings are printed by batchSend; run with `go test -v -run
// TestBatchSendMessage` to see them. The test is skipped under -short because
// it depends on a locally running database.
func TestBatchSendMessage(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping load test that requires a local Postgres instance")
    }

    const driver = "postgres"
    const dsn = `host=127.0.0.1 port=5432 user=xxx password=password dbname=xxx sslmode=disable`
    settings := NewSettings()
    sessionSettings := NewSessionSettings()
    sessionSettings.Set(config.BeginString, BeginStringFIX42)
    sessionSettings.Set(config.SenderCompID, "mock_initiator")
    sessionSettings.Set(config.TargetCompID, "mock_acceptor")
    sessionSettings.Set(config.HeartBtInt, "10")
    sessionSettings.Set(config.SocketConnectPort, "12345")
    sessionSettings.Set(config.SocketConnectHost, "127.0.0.1")
    sessionSettings.Set(config.SQLStoreDriver, driver)
    sessionSettings.Set(config.SQLStoreDataSourceName, dsn)
    sid, err := settings.AddSession(sessionSettings)
    require.NoError(t, err)

    factory := NewSQLStoreFactory(settings)

    initiator, err := NewInitiator(&app{}, factory, settings, NewNullLogFactory())
    require.NoError(t, err)

    err = initiator.Start()
    require.NoError(t, err)
    // Shut the initiator down when the test ends so its goroutines and
    // connections do not outlive the test.
    defer initiator.Stop()

    // NOTE(review): presumably gives the session time to come up before the
    // load is applied — confirm whether a logon signal could replace the sleep.
    time.Sleep(10 * time.Second)

    require.NoError(t, batchSend(10, 100, sid))
    require.NoError(t, batchSend(50, 100, sid))
    require.NoError(t, batchSend(50, 500, sid))
    require.NoError(t, batchSend(100, 500, sid))
    require.NoError(t, batchSend(100, 1000, sid))

    // NOTE(review): presumably lets queued messages drain before shutdown —
    // confirm the required duration.
    time.Sleep(20 * time.Second)
}

// batchSend creates n orders with newOrders and hands them to SendAppToTarget
// in batches of at most buf messages, then prints how long the sending took.
//
// buf is the batch size; it is clamped to n when larger. When n is not a
// multiple of buf the final batch is smaller, so every order is sent. Order
// creation is excluded from the measured time.
//
// It returns an error when buf is not positive or n is negative, and otherwise
// the first error reported by SendAppToTarget.
func batchSend(buf, n int, sessionID SessionID) error {
    if buf <= 0 {
        return fmt.Errorf("batch size must be positive, got %d", buf)
    }
    if n < 0 {
        return fmt.Errorf("message count must not be negative, got %d", n)
    }
    if buf > n {
        buf = n
    }

    pending := newOrders(n)
    numBatches := 0
    start := time.Now()
    for len(pending) > 0 {
        size := buf
        if size > len(pending) {
            size = len(pending)
        }
        // Cap the batch's capacity so an append by the callee cannot
        // overwrite orders that belong to the following batch.
        if err := SendAppToTarget(pending[:size:size], sessionID); err != nil {
            return err
        }
        pending = pending[size:]
        numBatches++
    }

    latency := time.Since(start)
    fmt.Printf("Spent %s seconds to send %d messages with %d batch size and %d batches\n", latency.String(), n, buf, numBatches)
    return nil
}

// cnt is a package-level counter. newOrders increments it once per generated
// order and writes its value into tag 11 of that order.
var cnt atomic.Int32

// newOrders builds n order messages (35=D) with fixed body fields and a
// distinct value in tag 11 taken from the package-level counter cnt.
//
// Field tags follow the FIX dictionary: 38 OrderQty, 11 ClOrdID, 55 Symbol,
// 54 Side, 59 TimeInForce, 40 OrdType.
func newOrders(n int) []Messagable {
    res := make([]Messagable, 0, n)
    for i := 0; i < n; i++ {
        // Use the value returned by Add rather than a separate Load: the
        // increment and the read are then a single atomic step, so concurrent
        // callers can never observe the same ID.
        id := cnt.Add(1)

        m := NewMessage()
        m.Header.Set(w(35, "D"))
        m.Body.Set(w(38, "101.0"))
        m.Body.Set(w(11, strconv.Itoa(int(id))))
        m.Body.Set(w(55, "symbol"))
        m.Body.Set(w(54, "1"))
        m.Body.Set(w(59, "0"))
        m.Body.Set(w(40, "1"))
        res = append(res, m.ToMessage())
    }
    return res
}

// w wraps a tag number and a string value in a *writer so the pair can be
// passed wherever a FieldWriter is expected.
func w(tag int, s string) FieldWriter {
    field := new(writer)
    field.tag = Tag(tag)
    field.s = s
    return field
}

// writer is a minimal field holder: a tag paired with its value as a string.
type writer struct {
    tag Tag
    s   string
}

// Tag reports the tag this field was created with.
func (fw *writer) Tag() Tag {
    tag := fw.tag
    return tag
}

// Write returns the field's string value converted to a fresh byte slice.
func (fw *writer) Write() []byte {
    value := []byte(fw.s)
    return value
}

// app is a do-nothing application used by the load test: every callback is a
// no-op, and the callbacks that return a value return nil.
type app struct{}

// OnCreate does nothing.
func (app) OnCreate(sessionID SessionID) {}

// OnLogon does nothing.
func (app) OnLogon(sessionID SessionID) {}

// OnLogout does nothing.
func (app) OnLogout(sessionID SessionID) {}

// ToAdmin does nothing.
func (app) ToAdmin(message *Message, sessionID SessionID) {}

// ToApp does nothing and always returns nil.
func (app) ToApp(message *Message, sessionID SessionID) error { return nil }

// FromAdmin does nothing and always returns nil.
func (app) FromAdmin(message *Message, sessionID SessionID) MessageRejectError { return nil }

// FromApp does nothing and always returns nil.
func (app) FromApp(message *Message, sessionID SessionID) MessageRejectError { return nil }
ackleymi commented 10 months ago

@Yu-Xie if you would change SendAppToTarget to SendBatchToTarget and SaveMessagesAndIncrNextSenderMsgSeqNum to SaveBatchAndIncrNextSenderMsgSeqNum I will merge. I'd like to keep func names immediately obvious to prevent accidental misuse. Also if the FileStore and MongoStore impls are easy enough to implement for you, please do so.

Yu-Xie commented 9 months ago

> @Yu-Xie if you would change SendAppToTarget to SendBatchToTarget and SaveMessagesAndIncrNextSenderMsgSeqNum to SaveBatchAndIncrNextSenderMsgSeqNum I will merge. I'd like to keep func names immediately obvious to prevent accidental misuse. Also if the FileStore and MongoStore impls are easy enough to implement for you, please do so.

Makes sense -- I have addressed this feedback in https://github.com/quickfixgo/quickfix/pull/599