Tinkoff / investAPI

399 stars 136 forks source link

Пересоздание стримов маркет даты ведет к ResourceExhausted при реализации gracefull shutdown #235

Closed makhmutov closed 2 years ago

makhmutov commented 2 years ago
package tinkoff

import (
    "context"
    "crypto/tls"
    "fmt"
    "github.com/rs/zerolog/log"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/metadata"
    "xxx/config"
    pb "xxx/investapi"
    "io"
    "sync"
    "testing"
    "time"
)

const ReconnectCount = 2
const IdleTIme = time.Second * 10
const SleepTime = time.Second * 5

func TestGracefullShutdownWithOneConnection(t *testing.T) {
    tinkoffToken := config.GetConfigFromEnv().TinkoffToken
    md := metadata.New(map[string]string{"Authorization": "Bearer " + tinkoffToken})
    for ci := 0; ci < ReconnectCount; ci++ {
        func() {
            fmt.Println("Create connection", ci)
            conn, err := grpc.Dial("invest-public-api.tinkoff.ru:443", grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})), grpc.WithBlock()) //FIXME
            if err != nil {
                t.Fatalf("err on grpc.Dial: %v", err)
            }
            defer func() {
                fmt.Println("Close connection...", ci)
                conn.Close()
            }()
            var wg sync.WaitGroup
            tariff, err := pb.NewUsersServiceClient(conn).GetUserTariff(metadata.NewOutgoingContext(context.TODO(), md), &pb.GetUserTariffRequest{})
            if err != nil {
                t.Fatalf("err on client.GetUserTariff: %v", err)
            }
            var streamLimit int32
            for _, limit := range tariff.StreamLimits {
                for _, stream := range limit.Streams {
                    if stream == "tinkoff.public.invest.api.contract.v1.MarketDataStreamService/MarketDataStream" {
                        streamLimit = limit.Limit
                    }
                }
            }
            if streamLimit == 0 {
                t.Fatalf("streamLimit == 0")
            }
            fmt.Printf("streamLimit = %d\n", streamLimit)
            for si := int32(0); si < streamLimit; si++ {
                wg.Add(1)
                streamIndex := si
                go func() {
                    defer wg.Done()
                    fmt.Printf("Create stream[%d] of connection[%d] \n", streamIndex, ci)
                    streamClient, err := pb.NewMarketDataStreamServiceClient(conn).MarketDataStream(metadata.NewOutgoingContext(context.TODO(), md))
                    if err != nil {
                        t.Fatalf("err on client.MarketDataStream: %v", err)
                    }
                    err = streamClient.Send(&pb.MarketDataRequest{
                        Payload: &pb.MarketDataRequest_SubscribeTradesRequest{
                            SubscribeTradesRequest: &pb.SubscribeTradesRequest{
                                SubscriptionAction: pb.SubscriptionAction_SUBSCRIPTION_ACTION_SUBSCRIBE,
                                Instruments: []*pb.TradeInstrument{
                                    {
                                        Figi: "BBG002B04MT8",
                                    },
                                },
                            },
                        },
                    })
                    go func() {
                        for {
                            in, err := streamClient.Recv()
                            if err == io.EOF {
                                return
                            }
                            if err != nil {
                                log.Err(err).Msgf("err on streamClient.Recv: %v", err)
                                return
                            }
                            fmt.Println(in)
                        }
                    }()
                    if err != nil {
                        t.Fatalf("err on streamClient.Send: %v", err)
                    }
                    time.Sleep(IdleTIme)
                    err = streamClient.CloseSend()
                    if err != nil {
                        t.Fatalf("err on streamClient.CloseSend: %v", err)
                    }
                }()
            }
            wg.Wait()
            time.Sleep(SleepTime)
        }()
    }
}

Вывод

=== RUN   TestGracefullShutdownWithOneConnection
Create connection 0
streamLimit = 4
Create stream[3] of connection[0] 
Create stream[1] of connection[0] 
Create stream[2] of connection[0] 
Create stream[0] of connection[0] 
subscribe_trades_response:{tracking_id:"62fa651678a02b112e546631d9b61488" trade_subscriptions:{figi:"BBG002B04MT8" subscription_status:SUBSCRIPTION_STATUS_SUCCESS}}
subscribe_trades_response:{tracking_id:"62fa6516508de1ec6aa1a28e9f768765" trade_subscriptions:{figi:"BBG002B04MT8" subscription_status:SUBSCRIPTION_STATUS_SUCCESS}}
subscribe_trades_response:{tracking_id:"62fa65165f0fa65f26dd2f747fe1287b" trade_subscriptions:{figi:"BBG002B04MT8" subscription_status:SUBSCRIPTION_STATUS_SUCCESS}}
subscribe_trades_response:{tracking_id:"62fa6516105f4186f3501b9b7727dda4" trade_subscriptions:{figi:"BBG002B04MT8" subscription_status:SUBSCRIPTION_STATUS_SUCCESS}}
Close connection... 0
Create connection 1
streamLimit = 4
Create stream[3] of connection[1] 
Create stream[1] of connection[1] 
Create stream[2] of connection[1] 
Create stream[0] of connection[1] 
subscribe_trades_response:{tracking_id:"62fa6525b178e8f586ef365f206d8199" trade_subscriptions:{figi:"BBG002B04MT8" subscription_status:SUBSCRIPTION_STATUS_SUCCESS}}
{"level":"error","error":"rpc error: code = ResourceExhausted desc = 80001","time":"2022-08-15T20:24:21+05:00","message":"err on streamClient.Recv: rpc error: code = ResourceExhausted desc = 80001"}
{"level":"error","error":"rpc error: code = ResourceExhausted desc = 80001","time":"2022-08-15T20:24:21+05:00","message":"err on streamClient.Recv: rpc error: code = ResourceExhausted desc = 80001"}
{"level":"error","error":"rpc error: code = ResourceExhausted desc = 80001","time":"2022-08-15T20:24:21+05:00","message":"err on streamClient.Recv: rpc error: code = ResourceExhausted desc = 80001"}
makhmutov commented 2 years ago

пробывал вариант с cancelFunc, так же получаю ResourceExhausted на второй итерации

package tinkoff

import (
    "context"
    "crypto/tls"
    "fmt"
    "github.com/rs/zerolog/log"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/metadata"
    "xxx/config"
    pb "xxx/investapi"
    "io"
    "sync"
    "testing"
    "time"
)

const ReconnectCount = 2
const IdleTIme = time.Second * 10
const SleepTime = time.Second * 5

func TestGracefullShutdownWithOneConnection(t *testing.T) {
    tinkoffToken := config.GetConfigFromEnv().TinkoffToken
    md := metadata.New(map[string]string{"Authorization": "Bearer " + tinkoffToken})
    for ci := 0; ci < ReconnectCount; ci++ {
        func() {
            fmt.Println("Create connection", ci)
            conn, err := grpc.Dial("invest-public-api.tinkoff.ru:443", grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})), grpc.WithBlock()) //FIXME
            if err != nil {
                log.Fatal().Msgf("err on grpc.Dial: %v", err)
            }
            defer func() {
                fmt.Println("Close connection...", ci)
                conn.Close()
            }()
            var wg sync.WaitGroup
            tariff, err := pb.NewUsersServiceClient(conn).GetUserTariff(metadata.NewOutgoingContext(context.TODO(), md), &pb.GetUserTariffRequest{})
            if err != nil {
                log.Fatal().Msgf("err on client.GetUserTariff: %v", err)
            }
            var streamLimit int32
            for _, limit := range tariff.StreamLimits {
                for _, stream := range limit.Streams {
                    if stream == "tinkoff.public.invest.api.contract.v1.MarketDataStreamService/MarketDataStream" {
                        streamLimit = limit.Limit
                    }
                }
            }
            if streamLimit == 0 {
                log.Fatal().Msgf("streamLimit == 0")
            }
            fmt.Printf("streamLimit = %d\n", streamLimit)
            for si := int32(0); si < streamLimit; si++ {
                wg.Add(1)
                streamIndex := si
                go func() {
                    defer wg.Done()
                    fmt.Printf("Create stream[%d] of connection[%d] \n", streamIndex, ci)
                    ctx, cancelFunc := context.WithCancel(context.Background())
                    streamClient, err := pb.NewMarketDataStreamServiceClient(conn).MarketDataStream(metadata.NewOutgoingContext(ctx, md))
                    if err != nil {
                        log.Fatal().Msgf("err on client.MarketDataStream: %v", err)
                    }
                    err = streamClient.Send(&pb.MarketDataRequest{
                        Payload: &pb.MarketDataRequest_SubscribeTradesRequest{
                            SubscribeTradesRequest: &pb.SubscribeTradesRequest{
                                SubscriptionAction: pb.SubscriptionAction_SUBSCRIPTION_ACTION_SUBSCRIBE,
                                Instruments: []*pb.TradeInstrument{
                                    {
                                        Figi: "BBG002B04MT8",
                                    },
                                },
                            },
                        },
                    })
                    go func() {
                        for {
                            in, err := streamClient.Recv()
                            if err == io.EOF {
                                return
                            }
                            if err != nil {
                                log.Err(err).Msgf("err on streamClient.Recv: %v", err)
                                return
                            }
                            fmt.Println(in)
                        }
                    }()
                    if err != nil {
                        log.Fatal().Msgf("err on streamClient.Send: %v", err)
                    }
                    time.Sleep(IdleTIme)
                    cancelFunc()
                    err = streamClient.CloseSend()
                    if err != nil {
                        log.Fatal().Msgf("err on streamClient.CloseSend: %v", err)
                    }
                }()
            }
            wg.Wait()
            time.Sleep(SleepTime)
            fmt.Println()
        }()
    }
}
betslus1 commented 2 years ago

проверил на ноде, вроде подтвердилось

https://github.com/betslus1/unofficial-tinkoff-invest-api_v2-lazy-sdk-NODEJS/blob/39a1f3eb0b14e6926ea1c5fa910c903ec332f9e4/example/test_stream_cancelation.js

После 4 cancel() и ответа от сервера, при повторном подключении получаю 3 exhausted

malyginvv commented 2 years ago

https://gist.github.com/malyginvv/ef719c4f04726e9689f6ce848541108b Заменил cancel() на end(), добавил логирование количества открытых стримов. При повторном переподключении только одного стрима кажется проблем нет.

betslus1 commented 2 years ago

https://gist.github.com/malyginvv/ef719c4f04726e9689f6ce848541108b

Заменил cancel() на end(), добавил логирование количества открытых стримов.

При повторном переподключении только одного стрима кажется проблем нет.

Когда тестил, частенько один стрим из 4 подключался при повторном подключении, на токене где лимит 4.

Плюс по вашему коду второй цикл начнётся раньше, чем будет отписка от стримов 1 цикла. (Тайм-аут 100мс и там и там) либо возможен RC на переменной calls[i]

TradesStream можно только через cancel завершить корректно.

nonamegithub commented 2 years ago

Подтверждаю, вчера началось, а сегодня продолжается ошибка 80001 после cancel()

AlexanderVolkovTCS commented 2 years ago

Спасибо. В утреннем релизе зафиксили проблему. Если вдруг воспроизведется повторно - прошу переоткрыть тикет.