头条创作挑战赛 结构化并发在Java19中是孵化功能,比预览功能的成熟度更低。想要尝鲜的朋友可以试试。想要在LTS版本中使用该功能,估计得等到Java25了。 结构化并发并不是一个新鲜的名词,这个概念已经出现很久了。Kotlin的协程(Coroutines)就已经实现了结构化并发。Java19中引入了结构化并发,可以作为一个新的多线程编程模式。结构化并发和虚拟线程(说一说Java19中的虚拟线程)都来自OpenJDK的Loom项目。多线程编程的难点 多线程编程一直以来都是Java开发中比较复杂难懂的部分。这一方面是多线程编程自身的复杂性,另外一方面则是由于语言和标准库的限制。如果给你一个开发任务,让你在单线程中完成,那肯定会简单很多。比如下面的一段代码,在takeAction方法里面,分别调用callOp1和callOp2方法来得到两个值,最后再把用得到的结果值来调用callOp3。inttakeAction(Stringinput){intresult1callOp1(input);intresult2callOp2(input);returncallOp3(result1,result2);} 如果takeAction方法在同一个线程中执行,那么整个方法内部的调用流程是很清晰的。当takeAction方法返回时,callOp1、callOp2和callOp3方法必定已经完成,或者由于之前的错误还未被调用。也就是说,这些方法的状态是确定的,只能处于执行成功、执行失败和未执行这三种状态之一。 如果callOp1、callOp2和callOp3这3个方法都是在各自的线程中运行的,那么很多在单线程情况下理所应当的结论,就不会成立了。这些方法的状态会变得更复杂:由于执行方法的线程池的负载过大,方法处于等待执行的状态。执行方法的线程可能被中断,导致方法的执行被取消。callOp1和callOp2是并行执行的,如果callOp1在执行中产生了错误,而此时callOp2如果还在执行中,应该取消callOp2的执行,因为takeAction会以错误的结果返回,callOp2的结果不再需要了。当takeAction方法以错误的结果返回时,callOp2可能仍然还在执行中,直到得到最后的结果,但是这个结果会被丢弃。这是因为对callOp2的取消请求可能未得到及时的响应。任何一个任务的执行时间可能都过长,比如它可能需要等待别的任务完成。这就要求妥善处理超时的问题。 这是多线程开发难以掌握的原因之一,因为它要求你考虑各种复杂的情况,拉高了开发的门槛。在实际的开发中,我们会使用Java标准库和第三方库提供的API来进行多线程开发。这可以在一定程度上简化开发。任务执行 在多线程开发中,绝大部分的工作都可以抽象成任务执行。这些任务的执行有下面这些特点:执行流程从主任务开始。主任务会被划分成多个子任务。子任务可以被进一步划分。全部的任务会组成一个树形结构。每个任务在独立的线程中执行。任务的执行可能成功或失败,也可能被取消。父任务负责管理子任务。根据子任务的执行状态,已有的正在运行的子任务可能被取消,新的子任务可能被创建。任务之间通过线程安全的数据结构来共享数据,或者使用消息传递机制来进行通信。 在目前的Java开发中,对于这样多线程任务的执行,一般使用的是CompletableFuture或ListenableFuture的级联方式。每个任务返回一个CompletableFuture对象来表示执行的结果。如果执行成功,CompletableFuture表示得到的结果值;如果执行失败,CompletableFuture表示产生的异常。使用CompletableFuture时并不是直接获取结果,而是添加一个回调方法,表示当前任务执行之后的下一步操作。 同样的takeAction方法,如果用CompletableFuture来实现,会是下面这样的代码。与单线程的代码相比,CompletableFuture的代码并不直观。CompletableFutureIntegertakeAction(Stringinput){varresult1callOp1(input);varresult2callOp2(input);returnresult1。thenCombine(result2,(v1,v2)newint〔〕{v1,v2})。thenCompose(valuescallOp3(values〔0〕,values〔1〕));}结构化并发 对于结构化并发(StructuredConcurrency),并没有一个清晰的解释。结构化并发要解决的问题是,如何让开发人员更加容易地编写并发代码,尽可能地对开发人员屏蔽多线程相关的细节。具体来说,结构化并发让你以类似单线程的方式来编写多线程代码。 由于结构化并发在Java19中是孵化功能,相关的模块需要被显式地启用。这是通过添加javac和java的参数addmodulesjdk。incubator。concurrent来实现的。 结构化并发使用了前面提到的任务执行的概念。在使用结构化并发之前,首先需要创建一个结构化任务作用域(scope),然后在这个作用域中创建并执行任务。这些任务的生命周期由作用域负责管理。当作用域被关闭时,其中所包含的任务都会结束。这就意味着开发人员不用处理任务的失败、取消和超时等情况,完全由底层平台提供支持。 在一个作用域中,其中创建的任务中也可以创建自己的作用域,用来管理该任务的子任务。这就形成了一个任务组成的树形结构。 使用结构化并发的基础类是jdk。incubator。concurrent。StructuredTaskScope。StructuredTaskScope表示的是一个使用结构化并发的作用域。 下表列出了StructuredTaskScope中的方法。 方法 说明 fork(Callablelt;?extendsUtask) 启动一个线程来执行任务 join() 等待所有任务执行完成,或当前作用域被关闭 joinUntil(Instantdeadline) 与join()相同,只不过设置了终止时间 shutdown() 结束任务作用域 close() 关闭任务作用域 下面的代码给出了StructuredTaskScope的使用示例,用来实现takeAction方法。在takeAction方法中的作用域中创建了调用callOp1和callOp2的子任务。等这两个子任务完成之后,使用得到的结果调用combine方法。combine方法中也创建了作用域,调用了callOp3一个子任务。publicclassStructuredWay{inttakeAction(Stringinput)throwsInterruptedException,ExecutionException{try(varscopenewStructuredTaskScope。ShutdownOnFailure()){FutureIntegerv1scope。fork(()callOp1(input));FutureIntegerv2scope。fork(()callOp2(input));scope。join();scope。throwIfFailed();returncombine(v1。resultNow(),v2。resultNow());}}intcombine(intresult1,intresult2)throwsInterruptedException,ExecutionException{try(varscopenewStructuredTaskScope。ShutdownOnFailure()){FutureIntegerrscope。fork(()callOp3(result1,result2));scope。join();scope。throwIfFailed();returnr。resultNow();}}} 使用StructuredTaskScope的基本流程如下:主任务创建一个StructuredTaskScope对象。使用fork方法来创建子任务。使用join或joinUntil方法来等待子任务完成或取消。在join或joinUntil方法返回之后,处理子任务中可能出现的错误,并使用子任务产生的结果。关闭作用域对象,一般使用trywithresources可以自动进行关闭。 看到上面的代码,你的第一反应可能是这样的代码也很繁琐。不过这里的重点在于思维方式的变化。结构化并发的思维方式,更加类似传统的单线程应用,因此更容易理解。 在上述的代码中,用到的是作用域的实现ShutdownOnFailure。与它作用类似的是ShutdownOnSuccess。ShutdownOnFailure适用的是作用域中的所有任务都必须成功的场景。只要有一个任务失败,该作用域的shutdown方法会被调用,其他未完成的任务线程也会被中断。ShutdownOnSuccess适用的是作用域中的任意任务成功即可的场景。只要有一个任务成功,该作用域的shutdown方法会被调用,其他未完成的任务线程也会被中断。 对于作用域的任务,如果在执行中出现错误,可以调用StructuredTaskScope的shutdown方法来终止执行。调用shutdown方法会阻止新任务的执行,同时取消正在运行中的任务。 关于结构化并发的基本介绍就到这里。由于结构化并发的实现还处于非常早期的阶段,API可能发生在后续版本中产生变化。大家并不需要关注过多的细节,但是有必要关注这种即将到来的新的多线程编程模式,应该会让以后的多线程编程更简单。