(3)分布式下的爬虫Scrapy应该如何做-递归爬取方式,数据输出方式以及数据库链接
【摘要】 [2016-11-21更新]关于demo代码,请参考: ===>scrapy_demo<===
放假这段时间好好的思考了一下关于Scrapy的一些常用操作,主要解决了三个问题
如何连续爬取
数据输出方式
数据库链接
如何连续爬取
思考:要达到连续爬取,逻辑上无非从以下的方向着手
预加载需要爬取的列表,直接到这个列表都处理完,相应的...
[2016-11-21更新]关于demo代码,请参考: ===>scrapy_demo<===
放假这段时间好好的思考了一下关于Scrapy的一些常用操作,主要解决了三个问题
- 如何连续爬取
- 数据输出方式
- 数据库链接
如何连续爬取
思考:要达到连续爬取,逻辑上无非从以下的方向着手
- 预加载需要爬取的列表,直接到这个列表都处理完,相应的爬取工作都已经完成了。
-
从第一页开始爬取,遇到有下一页标签的,那继续爬取,如果没有下一页类似的标签,那表示已经爬到最后一页
-
分析当前页面的所有链接,对于链接符合某种特定规则的,继续爬取,如果没有那表示爬取工作完成(此时需要建立已经爬取列表,防止重复操作)
一般会于定向的爬虫,比如爬取某宝或者某东的数据时,可以采用方式一,二,写好规则就可以了,也方便维护。
1.1 对于预加载的列表,那根据需要生成列表就可以了。
在start_urls 里面生成相应的列表就可以,这里引入一个概念,列表推导式。
我们将代码变换成如下:
from scrapy.spider import BaseSpider from scrapy.selector import HtmlXPathSelector from cnblogs.items import CnblogsItem class CnblogsSpider(BaseSpider): name = "cnblogs" allowed_domains = ["cnblogs.com"] start_urls = [ 'http://www.cnblogs.com/#p%s' % p for p in xrange(1, 11) ] def parse(self, response): self.log("Fetch douban homepage page: %s" % response.url) hxs = HtmlXPathSelector(response) # authors = hxs.select('//a[@class="titlelnk"]') items = hxs.select('//a[contains(@class, "titlelnk")]') listitems = [] for author in items: # print author.select('text()').extract() item = CnblogsItem() # property item['Title'] = ''.join(author.select('text()').extract()) item['TitleUrl'] = author.select('@href').extract() listitems.append(item) return listitems
在这里,start_urls里面使用列表推导式,爬出了一共10页的数据。
1.2对于爬取下一页实现全趴取的过程,就需要使用yield关键字
我们就虫师的博客来进行测试实验:
http://www.cnblogs.com/fnng/default.aspx?page=1
这里介绍一个scrapy 一个非常有用的技巧,scrapy shell ,因为使用 xpath 可以帮助我们调试xpath语法(或者使用firebug又或者是chrome都可以)
语法:scrapy shell http://你要调试xpath的网址
这里我就不继续讲xpath的语法了,自己去搜一下,相比正则要相对简单好理解。
相应的Spider可以这样编写:
# -*- coding: utf-8 -*- from scrapy.spider import BaseSpider from scrapy.selector import HtmlXPathSelector from cnblogs.items import CnblogsItem from scrapy.http import Request from scrapy import log # please pay attention to the encoding of info,otherwise raise error of decode import sys reload(sys) sys.setdefaultencoding('utf8') class BlogsSpider(BaseSpider): name = "cnblogs_blogs" allowed_domains = ["cnblogs.com"] start_urls = [ 'http://www.cnblogs.com/fnng/default.aspx?page=1' ] def parse(self, response): hxs = HtmlXPathSelector(response) # authors = hxs.select('//a[@class="titlelnk"]') # sel.xpath('//a[@class="PostTitle"]').xpath('text()') items = hxs.select('//a[@class="PostTitle"]') a_page = hxs.select('//div[@id="pager"]/a') for a_item in items: item = CnblogsItem() # property item['Title'] = ''.join(a_item.xpath('text()').extract()) item['TitleUrl'] = a_item.xpath('@href').extract() yield item # get the page index log.msg(len(a_page)) if len(a_page) > 0: for a_item in a_page: page_text = ''.join(a_item.xpath('text()').extract()) if page_text == '下一页'.encode('utf-8') or 'Next' in page_text: next_url = ''.join(a_item.xpath('@href').extract()) log.msg(next_url) yield Request(next_url, callback=self.parse) break
我们来运行看看效果如何:
所有的数据完整,效果还是不错的。
关于第三种,以规则来规划爬虫的机制,在以后会介绍 ~ ~
数据输出的方式
使用内置命令
上面的scrapy命令是:scrapy crawl cnblogs_blogs –nolog -o cnblogs_blogs.json -t json
那结果输出的就是json格式的文件,-t 指的是输出文件格式,json ,-t 支持下列参数:
xml csv json jsonlines jl pickle marshal
一般选择xml ,csv,json三种格式就够了,这样可以很方便的导入各种数据库。
更多的参考:http://doc.scrapy.org/en/latest/topics/feed-exports.html
数据库连接
数据保存为文件的形式然后导入是一个不错的选择,不过一般都会有一定的IO开销,一般可以将Item直接保存到数据库中,这个时候就要引入pipelines这个部件了。
在我们项目的根目录下有一个名为:pipelines.py文件,我们在设置里面首先启用这个文件,在启用之后,spider得到的item都会传入到这个部件中进行二次处理,
3.1在settings.py中启用pipelines
ITEM_PIPELINES = { 'cnblogs.pipelines.CnblogsPipelineobj': 300, }
注意命名方式:botname.moudlename.classname 要不然会找不到指定的模块。
3.2 编写pipelines
# -*- coding: utf-8 -*- import MySQLdb import MySQLdb.cursors import logging from twisted.enterprise import adbapi class CnblogsPipelineobj(object): def __init__(self): self.dbpool = adbapi.ConnectionPool( dbapiName ='MySQLdb', host ='127.0.0.1', db = 'cnblogs', user = 'root', passwd = '密码', cursorclass = MySQLdb.cursors.DictCursor, charset = 'utf8', use_unicode = False ) # pipeline dafault function def process_item(self, item, spider): query = self.dbpool.runInteraction(self._conditional_insert, item) logging.debug(query) return item # insert the data to databases def _conditional_insert(self, tx, item): parms = (item['Title'], item['TitleUrl']) sql = "insert into blogs values('%s','%s') " % parms #logging.debug(sql) tx.execute(sql)
OK.运行一下看一下效果如何
中文数据得以保存,OK
3.3 [重要更新]
3.2的方法已经过时很多,这里使用另外一种方法,sqlalchemy去连数据库
准备models文件
from sqlalchemy import create_engine from sqlalchemy import Column, Integer, String, Date, DateTime, Text from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from settings import MYSQL_CONN from datetime import datetime # declare a Mapping,this is the class describe map to table column Base = declarative_base() class Cnbeta(Base): __tablename__ = 'cnbeta' id = Column(Integer, primary_key=True, autoincrement=True) score = Column(Integer, nullable=False, default=0) catid = Column(Integer, nullable=False, default=0) score_story = Column(String(512), nullable=False, default='') hometext = Column(String(1024), nullable=False, default='') counter = Column(Integer, nullable=False, default=0) inputtime = Column(DateTime, nullable=False, default=datetime.now()) topic = Column(Integer, nullable=False, default=0) source = Column(String(128), nullable=False, default='') mview = Column(Integer, nullable=False, default=0) comments = Column(Integer, nullable=False, default=0) crawled_datetime = Column(DateTime, nullable=False, default=datetime.now()) rate_sum = Column(Integer, nullable=False, default=0) title = Column(String(512), nullable=False, default='') url_show = Column(String(512), nullable=False, default='') thumb = Column(String(256), nullable=False, default='') def create_session(): # declare the connecting to the server engine = create_engine(MYSQL_CONN['mysql_uri'] .format(user=MYSQL_CONN['user'], pwd=MYSQL_CONN['password'], host=MYSQL_CONN['host'], db=MYSQL_CONN['db']) , echo=False) # connect session to active the action Session = sessionmaker(bind=engine) session = Session() return session def map_orm_item(scrapy_item,sql_item): for k, v in scrapy_item.iteritems(): sql_item.__setattr__(k, v) return sql_item def convert_date(date_str): pass
在settings.py 里面的配置好数据库链接
MYSQL_CONN = { 'host':'127.0.0.1', 'user':'user_name', 'password':'user_pwd', 'db':'test_db', 'table':'test_tb', 'mysql_uri':'mysql://{user}:{pwd}@{host}:3306/{db}?charset=utf8' }
在中间件中写好代码
from settings import MONGODB import pymongo import models class CnbetaMysqlPipeline(object): def __init__(self): self.session = models.create_session() def process_item(self, item, spider): sql_cnbeta = models.cnbeta() sql_cnbeta = models.map_orm_item(scrapy_item=item, sql_item=sql_cnbeta) self.session.add(sql_cnbeta) self.session.commit() self.session.close() return item
更多的详细源代码参考==>源代码<===
总结
本次主要多三个方向来解决连续爬取文章内容,并将获得内容保存的问题,不过文中主要介绍的,还是以定向为基础的爬取,和以规则来构建的爬虫还是有区别,下篇文章将介绍。
文章来源: brucedone.com,作者:大鱼的鱼塘,版权归原作者所有,如需转载,请联系作者。
原文链接:brucedone.com/archives/140
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)