如何使用 ZooKeeper实现生产者-消费者队列

大家好,我是程序员xiao熊,今天分享的内容是使用 ZooKeeper来实现生产者-消费者队列,具体的实现类为 ZookeeperQueue,涉及的内容主要包含Zookeeper数据模型、监听相关的知识点,有兴趣的读者可以参考以下文章:

此外,我们需要准备一个可以运行的ZooKeeper 服务,单机版的即可;接下来进入正题,本篇文章主要分为以下主要的几个部分

  1. 生产者-消费者队列介绍
  2. 使用Zookeeper实现生产者-消费者队列的原理
  3. 生产者-消费者队列的流程以及具体的实现步骤
  4. 测试验证
  5. 总结

生产者-消费者队列介绍

生产者-消费者队列是生产者消费者模型的一种实现,进程组使用它来存储生产和消费的元素,利用队列先进先出的特性顺序处理数据。生产者进程创建新元素并将它们添加到队列中,消费者进程从列表中获取元素,并处理它们。流程如下图所示:

如何使用 ZooKeeper实现生产者-消费者队列

Zookeeper生产者-消费者模型流程图

使用Zookeeper实现生产者-消费者队列的原理

使用Zookeeper实现生产者-消费者队列的原理是在Zookeeper中创建一个节点表示队列,然后在这个节点下面创建子节点来存储生产者生产的元素,通过删除子节点来实现消费者消费元素;其中涉及到以下几个关键的问题:

  1. 如何使用Zookeeper来表示队列
  2. 如何使用Zookeeper存储元素(元素入列)
  3. 如何使用Zookeeper消费元素(元素出列)
  4. 如何通过Zookeeper实现先进先出的特性
  5. 当队列由空变为非空时,如何通知消费者消费元素

针对这些问题,在zookeeper中,我们可以使用如下的解决方案(主要涉及zookeeper的数据模型、监听的知识点), 解决方案的序号对应问题的序号:

  1. 在Zookeeper中创建一个节点“/queue”,用来表示队列
  2. 通过在“/queue”节点下创建子节点来存储元素
  3. 消费者在消费元素,可以通过删除“/queue”节点下的子节点实现消费元素
  4. 在创建子节点时,给每一个子节点添加一个序列号,消费时,从最小序列号的元素开始消费
  5. 当队列元素不为空,则通过Zookeeper的监听机制可以通知消费者进行消费

生产者-消费者队列的执行逻辑以及具体的实现步骤

在了解了生产者-消费者队列的原理以及相关的关键问题和解决方案之后,接下来我们来看具体的执行逻辑,如下图所示:

如何使用 ZooKeeper实现生产者-消费者队列

Zookeeper生产者-消费者队列执行逻辑

基于以上的解决方案以及执行逻辑,实现方案的具体详细设计如下:

如何使用 ZooKeeper实现生产者-消费者队列

Zookeeper生产者-消费者队列详细设计

接下来看代码实现:

一、初始化Zookeeper队列对象

(1)创建zookeeper客户端对象:

ZooKeeper zk = new ZooKeeper(address, timeouts, watcher);

参数说明:

  • address:表示zookeeper的连接地址
  • timeouts:表示超时时间
  • watcher:表示监听器,指定为this,同时队列类需要实现Watcher接口

(2)创建队列节点:

zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) ,其目的是作为单个进程节点的父节点。

参数说明:

  • root:表示队列节点,传递字符串,例如“/queue”
  • data:表示节点的数据,可以不传值,例如:new byte[0]
  • ZooDefs.Ids.OPEN_ACL_UNSAFE:表示ACL类型
  • CreateMode.PERSISTENT:表示是永久性的节点

(3)创建同步对象:

Integer mutex = new Integer(-1)

4)监听队列节点:参考(1)步骤的watcher参数,在本例中,使用this对象,表示使用当前对象作为监听器,因此需要初始化zookeeper客户端的类实现Watcher接口,并实现如下方法:

public void process(WatchedEvent event){}

完整代码如下:

public class ZookeeperQueue implements Watcher{
static ZooKeeper zk = null;
    static Integer mutex;
    String root;
   public ZookeeperQueue(String address, String root, int size) {
        try {
            // 1. 创建zookeeper客户端对象
            if(zk == null){

                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                System.out.println("Finished starting ZK: " + zk);
            }

            // 2. 创建队列节点
            this.root = root;
            if (zk != null) {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            }

            // 3. 设置同步对象
            mutex = new Integer(-1);

        } catch (IOException e) {
            System.out.println(e.toString());
            zk = null;
        } catch (KeeperException e) {
            System.out.println("Keeper exception when instantiating queue: " + e.toString());
        } catch (InterruptedException e) {
            System.out.println("Interrupted exception");
        }
    }

   // 4. 监听队列节点
synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
              mutex.notify();
          }
   }
}

二、调用produce()方法,生产元素

(1)在屏障节点”/zkQueue”下创建子节点

zk.create(root + "/ele", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

参数说明:

  • root + "/ele" :队列元素路径,其中root表示队列节点,”/ele”是元素前缀,再利用CreateMode.EPHEMERAL_SEQUENTIAL参数的序列号即可生成完整的元素;
  • content:表示节点name中的数据,即队列元素的具体值,类型为字节数组byte[],开发者可根据具体场景设置不同的值;
  • ZooDefs.Ids.OPEN_ACL_UNSAFE:表示ACL类型
  • CreateMode.EPHEMERAL_SEQUENTIAL:表示是临时的并且是有序列号的节点类型,因为队列元素因为是存储在内存中,生产元素的进程所在机器出现故障,则元素也应该进行重置,利用zk临时节点的特性,在该进程的服务器出现故障之后,zk能够自动删除元素,如果是生产者所在机器出现故障不影响元素,则使用CreateMode.PERSISTENT_SEQUENTIAL,这样开发者只需要关注业务上的功能即可;

完整代码如下:

boolean produce() throws KeeperException, InterruptedException{
				zk.create(root + "/ele", content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}

三、处理屏障节点的监听事件

在监听事件处理方法中,直接唤醒各个进程,然后让处于等待状态的消费者继续进行消费,代码如下:

// 6. 监听屏障节点
synchronized public void process(WatchedEvent event) {
    // 接收到监听事件,则本次监听失效,需要通过后续的getChildren方法设置下一轮的监听
		synchronized (mutex) {
        mutex.notify();
    }
}

四、创建consume()方法,消费元素

(1) 在获取节点”/queue”下的子节点

List eles = zk.getChildren(“/queue”, true)

参数说明:

  • 第一个参数是队列节点路径
  • 第二个参数为true,表示继续监听屏障节点(由于本例中设置的zk监听是标准的监听,标准的监听是一次性的,所以需要每次收到监听器发出的通知之后,继续设置监听,以便后续的处理)

(2)判断是否可以消费元素

如果子节点数量为空,则等待生产者生产元素,否则遍历获取到的子节点,取序列号最小的节点,然后将节点删除;结束本次消费

完整代码如下:

public Boolean consume() throws InterruptedException, KeeperException {
Stat stat = new Stat();
    // 开启消费
    while (true) {
        synchronized (mutex) {
            List eles = zk.getChildren(root, true);
            if (CollectionUtils.isEmpty(eles)) {
                mutex.wait();
            } else {
                String number = eles.get(0).substring(3);
// 获取最小序列号的元素(即队列中的第一个元素)
                for (String ele : eles) {
                    String tmpNumber = ele.substring(3);
                    if (Integer.valueOf(tmpNumber) <= Integer.valueOf(number)) {
                        number = tmpNumber;
                    }
                }
                byte[] content = zk.getData(root + "/ele" + number, false, stat);
                System.out.println("消费元素:" + new String(content));
                // 删除元素
                zk.delete(root + "/ele" + number, stat.getVersion());
                return true;
            }
        }
    }
}

测试验证

队列的验证测试比较简单,可以使用maven打包只有运行,或者直接在IDE工具中运行代码即可;完整的测试验证队列的代码如下

public class App {
    public static void main( String[] args ) {
       ZookeeperQueue queue = new ZookeeperQueue("localhost:2181", "/queue");
try {
    // 生产者线程
    Thread producer = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.produce("node" + i);
                    // 增加睡眠时间可以更好的观察执行过程
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }
    });

    // 消费者线程
    Thread consumer = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    //先开始消费,可以体现出消费者等待生产者进行通知的过程
    consumer.start();
    // 生产数据
    producer.start();

}catch (Exception e) {

}
}

总结

本示例中的生产者-消费者队列注释一个简单的示例,还有很多优化的空间,开发小伙伴们可以尝试着去优化;

以上就是为大家分享的内容,欢迎大家在评论区里面留言,后续也会分享Zookeeper实践相关的文章以及其他技术的文章;

私信我可获取全部完整的代码以及文章的高清图片!!!

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章