多线程和多进程

1、线程和进程python

  多任务的实现方式有3中:
                        1:多进程模式
                        2:多线程模式
                        3:多进程+多线程模式
Python既支持多线程又支持多进程。
1:多进程:
Unix/Linux操做系统中提供一个fork()函数 在Python的OS模块中就封装了常见的系统调用 其中就包括 fork() 能够再Python中轻松的建立子进程。
    #multiprocess.py
 import os
                    
 print 'Process (%s) start....' % os.getpid()
 pid=os.fork()
 if pid==0:
    print 'I am child process (%s) and my parent is %s ' %       (os.getpid(),os.getppid())
 else:
    print 'i am father process'
 因为windows上面没有fork()故上述代码不能够在windows上面运行。
            
因为Python是跨平台的 天然也提供一个跨平台的多进程支持 multiprocessing 模块就是跨平台版本的多进程支持
                
multiprocessing 模块提供了一个Process类来表明 一个进程对象 下面的例子演示了启动一个子进程并等待其结束
 form multiprocessing import Process
                    import os
                
                    #子进程要执行的代码
                    def run_proc(name):
                        print 'Run child process %s (%s)' %(name,os.getpid())
        
                    if __name__=='__main__':
                        print 'Parent process %s' % os.getpid()
                        p=Process(target=run_proc,args=('test',))
                        print 'Process will start'
                        p.start()
                        p.join()
                        print 'Process end'.
                    运行结果以下:
                    Parent process 389
                    Process will start
                    Run child process test (340)
                    Process end
 建立子进程时候 只须要传入一个执行函数和函数的参数便可 穿件一个process实例 而后用start()函数启动 join()函数等待子进程结束后在继续往下运行。
                    
 若是要启动大量的子进程 能够用进程池的方式批量建立子进程:
  from multiprocessing import Pool
                            import os,time,random
            
                            def long_time_task(name):
                                print 'Run task %s %s' %(name,os.getpid())
                                start = time.time()
                                time.sleep(random.random*3)
                                end=time.time()
                                print 'Task %s runs %0.2f seconds' % (name,(end-start))
                
                            if __name__=='__main__'
                                print 'Parent process %s' % os.getpid()
                                p=Pool()
                                for i in range(5):
                                    p.apply_async(long_time_task,args(i,))
                                print 'waiting for all subprocesses done.....'
                                p.close()
                                p.join()
                                print 'All subprocesses done'
 调用join以前先调用close 调用close以后就不可以继续添加新的Process 了
            
                    进程间的通行:
                        Python的multiprocessing模块包装了底层的机制 提供了Queue Pipes 等多种方式来交换数据                
                        以Queue为例 在父进程中建立两个子进程 一个往Queue中写数据 一个从Queue中读数据
from multiprocessing import Process ,Queue
                            import os,time,random
                
                            #写数据进程执行的代码
                            def write(q):
                                for value in ['A','B','C']:
                                    print 'Put %s to queue ....' %value
                                    q.put(value)
                                    time.sleep(random.random())
                            #读数据进程执行的代码:
                            def read(q):
                                while True:
                                    value=q.get(True)
                                    print 'Get %s from queue'  %value
                    
                            if __name__=='__main__':
                                #父进程建立Queue 并传给各个子进程
                                q=Queue()
                                pw=Process(target=write,args=(q,))
                                pr=Process(target=read,args=(q,))
                                #启动子进程 pw
                                pw.start()
                                #启动子进程pr
                                pr.start()
                                #等待pw结束
                                pw.join()
                                #pr进程里是死循环 没法等待结束 只能强行终止
                                pr.terminate()
2:多线程:
                        一个进程至少有一个线程 Python的线程是真正的 Posix Thread 而不是模拟出来的线程
                        Python 的标准库提供了两个模块 thread和threading 。thread是低级模块 threading是高级模块 对thread进行了封装 绝大多数状况下 咱们只须要使用threading这个高级模块
                        启动一个线程就是把这个函数传入并建立thread实例 而后调用start()开始执行
 #-*-coding:utf-8-*-
import time,threading
#新线程执行的代码
def loop():
 print 'Thread %s is runing ..' % threading.current_thread().name
 n=0
 while n<5:
  n=n+1
  print 'thread %s >>> %s' % (threading.current_thread().name,n)
  time.sleep(1)
 print 'thread %s ended' % threading.current_thread().name
print 'thread %s is runing ' % threading.current_thread().name
t=threading.Thread(target=loop,name='LoopThread')
t.start()
t.join()
print 'thread %s ended:' % threading.current_thread().name
因为任何进程默认启动一个线程,这个线程就是主线程,MainTread 主线程又能够启动新的线程 Python的threading模块有个current_thread()函数 它返回
                        当前线程的实例。子线程的名字在建立的时候指定,名字仅仅在打印的时候显示 彻底没有其余的意义。
                            
                    lock:
                        多线程和多进程的最大的不一样在与,多进程中 同一个变量,各自有一份拷贝存在于每一个进程中,互不影响,而多线程中全部的变量都有线程共享,因此 任何一个变量均可以被任何一个
                        线程修改,所以 吸纳成之间共享的数据最大的危险在与多个线程同时修改一个变量 把内容该乱了
                        为了保证多个线程不能同时执行同一条语句咱们增长 了锁 的概念:
#-*-coding:utf-8-*-
import time,threading
#假定这是银行存款
balance=0
lock=threading.Lock()
def change_it(n):
 #先存后取 结果应该为0
 global balance
 print '%s balance = %d' %(threading.current_thread().name,balance)
 balance=balance+n
 print '%s balance = %d' %(threading.current_thread().name,balance)
 balance=balance-n
 print '%s balance = %d' %(threading.current_thread().name,balance)
def run_thread(n):
 for x in range(100000):
  lock.acquire()
  try:
   change_it(n)
  except Exception, e:
   raise e
  finally:
   lock.release()
t1=threading.Thread(target=run_thread,args=(5,))
t2=threading.Thread(target=run_thread,args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print balance
当多个线程执行到lock.acquire()时 只有一个线程可以成功的获取锁 而后继续执行代码 其余线程等待知道得到锁为止。
                        得到锁的线程用完后必定要释放锁 不然那些苦苦等待的进程将永远等待下去 成为死进程 
                        锁保证了某段关键代码只能有一个线程从头至尾的完整执行 坏处就是阻止 了多线程的并发 下降了效率 其次就是因为能够存在多个锁 不一样线程持有不一样的锁 可能致使死锁的发生。
 
                    多核CPU:
                        一个死循环会100%占用一个CPU 若是有两个死循环 在多核CPU中 能够监控到会占用200%的CPU 也就是占用两个CPU核心
                        要想把N个CPU的核心都爆满 就必须启动N个死循环线程
                        因为Python中代码有一个GIL锁 任何Python线程执行前 必须得到GIL锁 而后每执行100条字节码 解释器自动释放GIL锁 让别的线程又机会执行。这是历史遗留问题
                        所以 在Python中可使用多线程 可是不要期望可以有效的利用多核  能够经过多进程来实现多核任务。
                   
                4:ThreadLocal
                    在多线程的环境下 每一个线程都有本身的数据 一个线程使用本身的局部变量比使用全局变量好 由于局部变量只有本身可以看见 不会影响其余的线程而全局变量的修改必须加锁
                    可是局部变量也有问题 就是在函数调用的时候 传递起来麻烦
def process_student(name):
                        std =Student(name)
                        #std 是局部变量 可是每一个函数都要用它 所以必须穿进去
                        do_task_1(std)
                        do_task_2(std)
            
                    def do_task_1(std)
                        do_subtask_1(std)
                        do_subtask_2(std)
            
                    def do_task_2(std)
                        do_subtask_1(std)
                        do_subtask_2(std)
 这样一级一级传递极为麻烦
                    所以 ThreadLocal 就应运而生 
#-*-coding:utf-8-*-
import threading
#建立全局ThreadLocal对象:
local_school=threading.local()
def process_student():
 print 'Hello ,%s (in %s)' %(local_school.student,threading.current_thread().name)
def process_thread(name):
 #绑定threadlocal的student
 local_school.student=name
 process_student()
t1=threading.Thread(target=process_thread,args=('Alice',),name='Thread-A')
t2=threading.Thread(target=process_thread,args=('Bob',),name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
全局变量local_student 就是一个ThreadLocal 对象 每一个thread对他均可以读写student属性 可是相互之间不影响 
                            
                   
             5 分布式进程:
                    在thread和process中应当优先选择process 由于process跟稳定 并且process能够分布到多台机器上 而thread最多只能在一台机器的多个CPU上
                    Python的multiprocess模块不但支持多进程 其中。managers子模块还支持把多个进程分布到多台机器上 一个服务进程做为调度 将任务分布到其余的进程中 依靠网络通讯
                    因为managers 封装的很好 没必要了解网络通讯的细节 就能够很容易的编写分布式多进程程序
                    例如:  把发送任务的进程和处理任务的进程分布到两个机器上:
                    服务进程 启动Queue 把Queue 注册到网上 而后往   Queue里面写入任务:
 #taskmanager.py
                        import random,time,Queue
                        from multiprocessing.managers import BaseManager
            
                        #发送任务的队列
                        task_queue=Queue.Queue()
                        #接受结果的队列
                        result_queue=Queue.Queue()
                    
                        #从basemanager 继承queuemanager
                        class QueueManager(BaseManager):
                            pass
                
                        # 把两个Queue都注册到网上 callable参数关联了Queue对象:
                        QueueManager.register('get_task_queue',callable=lambda:task_queue)
                        QueueManager.register('get_result_queue,callable=lambda:result_queue')
            
                        #绑定端口50000 设置验证码 abc
                        manager=QueueManager(address=('',5000),authkey='abc')
                
                        #启动Queue
                        manager.start()
                        
                        #得到经过网络访问的Queue对象:
                        task=manager.get_task_queue()
                        result=manager.get_result_queue()
                        #放几个任务进去
                        for i in range(10):
                            n=random.randint(0,10000)
                            print('Put task %d' %n)
                            task.put(n)
                        #从result队列读取结果
                        print('Try get results....'):
                        for i in range(10):
                            r=result.get(timeout=10)
                            print('Result:%s' % r)
                        #关闭
                        manager.shutdown()

在另外一台计算机上启动:web

 #taskworker.py
                        import time,sys,Queue
                        from multiprocessing.managers import BaseManager
                        #建立相似的QueueManager
                        class QueueManager(BaseManager):
                            pass
            
                        #因为这个QueueManager 只从网上得到注册时只提供了名字
                        QueueManager.register('get_task_queue')
                        QueueManager.register('get_task_result')
                    
                        #连接到服务器 也就是运行taskmanager.py的机器
                        server_addr='127.0.0.1'
                        print('connect to server %s ..' % server_addr)
        
                        #端口验证码一致
                        m=QueueManager(address=(server_addr,5000),authkey='abc')
                        #从网络连接
                        m.connect()
                        #获取queue对象
                        task=m.get_task_queue()
                        result=m.get_task_result()
                        #从task队列中取出数据 并把结果放到result队列中
                        for i in range(10):
                            try:
                                n=task.get(timeout=1)
                                print('run task %d*%d' %(n,n))
                                r='%d*%d=%d' % (n,n,n*n)
                                time.sleep(1)
                                result.put(r)
                            except Queue.Empty:
                                print('task queue is empty')
                        #处理结束
                        print('work exit')

这样就能够将任务拆分 并将任务发送到几台 几十台机器上进行处理。数据库

2、memcachewindows

理解一些概念:
Memcache是一个自由和开放源代码、高性能、分配的内存对象缓存系统。用于加速动态web应用程序,减轻数据库负载。它能够应对任意多个链接,使用非阻塞的网络IO。因为它的工做机制是在内存中开辟一块空间,而后创建一个HashTable,Memcached自管理这 些HashTable。Memcached是简单而强大的。它简单的设计促进迅速部署,易于发展所面临的问题,解决了不少大型数据缓存。它的API可供最流行的语言。Memcache是该系统的项目名称,Memcached是该系统的主程序文件,以守护程序方式运行于一个或多个服务器中,随时接受客 户端的链接操做,使用共享内存存取数据。
Memcached最吸引人的一个特性就是支持分布式部署;也就是说能够在一群机器上创建一堆Memcached服务,每一个服务能够根据具体服务器的硬件配置使用不一样大小的内存块,这样一来,理论上能够创建一个无限巨大的基于内存的cache storage系统。缓存

1.分别把memcached和libevent下载回来,放到 /tmp 目录下
Memcache用到了libevent这个库用于Socket的处理,因此还须要安装libevent。服务器

cd /tmp
wget http://www.danga.com/memcached/dist/memcached-1.2.0.tar.gz
 wget http://www.monkey.org/~provos/libevent-1.2.tar.gz

先安装libevent网络

$ tar zxvf libevent-1.2.tar.gz
$ cd libevent-1.2
$ ./configure –prefix=/usr
$ make
$ make install

3.测试libevent是否安装成功多线程

$ ls -al /usr/lib | grep libevent
2
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent-1.2.so.1 -> libevent-1.2.so.1.0.3
3
-rwxr-xr-x 1 root root 263546 11?? 12 17:38 libevent-1.2.so.1.0.3
4
-rw-r–r– 1 root root 454156 11?? 12 17:38 libevent.a
5
-rwxr-xr-x 1 root root 811 11?? 12 17:38 libevent.la
6
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent.so -> libevent-1.2.so.1.0.3并发

还不错,都安装上了。app

4.安装memcached,同时须要安装中指定libevent的安装位置

1
$ cd /tmp
2
$ tar zxvf memcached-1.2.0.tar.gz
3
$ cd memcached-1.2.0
4
$ ./configure –with-libevent=/usr # 注意这个配置跟着libevent走
5
$ make
6
$ make install

若是中间出现报错,请仔细检查错误信息,按照错误信息来配置或者增长相应的库或者路径。
安装完成后会把memcached放到 /usr/local/bin/memcached ,
5.测试是否成功安装memcached

1
$ ls -al /usr/local/bin/mem*
2
-rwxr-xr-x 1 root root 137986 11?? 12 17:39 /usr/local/bin/memcached
3
-rwxr-xr-x 1 root root 140179 11?? 12 17:39 /usr/local/bin/memcached-debug

6 安装Python-memcached安装

$ sudo apt-get install python-memcache

memcached运行参数:

/usr/local/memcached/bin/memcached -d -m 128 -u root -l 192.168.0.97 -c 256 -P /tmp/memcached.pid

-d选项是启动一个守护进程,
-m是分配给Memcache使用的内存数量,单位是MB,我这里是10MB,
-u是运行Memcache的用户,我这里是root,
-l是监听的服务器IP地址,若是有多个地址的话,我这里指定了服务器的IP地址192.168.22.200(不指定为本机)
-p是设置Memcache监听的端口,我这里设置了12000,最好是1024以上的端口,
-c选项是最大运行的并发链接数,默认是1024,我这里设置了256,按照你服务器的负载量来设定,
-P是设置保存Memcache的pid文件,我这里是保存在 /tmp/memcached.pid,

python使用例子:

#!/usr/bin/env python
import memcache
mc = memcache.Client(['127.0.0.1:12000'],debug=0)
mc.set("foo","bar")
value = mc.get("foo")
print value

 

Python-memcached API总结整个memcache.py只有1241行,至关精简主要方法以下:@set(key,val,time=0,min_compress_len=0)无条件键值对的设置,其中的time用于设置超时,单位是秒,而min_compress_len则用于设置zlib压缩(注:zlib是提供数据压缩用的函式库)@set_multi(mapping,time=0,key_prefix=”,min_compress_len=0)设置多个键值对,key_prefix是key的前缀,完整的键名是key_prefix+key, 使用方法以下>>> mc.set_multi({‘k1′ : 1, ‘k2′ : 2}, key_prefix=’pfx_’) == []>>> mc.get_multi(['k1', 'k2', 'nonexist'], key_prefix=’pfx_’) == {‘k1′ : 1, ‘k2′ : 2}@add(key,val,time=0,min_compress_len=0)添加一个键值对,内部调用_set()方法@replace(key,val,time=0,min_compress_len=0)替换value,内部调用_set()方法@get(key)根据key去获取value,出错返回None@get_multi(keys,key_prefix=”)获取多个key的值,返回的是字典。keys为key的列表@delete(key,time=0)删除某个key。time的单位为秒,用于确保在特定时间内的set和update操做会失败。若是返回非0则表明成功@incr(key,delta=1)自增变量加上delta,默认加1,使用以下>>> mc.set(“counter”, “20″)>>> mc.incr(“counter”)@decr(key,delta=1)自减变量减去delta,默认减1