上一篇:Dubbo3.0 RPC调用集群容错策略广播机制源码分析使用场景 并行调用服务提供者,只要有一个节点返回正确则返回(只适用于查询数据场景)。 源码分析/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dubbo.rpc.cluster.support; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcContext; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.Directory; import org.apache.dubbo.rpc.cluster.LoadBalance; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; import static org.apache.dubbo.common.constants.CommonConstants.FORKS_KEY; import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_FORKS; /** * NOTICE! This implementation does not work well with async call. ** Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources. * * Fork */ /** * 并行去调用几个服务实例,如果那个服务实例先返回结果了就用谁返回的结果 * 优点:可以快速返回结果,缺点:增加了CPU负载 * @param
*/ public class ForkingClusterInvoker extends AbstractClusterInvoker { /** * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread} * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}. */ private final ExecutorService executor; public ForkingClusterInvoker(Directory directory) { super(directory); executor = directory.getUrl().getOrDefaultApplicationModel().getApplicationExecutorRepository().getSharedExecutor(); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { try { /** 检查invokers是否为空 */ checkInvokers(invokers, invocation); final List > selected; /** 并行度,默认:2 */ final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS); /** 超时时间:1000 */ final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); /** 判断并行度是否小于0或者大于invokers集合数量 */ if (forks <= 0 || forks >= invokers.size()) { /** * 如果满足条件,直接将invokers赋值给selected * 说明同时对所有目标服务实例发起调用 */ selected = invokers; } else { /** 根据并行度构建集合 */ selected = new ArrayList<>(forks); /** 循环直至selected集合达到并行度数量 */ while (selected.size() < forks) { /** 根据负载均衡策略选择一个Invoker */ Invoker invoker = select(loadbalance, invocation, invokers, selected); /** 判断是否已存在待调用的Invoker集合中,不存在则添加 */ if (!selected.contains(invoker)) { //Avoid add the same invoker several times. selected.add(invoker); } } } RpcContext.getServiceContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue