
Scrapy custom exporter

Tags: python, scrapy

I am defining an item exporter that pushes items to a message queue. Below is the code.

from scrapy.contrib.exporter import JsonLinesItemExporter
from scrapy.utils.serialize import ScrapyJSONEncoder
from scrapy import log

from scrapy.conf import settings

from carrot.connection import BrokerConnection, Exchange
from carrot.messaging import Publisher

log.start()


class QueueItemExporter(JsonLinesItemExporter):

    def __init__(self, **kwargs):

        log.msg("Initialising queue exporter", level=log.DEBUG)

        self._configure(kwargs)

        host_name = settings.get('BROKER_HOST', 'localhost')
        port = settings.get('BROKER_PORT', 5672)
        userid = settings.get('BROKER_USERID', "guest")
        password = settings.get('BROKER_PASSWORD', "guest")
        virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/")

        self.encoder = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)(**kwargs)

        log.msg("Connecting to broker", level=log.DEBUG)
        self.q_connection = BrokerConnection(hostname=host_name, port=port,
                        userid=userid, password=password,
                        virtual_host=virtual_host)
        self.exchange = Exchange("scrapers", type="topic")
        log.msg("Connected", level=log.DEBUG)

    def start_exporting(self):
        spider_name = "test"
        log.msg("Initialising publisher", level=log.DEBUG)
        self.publisher = Publisher(connection=self.q_connection,
                        exchange=self.exchange, routing_key="scrapy.spider.%s" % spider_name)
        log.msg("done", level=log.DEBUG)

    def finish_exporting(self):
        self.publisher.close()

    def export_item(self, item):
        log.msg("In export item", level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        self.publisher.send({"scraped_data": self.encoder.encode(itemdict)})
        log.msg("sent to queue - scrapy.spider.naukri", level=log.DEBUG)

I'm having a few problems. The items are not being submitted to the queue. I've added the following to my settings:

FEED_EXPORTERS = {
    "queue": 'scrapers.exporters.QueueItemExporter'
}

FEED_FORMAT = "queue"

LOG_STDOUT = True

The code does not raise any errors, and I can't see any of the logging messages either. I'm at my wits' end on how to debug this.

Any help would be much appreciated.

asked Jan 18 '12 by zsquare

People also ask

How do I export data from Scrapy?

Saving CSV files via the command line: the first and simplest way to create a CSV file of the data you have scraped is to define an output path when starting your spider from the command line. To save to a CSV file, add the -o flag to the scrapy crawl command along with the file path you want to save to.
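For example, assuming a spider named quotes (the name is only an illustration), the command could look like this:

scrapy crawl quotes -o quotes.csv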

What is pipeline in Scrapy?

Scrapy is a web scraping library that is used to scrape, parse and collect web data. Scraped data is handled in a pipelines.py file, where each component (a class) processes the items, and the components are executed sequentially.
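As a minimal sketch of the idea (the class and field names here are illustrative, not taken from the question), a pipeline is just a class with a process_item method:

from scrapy.exceptions import DropItem

class DropEmptyTextPipeline(object):
    """Illustrative pipeline: drop any item whose 'text' field is empty."""

    def process_item(self, item, spider):
        if not item.get('text'):
            raise DropItem("Missing text in %s" % item)
        return item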


2 Answers

"Feed Exporters" are quick (and somehow dirty) shortcuts to call some "standard" item exporters. Instead of setting up a feed exporter from settings, hard wire your custom item exporter to your custom pipeline, as explained here http://doc.scrapy.org/en/0.14/topics/exporters.html#using-item-exporters :

from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
from scrapy.contrib.exporter import XmlItemExporter

class MyPipeline(object):

    def __init__(self):
        ...
        dispatcher.connect(self.spider_opened, signals.spider_opened)
        dispatcher.connect(self.spider_closed, signals.spider_closed)
        ...

    def spider_opened(self, spider):
        self.exporter = QueueItemExporter()
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        # YOUR STUFF HERE
        ...
        self.exporter.export_item(item)
        return item
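Note that the pipeline still has to be enabled in settings.py; the exact module path below is an assumption based on the project name used in the question:

ITEM_PIPELINES = {
    'scrapers.pipelines.MyPipeline': 300,
}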
answered Sep 29 '22 by Udi

Tip: A good example to start from is Writing Items to MongoDB in the official docs.
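A condensed sketch of that MongoDB pipeline, assuming pymongo is installed and using the MONGO_URI / MONGO_DATABASE setting names from the docs example:

import pymongo

class MongoPipeline(object):

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        # read connection details from settings.py
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items'),
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # store each scraped item as one document
        self.db['items'].insert_one(dict(item))
        return item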

I've done a similar thing. I've created a pipeline that places each item into an S3-like service (I'm using Minio here, but you get the idea). It creates a new bucket for each spider and places each item into an object with a random name. The full source code can be found in my repo.

Starting with the simple quotes spider from the tutorial:

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ['http://quotes.toscrape.com/page/1/']

    def parse(self, response):
        for quote in response.css('div.quote'):
            yield {
                'text': quote.css('span.text::text').extract_first(),
                'author': quote.css('span small::text').extract_first(),
                'tags': quote.css('div.tags a.tag::text').extract(),
            }
        next_page = response.css('li.next a::attr(href)').extract_first()
        if next_page is not None:
            next_page = response.urljoin(next_page)
            yield scrapy.Request(next_page, callback=self.parse)

In settings.py:

ITEM_PIPELINES = {
    'scrapy_quotes.pipelines.ScrapyQuotesPipeline': 300,
}

In scrapy_quotes/pipelines.py, create a pipeline and an item exporter:

import uuid
from StringIO import StringIO
from scrapy.contrib.exporter import BaseItemExporter
from scrapy import signals
from scrapy import log
from scrapy.utils.serialize import ScrapyJSONEncoder

class S3ItemExporter(BaseItemExporter):
    def __init__(self, bucket, **kwargs):
        self._configure(kwargs)
        self.bucket = bucket
        kwargs.setdefault('ensure_ascii', not self.encoding)
        self.encoder = ScrapyJSONEncoder(**kwargs)

    def start_exporting(self):
        self.client = connect()
        create_bucket(self.client, self.bucket)

    def finish_exporting(self):
        log.msg("Done from S3 item exporter", level=log.DEBUG)

    def export_item(self, item):
        log.msg("S3 item exporter got item: %s" % item, level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        data = self.encoder.encode(itemdict)
        size = len(data) 
        object_data = StringIO(data)
        name = str(uuid.uuid4())
        put_object(self.client, self.bucket, name, object_data, size)  


class ScrapyQuotesPipeline(object):
    """Export scraped items, to different buckets,
    one per spider"""
    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls()
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def spider_opened(self, spider):
        self.exporter = S3ItemExporter(spider.name)
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

# S3 Related
from minio import Minio
import os
from minio.error import ResponseError
def connect():
    # endpoint and credentials here are example values for a local Minio
    # instance; replace them with your own S3-compatible service settings
    return Minio('192.168.1.111:9000',
            access_key='0M6PYKBBAVQVQGVWVZKQ',
            secret_key='H6vPxz0aHSMZPgagZ3G0lJ6CbhN8RlTtD78SPsL8',
            secure=False)

def create_bucket(client, name):
    client.make_bucket(name)

def put_object(client, bucket_name, object_name, object_data, size):
    client.put_object(bucket_name, object_name, object_data, size)
answered Sep 29 '22 by Andriy Drozdyuk