这次我们看的是客户端部分。
1:在客户端我们使用的是注解 @GlobalTransactional,框架会创建代理 GlobalTransactionScanner。在代理的初始化代码中,会进行 TM 和 RM 的初始化,代码如下:
private void initClient() { if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup)); } //init TM TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } //init RM RMClient.init(applicationId, txServiceGroup); registerSpringShutdownHook(); }
2:在 TMClient 或者 RMClient 的 init 方法里,会创建 NettyClientBootstrap 实例。在 NettyClientBootstrap 的构造过程中,会创建 Bootstrap 实例,也会创建 NioEventLoopGroup 类型的客户端事件选择器。代码如下:
/**
 * Client-side wrapper around a Netty {@code Bootstrap}.
 *
 * <p>Construction only prepares state: it resolves the client configuration, creates the
 * NIO event loop group used as the selector/worker group, and stores the executor group
 * handed in by the caller.
 */
public class NettyClientBootstrap implements RemotingBootstrap {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);

    private final NettyClientConfig nettyClientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private EventExecutorGroup defaultEventExecutorGroup;
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /**
     * Creates the bootstrap wrapper.
     *
     * @param nettyClientConfig  client configuration; a default {@code NettyClientConfig} is
     *                           created when this is {@code null}
     * @param eventExecutorGroup executor group stored as the default event executor group
     * @param transactionRole    role of this client
     */
    public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
                                NettyPoolKey.TransactionRole transactionRole) {
        this.nettyClientConfig = nettyClientConfig != null ? nettyClientConfig : new NettyClientConfig();
        final int selectorThreads = this.nettyClientConfig.getClientSelectorThreadSize();

        // NOTE(review): assigned before getThreadPrefix(...) is called, as in the original;
        // that helper is not shown here and may read this field, so keep the order.
        this.transactionRole = transactionRole;

        final String selectorThreadPrefix =
            getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix());
        this.eventLoopGroupWorker =
            new NioEventLoopGroup(selectorThreads, new NamedThreadFactory(selectorThreadPrefix, selectorThreads));
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }
    // Excerpt: the class body continues beyond this snippet, so its closing brace is not part of it.
3:创建之后,会调用 NettyClientBootstrap 的 start 方法,建立 netty 客户端,代码如下:
public void start() { this.bootstrap.group(this.eventLoopGroupWorker).channel( //绑定事件选择器 nettyClientConfig.getClientChannelClazz()).option( //设置通道类型,默认是NioSocketChannel ChannelOption.TCP_NODELAY, true) // TCP不缓存直接发送 .option(ChannelOption.SO_KEEPALIVE, true) // TCP进行心跳检测 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // 设置链接超时时间 .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) //设置发送缓存区大小 .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); //设置接受缓冲区大小 bootstrap.handler(new ChannelInitializer<SocketChannel>() { //设置通道处理器 @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), //添加通道空闲心跳处理器 nettyClientConfig.getChannelMaxWriteIdleSeconds(), nettyClientConfig.getChannelMaxAllIdleSeconds())) .addLast(new ProtocolV1Decoder()) //通道消息解码处理器 .addLast(new ProtocolV1Encoder()); //通道消息编码处理器 if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); //添加处理器 ClientHandler } } }); if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) { LOGGER.info("NettyClientBootstrap has started"); } }
4:在 seata 客户端,使用 netty 客户端的时候,使用了池化技术,其工厂类是 NettyPoolableFactory。在 makeObject 方法中去获取 netty 的连接通道。获取通道的代码如下:
public Channel getNewChannel(InetSocketAddress address) { Channel channel; ChannelFuture f = this.bootstrap.connect(address); // 链接netty服务器 try { f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS); // 等待链接完成 if (f.isCancelled()) { throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server."); } else if (!f.isSuccess()) { throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server."); } else { channel = f.channel(); //获取通道 } } catch (Exception e) { throw new FrameworkException(e, "can not connect to services-server."); } return channel; }
5:发送消息的示例代码(这是需要获取返回值的情况,如果不需要获取返回值,直接调用 channel.writeAndFlush() 即可):
/**
 * Sends a request over the channel and blocks until its response arrives or the timeout elapses.
 *
 * <p>A {@code MessageFuture} is registered in {@code futures} under the message id before the
 * write. If the call does not complete normally (writability check or hook failure, write
 * failure, timeout, interruption), the registration is removed again so it cannot linger.
 *
 * @param channel       channel to write the request to
 * @param rpcMessage    request message; its id keys the pending-response registration
 * @param timeoutMillis maximum time to wait for the response, in milliseconds
 * @return the result delivered to the message future
 * @throws TimeoutException if waiting for the response times out
 * @throws RuntimeException wrapping any other exception raised while waiting for the response
 *         or running the after-hooks; if that exception is an {@code InterruptedException}
 *         the thread's interrupt flag is restored first
 */
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);

    boolean completed = false;
    try {
        channelWritableCheck(channel, rpcMessage.getBody());

        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);

        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                // Write failed: hand the cause to the waiter and drop the broken channel.
                MessageFuture pending = futures.remove(rpcMessage.getId());
                if (pending != null) {
                    pending.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        });

        try {
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            doAfterRpcHooks(remoteAddr, rpcMessage, result);
            completed = true;
            return result;
        } catch (Exception exx) {
            if (exx instanceof InterruptedException) {
                // Keep the interrupt visible to callers even though it is wrapped below.
                Thread.currentThread().interrupt();
            }
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            }
            throw new RuntimeException(exx);
        }
    } finally {
        if (!completed) {
            // Failure path: make sure the pending registration does not outlive this call.
            // Removing an id that is already gone is a no-op.
            futures.remove(rpcMessage.getId());
        }
    }
}