【摘要】 最近研究了一下抖音的爬虫,目前实现了热门话题和热门音乐下面全部相关视频的爬取,而且我已经将该爬虫打包成了一个 Python 库并发布,名称就叫作 douyin,利用该库可使用不到 10 行代码完成热门视频的下载、相关音乐的下载以及结构化信息的存储。本文就来详细介绍一下这个库的用法和一些核心逻辑实现。git
在开始介绍以前,咱们就先看看这个库能达到怎样的爬取效果吧,这里咱们想要爬取的部分是这这样的:github
这里是抖音搜索界面热门话题和热门音乐部分,每个话题或音乐都有着很是高的热度,并且每一个热门话题或音乐下面都是相关的抖音视频。mongodb
下面咱们要作的就是把全部热门话题和音乐下的相关视频都爬取到,而且将爬到的视频下载下来,同时还要把视频所配的音乐也单独下载下来,不只如此,全部视频的相关信息如发布人、点赞数、评论数、发布时间、发布人、发布地点等等信息都须要爬取下来,并存储到 MongoDB 数据库。数据库
听起来彷佛挺繁琐的是吧?其实有了 douyin 这个库,咱们不到 10 行代码就能够完成上面的任务了!其 GitHub 地址是:https://github.com/Python3WebSpider/DouYin。json
首先第一步咱们须要安装一下 douyin 库,命令以下:设计模式
pip3 install douyin
使用示例以下:api
import douyin from douyin.structures import Topic, Music # 定义视频下载、音频下载、MongoDB 存储的处理器 video_file_handler = douyin.handlers.VideoFileHandler(folder='./videos') music_file_handler = douyin.handlers.MusicFileHandler(folder='./musics') mongo_handler = douyin.handlers.MongoHandler() # 定义下载器,并将三个处理器当作参数传递 downloader = douyin.downloaders.VideoDownloader([mongo_handler, video_file_handler, music_file_handler]) # 循环爬取抖音热榜信息并下载存储 for result in douyin.hot.trend(): for item in result.data: # 爬取热门话题和热门音乐下面的全部视频,每一个话题或音乐最多爬取 100 个相关视频。 downloader.download(item.videos(max=100))
好,这样就完成了,运行这段代码,便可以完成热门话题、热门音乐下面全部视频和音乐的爬取,并将相关信息存储到 MongoDB 数据库。网络
另外值得注意的是,在运行这段代码以前首先须要安装好 MongoDB 数据库并成功开启服务,这样才能确保代码能够正常链接数据库并把数据成功存储。session
咱们看下运行效果:数据结构
运行截图以下:
在这里咱们能够看到视频被成功存储到了 MongoDB 数据库,而且执行了下载,将视频存储到了本地(音频的的存储没有显示)。
最后咱们看下爬取结果是怎样的,下面是爬取到的音频、视频和视频相关信息:
能够看到视频配的音乐被存储成了 mp3 格式的文件,抖音视频存储成了 mp4 文件,另外视频相关信息如视频描述、做者、音乐、点赞数、评论数等等的信息都已经存储到了 MongoDB 数据库,另外里面还包括了爬取时间、视频连接、分辨率等等额外的信息。
对!就是这么简单,经过这几行代码,咱们就获得了如上的三部分结果,而这只须要安装 douyin 这个库便可实现。
下面咱们来剖析一下这个库的关键技术部分的实现,代码的地址是在:https://github.com/Python3WebSpider/DouYin,在此以前你们能够先将代码下载下来大致浏览一下。
本库依赖的其余库有:
aiohttp:利用它能够完成异步数据下载,加快下载速度。
dateparser:利用它能够完成任意格式日期的转化。
motor:利用它能够完成异步 MongoDB 存储,加快存储速度。
requests:利用它能够完成最基本的 HTTP 请求模拟。
tqdm:利用它能够进行进度条的展现。
下面我就几个部分的关键实现对库的实现进行代码说明。
若是要作一个库的话,一个很重要的点就是对一些关键的信息进行结构化的定义,使用面向对象的思惟对某些对象进行封装,抖音的爬取也不例外。
在抖音中,其实有不少种对象,好比视频、音乐、话题、用户、评论等等,它们之间经过某种关系联系在一块儿,例如视频中使用了某个配乐,那么视频和音乐就存在使用关系;好比用户发布了视频,那么用户和视频就存在发布关系,咱们可使用面向对象的思惟对每一个对象进行封装,好比视频的话,就能够定义成以下结构:
class Video(Base): def __init__(self, **kwargs): """ init video object :param kwargs: """ super().__init__() self.id = kwargs.get('id') self.desc = kwargs.get('desc') self.author = kwargs.get('author') self.music = kwargs.get('music') self.like_count = kwargs.get('like_count') self.comment_count = kwargs.get('comment_count') self.share_count = kwargs.get('share_count') self.hot_count = kwargs.get('hot_count') ... self.address = kwargs.get('address') def __repr__(self): """ video to str :return: str """ return '<Video: <%s, %s>>' % (self.id, self.desc[:10].strip() if self.desc else None)
这里将一些关键的属性定义成 Video 类的一部分,包括 id 索引、desc 描述、author 发布人、music 配乐等等,其中 author 和 music 并非简单的字符串的形式,它也是单独定义的数据结构,好比 author 就是 User 类型的对象,而 User 的定义又是以下结构:
class User(Base): def __init__(self, **kwargs): """ init user object :param kwargs: """ super().__init__() self.id = kwargs.get('id') self.gender = kwargs.get('gender') self.name = kwargs.get('name') self.create_time = kwargs.get('create_time') self.birthday = kwargs.get('birthday') ... def __repr__(self): """ user to str :return: """ return '<User: <%s, %s>>' % (self.alias, self.name) 因此说,经过属性之间的关联,咱们就能够将不一样的对象关联起来,这样显得逻辑架构清晰,并且咱们也不用一个个单独维护字典来存储了,其实这就和 Scrapy 里面的 Item 的定义是相似的。 请求和重试 实现爬取的过程就没必要多说了,这里面其实用到的就是最简单的抓包技巧,使用 Charles 直接进行抓包便可。抓包以后即可以观察到对应的接口请求,而后进行模拟便可。 因此问题就来了,难道我要一个接口写一个请求方法吗?另外还要配置 Headers、超时时间等等的内容,那岂不是太费劲了,因此,咱们能够将请求的方法进行单独的封装,这里我定义了一个 fetch 方法: def _fetch(url, **kwargs): """ fetch api response :param url: fetch url :param kwargs: other requests params :return: json of response """ response = requests.get(url, **kwargs) if response.status_code != 200: raise requests.ConnectireplaceString('Expected status code 200, but got {}'.format(response.status_code)) return response.json()这个方法留了一个必要参数,即 url,另外其余的配置我留成了 kwargs,也就是能够任意传递,传递以后,它会依次传递给 requests 的请求方法,而后这里还作了异常处理,若是成功请求,便可返回正常的请求结果。
定义了这个方法,在其余的调用方法里面咱们只须要单独调用这个 fetch 方法便可,而不须要再去关心异常处理,返回类型了。
好,那么定义好了请求以后,若是出现了请求失败怎么办呢?按照常规的方法,咱们可能就会在外面套一层方法,而后记录调用 fetch 方法请求失败的次数,而后从新调用 fetch 方法进行重试,但这里能够告诉你们一个更好用的库,叫作 retrying,使用它咱们能够经过定义一个装饰器来完成重试的操做。
好比我可使用 retry 装饰器这么装饰 fetch 方法:
from retrying import retry @retry(stop_max_attempt_number=retry_max_number, wait_random_min=retry_min_random_wait, wait_random_max=retry_max_random_wait, retry_on_exception=need_retry) def _fetch(url, **kwargs): pass 这里使用了装饰器的四个参数: stop_max_attempt_number:最大重试次数,若是重试次数达到该次数则放弃重试。 wait_random_min:下次重试以前随机等待时间的最小值。 wait_random_max:下次重试以前随机等待时间的最大值。 retry_on_exception:判断出现了怎样的异常才重试。 这里 retry_on_exception 参数指定了一个方法,叫作 need_retry,方法定义以下: def need_retry(exception): """ need to retry :param exception: :return: """ result = isinstance(exception, (requests.ConnectireplaceString, requests.ReadTimeout)) if result: print('Exception', type(exception), 'occurred, retrying...') return result
这里判断了若是是 requests 的 ConnectireplaceString 和 ReadTimeout 异常的话,就会抛出异常进行重试,不然不予重试。
因此,这样咱们就实现了请求的封装和自动重试,是否是很是 Pythonic?
为了下载视频,咱们须要设计一个下载处理器来下载已经爬取到的视频连接,因此下载处理器的输入就是一批批的视频连接,下载器接收到这些连接,会将其进行下载处理,并将视频存储到对应的位置,另外也能够完成一些信息存储操做。
在设计时,下载处理器的要求有两个,一个是保证高速的下载,另外一个就是可扩展性要强,下面咱们分别来针对这两个特色进行设计:
高速下载,为了实现高速的下载,要么可使用多线程或多进程,要么能够用异步下载,很明显,后者是更有优点的。
扩展性强,下载处理器要能下载音频、视频,另外还能够支持数据库等存储,因此为了解耦合,咱们能够将视频下载、音频下载、数据库存储的功能独立出来,下载处理器只负责视频连接的主要逻辑处理和分配便可。
为了实现高速下载,这里咱们可使用 aiohttp 库来完成,另外异步下载咱们也不能一会儿下载太多,否则网络波动太大,因此咱们能够设置 batch 式下载,能够避免同时大量的请求和网络拥塞,主要的下载函数以下:
def download(self, inputs): """ download video or video lists :param data: :return: """ if isinstance(inputs, types.GeneratorType): temps = [] for result in inputs: print('Processing', result, '...') temps.append(result) if len(temps) == self.batch: self.process_items(temps) temps = [] else: inputs = inputs if isinstance(inputs, list) else [inputs] self.process_items(inputs) 这个 download 方法设计了多种数据接收类型,能够接收一个生成器,也能够接收单个或列表形式的视频对象数据,接着调用了 process_items 方法进行了异步下载,其方法实现以下: def process_items(self, objs): """ process items :param objs: objs :return: """ # define progress bar with tqdm(total=len(objs)) as self.bar: # init event loop loop = asyncio.get_event_loop() # get num of batches total_step = int(math.ceil(len(objs) / self.batch)) # for every batch for step in range(total_step): start, end = step * self.batch, (step + 1) * self.batch print('Processing %d-%d of files' % (start + 1, end)) # get batch of objs objs_batch = objs[start: end] # define tasks and run loop tasks = [asyncio.ensure_future(self.process_item(obj)) for obj in objs_batch] for task in tasks: task.add_done_callback(self.update_progress) loop.run_until_complete(asyncio.wait(tasks)) 这里使用了 asyncio 实现了异步处理,并经过对视频连接进行分批处理保证了流量的稳定性,另外还使用了 tqdm 实现了进度条的显示。 咱们能够看到,真正的处理下载的方法是 process_item,这里面会调用视频下载、音频下载、数据库存储的一些组件来完成处理,因为咱们使用了 asyncio 进行了异步处理,因此 process_item 也须要是一个支持异步处理的方法,定义以下: async def process_item(self, obj): """ process item :param obj: single obj :return: """ if isinstance(obj, Video): print('Processing', obj, '...') for handler in self.handlers: if isinstance(handler, Handler): await handler.process(obj)
这里咱们能够看到,真正的处理逻辑都在一个个 handler 里面,咱们将每一个单独的功能进行了抽离,定义成了一个个 Handler,这样能够实现良好的解耦合,若是咱们要增长和关闭某些功能,只须要配置不一样的 Handler 便可,而不须要去改动代码,这也是设计模式的一个解耦思想,相似工厂模式。
刚才咱们讲了,Handler 就负责一个个具体功能的实现,好比视频下载、音频下载、数据存储等等,因此咱们能够将它们定义成不一样的 Handler,而视频下载、音频下载又都是文件下载,因此又能够利用继承的思想设计一个文件下载的 Handler,定义以下:
from os.path import join, exists from os import makedirs from douyin.handlers import Handler from douyin.utils.type import mime_to_ext import aiohttp class FileHandler(Handler): def __init__(self, folder): """ init save folder :param folder: """ super().__init__() self.folder = folder if not exists(self.folder): makedirs(self.folder) async def _process(self, obj, **kwargs): """ download to file :param url: resource url :param name: save name :param kwargs: :return: """ print('Downloading', obj, '...') kwargs.update({'ssl': False}) kwargs.update({'timeout': 10}) async with aiohttp.ClientSession() as session: async with session.get(obj.play_url, **kwargs) as response: if response.status == 200: extension = mime_to_ext(response.headers.get('Content-Type')) full_path = join(self.folder, '%s.%s' % (obj.id, extension)) with open(full_path, 'wb') as f: f.write(await response.content.read()) print('Downloaded file to', full_path) else: print('Cannot download %s, response status %s' % (obj.id, response.status)) async def process(self, obj, **kwargs): """ process obj :param obj: :param kwargs: :return: """ return await self._process(obj, **kwargs)
这里咱们仍是使用了 aiohttp,由于在下载处理器中须要 Handler 支持异步操做,这里下载的时候就是直接请求了文件连接,而后判断了文件的类型,并完成了文件保存。
视频下载的 Handler 只须要继承当前的 FileHandler 便可:
from douyin.handlers import FileHandler from douyin.structures import Video class VideoFileHandler(FileHandler): async def process(self, obj, **kwargs): """ process video obj :param obj: :param kwargs: :return: """ if isinstance(obj, Video): return await self._process(obj, **kwargs)
这里其实就是加了类别判断,确保数据类型的一致性,固然音频下载也是同样的。
上面介绍了视频和音频处理的 Handler,另外还有一个存储的 Handler 没有介绍,那就是 MongoDB 存储,日常咱们可能习惯使用 PyMongo 来完成存储,但这里咱们为了加速,须要支持异步操做,因此这里有一个能够实现异步 MongoDB 存储的库,叫作 Motor,其实使用的方法差不太多,MongoDB 的链接对象再也不是 PyMongo 的 MongoClient 了,而是 Motor 的 AsyncIOMotorClient,其余的配置基本相似。
在存储时使用的是 update_one 方法并开启了 upsert 参数,这样能够作到存在即更新,不存在即插入的功能,保证数据的不重复性。
整个 MongoDB 存储的 Handler 定义以下:
from douyin.handlers import Handler from motor.motor_asyncio import AsyncIOMotorClient from douyin.structures import * class MongoHandler(Handler): def __init__(self, conn_uri=None, db='douyin'): """ init save folder :param folder: """ super().__init__() if not conn_uri: conn_uri = 'localhost' self.client = AsyncIOMotorClient(conn_uri) self.db = self.client[db] async def process(self, obj, **kwargs): """ download to file :param url: resource url :param name: save name :param kwargs: :return: """ collection_name = 'default' if isinstance(obj, Video): collection_name = 'videos' elif isinstance(obj, Music): collection_name = 'musics' collection = self.db[collection_name] # save to mongodb print('Saving', obj, 'to mongodb...') if await collection.update_one({'id': obj.id}, {'$set': obj.json()}, upsert=True): print('Saved', obj, 'to mongodb successfully') else: print('Error occurred while saving', obj)
能够看到咱们在类中定义了 AsyncIOMotorClient 对象,并暴露了 conn_uri 链接字符串和 db 数据库名称,能够在声明 MongoHandler 类的时候指定 MongoDB 的连接地址和数据库名。
一样的 process 方法,这里使用 await 修饰了 update_one 方法,完成了异步 MongoDB 存储。
好,以上即是 douyin 库的全部的关键部分介绍,这部份内容能够帮助你们理解这个库的核心部分实现,另外可能对设计模式、面向对象思惟以及一些实用库的使用有必定的帮助。
本文介绍了一个能够用来爬取抖音热门视频的 Python 库,并介绍了该库的基本用法和核心部分实现,但愿对你们有所帮助。
本抖音库的 GitHub 地址是:https://github.com/Python3WebSpider/DouYin,若是你对你有帮助,还请赐予一个 Star!很是感谢!