First, declare an interface for the hash function:
```java
public interface HashFunction {

    /**
     * Calculate the hash value of the input key.
     *
     * @param key the key to hash
     * @return hash value
     */
    long hash(String key);
}
```
We only care about taking a given key and returning the hash value computed by the hash function, so the interface declares a single method.
An implementation based on MurmurHash2:
```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class MurmurHash2Function implements HashFunction {

    /**
     * MurmurHash2 (64-bit) implementation.
     *
     * @param key the key to hash
     * @return hash value
     */
    @Override
    public long hash(String key) {
        ByteBuffer buf = ByteBuffer.wrap(key.getBytes());
        int seed = 0x1234ABCD;
        ByteOrder byteOrder = buf.order();
        buf.order(ByteOrder.LITTLE_ENDIAN);
        long m = 0xc6a4a7935bd1e995L;
        int r = 47;
        long h = seed ^ (buf.remaining() * m);
        long k;
        // mix the input 8 bytes at a time
        while (buf.remaining() >= 8) {
            k = buf.getLong();
            k *= m;
            k ^= k >>> r;
            k *= m;
            h ^= k;
            h *= m;
        }
        // handle the remaining tail (fewer than 8 bytes)
        if (buf.remaining() > 0) {
            ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
            // for big-endian version, do this first:
            // finish.position(8 - buf.remaining());
            finish.put(buf).rewind();
            h ^= finish.getLong();
            h *= m;
        }
        // final avalanche
        h ^= h >>> r;
        h *= m;
        h ^= h >>> r;
        buf.order(byteOrder);
        return h;
    }
}
```
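A quick sanity check of the function's determinism (a minimal sketch, not part of the original post):

```java
HashFunction fn = new MurmurHash2Function();
// deterministic: the same key always hashes to the same ring position
System.out.println(fn.hash("server01") == fn.hash("server01")); // true
// distinct keys should land on different positions
System.out.println(fn.hash("server01") == fn.hash("server02")); // false in practice (a collision is astronomically unlikely)
```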
KV data is abstracted as the following interface:
```java
public interface KvData<K, V> {

    /**
     * KV data's key, used to map the data onto the hash ring.
     *
     * @return kv data's key
     */
    K key();

    /**
     * KV data's value.
     * The question does not care about the value,
     * so it is implemented as a default method that returns null.
     *
     * @return kv data's value
     */
    default V value() {
        return null;
    }
}
```
The problem only asks for the distribution of the KV data and the standard deviation of the distribution counts; the KV data's value is irrelevant, so the interface supplies a default value() implementation.
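Since key() is the only abstract method, KvData can be created from a lambda (a usage sketch; the key string is made up for illustration):

```java
KvData<String, Object> data = () -> "user:42"; // hypothetical key
System.out.println(data.key());   // user:42
System.out.println(data.value()); // null, from the default implementation
```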
The problem specifies 10 servers. For a server node we only care about its unique identifying key, so it is abstracted as the following interface:
```java
public interface ServerNode {

    /**
     * The server node's identifying key, used to map the node onto the hash ring.
     *
     * @return node key value
     */
    String nodeKey();
}
```
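ServerNode is likewise a functional interface, so a node can be mocked with a lambda, which is exactly how the test class below builds its 10-node list (a trivial sketch):

```java
ServerNode node = () -> "server01";
System.out.println(node.nodeKey()); // server01
```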
With the core interfaces in place, the next step is the class that implements the consistent-hash mapping:
```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ConsistentHashMapper {

    private final HashFunction hashFunction;
    private final int virtualNodeNum;
    // built lazily on the first mapping() call, then reused
    private TreeMap<Long, ServerNode> hashRing = new TreeMap<>();

    public ConsistentHashMapper(HashFunction hashFunction) {
        this(hashFunction, 0);
    }

    public ConsistentHashMapper(HashFunction hashFunction, int virtualNodeNum) {
        if (virtualNodeNum < 0 || virtualNodeNum > 1000) {
            throw new IllegalArgumentException("virtual node number must be between 0 and 1000");
        }
        this.hashFunction = hashFunction;
        this.virtualNodeNum = virtualNodeNum;
    }

    /**
     * Map kvData's key onto the hash ring and return the mapped ServerNode.
     *
     * @param nodeList ServerNode list
     * @param kvData   KvData to map
     * @return ServerNode mapped by kvData
     */
    public ServerNode mapping(List<ServerNode> nodeList, KvData<String, Object> kvData) {
        if (nodeList == null || nodeList.isEmpty()) {
            throw new IllegalArgumentException("node list can not be null or empty");
        }
        if (kvData == null || kvData.key() == null || "".equals(kvData.key())) {
            throw new IllegalArgumentException("kvData's key is null or empty");
        }
        long hashValue = hashFunction.hash(kvData.key());
        if (hashRing == null || hashRing.isEmpty()) {
            hashRing = buildHashRing(nodeList);
        }
        return locate(hashRing, hashValue);
    }

    /**
     * Address hashValue on the hash ring, returning the first ServerNode at or
     * after it (clockwise). If nothing is found up to the end of the ring,
     * wrap around and return the first ServerNode on the ring.
     *
     * @param hashRing  hash ring with mapped ServerNode list
     * @param hashValue hash value to address
     * @return ServerNode
     */
    private ServerNode locate(TreeMap<Long, ServerNode> hashRing, long hashValue) {
        Map.Entry<Long, ServerNode> entry = hashRing.ceilingEntry(hashValue);
        if (entry == null) {
            // wrapped past the largest position: the ring is a circle
            entry = hashRing.firstEntry();
        }
        return entry.getValue();
    }

    /**
     * Build the hash ring from the input ServerNode list if it has not been built yet.
     * If {@code virtualNodeNum} is larger than zero, the ring is built with virtual nodes.
     * The result is a TreeMap<Long, ServerNode> whose key is
     * {@code HashFunction.hash(ServerNode.nodeKey())} and whose value is the ServerNode.
     *
     * @param nodeList ServerNode list
     * @return ServerNode mapping info map
     */
    private TreeMap<Long, ServerNode> buildHashRing(List<ServerNode> nodeList) {
        TreeMap<Long, ServerNode> virtualNodeMap = new TreeMap<>();
        // no virtual node number set, just add real nodes
        if (virtualNodeNum <= 0) {
            addRealNodes(virtualNodeMap, nodeList);
        } else {
            addVirtualNodes(virtualNodeMap, nodeList);
        }
        return virtualNodeMap;
    }

    private void addRealNodes(TreeMap<Long, ServerNode> virtualNodeMap, List<ServerNode> nodeList) {
        for (ServerNode node : nodeList) {
            virtualNodeMap.put(hashFunction.hash(node.nodeKey()), node);
        }
    }

    private void addVirtualNodes(TreeMap<Long, ServerNode> virtualNodeMap, List<ServerNode> nodeList) {
        for (ServerNode node : nodeList) {
            for (int i = 0; i < virtualNodeNum; i++) {
                // every mapped position is a virtual node; each resolves back to its real node
                virtualNodeMap.put(hashFunction.hash(node.nodeKey() + String.format("%03d", i)), node);
            }
        }
    }
}
```
We will run several test groups for each hash function implementation, so ConsistentHashMapper has a virtualNodeNum field that specifies the number of virtual nodes; the default of 0 means only real nodes are mapped onto the ring. (I limited virtualNodeNum to the range 0~1000 here; this is not required, and the maximum could be much larger.) The hash function and the number of virtual nodes are passed in through the ConsistentHashMapper constructors.
The ConsistentHashMapper#mapping() method takes two arguments, the list of server nodes and the KV data to be addressed, and returns the server node the data maps to.
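Put together, a call looks like this (a minimal usage sketch based on the API above; the node and key names are illustrative):

```java
// no virtual nodes
ConsistentHashMapper plain = new ConsistentHashMapper(new MurmurHash2Function());
// 100 virtual nodes per real node
ConsistentHashMapper virtual = new ConsistentHashMapper(new MurmurHash2Function(), 100);

List<ServerNode> nodes = Arrays.asList(() -> "server01", () -> "server02", () -> "server03");
ServerNode target = virtual.mapping(nodes, () -> "user:42");
System.out.println(target.nodeKey()); // the node this key is routed to
```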
In addition, we need to compute the standard deviation, so a StatisticUtils class implements it:
```java
public class StatisticUtils {

    /**
     * Population standard deviation (divides by N, not N - 1).
     *
     * @param dataArr data array
     * @return standard deviation result
     */
    public static double stdev(Long[] dataArr) {
        double sum = 0.0;
        for (long i : dataArr) {
            sum += i;
        }
        double mean = sum / dataArr.length;
        double squaredDiffSum = 0.0;
        for (long i : dataArr) {
            squaredDiffSum += Math.pow((double) i - mean, 2.0D);
        }
        return Math.sqrt(squaredDiffSum / dataArr.length);
    }
}
```
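For example, for the array {2, 4, 4, 4, 5, 5, 7, 9} the mean is 5 and the population variance is 32 / 8 = 4, so:

```java
Long[] counts = {2L, 4L, 4L, 4L, 5L, 5L, 7L, 9L};
System.out.println(StatisticUtils.stdev(counts)); // 2.0
```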
Testing the imbalance with 10 nodes and 1,000,000 KV entries. The test cases are below:
```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.google.common.util.concurrent.AtomicLongMap;
import org.junit.Test;

import static org.junit.Assert.assertNotNull;

public class TestConsistentHashMapper {

    /**
     * Mock ServerNode list
     */
    private static final List<ServerNode> nodeList = Arrays.asList(
            () -> newServerNodeKey("server01"),
            () -> newServerNodeKey("server02"),
            () -> newServerNodeKey("server03"),
            () -> newServerNodeKey("server04"),
            () -> newServerNodeKey("server05"),
            () -> newServerNodeKey("server06"),
            () -> newServerNodeKey("server07"),
            () -> newServerNodeKey("server08"),
            () -> newServerNodeKey("server09"),
            () -> newServerNodeKey("server10")
    );

    private static String newServerNodeKey(String name) {
        assertNotNull("server name can not be null", name);
        return name;
    }

    private void initStatisticMap(List<ServerNode> nodeList, AtomicLongMap<String> statisticMap) {
        for (ServerNode node : nodeList) {
            statisticMap.put(node.nodeKey(), 0L);
        }
    }

    //// test MurmurHash2 start ////

    @Test
    public void testMurmurHash2WithoutVirtualNode() {
        System.out.println("MurmurHash2 with no virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function());
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With10VirtualNode() {
        System.out.println("MurmurHash2 with 10 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 10);
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With30VirtualNode() {
        System.out.println("MurmurHash2 with 30 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 30);
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With50VirtualNode() {
        System.out.println("MurmurHash2 with 50 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 50);
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With80VirtualNode() {
        System.out.println("MurmurHash2 with 80 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 80);
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With100VirtualNode() {
        System.out.println("MurmurHash2 with 100 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 100);
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With200VirtualNode() {
        System.out.println("MurmurHash2 with 200 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 200);
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With500VirtualNode() {
        System.out.println("MurmurHash2 with 500 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 500);
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With800VirtualNode() {
        System.out.println("MurmurHash2 with 800 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 800);
        process(hashMapper, AtomicLongMap.create());
    }

    @Test
    public void testMurmurHash2With1000VirtualNode() {
        System.out.println("MurmurHash2 with 1000 virtual node: ");
        ConsistentHashMapper hashMapper = new ConsistentHashMapper(new MurmurHash2Function(), 1000);
        process(hashMapper, AtomicLongMap.create());
    }

    //// test MurmurHash2 end ////

    private void process(ConsistentHashMapper hashMapper, AtomicLongMap<String> statisticMap) {
        initStatisticMap(nodeList, statisticMap);
        processMappingWithLoop(hashMapper, statisticMap, 1000000);
        prettyPrintStatisticMap(statisticMap);
    }

    private void processMappingWithLoop(ConsistentHashMapper hashMapper, AtomicLongMap<String> statisticMap, int timesOfLoop) {
        for (int i = 0; i < timesOfLoop; i++) {
            // each iteration maps a random key and counts which node it lands on
            ServerNode node = hashMapper.mapping(nodeList, () -> UUID.randomUUID().toString().replaceAll("-", ""));
            statisticMap.getAndIncrement(node.nodeKey());
        }
    }

    private void prettyPrintStatisticMap(AtomicLongMap<String> statisticMap) {
        statisticMap.asMap().entrySet().stream().sorted(Map.Entry.comparingByValue()).forEach(System.out::println);
        System.out.print("Standard Deviation: ");
        System.out.println(StatisticUtils.stdev(statisticMap.asMap().values().toArray(new Long[]{})));
        System.out.println("\n");
    }
}
```
| test group | Standard Deviation |
| --- | --- |
| MurmurHash2 no virtual node | 76852.16470601202 |
| MurmurHash2 10 virtual node | 40531.410579944044 |
| MurmurHash2 30 virtual node | 19445.72150885639 |
| MurmurHash2 50 virtual node | 14074.121905113654 |
| MurmurHash2 80 virtual node | 16471.37915294284 |
| MurmurHash2 100 virtual node | 11735.359227565214 |
| MurmurHash2 200 virtual node | 5708.017168859954 |
| MurmurHash2 500 virtual node | 3721.3552370070774 |
| MurmurHash2 800 virtual node | 4179.462453474131 |
| MurmurHash2 1000 virtual node | 3005.045091175838 |
The test results show that virtual nodes effectively even out the distribution of hashed keys. To get better balance still, we need to consider how to map the nodes evenly onto the hash ring, and also take the hash values of all possible keys into account when reasoning about how keys ultimately map to nodes.
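One way to reason about ring balance directly, independent of any particular key workload, is to measure what fraction of the 64-bit ring each real node owns. A sketch (not in the original code; it assumes the TreeMap ring built by buildHashRing() is made accessible, e.g. for testing):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class RingBalance {

    /**
     * Sum, per real node, the fraction of the 64-bit ring it owns.
     * A ring position owns the arc that ends at it, matching the
     * ceilingEntry() lookup in ConsistentHashMapper#locate().
     * (Degenerate for a single-position ring, which owns everything.)
     */
    public static Map<String, Double> arcShare(TreeMap<Long, ServerNode> ring) {
        Map<String, Double> share = new HashMap<>();
        long prev = ring.lastKey(); // the ring wraps: the last position precedes the first
        for (Map.Entry<Long, ServerNode> e : ring.entrySet()) {
            long arc = e.getKey() - prev; // unsigned arc length; two's complement wraps correctly
            // top 53 bits of the unsigned arc, scaled to a fraction in [0, 1)
            double fraction = (double) (arc >>> 11) / (1L << 53);
            share.merge(e.getValue().nodeKey(), fraction, Double::sum);
            prev = e.getKey();
        }
        return share;
    }
}
```

The fractions sum to about 1.0; the closer each node's share is to 1/N, the better the ring itself is balanced before any keys arrive.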
The line chart of the standard deviations is below; the x-axis is the number of virtual nodes and the y-axis is the standard deviation:
The problem: given 1,000,000 KV entries and 10 server nodes, compute the standard deviation of the number of KV entries stored on each server, to evaluate how unbalanced the algorithm's storage load is.