io多路复用

 

 

上篇回顾:静态服务器+压测linux

3.2.概念篇

1.同步与异步

同步是指一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成。web

异步是指不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做。而后继续执行下面代码逻辑,只要本身完成了整个任务就算完成了(异步通常使用状态、通知和回调)数据库

PS:项目里面通常是这样的:(我的经验)编程

  1. 同步架构:通常都是和钱相关的需求,须要实时返回的业务
  2. 异步架构:更可能是对写要求比较高时的场景(同步变异步)
    • 读通常都是实时返回,代码通常都是await xxx()
  3. 想象个情景就清楚了:
    • 异步:如今用户写了篇文章,能够异步操做,就算没真正写到数据库也能够返回:发表成功(大不了失败提示一下)
    • 同步:用户获取订单信息,你若是异步就会这样了:提示下获取成功,而后一片空白...用户不卸载就怪了...

2.阻塞与非阻塞

阻塞是指调用结果返回以前,当前线程会被挂起,一直处于等待消息通知,不可以执行其余业务(大部分代码都是这样的)缓存

非阻塞是指在不能马上获得结果以前,该函数不会阻塞当前线程,而会马上返回(继续执行下面代码,或者重试机制走起)服务器

PS:项目里面重试机制为啥通常都是3次?markdown

  1. 第一次重试,两台PC挂了也是有可能的
  2. 第二次重试,负载均衡分配的三台机器同时挂的可能性不是很大,这时候就有多是网络有点拥堵了
  3. 最后一次重试,再失败就没意义了,日记写起来,再重试网络负担就加大了,得不偿失了

3.五种IO模型

对于一次IO访问,数据会先被拷贝到内核的缓冲区中,而后才会从内核的缓冲区拷贝到应用程序的地址空间。须要经历两个阶段:网络

  1. 准备数据
  2. 将数据从内核缓冲区拷贝到进程地址空间

因为存在这两个阶段,Linux产生了下面五种IO模型(以socket为例

  1. 阻塞式IO:
    • 当用户进程调用了recvfrom等阻塞方法时,内核进入IO的第1个阶段:准备数据(内核须要等待足够的数据再拷贝)这个过程须要等待,用户进程会被阻塞,等内核将数据准备好,而后拷贝到用户地址空间,内核返回结果,用户进程才从阻塞态进入就绪态
    • Linux中默认状况下全部的socket都是阻塞的
  2. 非阻塞式IO:
    • 当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error
    • 用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做
    • 一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回
    • 非阻塞IO模式下用户进程须要不断地询问内核的数据准备好了没有
  3. IO多路复用
    • 经过一种机制,一个进程能够监视多个文件描述符(套接字描述符)一旦某个文件描述符就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做(这样就不须要每一个用户进程不断的询问内核数据准备好了没)
    • 经常使用的IO多路复用方式有selectpollepoll
  4. 信号驱动IO:
    • 内核文件描述符就绪后,经过信号通知用户进程,用户进程再经过系统调用读取数据。
    • 此方式属于同步IO(实际读取数据到用户进程缓存的工做仍然是由用户进程本身负责的)
  5. 异步IOPOSIXaio_系列函数)
    • 用户进程发起read操做以后,马上就能够开始去作其它的事。内核收到一个异步IO read以后,会马上返回,不会阻塞用户进程。
    • 内核会等待数据准备完成,而后将数据拷贝到用户内存,当这一切都完成以后,内核会给用户进程发送一个signal告诉它read操做完成了

4.Unix图示

贴一下Unix编程里面的图:

**非阻塞IO**

2.非阻塞IO

**IO复用**

3.IO复用

**信号IO**

4.信号IO

**异步AIO**

5.异步AIO

3.3.IO多路复用

开始以前我们经过非阻塞IO引入一下:(来个简单例子socket.setblocking(False))

import time
import socket

def select(socket_addr_list):
    for client_socket, client_addr in socket_addr_list:
        try:
            data = client_socket.recv(2048)
            if data:
                print(f"[来自{client_addr}的消息:]\n")
                print(data.decode("utf-8"))
                client_socket.send(
                    b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                )
            else:
                # 没有消息是触发异常,空消息是断开链接
                client_socket.close()  # 关闭客户端链接
                socket_addr_list.remove((client_socket, client_addr))
                print(f"[客户端{client_addr}已断开链接,当前链接数:{len(socket_addr_list)}]")
        except Exception:
            pass

def main():
    # 存放客户端集合
    socket_addr_list = list()

    with socket.socket() as tcp_server:
        # 防止端口绑定的设置
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        tcp_server.setblocking(False)  # 服务端非阻塞
        while True:
            try:
                client_socket, client_addr = tcp_server.accept()
                client_socket.setblocking(False)  # 客户端非阻塞
                socket_addr_list.append((client_socket, client_addr))
            except Exception:
                pass
            else:
                print(f"[来自{client_addr}的链接,当前链接数:{len(socket_addr_list)}]")
            # 防止客户端断开后出错
            if socket_addr_list:
                # 轮询查看客户端有没有消息
                select(socket_addr_list)  # 引用传参
                time.sleep(0.01)

if __name__ == "__main__":
    main()

输出:
3.nowait.gif

能够思考下:

  1. 为何Server也要设置为非阻塞?
    • PS:一个线程里面只能有一个死循环,如今程序须要两个死循环,so ==> 放一块儿咯
  2. 断开链接怎么判断?
    • PS:没有消息是触发异常,空消息是断开链接
  3. client_socket为何不用dict存放?
    • PS:dict在循环的过程当中,del会引起异常

1.Select

select和上面的有点相似,就是轮询的过程交给了操做系统:

kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程

来个和上面等同的案例:

import select
import socket

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        socket_info_dict = dict()
        socket_list = [tcp_server]  # 监测列表
        while True:
            # 劣势:select列表数量有限制
            read_list, write_list, error_list = select.select(
                socket_list, [], [])
            for item in read_list:
                # 服务端迎接新的链接
                if item == tcp_server:
                    client_socket, client_address = item.accept()
                    socket_list.append(client_socket)
                    socket_info_dict[client_socket] = client_address
                    print(f"[{client_address}已链接,当前链接数:{len(socket_list)-1}]")
                # 客户端发来
                else:
                    data = item.recv(2048)
                    if data:
                        print(data.decode("utf-8"))
                        item.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        item.close()
                        socket_list.remove(item)
                        info = socket_info_dict[item]
                        print(f"[{info}已断开,当前链接数:{len(socket_list)-1}]")

if __name__ == "__main__":
    main()

输出和上面同样

扩展说明:

select 函数监视的文件描述符分3类,分别是writefdsreadfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪函数返回(有数据可读、可写、或者有except)或者超时(timeout指定等待时间,若是当即返回设为null便可)

select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024(64位=>2048)

而后Poll就出现了,就是把上限给去掉了,本质并没变,仍是使用的轮询

2.EPoll

epoll在内核2.6中提出(Linux独有),使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,采用监听回调的机制,这样在用户空间和内核空间的copy只需一次,避免再次遍历就绪的文件描述符列表

先来看个案例吧:(输出和上面同样)

import socket
import select

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()

        # epoll是linux独有的
        epoll = select.epoll()
        # tcp_server注册到epoll中
        epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)

        # key-value
        fd_socket_dict = dict()

        # 回调须要本身处理
        while True:
            # 返回可读写的socket fd 集合
            poll_list = epoll.poll()
            for fd, event in poll_list:
                # 服务器的socket
                if fd == tcp_server.fileno():
                    client_socket, client_addr = tcp_server.accept()
                    fd = client_socket.fileno()
                    fd_socket_dict[fd] = (client_socket, client_addr)
                    # 把客户端注册进epoll中
                    epoll.register(fd, select.EPOLLIN | select.EPOLLET)
                else:  # 客户端
                    client_socket, client_addr = fd_socket_dict[fd]
                    data = client_socket.recv(2048)
                    print(
                        f"[来自{client_addr}的消息,当前链接数:{len(fd_socket_dict)}]\n")
                    if data:
                        print(data.decode("utf-8"))
                        client_socket.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        del fd_socket_dict[fd]
                        print(
                            f"[{client_addr}已离线,当前链接数:{len(fd_socket_dict)}]\n"
                        )
                        # 从epoll中注销
                        epoll.unregister(fd)
                        client_socket.close()

if __name__ == "__main__":
    main()

扩展:epoll的两种工做模式

LT(level trigger,水平触发)模式:当epoll_wait检测到描述符就绪,将此事件通知应用程序,应用程序能够不当即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。LT模式是默认的工做模式。
LT模式同时支持阻塞和非阻塞socket。

ET(edge trigger,边缘触发)模式:当epoll_wait检测到描述符就绪,将此事件通知应用程序,应用程序必须当即处理该事件。若是不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
ET是高速工做方式,只支持非阻塞socket(ET模式减小了epoll事件被重复触发的次数,所以效率要比LT模式高)

Code提炼一下

  1. 实例化对象:epoll = select.epoll()
  2. 注册对象:epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
  3. 注销对象:epoll.unregister(fd)

PS:epoll不必定比Select性能高,通常都是分场景的:

  1. 高并发下,链接活跃度不高时:epoll比Select性能高(eg:web请求,页面随时关闭)
  2. 并发不高,链接活跃度比较高:Select更合适(eg:小游戏)
  3. Select是win和linux通用的,而epoll只有linux有

其实IO多路复用还有一个kqueue,和epoll相似,下面的通用写法中有包含


3.通用写法(Selector

通常来讲:Linux下使用epoll,Win下使用select(IO多路复用会这个通用的便可)

先看看Python源代码:

# 选择级别:epoll|kqueue|devpoll > poll > select
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

实战案例:(可读和可写能够不分开)

import socket
import selectors

# Linux下使用epoll,Win下使用select
Selector = selectors.DefaultSelector()

class Task(object):
    def __init__(self):
        # 存放客户端fd和socket键值对
        self.fd_socket_dict = dict()

    def run(self):
        self.server = socket.socket()
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('', 8080))
        self.server.listen()
        # 把Server注册到epoll
        Selector.register(self.server.fileno(), selectors.EVENT_READ,
                          self.connected)

    def connected(self, key):
        """客户端链接时处理"""
        client_socket, client_address = self.server.accept()
        fd = client_socket.fileno()
        self.fd_socket_dict[fd] = (client_socket, client_address)
        # 注册一个客户端读的事件(服务端去读消息)
        Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
        print(f"{client_address}已链接,当前链接数:{len(self.fd_socket_dict)}")

    def call_back_reads(self, key):
        """客户端可读时处理"""
        # 一个fd只能注册一次,监测可写的时候须要把可读给注销
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        print(f"[来自{client_address}的消息:]\n")
        data = client_socket.recv(2048)
        if data:
            print(data.decode("utf-8"))
            # 注册一个客户端写的事件(服务端去发消息)
            Selector.register(key.fd, selectors.EVENT_WRITE,
                              self.call_back_writes)
        else:
            client_socket.close()
            del self.fd_socket_dict[key.fd]
            print(f"{client_address}已断开,当前链接数:{len(self.fd_socket_dict)}")

    def call_back_writes(self, key):
        """客户端可写时处理"""
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        client_socket.send(b"ok")
        Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)

def main():
    t = Task()
    t.run()
    while True:
        ready = Selector.select()
        for key, obj in ready:
            # 须要本身回调
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    main()

Code提炼一下

  1. 实例化对象:Selector = selectors.DefaultSelector()
  2. 注册对象:
    • Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
    • Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
  3. 注销对象:Selector.unregister(key.fd)
  4. 注意一下:一个fd只能注册一次,监测可写的时候须要把可读给注销(反之同样)

业余拓展:

select, iocp, epoll,kqueue及各类I/O复用机制
https://blog.csdn.net/shallwake/article/details/5265287

kqueue用法简介
http://www.cnblogs.com/luminocean/p/5631336.html
相关文章
相关标签/搜索