分布式学习之zookeeper3
分布式学习之zookeeper-3
分布式很多地方都会用到zk,虽然这个技术出了很久,但是作为应用开发工程师可能这方面接触的还是比较少。
我打算从浅入深的学习下zk的使用。 Java操作zk的几种方式
java操作zk可以使用3种方式 使用原生 zkClinet curator
开始写代码前请务必启动启动 zookeeper(虽然是废话,但是还要提醒一下)。
另外最好开一个客户端用来观察调用代码后的信息。 原生
maven引用 org.apache.zookeeper zookeeper 3.4.8
代码: /** * @Author: jimmy * @Date: 2022/1/2 21:32 * @Description: */ public class ZookeeperOri { public static void main(String[] args) throws IOException, KeeperException, InterruptedException { // 连接zk服务器 ZooKeeper zk = new ZooKeeper("127.0.0.1", 2000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("--zookeeper连接---"); } }); // ---- 节点基本操作 ---- // 获取子节点 List children = zk.getChildren("/", null); children.forEach(e -> System.out.println(e)); // 判断是否存在节点、删除节点 注意删除节点的时候必须不存在子节点 if (null != zk.exists("/my1", null)) { zk.delete("/my1", -1); } // 原生客户端创建节点 // CreateMode PERSISTENT 持久 PERSISTENT_SEQUENTIAL 有序持久 // EPHEMERAL 临时 EPHEMERAL_SEQUENTIAL 有序临时 // 这里需要设置权限 不然会报错 -> 【KeeperErrorCode = MarshallingError】 String c_rs = zk.create("/my1","test-my".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("----" + c_rs + "-----"); byte[] rs = zk.getData("/my1", null, new Stat()); System.out.println("----" + new String(rs) + "-----"); // 非顺序节点不能重复创建 try { String c_rs_tmp = zk.create("/my1","test-my".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (Exception e) { System.out.println("ex :" + e.getMessage()); } // 设置值 Stat stat = zk.setData("/my1", "my-test".getBytes(), -1); System.out.println(JSONObject.toJSONString(stat)); // 获取值 rs = zk.getData("/my1", null, new Stat()); System.out.println("----" + new String(rs) + "-----"); // 创建有序节点 UUID uuid = new UUID(12,1); c_rs = zk.create("/my/sequen",("test-mysequen" + uuid.toString()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println("----" + c_rs + "-----"); List children_s = zk.getChildren("/my", null); children_s.forEach(e -> System.out.println(e)); // 有序节点的获取,不能直接使用节点名称,这里会报错 try{ rs = zk.getData("/my/sequen", null, new Stat()); System.out.println("----" + new String(rs) + "-----"); } catch (Exception e) { System.out.println("ex :" + e.getMessage()); } System.out.println("有序节点的获取-----"); for (String nodeKey : children_s) { byte[] tmp = zk.getData("/my/" + nodeKey, null, new Stat()); System.out.println("nodeKey:" + nodeKey + ", value:" + new String(tmp)); } // ---- 监听 ---- String epNode = zk.create("/myEp","test-myEp".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zk.getData("/myEp", new WatcherDefine(zk), new Stat()); // 触发watch zk.setData("/myEp", "uu".getBytes(), -1); zk.close(); } static class WatcherDefine implements Watcher{ private ZooKeeper zookeeper; public WatcherDefine(ZooKeeper zookeeper) { this.zookeeper = zookeeper; } @Override public void process(WatchedEvent watchedEvent) { Stat stat = new Stat(); //如果当前的连接状态是连接成功的,那么通过计数器去控制 if(watchedEvent.getState()== Watcher.Event.KeeperState.SyncConnected){ if(Watcher.Event.EventType.None==watchedEvent.getType()&&null==watchedEvent.getPath()){ System.out.println(watchedEvent.getState()+"-->"+watchedEvent.getType()); }else if(watchedEvent.getType()== Watcher.Event.EventType.NodeDataChanged){ try { System.out.println("数据变更触发路径:"+watchedEvent.getPath()+"->改变后的值:"+ new String(zookeeper.getData(watchedEvent.getPath(),true,stat))); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }else if(watchedEvent.getType()== Watcher.Event.EventType.NodeChildrenChanged){ //子节点的数据变化会触发 try { System.out.println("子节点数据变更路径:"+watchedEvent.getPath()+"->节点的值:"+ zookeeper.getData(watchedEvent.getPath(),true,stat)); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }else if(watchedEvent.getType()== Watcher.Event.EventType.NodeCreated){ //创建子节点的时候会触发 try { System.out.println("节点创建路径:"+watchedEvent.getPath()+"->节点的值:"+ zookeeper.getData(watchedEvent.getPath(),true, stat)); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }else if(watchedEvent.getType()== Watcher.Event.EventType.NodeDeleted){ //子节点删除会触发 System.out.println("节点删除路径:"+watchedEvent.getPath()); } } } } }zkClinet
maven依赖 com.101tec zkclient 0.10
我在下面测试的时候遇到了乱码问题分为2部分(我是windows测试环境) 无论是否是中文写入的时候都是乱码实现 ZkSerializer来解决。 上面问题解决后中文仍然是乱码windows的 cmd/powershell 的锅,需要设置为UTF-8临时设置:CHCP 65001这个仍然会存在乱码的问题,因此我在练习类种后面把数据打出来了。
练习类: /** * @Author: jimmy * @Date: 2022/1/2 12:00 * @Description: */ public class ZkClientPp { public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException { ZkClient zkClient = new ZkClient("127.0.0.1:2181"); // 解决乱码问题 zkClient.setZkSerializer(new ZkCodeSerializer()); if(zkClient.exists("/prc")){ zkClient.delete("/prc"); } // 创建持久化节点 zkClient.createPersistent("/prc"); zkClient.writeData("/prc","handsome-boy"); // 读取值 String data = zkClient.readData("/prc"); System.out.println(data); // --------- 获取子节点 ---------- List children = zkClient.getChildren("/"); children.forEach(c -> System.out.println(c + "")); System.out.println("……………· ·…οΟ の Οο…· ·…………… "); // 监听 zkClient.subscribeDataChanges("/prc", new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { System.out.println("……………· ·…οΟ の Οο…· ·…………… "); System.out.println("节点名称:" + s + "->节点修改后的值"+o); } @Override public void handleDataDeleted(String s) throws Exception { System.out.println("……………· ·…οΟ の Οο…· ·…………… "); System.out.println("节点名称:" + s + "-> 删除了"); } }); Thread.sleep(2000); //new String("老子叫工藤新一".getBytes("utf-8") zkClient.writeData("/prc", "老子叫工藤新一!"); String s = zkClient.readData("/prc"); System.out.println("第一次变更……………· ·…οΟ の Οο…· ·…………… prc"); System.out.println(s); Thread.sleep(5000); zkClient.writeData("/prc", "变成了一个小屁孩"); data = zkClient.readData("/prc"); System.out.println("第二次变更……………· ·…οΟ の Οο…· ·…………… prc"); System.out.println(data); // 修改完后,若程序立即结束,则无法看到watch的信息。 Thread.sleep(Integer.MAX_VALUE); zkClient.close(); } // 乱码解决类 static class ZkCodeSerializer implements ZkSerializer { @Override public byte[] serialize(Object object) throws ZkMarshallingError { return ((String)object).getBytes(Charset.forName("UTF-8")); } @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError { return new String(bytes, Charset.forName("UTF-8")); } } }curator
maven依赖 org.apache.curator curator-framework 2.11.0
代码: /** * @Author: jimmy * @Date: 2022/1/3 15:25 * @Description: */ public class CuratorClinet { public static void main(String[] args) throws InterruptedException { CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("127.0.0.1:2181",5000,2000, new ExponentialBackoffRetry(1000,3)); curatorFramework.start(); // 创建节点 try { String result=curatorFramework.create(). creatingParentsIfNeeded(). withMode(CreateMode.PERSISTENT). forPath("/curator/C1/C101","curator".getBytes()); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } // 删除节点 try { // 默认情况下,version为-1 递归删除 curatorFramework.delete().deletingChildrenIfNeeded().forPath("/curator"); } catch (Exception e) { e.printStackTrace(); } // 查询 Stat stat=new Stat(); try { byte[] bytes=curatorFramework.getData().storingStatIn(stat).forPath("/my"); System.out.println(new String(bytes)); } catch (Exception e) { e.printStackTrace(); } // 更新 try { Stat s = curatorFramework.setData().forPath("/my","123".getBytes()); System.out.println(s); } catch (Exception e) { e.printStackTrace(); } // 异步操作 ExecutorService service= Executors.newFixedThreadPool(1); CountDownLatch countDownLatch=new CountDownLatch(1); try { curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL). inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(Thread.currentThread().getName() + "->resultCode:" +curatorEvent.getResultCode()+"->" +curatorEvent.getType()); countDownLatch.countDown(); } },service).forPath("/prc","be happy".getBytes()); } catch (Exception e) { e.printStackTrace(); } countDownLatch.await(); service.shutdown(); // 事务操作(curator独有的) try { Collection resultCollections=curatorFramework.inTransaction().create().forPath("/demo1","111".getBytes()).and(). setData().forPath("/abc","trunsaction:test".getBytes()).and().commit(); for (CuratorTransactionResult result:resultCollections){ System.out.println(result.getForPath()+"->"+result.getType()); } } catch (Exception e) { e.printStackTrace(); } } }
哪个App播放器音质最好,无损音乐最多?我习惯把音乐下载下来听,用Foorbar2000播放。既然听无损就不用考虑音效了。原汁原味播放就是最好,还原音乐本来面目。所以手机我推荐本地播放器。电脑我推荐foobar2000。
AppleWatchSeries7正式上市,依旧加量减价,它值得购买吗?苹果在9月份的发布会中,推出了AppleWatchSeries7,不过当时并没有正式上市,只是表示秋末发售。10月8日晚上8点,苹果正式上市了AppleWatchSeries7。这
净水器排名有哪些品牌推荐冰尊净水器是世界净水器十大排名领导品牌。从传统净水器,到现在的厨房净水全屋净水,净水产品在不断升级迭代,净水器技术一直在优化升级,给中国的万千家庭带去了健康的饮水生活。下面就为大家
资深Linux运维工程师必须掌握的核心命令查看计算机硬件系统信息服务器核心硬件就是CPU内存磁盘和网卡,它们配置的好坏会影响程序的运行效率,我们可以使用命令的方式查看服务器硬件的配置查看CPU信息查看CPU信息可以使用ls
未来三年,青浦区发展潜力如何,入手亏吗?前途一片光明,前景不可限量上海未来重点发展大虹桥和五大新城,青浦这两个都有,前景很好,值得入手。青浦未来可期,青浦赵巷漕河泾开发区引进高科技人才11万人口,朱家角华为产业园已经拿地
有多少人信了读书不如去打工?读书不如去打工,读书开始是挣不到钱的,还要花钱,最少书要花钱买的。打工是打一天工挣一天钱的,能不能拿到是另外一回事。读书是可以挣长远的,不要挺卖力气的钱,有可能岁数越大挣的越多噢偷
高密的父老乡亲对莫言是怎样的评价?好吧,既然头条推荐了你提出的问题,做为高密人,我就简单回答吧,我只回答我所了解的事实,对莫言不做任何评价。莫言,本名管谟业,出身中农。1955年2月17日(农历正月25),出生于山
花1999买了款红米K40,回来对比IQOOZ5后这下纠结了随着科技的不断发展,手机市场也迎来了大洗牌,以前的手机龙头华为如今已经暂时退圈,而二哥小米则靠着格局和战略做到了全球第二,继华为之后再次硬刚苹果。尤其是今年,小米和旗下的红米都推出
苹果下周二还有发布会?新机比iPhone13贵一倍国庆假期结束了,上班第一天急需来点爆炸消息让人清醒清醒!时隔大半个月,苹果第二场发布会就要来了!外媒曝光将在10月12日举行,依然是以线上的形式召开。主要的产品为AirPods3以
看家竟然如此简单小豚当家智能摄像头在1。0时代,就在视觉体验上极大满足了用户的需求。此次小豚摄像头双频版上线,华为和小豚当家在产品研发上做了充足的努力,性能做了大提升!采用了双频WiFi提升了设备
马斯克晋升全球首富!旗下SpaceX估值已超1000亿美元财联社(上海,编辑周玲)讯,据报道,在本周内部人士宣布二次售股后,特斯拉CEO埃隆马斯克(ElonMusk)旗下太空探索技术公司SpaceX估值已超过1000亿美元。这意味着马斯克