cheungseol / cheungseol.github.io

2 stars 0 forks source link

使用 dataloader 优化 GraphQL 性能 #24

Open cheungseol opened 4 years ago

cheungseol commented 4 years ago

在 GraphQL 应用中 N+1 是经常遇到的一个问题场景。

N + 1 问题: What is the N+1 Query Problem? This problem occurs when the code needs to load the children of a parent-child relationship (the “many” in the “one-to-many”). Most ORMs have lazy-loading enabled by default, so queries are issued for the parent record, and then one query for EACH child record

N+ 1 问题

以一条简单的 GraphQL 语句为例🌰描述什么是 N + 1 问题:

GraphQL Client

前端查询文章列表并获取每篇文章的标签信息

{            
  query {        
   articles {  // 文章列表             
     title            
     tags { // 文章标签信息                
       name             
     }        
   }     
  }
}

GraphQL Server

以下👇用  type-graphql 展示 GraphQL server 的实现:

@ObjectType()
class Article {  
 @Field(type => ID)  id: string;  
 @Field()  title: string;  
 @Field(type => [Tag])  tags: Tag[];
}

@ObjectType()
class Tag {  
 @Field(type => ID)  id: string;  
 @Field()  name: string;
}
@Resolver(Article)
class ArticleResolver {  
  constructor(private apiService: ApiService) {}  
  @Query(returns => [Article])  
  async articles(): Promise {    
    return this.apiService.getArticles();  
  }
}

@Resolver(Tag)
class TagResolver {  
  constructor(private apiService: ApiService) {}   
  @Query(returns => [Tag])  
  async tags(@Args() { id }: GetTagsArgs ): Promise {    
    return this.apiService.getTags(id);  
  }
}

按照上面的前端查询语句,GraphQL server 会先执行一次 apiService.getArticles 获取所有 articles,然后再执行 apiService.getTags 分别获取各篇文章的 tags 信息。即如果有 n 篇文章的话,GraphQL server 实际执行的请求次数是 1 + n(1次 apiService.getArticles 和 n 次 apiService.getTags )

问题原因

GraphQL server 中之所以存在 N + 1 问题,是因为在实现 resolver 时,每个 resolver 都只知道自己的请求信息,并不清楚当前存在其它 resolver 与自己请求相同的接口或数据(resolver 不具备 look after 或 behind 的能力

解决方案

Dataloader 是 facebook 提出的一种解决方案,同它时具备批量请求 batch 和数据缓存 cache 的能力。Dataloader 能够收集一段时间内来自后端的多个数据请求,然后打包成单个请求再发送到底层数据库或服务。

Dataloader 原理

简单来说 dataloader 在内部维护了一个队列,将 Event loop 中一个 tick 阶段的接口请求(可以是API服务调用、数据库访问等异步操作)都放入一个队列里,并在 process.nextTick 时批量执行。

可以理解为 dataloader 的实现是延迟了 resolver 的执行。每次请求初始化 new Dataloader() ,并传入 batchLoadFn,batchLoadFn 是一个 promise,当清空任务队列时会批量执行这个 promise。在实际应用中,batchLoadFn 通常是一个批量请求接口服务或者访问数据库的 promise。

每当调用 dataloader load 方法时,会将相应的 key (dataloader.load 的参数)和 callback(new Dataloader 的参数) 放入队列。当 Event Loop 中所有的 promise resolve 执行完后,再通过 process.nextTick 方法执行队列中的所有任务。之所以使用process.nextTick ,是因为 libuv 在 Nodejs Event loop 中的每个 tick 之间,都会检查和执行process.nextTick 微任务队列,并且 process.nextTick 在 Event loop 中拥有较高的执行优先级。

由于在 Event loop 中, process.nextTick 的优先级比同样是微任务的 Pomise resolve 要高, 所以为了避免 process.nextTick 提前执行 ,保证在当前所有进行中的 Promise 执行完之后再执行队列任务,dataloader 将process.nextTick 放在 Promise.resolve 中执行。

var enqueuePostPromiseJob =  
  typeof process === 'object' && typeof process.nextTick === 'function' 
   ? function (fn) {     
      // ...      
     // 在 Promise.resolve 中执行 process.nextTick        
     resolvedPromise.then(() => {        
       process.nextTick(fn);     
     });    
    } :  setImmediate || setTimeout;

Dataloader 使用

先看未使用 DataLoader 优化的例子:

const FAKE_DATABASE = ['JAVA', 'PHP', 'GOLANG', 'PYTHON', 'RUBY', 'PERL'];
const getTagById = async (id) => {  
  console.log('DATABASE accessed!')  
  return FAKE_DATABASE[id - 1];
}

for (let i = 1 ; i< 7; i++) { 
  getTagById(i);
}

运行结果可以看到,每次执行 ​getTagById​ 都会访问一次 FAKE_DATABASE ,当代码中有 7 处调用 ​getTagById​ 函数时,FAKE_DATABASE 一共被访问了 7 次。在数据量很大的情况下,这显然不是一个理想的方案:

不使用dataloader

批处理

DataLoader 有批量执行异步任务 (*batching**)* 的能力,下面展示使用 DataLoader 优化批处理操作:

DataLoader 有批量执行异步任务的能力,下面展示使用 DataLoader 优化批处理操作:

const FAKE_DATABASE = ['JAVA', 'PHP', 'GOLANG', 'PYTHON', 'RUBY', 'PERL'];
const batchGetTagsById = async (ids) => {  
  console.log('batching:', ids);  
  return ids.map(id => FAKE_DATABASE[id - 1]);
};
const userLoader = new DataLoader(batchGetTagsById);
for (let i = 1 ; i< 7; i++) {    
  userLoader.load(i);
}

运行结果中可以看到使用 userLoader.load 方法合并了所有的调用参数,统一调用

使用dataloader batching

dataloader 执行时机

前面提到 dataloader 会在 process.next 时批量执行队列中的任务。下面的代码中使用 setTimeout 制造前后两个 Event loop tick:

import DataLoader from 'dataloader';
const FAKE_DATABASE = ['JAVA', 'PHP', 'GOLANG', 'PYTHON', 'RUBY', 'PERL'];
const batchGetTagsById = async (ids) => { 
  console.log('batching:', ids); 
  return ids.map(id => FAKE_DATABASE[id - 1]);
};
const userLoader = new DataLoader(batchGetTagsById);
console.log('\nEvent Loop Tick 1');
userLoader.load(1);
userLoader.load(2);
userLoader.load(5);
userLoader.load(6);
setTimeout(() => { 
 console.log('\nEvent Loop Tick 2'); 
 userLoader.load(3); 
 userLoader.load(4);
}, 1000);

当使用 setTimeout 时会在 nodejs Event loop 中新建一个 timer 宏任务(即一个 Eventloop tick)。从输出结果可以看到,dataloader 在每个 EventLoop tick 之间的阶段执行 batching 队列中的任务,而 libuv 正是在每个 tick 之间的阶段检查和执行 process.nectTick 任务:

dataloader执行时机

缓存

除了批量执行任务之外,Dataloader 还具备缓存(caching)的能力。当多次执行 dataloader load 方法并且参数一致时,将直接从缓存中返回:

import DataLoader from 'dataloader';
const FAKE_DATABASE = ['JAVA', 'PHP', 'GOLANG', 'PYTHON', 'RUBY', 'PERL'];
const batchGetTagsById = async (ids) => { 
 console.log('batching:', ids);
 return ids.map(id => FAKE_DATABASE[id - 1]);
};
const userLoader = new DataLoader(batchGetTagsById);
console.log('\nEvent Loop Tick 1');
userLoader.load(1);
userLoader.load(2);
userLoader.load(3);
userLoader.load(4);
userLoader.load(5);
userLoader.load(6);
setTimeout(() => { 
 console.log('\nEvent Loop Tick 2'); 
 userLoader.load(3).then(d=>console.log(d)); 
 userLoader.load(4).then(d=>console.log(d));
}, 1000);

运行结果可以看到在 setTimeout 中重复执行参数为 3、4 的 userLoader.load 方法时,dataloader 利用 cache 直接返回了数据:

使用 dataloader cache

需要注意的是,dataloader 缓存的是 promise,而不是数据结果

结合 GraphQL 使用

Resolver 使用 dataloader 的 load 方法实现,可以解决 GraqhQL 的 N + 1 问题。当 GraqhQL Client 请求的 query 中涉及重复执行 resolver 时,dataloader 会把它们放入队列合并执行: 还是上面查询文章和标签信息的例子,这次 GraphQL Server 使用 dataloader 实现 resolver:

// 每个请求初始化 DataLoader 实例,参数是批处理服务的 batch Promiseconst tagLoader = new DataLoader(ids => this.apiService.getTagsByIds(ids))// 每次执行 dataloader.load() 方法,都会将参数 id 进入队列,队列中的所有参数将最终合并传入 batch Promiseconst batchingGetTagsById = id => tagLoader.load(id)@Resolver(Tag)class TagResolver {  constructor(private apiService: ApiService) {}  @Query(returns => [Tag])     async tags(@Args() { id }: GetTagsArgs): Promise {     // 每次执行 TagResolver 获取 tags 时,执行 dataloader.load() 方法     return batchingGetTagsById(id);  }}

可以看到当 Client Query 中需要同时查询多个 tags 信息时,GraphQL 通过 datalaoder 不会再重读 n 次请求接口服务,而是把 n 次请求合并为一次,并且在 id 相同时还可以减少重复的请求。

Dataloader 实现

dataloader 的实现使用了 flow 而非 typescript)每当 new 一个 Dataloader 实例时,传入的 promise 参数即作为 batchLoadFn:

class DataLoader {  
  constructor(   
    batchLoadFn: BatchLoadFn,    
    options?: Options
  ) {   
   // ...  
  }  
  // ... 
  load(key: K): Promise {    
   // ...  
  } 
}

新建的实例拥有 load 方法,负责把每个执行 load 的函数放入任务队列:

  load(key: K): Promise {    
   // ...    
   // batch 为当前要处理的请求任务队列,包含 keys 和 callbacks    
   var batch = getCurrentBatch(this);    
   // ...    
   // 如果匹配中了 cache,则直接返回 cache    
   if (cacheMap) {      
      var cachedPromise = cacheMap.get(cacheKey);      
      if (cachedPromise) {        
        return new Promise(resolve => {            
           // ...            
           resolve(cachedPromise);        
        });      
      }    
    }    
   // 如果没有匹配到 cache,新生成一个 promise 放入请求任务队列中    
   batch.keys.push(key); 
   // 把 load 方法的参数作为 promise 的 key    
   var promise = new Promise((resolve, reject) => {      
      // 把 promise resolve 进入 callbacks 队列      
      batch.callbacks.push({ resolve, reject });    
   });    
   // 放入缓存,注意这里缓存的并不是执行结果,而是 promise   
   if (cacheMap) {      
     cacheMap.set(cacheKey, promise);    
   }    
   return promise;  
}

获取(或创建)当前需要批处理的任务队列:

function getCurrentBatch(loader: DataLoader): Batch {  
   // ...  
   // 如果任务队列已经存在,则直接返回已有的队列,避免重复创建 
   if (    existingBatch !== null && ...  ) {    return existingBatch;  }  
   // 创建唯一的任务队列,后续每当执行 load(key)方法时,key 会存入 keys,并把 key 对应的 promise resolve 存入 callbacks  
   var newBatch = { hasDispatched: false, keys: [], callbacks: [] }; 
   // ...  
  // 在所有 promise 之后的 nextTick 阶段批量处理队列中的请求  
  loader._batchScheduleFn(() => {    
       dispatchBatch(loader, newBatch); 
  });  
  return newBatch;
}

上面的 _batchScheduleFn 方法实际上返回的是 enqueuePostPromiseJob 函数,这个函数指明了 batching 队列的执行时机。dataloader 需要在当前所有的 promise resolve 之后批处理队列中的任务:

var enqueuePostPromiseJob =  
     typeof process === 'object' && typeof process.nextTick === 'function' 
          ?   function (fn) {      
                  if (!resolvedPromise) {        
                      resolvedPromise = Promise.resolve();      
                  }      
                 resolvedPromise.then(() => {        
                      process.nextTick(fn);      
                 });    
               } 
          :    setImmediate || setTimeout;

dispatchBatch 函数即为批量处理当前任务队列的方法:

function dispatchBatch(  loader: DataLoader,  batch: Batch) {  
  // ... 
   // 执行 _batchLoadFn 方法(新建 dataloader 实例时传入的 promise 函数),参数为所有调用 load 方法时传入的参数集合  
  var batchPromise = loader._batchLoadFn(batch.keys);  
  // ...  
  // 执行 batchPromise,返回批处理的结果 
  batchPromise.then(values => {    
      for (var i = 0; i < batch.callbacks.length; i++) {      
          var value = values[i];     
          if (value instanceof Error) {        
             batch.callbacks[i].reject(value);     
          } else {        
            batch.callbacks[i].resolve(value);      
       }    
     } 
  }
)}