生产者消费者模型下的爬虫|Web Crawler with Producer-consumer Pattern

2019 年 7 月 6 日 星期六(已编辑)
/ ,
25
这篇文章上次修改于 2024 年 6 月 1 日 星期六,可能部分内容已经不适用,如有疑问可询问作者。

生产者消费者模型下的爬虫|Web Crawler with Producer-consumer Pattern

提升爬虫效率

提升爬虫的抓取效率无非优化爬虫与服务器的链接速度和效率。最为简单的方式就是“加钱”,使用更大带宽的网络,和更高的并发。

所以很多情况下就可以使用多线程或者分布式的爬虫方案。然而在使用这些模式的爬虫情况下可能会出现另一些影响爬虫效率的因素:入库。

爬虫数据入库

单机情况下爬到的东西一般直接存到本地就行了,直接扔到一个文件内或者数据库内就行。

然而分布式情况下如果爬虫生产的数据存入统一的数据库,每次的存取就会进行一次事务,如果出现失败等情况还需要在爬虫中对这些错误进行处理,不然爬虫就会直接死掉。

爬虫与数据库之间的链接状况也会影响当前的爬虫进程,爬虫无法在数据库入库之前进行下一次抓取。

肯定有人会说:“先全部存到本地就好了啊,爬完再统一存进去。”确实这是一个减轻代码量的“好”方法。但是如果是一个长期项目,整理数据就不是那么的轻松了。

因此选择一个更生产者消费者模式能够更好的提高爬虫的效率。

生产者消费者

生产者消费者就是一方不断的生产,而另一方不断地对产品进行消化,这就是这个模式的行为模式。

因此在这个环境下我们需要:

  • 生产者
  • 超市
  • 消费者

生产者爬虫将爬来的数据进行初步处理(也可以不处理)存入超市(缓冲层),最后再由其他消费者将数据处理后存入数据库。

设计爬虫模式

将整个系统分为4个部分,为爬虫、缓冲、入库、数据库。

爬虫端

因为目标网页并不是很复杂,所以我直接requests一把梭。因为我的机器不是很多,所以我选择使用threading模块多几个线程一起爬。

#部分代码
class Crawler(threading.Thread):
    ···
    def run(self):
        #无限循环保持运行
        while True:
            try:
                self.html = requests.get(self.url).text
            except:#网页抓取异常
                print('fetch failed :',self.id)
                continue
            #解析为json(字典)对象
            self.body=json.loads(self.html)
            if self.body['code']==0:
                #存入缓冲层
                self.redis_queue.put(self.html)
                print('fetch success code 0:',self.id)
            else:
                print('fetch success code ',self.body['code'],':',self.id)
        ···

将我的crawler去除复杂的部分后主体就是这样的情况,获取目标网页地址使用requests模块获取数据,根据自己的需求定制这部分内容。成果获取到数据后存入redis的queue中,一次爬取就这么完成了。

#启动线程代码
managers = []
for i in range(10):
    managers.append(manager())
for i in managers:
    i.start()

缓冲层

为了能满足高并发读写的情况,我在此使用了redis,当然也可以选用更高级的消息队列中间件。

读写缓冲层(redis)

python中有redis的库就不需要花太多心思放在这上面了。redis内置list类型所以我们可以自己对redis封装一个Queue层。

class RedisQueue(object):
    def __init__(self, name, namespace='queue', **kwargs):
       self.db= redis.Redis(**kwargs)
       self.key = '%s_%s' %(namespace, name)
    #返回队列里面list内元素的数量
    def qsize(self):
        return self.db.llen(self.key)  
    #添加新元素到队列最右方
    def put(self, item):
        self.db.rpush(self.key, item) 
    #返回队列第一个元素
    def get(self, timeout=None):
        item = self.db.blpop(self.key, timeout=timeout)
        return item

由于redis的性能,即使在配置低的机器上也能跑出不错的效果。

入库层

入库层就不需要那么多的进程了,根据自己的数据量选择,入库操作最大的延迟就存在与数据库的连接上,所以可以选择与数据库链接状态更好的机器去做入库操作。在此层我们就可以从缓冲层上获取爬好的数据进行处理。

具体的入库代码就不展示了,这部分需要自己对数据进行处理。不过还是NOSQL好用,拼装成json直接扔进去就好。

性能

爬虫层由于太穷导致机器和目标网站之间链接并不好,使用6台左右vps*5线程一起爬,最高的并发也才200page/s左右,处理后有效数据的缓冲层并发大约在50page/s,单个页面产生的数据大约在2kB,总体向缓冲层写入的速度在100kB/s,不需要担心redis会被撑爆,有一次入库层的代码没运行,redis里存了大约40w条数据没有处理,redis内存占用约250MB,缓冲层性能并未受到影响。

入库层我直接放在缓冲层的机器上运行,并不需要太多的线程去写入数据库,线程太多反而会出现并发问题,我的环境大约爬虫:入库为6:1的情况就已经能保持缓冲层数据不再增长。

  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • Loading...