Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to upload crawled data from Scrapy to Amazon S3 as csv or json?

What are the steps to upload the crawled data from Scrapy to Amazon S3 as a csv/jsonl/json file? All I could find on the internet was how to upload scraped images to an S3 bucket.

I'm currently using Ubuntu 16.04, and I have installed boto with the command:

pip install boto

I have added the following lines to settings.py. Can anyone explain the other changes I have to make?

AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY= 'access key'


FEED_URI = 'bucket path'
FEED_FORMAT = 'jsonlines'
FEED_EXPORT_FIELDS = None
FEED_STORE_EMPTY = False
FEED_STORAGES = {}
FEED_STORAGES_BASE = { 
'': None,
'file': None,
'stdout': None,
's3': 'scrapy.extensions.feedexport.S3FeedStorage',
'ftp': None,
}
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
    'json': None,
    'jsonlines': None,
    'jl': None,
    'csv': None,
    'xml': None,
    'marshal': None,
    'pickle': None,
}

Edit 1: When I configure all of the above and run scrapy crawl spider, I get the following error after the crawled results.

2016-08-08 10:57:03 [scrapy] ERROR: Error storing csv feed (200 items) in: s3: myBucket/crawl.csv
Traceback (most recent call last):
File "/usr/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 246, in inContext
result = inContext.theWork()
File "/usr/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 262, in <lambda>
inContext.theWork = lambda: context.call(ctx, func, *args, **kw)
File "/usr/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
return self.currentContext().callWithContext(ctx, func, *args, **kw)
File "/usr/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
return func(*args,**kw)
File "/usr/local/lib/python2.7/dist-packages/scrapy/extensions/feedexport.py", line 123, in _store_in_thread
key.set_contents_from_file(file)
File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 1293, in set_contents_from_file
chunked_transfer=chunked_transfer, size=size)
File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 750, in send_file
chunked_transfer=chunked_transfer, size=size)
File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 951, in _send_file_internal
query_args=query_args
File "/usr/local/lib/python2.7/dist-packages/boto/s3/connection.py", line 656, in make_request
auth_path = self.calling_format.build_auth_path(bucket, key)
File "/usr/local/lib/python2.7/dist-packages/boto/s3/connection.py", line 94, in build_auth_path
path = '/' + bucket
TypeError: cannot concatenate 'str' and 'NoneType' objects
like image 460
Abhishek K Avatar asked Aug 05 '16 11:08

Abhishek K


2 Answers

The problem was solved by adding the following line into settings.py file:

ITEM_PIPELINE = {
'scrapy.pipelines.files.S3FilesStore': 1
}

along with the S3 credentials mentioned earlier.

AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY= 'access key'

FEED_URI='s3://bucket/folder/filename.json'

Thank you guys for your guidance.

like image 107
Abhishek K Avatar answered Sep 28 '22 16:09

Abhishek K


I've decided to answer Mil0R3's comment on Abhishek K's answer with the code snippet that worked for me.

in settings.py you need to add the following code:

# Credentials that S3Pipeline passes to boto3.client() when it is created.
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''

# You need to have both variables FEED_URI and S3PIPELINE_URL set to the same
# file or this code will not work.
# S3Pipeline takes the bucket from the host part of S3PIPELINE_URL and the
# object key from its path. The path is expanded with str.format() using the
# spider's attributes plus {chunk} and {time}, so replace {bucket} and
# {file_name} below with real values (or with placeholders the pipeline knows).
FEED_URI = 's3://{bucket}/{file_name}.jsonl'
S3PIPELINE_URL = FEED_URI
FEED_FORMAT = 'jsonlines'

# project_folder refers to the folder that both pipelines.py and settings.py are in
ITEM_PIPELINES = {
    '{project_folder}.pipelines.S3Pipeline': 1,
}

Inside pipelines.py you need to add the following class. The GitHub project this is copied and pasted from can be found here: https://github.com/orangain/scrapy-s3pipeline

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html


import gzip
from datetime import datetime, timezone
from io import BytesIO
from urllib.parse import urlparse

import boto3
from botocore.exceptions import ClientError

from scrapy.exporters import JsonLinesItemExporter

class S3Pipeline:
    """
    Scrapy pipeline to store items into S3 bucket with JSONLines format.
    Unlike FeedExporter, the pipeline has the following features:
    * The pipeline stores items by chunk.
    * Support GZip compression.

    Items are buffered in memory and uploaded as one S3 object per chunk.
    The object key is the path of ``S3PIPELINE_URL`` expanded with
    ``str.format`` using the spider's attributes plus ``{chunk}`` (number of
    items handled by earlier chunks) and ``{time}`` (UTC timestamp taken when
    the spider was opened).
    """

    def __init__(self, settings, stats):
        """
        Read the pipeline configuration and create the S3 client.

        ``settings`` must support item access plus ``getint``/``getbool``;
        ``stats`` must provide ``inc_value``.

        Raises ValueError when S3PIPELINE_URL is missing or has no bucket
        (host) part, so that misconfiguration is reported at start-up instead
        of as an obscure error on the first upload.
        """
        self.stats = stats

        url = settings['S3PIPELINE_URL']
        if not url:
            raise ValueError(
                "S3PIPELINE_URL is not set; expected 's3://<bucket>/<object key>'")
        o = urlparse(url)
        if not o.hostname:
            raise ValueError(
                "S3PIPELINE_URL %r has no bucket name; "
                "expected 's3://<bucket>/<object key>'" % (url,))
        self.bucket_name = o.hostname
        self.object_key_template = o.path[1:]  # Remove the first '/'

        self.max_chunk_size = settings.getint('S3PIPELINE_MAX_CHUNK_SIZE', 100)
        # GZip defaults to on when the URL ends with '.gz'.
        self.use_gzip = settings.getbool('S3PIPELINE_GZIP', url.endswith('.gz'))

        self.s3 = boto3.client(
            's3',
            region_name=settings['AWS_REGION_NAME'],
            use_ssl=settings['AWS_USE_SSL'],
            verify=settings['AWS_VERIFY'],
            endpoint_url=settings['AWS_ENDPOINT_URL'],
            aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'])
        self.items = []
        self.chunk_number = 0

    @classmethod
    def from_crawler(cls, crawler):
        """
        Build the pipeline from the crawler's settings and stats collector.
        """
        return cls(crawler.settings, crawler.stats)

    def process_item(self, item, spider):
        """
        Process single item. Add item to items and then upload to S3 if size of items
        >= max_chunk_size. The item is returned unchanged.
        """
        self.items.append(item)
        if len(self.items) >= self.max_chunk_size:
            self._upload_chunk(spider)

        return item

    def open_spider(self, spider):
        """
        Callback function when spider is open.
        """
        # Store timestamp to replace {time} in S3PIPELINE_URL.
        # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
        # time and drop tzinfo so the text stays 'YYYY-MM-DDTHH-MM-SS'.
        now = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
        self.ts = now.isoformat().replace(':', '-')

    def close_spider(self, spider):
        """
        Callback function when spider is closed.
        """
        # Upload remained items to S3.
        self._upload_chunk(spider)

    def _upload_chunk(self, spider):
        """
        Do upload items to S3.

        Increments 'pipeline/s3/success' or 'pipeline/s3/fail' in stats. On
        failure the exception is re-raised. In both cases the buffered items
        are discarded and the chunk counter advances.
        """

        if not self.items:
            return  # Do nothing when items is empty.

        f = self._make_fileobj()

        # Build object key by replacing variables in object key template.
        object_key = self.object_key_template.format(**self._get_uri_params(spider))

        try:
            self.s3.upload_fileobj(f, self.bucket_name, object_key)
        except Exception:
            # Count every failed upload, not only botocore ClientError:
            # connection, credential and transfer errors are raised as other
            # exception types. The error is always propagated.
            self.stats.inc_value('pipeline/s3/fail')
            raise
        else:
            self.stats.inc_value('pipeline/s3/success')
        finally:
            # Prepare for the next chunk
            self.chunk_number += len(self.items)
            self.items = []

    def _get_uri_params(self, spider):
        """
        Build the variables available in the object key template: every
        attribute of the spider, plus 'chunk' and 'time'.
        """
        params = {key: getattr(spider, key) for key in dir(spider)}
        params['chunk'] = self.chunk_number
        params['time'] = self.ts
        return params

    def _make_fileobj(self):
        """
        Build file object from items.

        Returns a BytesIO positioned at the start, holding the buffered items
        exported as JSON lines (GZip-compressed when use_gzip is set).
        """

        bio = BytesIO()
        f = gzip.GzipFile(mode='wb', fileobj=bio) if self.use_gzip else bio

        # Build file object using ItemExporter
        exporter = JsonLinesItemExporter(f)
        exporter.start_exporting()
        for item in self.items:
            exporter.export_item(item)
        exporter.finish_exporting()

        if f is not bio:
            f.close()  # Close the file if GzipFile

        # Seek to the top of file to be read later
        bio.seek(0)

        return bio

Special Notes:

I needed to remove some settings from the OP's settings.py file for this pipeline to work correctly. All of the following will need to be removed:

FEED_EXPORT_FIELDS = None
FEED_STORE_EMPTY = False
FEED_STORAGES = {}
FEED_STORAGES_BASE = { 
'': None,
'file': None,
'stdout': None,
's3': 'scrapy.extensions.feedexport.S3FeedStorage',
'ftp': None,
}
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
    'json': None,
    'jsonlines': None,
    'jl': None,
    'csv': None,
    'xml': None,
    'marshal': None,
    'pickle': None,
}

Also, be sure to set the S3PIPELINE_URL variable equal to FEED_URI.

Either not removing the above settings from settings.py, or not setting the two variables above equal to each other, will result in a jsonl file showing up inside your S3 bucket, but with multiple copies of only a single item added. I have no idea why that happens, though...

This took me a few hours to figure out, so I hope it saves someone some time.

like image 43
Osuynonma Avatar answered Sep 28 '22 17:09

Osuynonma