Scrapy 2.6 Pipeline 数据传输管道使用指南

Python3Scrapy 爬虫框架 中进行数据爬取过程中处理抓取数据使用基本包含下面几个步骤:

  1. 清理HTML中的页面数据。
  2. 验证抓取的数据(检查Items是否包含某些字段)。
  3. 检查重复项(并将其删除)。
  4. 将Scrapy中的Items存储在数据库中。

以上进行数据各种处理的过程中就需要进行数据的传输,也就是要使用到pipline。

Scrapy 版本:2.6+


pipeline 方法类

class SomethingPipeline(object):    def __init__(self):            # 可选实现,做参数初始化等        # 写入你的业务逻辑    def process_item(self, item, spider):        # item (Item 对象) – 爬取数据的item        # spider (Spider 对象) – 爬取该item的spider        # 这个方法必须实现,每个item pipeline组件都需要调用该方法,        # 这个方法必须返回一个 Item 对象,被丢弃的item将不会被之后的pipeline组件所处理。        return item    def open_spider(self, spider):        # spider (Spider 对象) – 被开启的spider        # 可选实现,spider开启时,这个方法被调用。    def close_spider(self, spider):        # spider (Spider 对象) – 被关闭的spider        # 可选实现,spider关闭时,这个方法被调用



pipeline 功能使用

激活使用(必须打开)

在 settings.py 中释放代码69行 ITEM_PIPELINES 否则数据库无法写入。

# 这里把这3行注释开,无需修改。DOWNLOADER_MIDDLEWARES = {    '你的项目名称.middlewares.WwwCjnCnDownloaderMiddleware': 543,}


在这里插入图片描述


数据写入JSON文件

import jsonfrom itemadapter import ItemAdapterclass JsonWriterPipeline:    def open_spider(self, spider):        self.file = open('items.jl', 'w')    def close_spider(self, spider):        self.file.close()    def process_item(self, item, spider):        line = json.dumps(ItemAdapter(item).asdict()) + "
"        self.file.write(line)        return item

数据写入MongoDB

import pymongofrom scrapy.utils.project import get_project_settingssettings = get_project_settings()class WwwCjnCnPipeline(object):    # class中全部替换    def __init__(self):        host = settings["MONGODB_HOST"]        port = settings["MONGODB_PORT"]        dbname = settings["MONGODB_DBNAME"]        sheetname = settings["MONGODB_SHEETNAME"]        username = settings["MONGODB_USER"]        password = settings["MONGODB_PASSWORD"]        # 创建MONGODB数据库链接        client = pymongo.MongoClient(host=host, port=port, username=username, password=password)        # 指定数据库        mydb = client[dbname]        # 存放数据的数据库表名        self.post = mydb[sheetname]    def process_item(self, item, spider):        data = dict(item)        self.post.insert(data)        return item

抓取数据截图

from urllib.parse import quoteimport scrapyfrom itemadapter import ItemAdapterclass ScreenshotPipeline:    """Pipeline that uses Splash to render screenshot of    every Scrapy item."""    SPLASH_URL = "http://localhost:8050/render.png?url={}"    async def process_item(self, item, spider):        adapter = ItemAdapter(item)        encoded_item_url = quote(adapter["url"])        screenshot_url = self.SPLASH_URL.format(encoded_item_url)        request = scrapy.Request(screenshot_url)        response = await spider.crawler.engine.download(request, spider)        if response.status != 200:            # Error happened, return item.            return item        # Save screenshot to file, filename will be hash of url.        url = adapter["url"]        url_hash = hashlib.md5(url.encode("utf8")).hexdigest()        filename = f"{url_hash}.png"        with open(filename, "wb") as f:            f.write(response.body)        # Store filename in item.        adapter["screenshot_filename"] = filename        return item

重复数据过滤

from itemadapter import ItemAdapterfrom scrapy.exceptions import DropItemclass DuplicatesPipeline:    def __init__(self):        self.ids_seen = set()    def process_item(self, item, spider):        adapter = ItemAdapter(item)        if adapter['id'] in self.ids_seen:            raise DropItem(f"Duplicate item found: {item!r}")        else:            self.ids_seen.add(adapter['id'])            return item
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章