Open afredlyj opened 8 years ago
DubboInvoker is responsible for sending and receiving messages on the consumer side:
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    currentClient.send(inv, isSent);
    RpcContext.getContext().setFuture(null);
    return new RpcResult();
} else if (isAsync) {
    ResponseFuture future = currentClient.request(inv, timeout);
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
} else {
    RpcContext.getContext().setFuture(null);
    return (Result) currentClient.request(inv, timeout).get();
}
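There are three sending modes: one-way (send and return immediately without waiting for a response), asynchronous (send the request and store a Future in RpcContext for the caller to read later), and synchronous (the calling thread blocks on the Future until the response arrives).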
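To make the difference between the three branches concrete, here is a minimal sketch built on plain CompletableFuture instead of Dubbo's classes. FakeClient, SendModesDemo, and the contextFuture field (which stands in for the future slot in RpcContext) are hypothetical names invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the exchange client; not a Dubbo class.
final class FakeClient {
    // One-way: hand the message to the transport and forget about it.
    void send(String msg) { /* nothing to wait for */ }

    // Two-way: the returned future completes when the "response" arrives.
    CompletableFuture<String> request(String msg) {
        return CompletableFuture.supplyAsync(() -> "echo:" + msg);
    }
}

final class SendModesDemo {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Plays the role of the future slot in RpcContext for this sketch.
    static CompletableFuture<String> contextFuture;

    static String invoke(FakeClient client, Mode mode, String msg, long timeoutMs) {
        switch (mode) {
            case ONEWAY:
                client.send(msg);
                contextFuture = null;
                return null;                       // empty result
            case ASYNC:
                contextFuture = client.request(msg);
                return null;                       // caller reads contextFuture later
            default:                               // SYNC
                contextFuture = null;
                try {
                    return client.request(msg).get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    throw new IllegalStateException("call failed or timed out", e);
                }
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        System.out.println(invoke(client, Mode.SYNC, "hi", 1000));   // blocks for the value
        invoke(client, Mode.ASYNC, "hi", 1000);
        System.out.println(contextFuture.join());                    // fetched afterwards
    }
}
```

Only the synchronous branch makes the caller wait; the other two return an empty result right away, which matches the `new RpcResult()` returns in the excerpt above.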
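Next, let's focus on the two-way (request-response) flow. Tracing the Dubbo source, the request data is wrapped in a Request object. That object carries an ID that is unique within the current process (generated with an AtomicLong) and identifies one request-response exchange. When the provider replies, this ID is used to look up the corresponding Channel. Because Dubbo multiplexes requests over a shared connection, the provider has to send the reqId back; with a Channel dedicated to a single request this would not be necessary.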
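The ID scheme can be illustrated in a few lines. This is a sketch under assumptions, not Dubbo's Request class: MiniRequest, MiniResponse, and their fields are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical miniature of a request envelope; names are illustrative only.
final class MiniRequest {
    // One counter per JVM, so IDs are unique within the current process.
    private static final AtomicLong NEXT_ID = new AtomicLong(0);

    final long id;
    final Object data;

    MiniRequest(Object data) {
        this.id = NEXT_ID.getAndIncrement(); // atomic, safe across threads
        this.data = data;
    }
}

// Hypothetical response envelope that echoes the request ID back.
final class MiniResponse {
    final long id;
    final Object result;

    MiniResponse(long id, Object result) {
        this.id = id;
        this.result = result;
    }
}

final class RequestIdDemo {
    public static void main(String[] args) {
        MiniRequest first = new MiniRequest("a");
        MiniRequest second = new MiniRequest("b");
        MiniResponse reply = new MiniResponse(second.id, "b-result");
        System.out.println(first.id != second.id);   // distinct IDs
        System.out.println(reply.id == second.id);   // reply matches its request
    }
}
```

Since many requests share one connection, the echoed ID is the only thing that tells the consumer which outstanding call a given reply belongs to.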
// HeaderExchangeChannel.java
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
Before the request is sent, the Request and Channel objects are bound to a DefaultFuture. That Future is then returned, and the caller obtains the return value through it.
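The order of operations (create and register the future, then send, and undo the registration if the send fails) can be sketched as follows. PendingRequests and its LongConsumer transport parameter are hypothetical, and CompletableFuture stands in for DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical registry of in-flight requests; not Dubbo's DefaultFuture.
final class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Register the future first, then send, so that a fast response can
    // never arrive before its future exists. Undo the registration on failure.
    CompletableFuture<Object> request(LongConsumer transport) {
        long id = nextId.getAndIncrement();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        try {
            transport.accept(id);          // stand-in for channel.send(req)
        } catch (RuntimeException e) {
            pending.remove(id);
            future.cancel(false);
            throw e;
        }
        return future;
    }

    int pendingCount() {
        return pending.size();
    }
}

final class PendingRequestsDemo {
    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<Object> future =
                requests.request(id -> System.out.println("sent request " + id));
        System.out.println(future.isDone());           // still waiting for a response
        System.out.println(requests.pendingCount());   // one request in flight
    }
}
```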
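So far we have seen how the unique ID is bound to the Channel. Next comes the processing flow. Which Channel implementation is used depends on configuration; the default is NettyClient.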
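Once the provider has returned data, how does the consumer associate it with the right DefaultFuture? To answer that, look at how NettyClient handles the flow.
[Figure: NettyClient class diagram]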
Initialization calls the template methods doOpen and doConnect. Both are closely tied to how Netty itself works, so only part of the code is excerpted here:
// NettyClient.java
@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory);
    // config
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());
    // NettyHandler handles sending and receiving data; the actual work is delegated to NettyClient
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}
These two methods mainly initialize the Netty framework. The code that sends and receives data is as follows:
// NettyHandler.java
// Receive data from the provider
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    // Use the Netty channel to look up the Dubbo channel bound to it
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        // handler is the NettyClient object
        handler.received(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

// Send a request to the provider
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    super.writeRequested(ctx, e);
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        handler.sent(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}
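Both handler methods follow the same pattern: fetch (or create) the wrapper bound to the raw channel, use it, then drop it from the cache if the connection has gone away. A minimal sketch of that caching pattern, with hypothetical RawChannel and WrappedChannel classes standing in for the Netty channel and NettyChannel:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a transport-level channel (e.g. a Netty channel).
final class RawChannel {
    volatile boolean connected = true;
}

// Hypothetical framework-level wrapper, cached one per raw channel.
final class WrappedChannel {
    private static final ConcurrentMap<RawChannel, WrappedChannel> CACHE = new ConcurrentHashMap<>();

    final RawChannel raw;

    private WrappedChannel(RawChannel raw) {
        this.raw = raw;
    }

    // Return the wrapper bound to this raw channel, creating it on first use.
    static WrappedChannel getOrAdd(RawChannel raw) {
        return CACHE.computeIfAbsent(raw, WrappedChannel::new);
    }

    // Drop the cache entry once the underlying connection is gone.
    static void removeIfDisconnected(RawChannel raw) {
        if (!raw.connected) {
            CACHE.remove(raw);
        }
    }
}

final class ChannelCacheDemo {
    public static void main(String[] args) {
        RawChannel raw = new RawChannel();
        WrappedChannel first = WrappedChannel.getOrAdd(raw);
        WrappedChannel second = WrappedChannel.getOrAdd(raw);
        System.out.println(first == second);   // same wrapper while connected

        raw.connected = false;
        WrappedChannel.removeIfDisconnected(raw);
        System.out.println(first == WrappedChannel.getOrAdd(raw)); // fresh wrapper after removal
    }
}
```

Doing the cleanup in a finally block, as the excerpt does, keeps the cache from holding on to wrappers for dead connections even when the handler throws.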
When the provider's response arrives, it passes through several layers of calls and finally reaches the DefaultFuture#received logic:
public static void received(Channel channel, Response response) {
    try {
        // Look up the Future bound to the reqId carried by the Response, set its result,
        // and signal the Condition so the calling thread waiting on the Future wakes up
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
At this point the consumer-side handling of the response is complete, and the cached Future and Channel have been removed.
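The hand-off between the I/O thread and the waiting caller can be sketched with a lock and a Condition. This is a simplified illustration, not Dubbo's DefaultFuture: SimpleFuture is a hypothetical class, and throwing IllegalStateException on timeout is a choice made only for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified future; it only mirrors the shape of the flow above.
final class SimpleFuture {
    private static final Map<Long, SimpleFuture> FUTURES = new ConcurrentHashMap<>();

    private final long id;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private Object response;     // guarded by lock
    private boolean completed;   // guarded by lock

    SimpleFuture(long id) {
        this.id = id;
        FUTURES.put(id, this);
    }

    // I/O thread: find the future by response ID and wake the waiting caller.
    static boolean received(long id, Object value) {
        SimpleFuture future = FUTURES.remove(id);
        if (future == null) {
            return false;                 // late response: the caller already gave up
        }
        future.lock.lock();
        try {
            future.response = value;
            future.completed = true;
            future.done.signalAll();
        } finally {
            future.lock.unlock();
        }
        return true;
    }

    // Caller thread: block until the response arrives or the timeout elapses.
    Object get(long timeoutMs) {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (!completed) {
                if (remaining <= 0) {
                    FUTURES.remove(id);   // a later response will find nothing
                    throw new IllegalStateException("timed out waiting for response " + id);
                }
                remaining = done.awaitNanos(remaining);
            }
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SimpleFuture future = new SimpleFuture(1L);
        new Thread(() -> SimpleFuture.received(1L, "pong")).start();
        System.out.println(future.get(2000));
    }
}
```

The `future == null` branch corresponds to the warning logged in the excerpt: a response whose future has already been removed is simply a reply that came back after the caller stopped waiting.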
The adaptive extension classes below are generated by the ExtensionLoader#createAdaptiveExtensionClassCode method.
package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException(
            "method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException(
            "method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(
        com.alibaba.dubbo.rpc.Invoker arg0)
        throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) {
            throw new IllegalArgumentException(
                "com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException(
                "com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = ((url.getProtocol() == null) ? "dubbo"
            : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException(
                "Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" +
                url.toString() + ") use keys([protocol])");
        }
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
            .getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,
        com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
        if (arg1 == null) {
            throw new IllegalArgumentException("url == null");
        }
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = ((url.getProtocol() == null) ? "dubbo"
            : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException(
                "Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" +
                url.toString() + ") use keys([protocol])");
        }
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
            .getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}
package com.alibaba.dubbo.rpc.cluster;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(
        com.alibaba.dubbo.rpc.cluster.Directory arg0)
        throws com.alibaba.dubbo.rpc.cluster.Directory {
        if (arg0 == null) {
            throw new IllegalArgumentException(
                "com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException(
                "com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        }
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if (extName == null) {
            throw new IllegalStateException(
                "Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" +
                url.toString() + ") use keys([cluster])");
        }
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class)
            .getExtension(extName);
        return extension.join(arg0);
    }
}
package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0)
        throws com.alibaba.dubbo.rpc.Invoker {
        if (arg0 == null) {
            throw new IllegalArgumentException(
                "com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException(
                "com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null) {
            throw new IllegalStateException(
                "Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" +
                url.toString() + ") use keys([proxy])");
        }
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
            .getExtension(extName);
        return extension.getProxy(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0,
        java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2)
        throws java.lang.Object {
        if (arg2 == null) {
            throw new IllegalArgumentException("url == null");
        }
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null) {
            throw new IllegalStateException(
                "Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" +
                url.toString() + ") use keys([proxy])");
        }
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
            .getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}
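All three generated classes share one shape: validate the argument, read an extension name from the URL (falling back to a default), look the extension up, and delegate. A hand-written sketch of that shape follows. Transport and AdaptiveTransport are hypothetical, a plain Map stands in for Dubbo's URL, and a HashMap replaces the ExtensionLoader lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical extension point; a Map<String, String> stands in for Dubbo's URL.
interface Transport {
    String open(Map<String, String> url);
}

// Hand-written equivalent of a generated adaptive class: it owns no logic,
// it only resolves the extension name from the URL and delegates.
final class AdaptiveTransport implements Transport {
    private final Map<String, Transport> extensions = new HashMap<>();
    private final String key;
    private final String defaultName;

    AdaptiveTransport(String key, String defaultName) {
        this.key = key;
        this.defaultName = defaultName;
    }

    void register(String name, Transport extension) {
        extensions.put(name, extension);
    }

    @Override
    public String open(Map<String, String> url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String extName = url.getOrDefault(key, defaultName);
        Transport extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName + " for key " + key);
        }
        return extension.open(url);
    }
}

final class AdaptiveDemo {
    public static void main(String[] args) {
        AdaptiveTransport adaptive = new AdaptiveTransport("transporter", "netty");
        adaptive.register("netty", url -> "netty client");
        adaptive.register("mina", url -> "mina client");

        Map<String, String> url = new HashMap<>();
        System.out.println(adaptive.open(url));      // falls back to the default name
        url.put("transporter", "mina");
        System.out.println(adaptive.open(url));      // chosen by the URL parameter
    }
}
```

Resolving the implementation per call, rather than once at startup, is what lets a single injected object serve URLs that ask for different implementations.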
Dubbo load balancing strategies
Dubbo provides four load balancing strategies: ConsistentHash, LeastActive, Random, and RoundRobin.
ConsistentHashLoadBalance
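A minimal sketch of a consistent-hash ring built on TreeMap, the data structure this strategy relies on. It is not Dubbo's code: ConsistentHashRing, the replicas parameter (playing the role of hash.nodes), and the MD5-based hash function are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical ring; node names stand in for Invokers.
final class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    // Place `replicas` virtual nodes on the ring for every real node.
    ConsistentHashRing(List<String> nodes, int replicas) {
        for (String node : nodes) {
            for (int i = 0; i < replicas; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    // Walk clockwise from the key's position to the first virtual node.
    String select(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        if (entry == null) {
            entry = ring.firstEntry();   // wrap around the end of the ring
        }
        return entry.getValue();
    }

    // First four MD5 bytes as an unsigned 32-bit value.
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConsistentHashRing ring = new ConsistentHashRing(Arrays.asList("a", "b", "c"), 160);
        // The same key always lands on the same node.
        System.out.println(ring.select("user-42").equals(ring.select("user-42")));
    }
}
```

Virtual nodes spread each real node around the ring, so removing one node only remaps the keys that pointed at it.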
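Consistent hashing: requests with identical parameters are always routed to the same provider. Two parameters are configurable, hash.nodes and hash.arguments. The former is the number of virtual nodes per Invoker; the latter is the index of the request arguments used as the key for the hash algorithm. The consistent hash algorithm is implemented with a TreeMap, and the key logic is in the select step.
LeastActiveLoadBalance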
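A minimal sketch of least-active selection: each node keeps a counter of calls currently in flight, and the selector picks among the nodes with the smallest counter, breaking ties by weight. ActiveNode and LeastActiveSketch are hypothetical names, and the counter handling only approximates the spirit of ActiveLimitFilter rather than reproducing its code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical provider handle; "active" counts calls currently in flight.
final class ActiveNode {
    final String name;
    final int weight;
    final AtomicInteger active = new AtomicInteger();

    ActiveNode(String name, int weight) {
        this.name = name;
        this.weight = weight;
    }

    // Count the call as active for exactly as long as it runs.
    void invoke(Runnable call) {
        active.incrementAndGet();
        try {
            call.run();
        } finally {
            active.decrementAndGet();
        }
    }
}

final class LeastActiveSketch {
    static ActiveNode select(List<ActiveNode> nodes, Random random) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no nodes");
        }
        int least = Integer.MAX_VALUE;
        int totalWeight = 0;
        List<ActiveNode> candidates = new ArrayList<>();
        for (ActiveNode node : nodes) {
            int active = node.active.get();
            if (active < least) {          // new minimum: restart the candidate list
                least = active;
                candidates.clear();
                totalWeight = 0;
            }
            if (active == least) {
                candidates.add(node);
                totalWeight += node.weight;
            }
        }
        if (candidates.size() == 1 || totalWeight <= 0) {
            return candidates.get(random.nextInt(candidates.size()));
        }
        int offset = random.nextInt(totalWeight);   // weighted pick among the ties
        for (ActiveNode node : candidates) {
            offset -= node.weight;
            if (offset < 0) {
                return node;
            }
        }
        return candidates.get(candidates.size() - 1);   // not reached for positive weights
    }

    public static void main(String[] args) {
        ActiveNode fast = new ActiveNode("fast", 100);
        ActiveNode slow = new ActiveNode("slow", 100);
        slow.active.set(3);                             // pretend three calls are stuck on "slow"
        System.out.println(select(Arrays.asList(fast, slow), new Random()).name);
    }
}
```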
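The official site gives its own explanation of this algorithm. Judging from the code, though, the active count is better described as the number of requests an Invoker is currently processing; how the active counter is maintained can be seen in ActiveLimitFilter. In other words, the better an Invoker performs and the shorter its processing time, the smaller its active value. LeastActiveLoadBalance finds the smallest active value among all available Invokers and then uses weight to choose among them. The end result is that the more capable an Invoker is, the more load it takes on, so as requests keep pouring in it also becomes more likely to collapse.
RandomLoadBalance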
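Requests are distributed randomly. If all weights are equal, the choice is uniformly random; otherwise it is random in proportion to each Invoker's share of the total weight.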
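The two cases can be sketched in a few lines. WeightedRandomSketch is a hypothetical helper that works on a plain array of weights rather than on Invokers.

```java
import java.util.Random;

final class WeightedRandomSketch {
    // Returns the index of the chosen provider; weights must be non-negative.
    static int select(int[] weights, Random random) {
        int total = 0;
        boolean sameWeight = true;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            if (i > 0 && weights[i] != weights[i - 1]) {
                sameWeight = false;
            }
        }
        if (total > 0 && !sameWeight) {
            // Pick a point in [0, total) and find the segment it falls into.
            int offset = random.nextInt(total);
            for (int i = 0; i < weights.length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return i;
                }
            }
        }
        // Equal (or all-zero) weights: plain uniform choice.
        return random.nextInt(weights.length);
    }

    public static void main(String[] args) {
        int[] weights = {1, 3};
        int[] hits = new int[weights.length];
        Random random = new Random();
        for (int i = 0; i < 10_000; i++) {
            hits[select(weights, random)]++;
        }
        // The second provider should receive roughly three times as many picks.
        System.out.println(hits[0] + " vs " + hits[1]);
    }
}
```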
RoundRobinLoadBalance
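Round-robin scheduling. Dubbo uses a weighted round-robin algorithm, but I have not yet worked out the details of the implementation, and whether it actually achieves the weighted effect remains in doubt.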
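For comparison, one widely used way to get a weighted rotation is smooth weighted round-robin, the scheme nginx uses. The sketch below is that technique, not the algorithm in the Dubbo version discussed here, and SmoothWeightedRoundRobin is a hypothetical class; it only shows what a rotation that honours weights can look like.

```java
final class SmoothWeightedRoundRobin {
    private final String[] names;
    private final int[] weights;
    private final int[] current;   // running score per node
    private final int total;

    SmoothWeightedRoundRobin(String[] names, int[] weights) {
        if (names.length == 0 || names.length != weights.length) {
            throw new IllegalArgumentException("names and weights must be non-empty and the same length");
        }
        this.names = names.clone();
        this.weights = weights.clone();
        this.current = new int[weights.length];
        int sum = 0;
        for (int w : weights) {
            sum += w;
        }
        this.total = sum;
    }

    // Each round: add every node's weight to its score, pick the highest
    // score, then subtract the total weight from the winner.
    synchronized String next() {
        int best = 0;
        for (int i = 0; i < current.length; i++) {
            current[i] += weights[i];
            if (current[i] > current[best]) {
                best = i;
            }
        }
        current[best] -= total;
        return names[best];
    }

    public static void main(String[] args) {
        SmoothWeightedRoundRobin rr =
                new SmoothWeightedRoundRobin(new String[] {"a", "b", "c"}, new int[] {5, 1, 1});
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            picks.append(rr.next());
        }
        System.out.println(picks);   // prints aabacaa
    }
}
```

Over one full cycle of seven picks each node is chosen exactly as often as its weight, and the heavy node's turns are spread out rather than bunched together.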