Java实现一致性哈希算法,并搭建环境测试其负载均衡特性(二)

作者阿里云代理 文章分类 分类:linux图文教程 阅读次数 已被围观 986

II. 创建服务器集群, 供应RPC远程调用服务

  1. 首要创建一个服务器项目(运用Maven), 增加zookeeper依赖
  2. 创建常量接口, 用于存储联接zookeeper的信息
  3. public interface Constant {
  4.    //zookeeper集群的地址
  5.    String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
  6.    //联接zookeeper的超时时刻
  7.    int ZK_TIME_OUT = 5000;
  8.    //服务器所发布的远程服务在zookeeper中的注册地址, 也便是说这个节点中保存了各个服务器供应的接口
  9.    String ZK_REGISTRY = "/provider";
  10.    //zookeeper集群中注册服务的url地址的瞬时节点
  11.    String ZK_RMI = ZK_REGISTRY + "/rmi";
  12. }

3.封装操作zookeeper和发布远程服务的接口供自己调用, 本事例中发布远程服务运用Java本身供应的rmi包完结, 假设没有了解过可以参阅这篇

  1. public class ServiceProvider {

  2.    private CountDownLatch latch = new CountDownLatch(1);

  3.    /**
  4.     * 联接zookeeper集群
  5.     */
  6.    public ZooKeeper connectToZK(){
  7.        ZooKeeper zk = null;
  8.        try {
  9.            zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
  10.                @Override
  11.                public void process(WatchedEvent watchedEvent) {
  12.                    //假设联接上了就唤醒其时线程.
  13.                    latch.countDown();
  14.                }
  15.            });
  16.            latch.await();//还没联接上时其时线程等候
  17.        } catch (Exception e) {
  18.            e.printStackTrace();
  19.        }
  20.        return zk;
  21.    }

  22.    /**
  23.     * 创建znode节点
  24.     * @param zk
  25.     * @param url 节点中写入的数据
  26.     */
  27.    public void createNode(ZooKeeper zk, String url){
  28.        try{
  29.            //要把写入的数据转化为字节数组
  30.            byte[] data = url.getBytes();
  31.            zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  32.        } catch (Exception e) {
  33.            e.printStackTrace();
  34.        }
  35.    }

  36.    /**
  37.     * 发布rmi服务
  38.     */
  39.    public String publishService(Remote remote, String host, int port){
  40.        String url = null;
  41.        try{
  42.            LocateRegistry.createRegistry(port);
  43.            url = "rmi://" + host + ":" + port + "/rmiService";
  44.            Naming.bind(url, remote);
  45.        } catch (Exception e) {
  46.            e.printStackTrace();
  47.        }
  48.        return url;
  49.    }

  50.    /**
  51.     * 发布rmi服务, 而且将服务的url注册到zookeeper集群中
  52.     */
  53.    public void publish(Remote remote, String host, int port){
  54.        //调用publishService, 得到服务的url地址
  55.        String url = publishService(remote, host, port);
  56.        if(null != url){
  57.            ZooKeeper zk = connectToZK();//联接到zookeeper
  58.            if(null != zk){
  59.                createNode(zk, url);
  60.            }
  61.        }
  62.    }
  63. }
  64. 自定义远程服务. 服务供应一个简略的方法: 客户端发来一个字符串, 服务器在字符串前面增加上Hello, 并回来字符串。
  65. //UserService
  66. public interface UserService extends Remote {
  67.    public String helloRmi(String name) throws RemoteException;
  68. }
  69. //UserServiceImpl
  70. public class UserServiceImpl implements UserService {

  71.    public UserServiceImpl() throws RemoteException{
  72.        super();
  73.    }

  74.    @Override
  75.    public String helloRmi(String name) throws RemoteException {
  76.        return "Hello " + name + "!";
  77.    }
  78. }
  79. 修改端口号, 发起多个java虚拟机, 仿照服务器集群. 为了便当演示, 自定义7777, 8888, 9999端口打开3个服务器进程, 到时会仿照7777端口的服务器宕机和批改重连。
  80. public static void main(String[] args) throws RemoteException {
  81.    //创建东西类方针
  82.    ServiceProvider sp = new ServiceProvider();
  83.    //创建远程服务方针
  84.    UserService userService = new UserServiceImpl();
  85.    //完结发布
  86.    sp.publish(userService, "localhost", 9999);
  87. }

8.jpg

III. 编写客户端程序(运用一致性哈希算法完结负载均衡

  1. 封装客户端接口:
  2. public class ServiceConsumer {
  3.    /**
  4.     * 供应远程服务的服务器列表, 只记录远程服务的url
  5.     */
  6.    private volatile Listurls = new LinkedList<>();
  7.    /**
  8.     * 远程服务对应的虚拟节点调集
  9.     */
  10.    private static TreeMap virtualNodes = new TreeMap<>();

  11.    public ServiceConsumer(){
  12.        ZooKeeper zk = connectToZK();//客户端联接到zookeeper
  13.        if(null != zk){
  14.            //联接上后重视zookeeper中的节点改动(服务器改动)
  15.            watchNode(zk);
  16.        }
  17.    }

  18.    private void watchNode(final ZooKeeper zk) {
  19.        try{
  20.            //调查/provider节点下的子节点是否有改动(是否有服务器登入或登出)
  21.            ListnodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
  22.                @Override
  23.                public void process(WatchedEvent watchedEvent) {
  24.                    //假设服务器节点有改动就从头获取
  25.                    if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
  26.                        System.out.println("服务器端有改动, 或许有旧服务器宕机或者新服务器参与集群...");
  27.                        watchNode(zk);
  28.                    }
  29.                }
  30.            });
  31.            //将获取到的服务器节点数据保存到调集中, 也便是获得了远程服务的访问url地址
  32.            ListdataList = new LinkedList<>();
  33.            TreeMap newVirtualNodesList = new TreeMap<>();
  34.            for(String nodeStr : nodeList){
  35.                byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
  36.                //放入服务器列表的url
  37.                String url = new String(data);
  38.                //为每个服务器分配虚拟节点, 为了便当仿照, 默许打开在9999端口的服务器功用较差, 只分配300个虚拟节点, 其他分配1000个.
  39.                if(url.contains("9999")){
  40.                    for(int i = 1; i <= 300; i++){
  41.                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
  42.                    }
  43.                }else{
  44.                    for(int i = 1; i <= 1000; i++){
  45.                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
  46.                    }
  47.                }
  48.                dataList.add(url);
  49.            }
  50.            urls = dataList;
  51.            virtualNodes = newVirtualNodesList;
  52.            dataList = null;//好让废物收回器赶快收集
  53.            newVirtualNodesList = null;
  54.        } catch (Exception e) {
  55.            e.printStackTrace();
  56.        }
  57.    }

  58.    /**
  59.     * 依据url获得远程服务方针
  60.     */
  61.    publicT lookUpService(String url){
  62.        T remote = null;
  63.        try{
  64.            remote = (T)Naming.lookup(url);
  65.        } catch (Exception e) {
  66.            //假设该url联接不上, 很有或许是该服务器挂了, 这时运用服务器列表中的第一个服务器url从头获取远程方针.
  67.            if(e instanceof ConnectException){
  68.                if (urls.size() != 0){
  69.                    url = urls.get(0);
  70.                    return lookUpService(url);
  71.                }
  72.            }
  73.        }
  74.        return remote;
  75.    }

  76.    /**
  77.     * 通过一致性哈希算法, 选取一个url, 终究回来一个远程服务方针
  78.     */
  79.    publicT lookUp(){
  80.        T service = null;
  81.        //随机核算一个哈希值
  82.        int hash = FVNHash(Math.random() * 10000 + "");
  83.        //得到大于该哈希值的一切map调集
  84.        SortedMap subMap = virtualNodes.tailMap(hash);
  85.        //找到比该值大的第一个虚拟节点, 假设没有比它大的虚拟节点, 依据哈希环, 则回来第一个节点.
  86.        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
  87.        //通过该虚拟节点获得服务器url
  88.        String virtualNodeName = virtualNodes.get(targetKey);
  89.        String url = virtualNodeName.split("@")[0];
  90.        //依据服务器url获取远程服务方针
  91.        service = lookUpService(url);
  92.        System.out.print("供应本次服务的地址为: " + url + ", 回来效果: ");
  93.        return service;
  94.    }

  95.    private CountDownLatch latch = new CountDownLatch(1);

  96.    public ZooKeeper connectToZK(){
  97.        ZooKeeper zk = null;
  98.        try {
  99.            zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
  100.                @Override
  101.                public void process(WatchedEvent watchedEvent) {
  102.                    //判断是否联接zk集群
  103.                    latch.countDown();//唤醒处于等候情况的其时线程
  104.                }
  105.            });
  106.            latch.await();//没有联接上的时分其时线程处于等候情况.
  107.        } catch (IOException e) {
  108.            e.printStackTrace();
  109.        } catch (InterruptedException e) {
  110.            e.printStackTrace();
  111.        }
  112.        return zk;
  113.    }


  114.    public static int FVNHash(String data){
  115.        final int p = 16777619;
  116.        int hash = (int)2166136261L;
  117.        for(int i = 0; i < data.length(); i++)
  118.            hash = (hash ^ data.charAt(i)) * p;
  119.        hash += hash << 13;
  120.        hash ^= hash >> 7;
  121.        hash += hash << 3;
  122.        hash ^= hash >> 17;
  123.        hash += hash << 5;
  124.        return hash < 0 ? Math.abs(hash) : hash;
  125.    }
  126. }
  127. 发起客户端进行测验:
  128. public static void main(String[] args){
  129.    ServiceConsumer sc = new ServiceConsumer();//创建东西类方针
  130.    while(true){
  131.        //获得rmi远程服务方针
  132.        UserService userService = sc.lookUp();
  133.        try{
  134.            //调用远程方法
  135.            String result = userService.helloRmi("炭烧生蚝");
  136.            System.out.println(result);
  137.            Thread.sleep(100);
  138.        }catch(Exception e){
  139.            e.printStackTrace();
  140.        }
  141.    }
  142. }
  143. 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行核算。

9.jpg

10.png

IV. 对服务器调用数据进行核算剖析

重温一遍仿照的进程: 首要分别在7777, 8888, 9999端口发起了3台服务器. 然后发起客户端进行访问. 7777, 8888端口的两台服务器设置功用指数为1000, 而9999端口的服务器功用指数设置为300。

在客户端运转期间, 我手动关闭了8888端口的服务器, 客户规矩常打印出服务器改动信息。此刻理论上不会有访问被路由到8888端口的服务器。当我从头发起8888端口服务器时, 客户端打印出服务器改动信息, 访问能正常到达8888端口服务器。

下面临各服务器的访问量进行核算, 看是否完结了负载均衡。

测验程序如下:

  1. public class DataStatistics {
  2.    private static float ReqToPort7777 = 0;
  3.    private static float ReqToPort8888 = 0;
  4.    private static float ReqToPort9999 = 0;

  5.    public static void main(String[] args) {
  6.        BufferedReader br = null;
  7.        try {
  8.            br = new BufferedReader(new FileReader("C://test.txt"));
  9.            String line = null;
  10.            while(null != (line = br.readLine())){
  11.                if(line.contains("7777")){
  12.                    ReqToPort7777++;
  13.                }else if(line.contains("8888")){
  14.                    ReqToPort8888++;
  15.                }else if(line.contains("9999")){
  16.                    ReqToPort9999++;
  17.                }else{
  18.                    print(false);
  19.                }
  20.            }
  21.            print(true);
  22.        } catch (Exception e) {
  23.            e.printStackTrace();
  24.        }finally {
  25.            if(null != br){
  26.                try {
  27.                    br.close();
  28.                } catch (IOException e) {
  29.                    e.printStackTrace();
  30.                }
  31.                br = null;
  32.            }
  33.        }
  34.    }

  35.    private static void print(boolean isEnd){
  36.        if(!isEnd){
  37.            System.out.println("------------- 服务器集群产生改动 -------------");
  38.        }else{
  39.            System.out.println("------------- 终究一次核算 -------------");
  40.        }
  41.        System.out.println("截取自前次服务器改动到现在: ");
  42.        float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
  43.        System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
  44.        System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
  45.        System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
  46.        ReqToPort7777 = 0;
  47.        ReqToPort8888 = 0;
  48.        ReqToPort9999 = 0;
  49.    }
  50. }

  51. /* 以下是输出效果
  52. ------------- 服务器集群产生改动 -------------
  53. 截取自前次服务器改动到现在:
  54. 7777端口服务器访问量为: 198.0, 占比0.4419643
  55. 8888端口服务器访问量为: 184.0, 占比0.4107143
  56. 9999端口服务器访问量为: 66.0, 占比0.14732143
  57. ------------- 服务器集群产生改动 -------------
  58. 截取自前次服务器改动到现在:
  59. 7777端口服务器访问量为: 510.0, 占比0.7589286
  60. 8888端口服务器访问量为: 1.0, 占比0.0014880953
  61. 9999端口服务器访问量为: 161.0, 占比0.23958333
  62. ------------- 终究一次核算 -------------
  63. 截取自前次服务器改动到现在:
  64. 7777端口服务器访问量为: 410.0, 占比0.43248945
  65. 8888端口服务器访问量为: 398.0, 占比0.41983122
  66. 9999端口服务器访问量为: 140.0, 占比0.14767933
  67. */

V. 效果

从测验数据可以看出, 不管是8888端口服务器宕机之前, 仍是宕机之后, 三台服务器接纳的访问量和功用指数成正比,成功地验证了一致性哈希算法的负载均衡作用。

四. 扩展考虑

初识一致性哈希算法的时分, 对这种独特的思路佩服得五体投地。可是一致性哈希算法除了可以让后端服务器完结负载均衡, 还有一个特点或许是其他负载均衡算法所不具备的。

这个特点是基于哈希函数的, 我们知道通过哈希函数, 固定的输入可以产生固定的输出. 换句话说, 相同的恳求会路由到相同的服务器. 这点就很牛逼了, 我们可以结合一致性哈希算法和缓存机制供应后端服务器的功用。

比如说在一个分布式体系中, 有一个服务器集群供应查询用户信息的方法, 每个恳求将会带着用户的uid到达, 我们可以通过哈希函数进行处理(从上面的演示代码可以看到, 这点是可以轻松完结的), 使相同的uid路由到某个独定的服务器. 这样我们就可以在服务器上对该的uid反面的用户信息进行缓存, 从而削减对数据库或其他中间件的操作, 从而进步体系功率。

当然假设运用该战略的话, 你或许还要考虑缓存更新等操作, 但作为一种优良的战略, 我们可以考虑在恰当的场合灵活运用。

以上考虑受启发于Dubbo结构中对其完结的四种负载均衡战略的描述。

本公司销售:阿里云新/老客户,只要购买阿里云,即可享受折上折优惠!>

我有话说: