fiorix / go-diameter

Diameter stack and Base Protocol (RFC 6733) for the Go programming language
Other
252 stars 143 forks source link

High response load #2 #195

Open egorkovalchuk opened 7 months ago

egorkovalchuk commented 7 months ago

Hello

I put a high load on the Diameter server. The requests go out, but some of the responses disappear: they are sent but never processed. Most likely, several responses end up in the network buffer at once rather than just one.

Send rate: 4000 requests/s.

What information do you need?

I use an application that sends generated requests to two Diameter servers. At low request rates everything works correctly. When the request rate increases, I see that the application does not parse all of the answers, even though tcpdump shows that the servers did send them.

My project: https://github.com/egorkovalchuk/go-cdrgenerator/ (I have added you to it).

` func StartDiameterClient() { var brtadress []datatype.Address for , ip := range global_cfg.Common.BRT { brt_adress = append(brt_adress, datatype.Address(net.ParseIP(ip))) }

//прописываем конфиг
ProcessDebug("Load Diameter config")

diam_cfg := &sm.Settings{
    OriginHost:       datatype.DiameterIdentity(global_cfg.Common.BRT_OriginHost),
    OriginRealm:      datatype.DiameterIdentity(global_cfg.Common.BRT_OriginRealm),
    VendorID:         data.PETER_SERVICE_VENDOR_ID,
    ProductName:      "CDR-generator",
    OriginStateID:    datatype.Unsigned32(time.Now().Unix()),
    FirmwareRevision: 1,
    HostIPAddresses:  brt_adress,
}

// Create the state machine (it's a diam.ServeMux) and client.
mux := sm.New(diam_cfg)
ProcessDebug(mux.Settings())

ProcessDebug("Load Diameter dictionary")
ProcessDebug("Load Diameter client")

// Инициализация конфига клиента
cli := data.Client(mux)

// Set message handlers.
// Можно использовать канал AnswerCCAEvent(BrtDiamChannelAnswer)
mux.Handle("CCA", AnswerCCAEvent())
mux.Handle("DWA", AnswerDWAEvent())

// Запуск потока записи ошибок в лог
go DiamPrintErrors(mux.ErrorReports())
//KeepAlive WTF??
cli.EnableWatchdog = false //true

chk := 0
ProcessDiam("Connecting clients...")
for _, init_connect := range global_cfg.Common.BRT {
    ProcessDebug(init_connect)

    var err error

    brt_connect, err := Dial(cli, init_connect+":"+strconv.Itoa(global_cfg.Common.BRT_port), "", "", false, "tcp")
    if err != nil {
        ProcessError("Connect error ")
        ProcessError(err)
    } else {
        ProcessDebug("Connect to " + init_connect + " done.")
        // Запуск потоков записи по БРТ
        // Отмеаем что клиент запущен
        chk++
        go SendCCREvent(brt_connect, diam_cfg, BrtDiamChannel)
    }
}
// Проверка что клиент запущен
if chk > 0 {
    ProcessDiam("Done. Sending messages...")
} else {
    ProcessDiam("Stopping the client's diameter. No connection is initialized")
    brt = false
}

} // Кусок для диаметра // Определение шифрование соединения func Dial(cli sm.Client, addr, cert, key string, ssl bool, networkType string) (diam.Conn, error) { if ssl { return cli.DialNetworkTLS(networkType, addr, cert, key, nil) } return cli.DialNetwork(networkType, addr) } // Обработчик-ответа Диаметра func AnswerCCAEvent() diam.HandlerFunc { //func AnswerCCAEvent(done chan struct{}) diam.HandlerFunc { return func(c diam.Conn, m diam.Message) { // обработчик ошибок, добавить поток(канал) для офлайна? // Конкуренция по ответам, запись в фаил? s, sid := data.ResponseDiamHandler(m, ProcessDiam, debugm) CDRDiamResponseCount.Inc(strconv.Itoa(s)) if s == 4011 || s == 4522 || s == 4012 { //logdiam.Println("DIAM: Answer CCA code: " + strconv.Itoa(s) + " Session: " + sid) //переход в оффлайн val := BrtOfflineCDR.Load(sid) //BrtOfflineCDR[sid]* rr, err := data.CreateCDRRecord(val.RecPool, val.CDRtime, val.Ratio, CDRPatternTask[val.TaskName]) if err != nil { LogChannel <- LogStruct{"ERROR", err} } else { CDRChanneltoFileUni[val.TaskName] <- rr } BrtOfflineCDR.Delete(sid) } else if s == 5030 { // 5030 пользователь не известен BrtOfflineCDR.Delete(sid) } else { //logdiam.Println("DIAM: Answer CCA code: " + strconv.Itoa(s)) BrtOfflineCDR.Delete(sid) }

}

} func AnswerDWAEvent() diam.HandlerFunc { return func(c diam.Conn, m diam.Message) { //обработчик ошибок, вотч дог пишем в обычный лог s, _ := data.ResponseDiamHandler(m, ProcessDiam, debugm) ProcessDiam("Answer DWA code: " + strconv.Itoa(s)) } } // Горутина приема и записи сообщения по диаметру в брт func SendCCREvent(c diam.Conn, cfg sm.Settings, in chan data.DiamCH) {

var err error
server, _, _ := strings.Cut(c.RemoteAddr().String(), ":")
// на подумать, использовать структуру, а потом ее определять или сазу передавать готовое сообщение
// заменить на просто вывод в лог
defer c.Close()

heartbeat := time.Tick(5 * time.Second)
meta, ok := smpeer.FromContext(c.Context())
if !ok {
    ProcessDiam("Client connection does not contain metadata")
    ProcessDiam("Close threads")
}

for {
    select {
    case <-heartbeat:
        // Сделать выход или переоткрытие?
        meta, ok = smpeer.FromContext(c.Context())
        if !ok {
            ProcessDiam("Client connection does not contain metadata")
            ProcessDiam("Close threads")
        }

        // Настройка Watch Dog
        m := diam.NewRequest(280, 4, dict.Default)
        m.NewAVP(avp.OriginHost, avp.Mbit, 0, cfg.OriginHost)
        m.NewAVP(avp.OriginRealm, avp.Mbit, 0, cfg.OriginRealm)
        m.NewAVP(avp.OriginStateID, avp.Mbit, 0, cfg.OriginStateID)
        log.Printf("DIAM: Sending DWR to %s", c.RemoteAddr())
        _, err = m.WriteTo(c)
        if err != nil {
            LogChannel <- LogStruct{"ERROR", err}
        }

    case tmp := <-in:

        diam_message := tmp.Message
        //diam_message := data.CreateCCREventMessage(dict.Default)
        diam_message.NewAVP(avp.OriginHost, avp.Mbit, 0, cfg.OriginHost)
        diam_message.NewAVP(avp.OriginRealm, avp.Mbit, 0, cfg.OriginRealm)
        diam_message.NewAVP(avp.DestinationRealm, avp.Mbit, 0, meta.OriginRealm)
        diam_message.NewAVP(avp.DestinationHost, avp.Mbit, 0, meta.OriginHost)

        /*logdiam.Printf("DIAM: Sending CCR to %s", c.RemoteAddr())
        logdiam.Println(diam_message)*/

        _, err = diam_message.WriteTo(c)
        if err != nil {
            LogChannel <- LogStruct{"ERROR", err}
        } else {
            CDRDiamCount.Inc(server)
        }

    default:

    }
}

}`

egorkovalchuk commented 6 months ago

Hi

I added diagnostics. At some point, responses from one of the two servers stop being processed.

What is the best way to use the state machine: a separate one for each connection, or a single shared one as in your example?