场景

  • 订阅发布
    • watcher机制
    • 统一配置管理(disconf)
  • 分布式锁
    • redis
    • Zookeeper
    • 数据库
  • 负载均衡
  • ID生成器
  • 分布式队列
  • 统一命名服务
  • master选举

详细介绍

数据发布订阅/配置中心

实现配置信息的集中式管理和数据的动态更新。

把配置信息全部放到Zookeeper上,保存在某个目录节点中,所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到Zookeeper的通知,然后从Zookeeper获取新的配置信息应用到系统中

  • 实现配置中心的两种模式
    • push:服务端发生变化主动发送给客户端
    • pull:客户端主动发起请求获取新的数据
      • 长轮训:客户端主动监控服务器端的变化
  • Zookeeper采用推拉相结合的方式
    • 客户端向服务器端注册需要关注的节点,一旦节点数据发生变化,那么服务器端就会向客户端发送watcher事件通知。
    • 客户端收到通知后,主动到服务器端获取更新后的数据
  • 优点
    • 数据量比较小
    • 数据内容在运行时会发生动态变更
    • 集群中的各个机器共享配置

布式锁

  • 锁服务的分类
    • 保持独占
      • 将Zookeeper上的一个zNode看作一把锁,通过createZnode方式来实现,所有客户端创建/distribute_lock节点,最终成功创建的那个客户端拥有了这把锁,用完删掉自己创建的节点就释放出锁
    • 控制时序
      • /distribute_lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,编号最小的获得锁,用完删除,依次进行
  • 实现分布式锁的方式
    • redis
      • setNX
        • 存在则会返回0
        • 不存在则返回数据
    • 数据库
      • 创建表
        • 通过索引唯一的方式(create table(id,methodname…))methodname增加唯一索引。insert成功则获得锁
        • 效率低
        • delete失败会导致后续进程无法获取锁
    • Zookeeper
      • 排他锁(写锁)
      • 共享锁(读锁)

负载均衡

通过集群的方式分摊请求/数据到多个计算机单元上

  • 集群的关注点
    • 是否有机器退出和加入
      • 所有的机器约定在父目录GroupMembers下创建临时目录节点,然后监听父目录节点的子节点变化消息。一旦有机器挂掉,该机器与Zookeeper的连接断开,其所创建的临时目录节点被删除,其它机器都收到该节点被删除的通知。新机器加入也同理。
    • 选举master
      • Zookeeper有选举选法

队列管理

  • 同步队列
    • 每一个队列的成功都聚齐时,这个队列才可用,否则一直等待所有成员到达
      • 在约定目录下创建临时目录节点,监听节点数据是否时我们要求的树木
  • FIFO队列
    • 和分布式锁服务中的控制时许场景基本原理一致,入列有编号,出列按编号

统一命名服务

分布式环境下,经常需要对应用/服务进行统一命名,便于识别不同服务。类似于域名与ip之间对应关系,域名容易记住。通过名称来获取资源或服务的地址、提供者等信息按照层次结构组织服务/应用名称可将名称以及地址信息写到Zookeeper上,客户端通过Zookeeper获取可用服务列表类。

master选举

要保证服务7*24小时可用,通过心跳包来维持正常通信状态 ,如果master挂掉,slave就需要重新选举master。可能会产生脑裂的问题。

代码示例

Java API实现共享锁

ZookeeperClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.bai.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
* Created by 2bai on 27/10/2017.
*/
public class ZookeeperClient {
private final static String CONNECTSTRING = "";

public static int getSessionTime() {
return sessionTime;
}

private static int sessionTime=5000;

/**
* 获取连接
* @return
* @throws IOException
* @throws InterruptedException
*/
public static ZooKeeper getInstance() throws IOException, InterruptedException {
final CountDownLatch countDownLatch=new CountDownLatch(1);
ZooKeeper zooKeeper=new ZooKeeper(CONNECTSTRING, sessionTime, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState()==Event.KeeperState.SyncConnected){
countDownLatch.countDown();
}
}
});
countDownLatch.await();
return zooKeeper;
}
}

LockWatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.bai.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;

/**
* Created by 2bai on 27/10/2017.
*/
public class LockWatcher implements Watcher{

private CountDownLatch latch;

public LockWatcher(CountDownLatch countDownLatch){
this.latch=countDownLatch;
}

@Override
public void process(WatchedEvent watchedEvent) {
if(watchedEvent.getType()== Event.EventType.NodeDeleted){
latch.countDown();
}
}
}

DistributeLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package com.bai.lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Created by 2bai on 27/10/2017.
*/
public class DistributeLock {
//根节点
private static final String ROOT_LOCKS = "/LOCKS";
private ZooKeeper zooKeeper;
//会话超时时间
private int sessionTimeout;
private String lockID;//记录锁节点ID

private final static byte[] data = {1, 2};//节点数据
private CountDownLatch countDownLatch = new CountDownLatch(1);

public DistributeLock() throws IOException, InterruptedException {
this.zooKeeper = ZookeeperClient.getInstance();
this.sessionTimeout = ZookeeperClient.getSessionTime();
}

/**
* 获取锁
*
* @return
*/
public boolean lock() {
try {
lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + "---" + "成功创建了---" + lockID + "开始去竞争锁");

List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);

//从小到大排序
SortedSet<String> sortedSet = new TreeSet<String>();
for (String children : childrenNodes) {
sortedSet.add(ROOT_LOCKS + "/" + children);
}

String first = sortedSet.first();//拿到最小节点
if (lockID.equals(first)) {
//表示当前节点为最小节点
System.out.println(Thread.currentThread().getName() + "成功获取锁---" + lockID);
return true;
}

SortedSet<String> lessThanLockId = sortedSet.headSet(lockID);
if (!lessThanLockId.isEmpty()) {
//拿到比当前lockID更小的上一个节点
String prevLogID = lessThanLockId.last();
zooKeeper.exists(prevLogID, new LockWatcher(countDownLatch));
countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
//上面这段代码意味着会话超时或节点被删除就可以获取锁了
System.out.println(Thread.currentThread().getName() + "成功获取锁---" + lockID);
return true;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

/**
* 释放锁
*
* @return
*/
public boolean unlock() {
System.out.println(Thread.currentThread().getName() + "---开始释放锁--- " + lockID);
try {
zooKeeper.delete(lockID, -1);
System.out.println(lockID + "---被删除");
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}

return false;
}

public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(10);
Random random = new Random();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
DistributeLock lock = null;

try {
lock = new DistributeLock();
latch.countDown();
latch.await();
lock.lock();
Thread.sleep(random.nextInt(500));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock != null) {
lock.unlock();
}
}
}).start();
}
}
}

需要在Zookeeper服务器上创建/LOCKS根节点后再运行代码

master选举

UserCenter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.bai.master;

import java.io.Serializable;

/**
* Created by 2bai on 28/10/2017.
*/
public class UserCenter implements Serializable{
private int mc_id;//机器信息
private String mc_name;//机器名称

public int getMc_id() {
return mc_id;
}

public void setMc_id(int mc_id) {
this.mc_id = mc_id;
}

public String getMc_name() {

return mc_name;
}

public void setMc_name(String mc_name) {
this.mc_name = mc_name;
}
}

MasterSelector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.bai.master;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by 2bai on 28/10/2017.
* 选主的服务
*/
public class MasterSelector {
private ZkClient zkClient;
//需要争抢的节点
private final static String MASTER_PATH = "/master";
//注册节点内容变化
private IZkDataListener dataListener;
//其它服务器
private UserCenter server;
//主服务器
private UserCenter master;
//
private static boolean isRunning = false;

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

public MasterSelector(UserCenter center,ZkClient client) {
this.server=center;
this.zkClient=client;
this.dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {

}

@Override
public void handleDataDeleted(String s) throws Exception {
//如果节点被删除,发起选主操作
System.out.println("触发节点删除事件"+s);
chooseMaster();
}
};
}

/**
* 开始选举
*/
public void start() {
System.out.println("启动操作");
if (!isRunning) {
isRunning = true;
zkClient.subscribeDataChanges(MASTER_PATH, dataListener);//注册节点监听事件
chooseMaster();

}
}

/**
* 停止选举
*/
public void stop() {
if (isRunning) {
System.out.println("停止操作");
isRunning = false;
zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener);
releaseMaster();
scheduledExecutorService.shutdown();
}
}

/**
* 选主
*/
private void chooseMaster() {
if (!isRunning) {
System.out.println("当前服务没有启动");
return;
}
try {
zkClient.createEphemeral(MASTER_PATH, server);
//创建成功则代表当前节点就是master节点
master = server;
System.out.println(master.getMc_name() + "---是master");

//定时器
//模仿master出现故障,每5秒钟释放一次锁
scheduledExecutorService.schedule(() -> {
//释放锁
releaseMaster();
}, 5, TimeUnit.SECONDS);
} catch (ZkNodeExistsException e) {
//master已经存在,获取master
UserCenter userCenter = zkClient.readData(MASTER_PATH, true);
if (userCenter == null) {
chooseMaster();
} else {
master = userCenter;
}
}
}

/**
* 释放锁
*/
private void releaseMaster() {
//释放锁(故障模拟过程)
//判断当前是不是master
//只有master才需要释放锁
if (checkIsMaster()) {
zkClient.delete(MASTER_PATH);
}
}

/**
* 判断当前的server是不是master
*
* @return
*/
private boolean checkIsMaster() {
UserCenter userCenter = zkClient.readData(MASTER_PATH);
if (userCenter.getMc_name().equals(server.getMc_name())) {
master = userCenter;
return true;
}
return false;
}
}

MasterChooseTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.bai.master;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Created by 2bai on 29/10/2017.
*/
public class MasterChooseTest {
private final static String CONNECTSTRING = "";

public static void main(String[] args) {
//保存所有的zkClient列表
List<ZkClient> clients = new ArrayList<>();

List<MasterSelector> selectorList=new ArrayList<>();

try {
for (int i = 0; i < 10; i++) {
ZkClient client = new ZkClient(CONNECTSTRING, 5000, 5000, new SerializableSerializer());
clients.add(client);

UserCenter center = new UserCenter();
center.setMc_id(i);
center.setMc_name("客户端" + i);
//不是线程,是10个对象
//每个客户端去触发一次
//没办法让每个客户端都监听,只能监听其中一个

//10个客户端去抢master,只有一个会抢到
MasterSelector selector = new MasterSelector(center,client);
selectorList.add(selector);

selector.start();

TimeUnit.SECONDS.sleep(4);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
for(MasterSelector selector :selectorList){
selector.stop();
}
}
}
}