Scrapy 自定义导出器

发布于 2024-12-27 13:28:50 字数 2183 浏览 0 评论 0原文

我正在定义一个项目导出器,它将项目推送到消息队列。下面是代码。

from scrapy.contrib.exporter import JsonLinesItemExporter
from scrapy.utils.serialize import ScrapyJSONEncoder
from scrapy import log

from scrapy.conf import settings

from carrot.connection import BrokerConnection, Exchange
from carrot.messaging import Publisher

log.start()


class QueueItemExporter(JsonLinesItemExporter):

    def __init__(self, **kwargs):

        log.msg("Initialising queue exporter", level=log.DEBUG)

        self._configure(kwargs)

        host_name = settings.get('BROKER_HOST', 'localhost')
        port = settings.get('BROKER_PORT', 5672)
        userid = settings.get('BROKER_USERID', "guest")
        password = settings.get('BROKER_PASSWORD', "guest")
        virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/")

        self.encoder = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)(**kwargs)

        log.msg("Connecting to broker", level=log.DEBUG)
        self.q_connection = BrokerConnection(hostname=host_name, port=port,
                        userid=userid, password=password,
                        virtual_host=virtual_host)
        self.exchange = Exchange("scrapers", type="topic")
        log.msg("Connected", level=log.DEBUG)

    def start_exporting(self):
        spider_name = "test"
        log.msg("Initialising publisher", level=log.DEBUG)
        self.publisher = Publisher(connection=self.q_connection,
                        exchange=self.exchange, routing_key="scrapy.spider.%s" % spider_name)
        log.msg("done", level=log.DEBUG)

    def finish_exporting(self):
        self.publisher.close()

    def export_item(self, item):
        log.msg("In export item", level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        self.publisher.send({"scraped_data": self.encoder.encode(itemdict)})
        log.msg("sent to queue - scrapy.spider.naukri", level=log.DEBUG)

我有一些问题。这些项目未提交到队列中。我已将以下内容添加到我的设置中:

FEED_EXPORTERS = {
    "queue": 'scrapers.exporters.QueueItemExporter'
}

FEED_FORMAT = "queue"

LOG_STDOUT = True

代码不会引发任何错误,而且我也看不到任何日志消息。我对如何调试这个一无所知。

任何帮助将不胜感激。

I am defining an item exporter that pushes items to a message queue. Below is the code.

from scrapy.contrib.exporter import JsonLinesItemExporter
from scrapy.utils.serialize import ScrapyJSONEncoder
from scrapy import log

from scrapy.conf import settings

from carrot.connection import BrokerConnection, Exchange
from carrot.messaging import Publisher

log.start()


class QueueItemExporter(JsonLinesItemExporter):

    def __init__(self, **kwargs):

        log.msg("Initialising queue exporter", level=log.DEBUG)

        self._configure(kwargs)

        host_name = settings.get('BROKER_HOST', 'localhost')
        port = settings.get('BROKER_PORT', 5672)
        userid = settings.get('BROKER_USERID', "guest")
        password = settings.get('BROKER_PASSWORD', "guest")
        virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/")

        self.encoder = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)(**kwargs)

        log.msg("Connecting to broker", level=log.DEBUG)
        self.q_connection = BrokerConnection(hostname=host_name, port=port,
                        userid=userid, password=password,
                        virtual_host=virtual_host)
        self.exchange = Exchange("scrapers", type="topic")
        log.msg("Connected", level=log.DEBUG)

    def start_exporting(self):
        spider_name = "test"
        log.msg("Initialising publisher", level=log.DEBUG)
        self.publisher = Publisher(connection=self.q_connection,
                        exchange=self.exchange, routing_key="scrapy.spider.%s" % spider_name)
        log.msg("done", level=log.DEBUG)

    def finish_exporting(self):
        self.publisher.close()

    def export_item(self, item):
        log.msg("In export item", level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        self.publisher.send({"scraped_data": self.encoder.encode(itemdict)})
        log.msg("sent to queue - scrapy.spider.naukri", level=log.DEBUG)

I'm having a few problems. The items are not being submitted to the queue. Ive added the following to my settings:

FEED_EXPORTERS = {
    "queue": 'scrapers.exporters.QueueItemExporter'
}

FEED_FORMAT = "queue"

LOG_STDOUT = True

The code does not raise any errors, and neither can I see any of the logging messages. Im at my wits end on how to debug this.

Any help would be much appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

堇年纸鸢 2025-01-03 13:28:50

“Feed Exporters”是调用某些“标准”项目导出器的快速(但有些肮脏)快捷方式。不要从设置中设置 feed 导出器,而是将自定义项导出器硬连接到自定义管道,如下所述 http://doc.scrapy.org/en/0.14/topics/exporters.html#using-item-exporters

from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
from scrapy.contrib.exporter import XmlItemExporter

class MyPipeline(object):

    def __init__(self):
        ...
        dispatcher.connect(self.spider_opened, signals.spider_opened)
        dispatcher.connect(self.spider_closed, signals.spider_closed)
        ...

    def spider_opened(self, spider):
        self.exporter = QueueItemExporter()
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        # YOUR STUFF HERE
        ...
        self.exporter.export_item(item)
        return item

"Feed Exporters" are quick (and somehow dirty) shortcuts to call some "standard" item exporters. Instead of setting up a feed exporter from settings, hard wire your custom item exporter to your custom pipeline, as explained here http://doc.scrapy.org/en/0.14/topics/exporters.html#using-item-exporters :

from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
from scrapy.contrib.exporter import XmlItemExporter

class MyPipeline(object):

    def __init__(self):
        ...
        dispatcher.connect(self.spider_opened, signals.spider_opened)
        dispatcher.connect(self.spider_closed, signals.spider_closed)
        ...

    def spider_opened(self, spider):
        self.exporter = QueueItemExporter()
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        # YOUR STUFF HERE
        ...
        self.exporter.export_item(item)
        return item
咽泪装欢 2025-01-03 13:28:50

提示:可以从 将项目写入 MongoDb 来自官方文档。

我也做过类似的事情。我创建了一个管道,将每个项目放入类似 S3 的服务中(我在这里使用 Minio,但您明白了)。它为每个蜘蛛创建一个新的存储桶,并将每个项目放入具有随机名称的对象中。
完整的源代码可以在我的存储库中找到。

从教程中的简单引号蜘蛛开始:

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ['http://quotes.toscrape.com/page/1/',
                    'http://quotes.toscrape.com/page/1/']

    def parse(self, response):
        for quote in response.css('div.quote'):
            yield {
                    'text':quote.css('span.text::text').extract_first(),
                    'author':quote.css('span small::text').extract_first(),
                    'tags':quote.css('div.tags a.tag::text').extract()
                    }
        next_page = response.css('li.next a::attr(href)').extract_first()
        if next_page is not None:
            next_page = response.urljoin(next_page)
            yield scrapy.Request(next_page, callback=self.parse)

settings.py 中:

ITEM_PIPELINES = {
    'scrapy_quotes.pipelines.ScrapyQuotesPipeline': 300,
}

scrapy_quotes/pipelines.py 中创建一个管道和一个项目导出器:

import uuid
from StringIO import StringIO
from scrapy.contrib.exporter import BaseItemExporter
from scrapy.conf import settings
from scrapy import signals
from scrapy.xlib.pydispatch import dispatcher
from scrapy import log
from scrapy.utils.python import to_bytes
from scrapy.utils.serialize import ScrapyJSONEncoder

class S3ItemExporter(BaseItemExporter):
    def __init__(self, bucket, **kwargs):
        self._configure(kwargs)
        self.bucket = bucket
        kwargs.setdefault('ensure_ascii', not self.encoding)
        self.encoder = ScrapyJSONEncoder(**kwargs)

    def start_exporting(self):
        self.client = connect()
        create_bucket(self.client, self.bucket)

    def finish_exporting(self):
        log.msg("Done from S3 item exporter", level=log.DEBUG)

    def export_item(self, item):
        log.msg("S3 item exporter got item: %s" % item, level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        data = self.encoder.encode(itemdict)
        size = len(data) 
        object_data = StringIO(data)
        name = str(uuid.uuid4())
        put_object(self.client, self.bucket, name, object_data, size)  


class ScrapyQuotesPipeline(object):
    """Export scraped items, to different buckets,
    one per spider"""
    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls()
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def spider_opened(self, spider):
        self.exporter = S3ItemExporter(spider.name)
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

# S3 Related
from minio import Minio
import os
from minio.error import ResponseError
def connect():
    return Minio('192.168.1.111:9000',
            access_key='0M6PYKBBAVQVQGVWVZKQ',
            secret_key='H6vPxz0aHSMZPgagZ3G0lJ6CbhN8RlTtD78SPsL8',
            secure=False)

def create_bucket(client, name):
    client.make_bucket(name)

def put_object(client, bucket_name, object_name, object_data, size):
    client.put_object(bucket_name, object_name, object_data, size)

Tip: Good example to start from is Writing Items to MongoDb from official docs.

I've done a similar thing. I've created a pipeline that places each item into an S3-like service (I'm using Minio here, but you get the idea). It creates a new bucket for each spider and places each item into an object with a random name.
Full source code can by found in my repo.

Starting with simple quotes spider from the tutorial:

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ['http://quotes.toscrape.com/page/1/',
                    'http://quotes.toscrape.com/page/1/']

    def parse(self, response):
        for quote in response.css('div.quote'):
            yield {
                    'text':quote.css('span.text::text').extract_first(),
                    'author':quote.css('span small::text').extract_first(),
                    'tags':quote.css('div.tags a.tag::text').extract()
                    }
        next_page = response.css('li.next a::attr(href)').extract_first()
        if next_page is not None:
            next_page = response.urljoin(next_page)
            yield scrapy.Request(next_page, callback=self.parse)

In settings.py:

ITEM_PIPELINES = {
    'scrapy_quotes.pipelines.ScrapyQuotesPipeline': 300,
}

In scrapy_quotes/pipelines.py create a pipeline and an item exporter:

import uuid
from StringIO import StringIO
from scrapy.contrib.exporter import BaseItemExporter
from scrapy.conf import settings
from scrapy import signals
from scrapy.xlib.pydispatch import dispatcher
from scrapy import log
from scrapy.utils.python import to_bytes
from scrapy.utils.serialize import ScrapyJSONEncoder

class S3ItemExporter(BaseItemExporter):
    def __init__(self, bucket, **kwargs):
        self._configure(kwargs)
        self.bucket = bucket
        kwargs.setdefault('ensure_ascii', not self.encoding)
        self.encoder = ScrapyJSONEncoder(**kwargs)

    def start_exporting(self):
        self.client = connect()
        create_bucket(self.client, self.bucket)

    def finish_exporting(self):
        log.msg("Done from S3 item exporter", level=log.DEBUG)

    def export_item(self, item):
        log.msg("S3 item exporter got item: %s" % item, level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        data = self.encoder.encode(itemdict)
        size = len(data) 
        object_data = StringIO(data)
        name = str(uuid.uuid4())
        put_object(self.client, self.bucket, name, object_data, size)  


class ScrapyQuotesPipeline(object):
    """Export scraped items, to different buckets,
    one per spider"""
    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls()
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def spider_opened(self, spider):
        self.exporter = S3ItemExporter(spider.name)
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

# S3 Related
from minio import Minio
import os
from minio.error import ResponseError
def connect():
    return Minio('192.168.1.111:9000',
            access_key='0M6PYKBBAVQVQGVWVZKQ',
            secret_key='H6vPxz0aHSMZPgagZ3G0lJ6CbhN8RlTtD78SPsL8',
            secure=False)

def create_bucket(client, name):
    client.make_bucket(name)

def put_object(client, bucket_name, object_name, object_data, size):
    client.put_object(bucket_name, object_name, object_data, size)
爱你是孤单的心事 2025-01-03 13:28:50

我遇到了同样的问题,尽管可能是一些版本更新。为我解决这个问题的一个小细节是在 settings.py 中设置 FEED_URI = "something" 。如果没有这个,FEED_EXPORTERS 中的条目根本不会受到尊重。

I hit the same problem, although being probably some version updates ahead. The little detail that solved it for me was setting FEED_URI = "something" in settings.py. Without this, the entry in FEED_EXPORTERS wasn't respected at all.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文