普通的hash算法有个很大的问题:当hash的"模数"发生变化时,整个hash数据结构就需要重新hash,重新hash之后的数据分布一定会和hash之前的不同;在很多场景下,"模数"的变化时必然的,但是这种"数据分布"的巨大变化却会带来一些麻烦.所以,就有了"一致性hash",当然学术界对"一致性hash"的阐述,还远远不止这些.
在编程应用方面,最直观的例子就是"分布式缓存",一个cache集群中,有N台物理server,为了提升单台server的支撑能力,可能会考虑将数据通过hash的方式相对均匀的分布在每个server上.
判定方式: location = hashcode(key) % N;事实上,由于需要,N可能会被增加或者削减,不管程序上是否能够妥善的支持N的变更,单从"数据迁移"的面积而言,也是非常大的.
一致性Hash很巧妙的简化了这个问题,同时再使用"虚拟节点"的方式来细分数据的分布.
F1
图示中表名,有2个物理server节点,每个物理server节点有多个Snode虚拟节点,server1和server2的虚拟节点互相"穿插"且依次排列,每个snode都有一个code,它表示接受数据的hashcode起始值(或终止值),比如上述图示中第一个snode.code为500,即当一个数据的hashcode值在[0,500]时则会被存储在它上.
引入虚拟节点之后,事情就会好很多;假如KEY1分布在Snode3上,snode3事实为物理server1,当server1故障后,snode2也将被移除,那么KEY1将会被分布在"临近的"虚拟节点上--snode2(或者snode4,由实现而定);无论是存取,下一次对KEY1的操作将会有snode2(或snode4)来服务.
1) 一个物理server在逻辑上分成N个虚拟节点(本例中为256个)
2) 多个物理server的虚拟节点需要散列分布,即互相"穿插".
3) 所有的虚拟节点,在逻辑上形成一个链表
4) 每个虚拟节点,负责一定区间的hashcode值.
import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; public class ServersPool { private static final int VIRTUAL_NODES_NUMBER = 256;//物理节点对应的虚拟节点的个数 private static final String TAG = ".-vtag-."; private NavigableMap<Long, SNode> innerPool = new TreeMap<Long, SNode>(); private Hashing hashing = new Hashing(); /** * 新增物理server地址 * @param address * @param weight * 权重,权重越高,其虚拟节点的个数越高,事实上没命中的概率越高 * @throws Exception */ public synchronized void addServer(String address,int weight) throws Exception { SNode prev = null; SNode header = null; String[] tmp = address.split(":"); InnerServer server = new InnerServer(tmp[0], Integer.parseInt(tmp[1])); server.init(); //将一个address下的所有虚拟节点SNode形成链表,可以在removeServer,以及 //特殊场景下使用 int max = 1; if(weight > 0){ max = VIRTUAL_NODES_NUMBER * weight; } for (int i = 0; i < max; i++) { long code = hashing.hash(address + TAG + i); SNode current = new SNode(server, code); if (header == null) { header = current; } current.setPrev(prev); innerPool.put(code, current); prev = current; } } /** * 删除物理server地址,伴随着虚拟节点的删除 * @param address */ public synchronized void removeServer(String address) { long code = hashing.hash(address + TAG + (VIRTUAL_NODES_NUMBER - 1)); SNode current = innerPool.get(code); if(current == null){ return; } if(!current.getAddress().equalsIgnoreCase(address)){ return; } current.getServer().close();; while (current != null) { current = innerPool.remove(current.getCode()).getPrev(); } } /** * 根据指定的key,获取此key应该命中的物理server信息 * @param key * @return */ public InnerServer getServer(String key) { long code = hashing.hash(key); SNode snode = innerPool.lowerEntry(code).getValue(); if (snode == null) { snode = innerPool.firstEntry().getValue(); } return snode.getServer(); } /** * 虚拟节点描述 */ class SNode { Long code; InnerServer server; SNode prev; SNode(InnerServer server, Long code) { this.server = server; this.code = code; } SNode getPrev() { return prev; } void setPrev(SNode prev) { this.prev = prev; } Long getCode() { return this.code; } InnerServer getServer() { return server; } String getAddress(){ return server.ip + ":" + server.port; } } /** * hashcode生成 */ class Hashing { //少量优化性能 private ThreadLocal<MessageDigest> md5Holder = new ThreadLocal<MessageDigest>(); private Charset DEFAULT_CHARSET = Charset.forName("utf-8"); public long hash(String key) { return hash(key.getBytes(DEFAULT_CHARSET)); } public long hash(byte[] key) { try { if (md5Holder.get() == null) { md5Holder.set(MessageDigest.getInstance("MD5")); } } catch (NoSuchAlgorithmException e) { throw new IllegalStateException("no md5 algorythm found"); } MessageDigest md5 = md5Holder.get(); md5.reset(); md5.update(key); byte[] bKey = md5.digest(); long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF); return res; } } /** * 与物理server的TCP链接,用于实际的IO操作 */ class InnerServer { String ip; int port; Socket socket; InnerServer(String ip, int port) { this.ip = ip; this.port = port; } synchronized void init() throws Exception { SocketAddress socketAddress = new InetSocketAddress(ip, port); socket = new Socket(); socket.connect(socketAddress, 30000); } public boolean write(byte[] sources) { //TODO return true; } public byte[] read() { //TODO return new byte[]{}; } public void close(){ if(socket == null || socket.isClosed()){ return; } try{ socket.close(); } catch (Exception e){ // } } } }
相关推荐
一致性 hash 算法介绍
Ketama算法是一致性hash算法的一个优秀实现。增删节点后数据命中率及均分率都很高。
一致性hash应用于负载均衡算法,本实现由C++语言开发。 一致性hash算法提出了在动态变化的Cache环境中,判定哈希算法好坏的四个定义: 1、平衡性(Balance)2、单调性(Monotonicity) 3、分散性(Spread)4、负载(Load)
IT面试常见的题目,对于分布式存储系统中常碰到的故障问题,如何解决,就是采用一致性hash算法
一致性hash算法
一致性Hash算法的原理及实现
一致性hash算法简介
一致性hash算法简介加C++实现
简单模拟实现一致性Hash,透过虚拟节点映射至实际结点,解决一致性Hash的单调性和平衡性问题。
摘要视图订阅登录 | 注册算法艺术(8)1004760次第1338名90篇16篇4篇595条一致性hash算法 - consistent hashing - s
一致性hash算法实现有两个关键问题需要解决,一个是用于结点存储和查找的数据结构的选择,另一个是结点hash算法的选择。 首先来谈一下一致性hash算法中用于存储结点的数据结构。通过了解一致性hash的原理,我们...
如果没有找到,则取整个环的第个节点。测试结果测试代码是整理的,主体法没有变分布平均性测试:测试随机成的众多key是否会平均分布到各个结点上测试结果如下:最上是参
主要介绍了PHP实现的一致性Hash算法,结合实例形式详细分析了php一致性Hash算法的概念、原理及相关实现与使用技巧,需要的朋友可以参考下
MurmurHash算法由Austin Appleby创建于2008年,现已应用到Hadoop、libstdc 、nginx、libmemcached,Redis,Memcached,Cassandra,HBase,Lucene等开源系统。2011年Appleby被Google雇佣,随后Google推出其变种的...
一致性哈希算法,广泛应用于分布式计算,C版实现,属于分布式服务器管理的核心算法。
一致性哈希算法的php版,希望能帮到phper了解一致性哈希
c++实现的一致性hash算法库,可以直接编译成静态库、动态库,包含demo程序。
随着虚拟节点的增加,数据量分配就比较平均了,但是并不是虚拟节点数量越多就越好,因为要考虑这些虚拟节点带来的性能开销以及算法的复杂性;
对已有的负载平衡的改进算法,通过一致性哈希算法,负载平衡更加的有效。