vitessio / vitess

Vitess is a database clustering system for horizontal scaling of MySQL.
http://vitess.io
Apache License 2.0
18.42k stars 2.08k forks source link

BUG: a vstream client can block a tablet ChangeType from replica => primary #11169

Closed maxenglander closed 1 year ago

maxenglander commented 2 years ago

Overview

A client that requests a VStream from a replica tablet can block that tablet from being transitioned from a replica to a primary. I think this is a bug.

Reproduction

S/o to @ryanpbrewster who shared this repro with me!

  1. Create cluster with an unsharded keyspace, with a primary and replica
  2. Create a data table used purely for inserts to generate VStream events:
    CREATE TABLE data (id INT NOT NULL AUTO_INCREMENT, text1 VARCHAR(36), PRIMARY KEY(id))
  3. Open a VStream on the replica
  4. Generate a backlog of VStream events by INSERTing records on the primary
  5. Begin sending ApplyVSchema events to the primary
  6. While that is happening, try to transition the replica to primary

Removing either step 3 or step 5 allows the replica to transition to primary.

The script below carries out steps 3-6. It can be executed with ./main [topo-addr] [keyspace] [cell].

package main

import (
    "context"
    "log"
    "os"
    "time"

    "google.golang.org/grpc"
    "vitess.io/vitess/go/netutil"
    "vitess.io/vitess/go/vt/grpcclient"
    "vitess.io/vitess/go/vt/proto/binlogdata"
    "vitess.io/vitess/go/vt/proto/query"
    "vitess.io/vitess/go/vt/proto/queryservice"
    "vitess.io/vitess/go/vt/proto/topodata"
    "vitess.io/vitess/go/vt/proto/vschema"
    "vitess.io/vitess/go/vt/proto/vtctldata"
    "vitess.io/vitess/go/vt/vtctl/grpcvtctldclient"
    "vitess.io/vitess/go/vt/vttablet/grpctmclient"
)

// main carries out steps 3-6 of the reproduction: it opens a VStream on a
// replica, generates a backlog of events by inserting on the primary, sends
// repeated ApplyVSchema requests, and then asks the replica to change its
// tablet type to PRIMARY.
//
// Usage: ./main [topo-addr] [keyspace] [cell]
//
// Each positional argument is optional; an omitted argument falls back to the
// value that was hard-coded in the original run of this repro.
func main() {
    ctx := context.Background()

    topoAddr := "vitess-cluster-dev-vtctld-372e9986:15999"
    keyspace := "src"
    cell := "local"
    if len(os.Args) > 1 {
        topoAddr = os.Args[1]
    }
    if len(os.Args) > 2 {
        keyspace = os.Args[2]
    }
    if len(os.Args) > 3 {
        cell = os.Args[3]
    }

    // Connect to topo indirectly via vtctld.
    vc, err := grpcvtctldclient.NewWithDialOpts(
        topoAddr,
        true, /* fail fast */
        grpc.WithInsecure(),
    )
    if err != nil {
        log.Fatalf("failed to get new grpc client: %s", err.Error())
    }

    // Get a replica tablet and primary tablet
    resp, err := vc.GetTablets(
        ctx,
        &vtctldata.GetTabletsRequest{
            Cells:    []string{cell},
            Keyspace: keyspace,
        },
        grpc.FailFast(true),
    )
    if err != nil {
        log.Fatalf("failed to get replica tablets: %v", err)
    }
    tablets := resp.GetTablets()
    if len(tablets) == 0 {
        log.Fatalf("no tablets found")
    }
    // If several tablets share a type, the last one listed wins.
    var primary *topodata.Tablet
    var replica *topodata.Tablet
    for _, tablet := range tablets {
        switch tablet.Type {
        case topodata.TabletType_PRIMARY:
            primary = tablet
        case topodata.TabletType_REPLICA:
            replica = tablet
        }
    }
    if primary == nil {
        log.Fatalf("failed to get primary tablet")
    }
    if replica == nil {
        log.Fatalf("failed to get replica tablet")
    }

    // Create queryservice to replica. Fall back to the bare hostname when the
    // tablet record has no "grpc" port entry.
    replicaAddr := replica.Hostname
    if grpcPort, ok := replica.PortMap["grpc"]; ok {
        replicaAddr = netutil.JoinHostPort(replica.Hostname, grpcPort)
    }
    rcc, err := grpcclient.Dial(
        replicaAddr,
        true,                /* fail fast */
        grpc.WithInsecure(), /* not recommended for production use! */
    )
    if err != nil {
        log.Fatalf("failed to dial tablet: %v", err)
    }
    rqc := queryservice.NewQueryClient(rcc)

    // Create queryservice to primary.
    primaryAddr := primary.Hostname
    if grpcPort, ok := primary.PortMap["grpc"]; ok {
        primaryAddr = netutil.JoinHostPort(primary.Hostname, grpcPort)
    }
    pcc, err := grpcclient.Dial(
        primaryAddr,
        true,                /* fail fast */
        grpc.WithInsecure(), /* not recommended for production use! */
    )
    if err != nil {
        log.Fatalf("failed to dial tablet: %v", err)
    }
    pqc := queryservice.NewQueryClient(pcc)

    // Create VStream from replica.
    stream, err := rqc.VStream(ctx, &binlogdata.VStreamRequest{
        Target: &query.Target{
            Keyspace:   replica.Keyspace,
            Shard:      replica.Shard,
            TabletType: replica.Type,
        },
        Position: "current",
        Filter:   &binlogdata.Filter{},
    })
    if err != nil {
        log.Fatalf("client.VStream: %v", err)
    }
    log.Printf("set up vstream: %v", stream)

    // Pause for 60 seconds, logging a countdown roughly once per second.
    log.Printf("wait 60 seconds")
    wait := 60 * time.Second
    left := wait
    for start := time.Now(); time.Since(start) < wait; left = wait - time.Since(start) {
        log.Printf("T-%v", left)
        time.Sleep(1 * time.Second)
    }

    // Read exactly one response. The stream is not drained after this point,
    // which is what lets a backlog of events build up (step 4 of the repro).
    log.Printf("waiting for an initial vstream event")
    first, err := stream.Recv()
    if err != nil {
        // Without a working stream the rest of the repro is meaningless.
        log.Fatalf("failed to recv vstream resp: %v", err)
    }
    log.Printf("got an initial vstream event: %v", first)

    // Trigger a big backlog of vstream events on the primary.
    // This was done ahead of time: create table data(id int not null auto_increment, text1 varchar(36), primary key(id));
    log.Printf("inserting a bunch of data on primary to generate a backlog of vstream events")
    for i := 0; i < 5000; i++ {
        _, err := pqc.Execute(
            ctx,
            &query.ExecuteRequest{
                Target: &query.Target{
                    Keyspace:   primary.Keyspace,
                    Shard:      primary.Shard,
                    TabletType: primary.Type,
                },
                Query: &query.BoundQuery{
                    BindVariables: make(map[string]*query.BindVariable),
                    Sql:           "INSERT INTO data(text1) VALUES(uuid())",
                },
            },
            grpc.FailFast(true),
        )
        if err != nil {
            log.Fatalf("failed to insert data: %v", err)
        }
    }

    // warmedUp is closed after the tenth ApplyVSchema; done tells the
    // background goroutine to stop.
    warmedUp := make(chan struct{})
    done := make(chan struct{})

    // SetVSchema. VStreamer will be blocked from sending events to us.
    // This should block VStreamer from sending events to stream, because the
    // event buffer is small.
    log.Printf("sending a bunch of ApplyVSchema requests to the tablet's cell/keyspace")
    go func(warmedUp chan<- struct{}, done <-chan struct{}) {
        count := 0

        for {
            resp, err := vc.ApplyVSchema(
                ctx,
                &vtctldata.ApplyVSchemaRequest{
                    Cells:    []string{primary.Alias.GetCell()},
                    Keyspace: primary.Keyspace,
                    VSchema:  &vschema.Keyspace{},
                },
                grpc.FailFast(true),
            )
            if err != nil {
                log.Fatalf("failed to ApplyVSchema: %v", err)
            }

            log.Printf("vschema after ApplyVSchema: %v", resp.VSchema)
            count++

            // count passes 10 exactly once, so the channel is closed only once.
            if count == 10 {
                close(warmedUp)
            }

            // Wait one second between requests, or stop as soon as main is done.
            select {
            case <-done:
                return
            case <-time.After(1 * time.Second):
            }
        }
    }(warmedUp, done)

    // Keep ApplyVSchema requests flowing while the type change is attempted.
    <-warmedUp

    // Change tablet type.
    tmc := grpctmclient.NewClient()
    log.Printf("changing tablet type to primary")
    if err := tmc.ChangeType(ctx, replica, topodata.TabletType_PRIMARY, false /*semi-sync*/); err != nil {
        log.Fatalf("failed to change tablet to primary: %v", err)
    }
    log.Printf("changed tablet type to primary")

    close(done)
}

Environment

This repro was tested against Vitess main:

➜  vitess git:(main) ✗ git  --no-pager log -n1
commit 1632f96e7d18041cd07693b171aa9006fcb2edbb (HEAD -> main, vitessio/main)
Author: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com>
Date:   Thu Sep 1 22:49:10 2022 +0530

    feat: augment local example with vtorc (#11155)

    Signed-off-by: Manan Gupta <manan@planetscale.com>

    Signed-off-by: Manan Gupta <manan@planetscale.com>

In addition to the HEAD commit, I have some local log statements in Vitess to get a better understanding of where things are deadlocking.

➜  vitess git:(main) ✗ git --no-pager diff
diff --git a/go/vt/vttablet/grpcqueryservice/server.go b/go/vt/vttablet/grpcqueryservice/server.go
index 237c16db06..620be51463 100644
--- a/go/vt/vttablet/grpcqueryservice/server.go
+++ b/go/vt/vttablet/grpcqueryservice/server.go
@@ -18,12 +18,14 @@ package grpcqueryservice

 import (
        "context"
+       "sync/atomic"

        "google.golang.org/grpc"

        "vitess.io/vitess/go/sqltypes"
        "vitess.io/vitess/go/vt/callerid"
        "vitess.io/vitess/go/vt/callinfo"
+       "vitess.io/vitess/go/vt/log"
        "vitess.io/vitess/go/vt/vterrors"
        "vitess.io/vitess/go/vt/vttablet/queryservice"

@@ -331,10 +333,15 @@ func (q *query) VStream(request *binlogdatapb.VStreamRequest, stream queryservic
                request.EffectiveCallerId,
                request.ImmediateCallerId,
        )
+       log.Infof("setting up vstream...")
+       var count uint64
        err = q.server.VStream(ctx, request, func(events []*binlogdatapb.VEvent) error {
-               return stream.Send(&binlogdatapb.VStreamResponse{
+               log.Infof("about to emit %d events to gRPC vstream", len(events))
+               err := stream.Send(&binlogdatapb.VStreamResponse{
                        Events: events,
                })
+               log.Infof("done emitting %d events (%d total) to gRPC vstream, err=%s", len(events), atomic.AddUint64(&count, uint64(len(events))), err)
+               return err
        })
        return vterrors.ToGRPC(err)
 }
diff --git a/go/vt/vttablet/tabletserver/binlog_watcher.go b/go/vt/vttablet/tabletserver/binlog_watcher.go
index 6c713791e6..d808944738 100644
--- a/go/vt/vttablet/tabletserver/binlog_watcher.go
+++ b/go/vt/vttablet/tabletserver/binlog_watcher.go
@@ -70,11 +70,15 @@ func (blw *BinlogWatcher) Open() {

 // Close stops the BinlogWatcher service.
 func (blw *BinlogWatcher) Close() {
+       log.Infof("closing self")
        if blw.cancel == nil {
+               log.Infof("not open, returning")
                return
        }
+       log.Infof("calling cancel")
        blw.cancel()
        blw.cancel = nil
+       log.Infof("waiting")
        blw.wg.Wait()
        log.Info("Binlog Watcher: closed")
 }
diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go
index 341bc12d1f..a1dda8c9a9 100644
--- a/go/vt/vttablet/tabletserver/state_manager.go
+++ b/go/vt/vttablet/tabletserver/state_manager.go
@@ -213,6 +213,7 @@ func (sm *stateManager) SetServingType(tabletType topodatapb.TabletType, terTime

        log.Infof("Starting transition to %v %v, timestamp: %v", tabletType, state, terTimestamp)
        if sm.mustTransition(tabletType, terTimestamp, state, reason) {
+               log.Infof("Calling execTransition")
                return sm.execTransition(tabletType, state)
        }
        return nil
@@ -245,6 +246,7 @@ func (sm *stateManager) execTransition(tabletType topodatapb.TabletType, state s
        switch state {
        case StateServing:
                if tabletType == topodatapb.TabletType_PRIMARY {
+                       log.Infof("Calling servePrimary")
                        err = sm.servePrimary()
                } else {
                        err = sm.serveNonPrimary(tabletType)
@@ -404,13 +406,17 @@ func (sm *stateManager) verifyTargetLocked(ctx context.Context, target *querypb.
 }

 func (sm *stateManager) servePrimary() error {
+       log.Infof("Closing binlog_watcher")
        sm.watcher.Close()

+       log.Infof("connecting to topo")
        if err := sm.connect(topodatapb.TabletType_PRIMARY); err != nil {
                return err
        }

+       log.Infof("making primary")
        sm.rt.MakePrimary()
+       log.Infof("opening tracker")
        sm.tracker.Open()
        // We instantly kill all stateful queries to allow for
        // te to quickly transition into RW, but olap and stateless
@@ -479,16 +485,19 @@ func (sm *stateManager) unserveNonPrimary(wantTabletType topodatapb.TabletType)
 }

 func (sm *stateManager) connect(tabletType topodatapb.TabletType) error {
+       log.Infof("ensuring connection and db")
        if err := sm.se.EnsureConnectionAndDB(tabletType); err != nil {
                return err
        }
        if err := sm.se.Open(); err != nil {
                return err
        }
+       log.Infof("opening vstreamer")
        sm.vstreamer.Open()
        if err := sm.qe.Open(); err != nil {
                return err
        }
+       log.Infof("opening tx throttler")
        return sm.txThrottler.Open()
 }

diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
index 131203cee8..f08c677c20 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
@@ -138,12 +138,14 @@ func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine

 // SetVSchema updates the vstreamer against the new vschema.
 func (vs *vstreamer) SetVSchema(vschema *localVSchema) {
+       log.Infof("[RPB] trying to SetVSchema, len=%d, cap=%d", len(vs.vevents), cap(vs.vevents))
        // Since vs.Stream is a single-threaded loop. We just send an event to
        // that thread, which helps us avoid mutexes to update the plans.
        select {
        case vs.vevents <- vschema:
        case <-vs.ctx.Done():
        }
+       log.Infof("[RPB] SetVSchema complete")
 }

 // Cancel stops the streaming.

Logs

Here are logs from the repro program, as well as from the replica tablet.

Repro logs

root@my-shell:/# /tmp/repro vitess-cluster-dev-vtctld-372e9986 src local
ERROR: logging before flag.Parse: W0901 22:11:24.245771     375 component.go:41] [core] grpc: addrConn.createTransport failed to connect to {vitess-cluster-dev-vtctld-372e9986 vitess-cluster-dev-vtctld-372e9986 <nil> <nil> 0 <nil>}. Err: connection error: desc = "transport: Error while dialing dial tcp: address vitess-cluster-dev-vtctld-372e9986: missing port in address"
ERROR: logging before flag.Parse: I0901 22:11:24.246038     375 logutil.go:31] log: failed to get replica tablets: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing dial tcp: address vitess-cluster-dev-vtctld-372e9986: missing port in address"
root@my-shell:/# /tmp/repro vitess-cluster-dev-vtctld-372e9986:15999 src local
ERROR: logging before flag.Parse: I0901 22:11:38.852849     382 logutil.go:31] log: set up vstream: &{0x400025cf00}
ERROR: logging before flag.Parse: I0901 22:11:38.852881     382 logutil.go:31] log: wait 60 seconds
ERROR: logging before flag.Parse: I0901 22:11:38.852885     382 logutil.go:31] log: T-1m0s
ERROR: logging before flag.Parse: I0901 22:11:39.853914     382 logutil.go:31] log: T-58.998988416s
[...more lines like this removed for brevity...]
ERROR: logging before flag.Parse: I0901 22:12:35.888389     382 logutil.go:31] log: T-2.964339141s
ERROR: logging before flag.Parse: I0901 22:12:36.889034     382 logutil.go:31] log: T-1.963694057s
ERROR: logging before flag.Parse: I0901 22:12:37.889827     382 logutil.go:31] log: T-962.906223ms
ERROR: logging before flag.Parse: I0901 22:12:38.890700     382 logutil.go:31] log: waiting for an initial vstream event
ERROR: logging before flag.Parse: I0901 22:12:38.891196     382 logutil.go:31] log: got an initial vstream event: events:{type:GTID  gtid:"MySQL56/85fd3b7d-2a1c-11ed-862d-9e49ad135cac:1-10031,8635dd54-2a1c-11ed-862d-ee3fa2770711:1-15092"  keyspace:"src"  shard:"-"}  events:{type:OTHER  keyspace:"src"  shard:"-"}
ERROR: logging before flag.Parse: I0901 22:12:38.891211     382 logutil.go:31] log: inserting a bunch of data on primary to generate a backlog of vstream events
ERROR: logging before flag.Parse: I0901 22:13:09.995220     382 logutil.go:31] log: sending a bunch of ApplyVSchema requests to the tablet's cell/keyspace
ERROR: logging before flag.Parse: I0901 22:13:10.024310     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:11.082430     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:12.109163     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:13.134091     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:14.166263     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:15.194598     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:16.264974     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:17.292940     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:18.317042     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:19.340086     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:19.340113     382 logutil.go:31] log: changing tablet type to primary
ERROR: logging before flag.Parse: I0901 22:13:20.364044     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:21.394695     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:22.417508     382 logutil.go:31] log: vschema after ApplyVSchema:
ERROR: logging before flag.Parse: I0901 22:13:23.442249     382 logutil.go:31] log: vschema after ApplyVSchema:
[...more lines like this removed for brevity...]

Replica logs

I0901 22:11:38.853023       1 server.go:336] setting up vstream...
I0901 22:11:38.862328       1 uvstreamer.go:368] Stream() called
I0901 22:11:38.865327       1 uvstreamer.go:303] sendEventsForCurrentPos
I0901 22:11:38.865364       1 server.go:339] about to emit 2 events to gRPC vstream
I0901 22:11:38.865419       1 server.go:343] done emitting 2 events (2 total) to gRPC vstream, err=%!s(<nil>)
I0901 22:11:38.865440       1 vstreamer.go:162] Starting Stream() with startPos MySQL56/85fd3b7d-2a1c-11ed-862d-9e49ad135cac:1-10031,8635dd54-2a1c-11ed-862d-ee3fa2770711:1-15092
I0901 22:11:38.867053       1 binlog_connection.go:80] new binlog connection: serverID=2104905959
I0901 22:11:38.867069       1 binlog_connection.go:126] sending binlog dump command: startPos=85fd3b7d-2a1c-11ed-862d-9e49ad135cac:1-10031,8635dd54-2a1c-11ed-862d-ee3fa2770711:1-15092, serverID=2104905959
I0901 22:11:39.882717       1 server.go:339] about to emit 1 events to gRPC vstream
I0901 22:11:39.882778       1 server.go:343] done emitting 1 events (3 total) to gRPC vstream, err=%!s(<nil>)
I0901 22:11:40.783469       1 server.go:339] about to emit 1 events to gRPC vstream
I0901 22:11:40.783510       1 server.go:343] done emitting 1 events (4 total) to gRPC vstream, err=%!s(<nil>)
[...more lines like this removed for brevity...]
I0901 22:12:45.090756       1 server.go:339] about to emit 3 events to gRPC vstream
I0901 22:12:45.090823       1 server.go:343] done emitting 3 events (2015 total) to gRPC vstream, err=%!s(<nil>)
I0901 22:12:45.095792       1 server.go:339] about to emit 3 events to gRPC vstream
I0901 22:12:45.095817       1 server.go:343] done emitting 3 events (2018 total) to gRPC vstream, err=%!s(<nil>)
I0901 22:12:45.102612       1 server.go:339] about to emit 3 events to gRPC vstream
I0901 22:13:10.025445       1 vstreamer.go:141] [RPB] trying to SetVSchema, len=0, cap=1
I0901 22:13:10.025472       1 vstreamer.go:148] [RPB] SetVSchema complete
I0901 22:13:11.080302       1 vstreamer.go:141] [RPB] trying to SetVSchema, len=1, cap=1
I0901 22:13:19.340986       1 tm_state.go:185] Changing Tablet Type: PRIMARY
I0901 22:13:19.355512       1 syslogger.go:129] src/-/local-0977615922 [tablet] updated
I0901 22:13:19.355659       1 engine.go:191] VReplication Engine: opening
I0901 22:13:19.360572       1 engine.go:207] VReplication engine opened successfully
I0901 22:13:19.360596       1 engine.go:98] VDiff Engine: opening...
I0901 22:13:19.366177       1 state_manager.go:214] Starting transition to PRIMARY Serving, timestamp: 2022-09-01 22:13:19.341037633 +0000 UTC
I0901 22:13:19.366245       1 state_manager.go:216] Calling execTransition
I0901 22:13:19.366255       1 state_manager.go:249] Calling servePrimary
I0901 22:13:19.366266       1 state_manager.go:409] Closing binlog_watcher
I0901 22:13:19.366270       1 binlog_watcher.go:73] closing self
I0901 22:13:19.366272       1 binlog_watcher.go:75] not open, returning
I0901 22:13:19.366273       1 state_manager.go:412] connecting to topo
I0901 22:13:19.366275       1 state_manager.go:488] ensuring connection and db
I0901 22:13:19.368618       1 state_manager.go:495] opening vstreamer
maxenglander commented 2 years ago

Hi @deepthi I tried using the repro with PlannedReparentShard and it doesn't lock up like ChangeTabletType.

Here's the modified repro using PlannedReparentShard: ``` package main import ( "context" "log" "os" "time" "google.golang.org/grpc" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/queryservice" "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/proto/vtctldata" "vitess.io/vitess/go/vt/vtctl/grpcvtctldclient" ) // Repro shared by Ryan Brewster of Stripe. func main() { ctx := context.Background() topoAddr := os.Args[1] keyspace := os.Args[2] cell := os.Args[3] // Connect to topo indirectly via vtctld. vc, err := grpcvtctldclient.NewWithDialOpts( topoAddr, true, /* fail fast */ grpc.WithInsecure(), ) if err != nil { log.Fatalf("failed to get new grpc client: %s", err.Error()) } // Get a replica tablet and primary tablet resp, err := vc.GetTablets( ctx, &vtctldata.GetTabletsRequest{ Cells: []string{cell}, Keyspace: keyspace, }, grpc.FailFast(true), ) if err != nil { log.Fatalf("failed to get replica tablets: %v", err) } tablets := resp.GetTablets() if len(tablets) == 0 { log.Fatalf("no tablets found") } var primary *topodata.Tablet var replica *topodata.Tablet for _, tablet := range tablets { if tablet.Type == topodata.TabletType_PRIMARY { primary = tablet } if tablet.Type == topodata.TabletType_REPLICA { replica = tablet } } if primary == nil { log.Fatalf("failed to get primary tablet") } if replica == nil { log.Fatalf("failed to get replica tablet") } // Create queryservice to replica. addr := "" if grpcPort, ok := replica.PortMap["grpc"]; ok { addr = netutil.JoinHostPort(replica.Hostname, grpcPort) } else { addr = replica.Hostname } rcc, err := grpcclient.Dial( addr, true, /* fail fast */ grpc.WithInsecure(), /* not recommended for production use! */ ) if err != nil { log.Fatalf("failed to dial tablet: %v", err) } rqc := queryservice.NewQueryClient(rcc) // Create queryservice to primary. 
addr = "" if grpcPort, ok := primary.PortMap["grpc"]; ok { addr = netutil.JoinHostPort(primary.Hostname, grpcPort) } else { addr = primary.Hostname } pcc, err := grpcclient.Dial( addr, true, /* fail fast */ grpc.WithInsecure(), /* not recommended for production use! */ ) if err != nil { log.Fatalf("failed to dial tablet: %v", err) } pqc := queryservice.NewQueryClient(pcc) // Create VStream from replica. stream, err := rqc.VStream(context.TODO(), &binlogdata.VStreamRequest{ Target: &query.Target{ Keyspace: replica.Keyspace, Shard: replica.Shard, TabletType: replica.Type, }, Position: "current", Filter: &binlogdata.Filter{}, }) if err != nil { log.Fatalf("client.VStream: %v", err) } log.Printf("set up vstream: %v", stream) log.Printf("wait 60 seconds") wait := time.Duration(60 * time.Second) var left = wait for start := time.Now(); time.Since(start) < wait; left = wait - time.Since(start) { log.Printf("T-%v", left) time.Sleep(1 * time.Second) } log.Printf("waiting for an initial vstream event") for { resp, err := stream.Recv() if err != nil { log.Printf("failed to recv vstream resp: %v", err) } log.Printf("got an initial vstream event: %v", resp) break } // Trigger a big backlog of vstream events on the primary. // This was done ahead of time: create table data(id int not null auto_increment, text1 varchar(36), primary key(id)); log.Printf("inserting a bunch of data on primary to generate a backlog of vstream events") for i := 0; i < 5000; i++ { _, err := pqc.Execute( ctx, &query.ExecuteRequest{ Target: &query.Target{ Keyspace: primary.Keyspace, Shard: primary.Shard, TabletType: primary.Type, }, Query: &query.BoundQuery{ BindVariables: make(map[string]*query.BindVariable), Sql: "INSERT INTO data(text1) VALUES(uuid())", }, }, grpc.FailFast(true), ) if err != nil { log.Fatalf("failed to insert data: %v", err) } } ch1 := make(chan struct{}) ch2 := make(chan struct{}) // SetVSchema. VStreamer will be blocked from sending events to us. 
// This should block VStreamer from sending events to stream, because the // event buffer is small. log.Printf("sending a bunch of ApplyVSchema requests to the tablet's cell/keyspace") go func(ch1 chan<- struct{}, ch2 <-chan struct{}) { count := 0 for { resp, err := vc.ApplyVSchema( ctx, &vtctldata.ApplyVSchemaRequest{ Cells: []string{primary.Alias.GetCell()}, Keyspace: primary.Keyspace, VSchema: &vschema.Keyspace{}, }, grpc.FailFast(true), ) if err != nil { log.Fatalf("failed to ApplyVSchema: %v", err) } log.Printf("vschema after ApplyVSchema: %v", resp.VSchema) count++ if count == 10 { close(ch1) } time.Sleep(1 * time.Second) } }(ch1, ch2) <-ch1 // Change tablet type. //tmc := grpctmclient.NewClient() //log.Printf("changing tablet type to primary") //if err := tmc.ChangeType(context.TODO(), replica, topodata.TabletType_PRIMARY, false /*semi-sync*/); err != nil { // log.Fatalf("failed to change tablet to primary: %v", err) //} //log.Printf("changed tablet type to primary") // PlannedReparentShard log.Printf("doing planned reparent shard") if _, err := vc.PlannedReparentShard(context.TODO(), &vtctldata.PlannedReparentShardRequest{ Keyspace: primary.Keyspace, NewPrimary: primary.Alias, Shard: primary.Shard, }, grpc.FailFast(true)); err != nil { log.Fatalf("failed to do planned reparent shard") } log.Printf("did planned reparent shard") close(ch2) } ```