influxdata / influxdb

Scalable datastore for metrics, events, and real-time analytics
https://influxdata.com
Apache License 2.0

I wrote an InfluxDB tool to move data from one database to another, it does not work well, why? #4078

Closed guaiguaihw closed 9 years ago

guaiguaihw commented 9 years ago

I just wrote a tool in Go to move data from one database to another, but something is going wrong that I can't solve. Here is my system: [screenshot 3]

Database A, which I want to read the data from:

I inserted one point using the command in [screenshot 1], and the SELECT result is in [screenshot: qq 20150911131504].

Database B, which I want to write the data from A into (PS: B already has A's measurement and tags, which is what causes the error):

I meant to insert "offset" as type float64, but when I used the tool to write the data, "offset" arrived as type string. (PS: in the program I added statements to check the type of each value. The result is in [screenshot 2].) Strangely, the value is reported as neither string nor float64 nor any other type I checked.

Why does this go wrong? Here is my program and its usage:


-h Show help

-s Host address of the database to export data from, default is 127.0.0.1

-sport Port of the database to export data from, default is 8086

-sdb Name of the database to export data from, default is mydb

-d Host address of the database to import data into, default is 127.0.0.1

-dport Port of the database to import data into, default is 8086

-ddb Name of the database to import data into, default is yourdb

-stime Start time, default is "1970-01-01 00:00:00"

-etime End time, default is "2100-01-01 00:00:00"

Example: go run client.go -s 10.121.90.12 -d 10.121.117.70 -stime "2015-08-30 10:00:00" -etime "2015-08-31 18:00:00" -sport 8080 -dport 8080 -sdb collect.route-broker.traffic.dataplatform.didi.com -ddb collect.route-broker.traffic.dataplatform.didi.com


package main

import (
    "flag"
    "fmt"
    "github.com/cheggaaa/pb"
    "github.com/influxdb/influxdb/client"
    "net/url"
    "runtime"
    "sync"
    "time"
)

Part one: build a client

func DBclient(host, port string) *client.Client {

    //connect to database
    u, err := url.Parse(fmt.Sprintf("http://%v:%v", host, port))
    if err != nil {
            fmt.Printf("Fail to parse host and port of database, error: %s\n", err.Error())
    }   

    info := client.Config{
            URL: *u, 
    }   

    var con *client.Client
    con, err = client.NewClient(info)
    if err != nil {
            fmt.Printf("Fail to build newclient to database, error: %s\n", err.Error())
    }   

    return con 

}

Part two: use the command "show measurements" to get the measurements

func Getmeasurements(c *client.Client, sdb, cmd string) []string {

    //get measurements from database
    q := client.Query{
            Command:  cmd,
            Database: sdb,
    }   

    var measurements []string

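    response, err := c.Query(q)
    if err != nil {
            fmt.Printf("Fail to get response from database, get measurements error: %s\n", err.Error())
            //response is nil on error, so stop here instead of dereferencing it
            return measurements
    }

    res := response.Results

    if len(res) == 0 || len(res[0].Series) == 0 {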
            fmt.Printf("The response of database is null, get measurements error!\n")
    } else {

            values := res[0].Series[0].Values

            //show progress of getting measurements
            count := len(values)
            bar := pb.StartNew(count)

            for _, row := range values {
                    measurement := fmt.Sprintf("%v", row[0])
                    measurements = append(measurements, measurement)
                    bar.Increment()
                    time.Sleep(3 * time.Millisecond)
            }
            bar.FinishPrint("Get measurements has finished!\n")
    }
    return measurements

}

Part three: use the command "select * from <measurement> where time < 'endtime' and time > 'starttime'" to build the client.BatchPoints value that will be written to the other DB.

func ReadDB(c *client.Client, sdb, ddb, cmd string) client.BatchPoints {

    q := client.Query{
            Command:  cmd,
            Database: sdb,
    }

    //get type client.BatchPoints
    var batchpoints client.BatchPoints

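    response, err := c.Query(q)
    if err != nil {
            fmt.Printf("Fail to get response from database, read database error: %s\n", err.Error())
            //response is nil on error, so return the empty batch instead of dereferencing it
            return batchpoints
    }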

    res := response.Results
    if len(res) == 0 {
            fmt.Printf("The response of database is null, read database error!\n")
    } else {

            res_length := len(res)
            for k := 0; k < res_length; k++ {

                    //show progress of reading series
                    count := len(res[k].Series)
                    bar := pb.StartNew(count)
                    for _, ser := range res[k].Series {

                            //get type client.Point
                            var point client.Point

                            point.Measurement = ser.Name
                            fmt.Println(point.Measurement)

                            point.Tags = ser.Tags
                            fmt.Println(point.Tags)
                            for _, v := range ser.Values {
                                    point.Time, _ = time.Parse(time.RFC3339, v[0].(string))
                                    fmt.Println("output point.time(v[0]): ", point.Time)

                                    field := make(map[string]interface{})
                                    l := len(v)
                                    for i := 1; i < l; i++ {
                                            if v[i] != nil {
                                                    field[ser.Columns[i]] = v[i]
                                                    fmt.Printf("ser.Columns[%d] : v[%d]\n", i, i)
                                                    fmt.Println(ser.Columns[i], " : ", v[i])
                                                    _, ok := v[i].(string)
                                                    fmt.Println("is it a string: ", ok)
                                                    _, ok = v[i].(float32)
                                                    fmt.Println("is it a float32: ", ok)
                                                    _, ok = v[i].(float64)
                                                    fmt.Println("is it a float64: ", ok)
                                                    _, ok = v[i].(int32)
                                                    fmt.Println("is it a int32: ", ok)
                                                    _, ok = v[i].(int64)
                                                    fmt.Println("is it a int64: ", ok)
                                                    _, ok = v[i].(bool)
                                                    fmt.Println("is it a bool: ", ok)

                                            }
                                    }
                                    point.Fields = field
                                    point.Precision = "s"
                                    batchpoints.Points = append(batchpoints.Points, point)
                            }
                            bar.Increment()
                            time.Sleep(3 * time.Millisecond)
                    }
                    bar.FinishPrint("Read series has finished!\n")
            }
            batchpoints.Database = ddb
            batchpoints.RetentionPolicy = "default"
    }
    return batchpoints

}

Part four: write to another database

func WriteDB(c *client.Client, b client.BatchPoints) {

    _, err := c.Write(b)
    if err != nil {
            fmt.Printf("Fail to write to database, error: %s\n", err.Error())
    }

}

func Goroutine(m string, h_length int, s_epoch time.Duration, wg *sync.WaitGroup, scon, dcon *client.Client, sdb, ddb *string) {

    //read and write data in 10-minute windows (600 s each, six per hour)
    count := 6 * h_length
    bar := pb.StartNew(count)

    for i := 0; i < 6*h_length; i++ {

            startsec := int(s_epoch.Seconds() + float64(i*600))
            endsec := int(s_epoch.Seconds() + float64((i+1)*600))

            //lower bound is inclusive so points exactly on a window boundary are not skipped
            getvalues := fmt.Sprintf("select * from \"%v\" where time >= %vs and time < %vs group by *", m, startsec, endsec)
            batchpoints := ReadDB(scon, *sdb, *ddb, getvalues)
            WriteDB(dcon, batchpoints)

            bar.Increment()
    }
    bar.FinishPrint("Measurement:" + m + ",Write to Database has Finished")
    wg.Done()

}


func main() {

    //source and destination hosts
    src := flag.String("s", "127.0.0.1", "IP of the source DB to export data from")
    dest := flag.String("d", "127.0.0.1", "IP of the destination DB to import data into")

    //source and destination ports
    sport := flag.String("sport", "8086", "port of the source DB to export data from")
    dport := flag.String("dport", "8086", "port of the destination DB to import data into")

    //source and destination database names
    sdb := flag.String("sdb", "mydb", "name of the source DB to export data from")
    ddb := flag.String("ddb", "yourdb", "name of the destination DB to import data into")

    //start and end of the time range to select series from
    st := flag.String("stime", "1970-01-01 00:00:00", "start time of the data to select")
    et := flag.String("etime", "2100-01-01 00:00:00", "end time of the data to select")

    flag.Parse()

    scon := DBclient(*src, *sport)
    dcon := DBclient(*dest, *dport)

    getmeasurements := "show measurements"
    measurements := Getmeasurements(scon, *sdb, getmeasurements)

    //show progress of writing to database
    count_outer := len(measurements)
    bar_outer := pb.StartNew(count_outer)

    wg := new(sync.WaitGroup)
    runtime.GOMAXPROCS(runtime.NumCPU())

    for _, m := range measurements {

            template := "2006-01-02 15:04:05"

            since_time, err_sin := time.Parse(template, "1970-01-01 00:00:00")
            if err_sin != nil {
                    fmt.Println("Fail to parse since_time")
            }

            stime, err_st := time.Parse(template, fmt.Sprintf("%v", *st))
            if err_st != nil {
                    fmt.Println("Fail to parse stime")
            }

            etime, err_et := time.Parse(template, fmt.Sprintf("%v", *et))
            if err_et != nil {
                    fmt.Println("Fail to parse etime")
            }

            s_epoch := stime.Sub(since_time)
            e_epoch := etime.Sub(since_time)

            h_length := int(e_epoch.Hours()-s_epoch.Hours()) + 1

            //at most 30 days (720 hours) of data can be moved in one run
            if h_length > 720 {
                    h_length = 720
            }

            wg.Add(1)
            go Goroutine(m, h_length, s_epoch, wg, scon, dcon, sdb, ddb)
            bar_outer.Increment()
    }
    wg.Wait()

    bar_outer.FinishPrint("Write to Database has Finished")
    fmt.Printf("Move datas from %s to %s has done!\n", *sdb, *ddb)

}

beckettsean commented 9 years ago

> (PS: B already has the measurement and tag of A, so it causes a mistake)

The field type conflict indicates that database B has the offset field already defined as a string, not a float64. Run DROP MEASUREMENT "shanghai-gov-req.statuszero.10s" to remove all field definitions on database B and re-run your code.

guaiguaihw commented 9 years ago

Besides, I have some complaints about what look like bugs in InfluxDB:

When I want to create a database named collect.cpu.idle, here are the steps: [screenshot 8]

Then I want to use the database: [screenshot 9]. Why don't I need quotes in this step? It really confuses me, as do statements like the ones below:

[screenshot 6] If I do not add \", I do not get the right data. Why can't I just use "shanghai-gov-req.statuszero.10s" to get the data? It's ridiculous if the format can't be unified. Here are more bugs:

[screenshot 11]

But if I want to drop the measurement, I get an error: [screenshot 12]. I am speechless; I don't know in which situations I can use quotation marks!

beckettsean commented 9 years ago

@guaiguaihw what you describe are not bugs, please review the docs to understand when quotes are necessary and when they are not.

> Why don't I need quotes in this step? It really confuses me,

Because for the USE command there is absolutely no ambiguity about what comes next. It will ALWAYS be a database name, and if it isn't, fail. In any other context, a string with periods may represent database.retention_policy.measurement, and therefore any measurement identifier with periods MUST be double-quoted in any other context. Otherwise the parser has no way to determine whether you want the shanghai-gov-req.statuszero.10s measurement or the 10s measurement in the statuszero retention policy from the shanghai-gov-req database.

> If I do not add \", I do not get the right data. Why can't I just use "shanghai-gov-req.statuszero.10s" to get the data?

You are confusing a curl syntax requirement with InfluxDB. The escaping is necessary because otherwise the double-quotes that are required around your measurement name would terminate the --data-urlencode argument, and you would be passing shanghai-gov-req.statuszero.10s" group by * " as another argument to curl.
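To illustrate that the backslashes belong to the shell and not to InfluxQL, here is a minimal Go sketch that builds the same query URL without any shell quoting. The host localhost:8086, the database name mydb, and the helper name buildQueryURL are placeholders chosen for the example, not values taken from this thread.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildQueryURL (hypothetical helper) returns a /query URL for the given
// host, database, and InfluxQL statement. url.Values.Encode percent-encodes
// the statement, so the double quotes around the measurement name need no
// backslash escaping.
func buildQueryURL(host, db, stmt string) string {
	params := url.Values{}
	params.Set("db", db)
	params.Set("q", stmt)
	u := url.URL{Scheme: "http", Host: host, Path: "/query", RawQuery: params.Encode()}
	return u.String()
}

func main() {
	// A raw string literal keeps the InfluxQL double quotes as-is.
	stmt := `SELECT * FROM "shanghai-gov-req.statuszero.10s" GROUP BY *`
	fmt.Println(buildQueryURL("localhost:8086", "mydb", stmt))
}
```

The statement contains only the double quotes that InfluxQL itself requires; the escaping in the curl command exists solely because the shell also treats double quotes as delimiters.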

> Here are more bugs:

You wrote points to two different measurements, one called shanghai-gov-req.statuszero.10 and another called "shanghai-gov-req.statuszero.10". That's why there are two series in the output.

In order to drop shanghai-gov-req.statuszero.10 you run DROP MEASUREMENT "shanghai-gov-req.statuszero.10". In order to drop "shanghai-gov-req.statuszero.10" you run DROP MEASUREMENT "\"shanghai-gov-req.statuszero.10\"", because the double-quotes that are part of the measurement name string must be escaped so the parser interprets them as actual double-quote characters, and since the name contains periods it must be surrounded by actual double-quotes.
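The quoting rule described above can be sketched as a small Go helper. The name quoteIdent is hypothetical, and the sketch handles only embedded double quotes; any other escaping rules InfluxQL may have are not covered here.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent (hypothetical helper) wraps an identifier in double quotes and
// backslash-escapes any double quotes that are part of the name itself.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `\"`) + `"`
}

func main() {
	// Measurement whose name contains periods only.
	fmt.Println("DROP MEASUREMENT " + quoteIdent(`shanghai-gov-req.statuszero.10`))
	// Measurement whose name itself includes literal double quotes.
	fmt.Println("DROP MEASUREMENT " + quoteIdent(`"shanghai-gov-req.statuszero.10"`))
}
```

Run as written, this prints the two DROP MEASUREMENT statements given in the comment above, one per measurement.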

Please read the documentation, particularly https://influxdb.com/docs/v0.9/write_protocols/write_syntax.html and https://influxdb.com/docs/v0.9/guides/troubleshooting.html. I think you'll find the only bugs are in your understanding of the proper syntax.

guaiguaihw commented 9 years ago

The data in B is actually float; it cannot be string. There must be something wrong. By the way, I really hope you will standardize the format.

beckettsean commented 9 years ago

@guaiguaihw I'm not going to debug your code. If you want to recreate the problem with direct curl or CLI commands I'd be happy to help, but what you describe is almost certainly a bug with your code, not in InfluxDB. Your serious lack of understanding about the proper way to use quotes and double-quotes only reinforces my certainty on this issue.

beckettsean commented 9 years ago

> The data in B is actually float; it cannot be string

Completely false. A field written as offset=888.3 is a float64. A field written as offset="888.3" is a string.
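The difference between the two forms can be shown with a short Go sketch that renders a field the way the two examples above are written. The helper formatField is hypothetical and covers only float64 and string values; it is an illustration, not the client library's own serializer.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatField (hypothetical helper) renders key=value for a float64 or a
// string. Floats are written bare; strings are wrapped in double quotes.
// Any other type is rejected in this sketch.
func formatField(key string, value interface{}) (string, bool) {
	switch v := value.(type) {
	case float64:
		return key + "=" + strconv.FormatFloat(v, 'f', -1, 64), true
	case string:
		return key + `="` + strings.ReplaceAll(v, `"`, `\"`) + `"`, true
	default:
		return "", false
	}
}

func main() {
	for _, v := range []interface{}{888.3, "888.3"} {
		if s, ok := formatField("offset", v); ok {
			fmt.Println(s)
		}
	}
}
```

The Go type of the value held in the fields map therefore decides which form is produced, so a number that reaches the writer as a string-like type would be sent in the quoted form.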

guaiguaihw commented 9 years ago

I know that. I wrote the data to A as shown in [screenshot 1], so as you say, it's float64. But in my program I check the type of this value ([screenshot 2]) and every check returns false. Is there something wrong with my code? I really have no idea; I just read the data and do nothing to it, no conversion. Anyway, thanks for your answer. Maybe there are some points I'm missing. Someone else is also writing data to database B, which makes my attempt to move data from A to B fail. I can't find the problem and fix it; maybe the other person's program writes the data a little differently.

jgschmitz commented 6 years ago

@guaiguaihw RTFM