Open afredlyj opened 8 years ago
使用Hystrix
监控依赖服务,并保证自己本身服务可用。
发现自己对Pipeline理解有误,代码在这里。
这段代码的本意是,根据不同的url,判断是否需要添加业务线程池。
问题在于,Channel和Pipeline是一一对应的,如果是HTTP1.1,不同的Http接口可能会共用同一个Pipeline,而此时Pipeline中的Handler在处理第一次请求之后就固定下来,所以没法达到想要的效果。
4.10之后NioEventLoop
的taskQueue使用了MpscLinkedQueue,替换了老版本的ConcurrentLinkedQueue队列。
NioEventLoop 是典型的多生产者单消费者模型,taskQueue调用的频率非常高,采用无锁的MpscLinkedQueue
对性能提升很大。
参考文档:http://blog.csdn.net/youaremoon/article/details/50351929
Netty 内存池的PoolChunk有两个核心概念:
一个chunk的可用内存大小为:
chunkSize = pageSize * (2 ^ maxOrder)
其中pageSize是一个page的内存大小,maxOrder是一个chunk的最大深度(从0开始计数)。
Netty 通过构建一个完整的平衡二叉树管理内存池。一个默认的chunk由2048个page组成,一个page的大小为8192(即8k),maxOrder为11,最后一层的节点数量即page的数量。刚开始分析时,我以为每个二叉树的节点即为一个page,是理解错误,引进二叉树是为了方便管理内存。二叉树中每个节点管理的内存包括该节点所有子节点管理的内存,文字描述可能难以理解,借用java doc的描述如下,括号中的数据表示该层每个节点管理的内存大小:
* depth=0 1 node (chunkSize)
* depth=1 2 nodes (chunkSize/2)
* ..
* ..
* depth=d 2^d nodes (chunkSize/2^d)
* ..
* depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize)
*
* depth=maxOrder is the last level and the leafs consist of pages
如果一个节点上存在已经被分配的子节点,那么该节点不能再被分配。比如对于节点2(depth=1),如果其子节点4(depth=2)已经被分配,那么节点2不能再被分配,因为节点2的可分配内存已经小于chunkSize/2。
每次申请内存时,内存大小都是2的N次方,为了标记二叉树的状态,引入一个数组memoryMap
:
* Algorithm:
* ----------
* Encode the tree in memoryMap with the notation
* memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated
* is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free
*
* As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained
*
* Initialization -
* In the beginning we construct the memoryMap array by storing the depth of a node at each node
* i.e., memoryMap[id] = depth_of_id
*
* Observations:
* -------------
* 1) memoryMap[id] = depth_of_id => it is free / unallocated
* 2) memoryMap[id] > depth_of_id => at least one of its child nodes is allocated, so we cannot allocate it, but
* some of its children can still be allocated based on their availability
* 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it
* is thus marked as unusable
接下来看看PoolChunk的初始化:
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
unpooled = false;
this.arena = arena;
// // memory是一个容量为chunkSize的byte[](heap方式)或ByteBuffer(direct方式)
this.memory = memory;
// 每个page的大小,默认为8192
this.pageSize = pageSize;
// 13, 2 ^ 13 = 8192
this.pageShifts = pageShifts;
// 默认 8192 << 11 = 16MB
this.chunkSize = chunkSize;
// -8192
subpageOverflowMask = ~(pageSize - 1);
freeBytes = chunkSize;
// 2 ^ 11
int chunkSizeInPages = chunkSize >>> pageShifts;
// 2048, 最多能被分配的Subpage个数
maxSubpageAllocs = 1 << maxOrder;
// Generate the memory map.
// 4096 = 2 ^ 11 * 2 = 2 ^ 12
// 整个完整二叉树的节点数量 : 2 ^ 12 - 1
// 所以memoryMap数组有一个多余节点
memoryMap = new byte[maxSubpageAllocs << 1];
depthMap = new byte[memoryMap.length];
// 从第二个数组元素起初始化
int memoryMapIndex = 1;
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
// in each level traverse left to right and set value to the depth of subtree
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
subpages = newSubpageArray(maxSubpageAllocs);
}
申请内存时,代码如下:
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity);
} else {
return allocateSubpage(normCapacity);
}
}
内存分配分两种情况:申请的内存大于pageSize
和小于pageSize
。
先看大于的情况:
private long allocateRun(int normCapacity) {
// 计算目标节点所在的深度
int d = maxOrder - (log2(normCapacity) - pageShifts);
// 根据深度从memeoryMap中获取node id
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);
return id;
}
最重要的逻辑在方法allocateNode
中:
private int allocateNode(int d) {
int id = 1;
// memoryMap数组从memoryMap[1]启用,即id=1
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
// 根据上文memoryMap[id] = x 的定义,此种情况表示从节点id所在层级到d层都没有可用节点
if (val > d) { // unusable
return -1;
}
// 该chunk还存在可用节点
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1;
val = value(id);
if (val > d) {
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
setValue(id, unusable); // mark as unusable
// 更新父节点信息
updateParentsAlloc(id);
return id;
}
allocateNode
返回二叉树中可用节点的id,并更新该节点和父节点的信息。
未完待续。
耗时
转入业务线程池后,怎么合理计算接口耗时?
尝试用btrace监控接口耗时。
初步代码在这里。