Scrapy分布式原理
队列用什么维护
首先想到的可能是一些特定数据结构, 数据库, 文件等等.
怎样来去重
保证Request队列每个request都是唯一的.
怎样防止中断
怎样实现该架构
源码分析
获取源码
首先我们可以把源码 Clone 下来,执行如下命令:
git clone https://github.com/rmax/scrapy-redis.git
核心源码在 scrapy-redis/src/scrapy_redis
目录下。
爬取队列
首先我们从爬取队列入手,看下它的具体实现,源码文件为 queue.py,在这里它有三个队列的实现,首先它实现了一个父类 Base,提供一些基本方法和属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
class Base(object): """Per-spider base queue class""" def __init__(self, server, spider, key, serializer=None): if serializer is None: serializer = picklecompat if not hasattr(serializer, 'loads'): raise TypeError("serializer does not implement 'loads' function: %r" % serializer) if not hasattr(serializer, 'dumps'): raise TypeError("serializer '%s' does not implement 'dumps' function: %r" % serializer) self.server = server self.spider = spider self.key = key % {'spider': spider.name} self.serializer = serializer def _encode_request(self, request): obj = request_to_dict(request, self.spider) return self.serializer.dumps(obj) def _decode_request(self, encoded_request): obj = self.serializer.loads(encoded_request) return request_from_dict(obj, self.spider) def __len__(self): """Return the length of the queue""" raise NotImplementedError def push(self, request): """Push a request""" raise NotImplementedError def pop(self, timeout=0): """Pop a request""" raise NotImplementedError def clear(self): """Clear queue/stack""" self.server.delete(self.key) |
首先看一下 _encode_request() 和 _decode_request() 方法,因为我们需要把一 个Request 对象存储到数据库中,但数据库无法直接存储对象,所以需要将 Request 序列化转成字符串再存储,而这两个方法就分别是序列化和反序列化的操作,利用 pickle 库来实现,一般在调用 push() 将 Request 存入数据库时会调用 _encode_request() 方法进行序列化,在调用 pop() 取出 Request 的时候会调用 _decode_request() 进行反序列化。
在父类中 len()、push() 和 pop() 方法都是未实现的,会直接抛出 NotImplementedError,因此这个类是不能直接被使用的,所以必须要实现一个子类来重写这三个方法,而不同的子类就会有不同的实现,也就有着不同的功能。
那么接下来就需要定义一些子类来继承 Base 类,并重写这几个方法,那在源码中就有三个子类的实现,它们分别是 FifoQueue、PriorityQueue、LifoQueue,我们分别来看下它们的实现原理。
首先是 FifoQueue:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
class FifoQueue(Base): """Per-spider FIFO queue""" def __len__(self): """Return the length of the queue""" return self.server.llen(self.key) def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data) |
可以看到这个类继承了Base类,并重写了 len()、push()、pop() 这三个方法,在这三个方法中都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作,可以看到这里的操作方法有 llen()、lpush()、rpop() 等,那这就代表此爬取队列是使用的 Redis的列表,序列化后的 Request 会被存入列表中,就是列表的其中一个元素,len() 方法是获取列表的长度,push() 方法中调用了 lpush() 操作,这代表从列表左侧存入数据,pop() 方法中调用了 rpop() 操作,这代表从列表右侧取出数据。
所以 Request 在列表中的存取顺序是左侧进、右侧出,所以这是有序的进出,即先进先出,英文叫做 First Input First Output,也被简称作 Fifo,而此类的名称就叫做FifoQueue。
另外还有一个与之相反的实现类,叫做 LifoQueue,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
class LifoQueue(Base): """Per-spider LIFO queue.""" def __len__(self): """Return the length of the stack""" return self.server.llen(self.key) def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data) |
与 FifoQueue 不同的就是它的 pop() 方法,在这里使用的是 lpop() 操作,也就是从左侧出,而 push() 方法依然是使用的 lpush() 操作,是从左侧入。那么这样达到的效果就是先进后出、后进先出,英文叫做 Last In First Out,简称为 Lifo,而此类名称就叫做 LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。
另外在源码中还有一个子类实现,叫做 PriorityQueue,顾名思义,它叫做优先级队列,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
class PriorityQueue(Base): """Per-spider priority queue abstraction using redis' sorted set""" def __len__(self): """Return the length of the queue""" return self.server.zcard(self.key) def push(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data) def pop(self, timeout=0): """ Pop a request timeout not support in this queue class """ pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0]) |
在这里我们可以看到 len()、push()、pop() 方法中使用了 server 对象的 zcard()、zadd()、zrange() 操作,可以知道这里使用的存储结果是有序集合 Sorted Set,在这个集合中每个元素都可以设置一个分数,那么这个分数就代表优先级。
在 len() 方法里调用了 zcard() 操作,返回的就是有序集合的大小,也就是爬取队列的长度,在 push() 方法中调用了 zadd() 操作,就是向集合中添加元素,这里的分数指定成 Request 的优先级的相反数,因为分数低的会排在集合的前面,所以这里高优先级的 Request 就会存在集合的最前面。pop() 方法是首先调用了 zrange() 操作取出了集合的第一个元素,因为最高优先级的 Request 会存在集合最前面,所以第一个元素就是最高优先级的 Request,然后再调用 zremrangebyrank() 操作将这个元素删除,这样就完成了取出并删除的操作。
此队列是默认使用的队列,也就是爬取队列默认是使用有序集合来存储的。
去重过滤
我们在前面说过 Scrapy 中的去重实现就是利用集合这个数据结构,但是在 Scrapy 分布式中去重就需要利用一个共享的集合了,那么在这里使用的就是 Redis 中的集合数据结构,我们来看下它的去重类是怎样实现的,源码文件是 dupefilter.py,其内实现了一个 RFPDupeFilter 类,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
class RFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger def __init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True @classmethod def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug) @classmethod def from_crawler(cls, crawler): """Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """ return cls.from_settings(crawler.settings) def request_seen(self, request): """Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool """ fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp) return added == 0 def request_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request) def close(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear() def clear(self): """Clears fingerprints data.""" self.server.delete(self.key) def log(self, request, spider): """Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """ if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False |
在这里我们注意到同样实现了一个 request_seen() 方法,和 Scrapy 中的 request_seen() 方法实现极其类似,不过在这里集合使用的是 server 对象的 sadd() 操作,也就是集合不再是简单的一个简单数据结构了,在这里直接换成了数据库的存储方式。
鉴别重复的方式还是使用指纹,而指纹的获取同样是使用 request_fingerprint() 方法完成的。获取指纹之后就直接尝试向集合中添加这个指纹,如果添加成功,那么就代表这个指纹原本不存在于集合中,返回值就是 1,而最后的返回结果是判定添加结果是否为 0,如果为 1,那这个判定结果就是 False,也就是不重复,否则判定为重复。
这样我们就成功利用 Redis 的集合完成了指纹的记录和重复的验证。
4. 调度器
ScrapyRedis 还帮我们实现了一个配合 Queue、 DupeFilter 使用的调度器 Scheduler,源文件名称是 scheduler.py。
在这里指定了一些配置,如 SCHEDULER_FLUSH_ON_START 即是否在爬取开始的时候清空爬取队列,SCHEDULER_PERSIST 即是否在爬取结束后保持爬取队列不清除,我们可以在 settings.py 里面自由配置,而此调度器很好的实现了对接。
接下来我们再看下两个核心的存取方法,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request) return True def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return request |
enqueue_request() 就是调度器向队列中添加 Request,在这里做的核心操作就是调用 Queue 的 push() 操作,同时还有一些统计和日志操作,next_request() 就是从队列中取 Request,核心操作就是调用 Queue 的 pop() 操作,那么此时如果队列中还有 Request,则会直接取出来,接着爬取,否则当队列为空时,则会重新开始爬取。
5. 总结
那么到现在为止我们就把三个分布式的问题解决了,总结如下:
- 爬取队列的实现,在这里提供了三种队列,使用了Redis的列表或有序集合来维护。
- 去重的实现,使用了 Redis 的集合来保存 Request 的指纹来提供重复过滤。
- 中断后重新爬取的实现,中断后 Redis 的队列没有清空,再次启动时调度器的 next_request() 会从队列中取到下一个 Request,继续爬取。
6. 结语
以上便是 ScrapyRedis 的核心源码解析,另外 ScrapyRedis 中还提供了 Spider、Item Pipeline 的实现,不过并不是必须要使用的,如有兴趣可以研究。
其他文件源码简单说明
Scrapy-Redis的配置
使用Scrapy-Redis只需要修改Scrapy项目下的setting.py配置文件就可以。
核心配置
将调度器的类和去重的类替换为Scrapy-Redis提供的类,在setting.py添加如下配置。
1 2 |
SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 调度器 DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 去重 |
Redis连接配置
第一种方式
redis://[:password]@host:port/db
中括号内可有可无,host为IP地址,port为端口号,db是数据库代号默认0。
我的Redis数据库没设置密码在setting.py中配置如下。
REDIS_URL='redis://@127.0.0.1:6379'
第二种方式
单独配置,根据Redis连接信息,在setting.py中配置如下。
1 2 3 |
REDIS_HOST='127.0.0.1' REDIS_PORT=6379 REDIS_PASSWORD='' |
当两种方式都配置了,优先使用第一种方式。
调度队列
配置可选,默认使用PriorityQueue。
1 2 3 |
SCHEDULER_QUEUE_CLASS='scrapy_redis.queue.PriorityQueue SCHEDULER_QUEUE_CLASS='scrapy_redis.queue.FifoQueue SCHEDULER_QUEUE_CLASS='scrapy_redis.queue.LifoQueue |
配置持久化
配置可选,默认是False。Scrapy-redis默认在爬取完成后清空爬取队列和去重指纹集合。
SCHEDULER_PERSIST=True
该设置就会在爬取完成后不清空。
**注意:**强制终端爬虫运行,爬取队列和去重指纹集合不会自动清空。
配置重爬
分布式爬虫不用开启。
配置可选,默认是Flask。如果配置了持久化或强制中断爬虫,那么爬虫队列和指纹集合不会被清空,会继续上次的爬取。
SCHEDULER_FLUSH_ON_START=True
设置为True后爬虫每次启动时,爬取队列和指纹集合都会清空。做分布式爬虫不需要配置,因为每一个爬虫任务在启动的时候都会清空依次。
Pipline配置
配置可选,默认不启动。
实现了存储到Redis的Item Pipline,启用了后爬虫把生成的Item存储到Redis数据库中。
数据量大的时候不要开启,因为Redis基于内存,用来做存储太浪费了。
1 2 3 |
ITEM_PIPELINES = { 'scrapy_redis.pipelines.RedisPipeline': 300 } |