,谁说程序员不能有文艺范?
Zookeeper中的角色主要有以下三类,如下表所示:
系统模型如图所示:
客户端通过2181端口与ZooKeeper Service进行连接,不同的ZooKeeper Server之间通过2888或3888端口进行通信。
org.apache.zookeeper.ZooKeeper是ZooKeeper客户端类库中主要的类,所有使用Zookeeper服务的应用程序都必须首先初始化ZooKeeper类的对象。ZooKeeper类提供了对ZooKeeper操作的所有方法,并且该类中的方法都是线程安全的。Client连接成功后,会被自动分配一个Session ID,通过发送周期性的心跳信息来保持会话的有效性。
ZooKeeper API有同步和异步两种方式。在ZooKeeper中连接到Server的客户端进程可以在ZooKeeper节点上设置watcher(实现Watcher接口),当节点发生变化时,客户端能够捕获到并触发相应的动作。但是,每个watcher只能被触发一次,为了保证每次变化都能被捕获到,需要多次进行watcher注册。
ZooKeeper
package org.apache.zookeeper;public class ZooKeeper { public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { this(connectString, sessionTimeout, watcher, false); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser(connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); // 实现实际上是实例化了一个客户端对象 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) throws IOException { this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException { // 略 }}
Watcher
package org.apache.zookeeper;public interface Watcher { // 定义Event的可能状态 public interface Event { public enum KeeperState { @Deprecated @Deprecated Unknown (-1), Disconnected (0), NoSyncConnected (1),SyncConnected (3), AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated(6), Expired (-112); } // 定义Event的类型 public enum EventType { None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4); } } // 抽象方法 abstract public void process(WatchedEvent event);}
java代码
package com.invic.zk;import java.util.List;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.AsyncCallback;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;/** * * @author lucl * */public class CusZooKeeperClient { // private static String connectString = "nnode:2181,dnode1:2181,dnode2:2181"; private static int SESSION_TIMEOUT = 30 * 1000; /** * @param args */ public static void main(String[] args) { CusZooKeeperClient client = new CusZooKeeperClient(); // 最基本的会话实例 // client.zkSessionInstance(); // 复用sessionID和sessionPasswd来创建会话 // client.zkSessionInstanceWithSidAndPwd(); // 创建节点 // client.createZNode(); // 获取子节点 // client.getChildren(); // 获取数据 // client.getData(); // 更新数据 // client.setData(); // 是否存在 client.exists(); } /** * 最基本的会话实例 */ private void zkSessionInstance () { CountDownLatch latch = new CountDownLatch(1); ZooKeeper zk = null; try { zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch)); latch.await(); System.out.println("ZK sessionId " + zk.getSessionId() + "\t and state " + zk.getState()); } catch (Exception e) { e.printStackTrace(); } finally { if (null != zk) { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 复用sessionID和sessionPasswd创建一个ZooKeeper对象实例(维持之前会话的有效性) */ private void zkSessionInstanceWithSidAndPwd () { CountDownLatch latch = new CountDownLatch(1); ZooKeeper zk = null; long sessionID = 0; byte [] sessionPwd = "".getBytes(); try { System.out.println("============================001=============================="); { zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch)); latch.await(); sessionID = zk.getSessionId(); sessionPwd = zk.getSessionPasswd(); System.out.println("ZK sessionId is " + sessionID + "\t and sessionPwd " + new String(sessionPwd)); } System.out.println("=============================002=============================="); { zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), sessionID, sessionPwd); } System.out.println("=============================003=============================="); { // with wrong sessionID and sessionPasswd long wrongSessionID = 1L; byte [] wrongSessionPwd = "wrong".getBytes(); zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch), wrongSessionID, wrongSessionPwd); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != zk) { zk.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 创建节点 */ private void createZNode () { CountDownLatch latch = new CountDownLatch(1); ZooKeeper zk = null; try { zk = new ZooKeeper(connectString, SESSION_TIMEOUT, new CusZKWatcher(latch)); latch.await(); /** * 同步API * create(final String path, byte data[], Listacl, CreateMode createMode) */ { String path = "/zk-sync"; // 内容只支持字节数组,字符串可以直接getBytes(),其他内容需要Hessian或Kryo等序列化工具 byte [] datas = "luchunli".getBytes(); // Ids.OPEN_ACL_UNSAFE表示任何操作都不受权限控制 String basePath = zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Success create znode : " + basePath); // `/zk-test-001` basePath = zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Success create znode : " + basePath); // `/zk-test-0020000000008`会在path后自动加个数字 } /** * 异步API */ System.out.println("================================for async==================================="); String path = "/zk-async"; byte [] datas = "luchunli".getBytes(); // 异步中接口不会抛异常,异常都是在回调函数中体现 zk.create(path + "-001", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() { @Override public void proce***esult(int rc, String path, Object ctx, String name) { switch (rc) { case 0 : System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name); break; case -4 : System.out.println("Connection abort."); break; case -110 : System.out.println("Node is already exists."); break; case -112 : System.out.println("Session expired."); break; default: System.out.println("Unknown error."); } } }, "I am Context."); zk.create(path + "-002", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { @Override public void proce***esult(int rc, String path, Object ctx, String name) { switch (rc) { case 0 : System.out.println("Create result code is " + rc + ", " + path + ", " + ctx + ", real name " + name); break; case -4 : System.out.println("Connection abort."); break; case -110 : System.out.println("Node is already exists."); break; case -112 : System.out.println("Session expired."); break; default: System.out.println("Unknown error."); } } }, "I am Context."); Thread.sleep(30 * 1000); // 为了看到创建的临时节点的效果,zk.close()之后节点就不存在了 } catch (Exception e) { e.printStackTrace(); } finally { try{ if (null != zk) { zk.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 获取子节点及节点数据 */ private void getChildren () { CountDownLatch latch = new CountDownLatch(1); ZooKeeper zk = null; try { CusZKWatcher watcher = new CusZKWatcher(latch); zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher); latch.await(); watcher.setZk(zk); String path = "/zk-book"; byte [] datas = "luchunli".getBytes(); { String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("Success create znode : " + basePath); // } { String basePath = zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Success create znode : " + basePath); // } /** * 同步API获取子节点 */ { List childs = zk.getChildren(path, true); System.out.println("In client get children is : " + childs); } /** * 异步API获取子节点 */ { System.out.println("================================for async==================================="); zk.getChildren(path, true, new AsyncCallback.Children2Callback() { @Override public void proce***esult(int rc, String path, Object ctx, List children, Stat stat) { System.out.println("Get children znode result " + rc + ", " + path + ", " + children + ", " + stat); } }, "I am Context."); } { String basePath = zk.create(path + "/c2", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Success create znode : " + basePath); // } Thread.sleep(30 * 1000); } catch (Exception e) { e.printStackTrace(); } finally { try{ if (null != zk) { zk.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 获取值 */ private void getData () { CountDownLatch latch = new CountDownLatch(1); ZooKeeper zk = null; try { CusZKWatcher watcher = new CusZKWatcher(latch); zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher); latch.await(); watcher.setZk(zk); String path = "/zk-test"; byte [] datas = "luchunli".getBytes(); { String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("ZK create node " + basePath + " success."); } /** * 同步API获取数据 */ { Stat stat = new Stat(); byte [] bytess = zk.getData(path, true, stat); System.out.println("Get data from zk : " + new String(bytess)); System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); } { Stat sstat = zk.setData(path, datas, -1); // znode节点数据或内容的变化都会引起NodeDataChanged System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion()); } Thread.sleep(30 * 1000); /** * 异步API获取数据 */ System.out.println("================================for async==================================="); zk.getData(path, true, new AsyncCallback.DataCallback() { @Override public void proce***esult(int rc, String path, Object ctx, byte[] data, Stat stat) { System.out.println("Get data result " + rc + ", " + path + ", " + new String(data) + ", " + stat); } }, "I am Context."); { Stat sstat = zk.setData(path, datas, -1); // znode节点数据或内容的变化都会引起NodeDataChanged System.out.println("after setData stat info is : " + sstat.getCzxid() + ", " + sstat.getMzxid() + ", " + sstat.getVersion()); } Thread.sleep(30 * 1000); } catch (Exception e) { e.printStackTrace(); } finally { try{ if (null != zk) { zk.close(); } } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 更新值 */ private void setData () { /** * setData(final String path, byte data[], int version) * 或 * setData(final String path, byte data[], int version, StatCallback cb, Object ctx) * * version说明:getData中无version,ZooKeeper的setData接口中的version参数是由CAS理论演化而来 * */ CountDownLatch latch = new CountDownLatch(1); ZooKeeper zk = null; try { CusZKWatcher watcher = new CusZKWatcher(latch); zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher); latch.await(); watcher.setZk(zk); String path = "/zk-test"; byte [] datas = "luchunli".getBytes(); { String basePath = zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("ZK create node " + basePath + " success."); } /** * 同步API更新数据 */ Stat stat = zk.setData(path, datas, -1); // 传入参数-1,表示客户端基于数据的最新版本进行更新 System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); Stat stat2 = zk.setData(path, datas, stat.getVersion()); // 基于上一次的version更新数据,更新成功 System.out.println("stat2 info is : " + stat2.getCzxid() + ", " + stat2.getMzxid() + ", " + stat2.getVersion()); Stat stat3 = zk.setData(path, datas, stat.getVersion()); // 仍然基于之前的version,更新数据失败(BadVersion for /zk-test) System.out.println("stat3 info is : " + stat3.getCzxid() + ", " + stat3.getMzxid() + ", " + stat3.getVersion()); /** * 异步API更新数据 */ System.out.println("================================for async==================================="); zk.setData(path, datas, -1, new AsyncCallback.StatCallback() { @Override public void proce***esult(int rc, String path, Object ctx, Stat stat) { System.out.println("Get data result " + rc + ", " + path + ", " + ctx + ", " + stat); } }, "I am Context"); Thread.sleep(30 * 1000); } catch (Exception e) { e.printStackTrace(); } } /** * 判断指定的znode是否存在 * @param path * @return */ private void exists () { CountDownLatch latch = new CountDownLatch(1); ZooKeeper zk = null; try { CusZKWatcher watcher = new CusZKWatcher(latch); zk = new ZooKeeper(connectString, SESSION_TIMEOUT, watcher); latch.await(); watcher.setZk(zk); String path = "/zk-test"; { Stat stat = zk.exists(path, true); if (null == stat) { System.out.println("Node " + path + " not exists."); } else { System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); } } byte [] datas = "luchunli".getBytes(); { zk.create(path, datas, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 需要为PERSISTENT节点,否则无法创建子节点 zk.exists(path, true); zk.setData(path, "test".getBytes(), -1); zk.create(path + "/c1", datas, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // 子节点的变化客户端未watch到 zk.delete(path + "/c1", -1); zk.delete(path, -1); } Thread.sleep(30 * 1000); } catch (Exception e) { e.printStackTrace(); } finally { if (null != zk) { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 删除节点-无法删除非空节点 */ private void delZNode () { /** * delete(final String path, int version) * 或 * delete(final String path, int version, VoidCallback cb, Object ctx) * * 若version传入-1则直接删除 */ } }
观察者
package com.invic.zk;import java.util.List;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.data.Stat;/** * * @author lucl * @description 自定义观察者 * */public class CusZKWatcher implements Watcher { private CountDownLatch latch = null; private ZooKeeper zk = null; private CusZKWatcher() { // TODO Auto-generated constructor stub } public CusZKWatcher (CountDownLatch latch) { this.latch = latch; } public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } @Override public void process(WatchedEvent event) { if (null == latch) { return; } KeeperState state = event.getState(); int stateValue = state.getIntValue(); EventType eventType = event.getType(); int typeValue = eventType.getIntValue(); System.out.println("KeeperState is : " + stateValue + "\tEventType : " + typeValue); try { if (KeeperState.SyncConnected == state) { System.out.println("zk session established in mode of SyncConnected."); if (EventType.None == eventType && null == event.getPath()) { this.latch.countDown(); } else if (EventType.NodeCreated == eventType) { System.out.println("Node " + event.getPath() + " created."); Stat stat = zk.exists(event.getPath(), true); System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); } else if (EventType.NodeDeleted == eventType) { System.out.println("Node " + event.getPath() + " deleted."); Stat stat = zk.exists(event.getPath(), true); if (null == stat) { System.out.println("Znod " + event.getPath() + " not exists."); } else { System.out.println("stat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); } } else if (EventType.NodeChildrenChanged == eventType) { if (null != this.getZk()) { Listresult = this.getZk().getChildren(event.getPath(), true); System.out.println("Reget children in watcher, result : " + result); } } else if (EventType.NodeDataChanged == eventType) { if (null != this.getZk()) { Stat stat = new Stat(); byte[] datas = this.getZk().getData(event.getPath(), true, stat); System.out.println("Reget data from zk : " + new String(datas)); System.out.println("Restat info is : " + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion()); } } else { System.out.println("Unknown type : " + eventType); } } else { System.out.println("Unknown error : " + state); } } catch (Exception e) { e.printStackTrace(); } }}
3、其他客户端API
ZkClient
Curator