ApacheCuratorFramework教程 ApacheCurator是ApacheZooKeeper(分布式协调服务)的JavaJVM客户端库。它包括一个高级API框架和实用程序,使使用ApacheZooKeeper变得更加容易和可靠。 依赖 curator有很多的依赖,比如如下是maven依赖官方说明 一般情况下只要引入curatorrecipes基本就够用。他包含了client和framework的依赖,会自动下载下来。 创建项目并引入依赖 pom文件lt;?xmlversion1。0encodingUTF8?projectxmlnshttp:maven。apache。orgPOM4。0。0xmlns:xsihttp:www。w3。org2001XMLSchemainstancexsi:schemaLocationhttp:maven。apache。orgPOM4。0。0http:maven。apache。orgxsdmaven4。0。0。xsdmodelVersion4。0。0modelVersiongroupIdcom。itlab1024groupIdcuratorframeworktutorialartifactIdversion1。0SNAPSHOTversionpropertiesmaven。compiler。source17maven。compiler。sourcemaven。compiler。target17maven。compiler。targetproject。build。sourceEncodingUTF8project。build。sourceEncodingpropertiesdependenciesdependencygroupIdorg。apache。curatorgroupIdcuratorrecipesartifactIdversion5。4。0versiondependencydependencygroupIdorg。projectlombokgroupIdlombokartifactIdversion1。18。24versionscopetestscopedependencydependencygroupIdorg。slf4jgroupIdslf4japiartifactIdversion2。0。6versiondependencydependencygroupIdorg。slf4jgroupIdslf4jsimpleartifactIdversion2。0。6versiondependencydependencygroupIdorg。junit。jupitergroupIdjunitjupiterartifactIdversion5。9。0versionscopetestscopedependencydependenciesproject创建连接 curator主要通过工厂类CuratorFrameworkFactory的newClient方法创建连接 有三种多态方法。publicstaticCuratorFrameworknewClient(StringconnectString,intsessionTimeoutMs,intconnectionTimeoutMs,RetryPolicyretryPolicy,ZKClientConfigzkClientConfig)publicstaticCuratorFrameworknewClient(StringconnectString,intsessionTimeoutMs,intconnectionTimeoutMs,RetryPolicyretryPolicy)publicstaticCuratorFrameworknewClient(StringconnectString,RetryPolicyretryPolicy) 参数说明:connectString:连接字符串,服务器访问地址例如localhost:2181(注意是IP(域名)端口),如果是集群地址,则用逗号(,)隔开即可。sessionTimeoutMs:会话超时时间,单位毫秒,如果不设置则先去属性中找curatordefaultsessiontimeout的值,如果没设置,则默认是601000毫秒。connectionTimeoutMs:连接超时时间,单位毫秒,如果不设置则先去属性中找curatordefaultconnectiontimeout的值,如果没设置,则默认是151000毫秒。RetryPolicy:重试策略,后面具体讲解。 使用Java代码创建连接并创建一个节点packagecn。programtalk。connection;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。junit。jupiter。api。Test;publicclassConnectionTest{创建连接TestpublicvoidTestConnection1()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(172。30。140。89:2181,newExponentialBackoffRetry(1000,3));curatorFramework。start();curatorFramework。create()。forPath(test);}} 运行完毕后查看结果: 也可以使用其builder()建造者模式构建client。TestpublicvoidTestConnection2()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。builder()。connectString(172。20。98。4:2181)。retryPolicy(newExponentialBackoffRetry(1000,3))。sessionTimeoutMs(1000)。connectionTimeoutMs(10000)。build();curatorFramework。start();curatorFramework。create()。forPath(test);}重试策略 重试策略有几种实现,可以通过如下类图直观地展示出来。 RetryForever 该策略是永远尝试。 newRetryForever(2000)参数是毫秒,代表间隔多久进行重试!packagecn。programtalk;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。retry。RetryForever;importorg。junit。jupiter。api。Test;publicclassRetryTest{RetryForeverTestpublicvoidtestRetryForever()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(unknownHost:2181,newRetryForever(2000));curatorFramework。start();}}SessionFailedRetryPolicy session超时重试策略,其构造方法是SessionFailedRetryPolicy(RetryPolicydelegatePolicy),参数就是也是一个重试策略,其含义就是说会话超时的时候使用哪种具体的重试策略。publicvoidtestSessionFailedRetryPolicy()throwsException{RetryPolicysessionFailedRetryPolicynewSessionFailedRetryPolicy(newRetryForever(1000));CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。builder()。connectString(localhost:2181)。retryPolicy(sessionFailedRetryPolicy)。build();curatorFramework。start();TimeUnit。DAYS。sleep(1);} session超时后,会尝试重新连接。 RetryNTimes 重试N次策略:publicRetryNTimes(intn,intsleepMsBetweenRetries),第一个是重试次数,第二个参数是每次重试间隔多少毫秒。TestpublicvoidtestRetryNTimes()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(unknownHost:2181,newRetryNTimes(5,1000));curatorFramework。start();TimeUnit。DAYS。sleep(1);} 上面的代码就是重试5次,重试间隔1000毫秒。RetryOneTime 重试一次,他是RetryNTime的特例,N1的情况。TestpublicvoidtestRetryOneTime()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(unknownHost:2181,newRetryOneTime(1000));curatorFramework。start();TimeUnit。DAYS。sleep(1);}RetryUntilElapsed publicRetryUntilElapsed(intmaxElapsedTimeMs,intsleepMsBetweenRetries) 一直重试直到达到规定的时间,maxElapsedTimeMs:最大重试时间,sleepMsBetweenRetries每次重试间隔时间。TestpublicvoidtestRetryUntilElapsed()throwsException{RetryUntilElapsedretryPolicynewRetryUntilElapsed(3000,1000);CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(unknownHost:2181,retryPolicy);curatorFramework。start();TimeUnit。DAYS。sleep(1);}ExponentialBackoffRetry 按照设定的次数重试,每次重试之间的睡眠时间都会增加。 构造方法如下:publicExponentialBackoffRetry(intbaseSleepTimeMs,intmaxRetries){this(baseSleepTimeMs,maxRetries,DEFAULTMAXSLEEPMS);} baseSleepTimeMs:重试间隔毫秒数 maxRetries:最大重试次数TestpublicvoidtestExponentialBackoffRetry()throwsException{RetryPolicyretryPolicynewExponentialBackoffRetry(3000,1000);CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(unknownHost:2181,retryPolicy);curatorFramework。start();TimeUnit。DAYS。sleep(1);}BoundedExponentialBackoffRetry 重试策略,该策略重试设定的次数,重试之间的休眠时间增加(最多达到最大限制) BoundedExponentialBackoffRetry继承ExponentialBackoffRetry,相比与ExponentialBackoffRetry,它增加了最大休眠时间的设置。 构造方法如下:publicBoundedExponentialBackoffRetry(intbaseSleepTimeMs,intmaxSleepTimeMs,intmaxRetries){super(baseSleepTimeMs,maxRetries);this。maxSleepTimeMsmaxSleepTimeMs;} 示例如下:TestpublicvoidtestBoundedExponentialBackoffRetry()throwsException{RetryPolicyretryPolicynewBoundedExponentialBackoffRetry(3000,6000,1000);CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(unknownHost:2181,retryPolicy);curatorFramework。start();TimeUnit。DAYS。sleep(1);}名称空间(Namespace) curator中名称空间的含义,就是设置一个公共的父级path,之后的操作全部都是基于该path。名称空间throwsExceptionTestpublicvoidtestCreate6()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();CuratorFrameworkc2curatorFramework。usingNamespace(namespace1);c2。create()。forPath(node1);c2。create()。forPath(node2);} 查看运行结果: CRUD基础创建节点 创建节点使用create方法,该方法返回一个CreateBuilder他是一个建造者模式的类。用于创建节点。packagecn。programtalk。connection;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。junit。jupiter。api。Test;publicclassCreateNodeTest{StringconnectString172。30。140。89:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);创建节点TestpublicvoidtestCreate1()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();curatorFramework。create()。forPath(test);}} 创建完毕后,通过命令行查看节点: 看到值是10。112。33。229,可实际上我并未给节点设置值,这个值是框架默认设置的,客户端的IP。这个默认值可以修改,此时不能使用newClient方法,需要使用工厂的builder自己构建设置。示例代码如下:TestpublicvoidtestCreateDefaultData()throwsException{CuratorFrameworkFactory。BuilderbuilderCuratorFrameworkFactory。builder()。defaultData(默认值。getBytes(StandardCharsets。UTF8));CuratorFrameworkclientbuilder。connectString(connectString)。retryPolicy(retryPolicy)。build();client。start();client。create()。forPath(defaultDataTest);} 运行结果: 可以看到,默认值已经被修改为默认值。 创建节点时如果节点存在,则会抛出NodeExistsException异常 使用forPath设置节点的值 forPath还接收第二个参数(节点的值,字节数组类型)TestpublicvoidtestCreate2()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();curatorFramework。create()。forPath(test2,用户自己设置的值。getBytes(StandardCharsets。UTF8));} 运行结果: 可见正确设置了值。节点模式设置 可以通过withMode方法设置节点的类型,为显示指定的节点都是持久性节点。设置节点类型throwsExceptionTestpublicvoidtestCreate3()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();curatorFramework。create()。withMode(CreateMode。EPHEMERAL)。forPath(EPHEMERAL1);临时节点,会话结束就会删除,线程睡眠用于延长会话时间TimeUnit。SECONDS。sleep(30);} 查看结果: 可以看到临时节点,红色框内只有临时节点该属性才是非零。TTL时长设置 使用withTtl设置时长,单位毫秒。当模式为CreateMode。PERSISTENTWITHTTL或CreateMode。PERSISTENTSEQUENTIALWITHTTL时指定TTL。必须大于0且小于或等于EphemeralType。MAXTTL。测试ttlthrowsExceptionTestpublicvoidtestCreate5()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();curatorFramework。create()。withTtl(10000)。withMode(CreateMode。PERSISTENTWITHTTL)。forPath(ttl1);} 可能出现如下错误: 这是因为TTL默认是关闭的,需要打开(zoo。cfg中设置extendedTypesEnabledtrue)。再次运行:〔zk:localhost:2181(CONNECTED)8〕ls〔defaultDataTest,hiveserver2zk,test,test2,ttl1,zookeeper〕等待10秒后再次查看,ttl1节点自动被删除。〔zk:localhost:2181(CONNECTED)9〕ls〔defaultDataTest,hiveserver2zk,test,test2,zookeeper〕ACL权限 创建节点时设置ACL,主要通过withACL方法设置,接收一个List类型的参数。 ACL实例对象,通过该类的构造方法创建,类似ACLaclnewACL(ZooDefs。Perms。ALL,ZooDefs。Ids。ANYONEIDUNSAFE);测试aclthrowsExceptionTestpublicvoidtestCreate7()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();ListaclListnewArrayList();ACLaclnewACL(ZooDefs。Perms。ALL,ZooDefs。Ids。ANYONEIDUNSAFE);aclList。add(acl);curatorFramework。create()。withACL(aclList)。forPath(acl1);} 运行结果: 运行完毕后,通过命令行查看权限,可以看到已经设置成功。 如果不设置ACL,默认则是newACL(Perms。ALL,ANYONEIDUNSAFE)。查询值 查询数据使用getData方法。查询节点的值throwsExceptionTestpublicvoidtestGetData()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();byte〔〕bytescuratorFramework。getData()。forPath(test);System。out。println(test节点的值是:newString(bytes,StandardCharsets。UTF8));} 结果: 设置值 使用setData,配合forpath方法。设置节点的值throwsExceptionTestpublicvoidtestGetData2()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();byte〔〕bytescuratorFramework。getData()。forPath(test);System。out。println(test节点的原始值是:newString(bytes,StandardCharsets。UTF8));curatorFramework。setData()。forPath(test,updated。getBytes(StandardCharsets。UTF8));bytescuratorFramework。getData()。forPath(test);System。out。println(test节点的新值是:newString(bytes,StandardCharsets。UTF8));} 运行结果是: 获取孩子节点获取孩子节点throwsExceptionTestpublicvoidtestGetState()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();ListStringchildrencuratorFramework。getChildren()。forPath(namespace1);children。forEach(System。out::println);} 运行结果: 获取ACLpackagecn。programtalk。connection;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。apache。zookeeper。CreateMode;importorg。apache。zookeeper。data。ACL;importorg。junit。jupiter。api。Test;importjava。nio。charset。StandardCharsets;importjava。util。List;importjava。util。concurrent。TimeUnit;publicclassACLTest{StringconnectString172。30。140。89:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);获取Acl列表TestpublicvoidtestAcl1()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();ListaclscuratorFramework。getACL()。forPath(test);acls。forEach(aclSystem。out。println(acl。getId()acl。getPerms()));}} 运行结果: 删除节点 使用delete,搭配forPath方法,删除指定的节点。packagecn。programtalk。connection;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。junit。jupiter。api。Test;importjava。util。List;publicclassDeleteNodeTest{StringconnectString172。30。140。89:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);删除节点throwsExceptionTestpublicvoidtestDelete1()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();curatorFramework。delete()。forPath(test);}} 程序执行完毕后,通过命令行查询test可知已经被删除。 如果被删除的节点有孩子节点,则无法删除,抛出NotEmptyException。 那么如何删除包含子节点的节点呢?需要使用deletingChildrenIfNeeded方法删除节点(包含子节点)throwsExceptionTestpublicvoidtestDelete2()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();curatorFramework。delete()。deletingChildrenIfNeeded()。forPath(namespace1);} 运行后,查看该节点 节点已经被删除。并且级联删除了子节点。检查节点是否存在 使用checkExists()搭配forPath来实现,返回一个Stat对象信息。packagecn。programtalk。connection;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。apache。zookeeper。data。Stat;importorg。junit。jupiter。api。Test;publicclassCheckExistsTest{StringconnectString172。30。140。89:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);检查是否存在throwsExceptionTestpublicvoidtestGetState()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();StatstatcuratorFramework。checkExists()。forPath(namespace1);System。out。println(stat);}} 运行结果: stat的具体信息如下: 查看会话状态 使用getState()。packagecn。programtalk。connection;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。imps。CuratorFrameworkState;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。junit。jupiter。api。Test;importjava。util。List;importjava。util。concurrent。TimeUnit;publicclassGetStateTest{StringconnectString172。30。140。89:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);查询客户端状态throwsExceptionTestpublicvoidtestGetState()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);CuratorFrameworkStatestatecuratorFramework。getState();System。out。println(状态是state);状态是LATENTcuratorFramework。start();statecuratorFramework。getState();System。out。println(状态是state);状态是STARTEDcuratorFramework。close();statecuratorFramework。getState();System。out。println(状态是state);状态是STOPPED}}事务packagecn。programtalk。connection;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。api。GetConfigBuilder;importorg。apache。curator。framework。api。transaction。CuratorMultiTransaction;importorg。apache。curator。framework。api。transaction。CuratorOp;importorg。apache。curator。framework。api。transaction。CuratorTransaction;importorg。apache。curator。framework。api。transaction。CuratorTransactionResult;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。apache。zookeeper。data。Stat;importorg。apache。zookeeper。server。quorum。flexible。QuorumVerifier;importorg。junit。jupiter。api。Test;importjava。nio。charset。StandardCharsets;importjava。util。Collection;importjava。util。List;publicclassTransactionTest{StringconnectString172。30。140。89:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);查询客户端状态throwsExceptionTestpublicvoidtestTransaction()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();CuratorOpcreateOpcuratorFramework。transactionOp()。create()。forPath(transaction1);CuratorOpsetDataOpcuratorFramework。transactionOp()。setData()。forPath(transaction2,transaction2。getBytes(StandardCharsets。UTF8));CuratorOpdeleteOpcuratorFramework。transactionOp()。delete()。forPath(transaction3);ListCuratorTransactionResultresultcuratorFramework。transaction()。forOperations(createOp,setDataOp,deleteOp);result。forEach(rtSystem。out。println(rt。getForPath()rt。getType()));}} 运行程序前先看下zk节点情况 可以看到没有transaction1和transaction2和transaction3。 运行程序会出现如下异常。 出现异常则事务应该回滚,也就是说transaction1节点不应该创建成功。 通过上图可知确实没有创建成功。 接下来我通过命令长创建transaction2和transaction3这两个节点。 创建完毕,并且可以看到transaction2节点的值是null。 重新运行程序后,不会发生异常。 通过命令行看下事务是否完全执行成功。 可以看到transaction1节点创建成功,transaction2节点的值修改成功。transaction3节点被删除。说明事务是有效的! 为了演示清晰,我先清理掉所有节点。 监听节点 本版本中PathChildrenCache、NodeCache、TreeCache都已经是过期的了,官方推荐使用CuratorCache。 并且api风格也更改了,改为了流式风格。 CuratorCacheListener提供了多种监听器,比如forInitialized,forCreates等。packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。listen。Listenable;importorg。apache。curator。framework。recipes。cache。CuratorCache;importorg。apache。curator。framework。recipes。cache。CuratorCacheListener;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。junit。jupiter。api。Test;importjava。nio。charset。StandardCharsets;importjava。util。Objects;importjava。util。concurrent。TimeUnit;Slf4jpublicclassCacheTest{StringconnectStringlocalhost:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);throwsExceptionTestpublicvoidtestCache1()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();CuratorCachecuratorCacheCuratorCache。builder(curatorFramework,ns1)。build();CuratorCacheListenercuratorCacheListenerCuratorCacheListener。builder()。forInitialized((){log。info(forInitialized回调);log。info();})。forCreates(childData{log。info(forCreates回调执行,path〔{}〕,data〔{}〕,stat〔{}〕,childData。getPath(),Objects。isNull(childData。getData())?null:newString(childData。getData(),StandardCharsets。UTF8),childData。getStat());log。info();})。forNodeCache((){log。info(forNodeCache回调);log。info();})。forChanges((oldNode,node){log。info(forChanges回调,oldNode。path〔{}〕,oldNode。data〔{}〕,oldNode。stat〔{}〕,node。path〔{}〕,node。data〔{}〕,node。stat〔{}〕,oldNode。getPath(),Objects。isNull(oldNode。getData())?null:newString(oldNode。getData(),StandardCharsets。UTF8),oldNode。getStat(),node。getPath(),Objects。isNull(node。getData())?null:newString(node。getData(),StandardCharsets。UTF8),node。getStat());log。info();})。forDeletes(childData{log。info(forDeletes回调执行,path〔{}〕,data〔{}〕,stat〔{}〕,childData。getPath(),Objects。isNull(childData。getData())?null:newString(childData。getData(),StandardCharsets。UTF8),childData。getStat());log。info();})。forAll((type,oldNode,node){log。info(forAll回调);log。info(type〔{}〕,type);if(Objects。nonNull(oldNode)){log。info(oldNode。path〔{}〕,oldNode。data〔{}〕,oldNode。stat〔{}〕,oldNode。getPath(),Objects。isNull(oldNode。getData())?null:newString(oldNode。getData(),StandardCharsets。UTF8),oldNode。getStat());}if(Objects。nonNull(node)){log。info(node。path〔{}〕,node。data〔{}〕,node。stat〔{}〕,node。getPath(),Objects。isNull(node。getData())?null:newString(node。getData(),StandardCharsets。UTF8),node。getStat());}log。info();})。forCreatesAndChanges((oldNode,node){log。info(forCreatesAndChanges回调);if(Objects。nonNull(oldNode)){log。info(oldNode。path〔{}〕,oldNode。data〔{}〕,oldNode。stat〔{}〕,oldNode。getPath(),Objects。isNull(oldNode。getData())?null:newString(oldNode。getData(),StandardCharsets。UTF8),oldNode。getStat());}if(Objects。nonNull(node)){log。info(node。path〔{}〕,node。data〔{}〕,node。stat〔{}〕,node。getPath(),Objects。isNull(node。getData())?null:newString(node。getData(),StandardCharsets。UTF8),node。getStat());}log。info();})。build();获取监听器列表容器ListenableCuratorCacheListenerlistenablecuratorCache。listenable();将监听器放入容器中listenable。addListener(curatorCacheListener);curatorCache必须启动curatorCache。start();延时,以保证连接不关闭TimeUnit。DAYS。sleep(10);curatorCache。close();}} 上面的代码就是创建监听节点的核心代码。 以前的监听类型是不同的类(过期的类)实现的。现在是通过不同的forXXX方法指定的(例如:forInitialized)。 在测试前我将zk中的数据清理掉〔zk:localhost:2181(CONNECTED)5〕ls〔zookeeper〕 可以看到完全清理掉了。API说明 在测试之前要简单地说明下API的基本使用方式。curator监听主要有如下几个主要的类。CuratorFrameworkFactory这是简单的静态工厂类,用于创建连接zk的客户端(client),里面提供了newClient的多态方法,也可以使用builder建造者模式类创建客户端。StringconnectStringlocalhost:2181; RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3); 使用newClient方法 CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy); 也可以使用静态builder()方法 CuratorFrameworkcuratorFramework2CuratorFrameworkFactory。builder()。connectString(connectString)。retryPolicy(retryPolicy)。build();CuratorCache类,该类也有提供builder方法CuratorCachecuratorCacheCuratorCache。builder(curatorFramework,ns1)。build();也提供了build方法,可以像下面这样使用。CuratorCachecuratorCacheCuratorCache。builder(curatorFramework,ns1)。build(); curatorCacheCuratorCache。build(curatorFramework,ns1,CuratorCache。Options。SINGLENODECACHE,CuratorCache。Options。COMPRESSEDDATA,CuratorCache。Options。DONOTCLEARONCLOSE);CuratorCacheListener监听器类,里面可以定义各种监听器。测试启动 运行上面的示例,会打印如下内容: 可见初始化回调被调用。创建节点 创建CuratorCache监听的节点ns1,需要注意的是此时节点并不存在。 命令行操作如下: 程序输出如下: 我们看到当创建节点的时候有四个回调函数被执行。 结论:当创建节点的时候forCreates、forAll、forCreatesAndChanges被回调。 那么如果再创建子节点情况会是什么样的呢?比如我创建ns1sub1。 命令行: 控制台: 节点创建监听器,监听类型是CuratorCacheListener。Type。NODECREATED,创建节点的时候会触发,当创建子节点的时候也会触发。 结论:创建子节点依然会回调上述所说的四个监听器。修改数据 修改监听的根节点ns1的值 命令行修改值: 控制台输出: 当修改监听根节点ns1的值的时候,forChanges、forAll、forCreatesAndChanges四个监听器被触发。 接下来再修改其子节点的值 控制台输出如下: 依然回调forChanges、forAll、forCreatesAndChanges四个监听器函数。 结论:修改监听节点以及其子节点都会触发forChanges、forAll、forCreatesAndChanges监听器。ACL设置 命令行: 控制台没有打印回调: 结论:设置ACL不会触发监听器。删除节点 首先我先删除监听节点ns1下的子节点 命令行: 控制台: 删除子节点的时候会触发forDeletes、forNodeCache、forAll执行。 接下来再删除监听根节点ns1。 命令行: 控制台输出: 跟上面子节点的删除触发的监听器回调一样! 总结:删除监听根节点以及其子节点会触发forDeletes、forAll监听器。 那么如果我删除的是一个父级节点呢?会出现什么情况? 因为我之前的实验,删除了ns1sub1所以重建,重建后使用deleteallns1 命令行: 控制台: 可以看到,级联删除,会多次触发forDeletes,根节点和其子节点的删除都会触发。同理forAll也会多次触发。 总结:对于节点的删除,无论是单个删除还是级联删除,每个节点的删除都会触发forDeletes、forAll监听器。 那么上面这些总结对吗?起码默认情况是对的!因为缓存我使用这样的方式创建的CuratorCachecuratorCacheCuratorCache。builder(curatorFramework,ns1)。build();CuratorCache配置 上面的代码中我使用的CuratorCachecuratorCacheCuratorCache。builder(curatorFramework,ns1)。build();会导致子节点的操作也会触发监听器,这是因为默认就是如此,当然如果想值监听一个节点,可以使用如下方法(源码如下):staticCuratorCachebuild(CuratorFrameworkclient,Stringpath,Options。。。options){returnbuilder(client,path)。withOptions(options)。build();} 第三个参数Options就可以配置。 比如我配置就监听一个节点,就可以按照如下方式创建CuratorCache:CuratorCachecuratorCacheCuratorCache。build(curatorFramework,ns1,CuratorCache。Options。SINGLENODECACHE); 这里我传递了第三个参数CuratorCache。Options。SINGLENODECACHE。也就实现了只监听ns1节点的功能。 CuratorCache。Options。SINGLENODECACHE:单节点缓存 CuratorCache。Options。COMPRESSEDDATA:通过以下方式解压缩数据org。apache。curator。framework。api。GetDataBuilder。decompressed() CuratorCache。Options。DONOTCLEARONCLOSE:通常,当缓存通过关闭close()时,存储将通过清除CuratorCacheStorage。clear()。此选项可防止清除存储。 使用CuratorCache。builder(curatorFramework,ns1)。build()构建的时候,CuratorCache。Options。SINGLENODECACHEFALSE、CuratorCache。Options。COMPRESSEDDATAFALAW、CuratorCache。Options。DONOTCLEARONCLOSEtrue。 通过Debug可以看到对应的配置如下: 说明 上面我使用了命令行搭配代码的方式大致测试了下监听器的类型。接下来详细说明下各种监听器的作用。forInitialized 初始化完毕触发,也就是说CuratorFramework的start方法执行完毕后就会被触发。forCreates 触发条件:CuratorCacheListener。Type。NODECREATED,也就是说被监听节点或者子节点创建就会被触发。forChanges 触发条件:CuratorCacheListener。Type。NODECHANGED,也就是说被监听节点或者子节点值修改就会被触发。forCreatesAndChanges 触发条件:CuratorCacheListener。Type。NODECREATED和CuratorCacheListener。Type。NODECHANGED,也就是说被监听节点或者子节点创建或者值修改就会被触发。forDeletes 触发条件:CuratorCacheListener。Type。NODEDELETED,也就是说被监听节点或者子节点删除就会被触发。forAll 触发条件:上面的forCreates、forChanges、forCreatesAndChanges、forDeletes触发的时候都会同时触发forAll。forNodeCache、forTreeCache、forPathChildrenCache 这三个是一种桥接监听器,它允许将旧式监听器PathChildrenCache、NodeCache、TreeCache与CuratorCache重用,不过我觉得上面的那些监听器已经能够满足需求,无需使用这三个了。 如果读者有不一样的间接,欢迎留言!!!TestpublicvoidtestCache2()throwsException{curatorFramework。start();CuratorCachecuratorCacheCuratorCache。bridgeBuilder(curatorFramework,ns1)。build();CuratorCacheListenercuratorCacheListenerCuratorCacheListener。builder()。forNodeCache((){log。info(forNodeCache回调);log。info();})。forTreeCache(curatorFramework,(client,event){log。info(forTreeCache回调);log。info(type〔{}〕,data〔{}〕,oldData〔{}〕,event。getType(),event。getData(),event。getOldData());log。info();})。forPathChildrenCache(test,curatorFramework,(client,event){log。info(forPathChildrenCache回调);log。info(type〔{}〕,data〔{}〕,InitialData〔{}〕,event。getType(),event。getData(),event。getInitialData());log。info();})。build();获取监听器列表容器ListenableCuratorCacheListenerlistenablecuratorCache。listenable();将监听器放入容器中listenable。addListener(curatorCacheListener);curatorCache必须启动curatorCache。start();TimeUnit。MILLISECONDS。sleep(500);byte〔〕oldDataA。getBytes(StandardCharsets。UTF8);byte〔〕newDataB。getBytes(StandardCharsets。UTF8);创建根节点curatorFramework。create()。forPath(ns1,oldData);log。info(创建ns1节点);curatorFramework。create()。forPath(ns1sub1,oldData);log。info(创建ns1sub1节点);修改根节点的值curatorFramework。setData()。forPath(ns1,newData);log。info(修改ns1节点的值);修改子节点的值curatorFramework。setData()。forPath(ns1sub1,newData);log。info(修改ns1sub1节点的值);删除子节点curatorFramework。delete()。forPath(ns1sub1);log。info(删除ns1sub1节点);删除根节点curatorFramework。delete()。forPath(ns1);log。info(删除ns1节点);curatorCache。close();} 运行日志如下:INFOforTreeCache回调INFOtype〔INITIALIZED〕,data〔null〕,oldData〔null〕INFOINFOforPathChildrenCache回调INFOtype〔INITIALIZED〕,data〔null〕,InitialData〔null〕INFOINFO创建ns1节点INFO创建ns1sub1节点INFOforNodeCache回调INFOINFOforTreeCache回调DEBUGReadingreplysessionid:0x10001d2b6b70006,packet::clientPath:ns1sub1serverPath:ns1sub1finished:falseheader::10,4replyHeader::10,226,0request::ns1sub1,Fresponse::41,s{226,226,1674566507592,1674566507592,0,0,0,0,1,0,226}INFOtype〔NODEADDED〕,data〔ChildData{pathns1,stat225,225,1674566507586,1674566507586,0,1,0,0,1,1,226,data〔65〕}〕,oldData〔null〕INFOINFOforPathChildrenCache回调INFOtype〔CHILDADDED〕,data〔ChildData{pathns1,stat225,225,1674566507586,1674566507586,0,1,0,0,1,1,226,data〔65〕}〕,InitialData〔null〕INFOINFOforNodeCache回调INFOINFOforTreeCache回调INFOtype〔NODEADDED〕,data〔ChildData{pathns1sub1,stat226,226,1674566507592,1674566507592,0,0,0,0,1,0,226,data〔65〕}〕,oldData〔null〕INFOINFOforPathChildrenCache回调INFOtype〔CHILDADDED〕,data〔ChildData{pathns1sub1,stat226,226,1674566507592,1674566507592,0,0,0,0,1,0,226,data〔65〕}〕,InitialData〔null〕INFOINFO修改ns1节点的值INFOforNodeCache回调INFOINFOforTreeCache回调INFOtype〔NODEUPDATED〕,data〔ChildData{pathns1,stat225,227,1674566507586,1674566507601,1,1,0,0,1,1,226,data〔66〕}〕,oldData〔ChildData{pathns1,stat225,225,1674566507586,1674566507586,0,1,0,0,1,1,226,data〔65〕}〕INFOINFOforPathChildrenCache回调INFOtype〔CHILDUPDATED〕,data〔ChildData{pathns1,stat225,227,1674566507586,1674566507601,1,1,0,0,1,1,226,data〔66〕}〕,InitialData〔null〕INFOINFO修改ns1sub1节点的值INFOforNodeCache回调INFOINFOforTreeCache回调INFOtype〔NODEUPDATED〕,data〔ChildData{pathns1sub1,stat226,228,1674566507592,1674566507605,1,0,0,0,1,0,226,data〔66〕}〕,oldData〔ChildData{pathns1sub1,stat226,226,1674566507592,1674566507592,0,0,0,0,1,0,226,data〔65〕}〕INFOINFOforPathChildrenCache回调INFOtype〔CHILDUPDATED〕,data〔ChildData{pathns1sub1,stat226,228,1674566507592,1674566507605,1,0,0,0,1,0,226,data〔66〕}〕,InitialData〔null〕INFOINFO删除ns1sub1节点INFOforNodeCache回调INFOINFOforTreeCache回调INFOtype〔NODEREMOVED〕,data〔ChildData{pathns1sub1,stat226,228,1674566507592,1674566507605,1,0,0,0,1,0,226,data〔66〕}〕,oldData〔null〕INFOINFOforPathChildrenCache回调INFOtype〔CHILDREMOVED〕,data〔ChildData{pathns1sub1,stat226,228,1674566507592,1674566507605,1,0,0,0,1,0,226,data〔66〕}〕,InitialData〔null〕INFOINFOforNodeCache回调INFOINFOforTreeCache回调INFOtype〔NODEREMOVED〕,data〔ChildData{pathns1,stat225,227,1674566507586,1674566507601,1,1,0,0,1,1,226,data〔66〕}〕,oldData〔null〕INFOINFOforPathChildrenCache回调INFO删除ns1节点INFOtype〔CHILDREMOVED〕,data〔ChildData{pathns1,stat225,227,1674566507586,1674566507601,1,1,0,0,1,1,226,data〔66〕}〕,InitialData〔null〕INFO 这里有个问题,CuratorCache。bridgeBuilder(curatorFramework,ns1)。build()设置监听的是ns1,后面又通过。forPathChildrenCache(test,curatorFramework,(client,event){设置了监听test,那么到底监听哪个?从日志上看是监听了ns,那为什么要设置test,是API设计问题?还是我用错了?欢迎交流!!!计数器SharedCounter ShareCount是curator的一个共享计数器,所有监视同一路径的客户端都将具有共享整数的最新值(考虑到ZK的一致性保证)。 主要涉及三个类ShareCount、SharedCountReader,SharedCountListener。以下是基本使用方法packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。shared。SharedCount;importorg。apache。curator。framework。recipes。shared。SharedCountListener;importorg。apache。curator。framework。recipes。shared。SharedCountReader;importorg。apache。curator。framework。state。ConnectionState;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。junit。jupiter。api。Test;importjava。util。concurrent。ExecutorService;importjava。util。concurrent。Executors;Slf4jpublicclassShareCountTest{连接地址publicstaticfinalStringCONNECTSTRING172。24。246。68:2181;publicstaticfinalRetryPolicyRETRYPOLICYnewExponentialBackoffRetry(1000,3);privatestaticfinalExecutorServiceEXECUTORSERVICEExecutors。newCachedThreadPool();TestpublicvoidtestShareCount()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(CONNECTSTRING,RETRYPOLICY);curatorFramework。start();SharedCountsharedCountnewSharedCount(curatorFramework,ShareCount,0);sharedCount。start();sharedCount。addListener(newSharedCountListener(){OverridepublicvoidcountHasChanged(SharedCountReadersharedCountReader,intnewCount)throwsException{log。info(countHasChangedcallback);log。info(newCount{},newCount);}OverridepublicvoidstateChanged(CuratorFrameworkclient,ConnectionStatenewState){}},EXECUTORSERVICE);sharedCount。setCount(1);TimeUnit。DAYS。sleep(1);sharedCount。close();}} 运行结果: 成功获取到了监听的值。DistributedAtomicLong 尝试原子增量的计数器。它首先尝试使用乐观锁定。如果失败,则采用可选的InterProcessMutex。对于乐观和互斥锁,使用重试策略重试增量。 有两个构造方法: publicDistributedAtomicLong(CuratorFrameworkclient,StringcounterPath,RetryPolicyretryPolicy)采用乐观模式。packagecn。programtalk;importlombok。SneakyThrows;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。atomic。AtomicValue;importorg。apache。curator。framework。recipes。atomic。DistributedAtomicLong;importorg。apache。curator。retry。RetryForever;importorg。junit。jupiter。api。Test;Slf4jpublicclassDistributedAtomicLongTest{SneakyThrowsTestpublicvoidtestDistributedAtomicLong1(){CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(172。24。246。68:2181,newRetryForever(1000));curatorFramework。start();DistributedAtomicLongdistributedAtomicLongnewDistributedAtomicLong(curatorFramework,DistributedAtomicLong,newRetryForever(1000));AtomicValueLonglongAtomicValuedistributedAtomicLong。get();log。info(1。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());设置初始值,如果节点已经存在,则会返回false。booleansucceeddistributedAtomicLong。initialize(0L);log。info(initializesucceed?{},succeed);longAtomicValuedistributedAtomicLong。get();log。info(2。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());add将增量添加到当前值并返回新值信息。请记住始终检查AtomicValue。succeeded()。distributedAtomicLong。add(10L);longAtomicValuedistributedAtomicLong。get();log。info(3。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());subtract从当前值中减去增量并返回新值信息。请记住始终检查AtomicValue。succeeded()。distributedAtomicLong。subtract(1L);longAtomicValuedistributedAtomicLong。get();log。info(4。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());increment加一distributedAtomicLong。increment();longAtomicValuedistributedAtomicLong。get();log。info(5。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());decrement减一distributedAtomicLong。decrement();longAtomicValuedistributedAtomicLong。get();log。info(6。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());}} 运行结果:INFO1。preValue0,postValue0,succeededtrueINFOinitializesucceed?trueINFO2。preValue0,postValue0,succeededtrueINFO3。preValue10,postValue10,succeededtrueINFO4。preValue9,postValue9,succeededtrueINFO5。preValue10,postValue10,succeededtrueINFO6。preValue9,postValue9,succeededtrue 另外一个构造方法,提供类锁的模式,在互斥升级模式下创建。将首先使用给定的重试策略尝试乐观锁定。如果增量不成功,InterProcessMutex将使用自己的重试策略尝试。SneakyThrowsTestpublicvoidtestDistributedAtomicLong2(){CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(172。24。246。68:2181,newRetryForever(1000));curatorFramework。start();DistributedAtomicLongdistributedAtomicLong;distributedAtomicLongnewDistributedAtomicLong(curatorFramework,DistributedAtomicLong,newRetryForever(1000),PromotedToLock。builder()。lockPath(DistributedAtomicLongPromotedToLock)。timeout(3000,TimeUnit。MILLISECONDS)。retryPolicy(newRetryOneTime(1000))。build());AtomicValueLonglongAtomicValuedistributedAtomicLong。get();log。info(1。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());设置初始值,如果节点已经存在,则会返回false。booleansucceeddistributedAtomicLong。initialize(0L);log。info(initializesucceed?{},succeed);longAtomicValuedistributedAtomicLong。get();log。info(2。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());add将增量添加到当前值并返回新值信息。请记住始终检查AtomicValue。succeeded()。distributedAtomicLong。add(10L);longAtomicValuedistributedAtomicLong。get();log。info(3。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());subtract从当前值中减去增量并返回新值信息。请记住始终检查AtomicValue。succeeded()。distributedAtomicLong。subtract(1L);longAtomicValuedistributedAtomicLong。get();log。info(4。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());increment加一distributedAtomicLong。increment();longAtomicValuedistributedAtomicLong。get();log。info(5。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());decrement减一distributedAtomicLong。decrement();longAtomicValuedistributedAtomicLong。get();log。info(6。preValue{},postValue{},succeeded{},longAtomicValue。preValue(),longAtomicValue。postValue(),longAtomicValue。succeeded());} 运行结果:INFO1。preValue9,postValue9,succeededtrueINFOinitializesucceed?falseINFO2。preValue9,postValue9,succeededtrueINFO3。preValue19,postValue19,succeededtrueINFO4。preValue18,postValue18,succeededtrueINFO5。preValue19,postValue19,succeededtrueINFO6。preValue18,postValue18,succeededtrue锁 使用ZK可以实现分布式锁功能。SharedReentrantLock(InterProcessMutex)基本说明 全局同步的完全分布式锁,这意味着没有两个客户端可以同时持有相同的锁。 其提供了如下构造方法publicInterProcessMutex(CuratorFrameworkclient,Stringpath){this(client,path,newStandardLockInternalsDriver());} 这里有两个参数client:CuratorFramework客户端,path:zookeeper的node,抢锁路径,同一个锁path需一致。publicvoidtestLock1()throwsException{curatorFramework。start();定义锁InterProcessMutexlocknewInterProcessMutex(curatorFramework,programtalklock);获取锁lock。acquire();log。info(此处是业务代码);模拟业务执行30秒TimeUnit。SECONDS。sleep(30);释放锁lock。release();} 某个时刻,查看Zk的节点,可以看到如下所示内容。 当执行完毕的时候,如果正常释放锁,则会清理到对应的信息。 JavaDoc文档中其实有说跨JVM的锁,那么同一个JVM中多线程使用这个锁可以吗,可以!packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。locks。InterProcessMutex;importorg。apache。curator。retry。ExponentialBackoffRetry;importjava。util。concurrent。TimeUnit;Slf4jpublicclassInterProcessMutexThreadTestimplementsRunnable{StringconnectStringlocalhost:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);Overridepublicvoidrun(){CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();定义锁InterProcessMutexlocknewInterProcessMutex(curatorFramework,InterProcessMutex);try{lock。acquire();StringthreadNameThread。currentThread()。getName();log。info({},执行业务代码开始,threadName);TimeUnit。SECONDS。sleep(10);log。info({},执行业务代码完毕,threadName);}catch(Exceptione){e。printStackTrace();}finally{try{lock。release();}catch(Exceptione){e。printStackTrace();}}}publicstaticvoidmain(String〔〕args){InterProcessMutexThreadTesttasknewInterProcessMutexThreadTest();Threadt1newThread(task,任务1);Threadt2newThread(task,任务2);t1。start();t2。start();}} 运行结果如下:INFO任务1,执行业务代码开始DEBUGReadingreplysessionid:0x100000022e20032,packet::clientPath:nullserverPath:nullfinished:falseheader::8,4replyHeader::8,458,0request::InterProcessMutexc4929b7d66c6b4a9aae485315dc67523elock0000000000,Tresponse::3139322e3136382e31302e31,s{457,457,1674654986411,1674654986411,0,0,0,72057594623164465,12,0,457}INFO任务1,执行业务代码完毕DEBUGGotnotificationsessionid:0x100000022e20032DEBUGReadingreplysessionid:0x100000022e20031,packet::clientPath:nullserverPath:nullfinished:falseheader::8,2replyHeader::8,459,0request::InterProcessMutexc4929b7d66c6b4a9aae485315dc67523elock0000000000,1response::nullDEBUGGotpingresponseforsessionid:0x100000022e20031after5ms。DEBUGGotWatchedEventstate:SyncConnectedtype:NodeDeletedpath:InterProcessMutexc4929b7d66c6b4a9aae485315dc67523elock0000000000forsessionid0x100000022e20032DEBUGGotpingresponseforsessionid:0x100000022e20032after2ms。DEBUGReadingreplysessionid:0x100000022e20032,packet::clientPath:nullserverPath:nullfinished:falseheader::9,12replyHeader::9,459,0request::InterProcessMutex,Fresponse::v{c284a2de137d142c4b8183d2206a50c34lock0000000001},s{455,455,1674654986408,1674654986408,0,3,0,0,0,1,459}INFO任务2,执行业务代码开始INFO任务2,执行业务代码完毕DEBUGReadingreplysessionid:0x100000022e20032,packet::clientPath:nullserverPath:nullfinished:falseheader::10,2replyHeader::10,460,0request::InterProcessMutexc284a2de137d142c4b8183d2206a50c34lock0000000001,1response::nullDEBUGGotpingresponseforsessionid:0x100000022e20032after7ms。DisconnectedfromthetargetVM,address:127。0。0。1:58751,transport:socketProcessfinishedwithexitcode0 可以看到两个任务是顺序执行的,不过单个JVM基本不使用分布式锁,JDK内置的锁即可!定义锁 正如上面所说的那样,通过构造方法去定义一个可重入排它锁,InterProcessMutexlocknewInterProcessMutex(curatorFramework,programtalklock);。获取锁 获取锁有两种方法,一种是使用上面所使用的lock。acquire();,这是个无参函数,他会一直尝试获取锁,如果获取不到则会一直阻塞。 另外一种是使用publicbooleanacquire(longtime,TimeUnitunit)throwsException,不同于上面那个,这个不会一直阻塞,time是时间参数,unit是时间的单位。超过这个时间则会放弃获取锁。 示例代码如下:lock。acquire(10,TimeUnit。SECONDS); 此代码的意思就是如果在10秒内能获取到锁则返回true,超过10秒获取不到则返回false。不会一直阻塞。释放锁 当程序执行完毕后必须释放锁,释放锁使用release()方法。可重入性packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。locks。InterProcessMutex;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。junit。jupiter。api。Test;importjava。util。concurrent。TimeUnit;InterProcessMutex锁的可重入性测试Slf4jpublicclassInterProcessMutexReentrantTest{StringconnectStringlocalhost:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);InterProcessMutexlocknewInterProcessMutex(curatorFramework,InterProcessMutexReentrantTest);voida()throwsException{lock。acquire();log。info(a方法执行);b();lock。release();}voidb()throwsException{lock。acquire();log。info(b方法执行);lock。release();}Testpublicvoidtest()throwsException{curatorFramework。start();a();}} 上面的代码中,a函数调用b函数,并且a和b都是用了同一个锁。执行结果如下: 程序正常执行,说明锁具备可重入性。SharedLock(InterProcessSemaphoreMutex)基本说明 InterProcessSemaphoreMutex也是一个排它锁,不同于InterProcessMutex的是,InterProcessSemaphoreMutex不是一个可重入锁。 使用方法(定义锁、获取锁、释放锁)跟InterProcessMutex没有太大区别。 代码示例:TestpublicvoidtestLock3()throwsException{curatorFramework。start();定义锁InterProcessLocklocknewInterProcessSemaphoreMutex(curatorFramework,InterProcessSemaphoreMutex);获取锁try{booleangotlock。acquire(30,TimeUnit。SECONDS);if(got){log。info(此处是业务代码);模拟业务执行30秒TimeUnit。SECONDS。sleep(30);}else{log。warn(未获取到锁);}}catch(Exceptione){e。printStackTrace();}finally{释放锁lock。release();}} 某个时刻,查看Zk的节点,可以看到如下所示内容。 可重入性packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。locks。InterProcessMutex;importorg。apache。curator。framework。recipes。locks。InterProcessSemaphoreMutex;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。junit。jupiter。api。Test;InterProcessSemaphoreMutex锁的可重入性测试Slf4jpublicclassInterProcessSemaphoreMutexReentrantTest{StringconnectStringlocalhost:2181;RetryPolicyretryPolicynewExponentialBackoffRetry(1000,3);CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);InterProcessSemaphoreMutexlocknewInterProcessSemaphoreMutex(curatorFramework,InterProcessSemaphoreMutex);voida()throwsException{lock。acquire();log。info(a方法执行);b();lock。release();}voidb()throwsException{lock。acquire();log。info(b方法执行);lock。release();}Testpublicvoidtest()throwsException{curatorFramework。start();a();}} 运行效果如下图: 不会正常执行完毕,会一直锁住,说明此锁不具备可重入性。SharedReentrantReadWriteLock(InterProcessReadWriteLock)基本说明 InterProcessReadWriteLock是类似JDK的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读(阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁,比如请求写锁读锁释放写锁。从读锁升级成写锁是不成的。 读锁和写锁有如下关系:读写互斥写写互斥读读不互斥 重入性 读写锁是可以重入的,意味着你获取了一次读锁写锁,那么你可以再次获取。但是要记得最后释放锁,获取了几次就得释放几次。定义锁定义读锁InterProcessReadWriteLocklocknewInterProcessReadWriteLock(curatorFramework,InterProcessReadWriteLock);获取锁InterProcessReadWriteLocklocknewInterProcessReadWriteLock(curatorFramework,InterProcessReadWriteLock);获取读锁InterProcessReadWriteLock。ReadLockreadLocklock。readLock();获取写锁InterProcessReadWriteLock。WriteLockwriteLocklock。writeLock();释放锁 同样使用release()释放锁writeLock。release();readLock。release();测试 读写互斥 读写互斥就是说,当写的时候(写锁没有释放的时候,无法读取)。packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。locks。InterProcessReadWriteLock;importorg。apache。curator。retry。ExponentialBackoffRetry;importorg。apache。curator。retry。RetryForever;importorg。junit。jupiter。api。Test;importjava。util。concurrent。TimeUnit;Slf4jpublicclassInterProcessReadWriteLockTest{StringconnectString172。24。246。68:2181;RetryPolicyretryPolicynewRetryForever(1000);TestpublicvoidtestRead()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();InterProcessReadWriteLocklocknewInterProcessReadWriteLock(curatorFramework,InterProcessReadWriteLock);InterProcessReadWriteLock。ReadLockreadLocklock。readLock();readLock。acquire();log。info(读成功);readLock。release();}TestpublicvoidtestWrite()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();InterProcessReadWriteLocklocknewInterProcessReadWriteLock(curatorFramework,InterProcessReadWriteLock);InterProcessReadWriteLock。WriteLockwriteLocklock。writeLock();writeLock。acquire();TimeUnit。SECONDS。sleep(30);log。info(写成功);writeLock。release();}} testRead方法是读,testWrite方法是写,testWrite我休眠了30秒,主要是为了锁释放慢一点,来测试是否可读。 首先运行testWrite,然后运行testRead(不到超过30后再运行,为了保证此时写锁并没有释放)。 在读锁没有释放之前,运行效果图如下: 可以看到读也阻塞着,等待一段时间后,写锁释放,读也就不会继续阻塞,运行完毕。 写写互斥 运行两次testWrite方法,要保证多实例运行。idea需要设置。按照下图设置。 接下来运行testWrite方法。 第一个没运行完,第二个也会阻塞。 读读不互斥 我就不具体测试了,道理一样。 可重入性publicvoidtestWrite()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();InterProcessReadWriteLocklocknewInterProcessReadWriteLock(curatorFramework,InterProcessReadWriteLock);InterProcessReadWriteLock。WriteLockwriteLocklock。writeLock();writeLock。acquire();writeLock。acquire();log。info(写成功);writeLock。release();writeLock。release();} 程序能够正常执行完毕,说明具备可重入性。使用场景 分布式读写锁适用于读多写少的情况。MultiSharedLock(InterProcessMultiLock)基本说明 InterProcessMultiLock是多锁的意思,相当于一个容器,包含了多个锁。统一管理,一起获取锁,一起释放锁。 定义锁 他有两个构造方法。 InterProcessMultiLock(CuratorFramework,List)创造的是一个InterProcessMutex的锁。packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。locks。InterProcessMultiLock;importorg。apache。curator。framework。recipes。locks。InterProcessReadWriteLock;importorg。apache。curator。retry。RetryForever;importorg。junit。jupiter。api。Test;importjava。util。List;Slf4jpublicclassInterProcessMultiLockTest{StringconnectString172。24。246。68:2181;RetryPolicyretryPolicynewRetryForever(1000);TestpublicvoidtestInterProcessMultiLock1()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();InterProcessMultiLocklocknewInterProcessMultiLock(curatorFramework,List。of(InterProcessMultiLock1,InterProcessMultiLock2));lock。acquire();log。info(读成功);lock。release();}} 运行后,从命令行看: 创建了两个节点。 两外一个构造方法是InterProcessMultiLock(Listlocks),它则允许任何实现InterProcessLock的锁。TestpublicvoidtestInterProcessMultiLock2()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();ListInterProcessLockmutexesLists。newArrayList();InterProcessMutexinterProcessMutexnewInterProcessMutex(curatorFramework,InterProcessMultiLock3);mutexes。add(interProcessMutex);InterProcessSemaphoreMutexinterProcessSemaphoreMutexnewInterProcessSemaphoreMutex(curatorFramework,InterProcessMultiLock4);mutexes。add(interProcessSemaphoreMutex);InterProcessMultiLocklocknewInterProcessMultiLock(mutexes);lock。acquire();log。info(读成功);lock。release();} 运行结果:命令行查看。 SharedSemaphore(InterProcessSemaphoreV2)基本说明 InterProcessSemaphoreV2是一个信号量,跨JVM工作,多个客户端使用通过一个path则会统一受到进程间租约限制。这个信号量是公平的,会按照顺序获得租约。 直白点说:InterProcessSemaphoreV2就类似JDK中的Semaphore,Semaphore用于控制进入临界区的线程数,而InterProcessSemaphoreV2是跨JVM的而已。 有两个构造方法:最大租约是由给定路径的用户维护的约定。publicInterProcessSemaphoreV2(CuratorFrameworkclient,Stringpath,intmaxLeases)SharedCountReader用作给定路径的信号量的方法,以确定最大租约。publicInterProcessSemaphoreV2(CuratorFrameworkclient,Stringpath,SharedCountReadercount) 第一个我们就叫做INT类型构造方法,第二个叫做SharedCountReader类型构造方法。他们是有区别的,接下来我通过图片加描述的方式来说明下: 这两个构造方法的主要区别在第三个参数上,前者是int类型,后者是SharedCountReader类型,也就是说前者不是分布式共享的数,后者是共享的。 INT类型构造方法 INT类型构造方法的maxLeases参数是用户传递进入构造方法中的,也就是说在JVM中直接设置,那么就有可能出现在JVM1中设置的是2,在JVM2中设置的是3,并且Curator明确说明不会检查,这就可能出现,两个JVM中最大规约不一致导致出现问题。所以使用者必须保证设置相同。 SharedCountReader类型构造方法 SharedCountReader类型构造方法不是直接设置,而是区Zookeeper中去获取(相当多个JVM有相同的租约存储地址),然后加载设置到JVM中。并且该种方式会有SharedCount的监听器。 两者实现的功能是一样的,也都跨JVM!!!代码示例packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。RetryPolicy;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。locks。InterProcessSemaphoreV2;importorg。apache。curator。framework。recipes。locks。Lease;importorg。apache。curator。framework。recipes。shared。SharedCount;importorg。apache。curator。retry。RetryForever;importorg。junit。jupiter。api。Test;Slf4jpublicclassInterProcessSemaphoreV2Test{staticStringconnectString172。24。246。68:2181;staticRetryPolicyretryPolicynewRetryForever(10000);TestpublicvoidtestInterProcessSemaphoreV21()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();InterProcessSemaphoreV2interProcessSemaphoreV2newInterProcessSemaphoreV2(curatorFramework,InterProcessSemaphoreV21,3);Leaseleasenull;try{leaseinterProcessSemaphoreV2。acquire();log。info({}获取到租约,Thread。currentThread()。getName());}catch(Exceptione){e。printStackTrace();}finally{为了测试租约等待情况,我不释放租约interProcessSemaphoreV2。returnLease(lease);log。info({}释放掉租约,Thread。currentThread()。getName());}while(true){}}TestpublicvoidtestInterProcessSemaphoreV22()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(connectString,retryPolicy);curatorFramework。start();SharedCountsharedCountnewSharedCount(curatorFramework,InterProcessSemaphoreV22SharedCount,3);sharedCount。start();InterProcessSemaphoreV2interProcessSemaphoreV2newInterProcessSemaphoreV2(curatorFramework,InterProcessSemaphoreV22,sharedCount);Leaseleasenull;try{leaseinterProcessSemaphoreV2。acquire();log。info({}获取到租约,Thread。currentThread()。getName());}catch(Exceptione){e。printStackTrace();}finally{为了测试租约等待情况,我不释放租约interProcessSemaphoreV2。returnLease(lease);log。info({}释放掉租约,Thread。currentThread()。getName());}while(true){}}} testInterProcessSemaphoreV21方法,使用的是INT类型构造方法,testInterProcessSemaphoreV22使用的是SharedCountReader类型构造方法。 因为两者功能一样,我就使用testInterProcessSemaphoreV22进行测试。 使用IDEA运行testInterProcessSemaphoreV22(多实例运行)四次。 截图如下: 第一次: 第二次: 第三次: 第四次: 前三次都能够正常执行(正常打印),第四次次一直在等待获取租约,没有问题,因为我信号量设置的最大租约就是3!。屏障(Barriers)Barrier DistributedBarrier分布式系统使用屏障来阻止一组节点的处理,直到满足允许所有节点继续的条件为止。 创建屏障 DistributedBarrier提供了一个构造方法。 使用者通过构造方法直接new即可。 设置屏障 解除屏障 代码示例packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。barriers。DistributedBarrier;importorg。apache。curator。retry。RetryForever;importorg。junit。jupiter。api。Test;importjava。util。concurrent。ExecutorService;importjava。util。concurrent。Executors;importjava。util。concurrent。TimeUnit;Slf4jpublicclassDistributedBarrierTest{TestpublicvoidtestDistributedBarrier()throwsException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(172。24。246。68:2181,newRetryForever(1000));curatorFramework。start();创建DistributedBarrierDistributedBarrierdistributedBarriernewDistributedBarrier(curatorFramework,DistributedBarrier);setBarrier的功能是创建pathdistributedBarrier。setBarrier();ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();for(inti0;i10;i){executorService。submit((){try{StringthreadNameThread。currentThread()。getName();log。info({}线程设置屏障,threadName);distributedBarrier。waitOnBarrier();log。info(屏障被移除,{}线程继续执行,threadName);}catch(Exceptione){thrownewRuntimeException(e);}});}TimeUnit。SECONDS。sleep(5);log。info(移除屏障);distributedBarrier。removeBarrier();while(true){}}} 运行结果:2023013114:49:05〔pool4thread7〕INFOcn。programtalk。DistributedBarrierTestpool4thread7线程设置屏障2023013114:49:05〔pool4thread5〕INFOcn。programtalk。DistributedBarrierTestpool4thread5线程设置屏障2023013114:49:05〔pool4thread1〕INFOcn。programtalk。DistributedBarrierTestpool4thread1线程设置屏障2023013114:49:05〔pool4thread8〕INFOcn。programtalk。DistributedBarrierTestpool4thread8线程设置屏障2023013114:49:05〔pool4thread10〕INFOcn。programtalk。DistributedBarrierTestpool4thread10线程设置屏障2023013114:49:05〔pool4thread9〕INFOcn。programtalk。DistributedBarrierTestpool4thread9线程设置屏障2023013114:49:05〔pool4thread3〕INFOcn。programtalk。DistributedBarrierTestpool4thread3线程设置屏障2023013114:49:05〔pool4thread2〕INFOcn。programtalk。DistributedBarrierTestpool4thread2线程设置屏障2023013114:49:05〔pool4thread6〕INFOcn。programtalk。DistributedBarrierTestpool4thread6线程设置屏障2023013114:49:05〔pool4thread4〕INFOcn。programtalk。DistributedBarrierTestpool4thread4线程设置屏障2023013114:49:10〔main〕INFOcn。programtalk。DistributedBarrierTest移除屏障2023013114:49:19〔pool4thread7〕INFOcn。programtalk。DistributedBarrierTest屏障被移除,pool4thread7线程继续执行2023013114:49:20〔pool4thread5〕INFOcn。programtalk。DistributedBarrierTest屏障被移除,pool4thread5线程继续执行2023013114:49:20〔pool4thread1〕INFOcn。programtalk。DistributedBarrierTest屏障被移除,pool4thread1线程继续执行2023013114:49:21〔pool4thread8〕INFOcn。programtalk。DistributedBarrierTest屏障被移除,pool4thread8线程继续执行2023013114:49:21〔pool4thread10〕INFOcn。programtalk。DistributedBarrierTest屏障被移除,pool4thread10线程继续执行2023013114:49:21〔pool4thread9〕INFOcn。programtalk。DistributedBarrierTest屏障被移除,pool4thread9线程继续执行2023013114:49:21〔pool4thread3〕INFOcn。programtalk。DistributedBarrierTest屏障被移除,pool4thread3线程继续执行2023013114:49:21〔pool4thread2〕INFOcn。programtalk。DistributedBarrierTest屏障被移除,pool4thread2线程继续执行 线程任务中设置了屏障,主线程等了5秒,之后解除了屏障,屏障解除后,所有线程继续执行后面的代码。DistributedDoubleBarrier DistributedDoubleBarrier双重屏障能够让客户端在任务的开始和结束阶段更好的同步控制。当有足够的任务已经进入到屏障后,一起开始,一旦任务完成则离开屏障。 不同于DistributedBarrier,DistributedDoubleBarrier允许设置一个阈值数量(只是个阈值,不是个限制。),只有数目大于等于设置的这个阈值后才会继续执行,特别强调是大于等于!!!。创建屏障 进入屏障 离开屏障 代码示例packagecn。programtalk;importlombok。extern。slf4j。Slf4j;importorg。apache。curator。framework。CuratorFramework;importorg。apache。curator。framework。CuratorFrameworkFactory;importorg。apache。curator。framework。recipes。barriers。DistributedBarrier;importorg。apache。curator。framework。recipes。barriers。DistributedDoubleBarrier;importorg。apache。curator。retry。RetryForever;importorg。junit。jupiter。api。Test;importjava。util。concurrent。ExecutorService;importjava。util。concurrent。Executors;importjava。util。concurrent。TimeUnit;Slf4jpublicclassDistributedDoubleBarrierTest{TestpublicvoidtestDistributedDoubleBarrier(){CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(172。24。246。68:2181,newRetryForever(1000));curatorFramework。start();ExecutorServiceexecutorServiceExecutors。newCachedThreadPool();for(inti0;i10;i){executorService。submit((){try{创建distributedDoubleBarrierDistributedDoubleBarrierdistributedDoubleBarriernewDistributedDoubleBarrier(curatorFramework,DistributedDoubleBarrier,2);distributedDoubleBarrier。enter();StringthreadNameThread。currentThread()。getName();。。。