【python 多线程】下载文件分批压缩

大致做的功能为:

     1.调用api接口,获取json数据;2.json 数据转换为一个csv文件;3.csv文件里的每行数据转换成单个xml文件;4.每5个xml文件进行打包

用到的模块为:

       csv,xml,threading,tarfile,queue

       守护线程

       

备忘录:

       多线程之间通信,可以用一个线程安全的队列 queue

       多线程之间的线程等待(wait)与通知(set)

       用死循环进行等待通知;要跳出死循环,可以把线程设为守护线程,或者设置一个 flag,然后 break

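'''
Created on 2018-10-04

@author: 徐良俊


**** All threads are started up front and the CPU schedules them in
arbitrary order; communication and notification between threads must be
coordinated explicitly (Queue, Event). ****

Modules involved:
    csv
    xml.etree.ElementTree
    threading
    queue
    tarfile

'''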
import requests,logging,csv,time
from functools import wraps
from threading import Thread,Event
from xml.etree.ElementTree import ElementTree,tostring,Element

from queue import Queue

import tarfile
import os

class DownloadThread(Thread):
    '''Download one stock quote and put ``(sid, fields)`` on a queue.

    I/O-bound work, so one thread per stock code is used.

    "http://hq.sinajs.cn/list=%s" returns text of the form:
    var hq_str_sh601015="陕西黑猫,6.400,6.420,6.440,6.470,6.390,6.440,6.450,3377008,21697080.000,53120,6.440,91300,6.430,92750,6.420,131000,6.410,131800,6.400,49600,6.450,25200,6.460,15700,6.470,11700,6.480,14200,6.490,2018-09-28,15:00:00,00";
    '''

    def __init__(self, sid, queue):
        Thread.__init__(self)
        self.sid = sid
        self.url = "http://hq.sinajs.cn/list=%s" % sid
        self.queue = queue

    @staticmethod
    def _parse(content):
        '''Return the comma-separated fields between the first and last double quote.'''
        return content[content.find("\"") + 1:content.rfind("\"")].split(",")

    def download(self, url):
        '''Fetch *url* and return its quote fields.

        Returns an empty list when the response status is an error
        (``response.ok`` is false); consumers treat an empty list as "no data".
        '''
        # NOTE(review): requests timeouts are in seconds, so 3000 is 50 minutes;
        # a value like 3 was probably intended — confirm before changing.
        response = requests.get(url, timeout=3000)
        response.encoding = "GBK"  # the quote text is GBK-encoded Chinese
        if not response.ok:
            # Without this early return there would be no result to return
            # (the old code raised UnboundLocalError here).
            return []
        return self._parse(response.text)

    def run(self):
        '''Download this thread's quote and enqueue ``(sid, fields)``.'''
        self.queue.put((self.sid, self.download(self.url)))
        
        
        
        
        
        
class ConvertCSVThread(Thread):
    '''Consume ``(sid, fields)`` tuples from a queue and write them to fundnav.csv.

    Stops on the sentinel ``(-1, None)``, then sets *csvEvent* so that threads
    waiting on it know the CSV file has been written.
    '''

    def __init__(self, queue, csvEvent):
        Thread.__init__(self)
        self.queue = queue
        self.csvEvent = csvEvent

    def dataToCsv(self, writer):
        '''Write one CSV row per queued quote until the -1 sentinel arrives.

        Each row is (field 0, field 1, field 2, field 30) of the quote. Empty
        results, results whose first field is empty, and results too short to
        contain field 30 are skipped.
        '''
        while True:
            sid, data = self.queue.get()
            if sid == -1:
                break
            # A short/malformed response used to raise IndexError here, killing
            # the thread before csvEvent was set.
            if data and len(data) > 30 and data[0] != '':
                writer.writerow((data[0], data[1], data[2], data[30]))

    def run(self):
        '''Write the header and all queued rows, then signal via csvEvent.'''
        try:
            with open("fundnav.csv", "w", encoding='utf-8-sig') as csvFile:
                writer = csv.writer(csvFile, lineterminator='\n')
                writer.writerow(("基金名称", "开盘价", "收盘价", "日期"))
                self.dataToCsv(writer)
        finally:
            # Always release waiters, even if writing failed: the XML converter
            # blocks on this event and would otherwise wait forever.
            self.csvEvent.set()
        

class ConvertXMLThread(Thread):
    '''Turn each data row of the CSV file into its own XML file.

    Files are written to ./tarxml as 1.xml ... 5.xml. After every batch of five
    (and after the final, possibly shorter, batch) the thread signals the packer
    and blocks until the batch has been packed.

    Events:
      cEvent   -- set here when a batch of XML files is ready
      tEvent   -- waited on here; set by the packer when the batch is archived
      csvEvent -- waited on before reading the CSV file
    '''

    BATCH_SIZE = 5      # XML files per archive
    OUT_DIR = 'tarxml'  # the packer reads ./tarxml, so keep these in sync

    def __init__(self, cEvent, tEvent, csvEvent):
        Thread.__init__(self)
        self.cEvent = cEvent
        self.tEvent = tEvent
        self.csvEvent = csvEvent

    @staticmethod
    def _row_to_tree(headers, row):
        '''Build <Data><Row><header>value</header>...</Row></Data> for one row.'''
        root = Element('Data')
        eRow = Element('Row')
        for tag, text in zip(headers, row):
            e = Element(tag)
            e.text = text
            eRow.append(e)
        root.append(eRow)
        return ElementTree(root)

    def _hand_off_batch(self):
        '''Signal that a batch is ready, then block until it has been packed.'''
        self.cEvent.set()
        self.tEvent.wait()
        self.tEvent.clear()

    def csvToXml(self, scsv):
        '''Convert every data row of *scsv* to an XML file, handing off in batches.

        Spaces are removed from header cells so they can be used as tag names.
        Does nothing if the file is completely empty.
        '''
        # Create the output directory if needed; writing into a missing
        # directory raises FileNotFoundError.
        os.makedirs(self.OUT_DIR, exist_ok=True)
        with open(scsv, 'r', encoding='utf-8-sig') as f:
            reader = csv.reader(f)
            headers = next(reader, None)
            if headers is None:
                return
            headers = [h.replace(' ', '') for h in headers]
            rows = list(reader)

        total = len(rows)
        for itindex, row in enumerate(rows, start=1):
            # Position within the current batch (1..BATCH_SIZE); used as file name.
            index = (itindex - 1) % self.BATCH_SIZE + 1
            self._row_to_tree(headers, row).write(
                "%s/%s.xml" % (self.OUT_DIR, index), "utf-8")
            # Hand off a full batch, or whatever is left after the last row.
            if index == self.BATCH_SIZE or itindex == total:
                self._hand_off_batch()

    def run(self):
        '''Wait until the CSV file is ready, then convert it.'''
        self.csvEvent.wait()
        self.csvToXml("fundnav.csv")
 

class TARThread(Thread):
    '''Pack the XML files in ./tarxml into numbered .tgz archives.

    Daemon thread: it loops forever and ends when the interpreter exits.
    Events:
      cEvent -- waited on; set by the producer when a batch of XML files is ready
      tEvent -- set here after the batch has been handled (packed or failed)
    '''

    def __init__(self, cEvent, tEvent):
        Thread.__init__(self)
        self.count = 0  # packing rounds so far; names the archive (1.tgz, 2.tgz, ...)
        self.cEvent = cEvent
        self.tEvent = tEvent
        # Same effect as setDaemon(True), which is deprecated since Python 3.10.
        self.daemon = True

    def tarXML(self):
        '''Move every *.xml file from ./tarxml into the archive "<count>.tgz".

        The counter advances on every call; no archive is written when there
        are no XML files.
        '''
        self.count += 1
        tfname = '%d.tgz' % self.count
        xml_names = sorted(n for n in os.listdir('./tarxml') if n.endswith('.xml'))
        if not xml_names:
            return
        with tarfile.open(tfname, 'w:gz') as tf:
            for fname in xml_names:
                path = './tarxml/%s' % fname
                tf.add(path)
                os.remove(path)

    def run(self):
        '''Pack one batch each time cEvent is set, then acknowledge via tEvent.'''
        while True:
            self.cEvent.wait()  # block until a batch is ready
            try:
                self.tarXML()
            except (OSError, tarfile.TarError):
                # Keep serving: if this thread died, the producer waiting on
                # tEvent would block forever.
                logging.exception("packing batch %d failed", self.count)
            # Clear before acknowledging, so a cEvent.set() for the next batch
            # cannot be wiped out after the producer resumes.
            self.cEvent.clear()
            self.tEvent.set()
 

  

def handle(codes=None):
    '''Run the whole download -> CSV -> XML -> tar pipeline and print the elapsed time.

    Producers: one DownloadThread per stock code (I/O bound).
    Consumer: a single ConvertCSVThread that writes whatever arrives on the queue.
    Download and CSV threads talk through a Queue; the CSV, XML and tar stages
    are coordinated with Events.

    :param codes: stock codes to download; defaults to the built-in list.
    '''
    start = time.time()

    if codes is None:
        codes = ['sh601006', 'sh601005', 'sh601003', 'sh601002', 'sh601001', 'sh601007', 'sh601008',
                 'sh601009', 'sh601010', 'sh601011', 'sh601012', 'sh601013', 'sh601014', 'sh601015', 'sh601016', 'sh601017',
                 'sh601018', 'sh601019', 'sh601020', 'sh601021', 'sh601022', 'sh601023', 'sh601024', 'sh601025', 'sh601026']
    queue = Queue()

    # Set once fundnav.csv has been written, so the XML converter never reads
    # a half-written file.
    csvEvent = Event()
    ct = ConvertCSVThread(queue, csvEvent)
    dts = [DownloadThread(code, queue) for code in codes]

    # Start everything; the OS schedules the threads in arbitrary order.
    for t in dts:
        t.start()
    ct.start()

    cEvent = Event()  # converter -> packer: a batch of XML files is ready
    tEvent = Event()  # packer -> converter: the batch has been packed
    cxml = ConvertXMLThread(cEvent, tEvent, csvEvent)
    tart = TARThread(cEvent, tEvent)
    cxml.start()
    tart.start()

    # Every producer must finish before the end-of-data sentinel is queued.
    for t in dts:
        t.join()
    queue.put((-1, None))

    ct.join()
    # Also wait for the XML stage (it waits for the packer after each batch),
    # so the reported time covers the whole job, not just download + CSV.
    cxml.join()

    print("耗时 %s" % (time.time() - start))



# Run the whole pipeline when this file is executed.
# NOTE(review): this also runs on import; consider guarding it with
# `if __name__ == "__main__":` if the module is ever imported.
handle()