谁说程序员不能有文艺范?


ZooKeeper中的角色主要有以下三类:领导者(Leader)、学习者(Learner,包括跟随者Follower和观察者Observer)以及客户端(Client),如下表所示:

系统模型如图所示:

客户端默认通过2181端口与ZooKeeper Service进行连接;不同的ZooKeeper Server之间默认通过2888端口(Follower与Leader之间的数据同步)和3888端口(Leader选举)进行通信。

org.apache.zookeeper.ZooKeeper是ZooKeeper客户端类库中最主要的类,所有使用ZooKeeper服务的应用程序都必须首先初始化ZooKeeper类的对象。ZooKeeper类提供了对ZooKeeper操作的所有方法,并且该类中的方法都是线程安全的。Client连接成功后,会被自动分配一个Session ID,并通过周期性地发送心跳信息来保持会话的有效性。

ZooKeeper API有同步和异步两种方式。连接到Server的客户端进程可以在ZooKeeper节点上设置watcher(实现Watcher接口),当节点发生变化时,客户端能够捕获到该变化并触发相应的动作。但是,每个watcher只会被触发一次;为了保证每次变化都能被捕获到,需要在watcher每次被触发之后重新注册。

ZooKeeper

package org.apache.zookeeper;public class ZooKeeper {    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)    throws IOException {        this(connectString, sessionTimeout, watcher, false);    }        public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,            boolean canBeReadOnly) throws IOException {        watchManager.defaultWatcher = watcher;        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);        HostProvider hostProvider = new StaticHostProvider(                connectStringParser.getServerAddresses());                        // 实现实际上是实例化了一个客户端对象        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),                hostProvider, sessionTimeout, this, watchManager,                getClientCnxnSocket(), canBeReadOnly);        cnxn.start();       }        public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,            long sessionId, byte[] sessionPasswd) throws IOException {        this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);    }        public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)        throws IOException {         // 略       }}

Watcher

package org.apache.zookeeper;public interface Watcher {    // 定义Event的可能状态    public interface Event {            public enum KeeperState {            @Deprecated                     @Deprecated            Unknown (-1), Disconnected (0), NoSyncConnected (1),SyncConnected (3),            AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated(6), Expired (-112);        }        // 定义Event的类型        public enum EventType {            None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4);        }    }    // 抽象方法    abstract public void process(WatchedEvent event);}

Java代码

package com.invic.zk;import java.util.List;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.AsyncCallback;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;/** *  * @author lucl * */public class CusZooKeeperClient {    //     private static String connectString = "nnode:2181,dnode1:2181,dnode2:2181";        private static int SESSION_TIMEOUT = 30 * 1000;        /**     * @param args     */    public static void main(String[] args) {        CusZooKeeperClient client = new CusZooKeeperClient();        // 最基本的会话实例        // client.zkSessionInstance();        // 复用sessionID和sessionPasswd来创建会话        // client.zkSessionInstanceWithSidAndPwd();        // 创建节点        // client.createZNode();        // 获取子节点        // client.getChildren();        // 获取数据        // client.getData();        // 更新数据        // client.setData();        // 是否存在        client.exists();    }        /**     * 最基本的会话实例     */    private void zkSessionInstance () {        CountDownLatch latch = new CountDownLatch(1);        ZooKeeper zk = null;        try {            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));                        latch.await();                        System.out.println("ZK sessionId " + zk.getSessionId() + "\t and state " + zk.getState());        } catch (Exception e) {            e.printStackTrace();        } finally {            if (null != zk) {                 try {                    zk.close();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }        /**     * 复用sessionID和sessionPasswd创建一个ZooKeeper对象实例(维持之前会话的有效性)     */    private void zkSessionInstanceWithSidAndPwd () {        CountDownLatch latch = new CountDownLatch(1);        ZooKeeper zk = null;        long sessionID = 0;        byte [] sessionPwd = "".getBytes();        try {   
         System.out.println("============================001==============================");            {                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));                                latch.await();                    sessionID = zk.getSessionId();                sessionPwd = zk.getSessionPasswd();                System.out.println("ZK sessionId is " + sessionID + "\t and sessionPwd " + new String(sessionPwd));            }            System.out.println("=============================002==============================");            {                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), sessionID, sessionPwd);            }                        System.out.println("=============================003==============================");            {                // with wrong sessionID and sessionPasswd                long wrongSessionID = 1L;                byte [] wrongSessionPwd = "wrong".getBytes();                zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), wrongSessionID, wrongSessionPwd);            }        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != zk) {                    zk.close();                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }        /**     * 创建节点     */    private void createZNode () {        CountDownLatch latch = new CountDownLatch(1);        ZooKeeper zk = null;        try {            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch));                    latch.await();                        /**             *  同步API             *  create(final String path, byte data[], List
 acl, CreateMode createMode)             */            {                String path = "/zk-sync";                // 内容只支持字节数组,字符串可以直接getBytes(),其他内容需要Hessian或Kryo等序列化工具                byte [] datas = "luchunli".getBytes();                    // Ids.OPEN_ACL_UNSAFE表示任何操作都不受权限控制                String basePath = zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);                System.out.println("Success create znode : " + basePath);    // `/zk-test-001`                basePath = zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);                System.out.println("Success create znode : " + basePath);    // `/zk-test-0020000000008`会在path后自动加个数字            }                         /**             *  异步API             */            System.out.println("================================for async===================================");            String path = "/zk-async";            byte [] datas = "luchunli".getBytes();            // 异步中接口不会抛异常,异常都是在回调函数中体现            zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {                @Override                public void proce***esult(int rc, String path, Object ctx, String name) {                    switch (rc) {                        case 0 :                             System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name);                            break;                        case -4 :                            System.out.println("Connection abort.");                            break;                        case -110 :                            System.out.println("Node is already exists.");                            break;                        case -112 :                             System.out.println("Session expired.");                            break;                        default:                            System.out.println("Unknown 
error.");                    }                }            }, "I am Context.");                        zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {                                @Override                public void proce***esult(int rc, String path, Object ctx, String name) {                    switch (rc) {                        case 0 :                             System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name);                            break;                        case -4 :                            System.out.println("Connection abort.");                            break;                        case -110 :                            System.out.println("Node is already exists.");                            break;                        case -112 :                             System.out.println("Session expired.");                            break;                        default:                            System.out.println("Unknown error.");                    }                }            }, "I am Context.");                                    Thread.sleep(30 * 1000);        // 为了看到创建的临时节点的效果,zk.close()之后节点就不存在了        } catch (Exception e) {            e.printStackTrace();        } finally  {            try{                if (null != zk) {                    zk.close();                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }        /**     * 获取子节点及节点数据     */    private void getChildren () {        CountDownLatch latch = new CountDownLatch(1);                ZooKeeper zk = null;        try {            CusZKWatcher watcher = new CusZKWatcher(latch);            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);                        latch.await();                        watcher.setZk(zk);                        String path = "/zk-book";          
  byte [] datas = "luchunli".getBytes();            {                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);                System.out.println("Success create znode : " + basePath);    //            }                        {                String basePath = zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);                System.out.println("Success create znode : " + basePath);    //            }                        /**             * 同步API获取子节点             */            {                List
 childs = zk.getChildren(path, true);                System.out.println("In client get children is : " + childs);                        }                        /**             * 异步API获取子节点             */            {                System.out.println("================================for async===================================");                zk.getChildren(path, true, new AsyncCallback.Children2Callback() {                                        @Override                    public void proce***esult(int rc, String path, Object ctx, List
 children, Stat stat) {                        System.out.println("Get children znode result " + rc + ", " + path + ", " + children + ", " + stat);                    }                }, "I am Context.");            }                        {                String basePath = zk.create(path + "/c2", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);                System.out.println("Success create znode : " + basePath);    //            }                        Thread.sleep(30 * 1000);        } catch (Exception e) {            e.printStackTrace();        } finally  {            try{                if (null != zk) {                    zk.close();                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }        /**     * 获取值     */    private void getData () {        CountDownLatch latch = new CountDownLatch(1);                ZooKeeper zk = null;        try {            CusZKWatcher watcher = new CusZKWatcher(latch);            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);                        latch.await();                        watcher.setZk(zk);                        String path = "/zk-test";            byte [] datas = "luchunli".getBytes();            {                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);                System.out.println("ZK create node " + basePath + " success.");            }                        /**             * 同步API获取数据             */            {                Stat stat = new Stat();                byte [] bytess = zk.getData(path, true, stat);                System.out.println("Get data from zk : " + new String(bytess));                System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());            }                        {                Stat sstat = zk.setData(path, datas, -1);        // znode节点数据或内容的变化都会引起NodeDataChanged                
System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion());            }                        Thread.sleep(30 * 1000);                        /**             * 异步API获取数据             */            System.out.println("================================for async===================================");            zk.getData(path, true, new AsyncCallback.DataCallback() {                                @Override                public void proce***esult(int rc, String path, Object ctx, byte[] data, Stat stat) {                    System.out.println("Get data result " + rc + ", " + path + ", " + new String(data) + ", " + stat);                                    }            }, "I am Context.");                        {                Stat sstat = zk.setData(path, datas, -1);        // znode节点数据或内容的变化都会引起NodeDataChanged                System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion());            }                        Thread.sleep(30 * 1000);        } catch (Exception e) {            e.printStackTrace();        } finally  {            try{                if (null != zk) {                    zk.close();                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }        /**     * 更新值     */    private void setData () {        /**         * setData(final String path, byte data[], int version)         * 或         * setData(final String path, byte data[], int version, StatCallback cb, Object ctx)         *          * version说明:getData中无version,ZooKeeper的setData接口中的version参数是由CAS理论演化而来         *          */        CountDownLatch latch = new CountDownLatch(1);                ZooKeeper zk = null;        try {            CusZKWatcher watcher = new CusZKWatcher(latch);            zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);                        latch.await();    
                    watcher.setZk(zk);                        String path = "/zk-test";            byte [] datas = "luchunli".getBytes();            {                String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);                System.out.println("ZK create node " + basePath + " success.");            }                        /**             * 同步API更新数据             */            Stat stat = zk.setData(path, datas, -1);        // 传入参数-1,表示客户端基于数据的最新版本进行更新            System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());            Stat stat2 = zk.setData(path, datas, stat.getVersion());    // 基于上一次的version更新数据,更新成功            System.out.println("stat2 info is : " + stat2.getCzxid() + ", " + stat2.getMzxid() + ", " + stat2.getVersion());            Stat stat3 = zk.setData(path, datas, stat.getVersion());    // 仍然基于之前的version,更新数据失败(BadVersion for /zk-test)            System.out.println("stat3 info is : " + stat3.getCzxid() + ", " + stat3.getMzxid() + ", " + stat3.getVersion());                        /**             * 异步API更新数据             */            System.out.println("================================for async===================================");            zk.setData(path, datas, -1, new AsyncCallback.StatCallback() {                                @Override                public void proce***esult(int rc, String path, Object ctx, Stat stat) {                    System.out.println("Get data result " + rc + ", " + path + ", " + ctx + ", " + stat);                }            }, "I am Context");                        Thread.sleep(30 * 1000);        } catch (Exception e) {            e.printStackTrace();        }    }        /**     * 判断指定的znode是否存在     * @param path     * @return     */    private void exists () {        CountDownLatch latch = new CountDownLatch(1);                ZooKeeper zk = null;        try {            CusZKWatcher watcher = new CusZKWatcher(latch); 
           zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher);                        latch.await();                        watcher.setZk(zk);                        String path = "/zk-test";            {                Stat stat = zk.exists(path, true);                if (null == stat) {                    System.out.println("Node " + path + " not exists.");                } else {                    System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());                }            }                        byte [] datas = "luchunli".getBytes();            {                zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        // 需要为PERSISTENT节点,否则无法创建子节点                zk.exists(path, true);                zk.setData(path, "test".getBytes(), -1);                zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);    // 子节点的变化客户端未watch到                zk.delete(path + "/c1", -1);                zk.delete(path, -1);            }                        Thread.sleep(30 * 1000);        } catch (Exception e) {            e.printStackTrace();        } finally {            if (null != zk) {                try {                    zk.close();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }        /**     * 删除节点-无法删除非空节点     */    private void delZNode () {        /**         *  delete(final String path, int version)         *  或         *  delete(final String path, int version, VoidCallback cb, Object ctx)         *           *  若version传入-1则直接删除         */    }    }

    观察者

package com.invic.zk;import java.util.List;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.data.Stat;/** *  * @author lucl * @description 自定义观察者 * */public class CusZKWatcher implements Watcher {    private CountDownLatch latch = null;    private ZooKeeper zk = null;        private CusZKWatcher() {        // TODO Auto-generated constructor stub    }        public CusZKWatcher (CountDownLatch latch) {        this.latch = latch;    }        public ZooKeeper getZk() {        return zk;    }    public void setZk(ZooKeeper zk) {        this.zk = zk;    }    @Override    public void process(WatchedEvent event) {        if (null == latch) {            return;        }        KeeperState state = event.getState();        int stateValue = state.getIntValue();        EventType eventType = event.getType();        int typeValue = eventType.getIntValue();                System.out.println("KeeperState is : " + stateValue + "\tEventType : " + typeValue);                try {            if (KeeperState.SyncConnected == state) {                System.out.println("zk session established in mode of SyncConnected.");                if (EventType.None == eventType && null == event.getPath()) {                    this.latch.countDown();                } else if (EventType.NodeCreated == eventType) {                    System.out.println("Node " + event.getPath() + " created.");                    Stat stat = zk.exists(event.getPath(), true);                    System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());                } else if (EventType.NodeDeleted == eventType) {                    System.out.println("Node " + event.getPath() + " 
deleted.");                    Stat stat = zk.exists(event.getPath(), true);                    if (null == stat) {                        System.out.println("Znod " + event.getPath() + " not exists.");                    } else {                        System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());                    }                } else if (EventType.NodeChildrenChanged == eventType) {                            if (null != this.getZk()) {                        List
 result = this.getZk().getChildren(event.getPath(), true);                        System.out.println("Reget children in watcher, result : " + result);                    }                } else if (EventType.NodeDataChanged == eventType) {                                if (null != this.getZk()) {                        Stat stat = new Stat();                        byte[] datas = this.getZk().getData(event.getPath(), true, stat);                        System.out.println("Reget data from zk : " + new String(datas));                        System.out.println("Restat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());                    }                } else {                    System.out.println("Unknown type : " + eventType);                }            } else {                System.out.println("Unknown error : " + state);            }        } catch (Exception e) {            e.printStackTrace();        }    }}

3、其他客户端API

    ZkClient

    Curator