Advertisement

Zookeeper相关知识点梳理

阅读量:

目录

  • 一、Zookeeper入门

    • 1.ZK的数据模型
    • 2.ZK的watch机制
    • 3.ZK的特性
  • 二、Zookeeper应用场景

    • 1.ZK实现配置中心
    • 2.ZK实现命令服务
    • 3.ZK实现Master选举
    • 4.ZK实现分布式队列
    • 5.ZK实现分布式锁
  • 三、Zookeeper集群

    • 1.ZK集群基本概念
    • 2.ZK集群ZAB协议
    • 3.ZK集群Leader选举
  • 四、分布式一致性协议

    • 1.Paxos算法写场景
    • 2.Paxos算法读场景

一、Zookeeper入门

1.ZK的数据模型

Zookeeper(简称ZK)是一种用于分布式应用程序的高性能协调服务,提供一种集中式信息存储服务。ZK的数据存储在内存中,它的存储结构类似文件系统的树形结构;ZK具有高吞吐量、低延迟和高可靠的特点。
ZK的数据存储结构类似文件系统,以“/”为根节点;但是不同之处在于每个ZK节点可包含与之关联的数据以及子节点(即是文件也是文件夹);每个节点路径总是表示为规范的、绝对的、斜杠分隔的路径。ZK具有4中节点类型:
(1)持久节点: 会话结束后也会一直存在;
(2)临时节点: 会话结束后就会消失;
(3)顺序节点: 节点名称顺序递增,会话结束后也会一直存在;
(4)临时顺序节点: 节点名称顺序递增,会话结束后就会消失。
【注】 对于顺序节点和临时顺序节点来说;节点序号为10位十进制数字,每个父节点有一个计数器,计数器为带符号int(4字节)数,超过2147483647后溢出导致变成负号。
ZK中的节点成为ZNode,ZNode的数据构成包含2个:
(1)节点数据: 存储的协调数据,如状态信息、配置和位置信息等,数据量上限为1M(可设置);
(2)节点元数据: 包含此节点的信息,是stat结构。
可用get命令获取Znode数据信息,包含以下信息:
(1)my_data: 此ZNode存储的数据;
(2)czxid: 创建该节点的事件ID;
(3)ctime: 创建该节点的时间;
(4)mzxid: 最后修改该节点的事件ID;
(5)mtime: 最后修改该节点的时间;
(6)pzxid: ZNode最后更新的子节点zxid;
(7)cversion: 该节点子节点的变更次数;
(8)dataversion: 该节点被修改的次数;
(9)aclversion: 访问控制列表被变更的次数;
(10)ephemralowner: 临时节点的所有者的会话ID,如果不是临时节点则为0;
(11)datalength: 该节点的数据长度;
(12)numchildren: 该节点子节点个数。
对于ZK中的时间概念,有以下说明:
(1)Zxid: ZK中每次更改操作都对应一个唯一的事物ID,称为Zxid;它是一个全局有序的戳记,如Zxid1小于Zxid2,则Zxid1一定在Zxid2之前发生;
(2)Version Numbers: 版本号,对应节点的每次更改都会导致该节点的版本号加一;
(3)Ticks: 当使用多服务器ZK时,服务器使用“滴答”来定义事件的时间,如状态上传、会话超时、对等节点之间的连接超时等;滴答时间仅通过最小会话超时(滴答时间的2倍)间接公开,如果客户端请求的会话小于最小会话超时,服务器将告诉客户端会话超时实际上是最小会话超时;
(4)RealTime: 真实时间,ZK除了在ZNode创建和修改时将时间戳放入stat结构外,根本用不到RealTime。

2.ZK的watch机制

ZK提供了watch机制,在一个ZNode上设置一个watch,即可监听此ZNode的变化。ZK中包含2类watch:
(1)data watch: 监听此节点的数据变化;
(2)child watch: 监听此节点的子节点变化。
操作节点会触发的watch事件如下:
(1)创建节点事件: 触发exists;
(2)删除节点事件: 触发exists、getdata、getchildren;
(3)更新节点事件: 触发exists、getdata;
(4)child事件: 触发getchildren。
watch机制有两个重要特性:
(1)一次性触发: watch触发后便会被删除,要持续监听变化,则需要再次设置watch;
(2)有序性: 客户端总是先得到watch通知,然后才会看到变化的结果。
由于以上的两个特性,使用watch时需要注意:
(1)因为watcher为一次性触发,且在获取事件和发送获取watch的新请求间存在延迟,所以不能可靠的获取节点发生的每个改变
(2)一个watch对象只会被特定的通知触发一次,若一个watch对象同时注册了exists、getData,当节点被删除时,删除事件对exists、getData都有效,但是只会调用watch一次。
以下是Java代码操作ZK的示例:

复制代码
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    
    public class ZkClientWatchDemo {
    	public static void main(String[] args) {
    		// 创建一个zk客户端
    		ZkClient client = new ZkClient("localhost:2181");
    		//为此客户端设置watch(ZkClient做了封装,实现了一直监听ZNode的效果)
    		client.subscribeDataChanges("/test/a", new IZkDataListener() {
    			//设置节点被删除后进行的业务
    			@Override
    			public void handleDataDeleted(String dataPath) throws Exception {
    				System.out.println("----收到节点被删除了-------------");
    			}
    			//设置节点数据变化后进行的业务
    			@Override
    			public void handleDataChange(String dataPath, Object data) throws Exception {
    				System.out.println("----收到节点数据变化:" + data + "-------------");
    			}
    		});
    		try {
    			Thread.sleep(1000 * 60 * 2);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }

3.ZK的特性

ZK具有一些重要的特性:
(1)顺序一直性: 保证客户端操作是按顺序 生效的(利用此特性可以做分布式锁等);
(2)原子性: 更新成功或失败,没有部分结果(见第三章的ZAB协议,说明了是如何实现的);
(3)单个系统映像: 无论连那个服务器,客户端都将看到相同的结果(强一致性);
(4)可靠性: 数据的变更不会丢失,除非被客户端覆盖修改(有日志记录来保证);
(5)及时性: 保证客户端读取到的数据是最新的。

二、Zookeeper应用场景

1.ZK实现配置中心

利用ZK实现配置中心可以动态修改配置并且可以实现热部署;利用了ZK节点的树形结构和watch机制。具体实现可以看我之前的博客:基于Zookeeper实现分布式配置中心

2.ZK实现命令服务

假设服务A中需要调用服务B的功能,如果服务A的代码开发完成,但是服务B的代码还没有开发完成;那么是否能让服务A先部署呢?利用ZK就可以实现此功能;只需要把服务B的服务名先注册到ZK中,而服务A只要在调用到服务B的地方指定调用服务B的服务名的服务(利用ZK的watch机制);这样先把服务A部署好,并且一直监听服务B是否已经部署完成,一旦当服务B部署完成服务A就会监听到,然后服务A再去调用服务B的功能即可。示例图如下:
在这里插入图片描述

3.ZK实现Master选举

在一些分布式集群中,需要选举Master节点时也可以利用ZK实现(比如Kafka就是);示例图如下:
在这里插入图片描述
如上图所示,需要说明的是:
(1)所以节点都去争抢成为Master节点;对于ZK来说,就是多个客户端去创建同一个临时节点,创建成功的客户端成为Master节点;比如A成为了Master节点,那么其他节点再去创建此ZNode时就会失败,并且这些节点可以监听 此节点的变化,同时其他节点也会知道是谁成为了Master节点;当A节点挂了,其他节点将会继续争抢成为Master(类似实现分布式锁);
(2)除此之外,我们还可以定义一个server子节点,通过server子节点可以获知整个集群当前有哪些节点,那么Master节点就可以管理整个集群中的节点了。

4.ZK实现分布式队列

分布式队列其实本质上就是一个队列,只不过是在分布式的环境中,让各个服务共享。示例图如下:
在这里插入图片描述
如上图所示,使用分布式队列的步骤为:
(1)入队: 即创建顺序节点,节点里放生产者的数据,这个顺序即入队的顺序;
(2)出队: 因为队列是先进先出,所以先获取所有节点号,取出最小好的节点的数据,并移除最小号节点。
需要注意的是,以上创建的是无界队列,若要实现有界队列,则生产者必须得知已有顺序节点的个数,超过了则不能创建顺序节点;同时需要用到分布式锁 ,只有抢到锁的生产者才能创建顺序节点,创建完再释放锁;否则会超出队列的界限。

5.ZK实现分布式锁

(一)方式一:
示例图如下:
在这里插入图片描述
流程图如下:
在这里插入图片描述
首先所有实例争抢创建同一名字(如Lock)的临时节点,因为ZNode不能重名 ,所以只会有一个实例创建成功,即“抢锁”成功,其他示例抢锁失败,同时其他示例监听 此临时节点,并且阻塞自己的代码;抢到锁的实例执行完自己的代码后,释放掉锁,即删除临时节点;或者抢到锁的实例突然挂了,也会因为临时节点的属性而自动删除此节点;如此一来,其他实例在监听到临时节点被删除后便会继续抢锁。(原理即:ZNode不能重名+watch机制)
【注】此种方式有一个缺点会引发“惊群”效应,即抢锁时会“惊动”集群中的所有实例抢锁,从而造成巨大的网络消耗;所有此种方式只适合并发量小的场景。
代码示例如下:

复制代码
    public class ZKDistributeLock implements Lock {
    	private String lockPath;
    	private ZkClient client;
    	// 锁重入计数
    	private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
    	public ZKDistributeLock(String lockPath) {
    		super();
    		this.lockPath = lockPath;
    
    		client = new ZkClient("localhost:2181");
    		client.setZkSerializer(new MyZkSerializer());
    	}
    	@Override
    	public boolean tryLock() { // 不会阻塞
    		if (this.reentrantCount.get() != null) {
    			int count = this.reentrantCount.get();
    			if (count > 0) {
    				this.reentrantCount.set(++count);
    				return true;
    			}
    		}
    		// 创建节点
    		try {
    			client.createEphemeral(lockPath);
    			this.reentrantCount.set(1);
    		} catch (ZkNodeExistsException e) {
    			return false;
    		}
    		return true;
    	}
    	@Override
    	public void unlock() {
    		// 重入的释放锁处理
    		if (this.reentrantCount.get() != null) {
    			int count = this.reentrantCount.get();
    			if (count > 1) {
    				this.reentrantCount.set(--count);
    				return;
    			} else {
    				this.reentrantCount.set(null);
    			}
    		}
    		client.delete(lockPath);
    	}
    	@Override
    	public void lock() { // 如果获取不到锁,阻塞等待
    		if (!tryLock()) {
    			// 没获得锁,阻塞自己
    			waitForLock();
    			// 再次尝试
    			lock();
    		}
    	}
    
    	private void waitForLock() {
    		CountDownLatch cdl = new CountDownLatch(1);
    		IZkDataListener listener = new IZkDataListener() {
    			@Override
    			public void handleDataDeleted(String dataPath) throws Exception {
    				System.out.println("----收到节点被删除了-------------");
    				cdl.countDown();
    			}
    
    			@Override
    			public void handleDataChange(String dataPath, Object data) throws Exception {
    			}
    		};
    		client.subscribeDataChanges(lockPath, listener);
    		// 阻塞自己
    		if (this.client.exists(lockPath)) {
    			try {
    				cdl.await();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		// 取消注册
    		client.unsubscribeDataChanges(lockPath, listener);
    	}
    	@Override
    	public void lockInterruptibly() throws InterruptedException {
    		// TODO Auto-generated method stub
    
    	}
    	@Override
    	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    		// TODO Auto-generated method stub
    		return false;
    	}
    	@Override
    	public Condition newCondition() {
    		// TODO Auto-generated method stub
    		return null;
    	}
    
    	public static void main(String[] args) {
    		// 并发数
    		int currency = 50;
    		// 循环屏障
    		CyclicBarrier cb = new CyclicBarrier(currency);
    		// 多线程模拟高并发
    		for (int i = 0; i < currency; i++) {
    			new Thread(new Runnable() {
    				public void run() {
    					System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
    					// 等待一起出发
    					try {
    						cb.await();
    					} catch (InterruptedException | BrokenBarrierException e) {
    						e.printStackTrace();
    					}
    					ZKDistributeLock lock = new ZKDistributeLock("/distLock11");
    					try {
    						lock.lock();
    						System.out.println(Thread.currentThread().getName() + " 获得锁!");
    					} finally {
    						lock.unlock();
    					}
    				}
    			}).start();
    		}
    	}
    }

(二)方式二:
示例图如下:
在这里插入图片描述
流程图如下:
在这里插入图片描述
此种方式类似于我们去银行办理业务时的“排队取号”;首先所有实例尝试创建临时顺序节点 ,每个节点都会按顺序成功创建临时顺序节点,然后每个实例都监听自己节点的前一个节点 ;同时当前的实例判断自己是否是当前最小号的节点 ,如果是则“抢锁”成功并执行代码,执行完成或者实例突然挂了以后删除临时顺序节点,否则抢锁不成功并阻塞代码;在实例监听到自己前一个节点被删除后则抢锁成功,以此类推。(原理即:排队取号+最小号+watch机制)
【注】此种方式避免了惊群效应;但是需要注意的是实例不要重复“取号”。
代码示例如下:

复制代码
    public class ZKDistributeImproveLock implements Lock {
    	/* * 利用临时顺序节点来实现分布式锁
    	 * 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
    	 * 释放锁:删除自己创建的临时顺序节点
    	 */
    	private String lockPath;
    	private ZkClient client;
    	private ThreadLocal<String> currentPath = new ThreadLocal<>();
    	private ThreadLocal<String> beforePath = new ThreadLocal<>();
    	// 锁重入计数
    	private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
    	public ZKDistributeImproveLock(String lockPath) {
    		super();
    		this.lockPath = lockPath;
    		client = new ZkClient("localhost:2181");
    		client.setZkSerializer(new MyZkSerializer());
    		if (!this.client.exists(lockPath)) {
    			try {
    				this.client.createPersistent(lockPath);
    			} catch (ZkNodeExistsException e) {
    			}
    		}
    	}
    	@Override
    	public boolean tryLock() {
    		if (this.reentrantCount.get() != null) {
    			int count = this.reentrantCount.get();
    			if (count > 0) {
    				this.reentrantCount.set(++count);
    				return true;
    			}
    		}
    		if (this.currentPath.get() == null) {
    			currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "aaa"));
    		}
    		// 获得所有的子
    		List<String> children = this.client.getChildren(lockPath);
    		// 排序list
    		Collections.sort(children);
    		// 判断当前节点是否是最小的
    		if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
    			this.reentrantCount.set(1);
    			return true;
    		} else {
    			// 取到前一个
    			// 得到字节的索引号
    			int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
    			beforePath.set(lockPath + "/" + children.get(curIndex - 1));
    		}
    		return false;
    	}
    	@Override
    	public void lock() {
    		if (!tryLock()) {
    			// 阻塞等待
    			waitForLock();
    			// 再次尝试加锁
    			lock();
    		}
    	}
    	private void waitForLock() {
    		CountDownLatch cdl = new CountDownLatch(1);
    		// 注册watcher
    		IZkDataListener listener = new IZkDataListener() {
    			@Override
    			public void handleDataDeleted(String dataPath) throws Exception {
    				System.out.println("-----监听到节点被删除");
    				cdl.countDown();
    			}
    			@Override
    			public void handleDataChange(String dataPath, Object data) throws Exception {
    
    			}
    		};
    		client.subscribeDataChanges(this.beforePath.get(), listener);
    		// 怎么让自己阻塞
    		if (this.client.exists(this.beforePath.get())) {
    			try {
    				cdl.await();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		// 醒来后,取消watcher
    		client.unsubscribeDataChanges(this.beforePath.get(), listener);
    	}
    	@Override
    	public void unlock() {
    		// 重入的释放锁处理
    		if (this.reentrantCount.get() != null) {
    			int count = this.reentrantCount.get();
    			if (count > 1) {
    				this.reentrantCount.set(--count);
    				return;
    			} else {
    				this.reentrantCount.set(null);
    			}
    		}
    		// 删除节点
    		this.client.delete(this.currentPath.get());
    	}
    	@Override
    	public void lockInterruptibly() throws InterruptedException {
    		// TODO Auto-generated method stub
    	}
    	@Override
    	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    		// TODO Auto-generated method stub
    		return false;
    	}
    	@Override
    	public Condition newCondition() {
    		// TODO Auto-generated method stub
    		return null;
    	}
    	public static void main(String[] args) {
    		// 并发数
    		int currency = 50;
    		// 循环屏障
    		CyclicBarrier cb = new CyclicBarrier(currency);
    		// 多线程模拟高并发
    		for (int i = 0; i < currency; i++) {
    			new Thread(new Runnable() {
    				public void run() {
    					System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
    					// 等待一起出发
    					try {
    						cb.await();
    					} catch (InterruptedException | BrokenBarrierException e) {
    						e.printStackTrace();
    					}
    					ZKDistributeImproveLock lock = new ZKDistributeImproveLock("/distLock");
    					try {
    						lock.lock();
    						System.out.println(Thread.currentThread().getName() + " 获得锁!");
    					} finally {
    						lock.unlock();
    					}
    				}
    			}).start();
    		}
    	}
    }

三、Zookeeper集群

1.ZK集群基本概念

ZK集群的示意图如下:
在这里插入图片描述
ZK集群有以下几点需要说明:
(1)ZK集群提供了可靠的ZK服务;
(2)ZK集群只需要大多数 服务器准备好即可提供服务(过半数即可);
(3)强烈建议ZK集群部署奇数个服务器;
(4)建议每个ZK服务部署在单独的服务器上。
搭建ZK集群配置如下:
在这里插入图片描述
参数解释:
(1)initLimit: 集群中从服务器(F)与主服务器(L)间完成初始化同步连接时能容忍的最大心跳数(tickTime数量);若ZK集群确实很大,可适当调大初始化时间;
(2)syncLimit: 从节点与主节点间请求与应答之间能容忍的最大心跳数;
(3)集群节点: server.id=host:port:port;其中,id为通过各自的dataDir目录下创建一个名为myid的文件来为每台机器赋予一个服务器id;第一个port为从节点连到主节点的端口,第二个port为选举领导者的端口;
(4)myid文件: 即一行只包含机器id的文本,id在集群中必须唯一,其值在1到255之间。
【注】集群的所有节点都可以提供服务,客户端连接时,连接串可以指定多个或全部集群节点的地址;当一个节点不通时,客户端将自动切换到另一个节点。

2.ZK集群ZAB协议

对于ZK集群来说,每一个节点上的数据都是全量的数据,也就是说每个节点上的数据都是相同的。那么ZK集群是如何保证这些节点上的数据相同的呢?这个也就是分布式集群中的一致性问题。为了保证整个ZK集群的一致性,写入时只能由Leader节点写入(当然读是所有节点都可以的),Leader节点写完后要进行原子广播给所有节点;当超过半数的Follower节点反馈支持更新时,更新生效。
ZAB协议就是专为ZK设计的数据一致性协议。ZAB协议的特点是有序性,它的过程如下:
(1)所有的写请求都转发给Leader节点;
(2)Leader节点分配全局单调递增的事物id(即zxid),并且广播事物提议;
(3)Follower节点在收到提议后,处理提议,并且返回同意的反馈;
(4)Leader节点在收到超过半数Follower节点的同意反馈后,便会广播提交事物的消息,让其他Follower节点更新数据;
(5)最后Leader节点再响应给最初发送写请求的Follower节点,告知写请求已经完成;同时因为zxid是顺序递增的,所以最后集群中的数据总会保持一致。
示意图如下:
在这里插入图片描述
如果Leader节点出现崩溃,或者因为网络原因导致Leader节点失去与过半Follower节点的联系,ZK集群就会进入崩溃恢复模式 ;在此期间ZK服务不能用,并且会选举新的Leader节点。崩溃恢复有以下原则:
(1)ZAB协议规定,如果一个事物Proposal在一台机器上被处理成功,那么所有的集群都会被处理成功,即便机器出现崩溃;
(2)ZAB协议确保那些已经在Leader节点上提交的输完最终被所有服务器都提交,即便已经在几个Follower上更新完成而在另外几个Follower上还没来得及更新,Leader就崩溃了;最终也会更新完成;
(3)ZAB协议确保丢弃那些只在Leader上被提出的事务;即Leader还没来得及提交提案就崩溃了,这些请求会被丢弃。
根据以上原则,说明在Leader崩溃时,Leader已经提交的提案会被更新成功;Leader还没提交的提案不会被更新成功。
那么如何保证以上的原则做到呢?需以下做法:
(1)让Leader选举算法能保证新选出的Leader拥有集群中所有机器的最高zxid的事务提案;那么就可以保证此选举出来的Leader一定有所有已提交的提案;
(2)让具有最高编号事务提案的机器成为Leader,则可以省去Leader检查提案的提交和丢弃工作这一步骤;选出Leader后再去同步给其他Follower事务提案,如果此时之前的Leader又恢复了发现已经有了新Leader,那么它就变为了Follower,并且抛弃原来还没来得及提交的事务提案。
在崩溃恢复模式结束后,会选举出新的Leader,接着就要进行Leader与Follower的数据同步,当半数的Follower完成数据同步,ZK集群便又可以对外提供ZK服务了同步过程如下:
(1)Leader会为每个Follow准备一个队列,此队列用来存放那些还未来得及跟当前Leader同步数据的Follower;并将那些没被各Follower服务器(也就是这个Leader的队列中的Follower)同步的事务以Proposal消息的形式逐个发送给各Follower,并在每个Proposal消息后紧跟一个Commit消息,表示该事务以及被提交;
(2)Follower将所有其尚未同步的事务Proposal都从Leader服务器上同步过来并且成功应用到本地数据库中后;Leader则会将该Follower加入到真正可用的Follower列表中,并且开始之后的其他流程,那么统计真正可用的Follower超过半数后,ZK集群便可对外提供服务。
在ZK集群中一个重要的特性就是事务ID(zxid)是全局有序的 ,那么zxid是如何生成的呢?zxid是一个64位的数字,以下是zxid的生成方式:
(1)低32位是一个简单的单调递增的计数器,针对客户端的每一个事务请求,Leader在产生一个新的事务Proposal时,都对此计数器加1;
(2)高32位是Leader周期纪元的编号,每选举产生一个新Leader,就从Leader上取出其本地日志中最大编号Proposal的zxid,并且从改zxid中解析出对应的周期纪元值,再对其加1;以此作为新的周期纪元值,同时将低32位置为0,来开始生成新的zxid。
根据以上生成zxid的策略,如果有一个上个leader周期中尚未提交的Proposal的服务器加入集群后,发现和这个新Leader周期的Proposal不一致,那么会让此Follower同步为现在的Proposal事务。

3.ZK集群Leader选举

ZK集群对Leader选举算法有以下的原则:
(1)选出的Leader上需要有最高的zxid;
(2)过半数节点同意。
内置的Leader选举算法如下:
(1)LeaderElection;
(2)FastLeaderElection(默认);
(3)AuthFastLeaderElection。
Leader选举中的概念如下:
(1)服务器ID:myid;
(2)事务ID:为服务器中存放的最大的zxid;
(3)逻辑时钟:发起的投票轮数;
(4)选举状态(分为4种):
< 1>Looking: 竞选状态;
< 2>Following: 随从状态,同步Leader状态,参与投票;
< 3>Observing: 观察状态,同步Leader状态,不参与投票;
< 4>Leading: 领导者状态。
ZK集群选举Leader的过程如下:
(1)每个服务实例均发起选举自己为Leader的投票,为自己拉票,并且自己投给自己;
(2)其他服务实例收到投票邀请时,比较发起者的事务ID是否比自己最大的事务ID还大;大则投票,小则不投票,相等则比较服务器ID,大则投票;
(3)发起者收到其他服务器投票反馈后,看自己的投票数(含自己的票)是否超过半数,超过则成为Leader,若所有发起者都未超过半数则重新投票。

四、分布式一致性协议

在分布式系统中,我们常常面临着数据一致性的问题;这里我们介绍Paxos算法,它是一直解决分布式系统中数据一致性问题的算法;Paxos算法中包含三种角色:
在这里插入图片描述
分别为:
(1)Proposer: 提议者,负责提议;提出想达成一直value的提案;
(2)Acceptor: 接收者,对提案投票;决定是否接受此value的提案;
(3)Learner: 学习者,不参与投票;只接受已经决定的提案。
一个提案包括两个部分:
(1)提案编号: 全局唯一、递增;保证提案的先后顺序;
(2)更新值: 提案要更新的value值。
【注】Paxos算法即两阶段提交和大多数机制。

1.Paxos算法写场景

步骤如下:
(1)准备阶段(投票阶段):
<1>提议者提出提案给接收者;
<2>若接收者同意提案,做出Promise响应,并且不再接收其他的提案;
<3>提议者收到超过半数 的响应,则进入下一提案;否则重新提案。
(2)投票超过半数后,进入接受变更阶段(提交阶段):
<1>提议者向接收者发生Accept信息;
<2>接收者比较Accept信息中的提案号;若比自己当前已Promise的提案号低,则回应Nack,否则Accept,并且广播Accepted。
示意图如下:
在这里插入图片描述
为了更好理解Paxos算法写场景,见如下两个例子:
(一)Paxos先后批准场景:
同一个提案,提议者2,提议者1先后被批准,正常场景,如下图所示:
在这里插入图片描述
步骤如下:
(1)首先提议者2进入投票阶段 ;发出提议(编号1),三个接收者都投票同意了;
(2)接着提议者2进入提交阶段 ;广播Accept,改为值1,三个接收者都接受了;
(3)然后提议者1进入投票阶段 ;又发出提议2(编号2),三个接收者也都投票同意了(因为编号2比编号1大),但是接收者返回“编号1,值1”,告诉提议者1已经接受过提案了;
(4)接着提议者1进入提交阶段 ;此时如果提议者1想把改为值2,是无法改变的,因为已经有值1了且提议编号为2,所以只能广播Accept“编号2,值1”,所有接收者都接受提案;
(5)最后所有节点的值都保持一致。
(二)Paxos先后提议场景:
接收者3与提议者2失联,接受者1与提议者1失联,异常场景,如下图所示:
在这里插入图片描述
步骤如下:
(1)首先提议者2进入投票阶段 ;提议者2发出提议给接收者1和接收者2(编号1),两个接收者都投票同意了(因为这两个接收者能连接上,且超过半数同意);
(2)此时提议者1也进入投票阶段 ;提议者1发出提议给接收者2和接收者3(编号2),两个接收者都投票同意了(因为编号2比编号1大);
(3)接着提议者2进入提交阶段 ;广播Accept,改为值1;对于接收者1,它接收的最大编号为1,可以接受修改;而对于接收者2,它接收的最大编号为2,无法修改成功,返回Nack(编号2);这样提议者2发现有Nack(编号2)的信息,则提议者2的修改不成功;
(4)此时提议者1进入提交阶段 ;广播Accept,改为值2;对于接收者2,它接收的最大编号为2,可以接受修改,变为“编号2,值2”;对于接收者3,它接收的最大编号也为2,也可以接受修改,变为“编号2,值2”,此时与提议者1关联的集群中的值已经确定,为值2
(5)因为提议者2失败了,所以提议者2重新进入投票阶段 ;提议者2发出提议给接收者1和接收者2(编号3);接收者1投票同意(因为编号3大),返回“编号1,值1”;接收者2也投票同意,返回“编号2,值2”;
(6)接着提议者2进入提交阶段 ;此时,提议者2会选择返回回来编号最大的值 ,所以确定为值2(值1被抛弃);广播Accept(“编号3,值2”),两个接收者都接受修改;
(7)最后所有节点的值都保持一致。

2.Paxos算法读场景

Paxos算法读场景示意图如下:
在这里插入图片描述
步骤如下:
(1)接收客户端请求的节点,向集群广播获取大家的当前值;
(2)接收到超过半数相同的值,则返回该值,若自己本地值不同则修改为此超过半数相同的值;
(3)如果得不到超过半数相同的值,则读取失败。

全部评论 (0)

还没有任何评论哟~