多线程与多进程

 

前言:html

操做系统,位于底层硬件与应用软件之间的一层
工做方式:向下管理硬件,向上提供接口python

多道技术补充react

1.进程

考虑一个场景:浏览器,网易云音乐以及notepad++ 三个软件只能顺序执行是怎样一种场景呢?另外,假若有两个程序A和B,程序A在执行到一半的过程当中,须要读取大量的数据输入(I/O操做),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源。你是否是已经想到在程序A读取数据的过程当中,让程序B去执行,当程序A读取完数据以后,让程序B暂停。聪明,这固然没问题,但这里有一个关键词:切换。linux

既然是切换,那么这就涉及到了状态的保存,状态的恢复,加上程序A与程序B所须要的系统资源(内存,硬盘,键盘等等)是不同的。天然而然的就须要有一个东西去记录程序A和程序B分别须要什么资源,怎样去识别程序A和程序B等等(好比读书)。git

进程定义:程序员

进程就是一个程序在一个数据集上的一次动态执行过程。进程通常由程序、数据集、进程控制块三部分组成。咱们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程当中所须要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统能够利用它来控制和管理进程,它是系统感知进程存在的惟一标志。github

举一例说明进程:
想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有作生日蛋糕的食谱,厨房里有所需的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中,作蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu),而作蛋糕的各类原料就是输入数据。进程就是厨师阅读食谱、取来各类原料以及烘制蛋糕等一系列动做的总和。如今假设计算机科学家的儿子哭着跑了进来,说他的头被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱作到哪儿了(保存进程的当前状态),而后拿出一本急救手册,按照其中的指示处理蛰伤。这里,咱们看处处理机从一个进程(作蛋糕)切换到另外一个高优先级的进程(实施医疗救治),每一个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完以后,这位计算机科学家又回来作蛋糕,从他
离开时的那一步继续作下去。web

注:redis

进程之间是相互独立得。算法

操做系统进程切换:一、出现IO操做。二、固定时间

2.线程

线程的出现是为了下降上下文切换的消耗,提升系统的并发性,并突破一个进程只能干同样事的缺陷,使到进程内并发成为可能。

假设,一个文本程序,须要接受键盘输入,将内容显示在屏幕上,还须要保存信息到硬盘中。若只有一个进程,势必形成同一时间只能干同样事的尴尬(当保存时,就不能经过键盘输入内容)。如有多个进程,每一个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的任务,进程C负责保存内容到硬盘中的任务。这里进程A,B,C间的协做涉及到了进程通讯问题,并且有共同都须要拥有的东西——-文本内容,不停的切换形成性能上的损失。如有一种机制,可使任务A,B,C共享资源,这样上下文切换所须要保存和恢复的内容就少了,同时又能够减小通讯所带来的性能损耗,那就行了。是的,这种机制就是线程。
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程当中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减少了程序并发执行时的开销,提升了操做系统的并发性能。线程没有本身的系统资源。

注:一、进程是最小的资源管理单位(盛放线程的容器)。二、线程是最小执行单位。

3.进程与线程的关系

进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操做系统结构的基础。或者说进程是具备必定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。
线程则是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

              

 

4.进程线程归纳

(1)一个线程只能属于一个进程,而一个进程能够有多个线程,但至少有一个线程。
(2)资源分配给进程,同一进程的全部线程共享该进程的全部资源。
(3)CPU分给线程,即真正在CPU上运行的是线程。

注:

CPython的多线程:因为GIL,致使同一时刻,同一进程只能有一个线程执行。

进程占用的是独立的内存地址。

5.并行和并发

并行处理(Parallel Processing)是计算机系统中能同时执行两个或更多个处理的一种计算方法。并行处理可同时工做于同一程序的不一样方面。并行处理的主要目的是节省大型和复杂问题的解决时间。并发处理(concurrency Processing):指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,但任一个时刻点上只有一个程序在处理机(CPU)上运行

并发的关键是你有处理多个任务的能力,不必定要同时。并行的关键是你有同时处理多个任务的能力。因此说,并行是并发的子集

             

注:

并行:在CPython里,由于有GIL锁,同一进程里,线程没有并行现象。可是不一样进程之间的线程能够实现并行。

6.同步与异步

在计算机领域,同步就是指一个进程在执行某个请求的时候,若该请求须要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去;异步是指进程不须要一直等下去,而是继续执行下面的操做,无论其余进程的状态。当有消息返回时系统会通知进程进行处理,这样能够提升执行的效率。举个例子,打电话时就是同步通讯,发短息时就是异步通讯。

7.threading模块

 线程对象的建立:

Thread类直接建立:

import time

def tingge():
    print("听歌")
    time.sleep(3)
    print('听歌结束')

def xieboke():
    print("写博客")
    time.sleep(5)
    print("写博客结束")
    print(time.time()-s)
s=time.time()
tingge()
xieboke()
原始
import threading
import time

def tingge():
    print("听歌")
    time.sleep(3)
    print('听歌结束')

def xieboke():
    print("写博客")
    time.sleep(5)
    print("写博客结束")
    print(time.time()-s)
s=time.time()
t1=threading.Thread(target=tingge)
t2=threading.Thread(target=xieboke)

t1.start()
t2.start()
直接建立Thread类

                 

Thread类继承式建立:

import time
import threading

class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num=num
    def run(self):
        print("running on number:%s" %self.num)
        time.sleep(3)

t1=MyThread(56)
t2=MyThread(78)

t1.start()
t2.start()
print("ending")
继承式建立Thread类

Thread类的实例方法:

join()和setDaemon():

# join():在子线程完成运行以前,这个子线程的父线程将一直被阻塞。

# setDaemon(True):
        '''
         将线程声明为守护线程,必须在start() 方法调用以前设置,若是不设置为守护线程程序会被无限挂起。

         当咱们在程序运行中,执行一个主线程,若是主线程又建立一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成

         想退出时,会检验子线程是否完成。若是子线程未完成,则主线程会等待子线程完成后再退出。可是有时候咱们须要的是只要主线程

         完成了,无论子线程是否完成,都要和主线程一块儿退出,这时就能够 用setDaemon方法啦'''


import threading
from time import ctime,sleep
import time

def Music(name):

        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):

        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))


threads = []


t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    #t2.setDaemon(True)

    for t in threads:

        #t.setDaemon(True) #注意:必定在start以前设置
        t.start()

        #t.join()

    #t1.join()
    #t2.join()    #  考虑这三种join位置下的结果?

    print ("all over %s" %ctime())

注意:关于setdaemon:程序直到不存在非守护线程时退出!

其余方法:

Thread实例对象的方法
  # isAlive(): 返回线程是否活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。

threading模块提供的一些方法:
  # threading.currentThread(): 返回当前的线程变量。
  # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
import threading
from time import ctime,sleep
import time
def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print(threading.current_thread())
        print(threading.active_count())
        print(threading.enumerate())
        print("end listening {time}".format(time=ctime()))
def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))
threads = []
t1 = threading.Thread(target=Music,args=('FILL ME',),name="sub_thread")
t2 = threading.Thread(target=Blog,args=('',))
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':
    #t2.setDaemon(True)
    for t in threads:
        #t.setDaemon(True) #注意:必定在start以前设置
        t.start()
        #t.join()
    #t1.join()
    #t2.join()    #  考虑这三种join位置下的结果?
    print ("all over %s" %ctime())

#输出结果
# Begin listening to FILL ME. Tue May  9 14:51:48 2017
# Begin recording the . Tue May  9 14:51:48 2017
# all over Tue May  9 14:51:48 2017
# <Thread(sub_thread, started 224)>
# 3
# [<_MainThread(MainThread, stopped 5728)>, <Thread(sub_thread, started 224)>, <Thread(Thread-1, started 644)>]
# end listening Tue May  9 14:51:51 2017
# end recording Tue May  9 14:51:53 2017
练习

8.GIL(全局解释器锁)

'''

定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)

'''

Python中的线程是操做系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是须要实现不一样线程对共享资源访问的互斥,因此引入了GIL。
GIL:在一个线程拥有了解释器的访问权以后,其余的全部线程都必须等待它释放解释器的访问权,即便这些线程的下一条指令并不会互相影响。
在调用任何Python C API以前,要先得到GIL
GIL缺点:多处理器退化为单处理器;优势:避免大量的加锁解锁操做

GIL(全局解释器锁):
加在cpython解释器上;

计算密集型: 一直在使用CPU
IO密集型:存在大量IO操做

 

总结:

对于计算密集型任务:Python的多线程并无用
对于IO密集型任务:Python的多线程是有意义的

python使用多核:开进程,弊端:开销大并且切换复杂
着重点:协程+多进程
方向:IO多路复用
终极思路:换C模块实现多线程

 

GIL的早期设计:

Python支持多线程,而解决多线程之间数据完整性和状态同步的最简单方法天然就是加锁。 因而有了GIL这把超级大锁,而当愈来愈多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操做)。慢慢的这种实现方式被发现是蛋疼且低效的。但当你们试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而很是难以去除了。有多难?作个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分红各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,而且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更况且Python这样核心开发和代码贡献者高度社区化的团队呢?

GIL的影响:

不管你启多少个线程,你有多少个cpu, Python在执行一个进程的时候会淡定的在同一时刻只容许一个线程运行。
因此,python是没法利用多核CPU实现多线程的。
这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),可是,对于IO密集型的任务效率仍是有显著提高的。

               

Python的多线程: 因为GIL,致使同一时刻,同一进程只能有一个线程被运行。

计算密集型:

#coding:utf8
from threading import Thread
import time

def counter():
    i = 0
    for _ in range(50000000):
        i = i + 1

    return True


def main():

    l=[]
    start_time = time.time()

    for i in range(2):

        t = Thread(target=counter)
        t.start()
        l.append(t)
        t.join()

    # for t in l:
    #     t.join()

    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':
    main()


'''
py2.7:
     串行:25.4523348808s
     并发:31.4084379673s
py3.5:
     串行:8.62115597724914s
     并发:8.99609899520874s

'''
View Code

 解决方案:

用multiprocessing替代Thread multiprocessing库的出现很大程度上是为了弥补thread库由于GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。惟一的不一样就是它使用了多进程而不是多线程。每一个进程有本身的独立的GIL,所以也不会出现进程之间的GIL争抢。

#coding:utf8
from multiprocessing import Process
import time

def counter():
    i = 0
    for _ in range(40000000):
        i = i + 1

    return True

def main():

    l=[]
    start_time = time.time()

    for _ in range(2):
        t=Process(target=counter)
        t.start()
        l.append(t)
        #t.join()

    for t in l:
       t.join()

    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':
    main()


'''

py2.7:
     串行:6.1565990448 s
     并行:3.1639978885 s

py3.5:
     串行:6.556925058364868 s
     并发:3.5378448963165283 s

'''
View Code

固然multiprocessing也不是万能良药。它的引入会增长程序实现时线程间数据通信和同步的困难。就拿计数器来举例子,若是咱们要多个线程累加同一个变量,对于thread来讲,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocessing因为进程之间没法看到对方的数据,只能经过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得原本就很是痛苦的多线程程序编码,变得更加痛苦了。

总结:由于GIL的存在,只有IO Bound场景下得多线程会获得较好的性能 - 若是对并行计算性能较高的程序能够考虑把核心部分也成C模块,或者索性用其余语言实现 - GIL在较长一段时间内将会继续存在,可是会不断对其进行改进。

因此对于GIL,既然不能反抗,那就学会去享受它吧!

同步锁:

同步锁也叫互斥锁。

import time
import threading

def addNum():
    global num #在每一个线程中都获取这个全局变量
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 对此公共变量进行-1操做

num = 100  #设定一个共享变量

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部线程执行完毕
    t.join()

print('Result: ', num)

锁一般被用来实现对共享资源的同步访问。为每个共享资源建立一个Lock对象,当你须要访问该资源时,调用acquire方法来获取锁对象(若是其它线程已经得到了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:

import threading

R=threading.Lock()

R.acquire()
'''
对公共数据的操做
'''
R.release()
import time
import threading

def addNum():
    global num #在每一个线程中都获取这个全局变量
    # num-=1
    print("ok")
    lock.acquire()
    temp=num
    time.sleep(0.1)
    num =temp-1  # 对此公共变量进行-1操做
    lock.release()
num = 100  #设定一个共享变量
thread_list = []
lock=threading.Lock()
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list: #等待全部线程执行完毕
    t.join()

print('Result: ', num)
#串行
练习

一共有两把锁,一个是解释器级别的,一个是用户级别的。

扩展思考

'''
一、为何有了GIL,还须要线程同步?

多线程环境下必须存在资源的竞争,那么如何才能保证同一时刻只有一个线程对共享资源进行存取?

加锁, 对, 加锁能够保证存取操做的惟一性, 从而保证同一时刻只有一个线程对共享数据存取.

一般加锁也有2种不一样的粒度的锁:

    coarse-grained(粗粒度): python解释器层面维护着一个全局的锁机制,用来保证线程安全。
                            内核级经过GIL实现的互斥保护了内核的共享资源。

    fine-grained(细粒度):   那么程序员须要自行地加,解锁来保证线程安全,
                            用户级经过自行加锁保护的用户程序的共享资源。

 二、GIL为何限定在一个进程上?
 
 你写一个py程序,运行起来自己就是一个进程,这个进程是有解释器来翻译的,因此GIL限定在当前进程;
 若是又建立了一个子进程,那么两个进程是彻底独立的,这个字进程也是有python解释器来运行的,因此
 这个子进程上也是受GIL影响的                


'''

死锁与递归所:

所谓死锁: 是指两个或两个以上的进程或线程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力做用,它们都将没法推动下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

抢锁,涉及到唤醒。

import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 若是锁被占用,则阻塞在这里,等待锁的释放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()

        mutexA.release()


    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

在Python中为了支持在同一线程中屡次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。直到一个线程全部的acquire都被release,其余的线程才能得到资源。上面的例子若是使用RLock代替Lock,则不会发生死锁:

Rlock内部维护着一个计数器。

使用递归锁,使用串行方式。

Rlock=threading.RLock()
import threading
import time

# mutexA = threading.Lock()
# mutexB = threading.Lock()

Rlock=threading.RLock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):

        self.fun1()
        self.fun2()

    def fun1(self):

        Rlock.acquire()  # 若是锁被占用,则阻塞在这里,等待锁的释放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        Rlock.acquire()  # count=2
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        Rlock.release()   #count-1

        Rlock.release()   #count-1 =0


    def fun2(self):
        Rlock.acquire()  # count=1
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        Rlock.acquire()  # count=2
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        Rlock.release()

        Rlock.release()   # count=0


if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):

        my_thread = MyThread()
        my_thread.start()
递归锁RLock

应用场景:抢票软件中。

Event对象

线程的一个关键特性是每一个线程都是独立运行且状态不可预测。若是程序中的其 他线程须要经过判断某个线程的状态来肯定本身下一步的操做,这时线程同步问题就 会变得很是棘手。为了解决这些问题,咱们须要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它容许线程等待某些事件的发生。在 初始状况下,Event对象中的信号标志被设置为假。若是有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程若是将一个Event对象的信号标志设置为真,它将唤醒全部等待这个Event对象的线程。若是一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

event.isSet():返回event的状态值;

event.wait():若是 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度;

event.clear():恢复event的状态值为False。

          

 

 能够考虑一种应用场景(仅仅做为说明),例如,咱们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去链接Redis的服务,通常状况下,若是Redis链接不成功,在各个线程的代码中,都会去尝试从新链接。若是咱们想要在启动时确保Redis服务正常,才让那些工做线程去链接Redis服务器,那么咱们就能够采用threading.Event机制来协调各个工做线程的链接操做:主线程中会去尝试链接Redis服务,若是正常的话,触发事件,各工做线程会尝试链接Redis服务。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()
View Code

threading.Event的wait方法还接受一个超时参数,默认状况下若是事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数以后,若是阻塞时间超过这个参数设定的值以后,wait方法会返回。对应于上面的应用场景,若是Redis服务器一致没有启动,咱们但愿子线程可以打印一些日志来不断地提醒咱们当前没有一个能够链接的Redis服务,咱们就能够经过设置这个超时参数来达成这样的目的:

def worker(event):
    while not event.is_set():
        logging.debug('Waiting for redis ready...')
        event.wait(2)
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)
View Code
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)


def worker(event):
    logging.debug('Waiting for redis ready...')

    while not event.isSet():
        logging.debug("wait.......")
        event.wait(3)   # if flag=False阻塞,等待flag=true继续执行


    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():

    readis_ready = threading.Event()  #  flag=False
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')

    time.sleep(6) # simulate the check progress
    readis_ready.set()  # flag=Ture


if __name__=="__main__":
    main()
练习

这样,咱们就能够在等待Redis服务启动的同时,看到工做线程里正在等待的状况。

注意:event不是锁,只是种状态。

 Semaphore(信号量):

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其余线程调用release()。

 

实例:(同时只有5个线程能够得到semaphore,便可以限制最大链接数为5):

import threading
import time

semaphore = threading.Semaphore(5)

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t1 = threading.Thread(target=func)
  t1.start()
View Code

应用:链接池

思考:与Rlock的区别?

9.队列(queue)

queue方法:

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

 当必须在多个线程之间安全地交换信息时,队列在线程编程中尤为有用。

get与put方法

'''

建立一个“队列”对象

import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue类便是一个队列的同步实现。队列长度可为无限或者有限。可经过Queue的构造函数的可选参数
maxsize来设定队列长度。若是maxsize小于1就表示队列长度无限。

将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;
第二个block为可选参数,默认为
1。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。若是block为0,
put方法将引起Full异常。

将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。若是队列为空且
block为True,get()就使调用线程暂停,直至有项目可用。若是队列为空且block为False,队列将引起Empty异常。

'''

练习:

import queue

q = queue.Queue(3)
q.put(111)
q.put("hello")
q.put(222)
# q.put(223,False)


print(q.get())
print(q.get())
print(q.get())
# print(q.get(False))

join与task_done方法:

'''
join() 阻塞进程,直到全部任务完成,须要配合另外一个方法task_done。

    def join(self):
     with self.all_tasks_done:
      while self.unfinished_tasks:
       self.all_tasks_done.wait()

task_done() 表示某个任务完成。每一条get语句后须要一条task_done。


import queue
q = queue.Queue(5)
q.put(10)
q.put(20)
print(q.get())
q.task_done()
print(q.get())
q.task_done()

q.join()

print("ending!")
'''

其余经常使用方法:

'''

此包中的经常使用方法(q = Queue.Queue()):

q.qsize() 返回队列的大小
q.empty() 若是队列为空,返回True,反之False
q.full() 若是队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 至关q.get(False)非阻塞 
q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 至关q.put(item, False)
q.task_done() 在完成一项工做以后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操做

'''

其余模式:

'''

Python Queue模块有三种队列及构造函数: 

一、Python Queue模块的FIFO队列先进先出。  class queue.Queue(maxsize) 
二、LIFO相似于堆,即先进后出。           class queue.LifoQueue(maxsize) 
三、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 


import queue

#先进后出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#优先级
q=queue.PriorityQueue()
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

while 1:
  data=q.get()
  print(data)

'''

注意:

  队列只在多线程、多进程中才有。

  队列是个数据类型或者数据结构。

10.应用 生产者消费者模型

为何要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师作好菜,不须要直接和客户交流,而是交给前台,而客户去饭菜也不须要不找厨师,直接去前台领取便可,这也是一个结耦的过程。

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
View Code

11.multiprocessing模块

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

因为GIL的存在,python中的多线程其实并非真正的多线程,若是想要充分地使用多核CPU的资源,在python中大部分状况须要使用多进程。

multiprocessing包是Python中的多进程管理包。与threading.Thread相似,它能够利用multiprocessing.Process对象来建立一个进程。该进程能够运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,经过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。因此,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

python的进程调用:

# Process类调用

from multiprocessing import Process
import time
def f(name):

    print('hello', name,time.ctime())
    time.sleep(1)

if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = Process(target=f, args=('alvin:%s'%i,))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')

# 继承Process类调用
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        # self.name = name

    def run(self):

        print ('hello', self.name,time.ctime())
        time.sleep(1)


if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')
View Code
#coding:utf8
from multiprocessing import Process
import time

def counter():
    i = 0
    for _ in range(40000000):
        i = i + 1
    return True
def main():
    l=[]
    start_time = time.time()

    for _ in range(2):
        t=Process(target=counter)
        t.start()
        l.append(t)
        #t.join()

    for t in l:
       t.join()

    # counter()
    # counter()
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))
if __name__ == '__main__':
    main()

"""
测得时候,注意关闭其余无用的软件。防止出如今多进程环境中串行比并行还快。
这是由于其余进程在干扰。
"""
测试

process类:

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前尚未实现,库引用中提示必须是None;
  target: 要执行的方法;
  name: 进程名;
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,若是实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():无论任务是否完成,当即中止工做进程

属性:

  daemon:和线程的setDeamon功能同样

  name:进程名字。

  pid:进程号。

from multiprocessing import Process
import os
import time
def info(name):


    print("name:",name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)

def foo(name):

    info(name)

if __name__ == '__main__':

    info('main process line')


    p1 = Process(target=info, args=('alvin',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")

#输出结果
# name: main process line
# parent process: 5164 #pycharm进程号
# process id: 2584 
# ------------------
# name: alvin
# parent process: 2584
# process id: 8100
# ------------------
# name: egon
# parent process: 2584
# process id: 7752
# ------------------
# ending
View Code

12.协程

协程是单线程实现并发,再也不有任何锁的概念。

协程的好处:
一、因为单线程,不能再切换。
二、再也不有任何锁的概念。

yield与协程:

import time

"""
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,经过锁机制控制队列和等待,但一不当心就可能死锁。
若是改用协程,生产者生产消息后,直接经过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
"""
# 注意到consumer函数是一个generator(生成器):
# 任何包含yield关键字的函数都会自动成为生成器(generator)对象

def consumer():
    r = ''
    while True:
        # 三、consumer经过yield拿到消息,处理,又经过yield把结果传回;
        #    yield指令具备return关键字的做用。而后函数的堆栈会自动冻结(freeze)在这一行。
        #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
        #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。经过这种方式,迭代器能够实现无限序列和惰性求值。
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'
def produce(c):
    # 一、首先调用c.next()启动生成器
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 二、而后,一旦生产了东西,经过c.send(n)切换到consumer执行;
        cr = c.send(n)
        # 四、produce拿到consumer处理的结果,继续生产下一条消息;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 五、produce决定不生产了,经过c.close()关闭consumer,整个过程结束。
    c.close()
if __name__=='__main__':
    # 六、整个流程无锁,由一个线程执行,produce和consumer协做完成任务,因此称为“协程”,而非线程的抢占式多任务。
    c = consumer()
    produce(c)
    
    
'''
result:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''
View Code

greenlet:

greenlet 是最底层的库。gevent库和eventlet库,都是在greenlet库得基础上继续封装。

greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操做进行恢复为止。可使用一个调度器循环在一组生成器函数之间协做多个任务。greentlet是python中实现咱们所谓的"Coroutine(协程)"的一个基础库.

from greenlet import greenlet
 
def test1():
    print (12)
    gr2.switch()
    print (34)
    gr2.switch()
 
def test2():
    print (56)
    gr1.switch()
    print (78)
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
View Code

13.基于greenlet的框架

gevent模块实现协程

Python经过yield提供了对协程的基本支持,可是不彻底。而第三方的gevent为Python提供了比较完善的协程支持。

gevent是第三方库,经过greenlet实现协程,其基本思想是:

当一个greenlet遇到IO操做时,好比访问网络,就自动切换到其余的greenlet,等到IO操做完成,再在适当的时候切换回来继续执行。因为IO操做很是耗时,常常使程序处于等待状态,有了gevent为咱们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

因为切换是在IO操做时自动完成,因此gevent须要修改Python自带的一些标准库,这一过程在启动时经过monkey patch完成:

import gevent
import time

def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")

def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")

start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)

print(time.time()-start)
View Code

固然,实际代码里,咱们不会用gevent.sleep()去切换协程,而是在执行到IO操做时,gevent自动切换,代码以下:

from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

start=time.time()

gevent.joinall([
        gevent.spawn(f, 'https://itk.org/'),
        gevent.spawn(f, 'https://www.github.com/'),
        gevent.spawn(f, 'https://zhihu.com/'),
])

# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')

print(time.time()-start)
View Code

扩展:

gevent是一个基于协程(coroutine)的Python网络函数库,经过使用greenlet提供了一个在libev事件循环顶部的高级别并发API。

主要特性有如下几点:

<1> 基于libev的快速事件循环,Linux上面的是epoll机制

<2> 基于greenlet的轻量级执行单元

<3> API复用了Python标准库里的内容

<4> 支持SSL的协做式sockets

<5> 可经过线程池或c-ares实现DNS查询

<6> 经过monkey patch功能来使得第三方模块变成协做式

gevent.spawn()方法spawn一些jobs,而后经过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果经过检查gevent.Greenlet.value值来收集。

1、关于Linux的epoll机制:

epoll是Linux内核为处理大批量文件描述符而做了改进的poll,是Linux下多路复用IO接口select/poll的
加强版本,它能显著提升程序在大量并发链接中只有少许活跃的状况下的系统CPU利用率。epoll的优势:

(1)支持一个进程打开大数目的socket描述符。select的一个进程所打开的FD由FD_SETSIZE的设置来限定,而epoll没有这个限制,它所支持的FD上限是
最大可打开文件的数目,远大于2048。

(2)IO效率不随FD数目增长而线性降低:因为epoll只会对“活跃”的socket进行操做,因而,只有”活跃”的socket才会主动去调用 callback函数,其余
idle状态的socket则不会。

(3)使用mmap加速内核与用户空间的消息传递。epoll是经过内核于用户空间mmap同一块内存实现的。

(4)内核微调。

2、libev机制

提供了指定文件描述符事件发生时调用回调函数的机制。libev是一个事件循环器:向libev注册感兴趣的事件,好比socket可读事件,libev会对所注册的事件
的源进行管理,并在事件发生时触发相应的程序。

ps
ps

4.2.2 官方文档中的示例:

import gevent

from gevent import socket

urls = [‘www.google.com.hk’,’www.example.com’, ‘www.python.org’ ]

jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

gevent.joinall(jobs, timeout=2)

[job.value for job in jobs]

[‘74.125.128.199’, ‘208.77.188.166’, ‘82.94.164.162’]

注解:gevent.spawn()方法spawn一些jobs,而后经过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果经过检查gevent.Greenlet.value值来收集。gevent.socket.gethostbyname()函数与标准的socket.gethotbyname()有相同的接口,但它不会阻塞整个解释器,所以会使得其余的greenlets跟随着无阻的请求而执行。

4.2.3 Monkey patch

Python的运行环境容许咱们在运行时修改大部分的对象,包括模块、类甚至函数。虽然这样作会产生“隐式的反作用”,并且出现问题很难调试,但在须要修改Python自己的基础行为时,Monkey patch就派上用场了。Monkey patch可以使得gevent修改标准库里面大部分的阻塞式系统调用,包括socket,ssl,threading和select等模块,而变成协做式运行。

from gevent import monkey ;

monkey . patch_socket ()

import urllib2

经过monkey.patch_socket()方法,urllib2模块可使用在多微线程环境,达到与gevent共同工做的目的。

4.2.4 事件循环

不像其余网络库,gevent和eventlet相似, 在一个greenlet中隐式开始事件循环。没有必须调用run()或dispatch()的反应器(reactor),在twisted中是有 reactor的。当gevent的API函数想阻塞时,它得到Hub实例(执行时间循环的greenlet),并切换过去。若是没有集线器实例则会动态 建立。

libev提供的事件循环默认使用系统最快轮询机制,设置LIBEV_FLAGS环境变量可指定轮询机制。LIBEV_FLAGS=1为select, LIBEV_FLAGS = 2为poll, LIBEV_FLAGS = 4为epoll,LIBEV_FLAGS = 8为kqueue。

Libev的API位于gevent.core下。注意libev API的回调在Hub的greenlet运行,所以使用同步greenlet的API。可使用spawn()和Event.set()等异步API。

eventlet实现协程(了解)

eventlet 是基于 greenlet 实现的面向网络应用的并发处理框架,提供“线程”池、队列等与其余 Python 线程、进程模型很是类似的 api,而且提供了对 Python 发行版自带库及其余模块的超轻量并发适应性调整方法,比直接使用 greenlet 要方便得多。

其基本原理是调整 Python 的 socket 调用,当发生阻塞时则切换到其余 greenlet 执行,这样来保证资源的有效利用。须要注意的是:
eventlet 提供的函数只能对 Python 代码中的 socket 调用进行处理,而不能对模块的 C 语言部分的 socket 调用进行修改。对后者这类模块,仍然须要把调用模块的代码封装在 Python 标准线程调用中,以后利用 eventlet 提供的适配器实现 eventlet 与标准线程之间的协做。
虽然 eventlet 把 api 封装成了很是相似标准线程库的形式,但二者的实际并发执行流程仍然有明显区别。在没有出现 I/O 阻塞时,除非显式声明,不然当前正在执行的 eventlet 永远不会把 cpu 交给其余的 eventlet,而标准线程则是不管是否出现阻塞,老是由全部线程一块儿争夺运行资源。全部 eventlet 对 I/O 阻塞无关的大运算量耗时操做基本没有什么帮助。

14.IO模型

IO 就是InputStream,OutputStream 输入和输出。 

同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不一样的人给出的答案均可能不一样,好比wiki,就认为asynchronous IO和non-blocking IO是一个东西。这实际上是由于不一样的人的知识背景不一样,而且在讨论这个问题的时候上下文(context)也不相同。因此,为了更好的回答这个问题,先限定一下本文的上下文。

本文讨论的背景是Linux环境下的network IO。  
Stevens在文章中一共比较了五种IO Model:
  • blocking IO #阻塞IO,全程阻塞(accept,recv)
  • nonblocking IO #非阻塞
  • IO multiplexing #IO多路复用 (监听多个链接)
  • signal driven IO #异步IO
  • asynchronous IO #驱动信号

因为signal driven IO在实际中并不经常使用,因此我这只说起剩下的四种IO Model。
再说一下IO发生时涉及的对象和步骤。
对于一个network IO (这里咱们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另外一个就是系统内核(kernel)。当一个read操做发生时,它会经历两个阶段:
 1 等待数据准备 (Waiting for the data to be ready)
 2 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
记住这两点很重要,由于这些IO Model的区别就是在两个阶段上各有不一样的状况。

补充:

Windows32位系统,2的32次方,其中内核态占用1个G、用户态占用3个G。
发送得数据必定是先到内核空间,最后操做系统再把数据转给用户空间,而后才能进行处理。
进程切换操做消耗资源比线程要多,线程切换切换操做比协程消耗资源要多。

 

blocking IO (阻塞IO)

在linux中,默认状况下全部的socket都是blocking,一个典型的读操做流程大概是这样:

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来讲,不少时候数据在一开始尚未到达(好比,尚未收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。
因此,blocking IO的特色就是在IO执行的两个阶段都被block了。

non-blocking IO(非阻塞IO)

linux下,能够经过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操做时,流程是这个样子:

从图中能够看出,当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回。因此,用户进程实际上是须要不断的主动询问kernel数据好了没有。

 注意:

      在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不同,”非阻塞将大的整片时间的阻塞分红N多的小的阻塞, 因此进程不断地有机会 ‘被’ CPU光顾”。即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是能够作其余事情的,

      也就是说非阻塞的recvform系统调用调用以后,进程并无被阻塞,内核立刻返回给进程,若是数据还没准备好,此时会返回一个error。进程在返回以后,能够干点别的事情,而后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程一般被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。须要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
sk.bind(('127.0.0.1',6667))
sk.listen(5)
sk.setblocking(False)
while True:
    try:
        print ('waiting client connection .......')
        connection,address = sk.accept()   # 进程主动轮询
        print("+++",address)
        client_messge = connection.recv(1024)
        print(str(client_messge,'utf8'))
        connection.close()
    except Exception as e:
        print (e)
        time.sleep(4)

#############################client

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

while True:
    sk.connect(('127.0.0.1',6667))
    print("hello")
    sk.sendall(bytes("hello","utf8"))
    time.sleep(2)
    break
View Code
import socket
import select

sock = socket.socket()
sock.bind(("127.0.0.1",8800))
sock.listen(5)

sock.setblocking(False)
inputs=[sock,]
while 1:
    r,w,e=select.select(inputs,[],[]) # 监听有变化的套接字 inputs=[sock,conn1,conn2,conn3..]
    #r=inputs  r=[conn1,conn2]
    print(inputs,"===inputs===") #必定要注意,r不等于inputs,r是会变化得
    print(r,"====r===")
    for obj in r: # 第一次 [sock,]  第二次 #[conn1,]
        if obj==sock:
            conn,addr=obj.accept()
            print(conn,"===conn===")
            inputs.append(conn) #  inputs=[sock,conn]
        else:
            data=obj.recv(1024)
            print(data.decode("utf8"))
            send_data = input(">>>")
            obj.send(send_data.encode("utf8"))

#输出结果
# [<socket.socket fd=204, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8800)>] ===inputs===
# [<socket.socket fd=204, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8800)>] ====r===
# <socket.socket fd=196, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8800), raddr=('127.0.0.1', 61457)> ===conn===
# [<socket.socket fd=204, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8800)>, <socket.socket fd=196, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8800), raddr=('127.0.0.1', 61457)>] ===inputs===
# [<socket.socket fd=196, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8800), raddr=('127.0.0.1', 61457)>] ====r===
# aaa #接收得数据
# >>>bbb #客户端发送数据
基于select机制(服务端)
import socket

sock=socket.socket()

sock.connect(("127.0.0.1",8800))

while 1:
    data=input("input>>>")
    sock.send(data.encode("utf8"))
    rece_data=sock.recv(1024)
    print(rece_data.decode("utf8"))
sock.close()

#输入结果
#input>>>aaa
#bbb
#input>>>
基于select机制(客户端)

优势:可以在等待任务完成的时间里干其余活了(包括提交其余任务,也就是 “后台” 能够有多个任务在同时执行)。

缺点:任务完成的响应延迟增大了,由于每过一段时间才去轮询一次read操做,而任务可能在两次轮询之间的任意时间完成。这会致使总体数据吞吐量的下降。

总结:

非阻塞IO:

发送屡次系统调用。优势:wait for data时无阻塞。缺点:1 系统调用太多。2 数据不是实时接收得。

两个阶段:

wait for data:非阻塞

copy data:阻塞

15.IO multiplexing(IO多路复用)

   IO multiplexing这个词可能有点陌生,可是若是我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO。咱们都知道,select/epoll的好处就在于单个process就能够同时处理多个网络链接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的全部socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

   当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并无太大的不一样,事实上,还更差一些。由于这里须要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。可是,用select的优点在于它能够同时处理多个connection。(多说一句。因此,若是处理的链接数不是很高的话,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优点并非对于单个链接能处理得更快,而是在于能处理更多的链接。)
在IO multiplexing Model中,实际中,对于每个socket,通常都设置成为non-blocking,可是,如上图所示,整个用户的process实际上是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

注意1:select函数返回结果中若是有文件可读了,那么进程就能够经过调用accept()或recv()来让kernel将位于内核中准备到的数据copy到用户区。

注意2: select的优点在于能够处理多个链接,不适用于单个链接、

#***********************server.py
import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8801))
sk.listen(5)
inputs=[sk,]
while True:
    r,w,e=select.select(inputs,[],[],5)
    print(len(r))

    for obj in r:
        if obj==sk:
            conn,add=obj.accept()
            print(conn)
            inputs.append(conn)
        else:
            data_byte=obj.recv(1024)
            print(str(data_byte,'utf8'))
            inp=input('回答%s号客户>>>'%inputs.index(obj))
            obj.sendall(bytes(inp,'utf8'))

    print('>>',r)

#***********************client.py

import socket
sk=socket.socket()
sk.connect(('127.0.0.1',8801))

while True:
    inp=input(">>>>")
    sk.sendall(bytes(inp,"utf8"))
    data=sk.recv(1024)
    print(str(data,'utf8'))
View Code

win平台:select

linux平台: select poll epoll 

select的缺点:

  1. 每次调用select都要将全部的fb(文件描述符)拷贝到内核空间致使效率降低。
  2. 遍历全部的fb,是否有数据访问。(最重要的问题)
  3. 最大链接数(1024)

poll:

  1. 每次调用select都要将全部的fb(文件描述符)拷贝到内核空间致使效率降低。
  2. 遍历全部的fb,是否有数据访问。(最重要的问题)
  3. 最大链接数没有限制(是个过渡阶段)

epoll: 

  1. 第一个函数:建立epoll句柄:将全部的fb(文件描述符)拷贝到内核空间,可是只需拷贝一次。
  2. 回调函数:某一个函数或者某一个动做成功完成后会触发的函数,为全部的fd绑定一个回调函数,一旦有数据访问,触发该回调函数,回调函数将fd放到链表中。
  3. 第三个函数 判断链表是否为空

   最大链接数没有上线。

链表是个数据类型。

 

优先级:epoll|kqueue|devpoll > poll > select.
epoll|kqueue|devpoll都是一个级别的。

补充:

socketserver是基于多线程和IO多路复用实现得。

对于文件描述符(套接字对象)
1 是一个惟一的非零整数,不会变
2 收发数据的时候,对于接收端而言,数据先到内核空间,而后copy到用户空间,同时,内核空间数据清除

特色:

一、全程(wait for data,copy data)阻塞

二、能监听多个文件描述符,实现并发

Asynchronous I/O(异步IO)

linux下的asynchronous IO其实用得不多。先看一下它的流程:

用户进程发起read操做以后,马上就能够开始去作其它的事。而另外一方面,从kernel的角度,当它受到一个asynchronous read以后,首先它会马上返回,因此不会对用户进程产生任何block。而后,kernel会等待数据准备完成,而后将数据拷贝到用户内存,当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。

特色:全程无阻塞

IO模型比较分析

 到目前为止,已经将四个IO Model都介绍完了。如今回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这二者的区别。调用blocking IO会一直block住对应的进程直到操做完成,而non-blocking IO在kernel还准备数据的状况下会马上返回。

在说明synchronous IO和asynchronous IO的区别以前,须要先给出二者的定义。Stevens给出的定义(实际上是POSIX的定义)是这样子的:
    A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
    An asynchronous I/O operation does not cause the requesting process to be blocked; 
      二者的区别就在于synchronous IO作”IO operation”的时候会将process阻塞。按照这个定义,以前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。有人可能会说,non-blocking IO并无被block啊。这里有个很是“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操做,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,若是kernel的数据没有准备好,这时候不会block进程。可是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不同,当进程发起IO 操做以后,就直接返回不再理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程当中,进程彻底没有被block。

各个IO Model的比较如图所示:

通过上面的介绍,会发现non-blocking IO和asynchronous IO的区别仍是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,可是它仍然要求进程去主动的check,而且当数据准备完成之后,也须要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则彻底不一样。它就像是用户进程将整个IO操做交给了他人(kernel)完成,而后他人作完后发信号通知。在此期间,用户进程不须要去检查IO操做的状态,也不须要主动的去拷贝数据。

补充:

只要有堵塞就叫同步IO
只要没堵塞就叫异步IO

同步:阻塞IO 、非阻塞IO、IO多路复用
异步:异步IO

 selectors模块

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
View Code
import selectors  # 基于select模块实现的IO多路复用,建议你们使用

import socket

sock=socket.socket()
sock.bind(("127.0.0.1",8800))

sock.listen(5)

sock.setblocking(False)

sel=selectors.DefaultSelector() #根据具体平台选择最佳IO多路机制,好比在linux,选择epoll

def read(conn,mask):

    try:
        data=conn.recv(1024)
        print(data.decode("UTF8"))
        data2=input(">>>")
        conn.send(data2.encode("utf8"))
    except Exception:
        sel.unregister(conn)

def accept(sock,mask):

    conn, addr = sock.accept()
    print("conn",conn)
    sel.register(conn,selectors.EVENT_READ,read)

sel.register(sock,selectors.EVENT_READ,accept)  # 注册事件

while 1:

    print("wating...")
    events=sel.select()   #  监听    [(key1,mask1),(key2,mask2)]
    for key,mask in events:

        # print(key.fileobj)    # conn
        # print(key.data)       # read
        func=key.data
        obj=key.fileobj

        func(obj,mask)  # 1 accept(sock,mask)    # 2 read(conn,mask)
练习

Python 2.7版本中listen()超过了设置得值会链接不上,Python3版本listen()没有限制

16.Monkey patch

猴子补丁是一个程序来扩展或修改本地配套系统软件(仅影响到程序的运行实例)的方式。

Monkey patch就是在运行时对已有的代码进行修改,达到hot patch的目的。Eventlet中大量使用了该技巧,以替换标准库中的组件,好比socket。首先来看一下最简单的monkey patch的实现。

class Foo(object):  
    def bar(self):  
        print('Foo.bar')
  
def bar(self):  
    print('Modified bar')  
  
Foo().bar()  
  
Foo.bar = bar  
  
Foo().bar()

因为Python中的名字空间是开放,经过dict来实现,因此很容易就能够达到patch的目的。

参考资料:Monkey patch

 

参考苑昊