上篇回顾:静态服务器+压测html
同步是指一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成。linux
异步是指不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做。而后继续执行下面代码逻辑,只要本身完成了整个任务就算完成了(异步通常使用状态、通知和回调)web
PS:项目里面通常是这样的:(我的经验)数据库
await xxx()
阻塞是指调用结果返回以前,当前线程会被挂起,一直处于等待消息通知,不可以执行其余业务(大部分代码都是这样的)编程
非阻塞是指在不能马上获得结果以前,该函数不会阻塞当前线程,而会马上返回(继续执行下面代码,或者重试机制走起)缓存
PS:项目里面重试机制为啥通常都是3次?服务器
对于一次IO访问,数据会先被拷贝到内核的缓冲区中,而后才会从内核的缓冲区拷贝到应用程序的地址空间。须要经历两个阶段:网络
因为存在这两个阶段,Linux产生了下面五种IO模型(以socket为例
)架构
recvfrom
等阻塞方法时,内核进入IO的第1个阶段:准备数据(内核须要等待足够的数据再拷贝)这个过程须要等待,用户进程会被阻塞,等内核将数据准备好,而后拷贝到用户地址空间,内核返回结果,用户进程才从阻塞态进入就绪态kernel
中的数据尚未准备好,那么它并不会block
用户进程,而是马上返回一个error
。error
时,它就知道数据尚未准备好,因而它能够再次发送read操做kernel
中的数据准备好了,而且又再次收到了用户进程的system call
,那么它立刻就将数据拷贝到了用户内存,而后返回select
、poll
和epoll
POSIX
的aio_
系列函数)
IO read
以后,会马上返回,不会阻塞用户进程。signal
告诉它read操做完成了贴一下Unix编程里面的图:并发
开始以前我们经过非阻塞IO引入一下:(来个简单例子socket.setblocking(False)
)
import time import socket def select(socket_addr_list): for client_socket, client_addr in socket_addr_list: try: data = client_socket.recv(2048) if data: print(f"[来自{client_addr}的消息:]\n") print(data.decode("utf-8")) client_socket.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: # 没有消息是触发异常,空消息是断开链接 client_socket.close() # 关闭客户端链接 socket_addr_list.remove((client_socket, client_addr)) print(f"[客户端{client_addr}已断开链接,当前链接数:{len(socket_addr_list)}]") except Exception: pass def main(): # 存放客户端集合 socket_addr_list = list() with socket.socket() as tcp_server: # 防止端口绑定的设置 tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() tcp_server.setblocking(False) # 服务端非阻塞 while True: try: client_socket, client_addr = tcp_server.accept() client_socket.setblocking(False) # 客户端非阻塞 socket_addr_list.append((client_socket, client_addr)) except Exception: pass else: print(f"[来自{client_addr}的链接,当前链接数:{len(socket_addr_list)}]") # 防止客户端断开后出错 if socket_addr_list: # 轮询查看客户端有没有消息 select(socket_addr_list) # 引用传参 time.sleep(0.01) if __name__ == "__main__": main()
输出:
能够思考下:
select和上面的有点相似,就是轮询的过程交给了操做系统:
kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程
来个和上面等同的案例:
import select import socket def main(): with socket.socket() as tcp_server: tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() socket_info_dict = dict() socket_list = [tcp_server] # 监测列表 while True: # 劣势:select列表数量有限制 read_list, write_list, error_list = select.select( socket_list, [], []) for item in read_list: # 服务端迎接新的链接 if item == tcp_server: client_socket, client_address = item.accept() socket_list.append(client_socket) socket_info_dict[client_socket] = client_address print(f"[{client_address}已链接,当前链接数:{len(socket_list)-1}]") # 客户端发来 else: data = item.recv(2048) if data: print(data.decode("utf-8")) item.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: item.close() socket_list.remove(item) info = socket_info_dict[item] print(f"[{info}已断开,当前链接数:{len(socket_list)-1}]") if __name__ == "__main__": main()
输出和上面同样
扩展说明:
select 函数监视的文件描述符分3类,分别是
writefds
、readfds
、和exceptfds
。调用后select函数会阻塞,直到有描述符就绪函数返回(有数据可读、可写、或者有except)或者超时(timeout指定等待时间,若是当即返回设为null便可)
select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024(64位=>2048)
而后Poll就出现了,就是把上限给去掉了,本质并没变,仍是使用的轮询
epoll在内核2.6中提出(Linux独有),使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,采用监听回调的机制,这样在用户空间和内核空间的copy只需一次,避免再次遍历就绪的文件描述符列表
先来看个案例吧:(输出和上面同样)
import socket import select def main(): with socket.socket() as tcp_server: tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(('', 8080)) tcp_server.listen() # epoll是linux独有的 epoll = select.epoll() # tcp_server注册到epoll中 epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET) # key-value fd_socket_dict = dict() # 回调须要本身处理 while True: # 返回可读写的socket fd 集合 poll_list = epoll.poll() for fd, event in poll_list: # 服务器的socket if fd == tcp_server.fileno(): client_socket, client_addr = tcp_server.accept() fd = client_socket.fileno() fd_socket_dict[fd] = (client_socket, client_addr) # 把客户端注册进epoll中 epoll.register(fd, select.EPOLLIN | select.EPOLLET) else: # 客户端 client_socket, client_addr = fd_socket_dict[fd] data = client_socket.recv(2048) print( f"[来自{client_addr}的消息,当前链接数:{len(fd_socket_dict)}]\n") if data: print(data.decode("utf-8")) client_socket.send( b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>" ) else: del fd_socket_dict[fd] print( f"[{client_addr}已离线,当前链接数:{len(fd_socket_dict)}]\n" ) # 从epoll中注销 epoll.unregister(fd) client_socket.close() if __name__ == "__main__": main()
扩展:epoll的两种工做模式
LT(level trigger,水平触发)模式:当epoll_wait检测到描述符就绪,将此事件通知应用程序,应用程序能够不当即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。LT模式是默认的工做模式。
LT模式同时支持阻塞和非阻塞socket。
ET(edge trigger,边缘触发)模式:当epoll_wait检测到描述符就绪,将此事件通知应用程序,应用程序必须当即处理该事件。若是不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
ET是高速工做方式,只支持非阻塞socket(ET模式减小了epoll事件被重复触发的次数,所以效率要比LT模式高)
Code提炼一下:
epoll = select.epoll()
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
epoll.unregister(fd)
PS:epoll
不必定比Select
性能高,通常都是分场景的:
其实IO多路复用还有一个kqueue
,和epoll
相似,下面的通用写法中有包含
Selector
)通常来讲:Linux下使用epoll,Win下使用select(IO多路复用会这个通用的便可)
先看看Python源代码:
# 选择级别:epoll|kqueue|devpoll > poll > select if 'KqueueSelector' in globals(): DefaultSelector = KqueueSelector elif 'EpollSelector' in globals(): DefaultSelector = EpollSelector elif 'DevpollSelector' in globals(): DefaultSelector = DevpollSelector elif 'PollSelector' in globals(): DefaultSelector = PollSelector else: DefaultSelector = SelectSelector
实战案例:(可读和可写能够不分开)
import socket import selectors # Linux下使用epoll,Win下使用select Selector = selectors.DefaultSelector() class Task(object): def __init__(self): # 存放客户端fd和socket键值对 self.fd_socket_dict = dict() def run(self): self.server = socket.socket() self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind(('', 8080)) self.server.listen() # 把Server注册到epoll Selector.register(self.server.fileno(), selectors.EVENT_READ, self.connected) def connected(self, key): """客户端链接时处理""" client_socket, client_address = self.server.accept() fd = client_socket.fileno() self.fd_socket_dict[fd] = (client_socket, client_address) # 注册一个客户端读的事件(服务端去读消息) Selector.register(fd, selectors.EVENT_READ, self.call_back_reads) print(f"{client_address}已链接,当前链接数:{len(self.fd_socket_dict)}") def call_back_reads(self, key): """客户端可读时处理""" # 一个fd只能注册一次,监测可写的时候须要把可读给注销 Selector.unregister(key.fd) client_socket, client_address = self.fd_socket_dict[key.fd] print(f"[来自{client_address}的消息:]\n") data = client_socket.recv(2048) if data: print(data.decode("utf-8")) # 注册一个客户端写的事件(服务端去发消息) Selector.register(key.fd, selectors.EVENT_WRITE, self.call_back_writes) else: client_socket.close() del self.fd_socket_dict[key.fd] print(f"{client_address}已断开,当前链接数:{len(self.fd_socket_dict)}") def call_back_writes(self, key): """客户端可写时处理""" Selector.unregister(key.fd) client_socket, client_address = self.fd_socket_dict[key.fd] client_socket.send(b"ok") Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads) def main(): t = Task() t.run() while True: ready = Selector.select() for key, obj in ready: # 须要本身回调 call_back = key.data call_back(key) if __name__ == "__main__": main()
Code提炼一下:
Selector = selectors.DefaultSelector()
Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
Selector.unregister(key.fd)
业余拓展:
select, iocp, epoll,kqueue及各类I/O复用机制 https://blog.csdn.net/shallwake/article/details/5265287 kqueue用法简介 http://www.cnblogs.com/luminocean/p/5631336.html
下级预估:协程篇 or 网络深刻篇