VoltDB / voltdb-client-go

VoltDB Golang Client Library
MIT License
32 stars 19 forks source link

Help with reconnect with permanent connection #56

Closed asyslinux closed 6 years ago

asyslinux commented 6 years ago

Hello, can anyone help with reconnection to VoltDB?

I use function with 1 established connection to VoltDB. I wrote a simple loop for reconnection to VoltDB.

But this loop only helps at startup: if VoltDB is down when I start my application, the application keeps trying to connect, and once VoltDB is up it connects successfully.

But I do not know, how to make reconnect to VoltDB in main function, when application is already running and VoltDB is restarted during application running.

I do not know the driver's functionality well, but the max_retries parameter:

voltdb://localhost:21212?max_retries=1000&retry=true&retry_interval=5s

doesn't reconnect automatically if VoltDB is restarted while the application is running.

If I use the ExecAsync method, I see these errors when VoltDB is restarted:

[INFO] 2018/08/02 21:40 200 71.695216ms 127.0.0.1 GET /api/v1/add main-stable.go:109: Error: timeout main-stable.go:109: Error: timeout main-stable.go:109: Error: timeout

If I use the Exec method, I see these errors when VoltDB is restarted:

[INFO] 2018/08/02 21:09 200 24.653089ms 127.0.0.1 GET /api/v1/add ^[[15~main-stable.go:29: Error: node voltdb://localhost:21212?max_retries=1000&retry=true&retry_interval=5s: is down [INFO] 2018/08/02 21:10 200 1.0007059s 127.0.0.1 GET /api/v1/add main-stable.go:29: Error: voltdb://localhost:21212?max_retries=1000&retry=true&retry_interval=5s:3 writing on a closed node connection [INFO] 2018/08/02 21:10 200 118.898µs 127.0.0.1 GET /api/v1/add main-stable.go:29: Error: voltdb://localhost:21212?max_retries=1000&retry=true&retry_interval=5s:4 writing on a closed node connection

I made benchmarks with vegeta http load tool - https://github.com/tsenart/vegeta

Machine: 80vCPU / 256GB RAM / SSD HEAP memory for VoltDB = 8192MB

1 000 000 Simple inserts. Concurrency - 1024 rps

Results, if I use 1 db new connection per 1 http request, I have next results:

root@anl:/home# netstat -tapn | grep TIME_W | grep 21212 | wc -l 13998

CPU Load: ~500% for main.go / ~150-300% for java (VoltDB)

If I use this code with permanent connection:

Results, if I use 1 db permanent connection per 1024 http requests with ExecAsync and Exec, I have next results:

root@anl:/home# netstat -tapn | grep TIME_W | grep 21212 | wc -l 5

CPU Load: ~80% for main.go / ~50% for java (VoltDB)

These are very good results, but I do not know how to reconnect the permanent connection to VoltDB if VoltDB is restarted.

Thanks.

Now I have this example code:

Http request for test:

https://localhost:8081/api/v1/add?uuid=123&rtime=123&name=XX

Database:

CREATE TABLE volt
(
  uuid VARCHAR(36) NOT NULL,
  rtime INT NOT NULL,
  name VARCHAR(36) NOT NULL,
  PRIMARY KEY (uuid)
);

CREATE PROCEDURE TestInsert AS UPSERT INTO volt (uuid,rtime,name) VALUES(?,?,?);

Go code:

package main

import (
        "log"
        "time"
        "database/sql/driver"
        "github.com/kataras/iris"
        "github.com/kataras/iris/middleware/logger"
        "github.com/kataras/iris/middleware/recover"
        "github.com/VoltDB/voltdb-client-go/voltdbclient"
)

// Statistic

// add_stat builds the iris handler for the "add" route. The handler reads the
// uuid, rtime and name query parameters, submits them to the TestInsert
// procedure over the supplied VoltDB connection, and echoes them back to the
// client as plain text.
func add_stat(volt_db *voltdbclient.Conn) iris.Handler {
    handle := func(ctx iris.Context) {

        // Query parameters are forwarded to VoltDB as raw strings.
        var (
            userID   = ctx.URLParam("uuid")
            respTime = ctx.URLParam("rtime")
            network  = ctx.URLParam("name")
        )

        // Asynchronous write with a one second timeout; failures are reported
        // through anlConsumer.ConsumeError. Drain is called right after the
        // submit (presumably it waits for outstanding async calls - confirm
        // against the voltdbclient documentation).
        // The synchronous alternative is volt_db.ExecTimeout / volt_db.Exec
        // with the same procedure name and arguments.
        params := []driver.Value{userID, respTime, network}
        volt_db.ExecAsyncTimeout(anlConsumer{}, "TestInsert", params, time.Second)
        volt_db.Drain()

        // Echo the received arguments back to the caller.
        ctx.Writef("Arg User UUID : %s\n", userID)
        ctx.Writef("Arg Response Time: %s\n", respTime)
        ctx.Writef("Arg Network Name: %s\n", network)
        ctx.Writef("\n")
        ctx.Text("Successfull!\n")

        ctx.StatusCode(iris.StatusOK)
    }

    return handle
}

// Main Function

// main connects to VoltDB, wires up the iris web server and serves the API
// on port 8081 until the server stops.
func main() {

    // Database Connection
    //
    // The error is checked before Close is deferred, and main stops on
    // failure, so Close can never be invoked on a connection that was not
    // established.

    volt_db, err := GetVolt()
    if err != nil {
        log.Printf("Main Connect to VoltDB Failed: %s", err)
        return
    }
    defer volt_db.Close()

    // Web Server

    app := iris.New()
    app.Logger().SetLevel("debug")
    app.Use(logger.New())
    app.Use(recover.New())

    // Web Routing

    v1 := app.Party("/api/v1").AllowMethods(iris.MethodOptions) // <- important for the preflight.
    {
        v1.Get("/add", add_stat(volt_db))
    }

    // Web Listen Settings
    //
    // Run blocks while the server is listening; report why it returned
    // instead of silently dropping the error.

    if err := app.Run(iris.Addr(":8081")); err != nil {
        log.Printf("Web Server Stopped: %s", err)
    }
}

// Database Connection

// GetVolt keeps trying to open a VoltDB connection, pausing one second
// between attempts, and returns as soon as one attempt succeeds. Because it
// only returns on success, the returned error is always nil.
func GetVolt() (*voltdbclient.Conn, error) {
    const dsn = "voltdb://localhost:21212?max_retries=1000&retry=true&retry_interval=5s"

    for {
        conn, err := voltdbclient.OpenConn(dsn)
        if err != nil {
            // Not reachable yet: log the reason and retry after a short pause.
            log.Printf("Try Connect to VoltDB: %s", err)
            time.Sleep(time.Second)
            continue
        }

        log.Printf("Successfull Main Connect to VoltDB")
        return conn, err
    }
}

// Consumer

// anlConsumer is the callback receiver handed to the asynchronous VoltDB
// calls. Only errors are acted upon; results and rows are ignored.
type anlConsumer struct{}

// ConsumeError logs the error reported for an asynchronous call.
func (rc anlConsumer) ConsumeError(err error) {
    const format = "Error: %s"
    log.Printf(format, err)
}

// ConsumeResult intentionally does nothing with the execution result.
func (rc anlConsumer) ConsumeResult(_ driver.Result) {}

// ConsumeRows intentionally does nothing with the returned rows.
func (rc anlConsumer) ConsumeRows(_ driver.Rows) {}
gernest commented 6 years ago

@asyslinux this is my personal opinion (so take it with a grain of salt)

I recommend you always use the database/sql driver unless you have very special needs. That said, I think there are some misconceptions in your code. I dug around your GitHub and found https://github.com/asyslinux/analytics , so I will reference corrections from that repo, since copy-pasting might lose context and be harder to follow.

persistent connection

I advise you to declare the connection in global scope so it is accessible everywhere in your app. I will explain later why this might be handy.

So in your code we need to update this part

https://github.com/asyslinux/analytics/blob/b48226fda5e60ae5efd176cb1846c5d259931398/main.go#L28-L29

To

    wg    sync.WaitGroup
    db    *voltdbclient.Conn

So here you will have one connection. The connection is safe for concurrent use so don't worry about race conditions.

Next you just have to update the GetVolt function to do one thing: replace the global instance with a working connection whenever it is called.

This is the updated function

// GetVolt blocks until a VoltDB connection can be opened and then publishes
// it through the package-level db variable. It retries once per second and
// only returns after a successful connect, so the returned error is always
// nil.
//
// NOTE(review): a connection previously stored in db is not closed here;
// decide who owns closing it before calling GetVolt again to reconnect.
func GetVolt() error {

    VoltLogger, voltlogfile := voltLogger()
    defer voltlogfile.Close()

    for {

        // Open into a local first: a failed attempt must not overwrite the
        // shared db (presumably with nil) while other goroutines may still be
        // holding on to it.
        conn, err := voltdbclient.OpenConn("voltdb://localhost:21212?max_retries=1000&retry=true&retry_interval=5s")
        if err == nil {
            db = conn
            VoltLogger.Infof("Successfully Connected to VoltDB")
            return nil
        }
        VoltLogger.Warnf("Try Connect to VoltDB: %s", err)
        time.Sleep(1 * time.Second)

    }

}

Note that we now return only an error, since we are updating the global client. I also made changes to the rest of the code to accommodate this, so your file will now look like this:

package main

import (
    "os"
    //"os/signal"
    "context"
    "database/sql/driver"
    "fmt"
    "math/rand"
    "regexp"
    "strconv"
    "strings"
    "sync"
    "syscall"
    "time"

    //"github.com/orian/counters"
    "github.com/VoltDB/voltdb-client-go/voltdbclient"
    "github.com/kataras/golog"
    "github.com/kataras/iris"
    "github.com/kataras/iris/middleware/logger"
    "github.com/kataras/iris/middleware/recover"
    "github.com/orian/counters/global"
)

// Global Variables And Types

var (
    // wg tracks the calcStat goroutine: main adds to it, calcStat marks it
    // done, and the interrupt handler waits on it.
    wg    sync.WaitGroup
    // db is the shared VoltDB connection; it is assigned by GetVolt.
    db    *voltdbclient.Conn
    // rlock is set to 1 by addStat when a calculation has been requested and
    // reset to 0 by calcStat when it finishes.
    // NOTE(review): it is read and written from several goroutines without
    // synchronization - looks like a data race, confirm with `go run -race`.
    rlock = 0
    // mcalc is held by calcStat for the duration of one calculation.
    mcalc sync.Mutex
    // mrun is held by addStat while it sends on the work channel.
    mrun  sync.Mutex

    // rgxname accepts exactly two upper-case ASCII letters.
    rgxname = regexp.MustCompile("^([A-Z]{2,2}$)")
    // rgxuuid accepts hexadecimal text grouped 8-4-4-4-12 with dashes.
    rgxuuid = regexp.MustCompile("^([a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$)")
    //rgxrip = regexp.MustCompile("(^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$)")

)

// anlConsumer is the stateless callback receiver passed to the asynchronous
// VoltDB calls; its Consume* methods are defined in the Consumers section.
type anlConsumer struct{}

// Handle Os Signals

// Realtime Statistic

// addStat returns the iris handler behind GET /api/v1/add.
//
// For every request the handler
//   - validates the uuid, rtime and name query parameters and the GEOIP_*
//     request headers, answering 400 Bad Request on the first failure,
//   - submits the values to the RealInsert procedure through voltDb,
//   - increments the "creal" counter and, once it reaches the threshold while
//     no calculation is pending, signals calcStat through work,
//   - writes a plain-text debug summary with status 200.
//
// NOTE(review): voltDb is captured when the handler is built. If the global
// connection is replaced later (for example by calling GetVolt again), this
// handler keeps using the connection it was created with.
func addStat(voltDb *voltdbclient.Conn, work chan bool) iris.Handler {
    return func(ctx iris.Context) {

        // Run Clock

        timeStart := time.Now()

        // Loggers

        AppLogger, applogfile := appLogger()
        defer applogfile.Close()

        // reject answers with 400 and logs the same message. The status is
        // set BEFORE the body is written: once the first byte of the body has
        // gone out the response header is already sent, and a status set
        // afterwards never reaches the client.
        reject := func(format string, args ...interface{}) {
            ctx.StatusCode(iris.StatusBadRequest)
            ctx.Writef(format+"\n", args...)
            AppLogger.Warnf(format, args...)
        }

        // General Code

        uuid := ctx.URLParam("uuid")
        rip := ctx.RemoteAddr()

        rtime, err := ctx.URLParamInt("rtime")
        if err != nil { // Check Arg Response Time !Integer/Empty
            reject("Arg Response Time: %d is not integer", rtime)
            return
        }

        // The database call receives the response time in its textual form.
        srtime := ctx.URLParam("rtime")

        name := ctx.URLParam("name")
        upname := strings.ToUpper(name)

        continent := ctx.GetHeader("GEOIP_CONTINENT")
        country := ctx.GetHeader("GEOIP_COUNTRY")
        country_name := ctx.GetHeader("GEOIP_COUNTRY_NAME")
        city := ctx.GetHeader("GEOIP_CITY")
        org_name := ctx.GetHeader("GEOIP_ORG")
        slatitude := ctx.GetHeader("GEOIP_LATITUDE")
        slongitude := ctx.GetHeader("GEOIP_LONGITUDE")

        if uuid == "" { // Check Arg User UUID Empty
            reject("Arg User UUID: %s is empty", uuid)
            return
        }

        if !rgxuuid.MatchString(uuid) { // Check Arg User UUID RegExp
            reject("Arg User UUID: %s is bad", uuid)
            return
        }

        if name == "" { // Check Arg Network Name Empty
            reject("Arg Network Name: %s is empty", upname)
            return
        }

        if !rgxname.MatchString(upname) { // Check Arg Network Name Regexp
            reject("Arg Network Name: %s is bad", upname)
            return
        }

        if continent == "" { // Check Arg Continent Code Empty
            reject("Arg Continent Code: %s is empty", continent)
            return
        }

        if country == "" { // Check Arg Country Code Empty
            reject("Arg Country Code: %s is empty", country)
            return
        }

        // Latitude and longitude are parsed as floats, so the message says
        // "float" rather than "integer".
        latitude, err := strconv.ParseFloat(slatitude, 64)
        if err != nil { // Check Arg Latitude !Float/Empty
            reject("Arg Latitude : %s is not a float", slatitude)
            return
        }

        longitude, err := strconv.ParseFloat(slongitude, 64)
        if err != nil { // Check Arg Longitude !Float/Empty
            reject("Arg Longitude : %s is not a float", slongitude)
            return
        }

        // Async Write to VoltDB
        //
        // NOTE(review): validation runs on the upper-cased name but the raw
        // name is what gets stored - confirm this is intended.

        anlCons := anlConsumer{}
        voltDb.ExecAsyncTimeout(anlCons, "RealInsert", []driver.Value{uuid, rip, srtime, name, continent, country, country_name, city, org_name, slatitude, slongitude}, 1*time.Second)
        voltDb.Drain()

        // Async Calculate in VoltDB

        creal := global.GetCounter("creal")
        creal.Increment()
        rcreal := global.GetCounter("creal").Value()

        // NOTE(review): rlock is a plain int shared with calcStat and other
        // request goroutines; this check-then-set is not synchronized.
        if rcreal >= 5 && rlock == 0 {

            rlock = 1

            // Random pause of up to 10ms before signalling the worker.
            rs := rand.Intn(10000)
            time.Sleep(time.Duration(rs) * time.Microsecond)

            // work is unbuffered in main, so this send waits until calcStat
            // picks the signal up.
            mrun.Lock()

            work <- true

            mrun.Unlock()

        }

        // Debug Output

        timeElapsed := float64(time.Since(timeStart)) / float64(time.Millisecond)

        // Status first, body second (see reject above for the reason).
        ctx.StatusCode(iris.StatusOK)

        ctx.Writef("Arg User UUID : %s\n", uuid)
        ctx.Writef("Arg User IP : %s\n", rip)
        ctx.Writef("Arg Response Time: %d\n", rtime)
        ctx.Writef("Arg Network Name: %s\n", upname)
        ctx.Writef("Arg Continent Code : %s\n", continent)
        ctx.Writef("Arg Country Code : %s\n", country)
        ctx.Writef("Arg Country Name : %s\n", country_name)
        ctx.Writef("Arg City Name : %s\n", city)
        ctx.Writef("Arg Organization Name : %s\n", org_name)
        ctx.Writef("Arg Latitude : %f\n", latitude)
        ctx.Writef("Arg Longitude : %f\n\n", longitude)
        ctx.Writef("Time Elapsed : %f ms\n\n", timeElapsed)
        ctx.Writef("Debug :\n\n")
        ctx.Writef("Request Counter : %d requests\n", rcreal)
        ctx.Writef("Request Lock : %d (lock status)\n", rlock)
        ctx.Writef("\n")
        ctx.Text("Successfull!\n")

    }

}

// Calculate Statistics

// calcStat is the background worker. It waits for a signal on work, runs one
// calculation round (currently a 15 second placeholder sleep), resets the
// "creal" counter and clears rlock so that a new round can be requested.
// It exits, marking wg done, when a value arrives on intc.
//
// The loop blocks in select instead of polling both channels with a 10ms
// sleep, so the goroutine does not wake up while idle and reacts to either
// channel immediately.
func calcStat(work chan bool, intc chan bool) {
    defer wg.Done()

    for {

        // Interrupts keep priority: when both channels are ready, shut down
        // rather than start another calculation.
        select {
        case <-intc:
            fmt.Println("Go Routine Safety Exited")
            return
        default:
        }

        select {
        case <-intc:
            fmt.Println("Go Routine Safety Exited")
            return

        case <-work:

            mcalc.Lock()

            fmt.Println("Starting Calculation")

            // Placeholder for the real calculation.
            time.Sleep(15 * time.Second)

            //    voltDb.QueryAsyncTimeout(anlCons, "TestSelect", []driver.Value{}, 1 * time.Second)
            //    voltDb.QueryAsyncTimeout(anlCons, "Calculate", []driver.Value{}, 1 * time.Second)
            //    voltDb.Drain()

            creal := global.GetCounter("creal")
            creal.Set(0)

            // NOTE(review): rlock is also read and written by the request
            // handlers without synchronization.
            rlock = 0

            fmt.Println("Finishing Calculation")

            mcalc.Unlock()

        }

    }

}

// HTTP Error Handlers

// notFoundHandler writes the body used for 404 responses; main registers it
// for iris.StatusNotFound.
func notFoundHandler(ctx iris.Context) {
    const body = "404 Not found"
    ctx.HTML(body)
}

// InternalServerErrorHandler writes the body used for 500 responses and logs
// it to the application log. When the request values carry a non-empty
// "error" entry, its text is included; otherwise a generic message is used.
func InternalServerErrorHandler(ctx iris.Context) {

    logger, logFile := appLogger()
    defer logFile.Close()

    if detail := ctx.Values().GetString("error"); detail == "" {
        // Nothing recorded about the failure: fall back to the generic text.
        ctx.HTML("(Unexpected) Internal server error")
        logger.Warnf("(Unexpected) Internal server error")
    } else {
        ctx.Writef("500 Internal server error: %s", detail)
        logger.Warnf("500 Internal server error: %s", detail)
    }

}

// Databases Connections

// VoltDB Connections

// GetVolt blocks until a VoltDB connection can be opened and then publishes
// it through the package-level db variable. It retries once per second and
// only returns after a successful connect, so the returned error is always
// nil.
//
// NOTE(review): a connection previously stored in db is not closed here;
// decide who owns closing it before calling GetVolt again to reconnect.
func GetVolt() error {

    VoltLogger, voltlogfile := voltLogger()
    defer voltlogfile.Close()

    for {

        // Open into a local first: a failed attempt must not overwrite the
        // shared db (presumably with nil) while other goroutines may still be
        // holding on to it.
        conn, err := voltdbclient.OpenConn("voltdb://localhost:21212?max_retries=1000&retry=true&retry_interval=5s")
        if err == nil {
            db = conn
            VoltLogger.Infof("Successfully Connected to VoltDB")
            return nil
        }
        VoltLogger.Warnf("Try Connect to VoltDB: %s", err)
        time.Sleep(1 * time.Second)

    }

}

// Consumers

// ConsumeError writes the error reported for an asynchronous VoltDB call to
// the VoltDB log file.
func (rc anlConsumer) ConsumeError(err error) {
    logger, logFile := voltLogger()
    defer logFile.Close()

    const format = "Error: %s"
    logger.Errorf(format, err)
}

// ConsumeResult logs, at info level, the affected-row count and the last
// insert id of a completed asynchronous execution. Errors from reading either
// value are discarded, in which case the zero value is logged.
func (rc anlConsumer) ConsumeResult(res driver.Result) {
    logger, logFile := voltLogger()
    defer logFile.Close()

    affected, _ := res.RowsAffected()
    lastID, _ := res.LastInsertId()

    logger.Infof("%d, %d\n", affected, lastID)
}

// ConsumeRows is deliberately a no-op: rows returned by asynchronous queries
// are not used.
func (rc anlConsumer) ConsumeRows(_ driver.Rows) {}

// Loggers

// appLogger creates a fresh golog logger at level "warn" that writes to the
// application log file. The file is returned alongside the logger so the
// caller can close it when done.
func appLogger() (*golog.Logger, *os.File) {
    logger := golog.New()
    sink := AppLogFile()

    // Switch to "debug" and AddOutput to also keep the default output.
    logger.SetLevel("warn")
    logger.SetOutput(sink)

    return logger, sink
}

// voltLogger creates a fresh golog logger at level "warn" that writes to the
// VoltDB log file. The file is returned alongside the logger so the caller
// can close it when done.
func voltLogger() (*golog.Logger, *os.File) {
    logger := golog.New()
    sink := VoltLogFile()

    // Switch to "debug" and AddOutput to also keep the default output.
    logger.SetLevel("warn")
    logger.SetOutput(sink)

    return logger, sink
}

// File Logs

// todayAppFilename returns the path of the application log file. Despite the
// name, the path is fixed and does not depend on the date.
func todayAppFilename() string {
    const appLogPath = "/var/log/anl-cdn/app.log"
    return appLogPath
}

// todayVoltFilename returns the path of the VoltDB log file. Despite the
// name, the path is fixed and does not depend on the date.
func todayVoltFilename() string {
    const voltLogPath = "/var/log/anl-cdn/volt.log"
    return voltLogPath
}

// AppLogFile opens the application log file for appending, creating it with
// mode 0600 when it does not exist yet. It panics if the file cannot be
// opened.
func AppLogFile() *os.File {
    // Append mode keeps earlier entries when the server is restarted.
    const flags = os.O_CREATE | os.O_WRONLY | os.O_APPEND

    f, err := os.OpenFile(todayAppFilename(), flags, 0600)
    if err == nil {
        return f
    }

    panic(err)
}

// VoltLogFile opens the VoltDB log file for appending, creating it with mode
// 0600 when it does not exist yet. It panics if the file cannot be opened.
func VoltLogFile() *os.File {
    // Append mode keeps earlier entries when the server is restarted.
    const flags = os.O_CREATE | os.O_WRONLY | os.O_APPEND

    f, err := os.OpenFile(todayVoltFilename(), flags, 0600)
    if err == nil {
        return f
    }

    panic(err)
}

// Main Function

// main starts the calcStat worker, connects to VoltDB, configures the iris
// application (logging, middleware, error handlers, the /api/v1/add route and
// a custom interrupt handler) and then serves on port 8081.
func main() {

    // System Handling

    pid := syscall.Getpid()

    // Local Definitions

    // Both channels are unbuffered: a send blocks until calcStat receives.
    // intc tells the worker to stop, work asks it to run a calculation.
    intc := make(chan bool)
    work := make(chan bool)

    // Loggers

    AppLogger, applogfile := appLogger()
    defer applogfile.Close()
    VoltLogger, voltlogfile := voltLogger()
    defer voltlogfile.Close()

    AppLogger.Println("Analytics CDN Running with pid: ", pid, " to hot restart, issue the kill -1 ", pid, " command")

    // Go Job

    // Register the worker with wg before starting it so that the interrupt
    // handler's wg.Wait cannot return before the worker has finished.
    wg.Add(1)
    go calcStat(work, intc)

    // Database Connection

    // GetVolt retries internally and only returns once connected, so this
    // call blocks for as long as VoltDB is unreachable.
    err := GetVolt()

    if err != nil {
        VoltLogger.Errorf("Main Connect to VoltDB Failed: %s", err)
    }
    defer func() {
        if db != nil {
            // Make sure we release resources for the connection
            db.Close()
        }
    }()

    // Web Server

    app := iris.New()
    app.Logger().SetLevel("warn")
    app.Logger().SetOutput(applogfile)
    //    app.Logger().SetLevel("debug")
    //    app.Logger().AddOutput(applogfile)

    app.Use(logger.New())
    app.Use(recover.New())

    app.OnErrorCode(iris.StatusNotFound, notFoundHandler)
    app.OnErrorCode(iris.StatusInternalServerError, InternalServerErrorHandler)

    // Web Routing

    v1 := app.Party("/api/v1").AllowMethods(iris.MethodOptions) // <- important for the preflight.
    {
        // The handler receives the value db holds at this point.
        v1.Get("/add", addStat(db, work))
    }

    // Interrupt Handler

    iris.RegisterOnInterrupt(func() {

        // Channel With Go Routines

        fmt.Println("Capture Interrupt")
        fmt.Println("Notify Go Routines About Interrupt")

        // Blocks until calcStat receives the value; if a calculation is in
        // progress that is only after it has finished.
        intc <- true

        // Wait Go Routines

        fmt.Println("Awaiting Go Routines Tasks")

        wg.Wait()

        fmt.Println("Go Routines Successfully Finished")

        // Give the web server up to five seconds to shut down.
        timeout := 5 * time.Second

        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel()

        // Close App

        app.Shutdown(ctx)
    })

    // Web Listen Settings

    // NOTE(review): the value returned by app.Run is discarded, so the reason
    // the server stopped is never reported.
    // iris.WithoutInterruptHandler is passed alongside the handler registered
    // above - presumably to turn off iris's built-in interrupt handling;
    // confirm against the iris documentation.
    app.Run(iris.Addr(":8081"), iris.WithoutInterruptHandler, iris.WithCharset("UTF-8"), iris.WithRemoteAddrHeader("X-Real-IP"), iris.WithOptimizations,
        iris.WithConfiguration(iris.Configuration{
            //app.Run(iris.Addr(":8081"), iris.WithCharset("UTF-8"), iris.WithRemoteAddrHeader("X-Real-IP"), iris.WithOptimizations, iris.WithConfiguration(iris.Configuration{
            DisableStartupLog:                 false,
            DisableInterruptHandler:           false,
            DisablePathCorrection:             false,
            EnablePathEscape:                  false,
            FireMethodNotAllowed:              false,
            DisableBodyConsumptionOnUnmarshal: false,
            DisableAutoFireStatusCode:         false,
            TimeFormat:                        "Mon, 02 Jan 2006 15:04:05 GMT",
            Charset:                           "UTF-8",
        }))

}

NOTES:

Caveats

Hope this will help resolve your problem.

Happy coding.

abradley201 commented 6 years ago

Thanks for the answer, @gernest. Closing now.