apache / cassandra-gocql-driver

GoCQL Driver for Apache Cassandra®
https://cassandra.apache.org/
Apache License 2.0
2.58k stars 622 forks source link

Not able to match the Node.js driver write performance using the Go driver #613

Closed markuscraig closed 7 years ago

markuscraig commented 8 years ago

Hey Crew,

I'd like to use Go with Cassandra, but I'm getting much better write performance with the Node.js driver (almost 5x write performance with no timeouts). I feel like I must be doing something really wrong.

My test code below is simply updating counters millions of times using batch queries (using the same number of entries per batch in both Go and Node; 1000 entries), but I'm seeing dramatically different performance...

Here is the simple Cassandra schema I'm using for the test...

Just for reference, here is the Node.js code that writes 5 million updates in 15 secs. I never get any Cassandra timeouts when using Node.js, even if I add way too many entries per batch...

const cassandra = require('cassandra-driver');
const process = require('process');

const client = new cassandra.Client({
   contactPoints: ['127.0.0.1'],
   keyspace: 'emanate'
});

const query = "UPDATE stats SET total = total + 3, num_samples = num_samples + 1 WHERE stat_name = 'TEMPERATURE' AND org_id = '111' AND deploy_id = '222' AND item_id = '333' AND bucket_name = '1HR' AND start_time = '2016-01-01'";

const TOTAL_UPDATES = 5000000;
const BATCH_SIZE = 1000;

let queries = [];
let pendingBatches = 0;   // batches submitted but whose callback has not fired yet
let generationDone = false;

// Exit only once every submitted batch has completed. The original script
// called process.exit(0) inside the *final* batch's callback, but batches
// complete out of order, so earlier batches could still be in flight when
// the process exited (and the benchmark under-counted the real work).
function maybeExit() {
   if (generationDone && pendingBatches === 0) {
      process.exit(0);
   }
}

// Submit the accumulated statements as one counter batch, then reset.
function flushBatch() {
   if (queries.length === 0) {
      return;   // nothing to send (e.g. total was an exact multiple of BATCH_SIZE)
   }
   pendingBatches += 1;
   client.batch(queries, { prepare: true, counter: true }, function (err) {
      if (err) {
         console.log("ERROR: could not execute Cassandra query: err = " + err);
      } else {
         console.log("Batch data update successful");
      }
      pendingBatches -= 1;
      maybeExit();
   });
   queries = [];
}

for (let i = 0; i < TOTAL_UPDATES; i++) {
   queries.push({
      query: query,
      params: []
   });

   // Flush once the batch is full. The original tested `i % 1000 === 0`,
   // which fired at i === 0 and produced a 1-entry first batch followed by
   // off-by-one batches.
   if (queries.length === BATCH_SIZE) {
      flushBatch();
   }
}

// send any final partial batch, then allow exit once all callbacks fire
flushBatch();
generationDone = true;
maybeExit();

Here is the Go code that takes a little over 1 minute to complete. I am using goroutines and channels to hopefully send the batch data in parallel (8 workers). Also, I get timeouts if I set the number of workers > 8, so I feel like I have to manually tune the code when using the Go driver...

package main

import (
    "flag"
    "fmt"
    "log"
    "sync"

    "github.com/gocql/gocql"
)

// Options holds the command-line configuration for the benchmark run.
type Options struct {
    host         string // Cassandra contact point
    port         int    // NOTE(review): parsed nowhere below — currently unused
    keyspace     string // keyspace to USE after connecting
    username     string // credentials for PasswordAuthenticator
    password     string
    numWorkers   int // number of concurrent batch-executing goroutines
    maxBatchSize int // max statements per batch (see generateBatchQueries)
}

// WorkerConfig bundles everything a single batch-query worker needs.
type WorkerConfig struct {
    workerId     string
    maxBatchSize int
    session      *gocql.Session   // shared session; gocql sessions are safe for concurrent use
    jobs         <-chan Job       // receive-only: batches to execute
    results      chan<- JobResult // send-only: currently never written to by workers
    wg           *sync.WaitGroup  // signalled via Done() when the jobs channel drains
}

// Job is one unit of work: a prepared batch plus its statement count.
type Job struct {
    batch *gocql.Batch
    count int // number of entries in batch at enqueue time
}

// JobResult is the (currently unused) per-job outcome record.
type JobResult struct {
    job    *Job
    config *WorkerConfig
    count  int
}

// main wires up the benchmark: parse flags, connect to Cassandra, start
// numWorkers batch-executing goroutines, feed them batches, and wait.
func main() {
    // set the command-line options
    options := Options{}
    flag.StringVar(&options.host, "host", "127.0.0.1", "Cassandra database host")
    flag.StringVar(&options.keyspace, "keyspace", "emanate", "Cassandra keyspace name")
    flag.StringVar(&options.username, "username", "admin", "Cassandra username")
    flag.StringVar(&options.password, "password", "password", "Cassandra password")
    flag.IntVar(&options.numWorkers, "workers", 8, "Number of batch query workers")
    flag.IntVar(&options.maxBatchSize, "max-batch-size", 1000, "Max number of queries per batch")

    // parse the command-line flags
    flag.Parse()

    // create and configure the cassandra client
    cluster := gocql.NewCluster(options.host)

    // set the keyspace to use
    cluster.Keyspace = options.keyspace

    // configure the username and password
    cluster.Authenticator = gocql.PasswordAuthenticator{
        Username: options.username,
        Password: options.password,
    }

    // connect to cassandra; the original discarded this error, so a failed
    // connection left session nil and panicked on first use
    session, err := cluster.CreateSession()
    if err != nil {
        log.Fatalf("could not create Cassandra session: %v", err)
    }

    // close the connection when we are done
    defer session.Close()

    // create the worker channels
    jobs := make(chan Job, 1000)
    results := make(chan JobResult, 1000)

    // create the worker done waitgroup
    wg := &sync.WaitGroup{}
    fmt.Printf("WG ADD %d\n", options.numWorkers) // original Printf lacked the newline
    wg.Add(options.numWorkers)

    // create the batch query workers
    for w := 1; w <= options.numWorkers; w++ {
        // create the worker id
        workerId := fmt.Sprintf("WORKER-%d", w)

        // define the data generator config options
        workerConfig := WorkerConfig{
            workerId:     workerId,
            maxBatchSize: options.maxBatchSize,
            session:      session,
            jobs:         jobs,
            results:      results,
            wg:           wg,
        }

        // start the next batch query worker
        go processBatchQuery(&workerConfig)
    }

    // generate the batch queries
    go generateBatchQueries(jobs, &options)

    // wait for all workers to drain the jobs channel and exit
    log.Println("Waiting for all job results")
    wg.Wait()

    // close the results channel (nothing reads it today; closed for hygiene)
    log.Println("CLOSING RESULTS CHANNEL")
    close(results)
}

// generateBatchQueries builds counter batches of up to o.maxBatchSize
// statements and sends them on jobs, closing the channel when done so the
// workers' range loops terminate.
func generateBatchQueries(jobs chan<- Job, o *Options) {
    // define the update query statement
    statement := "UPDATE stats SET total = total + 3, num_samples = num_samples + 1 WHERE stat_name = 'TEMPERATURE' AND org_id = '111' AND deploy_id = '222' AND item_id = '333' AND bucket_name = '1HR' AND start_time = '2016-01-01'"

    // create the first batch query
    batch := gocql.NewBatch(gocql.CounterBatch)

    // iterate through the individual queries
    for i := 0; i < 5000000; i++ {
        // add the next statement to the batch query
        batch.Query(statement)

        // Flush once the batch is full. The original tested i%1000 == 0,
        // which (a) ignored the -max-batch-size flag and (b) fired at i == 0,
        // producing a 1-entry first batch and off-by-one batches after it.
        if len(batch.Entries) >= o.maxBatchSize {
            jobs <- Job{
                batch: batch,
                count: len(batch.Entries),
            }

            // create a new batch query
            batch = gocql.NewBatch(gocql.CounterBatch)
        }
    }

    // send the final partial batch, if any (the original could enqueue an
    // empty batch when the total divided evenly)
    if len(batch.Entries) > 0 {
        jobs <- Job{
            batch: batch,
            count: len(batch.Entries),
        }
    }

    // close job channel shared by all workers
    log.Println("CLOSING JOBS CHANNEL")
    close(jobs)
}

// processBatchQuery drains c.jobs, executing each batch against the shared
// session, and signals the WaitGroup when the channel is closed and drained.
func processBatchQuery(c *WorkerConfig) {
    // Deferred so wg.Wait() in main cannot deadlock even if ExecuteBatch
    // panics; the original called Done() only on the normal exit path.
    defer c.wg.Done()

    log.Printf("BATCH QUERY WORKER STARTED: %s\n", c.workerId)

    // iterate through each batch query job until the channel is closed
    for job := range c.jobs {
        // execute the batch query; log-and-continue keeps the worker alive
        // so one failed batch does not stall the whole run
        err := c.session.ExecuteBatch(job.batch)
        if err != nil {
            log.Printf("Couldn't execute batch: %s", err)
        }
    }

    log.Println("BATCH QUERY WORKER DONE")
}

Anything jump out at you guys? Any back-pressure from Cassandra that I should be handling? Any thoughts or pointers appreciated...

Thanks! Mark

Zariel commented 8 years ago

Thanks, I'll take a look. As a note, the default timeout for the Node.js driver looks to be 15 seconds, whereas gocql's is 600ms.

markuscraig commented 8 years ago

Changing the Gocql timeout helped, but still the same slower performance.

I'm wondering if the statements are not getting automatically prepared by Gocql for some reason?

When I disable prepared statements in the Node.js version, I see very similar slower write performance.

nemosupremo commented 8 years ago

Statements in gocql are automatically prepared, however prepared-ness shouldn't matter in your case seeing as you aren't using any placeholders.

Can you test the performance using an obscene amount of workers rather than your default 8? Ex. use:

for job := range c.jobs {
        //fmt.Printf("EXECUTE BATCH QUERY: entries = %d\n", job.count)

        // execute the batch query
       go func(batch *gocql.Batch) {
        err := c.session.ExecuteBatch(batch)
        if err != nil {
            log.Printf("Couldn't execute batch: %s", err)
        }
       }(job.batch)
    }

My hunch is the async nature of the node code may be executing more than 8 batches at once.

Zariel commented 8 years ago

@markuscraig I looked through the Node.js code; it is probably just getting an advantage by doing all the queries async, as @nemothekid says.

markuscraig commented 8 years ago

Ok, thanks guys, much appreciated!

muirdm commented 8 years ago

Are you sure your node script is running all the queries? You call "process.exit(0)" when the "final" batch completes, but nothing guarantees that the final batch isn't actually the first one to run. Do you see the message "Batch data update successful" printed out 5000 times? If not, you need to add code that waits for all the batches to invoke their callback before exiting the process.

I think you would want many more than 8 worker goroutines (e.g. 100 at least). There are other settings that could improve the go script's performance, like setting ProtoVersion to 3 if your c* version is new enough.