These posts went out on my WeChat official account bit by bit last year; I assumed they had all been synced to the blog too, but a friend recently said he couldn't find this one, and sure enough it was missing, so here it is.
Online preview: https://github.lesschina.com/python/base/concurrency/4.并发编程-协程篇.html
Sample code: https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine
Switching between processes or threads also wastes resources; by comparison, coroutines are far more lightweight.
Previous article: http://www.noobyard.com/article/p-vjbmfnpu-g.html
The fundamentals article already covered this thoroughly, so rather than repeat it, here is a quick example, followed by a closer look at iterables, iterators, and generators.
from functools import wraps

def log(func):
    @wraps(func)
    def wrapper(*args, **kv):
        print("%s log_info..." % func.__name__)
        return func(*args, **kv)
    return wrapper

@log
def login_out():
    print("Logged out")

def main():
    # @wraps keeps the decorated function's signature consistent with the original
    print(f"Signature: {login_out.__name__}")
    login_out()
    # @wraps also lets you reach the original function through __wrapped__
    login_out.__wrapped__()  # run the undecorated function

if __name__ == '__main__':
    main()
Previous article: http://www.noobyard.com/article/p-ptjccadz-d.html
I'll skip the truly basic parts, give a quick recap, and then walk through an OOP demo:
- Check whether something is iterable: from collections.abc import Iterable; isinstance(xxx, Iterable)
- Traverse an iterator with next(xxx)
- Check whether something is an iterator: from collections.abc import Iterator; isinstance(xxx, Iterator)
- To turn an Iterable such as list, dict or str into an Iterator, use the iter() function, e.g. iter([])
- An Iterator produces values lazily (saving resources); list, dict and str are Iterable, but they are not Iterator
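The recap above can be verified directly; a minimal sketch (the sample list is made up for illustration):

```python
from collections.abc import Iterable, Iterator

nums = [1, 2, 3]
# a list is iterable but not an iterator
print(isinstance(nums, Iterable), isinstance(nums, Iterator))  # True False

# iter() wraps it in an iterator that next() can drive
it = iter(nums)
print(isinstance(it, Iterator))  # True
print(next(it), next(it), next(it))  # 1 2 3

# one more next() exhausts it and raises StopIteration
try:
    next(it)
except StopIteration:
    print("exhausted")
```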
A reminder: from collections import Iterable, Iterator  # no longer recommended (Python 3.8 warns about it; import from collections.abc instead)
A glance at the typing.py source makes it clear:
# Mirrors the ABCs in collections.abc (as of Python 3.7 these are just compatibility aliases, with no concrete implementation)
def _alias(origin, params, inst=True):
    return _GenericAlias(origin, params, special=True, inst=inst)

T_co = TypeVar('T_co', covariant=True)  # Any type covariant containers.

Iterable = _alias(collections.abc.Iterable, T_co)
Iterator = _alias(collections.abc.Iterator, T_co)
Last time there was a C# OOP demo; this time let's do a Python one and evolve it step by step:

# import the relevant ABCs
from collections.abc import Iterable, Iterator
# from collections import Iterable, Iterator  # no longer recommended (deprecated as of 3.8)

# define a class
class MyArray(object):
    pass

# iterable? False
isinstance(MyArray(), Iterable)
# an iterator? False
isinstance(MyArray(), Iterator)

# a class that defines an `__iter__` method counts as iterable
# redefine and test again:
class MyArray(object):
    def __iter__(self):
        pass

# iterable? True this time, thanks to __iter__
isinstance(MyArray(), Iterable)
# an iterator? still False
isinstance(MyArray(), Iterator)
So at this point it is still not an iterator.
You can compare this with C#:
// whether you can foreach over something depends on whether it implements IEnumerable, i.e. whether it is an enumerable type
public interface IEnumerable
{
    IEnumerator GetEnumerator();
}

// whether something is an enumerator depends on whether it implements IEnumerator
public interface IEnumerator
{
    object Current { get; }
    bool MoveNext();
    void Reset();
}
Now look at the corresponding Python classes:
# https://github.com/lotapp/cpython3/blob/master/Lib/_collections_abc.py
class Iterable(metaclass=ABCMeta):
    __slots__ = ()

    @abstractmethod
    def __iter__(self):
        while False:
            yield None

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Iterable:
            return _check_methods(C, "__iter__")
        return NotImplemented

class Iterator(Iterable):
    __slots__ = ()

    @abstractmethod
    def __next__(self):
        'Return the next item from the iterator. When exhausted, raise StopIteration'
        raise StopIteration

    def __iter__(self):
        return self

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Iterator:
            return _check_methods(C, '__iter__', '__next__')
        return NotImplemented
Here's one payoff of reading the source ==> abstract methods: @abstractmethod (subclasses must implement them) — something I glossed over last time~
Earlier I claimed that every iterator is iterable; that sounded abstract, but the code makes it obvious (plain inheritance): class Iterator(Iterable)
Now let's imitate this and implement a Python-flavoured iterator of our own:
# start with an empty skeleton
class MyIterator(Iterator):
    def __next__(self):
        pass

class MyArray(Iterable):
    def __iter__(self):
        return MyIterator()  # return an iterator

def main():
    # iterable: True
    print(isinstance(MyArray(), Iterable))
    # an iterator is itself iterable: True
    print(isinstance(MyIterator(), Iterable))
    # an iterator: True
    print(isinstance(MyIterator(), Iterator))

if __name__ == '__main__':
    main()
# simplify by merging into a single iterator
class MyIterator(Iterator):
    def __next__(self):
        pass

    def __iter__(self):
        return self  # return an iterator (now: itself)

def main():
    print(isinstance(MyIterator(), Iterable))
    print(isinstance(MyIterator(), Iterator))

if __name__ == '__main__':
    main()
# almost at the main topic; first a Fibonacci refresher
def fibona(n):
    a, b = 0, 1
    for i in range(n):
        a, b = b, a + b
        print(a)

# print the first 10 Fibonacci numbers
fibona(10)
# rewrite it as an iterator
from collections.abc import Iterable, Iterator

class FibonaIterator(Iterator):
    def __init__(self, n):
        self.__a = 0
        self.__b = 1
        self.__n = n  # how many numbers to produce
        self.__index = 0  # current index

    def __next__(self):
        if self.__index < self.__n:
            self.__index += 1
            # advance to the next pair
            self.__a, self.__b = self.__b, self.__a + self.__b
            return self.__a
        else:
            raise StopIteration  # this is what ends a for loop

def main():
    print(FibonaIterator(10))
    for i in FibonaIterator(10):
        print(i)

if __name__ == "__main__":
    main()
Previous article: http://www.noobyard.com/article/p-ptjccadz-d.html
What is a generator? The source makes it click instantly: it is a further layer wrapped around an iterator.
class Generator(Iterator):
    __slots__ = ()

    def __next__(self):
        """Return the next item from the generator; raise StopIteration when exhausted"""
        return self.send(None)

    @abstractmethod
    def send(self, value):
        """Send a value into the generator; return the next yielded value or raise StopIteration"""
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception inside the generator; return the next yielded value or raise StopIteration"""
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    # now you know why close() raised no exception earlier~
    def close(self):
        """Suppress the exception"""
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("generator ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Generator:
            return _check_methods(C, '__iter__', '__next__',
                                  'send', 'throw', 'close')
        return NotImplemented
On top of the iterator it adds two abstract methods, send and throw, plus close, which swallows the shutdown exception.
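Here is a minimal sketch of those three methods on a real generator (the counter generator is invented for illustration):

```python
def counter():
    total = 0
    while True:
        try:
            value = yield total  # hand out the total, wait for a sent value
            if value is not None:
                total += value
        except ValueError:
            # an exception injected via throw() lands here
            total = 0

gen = counter()
print(next(gen))              # prime it: run to the first yield, gives 0
print(gen.send(10))           # 10
print(gen.send(5))            # 15
print(gen.throw(ValueError))  # resets the total, gives 0
gen.close()                   # raises GeneratorExit inside, then swallows it
```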
Now rewrite the Fibonacci sequence as a generator (the simplest generator of all is a list comprehension with its square brackets swapped for parentheses):
# the code instantly gets simpler
def fibona(n):
    a = 0
    b = 1
    for _ in range(n):
        a, b = b, a + b
        yield a  # a single yield turns this into a generator

def main():
    print(fibona(10))
    for i in fibona(10):
        print(i)

if __name__ == "__main__":
    main()
A few things to note:
- A just-started generator must be primed with next() or send(None) first; sending a real value immediately raises: TypeError: can't send non-None value to a just-started generator
- When a generator returns, it raises StopIteration, and the return value is carried in the exception's value attribute:

def test_send(n):
    for i in range(n):
        if i == 2:
            return "i==2"
        yield i

g = test_send(5)
while True:
    try:
        tmp = next(g)
        print(tmp)
    except StopIteration as ex:
        print(ex.value)
        break

Output:
0 1 i==2
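A minimal sketch of the priming rule (the echo generator is invented for illustration):

```python
def echo():
    while True:
        received = yield
        print("got:", received)

gen = echo()
try:
    gen.send("hi")  # sending a real value before priming fails
except TypeError as ex:
    print(ex)  # can't send non-None value to a just-started generator

gen.send(None)  # prime: run to the first yield
gen.send("hi")  # now it works and prints "got: hi"
```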
Not much else to add; re-reading the earlier material after going through the source hits differently~
Last episode: networking — a static file server plus load testing
Synchronous means one task's completion depends on another: the dependent task only counts as finished after the task it depends on finishes.
Asynchronous means you don't wait for the dependency to finish; you just tell it what work to do, then carry on with the code below — your own task counts as done once you finish it (async is generally built on state, notifications and callbacks).
PS: in projects it usually just looks like this (personal experience):
await xxx()
Blocking means the current thread is suspended until the call returns, stuck waiting for a notification and unable to do other work (most code is like this).
Non-blocking means that when a result isn't immediately available, the call does not block the current thread but returns right away (keep executing the code below, or kick off a retry).
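A tiny sketch of what non-blocking looks like at the socket level (binding to port 0 so the OS picks a free port is my choice for the example):

```python
import socket

server = socket.socket()
server.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
server.listen()
server.setblocking(False)      # switch to non-blocking mode

try:
    server.accept()  # no client is waiting, so this cannot succeed
except BlockingIOError:
    # instead of hanging, the call returns immediately with an error
    print("no connection yet; come back later (or retry)")

server.close()
```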
PS: ever wondered why retry mechanisms in projects usually retry 3 times?
For a single IO access, the data is first copied into a kernel buffer, and only then copied from the kernel buffer into the application's address space — two distinct stages.
Because of these two stages, Linux ended up with the following five IO models (using sockets as the example):
- Blocking IO: when the user process calls a blocking method such as recvfrom, the kernel enters IO stage 1, preparing the data (it waits until enough data has arrived before copying). The user process blocks during this wait; once the kernel has the data ready it copies it to the user address space and returns, and only then does the process go from blocked back to ready.
- Non-blocking IO: if the kernel's data isn't ready yet, the call does not block the user process but returns an error immediately. Seeing the error, the process knows the data isn't ready and can issue the read again. Once the kernel data is ready and the process's system call arrives again, the kernel copies the data to user memory at once and returns.
- IO multiplexing: select, poll and epoll.
- Asynchronous IO: POSIX's aio_* family of functions — the IO read returns immediately without blocking the user process, and a signal later tells it the read has completed.

Here are the diagrams from Unix Network Programming:

Before we start, let's ease in via non-blocking IO (a simple example built on socket.setblocking(False)):
import time
import socket

def select(socket_addr_list):
    for client_socket, client_addr in socket_addr_list:
        try:
            data = client_socket.recv(2048)
            if data:
                print(f"[message from {client_addr}:]\n")
                print(data.decode("utf-8"))
                client_socket.send(
                    b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                )
            else:
                # no data raises an exception; an empty message means the client closed the connection
                client_socket.close()  # close the client connection
                socket_addr_list.remove((client_socket, client_addr))
                print(f"[client {client_addr} disconnected, current connections: {len(socket_addr_list)}]")
        except Exception:
            pass

def main():
    # collection of connected clients
    socket_addr_list = list()
    with socket.socket() as tcp_server:
        # allow the port to be rebound quickly
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        tcp_server.setblocking(False)  # non-blocking server socket
        while True:
            try:
                client_socket, client_addr = tcp_server.accept()
                client_socket.setblocking(False)  # non-blocking client socket
                socket_addr_list.append((client_socket, client_addr))
            except Exception:
                pass
            else:
                print(f"[connection from {client_addr}, current connections: {len(socket_addr_list)}]")
            # guard against errors after clients disconnect
            if socket_addr_list:
                # poll the clients for messages
                select(socket_addr_list)  # passed by reference
            time.sleep(0.01)

if __name__ == "__main__":
    main()
Output:

Something to think about:
select is similar to the code above, except the polling loop is handed to the operating system:
the kernel "monitors" all the sockets select is responsible for; as soon as any socket's data is ready, select returns, and the user process then calls read to copy the data from the kernel into the user process.
Here is an equivalent example:
import select
import socket

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        socket_info_dict = dict()
        socket_list = [tcp_server]  # watch list
        while True:
            # drawback: select caps the number of fds it can watch
            read_list, write_list, error_list = select.select(
                socket_list, [], [])
            for item in read_list:
                # the server socket: accept the new connection
                if item == tcp_server:
                    client_socket, client_address = item.accept()
                    socket_list.append(client_socket)
                    socket_info_dict[client_socket] = client_address
                    print(f"[{client_address} connected, current connections: {len(socket_list)-1}]")
                # a client sent data
                else:
                    data = item.recv(2048)
                    if data:
                        print(data.decode("utf-8"))
                        item.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        item.close()
                        socket_list.remove(item)
                        info = socket_info_dict[item]
                        print(f"[{info} disconnected, current connections: {len(socket_list)-1}]")

if __name__ == "__main__":
    main()
The output is the same as above.
More detail: select watches three classes of file descriptors: writefds, readfds and exceptfds. The call blocks until some descriptor becomes ready (readable, writable, or in an exceptional condition) or until the timeout expires (timeout sets how long to wait; NULL blocks indefinitely, a zero timeout returns at once). One drawback of select is that a single process can only watch a limited number of descriptors, typically 1024 on Linux (2048 on 64-bit).
Then poll appeared, which simply removes that cap; the essence is unchanged — it still polls.
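A minimal poll sketch using a pipe instead of a socket (assuming a Unix-like OS where select.poll is available):

```python
import os
import select

r, w = os.pipe()               # a pipe stands in for a socket
poller = select.poll()
poller.register(r, select.POLLIN)

print(poller.poll(0))          # nothing readable yet: []

os.write(w, b"ping")
for fd, event in poller.poll(1000):  # (fd, eventmask) pairs
    if event & select.POLLIN:
        print(os.read(fd, 4))  # b'ping'

os.close(r)
os.close(w)
```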
epoll arrived in kernel 2.6 (Linux only). It uses one file descriptor to manage many others, storing the events the user cares about in a kernel event table, and works via registered callbacks, so the user-space/kernel copy happens only once and the ready descriptor list never has to be rescanned.
Let's look at an example first (output same as above):
import socket
import select

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        # epoll is Linux-only
        epoll = select.epoll()
        # register tcp_server with epoll
        epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
        # fd -> (socket, addr)
        fd_socket_dict = dict()
        # dispatching is up to us
        while True:
            # returns the set of readable/writable socket fds
            poll_list = epoll.poll()
            for fd, event in poll_list:
                # the server socket
                if fd == tcp_server.fileno():
                    client_socket, client_addr = tcp_server.accept()
                    fd = client_socket.fileno()
                    fd_socket_dict[fd] = (client_socket, client_addr)
                    # register the client with epoll too
                    epoll.register(fd, select.EPOLLIN | select.EPOLLET)
                else:  # a client
                    client_socket, client_addr = fd_socket_dict[fd]
                    data = client_socket.recv(2048)
                    print(
                        f"[message from {client_addr}, current connections: {len(fd_socket_dict)}]\n")
                    if data:
                        print(data.decode("utf-8"))
                        client_socket.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        del fd_socket_dict[fd]
                        print(
                            f"[{client_addr} went offline, current connections: {len(fd_socket_dict)}]\n"
                        )
                        # unregister from epoll
                        epoll.unregister(fd)
                        client_socket.close()

if __name__ == "__main__":
    main()
Extension: epoll's two working modes
LT (level trigger) mode: when epoll_wait detects a ready descriptor it notifies the application, which may choose not to handle the event immediately; the next epoll_wait call will report it again. LT is the default mode and supports both blocking and non-blocking sockets.
ET (edge trigger) mode: when epoll_wait detects a ready descriptor it notifies the application, which must handle the event immediately; if it doesn't, later epoll_wait calls will not report it again. ET is the high-speed mode and only supports non-blocking sockets (it cuts down how often an epoll event is re-triggered, so it is more efficient than LT).
The essential calls:
epoll = select.epoll()
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
epoll.unregister(fd)
PS: epoll doesn't necessarily outperform select; it depends on the scenario.
IO multiplexing also includes kqueue, which is similar to epoll; the portable pattern below covers it (via Selector).
In general: epoll on Linux, select on Windows — knowing the portable API for IO multiplexing is enough.
First, the Python source:
# preference order: epoll|kqueue|devpoll > poll > select
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector
A hands-on example (reads and writes don't have to be split like this):
import socket
import selectors

# epoll on Linux, select on Windows
Selector = selectors.DefaultSelector()

class Task(object):
    def __init__(self):
        # fd -> (socket, address) pairs for clients
        self.fd_socket_dict = dict()

    def run(self):
        self.server = socket.socket()
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('', 8080))
        self.server.listen()
        # register the server with epoll
        Selector.register(self.server.fileno(), selectors.EVENT_READ,
                          self.connected)

    def connected(self, key):
        """handle a new client connection"""
        client_socket, client_address = self.server.accept()
        fd = client_socket.fileno()
        self.fd_socket_dict[fd] = (client_socket, client_address)
        # register a read event for the client (server reads its message)
        Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
        print(f"{client_address} connected, current connections: {len(self.fd_socket_dict)}")

    def call_back_reads(self, key):
        """handle a readable client"""
        # an fd can only be registered once, so unregister the read before watching for writes
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        print(f"[message from {client_address}:]\n")
        data = client_socket.recv(2048)
        if data:
            print(data.decode("utf-8"))
            # register a write event for the client (server sends a reply)
            Selector.register(key.fd, selectors.EVENT_WRITE,
                              self.call_back_writes)
        else:
            client_socket.close()
            del self.fd_socket_dict[key.fd]
            print(f"{client_address} disconnected, current connections: {len(self.fd_socket_dict)}")

    def call_back_writes(self, key):
        """handle a writable client"""
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        client_socket.send(b"ok")
        Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)

def main():
    t = Task()
    t.run()
    while True:
        ready = Selector.select()
        for key, obj in ready:
            # dispatch the callback ourselves
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    main()
The essential calls:
Selector = selectors.DefaultSelector()
Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
Selector.unregister(key.fd)
Further reading:
select, iocp, epoll, kqueue and various I/O multiplexing mechanisms: https://blog.csdn.net/shallwake/article/details/5265287
A short intro to kqueue: http://www.cnblogs.com/luminocean/p/5631336.html
A common need: read the result lists from two sharded tables, merge them, and process the whole thing.
Normally itertools.chain does the traversal:
# https://docs.python.org/3/library/itertools.html#itertools.chain
import itertools

def main():
    # simulate query results from two shards
    user1 = ["小张", "小明"]
    user2 = ["小潘", "小周"]
    # iterating a dict only yields keys (for this case, write your own merge/handling)
    user3 = {"name": "test1", "name1": "test2"}
    # requirement: merge and traverse
    for item in itertools.chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()
Output:
小张 小明 小潘 小周 name name1
Internally it is essentially this (two nested loops, yielding each item):
def my_chain(*args, **kwargs):
    for items in args:
        for item in items:
            yield item

def main():
    # simulate query results from two shards
    user1 = ["小张", "小明"]
    user2 = ["小潘", "小周"]
    # iterating a dict only yields keys (for this case, write your own merge/handling)
    user3 = {"name": "test1", "name1": "test2"}
    # requirement: merge and traverse
    for item in my_chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()
From Python 3.3 on, the syntax is simplified one step further (yield from an iterable):
def my_chain(*args, **kwargs):
    for items in args:
        yield from items

def main():
    # simulate query results from two shards
    user1 = ["小张", "小明"]
    user2 = ["小潘", "小周"]
    # requirement: merge and traverse
    for item in my_chain(user1, user2):
        print(item)

if __name__ == '__main__':
    main()
Output:
小张 小明 小潘 小周
Knowing the internals, it's easy to bolt on handling for the dict case:
def my_chain(*args, **kwargs):
    for my_iterable in args:
        # if it's a dict, yield its values instead
        if isinstance(my_iterable, dict):
            my_iterable = my_iterable.values()
        for item in my_iterable:
            yield item

def main():
    # simulate query results from two shards
    user1 = ["小张", "小明"]
    user2 = ["小潘", "小周"]
    user3 = {"name": "test1", "name1": "test2"}
    # requirement: merge and traverse
    for item in my_chain(user1, user2, user3):
        print(item)

if __name__ == '__main__':
    main()
Output:
小张 小明 小潘 小周 test1 test2
PS: in practice you rarely do that; results usually come back as [{}, {}], traversed and processed like this:
import itertools

def main():
    # simulate query results from the shards
    user1 = [{"name": "小张"}, {"name": "小明"}]
    user2 = [{"name": "小潘"}, {"name": "小周"}]
    user3 = [{"name": "test1"}, {"name": "test2"}]
    # requirement: merge and traverse
    for item in itertools.chain(user1, user2, user3):
        # processing usually happens right here
        for key, value in item.items():
            print(value)

if __name__ == '__main__':
    main()
The goal of coroutines is simple: write asynchronous code the way you write synchronous code.
Start with a requirement: generate charting data (max, min, avg).
Say the raw data looks like this:
products = [{
    "id": 2344,
    "title": "御泥坊补水面膜",
    "price": [89, 76, 120, 99]
}, {
    "id": 2345,
    "title": "御泥坊火山泥面膜",
    "price": [30, 56, 70, 89]
}]
After processing:
new_products = [{
    "id": 2344,
    "title": "御泥坊补水面膜",
    "price": [89, 76, 120, 99],
    "max": 120,
    "min": 76,
    "avg": 96.0
}, {
    "id": 2345,
    "title": "御泥坊火山泥面膜",
    "price": [30, 56, 70, 89],
    "max": 89,
    "min": 30,
    "avg": 61.25
}]
Data like this is typically used to draw charts.
Without coroutines, we'd usually handle it like this (the database fetch is omitted):
# build the new dict
def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    return item

def get_new_data(data):
    newdata = []
    for item in data:
        new_item = get_new_item(item)
        # print(new_item)  # the processed dict
        newdata.append(new_item)
    return newdata

def main():
    # requirement: generate charting data (max, min, avg)
    products = [{
        "id": 2344,
        "title": "御泥坊补水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    new_products = get_new_data(products)
    print(new_products)

if __name__ == "__main__":
    main()
Converting this to a yield-based coroutine is just as easy — the code barely changes, and there's none of the callback ping-pong of IO multiplexing:
# build the new dict
def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    yield item

def get_new_data(data):
    for item in data:
        yield from get_new_item(item)

def main():
    # requirement: generate charting data (max, min, avg)
    products = [{
        "id": 2344,
        "title": "御泥坊补水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    new_products = list()
    # to capture a return value, catch StopIteration
    for item in get_new_data(products):
        new_products.append(item)
    print(new_products)

if __name__ == "__main__":
    main()
A quick breakdown (the point of using yield from here is to lead into async/await later):
The benefit of yield from (in the delegating generator get_new_data) is that it sets up a direct two-way channel between the caller (main) and the subgenerator (get_new_item).
You can treat yield from as a middleman (if that doesn't click, imagine yield from as await and it becomes easy); in essence it boils down to this code:
# build the new data
def get_new_data(data):
    for item in data:
        prices = item["price"]
        item["avg"] = sum(prices) / len(prices)
        item["max"] = max(prices)
        item["min"] = min(prices)
        yield item

def main():
    # requirement: generate charting data (max, min, avg)
    products = [{
        "id": 2344,
        "title": "御泥坊补水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    new_products = list()
    for item in get_new_data(products):
        new_products.append(item)
    print(new_products)

if __name__ == "__main__":
    main()
Internally, yield from does quite a lot on top of plain yield (exception handling, for instance); see PEP 380 for the details. Let's distill a simplified version first:
# normal call
RESULT = yield from EXPR

# _i: the subgenerator (also an iterator)
# _y: a value produced by the subgenerator
# _r: the final result of the yield from expression
# _s: a value the caller passed in via send
# _e: the exception object

# what happens internally
_i = iter(EXPR)  # EXPR is an iterable; _i is the subgenerator
try:
    # the first step can't send a value, only next() or send(None); store the produced value in _y
    _y = next(_i)
except StopIteration as _e:
    # if the subgenerator returns right away, this raises; value holds its return value
    _r = _e.value
else:
    # the caller/subgenerator exchange loop; the delegating generator blocks at yield from
    while 1:
        # the two-way channel is now open; wait for the caller's send(value) and store it in _s
        _s = yield _y  # a batch of exception handling lives here too; removed for now, shown later
        try:
            # on send(None), just keep iterating with next
            if _s is None:
                _y = next(_i)  # store the subgenerator's output in _y
            else:
                _y = _i.send(_s)  # if the caller sent a value, forward it to the subgenerator
        except StopIteration as _e:
            _r = _e.value  # when the subgenerator is exhausted, its return value goes to _r
            break

RESULT = _r  # the final value of the yield from expression
Now the full version is much less intimidating:
# normal call
RESULT = yield from EXPR

# _i: the subgenerator (also an iterator)
# _y: a value produced by the subgenerator
# _r: the final result of the yield from expression
# _s: a value the caller passed in via send
# _e: the exception object

# what happens internally
_i = iter(EXPR)  # EXPR is an iterable; _i is the subgenerator
try:
    # the first step can't send a value, only next() or send(None); store the produced value in _y
    _y = next(_i)
except StopIteration as _e:
    # if the subgenerator returns right away, this raises; value holds its return value
    _r = _e.value
else:
    # the caller/subgenerator exchange loop; the delegating generator blocks at yield from
    while 1:
        try:
            # the two-way channel is now open; wait for the caller's send(value) and store it in _s
            _s = yield _y
        # [now the full picture] several cases need handling:
        # 1. the subgenerator may be a plain iterator, not usable as a coroutine generator (no throw/close)
        # 2. the subgenerator supports throw and close, but either may itself raise inside
        # 3. the caller invoked gen.throw(), asking the subgenerator to raise
        # so both gen.close() and gen.throw() need handling here

        # handling for close() on the generator
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass  # ignore a missing close
            else:
                _m()
            raise _e  # re-raise upwards
        # handling for throw() on the generator
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                # on send(None), just keep iterating with next
                if _s is None:
                    _y = next(_i)  # store the subgenerator's output in _y
                else:
                    _y = _i.send(_s)  # if the caller sent a value, forward it to the subgenerator
            except StopIteration as _e:
                _r = _e.value  # when the subgenerator is exhausted, its return value goes to _r
                break

RESULT = _r  # the final value of the yield from expression
Now retrofit the earlier code with async and await (coroutines aim to make async read like sync — this is where that's truly achieved):
import asyncio

# build the new dict
async def get_new_item(item):
    prices = item["price"]
    item["avg"] = sum(prices) / len(prices)
    item["max"] = max(prices)
    item["min"] = min(prices)
    return item

async def get_new_data(data):
    newdata = []
    for item in data:
        new_item = await get_new_item(item)
        # print(new_item)  # the processed dict
        newdata.append(new_item)
    return newdata

def main():
    # requirement: generate charting data (max, min, avg)
    products = [{
        "id": 2344,
        "title": "御泥坊补水面膜",
        "price": [89, 76, 120, 99]
    }, {
        "id": 2345,
        "title": "御泥坊火山泥面膜",
        "price": [30, 56, 70, 89]
    }]
    # python 3.7
    new_products = asyncio.run(get_new_data(products))
    print(new_products)

if __name__ == "__main__":
    main()
Output (barely distinguishable from the plain version, right?):
[{'id': 2344, 'title': '御泥坊补水面膜', 'price': [89, 76, 120, 99], 'avg': 96.0, 'max': 120, 'min': 76}, {'id': 2345, 'title': '御泥坊火山泥面膜', 'price': [30, 56, 70, 89], 'avg': 61.25, 'max': 89, 'min': 30}]
Up next: asyncio
Official docs: https://docs.python.org/3/library/asyncio.html
Common development mistakes: https://docs.python.org/3/library/asyncio-dev.html
Sample code: https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine
PS: asyncio is Python's complete, end-to-end solution for asynchronous IO programming.
Last time covered how coroutines evolved; let's pick up from there.
JS allows mixing generators with async/await — what about Python? (.NET Core does not allow mixing.)
import types

# fully separate from generators now, but you can read it as yield from
@types.coroutine
def get_value(value):
    yield value

async def get_name(name):
    # some processing logic
    return await get_value(name)

if __name__ == '__main__':
    gen = get_name("小明")
    print(gen.send(None))

# mixing them directly raises: TypeError: object generator can't be used in 'await' expression
Although async/await and yield from are not the same concept, you can read them as yield from; the code above can be understood as:
def get_value(value):
    yield value

# async and await here replaced by yield from
def get_name(name):
    # some processing logic
    yield from get_value(name)

if __name__ == '__main__':
    gen = get_name("小明")
    print(gen.send(None))
PS: by default Python, like .NET Core, won't let you mix them directly; if you really must, handle it as above (@asyncio.coroutine also works).
Until today, we implemented coroutines as: an event loop + callbacks (driving generators) + IO multiplexing (epoll).
Now the official asyncio (think of it as a coroutine pool) does this for us. There is also a third-party option, uvloop, built in C on the libuv library (the same library NodeJS is built on).
PS: using uvloop is trivial — before obtaining the event loop, set asyncio's event-loop policy to uvloop's: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
A simple coroutine example first:
import asyncio

# simulate a time-consuming operation
async def test():
    print("start...")
    # the old blocking sleep won't do any more
    await asyncio.sleep(2)
    print("end...")
    return "ok"

if __name__ == '__main__':
    import time
    start_time = time.time()
    # # >= python3.4
    # # get asyncio's event loop
    # loop = asyncio.get_event_loop()
    # # run the loop until the given future completes, returning its result
    # result = loop.run_until_complete(test())
    # print(result)
    # python3.7
    result = asyncio.run(test())
    print(result)
    print(time.time() - start_time)
Output:
start... end... ok 2.001772403717041
Briefly: asyncio.run is a simplification added in Python 3.7 (compare .NET Core's Task.Run); a look at the source says it all:
# https://github.com/lotapp/cpython3/blob/master/Lib/asyncio/runners.py
def run(main, *, debug=False):
    # older code used "asyncio.get_event_loop()" directly (most developers are used to it)
    # from 3.7, "asyncio.get_running_loop()" is recommended for getting the running loop (raises if there isn't one)
    if events._get_running_loop() is not None:
        raise RuntimeError("asyncio.run() cannot be called from a running event loop")
    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))
    loop = events.new_event_loop()  # create a fresh event loop
    try:
        events.set_event_loop(loop)  # install it
        loop.set_debug(debug)  # debug mode (off by default)
        return loop.run_until_complete(main)  # run to completion
    finally:
        try:
            _cancel_all_tasks(loop)  # cancel remaining tasks
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()
So the new API simply spins up a fresh loop and calls run_until_complete on it.
PS: uvloop can be used this way too: get a loop with loop = uvloop.new_event_loop(), then install it in place of the native one with asyncio.set_event_loop(loop).
import asyncio

# simulate a time-consuming operation
async def test(i):
    print("start...")
    # the old blocking sleep won't do any more
    await asyncio.sleep(2)
    print("end...")
    return i

if __name__ == '__main__':
    import time
    start_time = time.time()
    # # >= python3.4
    loop = asyncio.get_event_loop()
    # tasks = [asyncio.ensure_future(test(i)) for i in range(10)]
    # note: create_task here is the loop's method, not asyncio's; otherwise you get RuntimeError: no running event loop
    tasks = [loop.create_task(test(i)) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print(task.result())
    print(time.time() - start_time)
Output (replacing tasks with tasks = [asyncio.ensure_future(test(i)) for i in range(10)] gives the same result):
start... start... start... start... start... start... start... start... start... start... end... end... end... end... end... end... end... end... end... end... 0 1 2 3 4 5 6 7 8 9 2.028331995010376
Now, what exactly is asyncio.wait? (refresher: http://www.noobyard.com/article/p-gqozplwe-d.html)

# return_when takes the same values as before:
# FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
# FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
# ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
# the loop parameter is slated for removal in a future version
# unlike the wait in concurrent.futures, this one is a coroutine
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
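A short sketch of return_when in action — come back as soon as the first task finishes (the job coroutine and its delays are made up for illustration):

```python
import asyncio

async def job(i, delay):
    await asyncio.sleep(delay)
    return i

async def main():
    tasks = [asyncio.ensure_future(job(1, 0.1)),
             asyncio.ensure_future(job(2, 5))]
    # return as soon as any one task completes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print([t.result() for t in done])  # [1]
    for t in pending:
        t.cancel()  # don't leave the slow task running

asyncio.run(main())
```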
In everyday use, the high-level asyncio.gather(*tasks) can replace asyncio.wait(tasks).
PS: the official recommendation is to create tasks via create_task.
import asyncio

# simulate a time-consuming operation
async def test(i):
    print("start...")
    # the old blocking sleep won't do any more
    await asyncio.sleep(2)
    print("end...")
    return i

async def main():
    tasks = [test(i) for i in range(10)]
    # await task yields the return value (a result or an exception)
    # for task in asyncio.as_completed(tasks):
    #     try:
    #         print(await task)
    #     except Exception as ex:
    #         print(ex)
    return [await task for task in asyncio.as_completed(tasks)]

if __name__ == '__main__':
    import time
    start_time = time.time()
    # old style, still widely used
    loop = asyncio.get_event_loop()
    result_list = loop.run_until_complete(main())
    print(result_list)
    print(time.time() - start_time)
Output (PS: asyncio.gather(*tasks) can directly replace asyncio.wait(tasks) here too):
start... start... start... start... start... start... start... start... start... start... end... end... end... end... end... end... end... end... end... end... [1, 6, 4, 5, 0, 7, 8, 3, 2, 9] 2.0242035388946533
It's easy to reason about, and now consistent with .NET Core and NodeJS: await xxx gives you back either a result or an exception, while without await you simply hold a task object.
import asyncio

# simulate a time-consuming operation
async def test(i):
    print("start...")
    await asyncio.sleep(2)
    print("end...")
    return i

async def main():
    tasks = [test(i) for i in range(10)]
    # gather returns one future aggregating the results of the coroutines/futures
    return await asyncio.gather(*tasks)  # remember the * to unpack

if __name__ == '__main__':
    import time
    start_time = time.time()
    # python3.7
    result_list = asyncio.run(main())
    print(result_list)
    # 2.0259485244750977
    print(time.time() - start_time)
Output (the syntax is so much leaner now — genuinely pleasant to use):
start... start... start... start... start... start... start... start... start... start... end... end... end... end... end... end... end... end... end... end... [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 2.00840163230896
Why the * unpacking is needed ==> the function definitions make it obvious:

# returns one future aggregating the results of the coroutines/futures
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    pass

# wraps a coroutine or awaitable into a task
def ensure_future(coro_or_future, *, loop=None):
    pass

# takes a coroutine object, returns a task object
class BaseEventLoop(events.AbstractEventLoop):
    def create_task(self, coro):
        pass
asyncio's high-level API is generally used for these things (usually all you need in development):
- running coroutines concurrently and controlling their execution
- network IO and inter-process communication (IPC)
- subprocesses
- distributing tasks via queues (Queue)
- synchronizing concurrent code
The low-level API is generally used like this (event loops and callbacks come up occasionally; the rest rarely):
- creating and managing event loops, which provide asynchronous APIs for networking, subprocesses, OS signals and so on
- bridging callback-based libraries and code with async/await syntax
Callbacks tend to hurt maintainability, so nowadays they are mostly avoided (async code reads almost like sync code, which leaves callbacks with little to offer).
That said, the return values above can also be collected through a callback:
# low-level API example
import asyncio

async def get_html(url):
    print(f"get {url} ing")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

def call_back(task):
    print(type(task))
    print(task.result())

if __name__ == "__main__":
    import time
    start_time = time.time()
    urls = [
        "https://www.baidu.com", "https://www.sogou.com",
        "https://www.python.org", "https://www.asp.net"
    ]
    tasks = set()  # the task collection
    loop = asyncio.get_event_loop()
    for url in urls:
        # task = asyncio.ensure_future(get_html(url))
        task = loop.create_task(get_html(url))
        # attach the callback
        task.add_done_callback(call_back)
        # add it to the collection
        tasks.add(task)
    # run them all
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time() - start_time)
Output (via task.add_done_callback(callback)):
get https://www.baidu.com ing get https://www.sogou.com ing get https://www.python.org ing get https://www.asp.net ing <class '_asyncio.Task'> <h1>This is a test for https://www.baidu.com</h1> <class '_asyncio.Task'> <h1>This is a test for https://www.python.org</h1> <class '_asyncio.Task'> <h1>This is a test for https://www.sogou.com</h1> <class '_asyncio.Task'> <h1>This is a test for https://www.asp.net</h1> 2.0168468952178955
Example:
import asyncio
import functools

async def get_html(url):
    await asyncio.sleep(2)
    return "This is a test for"

# note: arguments bound through the partial come first
def call_back(url, task):
    # do something
    print(type(task))
    print(task.result(), url)

if __name__ == "__main__":
    import time
    start_time = time.time()
    urls = [
        "https://www.baidu.com", "https://www.sogou.com",
        "https://www.python.org", "https://www.asp.net"
    ]
    tasks = set()  # the task collection
    loop = asyncio.get_event_loop()
    for url in urls:
        # task = asyncio.ensure_future(get_html(url))
        task = loop.create_task(get_html(url))
        # add_done_callback takes no extra arguments, so bind them with a partial
        task.add_done_callback(functools.partial(call_back, url))
        # add it to the collection
        tasks.add(task)
    # run them all
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time() - start_time)
Output (PS: arguments bound through the partial come first):
<class '_asyncio.Task'> This is a test for https://www.baidu.com <class '_asyncio.Task'> This is a test for https://www.python.org <class '_asyncio.Task'> This is a test for https://www.sogou.com <class '_asyncio.Task'> This is a test for https://www.asp.net 2.0167236328125
If "await task may give you a result or an exception" still puzzles you, just treat it as synchronous code (again: the point of coroutines is writing async like sync) — a function call either produces a result or raises an exception.
An exception example:
import asyncio

async def get_html(url):
    print(f"get {url} ing")
    if url == "https://www.asp.net":
        raise Exception("Exception is over")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

async def main():
    urls = [
        "https://www.baidu.com", "https://www.asp.net",
        "https://www.python.org", "https://www.sogou.com"
    ]
    tasks = [get_html(url) for url in urls]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    import time
    start_time = time.time()
    try:
        asyncio.run(main())
    except Exception as ex:
        print(ex)
    print(time.time() - start_time)
Output (no different from sync code — wrap the risky part in a try/except):
get https://www.baidu.com ing get https://www.asp.net ing get https://www.python.org ing get https://www.sogou.com ing Exception is over 0.008000373840332031
One more look at the old-style version (PS: nearly identical; from next time on, only the new style):
import asyncio

async def get_html(url):
    print(f"get {url} ing")
    if url == "https://www.asp.net":
        raise Exception("Exception is over")
    await asyncio.sleep(2)
    return f"<h1>This is a test for {url}</h1>"

async def main():
    urls = [
        "https://www.baidu.com", "https://www.asp.net",
        "https://www.python.org", "https://www.sogou.com"
    ]
    tasks = [get_html(url) for url in urls]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    try:
        # run them all
        loop.run_until_complete(main())
    except Exception as ex:
        print(ex)
    print(time.time() - start_time)
Common exceptions when debugging Python 3: http://www.noobyard.com/article/p-qvsyyfom-t.html
Official docs: https://docs.python.org/3/library/asyncio-exceptions.html
- asyncio.TimeoutError (Exception.Error): an operation exceeded its deadline
- asyncio.CancelledError (Exception.Error): the task was cancelled
- asyncio.InvalidStateError (Exception.Error): raised when the internal state of a Task/Future is invalid
- asyncio.IncompleteReadError (Exception.Error): a read operation did not complete
- asyncio.LimitOverrunError (Exception): the buffer size limit was overrun
- asyncio.SendfileNotAvailableError (Exception.ReferenceError.RuntimeError): the sendfile syscall isn't available for the given socket or file type
Some exceptions are missing from the official page; I've added the common ones below: https://docs.python.org/3/library/exceptions.html
BaseException
- SystemExit: raised by sys.exit() (purpose: make the Python interpreter exit)
- KeyboardInterrupt: raised when the user terminates the program with Ctrl+C
- GeneratorExit: raised when a generator or coroutine is closed (pay attention to this one)
- Exception: base class for all built-in non-system-exiting exceptions and all user-defined exceptions
  - asyncio.Error
    - asyncio.CancelledError
    - asyncio.TimeoutError: distinguish from Exception.OSError.TimeoutError
    - asyncio.InvalidStateError: invalid internal state of a Task/Future
  - asyncio.LimitOverrunError: the buffer limit was overrun
  - StopIteration: raised by next()/send() (see https://www.cnblogs.com/dotnetcrazy/p/9278573.html#6.Python迭代器)
  - StopAsyncIteration: raised by __anext__()
  - AssertionError: raised when an assert statement fails
  - AttributeError: raised when attribute reference or assignment fails
  - EOFError
    - asyncio.IncompleteReadError: a read operation did not complete
  - OSError: raised when a system function returns a system-related error
    - TimeoutError: a system function timed out
  - ReferenceError: a reference error (the object was garbage-collected or deleted)
  - RuntimeError: an error was detected that doesn't fit any other category
    - NotImplementedError: something isn't implemented (e.g. calling a subclass method that doesn't exist)
    - RecursionError: recursion went too deep
    - asyncio.SendfileNotAvailableError: the syscall isn't available for the given socket or file type
  - SyntaxError: raised on syntax errors (a frequent sight when pasting code)
    - IndentationError: bad indentation
      - TabError: raised when indentation mixes tabs and spaces inconsistently
  - TypeError: wrong type

Those coming from .NET should compare this with the earlier piece, "Python3 vs C# concurrency — the .NET edition": https://www.cnblogs.com/dunitian/p/9419325.html
First, the concepts:

- `event_loop` (event loop): the scheduler at the heart of asyncio; coroutines are registered on it, and it drives them as their events become ready
- `coroutine` (coroutine): a function defined with the `async` keyword; calling it does not execute the body immediately but returns a coroutine object
- `future` object: a placeholder for the result of an operation that has not completed yet
- `task` (task): a `Task` object is a subclass of `Future`; it ties a `coroutine` and a `Future` together by wrapping the `coroutine` into a `Future`
- `async/await` keywords: `async` defines a coroutine, `await` suspends on a blocking asynchronous call, much like `yield from` (both establish a direct two-way channel between the caller and the sub-coroutine)

To avoid confusing readers with a mix of old and new code, only the newest syntax is used from here on:

- `asyncio.run(main())` replaces manual loop management (typically under `if __name__ == "__main__"`)
- `asyncio.create_task(func())` creates a `Task`
- `asyncio.gather(*tasks)` or `asyncio.wait` runs tasks in bulk
- `asyncio.get_event_loop()` gives way to `asyncio.get_running_loop()` (which raises an exception if no loop is running)

# mixing with older code requires this clumsy dance:
try:
    loop = asyncio.get_running_loop()
except RuntimeError as ex:
    print(ex)  # no running event loop
    loop = asyncio.get_event_loop()
...
loop.run_until_complete(xxx)

The new syntax:

async def main():
    loop = asyncio.get_running_loop()
    ...

asyncio.run(main())
A Task (like a generator or a Future) moves through essentially these states:

- `Pending`: the Task has been created but not yet executed
- `Running`: the event loop is currently executing the Task
- `Done`: the Task has finished
- `Cancelled`: the Task was cancelled

Before Python 3.7 the official docs carried a sequence diagram that illustrates this nicely: https://docs.python.org/3.6/library/asyncio-task.html
import asyncio
async def compute(x, y):
print(f"计算 {x}+{y}...")
await asyncio.sleep(1.0)
return x + y
async def main(x, y):
result = await compute(x, y)
print(f"{x}+{y}={result}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main(1, 2))
loop.close()
Compared with the old version, this simply creates a `task` and then attaches a callback to it with `add_done_callback`:
import asyncio
async def get_html(url):
print(f"get {url} ing")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
def callback_func(task):
print(type(task))
if task.done():
print(f"done") # print(task.result())
async def main():
urls = [
"https://www.baidu.com", "https://www.asp.net",
"https://www.python.org", "https://www.sogou.com"
]
# asyncio.create_task来建立一个Task
tasks = [asyncio.create_task(get_html(url)) for url in urls]
# 给每一个任务都加一个回调函数
for task in tasks:
task.add_done_callback(callback_func)
# 批量执行任务
result = await asyncio.gather(*tasks)
print(result) # 返回 result list
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
get https://www.baidu.com ing get https://www.asp.net ing get https://www.python.org ing get https://www.sogou.com ing <class '_asyncio.Task'> done <class '_asyncio.Task'> done <class '_asyncio.Task'> done <class '_asyncio.Task'> done ['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>', '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>'] 2.0189685821533203
Note: `add_signal_handler` is a loop-only method with no Task counterpart, e.g. loop.add_signal_handler(signal.SIGINT, callback_handle, *args)

On exception handling for batched tasks:

- `return_exceptions=True`: a failing task does not affect the others; its exception is placed in the result list alongside the normal results
- when the `gather` itself is cancelled, the whole batch is cancelled regardless of True or False

import asyncio
async def get_html(url):
print(f"get {url} ing")
if url == "https://www.asp.net":
raise Exception("Exception is over")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
def callback_func(task):
if task.done():
print(f"done") # print(task.result())
async def main():
urls = [
"https://www.baidu.com", "https://www.asp.net",
"https://www.python.org", "https://www.sogou.com"
]
# asyncio.create_task来建立一个Task
tasks = [asyncio.create_task(get_html(url)) for url in urls]
# 给每一个任务都加一个回调函数
for task in tasks:
task.add_done_callback(callback_func)
# 批量执行任务
result = await asyncio.gather(*tasks, return_exceptions=True)
print(result) # 返回 result list
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
get https://www.baidu.com ing get https://www.asp.net ing get https://www.python.org ing get https://www.sogou.com ing done done done done ['<h1>This is a test for https://www.baidu.com</h1>', Exception('Exception is over'), '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>'] 2.013272523880005
看个简单的任务分组案例:
import asyncio
async def get_html(url):
print(f"get url for{url}")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
async def main():
urls1 = ["https://www.baidu.com", "https://www.asp.net"]
urls2 = ["https://www.python.org", "https://www.sogou.com"]
tasks1 = [asyncio.create_task(get_html(url)) for url in urls1]
tasks2 = [asyncio.create_task(get_html(url)) for url in urls2]
# 等待两组都完成,而后返回聚合结果
result = await asyncio.gather(*tasks1, *tasks2)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
try:
asyncio.run(main())
except Exception as ex:
print(ex)
print(time.time() - start_time)
输出:(两个分组结果被一块儿放到了list中)
get url forhttps://www.baidu.com get url forhttps://www.asp.net get url forhttps://www.python.org get url forhttps://www.sogou.com ['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>', '<h1>This is a test for https://www.python.org</h1>', '<h1>This is a test for https://www.sogou.com</h1>'] 2.0099380016326904
If you want more fine-grained control over `Group1` and `Group2` individually, wrap each one in another layer of `gather`:
import asyncio
async def get_html(url):
print(f"get url for{url}")
await asyncio.sleep(2)
return f"<h1>This is a test for {url}</h1>"
async def main():
urls1 = ["https://www.baidu.com", "https://www.asp.net"]
urls2 = ["https://www.python.org", "https://www.sogou.com"]
tasks1 = [asyncio.create_task(get_html(url)) for url in urls1]
tasks2 = [asyncio.create_task(get_html(url)) for url in urls2]
group1 = asyncio.gather(*tasks1)
group2 = asyncio.gather(*tasks2)
# 分组2由于某缘由被取消任务了(模拟)
group2.cancel()
# 等待两组都完成,而后返回聚合结果
result = await asyncio.gather(group1, group2, return_exceptions=True)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
try:
asyncio.run(main())
except Exception as ex:
print(ex)
print(time.time() - start_time)
输出:
get url forhttps://www.baidu.com get url forhttps://www.asp.net [['<h1>This is a test for https://www.baidu.com</h1>', '<h1>This is a test for https://www.asp.net</h1>'], CancelledError()] 2.0090348720550537
再看个单个任务的案例:
import asyncio
async def test():
print("start...")
await asyncio.sleep(10)
print("end...")
async def main():
task = asyncio.create_task(test())
await asyncio.sleep(1)
# 取消task任务
task.cancel()
try:
await task
except asyncio.CancelledError:
print(f"任务已经被取消:{task.cancelled()}")
print(f"任务是由于异常而完成:{task.done()}")
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
start... 任务已经被取消:True 任务是由于异常而完成:True 1.0133979320526123
简单说明下:
task.done()
:任务是否完成
task.done() ==> true
:
task.cancelled()
:用来判断是否成功取消为何这么说?看看源码:
# "done" covers both normal and exceptional completion
if outer.done():
    # mark tasks that completed because of an exception
    if not fut.cancelled():
        fut.exception()  # mark the exception as retrieved
PS: the official recommendation is `asyncio.all_tasks` (the set of the loop's not-yet-finished Tasks), replacing the older `asyncio.Task.all_tasks` (which returns all of the loop's Tasks)

Timed waiting: asyncio.wait_for(task, timeout)
import asyncio
async def test(time):
print("start...")
await asyncio.sleep(time)
print("end...")
return time
async def main():
task = asyncio.create_task(test(3))
try:
result = await asyncio.wait_for(task, timeout=2)
print(result)
except asyncio.CancelledError:
print("Cancel")
except asyncio.TimeoutError:
print("超时取消")
except Exception as ex:
print(ex)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
start... 超时取消 2.007002115249634
`wait` is a lower-level API than `gather`; for instance, the following timed wait over multiple tasks is something `gather` cannot express:
import asyncio
async def test(time):
print("start...")
await asyncio.sleep(time)
print("end...")
return time
async def main():
tasks = [asyncio.create_task(test(i)) for i in range(10)]
# 已完成的任务(包含异常),未完成的任务
done, pending = await asyncio.wait(tasks, timeout=2)
# 任务总数(我用了3种表示)PS:`all_tasks()`的时候记得去除main的那个
print(
f"任务总数:{len(tasks)}=={len(done)+len(pending)}=={len(asyncio.Task.all_tasks())-1}"
)
# 全部未完成的task:asyncio.all_tasks(),记得去掉run(main())
print(f"未完成Task:{len(pending)}=={len(asyncio.all_tasks()) - 1}")
print(await asyncio.gather(*done))
# for task in done:
# print(await task)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
start... start... start... start... start... start... start... start... start... start... end... end... end... 任务总数:10==10==10 未完成Task:7==7 [0, 1, 2] 2.0071778297424316
Usage is the same as with a Future (https://www.cnblogs.com/dotnetcrazy/p/9528315.html#Future对象), so treat this as more practice with the new syntax.

A scenario that comes up in real projects all the time: call several equivalent APIs at once, and as soon as one returns, cancel the remaining requests. An introductory example:
import asyncio
async def test(i):
print(f"start...task{i}")
await asyncio.sleep(i)
print(f"end...task{i}")
return "ok"
# 第一个任务执行完成则结束此批次任务
async def main():
tasks = [asyncio.create_task(test(i)) for i in range(10)]
# 项目里常常有这么一个场景:同时调用多个同效果的API,有一个返回后取消其余请求
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED)
# print(await asyncio.gather(*done))
for task in done:
print(await task)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
start...task0 start...task1 start...task2 start...task3 start...task4 start...task5 start...task6 start...task7 start...task8 start...task9 end...task0 ok 0.017002105712890625
Further reading: `asyncio.shield` (protects an awaitable from being cancelled) https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation
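A quick sketch of that shielding behavior (names are made up for illustration): cancelling the shield wrapper does not cancel the inner task.

```python
import asyncio

log = []

async def precious():
    await asyncio.sleep(0.05)
    log.append("finished")
    return 42

async def main():
    inner = asyncio.create_task(precious())
    outer = asyncio.shield(inner)
    await asyncio.sleep(0.01)
    outer.cancel()  # cancels only the shield wrapper
    try:
        await outer
    except asyncio.CancelledError:
        log.append("outer cancelled")
    # the inner task keeps running behind the shield
    log.append(await inner)

asyncio.run(main())
print(log)  # ['outer cancelled', 'finished', 42]
```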
Next up: compatibility with old code, synchronization primitives, and new Socket usage
People have asked me: should I use `asyncio.get_running_loop()` or not? Why is it sometimes `asyncio.get_event_loop()` and sometimes `asyncio.get_running_loop()`, sometimes `loop.run_until_complete()` and sometimes `asyncio.run()`? It gets confusing.

I touched on this briefly before, but perhaps not in enough detail, so here are a few more examples.

First: if you are on a Python version older than 3.7, `loop = asyncio.get_running_loop()` and `asyncio.run()` are simply not available to you.

On older versions you use `asyncio.get_event_loop()` to obtain the `loop` and `loop.run_until_complete()` to run it:
import asyncio
async def test():
print("start ...")
await asyncio.sleep(2)
print("end ...")
# 若是你用`get_running_loop`就不要和`loop.run_until_complete`混用
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
Output: (mixing the two styles means catching the resulting RuntimeError)
start ... end ...
The awkward mixed-version scenario mentioned last time looked like this (at which point you might as well just call `asyncio.get_event_loop()` directly):
# 若是和旧版本混用,就应该这么写了(麻烦)
try:
loop = asyncio.get_running_loop()
except RuntimeError as ex:
loop = asyncio.get_event_loop()
...
asyncio.run(test())
The officially recommended new syntax (>= Python 3.7):
async def main():
loop = asyncio.get_running_loop()
...
asyncio.run(main())
PS: just remember one rule: `asyncio.get_running_loop()` and `asyncio.run()` appear as a pair.

Think of it this way: `asyncio.run` creates the corresponding `loop` internally, which is why a "running loop" exists for you to fetch:
# https://github.com/lotapp/cpython3/blob/master/Lib/asyncio/runners.py
def run(main, *, debug=False):
if events._get_running_loop() is not None:
raise RuntimeError("没法从正在运行的事件循环中调用asyncio.run()")
if not coroutines.iscoroutine(main):
raise ValueError("{!r}应该是一个协程".format(main))
# 建立一个新的事件循环
loop = events.new_event_loop()
try:
events.set_event_loop(loop) # 设置事件循环
loop.set_debug(debug) # 是否调试运行(默认否)
return loop.run_until_complete(main) # 等待运行
finally:
try:
_cancel_all_tasks(loop) # 取消其余任务
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
To spare everyone the confusion, the newest syntax has been used directly since last time, and older syntax is avoided wherever possible in these articles; the same goes for this section.

Parts of this are covered in the official docs: https://docs.python.org/3/library/asyncio-eventloop.html

Once you know coroutines, the GIL is no longer such a big deal: multiprocessing + coroutines covers it, and asyncio now even provides a thread-safe run method, `asyncio.run_coroutine_threadsafe(coro)` (you can regard it as the official answer to the GIL).
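A minimal sketch of `run_coroutine_threadsafe` (assuming a loop running in a worker thread; names are illustrative):

```python
import asyncio
import threading

async def add(x, y):
    await asyncio.sleep(0.01)
    return x + y

# run an event loop in a dedicated thread
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()

# submit a coroutine to that loop from the main thread;
# this returns a concurrent.futures.Future we can block on
future = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
result = future.result(timeout=5)
print(result)  # 3

# stop the loop from the outside, thread-safely
loop.call_soon_threadsafe(loop.stop)
t.join(timeout=5)
```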
We previously covered the general solution for concurrent programming (threads + processes): the concurrent.futures column
The `asyncio` framework covers almost every common need, but it is still the new kid. What about old code? A coroutine runs on a single thread and in principle must not contain blocking code, so what if a library or API only offers blocking calls? No need to panic: the official compatibility hook handles it. First, an example:
import asyncio
import concurrent.futures
# 模拟一个耗时操做
def test(n):
return sum(i * i for i in range(10**n))
# old main
def main():
with concurrent.futures.ThreadPoolExecutor() as pool:
# 注意:future和asyncio.future是不同的
future = pool.submit(test, 7)
result = future.result()
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
main() # old
print(time.time() - start_time)
Output: (note that `concurrent.futures.Future` and `asyncio.Future` are not the same thing, merely similar)
333333283333335000000 15.230607032775879
import asyncio
import concurrent.futures
# 模拟一个耗时操做
def test(n):
return sum(i * i for i in range(10**n))
async def main():
# 获取loop
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
# 新版兼任代码
result = await loop.run_in_executor(pool, test, 7)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main()) # new
print(time.time() - start_time)
Output: (the timing is about the same; the gain is that the event loop stays free while the blocking work runs)
Let's see what `run_in_executor` does internally:
class BaseEventLoop(events.AbstractEventLoop):
def run_in_executor(self, executor, func, *args):
# 检查loop是否关闭,若是关闭就抛`RuntimeError`异常
self._check_closed()
if self._debug:
self._check_callback(func, 'run_in_executor')
# 若是不传一个executor,就会使用默认的executor
# 换句话说:你能够不传`线程池`
if executor is None:
executor = self._default_executor
if executor is None:
executor = concurrent.futures.ThreadPoolExecutor()
self._default_executor = executor
# 把`concurrent.futures.Future`对象封装成`asyncio.futures.Future`对象
return futures.wrap_future(executor.submit(func, *args), loop=self)
Having read the source, we see the code can be simplified further:
import asyncio
# 模拟一个耗时操做
def test(n):
return sum(i * i for i in range(10**n))
async def main():
# 获取loop
loop = asyncio.get_running_loop()
# 新版兼任代码
result = await loop.run_in_executor(None, test, 7)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
333333283333335000000 15.367998838424683
PS: traditional blocking code should never appear inside a coroutine; if blocking calls are all you have, this compatibility shim is the way out.

These are less commonly used than what we covered before; knowing them just keeps you from being lost when a framework uses them:

- `add_done_callback(callback)`: available as `task.add_done_callback()`; to pass extra arguments use `functools.partial(call_back, url)`, which turns the callback signature into `call_back(url, task)`
- `loop.call_soon(callback, *args)`: schedule a callback as soon as possible (equivalent to `loop.call_later(0, callback, *args)`); the thread-safe variant is `loop.call_soon_threadsafe()`
- `loop.call_later(delay, callback, *args)`: run the callback after `delay` seconds
- `loop.call_at(when, callback, *args)`: run the callback at an absolute time on the loop's internal clock

One caveat: make sure the loop stays alive until the callback runs; if you schedule `call_later(2, xxx)` and the loop exits first, the callback can never fire.
这个比较简单,看个案例:
import asyncio
def test(name):
print(f"start {name}...")
print(f"end {name}...")
async def main():
# 正在执行某个任务
loop = asyncio.get_running_loop()
# 插入一个更要紧的任务
# loop.call_later(0, callback, *args)
task1 = loop.call_soon(test, "task1")
# 多少秒后执行
task2 = loop.call_later(2, test, "task2")
# 内部时钟时间
task3 = loop.call_at(loop.time() + 3, test, "task3")
print(type(task1))
print(type(task2))
print(type(task3))
# 保证loop在执行完毕后才关闭
await asyncio.sleep(5)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
Output: (callbacks are normally plain functions)
<class 'asyncio.events.Handle'> <class 'asyncio.events.TimerHandle'> <class 'asyncio.events.TimerHandle'> start task1... end task1... start task2... end task2... start task3... end task3... 4.9966819286346436
PS:关于返回值的说明能够看官方文档:https://docs.python.org/3/library/asyncio-eventloop.html#callback-handles
Now a word about `call_later` (the scheduled callbacks are sorted by time and then run in that order):
import asyncio
# 回调函数通常都是普通函数
def test(name):
print(name)
if __name__ == "__main__":
import time
start_time = time.time()
loop = asyncio.get_event_loop()
# 新版本限制了时间不能超过24h(防止有些人当定时任务来乱用)
# 这个执行过程会安装时间排个前后顺序,而后再批次运行
task4 = loop.call_later(4, test, "task2-4")
task2 = loop.call_later(2, test, "task2-2")
task3 = loop.call_later(3, test, "task2-3")
task1 = loop.call_later(1, test, "task2-1")
# 取消测试
task4.cancel()
# close是直接丢弃任务而后关闭loop
loop.call_later(4, loop.stop) # 等任务执行完成结束任务 loop.stop()
# run内部运行的是run_until_complete,而run_until_complete内部运行的是run_forever
loop.run_forever()
print(time.time() - start_time)
Output: (do not mix `asyncio.get_running_loop()` with old-style code)
task2-1 task2-2 task2-3 4.009201526641846
PS: `run` internally calls `run_until_complete`, and `run_until_complete` internally drives `run_forever`
Ever since switching to the new syntax we have been wrapping everything in `asyncio.create_task`. People ask: besides being a subclass of `Future`, what is a `Task` actually for? Why not use a `Future` directly?

Look at an example:
看一个案例:
import asyncio
# 不是协程就加个装饰器
@asyncio.coroutine
def test():
print("this is a test")
async def test_async():
print("this is a async test")
await asyncio.sleep(1)
async def main():
# 传入一个协程对象,返回一个task
task1 = asyncio.create_task(test())
task2 = asyncio.create_task(test_async())
await asyncio.gather(task1, task2)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
this is a test this is a async test 1.0070011615753174
Let's look at the source of `asyncio.create_task` (the key is the `Task` class):
# 传入一个协程对象,返回一个Task对象
def create_task(self, coro):
self._check_closed()
if self._task_factory is None:
# look:核心点
task = tasks.Task(coro, loop=self)
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
return task
Now the core `Task` class:
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None):
super().__init__(loop=loop)
...
# 安排了一个尽快执行的回调方法:self.__step
self._loop.call_soon(self.__step, context=self._context)
def __step(self, exc=None):
try:
if exc is None:
# 协程初始化(生成器或者协程初始化 next(xxx))
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# 在中止以前取消任务
self._must_cancel = False
super().set_exception(futures.CancelledError())
else:
# 拿到了协程/生成器的结果
super().set_result(exc.value)
except futures.CancelledError:
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
super().set_exception(exc)
except BaseException as exc:
super().set_exception(exc)
raise
...
PS: so it is clear now: a `Task` acts as the middleman between a `future` and a `coroutine`, smoothing over the differences between them.
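What `Task.__step` does can be imitated by hand: a coroutine is driven with `send(None)`, and its return value arrives packed inside `StopIteration`. A toy sketch of that mechanism (not the real Task code):

```python
# Drive a coroutine manually, the way Task.__step does:
# call send(None) until StopIteration carries the result out.
async def compute():
    return 1 + 1

coro = compute()
try:
    coro.send(None)  # start the coroutine (like Task's first __step call)
except StopIteration as exc:
    result = exc.value  # the return value rides on StopIteration.value

print(result)  # 2
```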
官方文档:https://docs.python.org/3/library/asyncio-stream.html
`asyncio` implements TCP, UDP, SSL and similar transports, while `aiohttp` is an HTTP framework built on top of `asyncio`. A quick demonstration follows (PS: for real network work you will almost always reach for `aiohttp`).
服务端:
import asyncio
async def handler(client_reader, client_writer):
# 没有数据就阻塞等(主线程作其余事情去了)
data = await client_reader.read(2048)
print(data.decode("utf-8"))
client_writer.write("骊山语罢清宵半,泪雨霖铃终不怨\n何如薄幸锦衣郎,比翼连枝当日愿".encode("utf-8"))
await client_writer.drain() # 等待缓冲区(缓冲区没占满就直接返回)
client_writer.close() # 关闭链接
async def main():
server = await asyncio.start_server(handler, "127.0.0.1", 8080)
print("Server已经启动,端口:8080")
# 实现了协程方法`__aenter__`和`__aexit__`的可使用`async with`
async with server:
# async def serve_forever(self):pass ==> use await
await server.serve_forever() # 异步方法
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
客户端:
import asyncio
async def main():
reader, writer = await asyncio.open_connection("127.0.0.1", 8080)
writer.write("人生若只如初见,何事秋风悲画扇\n等闲变却故人心,却道故人心易变".encode("utf-8"))
data = await reader.read(2048)
if data:
print(data.decode("utf-8"))
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
(output screenshot omitted)
再举个HTTP的案例:
import asyncio
async def get_html(host):
print("get_html %s..." % host)
reader, writer = await asyncio.open_connection(host, 80)
writer.write(f"GET / HTTP/1.1\r\nHost: {host}\r\n\r\n".encode('utf-8'))
await writer.drain() # 等待缓冲区
html_list = []
async for line in reader:
html_list.append(line.decode("utf-8"))
writer.close() # 关闭链接
return "\n".join(html_list)
async def main():
tasks = [
asyncio.create_task(get_html(url))
for url in ['dotnetcrazy.cnblogs.com', 'dunitian.cnblogs.com']
]
html_list = await asyncio.gather(*tasks)
print(html_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
get_html dotnetcrazy.cnblogs.com... get_html dunitian.cnblogs.com... [html内容省略,html内容省略] 5.092018604278564
(GIF of the process omitted)
PS: (more on these later)

- objects implementing `__anext__` can be used with `async for`
- objects implementing `__aenter__` and `__aexit__` can be used with `async with`
Remember the non-blocking server we hand-wrote back in the IO-multiplexing article? Let's quickly recap the flow, then walk through the corresponding asyncio source together:

1. set the `Socket` to non-blocking (`socket.setblocking(False)`)
2. register the socket's `fd` with the selector (`register`)
3. when the selector reports the fd ready, perform the corresponding `socket` operation
4. when finished, unregister the fd (`unregister`)

Now look at the source of `await asyncio.open_connection(ip, port)`:
# asyncio.streams.py
async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds):
if loop is None:
loop = events.get_event_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
# 核心点
transport, _ = await loop.create_connection(lambda: protocol, host, port, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
The real core turns out to be `loop.create_connection`:
# asyncio.base_events.py
# 链接TCP服务器
class BaseEventLoop(events.AbstractEventLoop):
async def create_connection(self,
protocol_factory,
host=None,
port=None,
*,
ssl=None,
family=0,
proto=0,
flags=0,
sock=None,
local_addr=None,
server_hostname=None,
ssl_handshake_timeout=None):
...
# 主要逻辑
if host is not None or port is not None:
exceptions = []
# 主要逻辑
for family, type, proto, cname, address in infos:
try:
sock = socket.socket(family=family, type=type, proto=proto)
sock.setblocking(False) # 1.设置非阻塞 <<<< look
if local_addr is not None:
for _, _, _, _, laddr in laddr_infos:
try:
sock.bind(laddr) # 端口绑定
break
except OSError as exc:
msg = (f'error while attempting to bind on '
f'address {laddr!r}: '
f'{exc.strerror.lower()}')
exc = OSError(exc.errno, msg)
exceptions.append(exc)
else:
sock.close()
sock = None
continue
if self._debug:
logger.debug("connect %r to %r", sock, address)
# 在selector_events中
await self.sock_connect(sock, address) # <<< look
except OSError as exc:
if sock is not None:
sock.close()
exceptions.append(exc)
except:
if sock is not None:
sock.close()
raise
else:
break
The source sets the socket to non-blocking and then calls `sock_connect`:
async def sock_connect(self, sock, address):
"""链接远程socket地址(协程方法)"""
# 非阻塞检查
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
...
fut = self.create_future()
self._sock_connect(fut, sock, address)
return await fut
def _sock_connect(self, fut, sock, address):
fd = sock.fileno() # 获取socket的文件描述符 <<< look
try:
sock.connect(address)
except (BlockingIOError, InterruptedError):
# 设置future的回调函数_sock_connect_done(用来注销的)<<< look
fut.add_done_callback(functools.partial(self._sock_connect_done, fd))
# 注册selector.register
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
First, the `add_writer` call (registration) made inside `sock_connect`:
def add_writer(self, fd, callback, *args):
"""添加一个写的回调"""
self._ensure_fd_no_transport(fd)
return self._add_writer(fd, callback, *args)
def _add_writer(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self, None)
try:
key = self._selector.get_key(fd)
except KeyError:
self._selector.register(fd, selectors.EVENT_WRITE,
(None, handle)) # selector.register
else:
mask, (reader, writer) = key.events, key.data
self._selector.modify(fd, mask | selectors.EVENT_WRITE,
(reader, handle))
if writer is not None:
writer.cancel()
Then the `_sock_connect_done` callback (deregistration) that `sock_connect` installs:
def _sock_connect_done(self, fd, fut):
# 取消注册selector.unregister
self.remove_writer(fd)
def remove_writer(self, fd):
"""移除写的回调"""
self._ensure_fd_no_transport(fd)
return self._remove_writer(fd)
def _remove_writer(self, fd):
if self.is_closed():
return False
try:
key = self._selector.get_key(fd)
except KeyError:
return False
else:
mask, (reader, writer) = key.events, key.data
mask &= ~selectors.EVENT_WRITE
if not mask:
self._selector.unregister(fd) # 注销 <<< look
else:
self._selector.modify(fd, mask, (reader, None))
if writer is not None:
writer.cancel()
return True
else:
return False
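All of this register/unregister machinery boils down to the non-blocking cycle from the recap. A tiny standalone sketch with the `selectors` module makes the cycle visible; a `socketpair` stands in for a real connection (a sketch, not asyncio's actual code):

```python
import selectors
import socket

# a connected socket pair stands in for a real network connection
a, b = socket.socketpair()
a.setblocking(False)  # 1. set the socket to non-blocking
b.setblocking(False)

sel = selectors.DefaultSelector()
sel.register(b, selectors.EVENT_READ)  # 2. register the fd with the selector

a.send(b"ping")  # make b readable

received = None
for key, mask in sel.select(timeout=5):  # 3. wait until the fd is ready
    received = key.fileobj.recv(1024)    # 4. perform the socket operation

sel.unregister(b)  # 5. unregister when done
a.close()
b.close()
print(received)  # b'ping'
```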
PS: the nesting runs very deep, and the low-level code keeps changing (even the small 3.6 to 3.7 update changed a lot).

The earlier concurrency articles already covered the fundamentals and dissected plenty of source, so you can explore further on your own (the Python 3 `asyncio` module is continuously being refactored; the source is messy right now, with several layers tangled together, and will likely settle down in a few releases). Partial annotated source: https://github.com/lotapp/cpython3/tree/master/Lib/asyncio

Further reading:
https://docs.python.org/3/library/asyncio-protocol.html#examples https://docs.python.org/3/library/asyncio-eventloop.html#creating-network-servers

Next up: synchronization and communication, plus an aiohttp-based crawler
官方文档:
https://docs.python.org/3/library/asyncio-sync.html https://docs.python.org/3/library/asyncio-queue.html
A note up front: `asyncio` provides these basic synchronization primitives: `Lock`, `Event`, `Condition`, `Semaphore`, `BoundedSemaphore`.

First, the old lead-in example: the expected result is 0, but without a lock the outcome is usually a surprise:
import concurrent.futures
num = 0
def test(i):
global num
for _ in range(10000000):
num += i
def main():
with concurrent.futures.ThreadPoolExecutor() as executor:
print("start submit...")
future1 = executor.submit(test, 1)
future2 = executor.submit(test, -1)
concurrent.futures.wait([future1, future2]) # wait some time
print("end submit...")
global num
print(num)
if __name__ == "__main__":
import time
start_time = time.time()
main()
print(f"time:{time.time()-start_time}")
Output: (the code is not thread-safe, so the result is rarely what we want)
start submit... end submit... 82705 time:5.032064199447632
再看看协程的案例:
import asyncio
num = 0
async def test(i):
global num
for _ in range(10000000):
num += i
async def main():
print("start tasks...")
task1 = asyncio.create_task(test(1))
task2 = asyncio.create_task(test(-1))
await asyncio.gather(task1, task2)
print("end tasks...")
global num
print(num)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(f"time:{time.time()-start_time}")
Output: (a single thread, so of course it is safe)

start tasks... end tasks... 0 time:4.860997438430786

PS: routing the code through the coroutine compatibility shim does not fix the thread-unsafety:

import asyncio
import concurrent.futures

num = 0

def test(i):
    global num
    for _ in range(10000000):
        num += i

async def main():
    # get the current loop
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print("start submit...")
        future1 = loop.run_in_executor(executor, test, 1)
        future2 = loop.run_in_executor(executor, test, -1)
        # await asyncio.wait([future1, future2])
        await asyncio.gather(future1, future2)
        print("end submit...")
    print(num)

if __name__ == "__main__":
    import time
    start_time = time.time()
    asyncio.run(main())
    print(f"time:{time.time()-start_time}")
输出:
start submit... end submit... -1411610 time:5.0279998779296875
At first glance a single thread needs no thread-safety, so why any synchronization at all? Because business scenarios still produce situations like duplicate requests, and that is exactly when a synchronization mechanism is needed:
import asyncio
# 用来存放页面缓存
cache_dict = {}
# 模拟一个获取html的过程
async def fetch(url):
# 每次网络访问,时间其实不肯定的
import random
time = random.randint(2, 5)
print(time)
await asyncio.sleep(time)
return f"<h2>{url}</h2>"
async def get_html(url):
# 若是缓存存在,则返回缓存的页面
for url in cache_dict:
return cache_dict[url]
# 不然获取页面源码并缓存
html = await fetch(url)
cache_dict[url] = html
return html
async def parse_js(url):
html = await get_html(url)
# do somthing
return len(html)
async def parse_html(url):
html = await get_html(url)
# do somthing
return html
async def main():
# 提交两个Task任务
task1 = asyncio.create_task(parse_js("www.baidu.com"))
task2 = asyncio.create_task(parse_html("www.baidu.com"))
# 等待任务结束
result_list = await asyncio.gather(task1, task2)
print(result_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
Output: (the `fetch` method ran twice ==> two network requests)
2 3 [22, '<h2>www.baidu.com</h2>'] 3.0100157260894775
In short: `baidu.com` starts out uncached, so when the JS-parsing and HTML-parsing tasks are submitted together, two network requests go out (network IO is slow), which also makes it easier to trip anti-crawler defenses.
线程相关的Lock复习:http://www.noobyard.com/article/p-gqozplwe-d.html
Coroutines run on a single thread, so this `Lock` is necessarily different from the `Lock` used with threads and processes. First, a distilled version of the source:
class Lock(_ContextManagerMixin):
def __init__(self, *, loop=None):
self._waiters = collections.deque()
self._locked = False
if loop is not None:
self._loop = loop
else:
self._loop = events.get_event_loop()
async def acquire(self):
if not self._locked:
self._locked = True # 改变标识
...
return self._locked
def release(self):
if self._locked:
self._locked = False
...
PS: one read of the source makes it obvious: the asyncio lock is really just a flag.

Now modify the earlier example:
import asyncio
# 用来存放页面缓存
cache_dict = {}
lock = None # 你能够试试在这边直接写`asyncio.Lock()`
# 模拟一个获取html的过程
async def fetch(url):
# 每次网络访问,时间其实不肯定的
import random
time = random.randint(2, 5)
print(time)
await asyncio.sleep(time)
return f"<h2>{url}</h2>"
async def get_html(url):
async with lock:
# 若是缓存存在,则返回缓存的页面
for url in cache_dict:
return cache_dict[url]
# 不然获取页面源码并缓存
html = await fetch(url)
cache_dict[url] = html
return html
async def parse_js(url):
html = await get_html(url)
# do somthing
return len(html)
async def parse_html(url):
html = await get_html(url)
# do somthing
return html
async def main():
global lock
lock = asyncio.Lock() # 若是在开头就定义,那么lock的loop和方法的loop就会不一致了
# 提交两个Task任务
task1 = asyncio.create_task(parse_js("www.baidu.com"))
task2 = asyncio.create_task(parse_html("www.baidu.com"))
# 等待任务结束
result_list = await asyncio.gather(task1, task2)
print(result_list)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
Output: (the `fetch` method ran once ==> a single network request)
3 [22, '<h2>www.baidu.com</h2>'] 3.0020127296447754
The thread-article treatment of `Semaphore`: http://www.noobyard.com/article/p-gqozplwe-d.html

This one gets used a lot; a quick recap of the idea from before:

Plainly put: it wraps a mutex to allow a bounded degree of parallelism.

With a plain mutex, the restroom has a single stall: the next person can only enter once the current occupant leaves. With a `Semaphore`, the stall count grows to 5 (you pick the number), so five people can go at once ==> a bounded form of concurrency control.

A condensed look at the source (think of it as a counter: each incoming task decrements it, each finishing task increments it):
class Semaphore(_ContextManagerMixin):
def __init__(self, value=1, *, loop=None):
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
self._waiters = collections.deque()
if loop is not None:
self._loop = loop
else:
self._loop = events.get_event_loop()
async def acquire(self):
while self._value <= 0:
fut = self._loop.create_future()
self._waiters.append(fut) # 把当前任务放入Queue中
try:
await fut # 等待一个任务的完成再继续
except:
fut.cancel() # 任务取消
if self._value > 0 and not fut.cancelled():
self._wake_up_next() # 唤醒下一个任务
raise
self._value -= 1 # 用掉一个并发量
return True
def release(self):
self._value += 1 # 恢复一个并发量
self._wake_up_next() # 唤醒下一个任务
Now a common scenario: calling some free API that caps concurrency at 5:
import asyncio
sem = None
# 模拟api请求
async def api_test(i):
async with sem:
await asyncio.sleep(1)
print(f"The Task {i} is done")
async def main():
global sem
sem = asyncio.Semaphore(5) # 设置并发数为5
tasks = [asyncio.create_task(api_test(i)) for i in range(20)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
(animated output omitted)
PS: `BoundedSemaphore` is a variant of `Semaphore` that checks on `release()` whether the counter would exceed its initial value, and raises an exception if it would.
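A tiny sketch of that check (illustrative only): releasing one more time than acquired makes `BoundedSemaphore` raise `ValueError`:

```python
import asyncio

async def main():
    sem = asyncio.BoundedSemaphore(1)
    async with sem:
        pass           # acquire + release: counter back at its initial value
    try:
        sem.release()  # one release too many
        return False
    except ValueError:
        return True    # BoundedSemaphore refuses to exceed the initial value

over_released = asyncio.run(main())
print(over_released)  # True
```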
The thread-article treatment of `Event`: http://www.noobyard.com/article/p-gqozplwe-d.html

This was covered in depth before, so a quick crawler batch-update example will do:
import asyncio
event = None
html_dict = {}
async def updates():
# event.wait()是协程方法,须要await
await event.wait()
# 入库操做省略 html_dict >> DB
return "html_dict >> DB done"
async def get_html(url):
# 摸拟网络请求
await asyncio.sleep(2)
html_dict[url] = f"<h1>{url}</h1>" # 能够暂时写入临时文件中
event.set() # 标记完成,普通方法
return f"{url} done"
async def main():
global event
event = asyncio.Event() # 初始化 event 对象
# 建立批量任务
tasks = [
asyncio.create_task(get_html(f"www.mmd.com/a/{i}"))
for i in range(1, 10)
]
# 批量更新操做
tasks.append(asyncio.create_task(updates()))
result = await asyncio.gather(*tasks)
print(result)
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
print(time.time() - start_time)
输出:
['www.mmd.com/a/1 done', 'www.mmd.com/a/2 done', 'www.mmd.com/a/3 done', 'www.mmd.com/a/4 done', 'www.mmd.com/a/5 done', 'www.mmd.com/a/6 done', 'www.mmd.com/a/7 done', 'www.mmd.com/a/8 done', 'www.mmd.com/a/9 done', 'html_dict >> DB done'] 2.0012683868408203
Almost identical to before, with one difference: `async def wait(self)`. The `wait` method is now a coroutine method and must be awaited.

- `coroutine wait()`: if the internal flag is `True`, return `True` immediately; otherwise block until another task calls `set()`
- `set()`: set the internal flag to `True`
- `clear()`: reset the internal flag to `False`; tasks calling `wait()` will then block until `set()` is called again
- `is_set()`: return `True` if the internal flag is `True`
The thread-article treatment of `Condition`: http://www.noobyard.com/article/p-gqozplwe-d.html

A quick look at the method list:

- `coroutine acquire()`: acquire the underlying lock
- `notify(n=1)`: wake up at most `n` tasks waiting on this condition; the caller must hold the lock via `acquire()` and release it after the call, otherwise a `RuntimeError` is raised
- `locked()`: return whether the underlying lock is held
- `notify_all()`: wake up all tasks waiting on this condition
- `release()`: release the underlying lock
- `coroutine wait()`: release the lock, block until notified, then reacquire the lock
- `coroutine wait_for(predicate)`: wait until `predicate` becomes True; `predicate` must be callable, its result is interpreted as a boolean and returned as the final result

PS: `Condition` combines the behavior of `Event` and `Lock` (several Condition objects can even share one Lock, letting different tasks coordinate exclusive access to a shared resource).

A producer/consumer example:
import asyncio
cond = None
p_list = []
# 生产者
async def producer(n):
for i in range(5):
async with cond:
p_list.append(f"{n}-{i}")
print(f"[生产者{n}]生产商品{n}-{i}")
# 通知任意一个消费者
cond.notify() # 通知所有消费者:cond.notify_all()
# 摸拟一个耗时操做
await asyncio.sleep(0.01)
# 消费者
async def consumer(i):
while True:
async with cond:
if p_list:
print(f"列表商品:{p_list}")
name = p_list.pop() # 消费商品
print(f"[消费者{i}]消费商品{name}")
print(f"列表剩余:{p_list}")
# 摸拟一个耗时操做
await asyncio.sleep(0.01)
else:
await cond.wait()
async def main():
global cond
cond = asyncio.Condition() # 初始化condition
p_tasks = [asyncio.create_task(producer(i)) for i in range(2)] # 两个生产者
c_tasks = [asyncio.create_task(consumer(i)) for i in range(5)] # 五个消费者
await asyncio.gather(*p_tasks, *c_tasks)
if __name__ == "__main__":
asyncio.run(main())
输出:
[生产者0]生产商品0-0 [生产者1]生产商品1-0 列表商品:['0-0', '1-0'] [消费者0]消费商品1-0 列表剩余:['0-0'] 列表商品:['0-0'] [消费者1]消费商品0-0 列表剩余:[] [生产者0]生产商品0-1 [生产者1]生产商品1-1 列表商品:['0-1', '1-1'] [消费者0]消费商品1-1 列表剩余:['0-1'] 列表商品:['0-1'] [消费者1]消费商品0-1 列表剩余:[] [生产者0]生产商品0-2 [生产者1]生产商品1-2 列表商品:['0-2', '1-2'] [消费者0]消费商品1-2 列表剩余:['0-2'] 列表商品:['0-2'] [消费者1]消费商品0-2 列表剩余:[] [生产者0]生产商品0-3 [生产者1]生产商品1-3 列表商品:['0-3', '1-3'] [消费者0]消费商品1-3 列表剩余:['0-3'] 列表商品:['0-3'] [消费者1]消费商品0-3 列表剩余:[] [生产者0]生产商品0-4 [生产者1]生产商品1-4 列表商品:['0-4', '1-4'] [消费者0]消费商品1-4 列表剩余:['0-4'] 列表商品:['0-4'] [消费者1]消费商品0-4 列表剩余:[]
PS: a brief note on item 7 (let's look at the source of the `wait_for` method):

# Wait until the predicate returns True (in terms of the result: it either keeps blocking, or returns True)
async def wait_for(self, predicate):
    result = predicate()
    # If the predicate is not True yet, keep waiting
    while not result:
        await self.wait()
        result = predicate()
    return result
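As a quick usage sketch (the `producer`/`consumer` helpers here are made up for illustration), `wait_for` collapses the check-predicate/wait/re-check loop above into a single call:

```python
import asyncio

# wait_for() rolls the "check predicate / wait / re-check" loop into one call
async def consumer(cond, items, got):
    async with cond:
        # Equivalent to: while not items: await cond.wait()
        await cond.wait_for(lambda: len(items) > 0)
        got.append(items.pop())

async def producer(cond, items):
    async with cond:
        items.append("goods")
        cond.notify_all()   # wake waiters so they re-check the predicate

async def main():
    cond, items, got = asyncio.Condition(), [], []
    c = asyncio.create_task(consumer(cond, items, got))
    await asyncio.sleep(0)          # let the consumer block inside wait_for()
    await producer(cond, items)
    await c
    return got

print(asyncio.run(main()))          # ['goods']
```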
Homework extension: `async_timeout` (an async-compatible timeout context manager) https://github.com/lotapp/BaseCode/blob/master/python/5.concurrent/ZCoroutine/async_timeout_timeout.py
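A minimal timeout sketch; to keep it dependency-free this uses the stdlib `asyncio.wait_for` rather than `async_timeout` itself, and the `slow_io` name is made up for illustration:

```python
import asyncio

# asyncio.wait_for cancels the awaited coroutine once the timeout expires
async def slow_io():
    await asyncio.sleep(10)     # stands in for a slow network call
    return "done"

async def main():
    try:
        return await asyncio.wait_for(slow_io(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))      # timed out
```

`async_timeout` wraps the same idea in an `async with` block instead of a function call.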
Official docs: https://docs.python.org/3/library/asyncio-queue.html
`Queue` in the threading article: http://www.noobyard.com/article/p-gqozplwe-d.html
If you don't need throttling, a coroutine `Queue` and a plain `list` are basically interchangeable (PS: `asyncio.Queue(num)` lets you cap the queue size).
Here's the classic producer-consumer example:
import random
import asyncio


async def producer(q, i):
    # NOTE: the loop variable below shadows the producer id `i`,
    # which is why the output shows producers 0-4 despite only 2 producer tasks
    for i in range(5):
        num = random.random()
        await q.put(num)
        print(f"[Producer {i}] item {num} produced")
        await asyncio.sleep(num)


async def consumer(q, i):
    while True:
        data = await q.get()
        print(f"[Consumer {i}] item {data} snapped up")


async def main():
    queue = asyncio.Queue(10)  # capped here just for demonstration
    p_tasks = [asyncio.create_task(producer(queue, i)) for i in range(2)]  # two producers
    c_tasks = [asyncio.create_task(consumer(queue, i)) for i in range(5)]  # five consumers
    await asyncio.gather(*p_tasks, *c_tasks)


if __name__ == "__main__":
    import time
    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
Output (just note that `get` and `put` are both coroutine methods):

[Producer 0] item 0.20252203397767787 produced
[Producer 0] item 0.9641503458079388 produced
[Consumer 0] item 0.20252203397767787 snapped up
[Consumer 0] item 0.9641503458079388 snapped up
[Producer 1] item 0.8049655468032324 produced
[Consumer 0] item 0.8049655468032324 snapped up
[Producer 1] item 0.6032743557097342 produced
[Consumer 1] item 0.6032743557097342 snapped up
[Producer 2] item 0.08818326334746773 produced
[Consumer 2] item 0.08818326334746773 snapped up
[Producer 3] item 0.3747289313977561 produced
[Consumer 3] item 0.3747289313977561 snapped up
[Producer 4] item 0.3948823110071299 produced
[Consumer 4] item 0.3948823110071299 snapped up
[Producer 2] item 0.5775767044660681 produced
[Consumer 0] item 0.5775767044660681 snapped up
[Producer 3] item 0.500537752889471 produced
[Consumer 1] item 0.500537752889471 snapped up
[Producer 4] item 0.9921528527523727 produced
[Consumer 2] item 0.9921528527523727 snapped up
PS: asyncio also provides `PriorityQueue` (priority queue) and `LifoQueue` (last in, first out); I won't go over them again here (we diagrammed and hand-implemented both earlier).
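For completeness, a small sketch of both queue variants (the `drain` helper and the sample tuples are my own, not from the original post):

```python
import asyncio

# PriorityQueue pops the smallest (priority, data) tuple first;
# LifoQueue pops the most recently put item first.
async def drain(q):
    out = []
    while not q.empty():
        out.append(await q.get())
    return out

async def main():
    pq, lq = asyncio.PriorityQueue(), asyncio.LifoQueue()
    for item in [(3, "low"), (1, "high"), (2, "mid")]:
        await pq.put(item)
        await lq.put(item)
    return await drain(pq), await drain(lq)

by_priority, by_lifo = asyncio.run(main())
print(by_priority)  # [(1, 'high'), (2, 'mid'), (3, 'low')]
print(by_lifo)      # [(2, 'mid'), (1, 'high'), (3, 'low')]
```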
Homework extension: https://docs.python.org/3/library/asyncio-queue.html#examples
Official docs: https://docs.python.org/3/library/asyncio-subprocess.html
This was covered back in the multiprocessing article and isn't today's focus, so I'll just paste an official demo:
import asyncio


async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

asyncio.run(run('ls /zzz'))
Output:

['ls /zzz' exited with 1]
[stderr]
ls: /zzz: No such file or directory
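Alongside `create_subprocess_shell` there is also `create_subprocess_exec`, which takes argv-style arguments without involving a shell (safer when parts of the command come from outside input). A minimal sketch; the `run_exec` wrapper name is my own:

```python
import asyncio
import sys

# argv-style subprocess: no shell parsing, arguments passed directly
async def run_exec(*argv):
    proc = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE)
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode()

# Run the current Python interpreter as the child process
code, out = asyncio.run(run_exec(sys.executable, "-c", "print('hi')"))
print(code, out.strip())   # 0 hi
```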
Coming up next: an `asyncio` + `aiohttp` spider
Code: https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/ZCoroutine/z_spider
The `asyncio` library only provides `TCP` and `UDP` transports and does not speak `HTTP`; you can think of `aiohttp` as an `http` layer built on top of `asyncio`.
First, a demo that fetches a page's HTML:
import asyncio

import aiohttp

error_urls = set()


# Fetch the page HTML
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # add to the set of urls still to handle


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        if html:  # got the html
            print(len(html))


if __name__ == "__main__":
    import time
    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
Output:

24287
0.5429983139038086
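The demo fetches a single page; the usual next step is to fan out many fetches concurrently with `asyncio.gather`. A sketch of that pattern, with the network call simulated by `sleep` so it runs standalone (the URLs are just placeholders):

```python
import asyncio

# Fan-out pattern: all fetches run concurrently,
# so total time is roughly that of the slowest one
async def fetch(url):
    await asyncio.sleep(0.1)          # stands in for session.get(url)
    return f"{url} done"

async def main(urls):
    # gather() preserves input order in its result list
    return await asyncio.gather(*[fetch(u) for u in urls])

urls = [f"http://www.biquge.cm/12/12097/{i}" for i in range(5)]
print(asyncio.run(main(urls)))
```

With real `aiohttp` calls the shape is identical: build the coroutines, then `gather` them.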
Recommended lightweight page-parsing library: `pyquery` (a jQuery-like Python library)
A simple extraction built on the demo above (`pq.items("dd a")` works like a jQuery selector):
import asyncio

import aiohttp
from pyquery import PyQuery

error_urls = set()


# Fetch the page HTML
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # set of urls still to handle


# Blocking method
def saves(results):
    with open("www.biquge.cm.txt", "a+", encoding="utf-8") as fs:
        fs.writelines(results)
    print("ok")


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        pq = PyQuery(html)
        results = [
            item.text() + ":" + item.attr("href") + "\n"
            for item in pq.items("dd a")
        ]
        # print(pq("dd a").text())
        # bridge to blocking legacy code
        await asyncio.get_running_loop().run_in_executor(None, saves, results)


if __name__ == "__main__":
    import time
    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
Output (contents of `www.biquge.cm.txt`):

新书的一些话:/12/12097/7563947.html
第一章论坛里的鬼故事。:/12/12097/7563949.html
第二章临时讲课:/12/12097/7563950.html
第三章鬼域。:/12/12097/7563951.html
第四章恐怖敲门鬼:/12/12097/7565568.html
第五章迷路:/12/12097/7565569.html
第六章厕所中的手:/12/12097/7565570.html
第七章身后的脚步:/12/12097/7565571.html
第八章奇怪的树:/12/12097/7565572.html
第九章鬼婴:/12/12097/7565573.html
第十章恶鬼之力:/12/12097/7565574.html
...
第三百二十七章三口箱子:/12/12097/7950281.html
第三百二十八章鬼橱里的照片:/12/12097/7952145.html
第三百二十九章中山市事件:/12/12097/7955244.html
第三百三十章两条信息:/12/12097/7956401.html
第三百三十一章进入中山市:/12/12097/7959077.html
第三百三十二章出乎意料:/12/12097/7962119.html
第三百三十四章酒店的二楼:/12/12097/7964192.html
第三百三十五章黑色的烛火:/12/12097/7969058.html
第三百三十六章微笑的尸体:/12/12097/7973826.html
Now let's fetch a detail page:
import asyncio

import aiohttp
from pyquery import PyQuery

error_urls = set()


# Fetch the page HTML
async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            error_urls.add(url)  # set of urls still to handle


# Detail-page test
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session,
                           "http://www.biquge.cm//12/12097/7563949.html")
        pq = PyQuery(html)
        print(pq("#content").text())
        # results = [item.text() for item in pq.items("#content")]
        # print(results)


if __name__ == "__main__":
    import time
    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
Output:
老夫掐指一算,你如今正在床上看小说,并且仍是侧身,搞很差手机还在充电。 正在读高三的杨间此刻正躺在被窝里无聊的翻看着手机,他随手点开了一个帖子,下面有很多网友在回帖。 “卧槽,楼主真乃神人也,这都被楼主猜中了。” “呵,你会告诉大家我如今正在厕所蹲坑么?不用问了,脚麻了。” ......
0.6684205532073975
PS: on Windows, if a Python package fails to install, grab the matching wheel from this site: https://www.lfd.uci.edu/~gohlke/pythonlibs/
Rate limiting, anti-crawler measures, and how to get around them will be covered later; for now, a small example that saves a novel for offline reading:
import asyncio

import aiohttp
from pyquery import PyQuery

sem = None
error_urls = set()


# Fetch the HTML
async def fetch(session, url):
    async with sem:
        async with session.get(url) as response:
            if response.status == 200:
                # how to handle illegal characters in aiohttp responses
                return await response.text("gbk", "ignore")  # ignore invalid chars
            else:
                error_urls.add(url)  # set of urls still to handle


# Fetch the body text of a chapter
async def get_text(session, url):
    # Turn a relative path into domain + path
    if not url.startswith("http://www.biquge.cm"):
        url = "http://www.biquge.cm" + url
    html = await fetch(session, url)
    pq = PyQuery(html)
    return pq("#content").text()


# Plain blocking method
def save(title, text):
    with open("恐怖复苏.md", "a+", encoding="gbk") as fs:
        fs.write(f"## {title}\n\n{text}\n\n")
    print(f"{title} done...")


async def main():
    global sem
    sem = asyncio.Semaphore(3)  # capping the concurrency is actually faster
    loop = asyncio.get_running_loop()
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://www.biquge.cm/12/12097/")
        pq = PyQuery(html)
        for item in pq.items("dd a"):
            title = item.text()
            text = await get_text(session, item.attr("href"))
            # bridge to blocking legacy code
            await loop.run_in_executor(None, save, title, text)
    print("task over")


if __name__ == "__main__":
    import time
    start_time = time.time()
    asyncio.run(main())
    print(time.time() - start_time)
Output (crawling the whole site should be obvious by now: extract the `href` from each `a` tag, dedupe the urls, then crawl the content):
新书的一些话 done...
第一章论坛里的鬼故事。 done...
第二章临时讲课 done...
第三章鬼域。 done...
第四章恐怖敲门鬼 done...
第五章迷路 done...
第六章厕所中的手 done...
第七章身后的脚步 done...
第八章奇怪的树 done...
第九章鬼婴 done...
第十章恶鬼之力 done...
第十一章逐渐复苏 done...
第十二章宛如智障 done...
第十三章羊皮纸 done...
第十四章诡异的纸 done...
......
第三百二十八章鬼橱里的照片 done...
第三百二十九章中山市事件 done...
第三百三十章两条信息 done...
第三百三十一章进入中山市 done...
第三百三十二章出乎意料 done...
第三百三十四章酒店的二楼 done...
第三百三十五章黑色的烛火 done...
第三百三十六章微笑的尸体 done...
task over
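Note that the loop above still downloads chapters one at a time; to let chapters download concurrently while keeping the `Semaphore(3)` cap, you can gather the coroutines instead. A standalone sketch of that pattern, with the network fetch simulated by `sleep` (the URLs are placeholders):

```python
import asyncio

# Semaphore + gather: chapters download concurrently,
# but at most 3 are in flight at any moment
async def get_text(sem, url):
    async with sem:                   # at most 3 downloads at once
        await asyncio.sleep(0.1)      # stands in for fetch(session, url)
        return f"{url} text"

async def main(urls):
    sem = asyncio.Semaphore(3)
    # gather() keeps the results in input order, so chapters stay sorted
    return await asyncio.gather(*[get_text(sem, u) for u in urls])

print(asyncio.run(main([f"/12/12097/{i}.html" for i in range(6)])))
```

The trade-off: with concurrent downloads you must collect results and write the file in order afterwards, rather than appending inside the loop.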
动态展现:
[Recommended] A high-performance Python async framework: https://github.com/LessChina/sanic
My take (mainline only, and only what I've seen myself):

- `Django` (convenient)
- `Tornado` (IO multiplexing) came along as a replacement
- `Flask` (simple)
- `Japronto`: stunning for a moment, outperforming everything in every language, but it just made a splash and is barely maintained now
- `asyncio` and `aiohttp` (if brother Node is that good, why can't we be?)
- `uvloop`, an `asyncio` drop-in replacement (more of it is implemented in C than the official one, and whoever has more C is more efficient; PS: the official API was ugly until 3.7 finally added enough syntactic sugar)
- `sanic` (syntax much like `Flask`, performance no worse than `Japronto`)
- `vibora` (implemented in C throughout) is showing signs of overtaking `sanic` (PS: wait a few more releases before trying it; besides, a lot of developers are on `Go + Python` these days)

One last rant:

- `gevent` monkey patching really is convenient, but it swallows a lot of internal exceptions, and its performance is no longer the best
- `tornado`, to stay compatible with both `py2` and `py3`, still implements async internally with generators, so it's relatively less efficient
- `asyncio` is the mainstream direction going forward; `sanic` is currently the hottest async framework (`vibora` is still under observation)
- PS: `Django` and `Flask` use blocking IO; web frameworks generally aren't deployed bare (their built-in servers are just for debugging), the usual setup is `uwsgi` or `gunicorn` + `nginx` (`tornado` can be deployed directly)
Reference links:

- asyncio async programming: https://www.cnblogs.com/shenh/p/9090586.html
- uWSGI, Gunicorn, what are they? https://www.cnblogs.com/gdkl/p/6807667.html
- asyncio async IO Chinese translation:
  - http://www.cnblogs.com/mamingqian/p/10008279.html
  - https://www.cnblogs.com/mamingqian/p/10075444.html
  - https://www.cnblogs.com/mamingqian/p/10044730.html
- PyQuery basics:
  - https://www.cnblogs.com/zhaof/p/6935473.html
  - https://www.cnblogs.com/lei0213/p/7676254.html