Netty实战:设计一个IM框架

快速开始算法

bitchat-example模块提供了一个服务端与客户端的实现示例,能够参照该示例进行本身的业务实现。数据库

启动服务端promise

要启动服务端,须要获取一个 Server 的实例,能够经过 ServerFactory 来获取。服务器

目前只实现了单机模式下的 Server ,经过 SimpleServerFactory 只须要定义一个端口便可获取一个单机的 Server 实例,以下所示:架构

public class StandaloneServerApplication {
    public static void main(String[] args) {
        Server server = SimpleServerFactory.getInstance()
            .newServer(8864);
        server.start();
    }
} 框架

服务端启动成功后,将显示以下信息:异步

启动客户端async

目前只实现了直连服务器的客户端,经过 SimpleClientFactory 只须要指定一个 ServerAttr 便可获取一个客户端,而后进行客户端与服务端的链接,以下所示:ide

public class DirectConnectServerClientApplication { 性能

    public static void main(String[] args) {
        Client client = SimpleClientFactory.getInstance()
            .newClient(ServerAttr.getLocalServer(8864));
        client.connect();

        doClientBiz(client);
    }
}

客户端链接上服务端后,将显示以下信息:

体验客户端的功能

目前客户端提供了三种 Func,分别是:登陆,查看在线用户列表,发送单聊消息,每种 Func 有不一样的命令格式。

登陆

经过在客户端中执行如下命令 -lo houyi 123456 便可实现登陆,目前用户中心还未实现,经过 Mock 的方式实现一个假的用户服务,因此输入任何的用户名密码都会登陆成功,而且会为用户建立一个用户id。

登陆成功后,显示以下:

查看在线用户

再启动一个客户端,而且也执行登陆,登陆成功后,能够执行 -lu 命令,获取在线用户列表,目前用户是保存在内存中,获取的结果以下所示:

发送单聊信息

用 gris 这个用户向 houyi 这个用户发送单聊信息,只要执行 -pc 1 hello,houyi 命令便可

其中第二个参数数要发送消息给那个用户的用户id,第三个参数是消息内容

消息发送方,发送完消息:

消息接收方,接收到消息:

客户端断线重连

客户端和服务端之间维持着心跳,双方都会检查链接是否可用,客户端每隔5s会向服务端发送一个 PingPacket,而服务端接收到这个 PingPacket 以后,会回复一个 PongPacket,这样表示双方都是健康的。

当由于某种缘由,服务端没有收到客户端发送的消息,服务端将会把该客户端的链接断开,一样的客户端也会作这样的检查。

当客户端与服务端之间的链接断开以后,将会触发客户端 HealthyChecker 的 channelInactive 方法,从而进行客户端的断线重连。

总体架构单机版

单机版的架构只涉及到服务端、客户端,另外有二者之间的协议层,以下图所示:

除了服务端和客户端以外,还有三大中心:消息中心,用户中心,连接中心。

  • 消息中心:主要负责消息的存储与历史、离线消息的查询
  • 用户中心:主要负责用户和群组相关的服务
  • 连接中心:主要负责保存客户端的连接,服务端从连接中心获取客户端的连接,向其推送消息

集群版

单机版没法作到高可用,性能与可服务的用户数也有必定的限制,因此须要有可扩展的集群版,集群版在单机版的基础上增长了一个路由层,客户端经过路由层来得到可用的服务端地址,而后与服务端进行通信,以下图所示:

客户端发送消息给另外一个用户,服务端接收到这个请求后,从 Connection中心中获取目标用户“挂”在哪一个服务端下,若是在本身名下,那最简单直接将消息推送给目标用户便可,若是在其余服务端,则须要将该请求转交给目标服务端,让目标服务端将消息推送给目标用户。

自定义协议

经过一个自定义协议来实现服务端与客户端之间的通信,协议中有以下几个字段:

*
* <p>
* The structure of a Packet is like blow:
* +----------+----------+----------------------------+
* |  size    |  value   |  intro                     |
* +----------+----------+----------------------------+
* | 1 bytes  | 0xBC     |  magic number              |
* | 1 bytes  |          |  serialize algorithm       |
* | 4 bytes  |          |  packet symbol             |
* | 4 bytes  |          |  content length            |
* | ? bytes  |          |  the content               |
* +----------+----------+----------------------------+
* </p>
*

每一个字段的含义

[td]

所占字节

用途

1

魔数,默认为 0xBC

1

序列化的算法

4

Packet 的类型

4

Packet 的内容长度

?

Packet 的内容

序列化算法将会决定该 Packet 在编解码时,使用何种序列化方式。

Packet 的类型将会决定到达服务端的字节流将被反序列化为什么种 Packet,也决定了该 Packet 将会被哪一个 PacketHandler 进行处理。

内容长度将会解决 Packet 的拆包与粘包问题,服务端在解析字节流时,将会等到字节的长度达到内容的长度时,才进行字节的读取。

除此以外,Packet 中还会存储一个 sync 字段,该字段将指定服务端在处理该 Packet 的数据时是否须要使用异步的业务线程池来处理。

健康检查

服务端与客户端各自维护了一个健康检查的服务,即 Netty 为咱们提供的 IdleStateHandler,经过继承该类,而且实现 channelIdle 方法便可实现链接 “空闲” 时的逻辑处理,当出现空闲时,目前咱们只关心读空闲,咱们既能够认为这条连接出现问题了。

那么只须要在连接出现问题时,将这条连接关闭便可,以下所示:

public class IdleStateChecker extends IdleStateHandler {

    private static final int DEFAULT_READER_IDLE_TIME = 15;

    private int readerTime;

    public IdleStateChecker(int readerIdleTime) {
        super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS);
        readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime;
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        log.warn("[{}] Hasn't read data after {} seconds, will close the channel:{}",
        IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel());
        ctx.channel().close();
    }

}

另外,客户端须要额外再维护一个健康检查器,正常状况下他负责定时向服务端发送心跳,当连接的状态变成 inActive 时,该检查器将负责进行重连,以下所示:

public class HealthyChecker extends ChannelInboundHandlerAdapter {

    private static final int DEFAULT_PING_INTERVAL = 5;

    private Client client;

    private int pingInterval;

    public HealthyChecker(Client client, int pingInterval) {
        Assert.notNull(client, "client can not be null");
        this.client = client;
        this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        schedulePing(ctx);
    }

    private void schedulePing(ChannelHandlerContext ctx) {
        ctx.executor().schedule(() -> {
            Channel channel = ctx.channel();
            if (channel.isActive()) {
                log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName());
                channel.writeAndFlush(new PingPacket());
                schedulePing(ctx);
            }
        }, pingInterval, TimeUnit.SECONDS);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(() -> {
            log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName());
            client.connect();
        }, 5, TimeUnit.SECONDS);
        ctx.fireChannelInactive();
    }

}
业务线程池

咱们知道,Netty 中维护着两个 IO 线程池,一个 boss 主要负责连接的创建,另一个 worker 主要负责连接上的数据读写,咱们不该该使用 IO 线程来处理咱们的业务,由于这样极可能会对 IO 线程形成阻塞,致使新连接没法及时创建或者数据没法及时读写。

为了解决这个问题,咱们须要在业务线程池中来处理咱们的业务逻辑,可是这并非绝对的,若是咱们要执行的逻辑很简单,不会形成太大的阻塞,则能够直接在 IO 线程中处理,好比客户端发送一个 Ping 服务端回复一个 Pong,这种状况是没有必要在业务线程池中进行处理的,由于处理完了最终仍是要交给 IO 线程去写数据。可是若是一个业务逻辑须要查询数据库或者读取文件,这种操做每每比较耗时间,因此就须要将这些操做封装起来交给业务线程池去处理。

服务端容许客户端在传输的 Packet 中指定采用何种方式进行业务的处理,服务端在将字节流解码成 Packet 以后,会根据 Packet 中的 sync 字段的值,肯定怎样对该 Packet 进行处理,以下所示:

public class ServerPacketDispatcher extends
    SimpleChannelInboundHandler<Packet> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Packet request) {
        // if the packet should be handled async
        if (request.getAsync() == AsyncHandle.ASYNC) {
            EventExecutor channelExecutor = ctx.executor();
            // create a promise
            Promise<Packet> promise = new DefaultPromise<>(channelExecutor);
            // async execute and get a future
            Future<Packet> future = executor.asyncExecute(promise, ctx, request);
            future.addListener(new GenericFutureListener<Future<Packet>>() {
                @Override
                public void operationComplete(Future<Packet> f) throws Exception {
                    if (f.isSuccess()) {
                        Packet response = f.get();
                        writeResponse(ctx, response);
                    }
                }
            });
        } else {
            // sync execute and get the response packet
            Packet response = executor.execute(ctx, request);
            writeResponse(ctx, response);
        }
    }
}
不止是IM框架

bitchat除了能够做为 IM 框架以外,还能够做为一个通用的通信框架。

Packet 做为通信的载体,经过继承 AbstractPacket 便可快速实现本身的业务,搭配 PacketHandler 做为数据处理器便可实现客户端与服务端的通信。