场景
订阅发布
watcher机制
统一配置管理(disconf)
分布式锁
负载均衡
ID生成器
分布式队列
统一命名服务
master选举
详细介绍 数据发布订阅/配置中心
实现配置信息的集中式管理和数据的动态更新。
把配置信息全部放到Zookeeper上,保存在某个目录节点中,所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到Zookeeper的通知,然后从Zookeeper获取新的配置信息应用到系统中
实现配置中心的两种模式
push
:服务端发生变化主动发送给客户端
pull
:客户端主动发起请求获取新的数据
Zookeeper采用推拉相结合的方式
客户端向服务器端注册需要关注的节点,一旦节点数据发生变化,那么服务器端就会向客户端发送watcher事件通知。
客户端收到通知后,主动到服务器端获取更新后的数据
优点
数据量比较小
数据内容在运行时会发生动态变更
集群中的各个机器共享配置
布式锁
锁服务的分类
保持独占
将Zookeeper上的一个zNode看作一把锁,通过createZnode方式来实现,所有客户端创建/distribute_lock节点,最终成功创建的那个客户端拥有了这把锁,用完删掉自己创建的节点就释放出锁
控制时序
/distribute_lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,编号最小的获得锁,用完删除,依次进行
实现分布式锁的方式
redis
数据库
创建表
通过索引唯一的方式(create table(id,methodname…))methodname增加唯一索引。insert成功则获得锁
效率低
delete失败会导致后续进程无法获取锁
Zookeeper
负载均衡
通过集群的方式分摊请求/数据到多个计算机单元上
集群的关注点
是否有机器退出和加入
所有的机器约定在父目录GroupMembers下创建临时目录节点,然后监听父目录节点的子节点变化消息。一旦有机器挂掉,该机器与Zookeeper的连接断开,其所创建的临时目录节点被删除,其它机器都收到该节点被删除的通知。新机器加入也同理。
选举master
队列管理
同步队列
每一个队列的成功都聚齐时,这个队列才可用,否则一直等待所有成员到达
在约定目录下创建临时目录节点,监听节点数据是否时我们要求的树木
FIFO队列
和分布式锁服务中的控制时许场景基本原理一致,入列有编号,出列按编号
统一命名服务
分布式环境下,经常需要对应用/服务进行统一命名,便于识别不同服务。类似于域名与ip之间对应关系,域名容易记住。通过名称来获取资源或服务的地址、提供者等信息按照层次结构组织服务/应用名称可将名称以及地址信息写到Zookeeper上,客户端通过Zookeeper获取可用服务列表类。
master选举
要保证服务7*24小时可用,通过心跳包来维持正常通信状态 ,如果master挂掉,slave就需要重新选举master。可能会产生脑裂的问题。
代码示例 Java API实现共享锁 ZookeeperClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.bai.lock;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.concurrent.CountDownLatch;public class ZookeeperClient { private final static String CONNECTSTRING = "" ; public static int getSessionTime () { return sessionTime; } private static int sessionTime=5000 ; public static ZooKeeper getInstance () throws IOException, InterruptedException { final CountDownLatch countDownLatch=new CountDownLatch(1 ); ZooKeeper zooKeeper=new ZooKeeper(CONNECTSTRING, sessionTime, new Watcher() { @Override public void process (WatchedEvent watchedEvent) { if (watchedEvent.getState()==Event.KeeperState.SyncConnected){ countDownLatch.countDown(); } } }); countDownLatch.await(); return zooKeeper; } }
LockWatcher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package com.bai.lock;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import java.util.concurrent.CountDownLatch;public class LockWatcher implements Watcher { private CountDownLatch latch; public LockWatcher (CountDownLatch countDownLatch) { this .latch=countDownLatch; } @Override public void process (WatchedEvent watchedEvent) { if (watchedEvent.getType()== Event.EventType.NodeDeleted){ latch.countDown(); } } }
DistributeLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 package com.bai.lock;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.List;import java.util.Random;import java.util.SortedSet;import java.util.TreeSet;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class DistributeLock { private static final String ROOT_LOCKS = "/LOCKS" ; private ZooKeeper zooKeeper; private int sessionTimeout; private String lockID; private final static byte [] data = {1 , 2 }; private CountDownLatch countDownLatch = new CountDownLatch(1 ); public DistributeLock () throws IOException, InterruptedException { this .zooKeeper = ZookeeperClient.getInstance(); this .sessionTimeout = ZookeeperClient.getSessionTime(); } public boolean lock () { try { lockID = zooKeeper.create(ROOT_LOCKS + "/" , data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + "---" + "成功创建了---" + lockID + "开始去竞争锁" ); List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true ); SortedSet<String> sortedSet = new TreeSet<String>(); for (String children : childrenNodes) { sortedSet.add(ROOT_LOCKS + "/" + children); } String first = sortedSet.first(); if (lockID.equals(first)) { System.out.println(Thread.currentThread().getName() + "成功获取锁---" + lockID); return true ; } SortedSet<String> lessThanLockId = sortedSet.headSet(lockID); if (!lessThanLockId.isEmpty()) { String prevLogID = lessThanLockId.last(); zooKeeper.exists(prevLogID, new LockWatcher(countDownLatch)); countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS); System.out.println(Thread.currentThread().getName() + "成功获取锁---" + lockID); return true ; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false ; } public boolean unlock () { System.out.println(Thread.currentThread().getName() + "---开始释放锁--- " + lockID); try { zooKeeper.delete(lockID, -1 ); System.out.println(lockID + "---被删除" ); return true ; } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } return false ; } public static void main (String[] args) { final CountDownLatch latch = new CountDownLatch(10 ); Random random = new Random(); for (int i = 0 ; i < 10 ; i++) { new Thread(() -> { DistributeLock lock = null ; try { lock = new DistributeLock(); latch.countDown(); latch.await(); lock.lock(); Thread.sleep(random.nextInt(500 )); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock != null ) { lock.unlock(); } } }).start(); } } }
需要在Zookeeper服务器上创建/LOCKS
根节点后再运行代码
master选举 UserCenter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.bai.master;import java.io.Serializable;public class UserCenter implements Serializable { private int mc_id; private String mc_name; public int getMc_id () { return mc_id; } public void setMc_id (int mc_id) { this .mc_id = mc_id; } public String getMc_name () { return mc_name; } public void setMc_name (String mc_name) { this .mc_name = mc_name; } }
MasterSelector
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 package com.bai.master;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.exception.ZkNodeExistsException;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class MasterSelector { private ZkClient zkClient; private final static String MASTER_PATH = "/master" ; private IZkDataListener dataListener; private UserCenter server; private UserCenter master; private static boolean isRunning = false ; ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1 ); public MasterSelector (UserCenter center,ZkClient client) { this .server=center; this .zkClient=client; this .dataListener = new IZkDataListener() { @Override public void handleDataChange (String s, Object o) throws Exception { } @Override public void handleDataDeleted (String s) throws Exception { System.out.println("触发节点删除事件" +s); chooseMaster(); } }; } public void start () { System.out.println("启动操作" ); if (!isRunning) { isRunning = true ; zkClient.subscribeDataChanges(MASTER_PATH, dataListener); chooseMaster(); } } public void stop () { if (isRunning) { System.out.println("停止操作" ); isRunning = false ; zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener); releaseMaster(); scheduledExecutorService.shutdown(); } } private void chooseMaster () { if (!isRunning) { System.out.println("当前服务没有启动" ); return ; } try { zkClient.createEphemeral(MASTER_PATH, server); master = server; System.out.println(master.getMc_name() + "---是master" ); scheduledExecutorService.schedule(() -> { releaseMaster(); }, 5 , TimeUnit.SECONDS); } catch (ZkNodeExistsException e) { UserCenter userCenter = zkClient.readData(MASTER_PATH, true ); if (userCenter == null ) { chooseMaster(); } else { master = userCenter; } } } private void releaseMaster () { if (checkIsMaster()) { zkClient.delete(MASTER_PATH); } } private boolean checkIsMaster () { UserCenter userCenter = zkClient.readData(MASTER_PATH); if (userCenter.getMc_name().equals(server.getMc_name())) { master = userCenter; return true ; } return false ; } }
MasterChooseTest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package com.bai.master;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.serialize.SerializableSerializer;import java.util.ArrayList;import java.util.List;import java.util.concurrent.TimeUnit;public class MasterChooseTest { private final static String CONNECTSTRING = "" ; public static void main (String[] args) { List<ZkClient> clients = new ArrayList<>(); List<MasterSelector> selectorList=new ArrayList<>(); try { for (int i = 0 ; i < 10 ; i++) { ZkClient client = new ZkClient(CONNECTSTRING, 5000 , 5000 , new SerializableSerializer()); clients.add(client); UserCenter center = new UserCenter(); center.setMc_id(i); center.setMc_name("客户端" + i); MasterSelector selector = new MasterSelector(center,client); selectorList.add(selector); selector.start(); TimeUnit.SECONDS.sleep(4 ); } } catch (InterruptedException e) { e.printStackTrace(); }finally { for (MasterSelector selector :selectorList){ selector.stop(); } } } }