Java实现一致性哈希算法,并搭建环境测试其负载均衡特性(二)
II. 创建服务器集群, 供应RPC远程调用服务
-
首要创建一个服务器项目(运用Maven), 增加zookeeper依赖
-
创建常量接口, 用于存储联接zookeeper的信息
- public interface Constant {
- //zookeeper集群的地址
- String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
- //联接zookeeper的超时时刻
- int ZK_TIME_OUT = 5000;
- //服务器所发布的远程服务在zookeeper中的注册地址, 也便是说这个节点中保存了各个服务器供应的接口
- String ZK_REGISTRY = "/provider";
- //zookeeper集群中注册服务的url地址的瞬时节点
- String ZK_RMI = ZK_REGISTRY + "/rmi";
- }
3.封装操作zookeeper和发布远程服务的接口供自己调用, 本事例中发布远程服务运用Java本身供应的rmi包完结, 假设没有了解过可以参阅这篇
- public class ServiceProvider {
-
- private CountDownLatch latch = new CountDownLatch(1);
-
- /**
- * 联接zookeeper集群
- */
- public ZooKeeper connectToZK(){
- ZooKeeper zk = null;
- try {
- zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- //假设联接上了就唤醒其时线程.
- latch.countDown();
- }
- });
- latch.await();//还没联接上时其时线程等候
- } catch (Exception e) {
- e.printStackTrace();
- }
- return zk;
- }
-
- /**
- * 创建znode节点
- * @param zk
- * @param url 节点中写入的数据
- */
- public void createNode(ZooKeeper zk, String url){
- try{
- //要把写入的数据转化为字节数组
- byte[] data = url.getBytes();
- zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 发布rmi服务
- */
- public String publishService(Remote remote, String host, int port){
- String url = null;
- try{
- LocateRegistry.createRegistry(port);
- url = "rmi://" + host + ":" + port + "/rmiService";
- Naming.bind(url, remote);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return url;
- }
-
- /**
- * 发布rmi服务, 而且将服务的url注册到zookeeper集群中
- */
- public void publish(Remote remote, String host, int port){
- //调用publishService, 得到服务的url地址
- String url = publishService(remote, host, port);
- if(null != url){
- ZooKeeper zk = connectToZK();//联接到zookeeper
- if(null != zk){
- createNode(zk, url);
- }
- }
- }
- }
- 自定义远程服务. 服务供应一个简略的方法: 客户端发来一个字符串, 服务器在字符串前面增加上Hello, 并回来字符串。
- //UserService
- public interface UserService extends Remote {
- public String helloRmi(String name) throws RemoteException;
- }
- //UserServiceImpl
- public class UserServiceImpl implements UserService {
-
- public UserServiceImpl() throws RemoteException{
- super();
- }
-
- @Override
- public String helloRmi(String name) throws RemoteException {
- return "Hello " + name + "!";
- }
- }
- 修改端口号, 发起多个java虚拟机, 仿照服务器集群. 为了便当演示, 自定义7777, 8888, 9999端口打开3个服务器进程, 到时会仿照7777端口的服务器宕机和批改重连。
- public static void main(String[] args) throws RemoteException {
- //创建东西类方针
- ServiceProvider sp = new ServiceProvider();
- //创建远程服务方针
- UserService userService = new UserServiceImpl();
- //完结发布
- sp.publish(userService, "localhost", 9999);
- }
III. 编写客户端程序(运用一致性哈希算法完结负载均衡
- 封装客户端接口:
- public class ServiceConsumer {
- /**
- * 供应远程服务的服务器列表, 只记录远程服务的url
- */
- private volatile Listurls = new LinkedList<>();
- /**
- * 远程服务对应的虚拟节点调集
- */
-
private static TreeMap
virtualNodes = new TreeMap<>(); -
- public ServiceConsumer(){
- ZooKeeper zk = connectToZK();//客户端联接到zookeeper
- if(null != zk){
- //联接上后重视zookeeper中的节点改动(服务器改动)
- watchNode(zk);
- }
- }
-
- private void watchNode(final ZooKeeper zk) {
- try{
- //调查/provider节点下的子节点是否有改动(是否有服务器登入或登出)
- ListnodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- //假设服务器节点有改动就从头获取
- if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
- System.out.println("服务器端有改动, 或许有旧服务器宕机或者新服务器参与集群...");
- watchNode(zk);
- }
- }
- });
- //将获取到的服务器节点数据保存到调集中, 也便是获得了远程服务的访问url地址
- ListdataList = new LinkedList<>();
-
TreeMap
newVirtualNodesList = new TreeMap<>(); - for(String nodeStr : nodeList){
- byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
- //放入服务器列表的url
- String url = new String(data);
- //为每个服务器分配虚拟节点, 为了便当仿照, 默许打开在9999端口的服务器功用较差, 只分配300个虚拟节点, 其他分配1000个.
- if(url.contains("9999")){
- for(int i = 1; i <= 300; i++){
- newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
- }
- }else{
- for(int i = 1; i <= 1000; i++){
- newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
- }
- }
- dataList.add(url);
- }
- urls = dataList;
- virtualNodes = newVirtualNodesList;
- dataList = null;//好让废物收回器赶快收集
- newVirtualNodesList = null;
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 依据url获得远程服务方针
- */
- publicT lookUpService(String url){
- T remote = null;
- try{
- remote = (T)Naming.lookup(url);
- } catch (Exception e) {
- //假设该url联接不上, 很有或许是该服务器挂了, 这时运用服务器列表中的第一个服务器url从头获取远程方针.
- if(e instanceof ConnectException){
- if (urls.size() != 0){
- url = urls.get(0);
- return lookUpService(url);
- }
- }
- }
- return remote;
- }
-
- /**
- * 通过一致性哈希算法, 选取一个url, 终究回来一个远程服务方针
- */
- publicT lookUp(){
- T service = null;
- //随机核算一个哈希值
- int hash = FVNHash(Math.random() * 10000 + "");
- //得到大于该哈希值的一切map调集
-
SortedMap
subMap = virtualNodes.tailMap(hash); - //找到比该值大的第一个虚拟节点, 假设没有比它大的虚拟节点, 依据哈希环, 则回来第一个节点.
- Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
- //通过该虚拟节点获得服务器url
- String virtualNodeName = virtualNodes.get(targetKey);
- String url = virtualNodeName.split("@")[0];
- //依据服务器url获取远程服务方针
- service = lookUpService(url);
- System.out.print("供应本次服务的地址为: " + url + ", 回来效果: ");
- return service;
- }
-
- private CountDownLatch latch = new CountDownLatch(1);
-
- public ZooKeeper connectToZK(){
- ZooKeeper zk = null;
- try {
- zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- //判断是否联接zk集群
- latch.countDown();//唤醒处于等候情况的其时线程
- }
- });
- latch.await();//没有联接上的时分其时线程处于等候情况.
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return zk;
- }
-
-
- public static int FVNHash(String data){
- final int p = 16777619;
- int hash = (int)2166136261L;
- for(int i = 0; i < data.length(); i++)
- hash = (hash ^ data.charAt(i)) * p;
- hash += hash << 13;
- hash ^= hash >> 7;
- hash += hash << 3;
- hash ^= hash >> 17;
- hash += hash << 5;
- return hash < 0 ? Math.abs(hash) : hash;
- }
- }
- 发起客户端进行测验:
- public static void main(String[] args){
- ServiceConsumer sc = new ServiceConsumer();//创建东西类方针
- while(true){
- //获得rmi远程服务方针
- UserService userService = sc.lookUp();
- try{
- //调用远程方法
- String result = userService.helloRmi("炭烧生蚝");
- System.out.println(result);
- Thread.sleep(100);
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- }
- 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行核算。
IV. 对服务器调用数据进行核算剖析
重温一遍仿照的进程: 首要分别在7777, 8888, 9999端口发起了3台服务器. 然后发起客户端进行访问. 7777, 8888端口的两台服务器设置功用指数为1000, 而9999端口的服务器功用指数设置为300。
在客户端运转期间, 我手动关闭了8888端口的服务器, 客户规矩常打印出服务器改动信息。此刻理论上不会有访问被路由到8888端口的服务器。当我从头发起8888端口服务器时, 客户端打印出服务器改动信息, 访问能正常到达8888端口服务器。
下面临各服务器的访问量进行核算, 看是否完结了负载均衡。
测验程序如下:
- public class DataStatistics {
- private static float ReqToPort7777 = 0;
- private static float ReqToPort8888 = 0;
- private static float ReqToPort9999 = 0;
-
- public static void main(String[] args) {
- BufferedReader br = null;
- try {
- br = new BufferedReader(new FileReader("C://test.txt"));
- String line = null;
- while(null != (line = br.readLine())){
- if(line.contains("7777")){
- ReqToPort7777++;
- }else if(line.contains("8888")){
- ReqToPort8888++;
- }else if(line.contains("9999")){
- ReqToPort9999++;
- }else{
- print(false);
- }
- }
- print(true);
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- if(null != br){
- try {
- br.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- br = null;
- }
- }
- }
-
- private static void print(boolean isEnd){
- if(!isEnd){
- System.out.println("------------- 服务器集群产生改动 -------------");
- }else{
- System.out.println("------------- 终究一次核算 -------------");
- }
- System.out.println("截取自前次服务器改动到现在: ");
- float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
- System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
- System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
- System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
- ReqToPort7777 = 0;
- ReqToPort8888 = 0;
- ReqToPort9999 = 0;
- }
- }
-
- /* 以下是输出效果
- ------------- 服务器集群产生改动 -------------
- 截取自前次服务器改动到现在:
- 7777端口服务器访问量为: 198.0, 占比0.4419643
- 8888端口服务器访问量为: 184.0, 占比0.4107143
- 9999端口服务器访问量为: 66.0, 占比0.14732143
- ------------- 服务器集群产生改动 -------------
- 截取自前次服务器改动到现在:
- 7777端口服务器访问量为: 510.0, 占比0.7589286
- 8888端口服务器访问量为: 1.0, 占比0.0014880953
- 9999端口服务器访问量为: 161.0, 占比0.23958333
- ------------- 终究一次核算 -------------
- 截取自前次服务器改动到现在:
- 7777端口服务器访问量为: 410.0, 占比0.43248945
- 8888端口服务器访问量为: 398.0, 占比0.41983122
- 9999端口服务器访问量为: 140.0, 占比0.14767933
- */
V. 效果
从测验数据可以看出, 不管是8888端口服务器宕机之前, 仍是宕机之后, 三台服务器接纳的访问量和功用指数成正比,成功地验证了一致性哈希算法的负载均衡作用。
四. 扩展考虑
初识一致性哈希算法的时分, 对这种独特的思路佩服得五体投地。可是一致性哈希算法除了可以让后端服务器完结负载均衡, 还有一个特点或许是其他负载均衡算法所不具备的。
这个特点是基于哈希函数的, 我们知道通过哈希函数, 固定的输入可以产生固定的输出. 换句话说, 相同的恳求会路由到相同的服务器. 这点就很牛逼了, 我们可以结合一致性哈希算法和缓存机制供应后端服务器的功用。
比如说在一个分布式体系中, 有一个服务器集群供应查询用户信息的方法, 每个恳求将会带着用户的uid到达, 我们可以通过哈希函数进行处理(从上面的演示代码可以看到, 这点是可以轻松完结的), 使相同的uid路由到某个独定的服务器. 这样我们就可以在服务器上对该的uid反面的用户信息进行缓存, 从而削减对数据库或其他中间件的操作, 从而进步体系功率。
当然假设运用该战略的话, 你或许还要考虑缓存更新等操作, 但作为一种优良的战略, 我们可以考虑在恰当的场合灵活运用。
以上考虑受启发于Dubbo结构中对其完结的四种负载均衡战略的描述。
我有话说: