ZooKeeper与Curator注册和监控方法

这篇文章主要介绍“ZooKeeper与Curator注册和监控方法”,在日常操作中,相信很多人在ZooKeeper与Curator注册和监控方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”ZooKeeper与Curator注册和监控方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

这篇文章主要介绍“ZooKeeper与Curator注册和监控方法”,在日常操作中,相信很多人在ZooKeeper与Curator注册和监控方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”ZooKeeper与Curator注册和监控方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

创新互联公司坚持“要么做到,要么别承诺”的工作理念,服务领域包括:成都网站设计、成都网站建设、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的贞丰网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!

Curator提供了对zookeeper客户端的封装,并监控连接状态和会话session,特别是会话session过期后,curator能够重新连接zookeeper,并且创建一个新的session。

对于zk的使用者来说,session的概念至关重要,如果想了解更多session的说明,请访问:

http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html

zk客户端和zk间主要可能存在下面几种异常情况:

短暂失去连接:此时客户端检测到与服务端的连接已经断开,但是服务端维护的客户端session尚未过期,之后客户端和服务端重新建立了连接;当客户端重新连接后,由于session没有过期,zookeeper能够保证连接恢复后保持正常服务。

失去连接时间很长:此时服务器相对于客户端的session已经过期了,与先前session相关的watcher和ephemeral的路径和数据都会消失;当Curator重新创建了与zk的连接后,会获取到session expired异常,Curator会销毁先前的session,并且会创建一个新的session,需要注意的是,与之前session相关的watcher和ephemeral类型的路径和数据在新的session中也不会存在,需要开发者在CuratorFramework.getConnectionStateListenable().addListener()中添加状态监听事件,对ConnectionState.LOST事件进行监听,当session过期后,使得之前的session状态得以恢复。对于ephemeral类型,在客户端应该保持数据的状态,以便及时恢复。

客户端重新启动:不论先前的zk session是否已经过期,都需要重新创建临时节点、添加数据和watch事件,先前的session也会在稍后的一段时间内过期。

Zk服务器重新启动:由于zk将session信息存放到了硬盘上,因此重启后,先前未过期的session仍然存在,在zk服务器启动后,客户端与zk服务器创建新的连接,并使用先前的session,与1相同。

需要注意的是,当session过期了,在session过期期间另外的客户端修改了zk的值,那么这个修改在客户端重新连接到zk上时,zk客户端不会接收到这个修改的watch事件(尽管添加了watch),如果需要严格的watch逻辑,就需要在curator的状态监控中添加逻辑。

特别提示:watcher仅仅是一次性的,zookeeper通知了watcher事件后,就会将这个watcher从session中删除,因此,如果想继续监控,就要添加新的watcher。

下面提供了对persistent和ephemeral两种类型节点的监控方法,其中get方法说明了persistent节点如何监控,而register方法说明了ephemeral类型的节点如何监控。

package demo;import java.net.InetAddress;import java.nio.charset.Charset;import java.util.concurrent.ConcurrentSkipListSet;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.CuratorWatcher;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.framework.state.ConnectionStateListener;import org.apache.curator.retry.RetryNTimes;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.data.Stat;public class CuratorTest {    private CuratorFramework zkTools;    private ConcurrentSkipListSet<String> watchers = new ConcurrentSkipListSet<String>();    private static Charset charset = Charset.forName("utf-8");    public CuratorTest() {        zkTools = CuratorFrameworkFactory.builder()                .connectString("192.168.0.216:3306")                .namespace("zk/test")                .retryPolicy(new RetryNTimes(2000, 20000))                .build();        zkTools.start();    }    public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType,            final CuratorWatcher watcher) {        synchronized (this) {            if (!watchers.contains(watcher.toString()))// 不要添加重复的监听事件            {                watchers.add(watcher.toString());                System.out.println("add new watcher " + watcher);                zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() {                    @Override                    public void stateChanged(CuratorFramework client, ConnectionState newState) {                        System.out.println(newState);                        if (newState == ConnectionState.LOST) {// 处理session过期                            try {                                if (watcherType == ZookeeperWatcherType.EXITS) {                                    zkTools.checkExists().usingWatcher(watcher).forPath(path);                                } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) {                                    zkTools.getChildren().usingWatcher(watcher).forPath(path);                                } else if (watcherType == ZookeeperWatcherType.GET_DATA) {                                    zkTools.getData().usingWatcher(watcher).forPath(path);                                } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) {                                    // ephemeral类型的节点session过期了,需要重新创建节点,并且注册监听事件,之后监听事件中,                                    // 会处理create事件,将路径值恢复到先前状态                                    Stat stat = zkTools.checkExists().usingWatcher(watcher)                                            .forPath(path);                                    if (stat == null) {                                        System.err.println("to create");                                        zkTools.create().creatingParentsIfNeeded()                                                .withMode(CreateMode.EPHEMERAL)                                                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);                                    }                                }                            } catch (Exception e) {                                e.printStackTrace();                            }                        }                    }                });            }        }    }    public void create() throws Exception {        zkTools.create()// 创建一个路径                .creatingParentsIfNeeded()// 如果指定的节点的父节点不存在,递归创建父节点                .withMode(CreateMode.PERSISTENT)// 存储类型(临时的还是持久的)                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)// 访问权限                .forPath("zk/test");// 创建的路径    }    public void put() throws Exception {        // 对路径节点赋值        zkTools.setData().forPath("zk/test", "hello world".getBytes(Charset.forName("utf-8")));    }    public void get() throws Exception {        String path = "zk/test";        ZKWatch watch = new ZKWatch(path);        byte[] buffer = zkTools.getData().usingWatcher(watch).forPath(path);        System.out.println(new String(buffer, charset));        // 添加session过期的监控        addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch);    }    public void register() throws Exception {        String ip = InetAddress.getLocalHost().getHostAddress();        String registeNode = "zk/register/" + ip;// 节点路径        byte[] data = "disable".getBytes(charset);// 节点值        CuratorWatcher watcher = new ZKWatchRegister(registeNode, data); // 创建一个register watcher        Stat stat = zkTools.checkExists().forPath(registeNode);        if (stat != null) {            zkTools.delete().forPath(registeNode);        }        zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(registeNode, data);// 创建的路径和值        // 添加到session过期监控事件中        addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS, watcher);        data = zkTools.getData().usingWatcher(watcher).forPath(registeNode);        System.out.println("get path form zk : " + registeNode + ":" + new String(data, charset));    }    public static void main(String[] args) throws Exception {        CuratorTest test = new CuratorTest();        test.get();        test.register();        Thread.sleep(10000000000L);    }    public class ZKWatch implements CuratorWatcher {        private final String path;        public String getPath() {            return path;        }        public ZKWatch(String path) {            this.path = path;        }        @Override        public void process(WatchedEvent event) throws Exception {            System.out.println(event.getType());            if (event.getType() == EventType.NodeDataChanged) {                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);                System.out.println(path + ":" + new String(data, Charset.forName("utf-8")));            }        }    }    public class ZKWatchRegister implements CuratorWatcher {        private final String path;        private byte[] value;        public String getPath() {            return path;        }        public ZKWatchRegister(String path, byte[] value) {            this.path = path;            this.value = value;        }        @Override        public void process(WatchedEvent event) throws Exception {            System.out.println(event.getType());            if (event.getType() == EventType.NodeDataChanged) {                // 节点数据改变了,需要记录下来,以便session过期后,能够恢复到先前的数据状态                byte[] data = zkTools.getData().usingWatcher(this).forPath(path);                value = data;                System.out.println(path + ":" + new String(data, charset));            } else if (event.getType() == EventType.NodeDeleted) {                // 节点被删除了,需要创建新的节点                System.out.println(path + ":" + path + " has been deleted.");                Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path);                if (stat == null) {                    zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);                }            } else if (event.getType() == EventType.NodeCreated) {                // 节点被创建时,需要添加监听事件(创建可能是由于session过期后,curator的状态监听部分触发的)                System.out.println(path + ":" + " has been created!" + "the current data is "                        + new String(value));                zkTools.setData().forPath(path, value);                zkTools.getData().usingWatcher(this).forPath(path);            }        }    }    public enum ZookeeperWatcherType {        GET_DATA, GET_CHILDREN, EXITS, CREATE_ON_NO_EXITS    }}

本文名称:ZooKeeper与Curator注册和监控方法
本文链接:https://www.cdcxhl.com/article2/choic.html

成都网站建设公司_创新互联,为您提供静态网站搜索引擎优化网站营销商城网站定制开发响应式网站

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

搜索引擎优化