// ShardAwareRoutingInfo tells a caller where a request for a given primary
// key should be sent: which node, and which shard on that node.
type ShardAwareRoutingInfo struct {
	// RoutingKey holds the bytes of the primary key.
	RoutingKey []byte
	// Host is the node to connect to (HostAware policy).
	Host *HostInfo
	// Shard is the shard ID on Host to connect to (ShardAware policy).
	Shard int
}
2. Provide an option to apply this optimization automatically when it makes sense and is possible.
### Implementation details
To make this work properly with tablets, we will need to pull tablet information from `system.tablets` beforehand.
### Pseudo code example
Borrowed from the same PR:
func routeRangeSelectToProperShards() {
const shardsAbout = 100 // node (cpu-1)
// Split []T by chunks
var (
queryBatches = make(map[string][]T, shardsAbout) // []T grouped by chunks
routingKeys = make(map[string][]byte, shardsAbout) // routing key for query
)
for _, pk := range pks {
var (
shardID string
routingKey []byte
)
// We receive information about the routing of our keys.
// In this example, PRIMARY KEY consists of one column pk_column_name.
info, err := session.GetShardAwareRoutingInfo(keyspaceName, tableName, []string{"pk_column_name"}, pk)
if err != nil || info.Host == nil {
// We may not get routing information for various reasons (change shema topology, etc).
// It is important to understand the reason when testing (for example, you are not using tokenAwarePolicy)
log.Printf("can't get shard id of pk '%d': %v", pk, err)
} else {
// build key: host + "/" + vShard (127.0.0.1/1)
shardID = info.Host.Hostname() + "/" + strconv.Itoa(info.Shard)
routingKey = info.RoutingKey
}
// Put key to corresponding batch
batch := queryBatches[shardID]
if batch == nil {
batch = make([]int64, 0, len(pks)/shardsAbout)
}
batch = append(batch, pk)
queryBatches[shardID] = batch
routingKeys[shardID] = rk
}
const query = "SELECT FROM tablename WHERE pk IN (?)"
var wg sync.WaitGroup
// we go through all the batches to execute queries in parallel
for shard, batch := range batches {
// We divide large batches into smaller chunks, since large batches in SELECT queries have a bad effect on RT scylla
for , chunk := range slices.ChunkSlice(batch, 10) { // slices.ChunkSlice some function that splits slice by N slices of M or less lenght (in our example M=10)
wg.Add(1)
go func(shard string, chunk []int64) {
defer wg.Done()
rk := keys[shard] // get our routing key
scanner := r.session.Query(query, chunk).RoutingKey(rk).Iter().Scanner() // use RoutingKey
for scanner.Next() {
// ...
}
if err := scanner.Err(); err != nil {
// ...
}
}(shard, chunk)
}
}
// wait for all answers
wg.Wait()
// NOTE: this is not the most optimal strategy 'cause we're waiting for all queries done.
// If at least one query has long response time it will affects on the response time of our method. (RT our method = max RT of queries)
}
This idea comes from the PR.
Proposal
There are two levels at which we can address this:
type ShardAwareRoutingInfo struct { // RoutingKey - is bytes of primary key RoutingKey []byte // Host - is node to connect (HostAware policy) Host *HostInfo // Shard - is shard ID of node to connect (ShardAware policy) Shard int }
func routeRangeSelectToProperShards() { const shardsAbout = 100 // node (cpu-1) // Split []T by chunks var ( queryBatches = make(map[string][]T, shardsAbout) // []T grouped by chunks routingKeys = make(map[string][]byte, shardsAbout) // routing key for query ) for _, pk := range pks { var ( shardID string routingKey []byte ) // We receive information about the routing of our keys. // In this example, PRIMARY KEY consists of one column pk_column_name. info, err := session.GetShardAwareRoutingInfo(keyspaceName, tableName, []string{"pk_column_name"}, pk) if err != nil || info.Host == nil { // We may not get routing information for various reasons (change shema topology, etc). // It is important to understand the reason when testing (for example, you are not using tokenAwarePolicy) log.Printf("can't get shard id of pk '%d': %v", pk, err) } else { // build key: host + "/" + vShard (127.0.0.1/1) shardID = info.Host.Hostname() + "/" + strconv.Itoa(info.Shard) routingKey = info.RoutingKey } // Put key to corresponding batch batch := queryBatches[shardID] if batch == nil { batch = make([]int64, 0, len(pks)/shardsAbout) } batch = append(batch, pk) queryBatches[shardID] = batch routingKeys[shardID] = rk } const query = "SELECT FROM tablename WHERE pk IN (?)" var wg sync.WaitGroup // we go through all the batches to execute queries in parallel for shard, batch := range batches { // We divide large batches into smaller chunks, since large batches in SELECT queries have a bad effect on RT scylla for , chunk := range slices.ChunkSlice(batch, 10) { // slices.ChunkSlice some function that splits slice by N slices of M or less lenght (in our example M=10) wg.Add(1) go func(shard string, chunk []int64) { defer wg.Done() rk := keys[shard] // get our routing key scanner := r.session.Query(query, chunk).RoutingKey(rk).Iter().Scanner() // use RoutingKey for scanner.Next() { // ... } if err := scanner.Err(); err != nil { // ... 
} }(shard, chunk) } } // wait for all answers wg.Wait() // NOTE: this is not the most optimal strategy 'cause we're waiting for all queries done. // If at least one query has long response time it will affects on the response time of our method. (RT our method = max RT of queries) }