温故之.NET Socket通讯

上一篇文章介绍了内存映射文件,这篇文章咱们介绍一种用得更加普遍的方式——Socket 通讯git

Socket 介绍

Socket 称为”套接字”,它分为流式套接字和用户数据报套接字,分别对应网络中的 TCP 和 UDP 协议。这两种都可以实现进程间通讯(不管是不是同一机器)github

TCP 协议是面向链接的协议,提供稳定的双向通讯功能,TCP链接的创建是经过三次握手才能完成,稳定性高,建立链接的效率相对UDP较低
UDP协议是面向无链接的,效率高,但不保证数据必定可以正确传输(顺序、丢包等)网络

咱们应该选择 UDP 仍是 TCP?并发

  • 对数据的可靠性要求很高的场景,应该选择 TCP,好比涉及钱的地方。固然也能够选择 UDP,这时候须要咱们自行来保证数据的可靠性
  • 对速度要求高,但容许数据出现少许错误的适合,UDP最合适。好比记录日志的场景:一台机器专用于记录日志,其余的机器将日志发送给这台机器便可;还有就是视频会议的场景

但实际项目中,这样“纯粹”的场景并非那么多,所以,每每采用的方案都是 TCP、UDP 相结合的方式来实现。固然为了保证数据的可靠及业务的稳定性,不少框架都不只仅只有这么两种技术框架

框架的复杂、轻量与否,与其应对的业务场景是相关的。咱们须要根据不一样的场景,来选择适合本身项目的框架。在 C# 中,有 FastSocketSuperSocketSocket 框架供你们选择。其中 SuperSocket 支持 IOCP,它能够实现高性能、高并发。其余语言有 NettyHP-Socket 等,这些也有 .NET 的移植版本tcp

通常状况下,不建议各位朋友本身去写一个 Socket 框架来支持项目的业务场景,用现有的框架更加妥当。若是不知道选择什么框架,能够去 Github 上搜索相关的开源框架微服务

选择 Github 中的框架时,咱们应该注意高并发

  • 选择 Star 最多的
  • 看做者上一次维护时间是多久,这个框架的 issue 多很少。更新频繁的,每每能够选择,这样遇到问题也能够及时的处理
  • 文档:有一个详细的开发文档,能够提升咱们开发的速度

Socket 通讯,是市面上不少框架的基础,所以咱们有必要介绍下它的使用方式,及在开发过程当中须要注意的事项性能

使用示例

在 C# 中,不管是 TCP 协议,仍是 UDP 协议,都封装在了 Socket 这个类中。使用时,只须要咱们指定不一样的参数便可学习

TCP 与 UDP 区别

  • TCP 面向链接(如打电话要先拨号创建链接); UDP 是无链接的,即发送数据以前不须要创建链接(扔出去就不用管了)
  • TCP 提供可靠的服务。也就是说,经过 TCP 链接传送的数据,无差错,不丢失,不重复,且按序到达;UDP 尽最大努力交付,即不保证可靠交付
  • TCP 面向字节流,其实是 TCP 把数据当作一连串无结构的字节流;UDP 是面向报文的
  • UDP 没有堵塞控制,所以网络出现堵塞不会使源主机的发送速率下降(对实时应用颇有用,如IP电话,实时视频会议等)
  • 每一条 TCP 链接只能是点对点的;UDP 支持一对一,一对多,多对一和多对多的交互通讯(群视频等场景)
  • TCP 首部开销 20 字节;UDP 的首部开销小,只有8个字节
  • TCP 的逻辑通讯信道是全双工的可靠信道,UDP 则是不可靠信道

在大部分状况下(针对性能而言),咱们没法感受到这二者之间的差别;而在高并发的场景下,咱们就能很容易体会到(由于访问量大了以后,任何细小的变化都能累积起来从而形成巨大的影响)

使用 TCP 面临的一个主要问题就是粘包,业界主流的解决方案可概括以下

  • 消息定长:如每一个数据包的大小固定为 1024 字节,若是不足 1024 字节,使用空格填充剩下的部分
  • 在包尾增长回车换行符进行分隔,好比 FTP 协议
  • 将消息分为消息头、消息体。消息头包含了消息的总长度,及其余的一些元数据,消息体存储具体的数据包。通常地,消息头能够采用定长的方式,好比分配 40 个字节,其中16字节用于存放消息的长度信息,其他部分存放其余数据。
  • 自定义应用层协议:这种方式是为具体的业务场景而实现的,好比腾讯就有一套他们本身的通讯框架

另外,若是以为自定义协议太麻烦,咱们也能够根据 MQTT 协议来写一套符合它的解决方案

针对 TCP 的使用,咱们给出一个例子。其中咱们采用 Jil 来实现序列化

/// <summary>
/// 传输使用的包
/// </summary>
public class Packet {
    public const int TYPE_LOGIN = 10001;
    public const int TYPE_MSG = 10000;
    public const int TYPE_LOGOUT = 10002;
    public const int TYPE_INVALID = 40000;

    /// <summary>
    /// 这个包的类型。在实际业务场景中,通常会使用 int、short 等来表示,而不是 enum
    /// </summary>
    public int Type { get; set; }
    /// <summary>
    /// 具体的业务数据
    /// </summary>
    public string Data { get; set; }
}
复制代码

如下为服务端代码

using Jil;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace App {
    class Program {
        static void Main(string[] args) {
            TcpListener tcpListener = new TcpListener(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9999));
            tcpListener.Start();
            /// 此处仅仅用于处理客户端的链接
            /// 而不涉及具体的业务逻辑
            while (true) {
                TcpClient remoteClient = tcpListener.AcceptTcpClient();
                ClientPacketHandlers packetHandlers = new ClientPacketHandlers(remoteClient);
            }
        }

    }

    /// <summary>
    /// 将业务逻辑处理分开
    /// </summary>
    public class ClientPacketHandlers {
        Dictionary<int, Action<NetworkStream, string>> clientHandlers = new Dictionary<int, Action<NetworkStream, string>>();
        TcpClient remoteClient;
        NetworkStream stream;
        Task processTask;
        CancellationTokenSource cancellationTokenSource;

        public ClientPacketHandlers(TcpClient client) {
            this.remoteClient = client;
            this.stream = remoteClient.GetStream();

            // 这个能够经过配置文件来添加处理器
            clientHandlers.Add(Packet.TYPE_LOGIN, HandleLogin);
            clientHandlers.Add(Packet.TYPE_MSG, HandleMsg);
            clientHandlers.Add(Packet.TYPE_LOGOUT, HandleLogout);

            cancellationTokenSource = new CancellationTokenSource();

            // 为该客户端开辟一个 Task,用于与该客户端通讯
            // 在高并发场景中,每每不会这样作。而是采用 IOCP 或者其余的高性能的方式
            // 为每一个客户端开辟一个 Task 不合理,也很浪费系统资源(由于不是每一个客户端都会频繁发送消息)
            processTask = Task.Run(() => {
                byte[] buffer = new byte[1024];
                while (true) {
                    int bytesRead = stream.Read(buffer, 0, 1024);
                    if (bytesRead > 0) {
                        byte[] realBytes = new byte[bytesRead];
                        Buffer.BlockCopy(buffer, 0, realBytes, 0, bytesRead);

                        Packet packet = JSON.Deserialize<Packet>(Encoding.UTF8.GetString(realBytes));
                        if (packet != null) {
                            if (clientHandlers.ContainsKey(packet.Type)) {
                                clientHandlers[packet.Type].Invoke(stream, packet.Data);
                            } else {
                                SendPacket(stream, new Packet() { Type = Packet.TYPE_INVALID, Data = "No handlers for your type" });
                            }
                        }
                    }

                    if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) {
                        break;
                    }
                }
            }, cancellationTokenSource.Token);
        }

        public void HandleLogin(NetworkStream stream, string data) {
            if (stream == null || string.IsNullOrEmpty(data)) return;
            SendPacket(stream, new Packet() { Type = Packet.TYPE_LOGIN, Data = $"Hello, {data}" });
        }

        public void HandleMsg(NetworkStream stream, string data) {
            if (stream == null || string.IsNullOrEmpty(data)) return;
            SendPacket(stream, new Packet() { Type = Packet.TYPE_MSG, Data = $"Received Msg : {data}" });
        }

        public void HandleLogout(NetworkStream stream, string data) {
            if (stream == null || string.IsNullOrEmpty(data)) return;
            SendPacket(stream, new Packet() { Type = Packet.TYPE_LOGOUT, Data = $"Logout, {data}" });
            try {
                if (cancellationTokenSource != null) {
                    cancellationTokenSource.Cancel();
                    cancellationTokenSource.Dispose();
                }
            } catch (Exception e) {
            } finally {
                cancellationTokenSource = null;
            }
        }


        public void SendPacket(NetworkStream stream, Packet packet) {
            byte[] packetBytes = Encoding.UTF8.GetBytes(JSON.Serialize(packet));
            stream.Write(packetBytes, 0, packetBytes.Length);
        }
    }
}
复制代码

如下为客户端代码

using Jil;
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace App {
    class Program {
        static void Main(string[] args) {
            TcpClient tcpClient = new TcpClient();
            tcpClient.Connect(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9999));
            NetworkStream networkStream = tcpClient.GetStream();

            Task.Run(() => {
                byte[] buffer = new byte[1024];
                while (true) {
                    int bytesRead = networkStream.Read(buffer, 0, 1024);
                    if (bytesRead > 0) {
                        byte[] realBytes = new byte[bytesRead];
                        Buffer.BlockCopy(buffer, 0, realBytes, 0, bytesRead);

                        Packet packet = JSON.Deserialize<Packet>(Encoding.UTF8.GetString(realBytes));
                        if (packet != null) {
                            Console.WriteLine($"RECEIVED DATA: {packet.Data}");
                        }
                    }
                }
            });

            while (true) {
                string line = Console.ReadLine();
                string[] strs = line.Split(':');
                if(strs.Length >= 2) {
                    if(strs[0] == "login") {
                        SendPacket(networkStream, new Packet() { Type = Packet.TYPE_LOGIN, Data = strs[1] });
                    } else if (strs[0] == "msg") {
                        SendPacket(networkStream, new Packet() { Type = Packet.TYPE_MSG, Data = strs[1] });
                    } else if (strs[0] == "logout") {
                        SendPacket(networkStream, new Packet() { Type = Packet.TYPE_LOGOUT, Data = strs[1] });
                    }
                }
            }
        }

        private static void SendPacket(NetworkStream networkStream, Packet packet) {
            byte[] packetBytes = Encoding.UTF8.GetBytes(JSON.Serialize(packet));
            networkStream.Write(packetBytes, 0, packetBytes.Length);
        }
    }
}
复制代码

这即是 TCP 通讯的基础示例了,在更复杂的场景中,系统的设计将会更加复杂。但宗旨都只有一个,提供更加稳定可靠的服务

UDP 的使用与 TCP 相似,所以就不一一举例了

开发建议

  • 尽可能将对客户端的管理,与具体的业务逻辑分开,这样能够提升系统的可维护性
  • 若是使用 TCP,除了解决粘包以外,还须要使用心跳包来使链接处于活动状态
  • 在使用 UDP 的时候,若是须要保证数据的可靠性,此时须要经过其余的方式来辅助
  • 若是要采用 GitHub 上的一些框架,必定要参考前面给出的建议
  • 在不增长系统复杂度的状况下,可使用微服务来提高系统的扩展性。但切记不可滥用,过多的微服务会形成系统的可维护性降低,而且是指数级的降低
  • 在高并发、高性能的场景下,须要采用其余的方式。好比 IOCP 等框架。除了避免系统资源的浪费,更是为了提高系统的响应能力

至此,这篇文章的内容讲解完毕。欢迎关注公众号【嘿嘿的学习日记】,全部的文章,都会在公众号首发,Thank you~

公众号二维码