Scrapy分布式爬虫案例实战
28/10
周一晴
$ pip install scrapy-redis
$ pip install redis
# Use the scrapy-redis duplicate filter (the module is ``dupefilter``, singular).
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# Use the scrapy-redis scheduler.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Keep the queues used by scrapy-redis in Redis (do not clear them),
# so that a crawl can be paused and resumed later.
SCHEDULER_PERSIST = True
# Queue class used to order the URLs waiting to be crawled.
# Default: priority ordering (Scrapy's default), implemented with a sorted
# set, i.e. neither FIFO nor LIFO.
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
REDIS_URL = None  # can usually be omitted
REDIS_HOST = '127.0.0.1'  # may also be 'localhost', depending on the setup
REDIS_PORT = 6379
from scrapy.item import Item, Field
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst, Join
class ExampleItem(Item):
    """Item definition for the example project; every attribute is a plain Field."""

    # Descriptive fields.
    name = Field()
    description = Field()
    link = Field()
    # Bookkeeping fields.
    crawled = Field()
    spider = Field()
    url = Field()
class ExampleLoader(ItemLoader):
    """Item loader that builds ``ExampleItem`` objects.

    Every input value is stripped of surrounding whitespace; by default the
    first collected value is kept, except for ``description`` whose values
    are joined together.
    """

    default_item_class = ExampleItem

    # Input side: strip whitespace from each extracted value.
    default_input_processor = MapCompose(lambda value: value.strip())

    # Output side: keep the first value unless a field overrides it.
    default_output_processor = TakeFirst()
    description_out = Join()
from scrapy_redis.spiders import RedisSpider
class MySpider(RedisSpider):
    """Spider that reads urls from redis queue (myspider:start_urls)."""

    name = 'myspider_redis'
    redis_key = 'myspider:start_urls'

    def __init__(self, *args, **kwargs):
        """Build ``allowed_domains`` from the optional ``domain`` keyword.

        ``domain`` is a comma-separated string; empty entries are dropped.
        """
        domain = kwargs.pop('domain', '')
        # A real list is required: on Python 3 ``filter`` returns a one-shot
        # iterator that would be empty after its first traversal.
        self.allowed_domains = [d for d in domain.split(',') if d]
        super(MySpider, self).__init__(*args, **kwargs)

    def parse(self, response):
        """Return the page title and URL of ``response`` as a dict."""
        return {
            'name': response.css('title::text').extract_first(),
            'url': response.url,
        }
$ scrapy runspider my.py
可以在多个终端中重复执行上面的命令,以观察多进程的效果。打开爬虫之后你会发现爬虫处于等待爬取的状态,这是因为此时redis中的list为空。所以需要在redis控制台中添加启动地址,这样就可以愉快地看到所有的爬虫都动起来啦。
lpush myspider:start_urls http://www.***.com
# Use the scrapy-redis scheduler.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Use the scrapy-redis duplicate filter (the module is ``dupefilter``, singular).
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# Queue class used to order the URLs waiting to be crawled.
# Default: priority ordering (Scrapy's default), implemented with a sorted
# set, i.e. neither FIFO nor LIFO.
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
# Optional: first-in-first-out ordering (FIFO).
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderQueue'
# Optional: last-in-first-out ordering (LIFO).
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack'
# Keep the queues used by scrapy-redis in Redis (do not clear them),
# so that a crawl can be paused and resumed later.
SCHEDULER_PERSIST = True
# Only effective with SpiderQueue or SpiderStack: the maximum idle time
# before the spider is closed.
# SCHEDULER_IDLE_BEFORE_CLOSE = 10
# RedisPipeline writes each item into the Redis list whose key is
# "<spider.name>:items", for later distributed item processing.
# It is provided by scrapy-redis; no code has to be written for it.
ITEM_PIPELINES = {
    'example.pipelines.ExamplePipeline': 300,
    'scrapy_redis.pipelines.RedisPipeline': 400,
}
# Redis connection parameters.
# REDIS_PASS is a custom setting added by the author for the Redis
# password (not used by default).
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
# REDIS_PASS = 'redisP@ssw0rd'
# Log level.
LOG_LEVEL = 'DEBUG'
# By default RFPDupeFilter only logs the first duplicate request.
# Setting DUPEFILTER_DEBUG to True logs every duplicate request.
DUPEFILTER_DEBUG = True
# Override the default request headers; a custom downloader middleware can
# be written to set proxies and the User-Agent.
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.8',
    'Connection': 'keep-alive',
    'Accept-Encoding': 'gzip, deflate, sdch',
}
Scrapy框架的使用
中的案例demo复制过来两份:master(主)、slave(从)
import scrapy
class FangItem(scrapy.Item):
    """Item describing one listing; each attribute is a plain ``scrapy.Field``."""

    # Listing attributes.
    title = scrapy.Field()
    address = scrapy.Field()
    time = scrapy.Field()
    clicks = scrapy.Field()
    price = scrapy.Field()
# -*- coding: utf-8 -*-
import scrapy
from demo.items import FangItem
from scrapy_redis.spiders import RedisSpider
class FangSpider(RedisSpider):
    """Slave spider: reads start URLs from the ``fangspider:start_urls``
    Redis key and yields one ``FangItem`` per listing block on the page."""

    name = 'fang'
    redis_key = 'fangspider:start_urls'

    def __init__(self, *args, **kwargs):
        """Build ``allowed_domains`` from the optional ``domain`` keyword.

        ``domain`` is a comma-separated string; empty entries are dropped.
        """
        domain = kwargs.pop('domain', '')
        # A real list is required: on Python 3 ``filter`` returns a one-shot
        # iterator that would be empty after its first traversal.
        self.allowed_domains = [d for d in domain.split(',') if d]
        super(FangSpider, self).__init__(*args, **kwargs)

    def parse(self, response):
        """Yield a ``FangItem`` for every ``div.houseList_list`` element.

        Fields that cannot be found on a listing are set to ``None``
        instead of aborting the whole page.
        """
        for vo in response.css("div.houseList_list"):
            item = FangItem()
            item['title'] = vo.css("h3.fontS20 a::text").extract_first()
            item['address'] = vo.css("span.addressName::text").extract_first()
            # re_first returns None when nothing matches, whereas
            # ``vo.re(...)[0]`` raised IndexError on such listings.
            item['time'] = vo.re_first("<span>(.*?)开盘</span>")
            item['clicks'] = vo.re_first("<span><i>([0-9]+)</i>浏览</span>")
            item['price'] = vo.css("i.fontS24::text").extract_first()
            print(item)
            yield item
class DemoPipeline(object):
    """Pipeline that prints a separator line and passes the item through."""

    def process_item(self, item, spider):
        """Print a 70-character ``=`` rule and return ``item`` unchanged.

        Args:
            item: the item being processed.
            spider: the spider that produced the item (unused).

        Returns:
            The same ``item`` object.
        """
        print("=" * 70)
        return item
...
# Item pipelines enabled for the slave project; the project's own
# DemoPipeline is left commented out.
ITEM_PIPELINES = {
#'demo.pipelines.DemoPipeline': 300,
'scrapy_redis.pipelines.RedisPipeline': 400,
}
...
# Use the scrapy-redis duplicate filter.
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# Use the scrapy-redis scheduler.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Keep the queues used by scrapy-redis in Redis (do not clear them), so that a crawl can be paused and resumed later.
SCHEDULER_PERSIST = True
# Queue class used to order the URLs waiting to be crawled.
# Default: priority ordering (Scrapy's default), implemented with a sorted set, i.e. neither FIFO nor LIFO.
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
# REDIS_URL = 'redis://localhost:6379' # can usually be omitted
REDIS_HOST = 'localhost' # may also be '127.0.0.1', depending on the setup
REDIS_PORT = 6379
# 进入爬虫文件目录找到爬虫文件:
$ scrapy runspider fang.py
另启一个终端,并连接redis数据库
$ redis-cli -p 6379
6379 >lpush fangspider:start_urls https://fang.5i5j.com/bj/loupan/
import scrapy
class MasterItem(scrapy.Item):
    """Item used by the master project; it carries a single ``url`` field."""

    url = scrapy.Field()
# -*- coding: utf-8 -*-
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from demo.items import MasterItem


class FangSpider(CrawlSpider):
    """Master spider: follows the paginated listing links and emits one
    ``MasterItem`` holding the URL of every page that matches the rule."""

    name = 'master'
    allowed_domains = ['fang.5i5j.com']
    start_urls = ['https://fang.5i5j.com/bj/loupan/']

    # Rule defines which links are extracted and followed.
    rules = (
        Rule(
            LinkExtractor(allow=('https://fang.5i5j.com/bj/loupan/n[0-9]+/',)),
            callback='parse_item',
            follow=True,
        ),
    )

    def parse_item(self, response):
        """Return a ``MasterItem`` whose ``url`` is the URL of ``response``."""
        # Create a fresh item per response: a single class-level instance
        # would be overwritten by later responses while earlier ones may
        # still be travelling through the pipelines.
        item = MasterItem()
        item['url'] = response.url
        return item
import redis,re
class MasterPipeline(object):
    """Pipeline that pushes the URLs found by the master spider into Redis.

    URLs containing ``/bj/loupan/`` go to the ``fangspider:start_urls``
    list; all others go to ``fangspider:no_urls``.
    """

    def __init__(self, host, port):
        """Connect to the Redis server at ``host``:``port``."""
        self.r = redis.Redis(host=host, port=port, decode_responses=True)

    @classmethod
    def from_crawler(cls, crawler):
        """Create the pipeline from the crawler's REDIS_HOST / REDIS_PORT settings."""
        return cls(
            host=crawler.settings.get("REDIS_HOST"),
            port=crawler.settings.get("REDIS_PORT"),
        )

    def process_item(self, item, spider):
        """Push ``item['url']`` onto the matching Redis list and return the item.

        Args:
            item: mapping with a ``url`` key.
            spider: the spider that produced the item (unused).

        Returns:
            The same ``item``, so that any later pipeline still receives it.
        """
        url = item['url']
        # The pattern has no regex metacharacters, so a substring test is
        # equivalent to the previous re.search call.
        if '/bj/loupan/' in url:
            key = 'fangspider:start_urls'
        else:
            key = 'fangspider:no_urls'
        self.r.lpush(key, url)
        return item
# Item pipelines enabled for the master project; the scrapy-redis
# RedisPipeline is left commented out.
ITEM_PIPELINES = {
'demo.pipelines.MasterPipeline': 300,
#'scrapy_redis.pipelines.RedisPipeline': 400,
}
...
# Use the scrapy-redis duplicate filter.
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# Use the scrapy-redis scheduler.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Keep the queues used by scrapy-redis in Redis (do not clear them), so that a crawl can be paused and resumed later.
SCHEDULER_PERSIST = True
# Queue class used to order the URLs waiting to be crawled.
# Default: priority ordering (Scrapy's default), implemented with a sorted set, i.e. neither FIFO nor LIFO.
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue'
# REDIS_URL = 'redis://:password@127.0.0.1:6379' # can usually be omitted
REDIS_HOST = '127.0.0.1' # may also be 'localhost', depending on the setup
REDIS_PORT = 6379
# 进入爬虫文件目录找到爬虫文件:
$ scrapy runspider fang.py
# process_demo_mongodb.py
import json
import redis
import pymongo
def main(items_key="fang:items"):
    """Move scraped items from a Redis list into MongoDB, forever.

    Blocks on the Redis list ``items_key``, decodes each entry as JSON and
    inserts it into the ``fang`` collection of the ``demodb`` database.

    Args:
        items_key: Redis list to read from. RedisPipeline stores items
            under "<spider.name>:items"; the spider in this tutorial is
            named ``fang``, hence the default.
    """
    # Redis connection.
    rediscli = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    # MongoDB connection, database and collection.
    mongocli = pymongo.MongoClient(host='localhost', port=27017)
    db = mongocli['demodb']
    sheet = db['fang']
    while True:
        # blpop gives FIFO order, brpop gives LIFO order.
        _source, data = rediscli.blpop([items_key])
        item = json.loads(data)
        # Collection.insert was removed from newer PyMongo releases;
        # insert_one is its replacement for single documents.
        sheet.insert_one(item)
        try:
            print("Processing: %(name)s <%(link)s>" % item)
        except KeyError:
            print("Error procesing: %r" % item)
if __name__ == '__main__':
main()
岁月有你 惜惜相处