大家好,我是程序员xiao熊,今天分享的内容是使用 ZooKeeper来实现生产者-消费者队列,具体的实现类为 ZookeeperQueue,涉及的内容主要包含Zookeeper数据模型、监听相关的知识点,有兴趣的读者可以参考以下文章:
此外,我们需要准备一个可以运行的ZooKeeper 服务,单机版的即可;接下来进入正题,本篇文章主要分为以下主要的几个部分
生产者-消费者队列是生产者消费者模型的一种实现,进程组使用它来存储生产和消费的元素,利用队列先进先出的特性顺序处理数据。生产者进程创建新元素并将它们添加到队列中,消费者进程从列表中获取元素,并处理它们。流程如下图所示:
Zookeeper生产者-消费者模型流程图
使用Zookeeper实现生产者-消费者队列的原理是在Zookeeper中创建一个节点表示队列,然后在这个节点下面创建子节点来存储生产者生产的元素,通过删除子节点来实现消费者消费元素;其中涉及到以下几个关键的问题:
针对这些问题,在zookeeper中,我们可以使用如下的解决方案(主要涉及zookeeper的数据模型、监听的知识点), 解决方案的序号对应问题的序号:
在了解了生产者-消费者队列的原理以及相关的关键问题和解决方案之后,接下来我们来看具体的执行逻辑,如下图所示:
Zookeeper生产者-消费者队列执行逻辑
基于以上的解决方案以及执行逻辑,实现方案的具体详细设计如下:
Zookeeper生产者-消费者队列详细设计
接下来看代码实现:
一、初始化Zookeeper队列对象
(1)创建zookeeper客户端对象:
ZooKeeper zk = new ZooKeeper(address, timeouts, watcher);
参数说明:
(2)创建队列节点:
zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) ,其目的是作为单个进程节点的父节点。
参数说明:
(3)创建同步对象:
Integer mutex = new Integer(-1)
(4)监听队列节点:参考(1)步骤的watcher参数,在本例中,使用this对象,表示使用当前对象作为监听器,因此需要初始化zookeeper客户端的类实现Watcher接口,并实现如下方法:
public void process(WatchedEvent event){}
完整代码如下:
public class ZookeeperQueue implements Watcher{
static ZooKeeper zk = null;
static Integer mutex;
String root;
public ZookeeperQueue(String address, String root, int size) {
try {
// 1. 创建zookeeper客户端对象
if(zk == null){
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, this);
System.out.println("Finished starting ZK: " + zk);
}
// 2. 创建队列节点
this.root = root;
if (zk != null) {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 3. 设置同步对象
mutex = new Integer(-1);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
} catch (KeeperException e) {
System.out.println("Keeper exception when instantiating queue: " + e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
// 4. 监听队列节点
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
}
二、调用produce()方法,生产元素
(1)在屏障节点”/zkQueue”下创建子节点
zk.create(root + "/ele", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
参数说明:
完整代码如下:
boolean produce() throws KeeperException, InterruptedException{
zk.create(root + "/ele", content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
三、处理屏障节点的监听事件
在监听事件处理方法中,直接唤醒各个进程,然后让处于等待状态的消费者继续进行消费,代码如下:
// 6. 监听屏障节点
synchronized public void process(WatchedEvent event) {
// 接收到监听事件,则本次监听失效,需要通过后续的getChildren方法设置下一轮的监听
synchronized (mutex) {
mutex.notify();
}
}
四、创建consume()方法,消费元素
(1) 在获取节点”/queue”下的子节点
List
参数说明:
(2)判断是否可以消费元素
如果子节点数量为空,则等待生产者生产元素,否则遍历获取到的子节点,取序列号最小的节点,然后将节点删除;结束本次消费
完整代码如下:
public Boolean consume() throws InterruptedException, KeeperException {
Stat stat = new Stat();
// 开启消费
while (true) {
synchronized (mutex) {
List eles = zk.getChildren(root, true);
if (CollectionUtils.isEmpty(eles)) {
mutex.wait();
} else {
String number = eles.get(0).substring(3);
// 获取最小序列号的元素(即队列中的第一个元素)
for (String ele : eles) {
String tmpNumber = ele.substring(3);
if (Integer.valueOf(tmpNumber) <= Integer.valueOf(number)) {
number = tmpNumber;
}
}
byte[] content = zk.getData(root + "/ele" + number, false, stat);
System.out.println("消费元素:" + new String(content));
// 删除元素
zk.delete(root + "/ele" + number, stat.getVersion());
return true;
}
}
}
}
队列的验证测试比较简单,可以使用maven打包只有运行,或者直接在IDE工具中运行代码即可;完整的测试验证队列的代码如下
public class App {
public static void main( String[] args ) {
ZookeeperQueue queue = new ZookeeperQueue("localhost:2181", "/queue");
try {
// 生产者线程
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
queue.produce("node" + i);
// 增加睡眠时间可以更好的观察执行过程
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
});
// 消费者线程
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
queue.consume();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
});
//先开始消费,可以体现出消费者等待生产者进行通知的过程
consumer.start();
// 生产数据
producer.start();
}catch (Exception e) {
}
}
本示例中的生产者-消费者队列注释一个简单的示例,还有很多优化的空间,开发小伙伴们可以尝试着去优化;
以上就是为大家分享的内容,欢迎大家在评论区里面留言,后续也会分享Zookeeper实践相关的文章以及其他技术的文章;
私信我可获取全部完整的代码以及文章的高清图片!!!
| 留言与评论(共有 0 条评论) “” |