
How to convert csv into a dictionary in apache beam dataflow

I would like to read a CSV file and write it to BigQuery using Apache Beam on Dataflow. To do this, I need to present each row to BigQuery as a dictionary. How can I transform the data with Apache Beam to get that?

My input CSV file has two columns, and I want to create a corresponding two-column table in BigQuery. I know how to write data to BigQuery, that's straightforward. What I don't know is how to transform the CSV into dictionaries. The code below is not correct, but it should give an idea of what I'm trying to do.

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
   beam.io.BigQuerySink(
   output_table,
   schema='month:INTEGER, tornado_count:INTEGER',
   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
Asked by user1753640 on Dec 15 '16 at 18:12



1 Answer

Edit: as of version 2.12.0, Beam comes with new fileio transforms that allow you to read from CSV without having to reimplement a source. You can do this like so:

import csv
import io
import sys

import apache_beam as beam
from apache_beam.io import fileio

def get_csv_reader(readable_file):
  # You can return whichever kind of reader you want here:
  # a DictReader, or a normal csv.reader.
  if sys.version_info >= (3, 0):
    # On Python 3 the file is opened in binary mode, so wrap it for text.
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with beam.Pipeline(...) as p:
  content_pc = (p
                | fileio.MatchFiles("/my/file/name")
                | fileio.ReadMatches()
                | beam.Reshuffle()  # Useful if you expect many matches
                | beam.FlatMap(get_csv_reader))

I recently wrote a test for this for Apache Beam. You can take a look at it in the GitHub repository.
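Since the question asks for dictionaries rather than lists, here is a minimal sketch of the DictReader variant mentioned in the comment above. It uses only the standard library so it can be run on its own. The name get_csv_dict_reader, the FIELD_NAMES list (column order taken from the lambda in the question), and the sample bytes are assumptions for illustration, and the io.TextIOWrapper call assumes Python 3.

```python
import csv
import io

# Assumed column names and order, taken from the lambda in the question.
FIELD_NAMES = ['luminosity', 'datetime']


def get_csv_dict_reader(binary_handle):
    """Return a reader that yields one dict per CSV row.

    binary_handle stands in for the result of readable_file.open().
    """
    text_handle = io.TextIOWrapper(binary_handle, encoding='utf-8')
    # The file has no header row, so the column names are passed explicitly.
    return csv.DictReader(text_handle, fieldnames=FIELD_NAMES)


# Hypothetical sample bytes standing in for the file contents.
sample = io.BytesIO(b"12.5,2016-11-12 10:00:00\n13.1,2016-11-12 10:05:00\n")
rows = [dict(row) for row in get_csv_dict_reader(sample)]
print(rows[0])  # {'luminosity': '12.5', 'datetime': '2016-11-12 10:00:00'}
```

In the pipeline, the function handed to beam.FlatMap receives a readable_file, so this helper would be called with readable_file.open(). All values come back as strings, so any numeric columns would still need converting before they match the BigQuery schema.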


The old answer relied on reimplementing a source. This is no longer the main recommended way of doing this : )

The idea is to have a source that returns parsed CSV rows. You can do this by subclassing the FileBasedSource class to include CSV parsing. Particularly, the read_records function would look something like this:

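import csv

from apache_beam.io import filebasedsource

class MyCsvFileSource(filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)

    # Note: on Python 3 the handle yields bytes, so it may need wrapping
    # (for example with io.TextIOWrapper) before csv.reader can parse it.
    reader = csv.reader(self._file)

    for rec in reader:
      yield rec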
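A csv.reader yields each row as a list of strings, so one more step is needed to get the dictionaries BigQuery expects. A minimal sketch of that step follows. The function name row_to_dict is hypothetical, the column order (luminosity first, datetime second) is taken from the lambda in the question, and treating luminosity as a float is an assumption about the data.

```python
def row_to_dict(row):
    """Convert one parsed CSV row (a list of two strings) into a dict.

    Assumes the column order luminosity, datetime and a numeric luminosity.
    """
    luminosity, timestamp = row
    return {'luminosity': float(luminosity), 'datetime': timestamp}


# Hypothetical sample row for illustration.
print(row_to_dict(['12.5', '2016-11-12 10:00:00']))
# {'luminosity': 12.5, 'datetime': '2016-11-12 10:00:00'}
```

Applied to the PCollection of rows with beam.Map(row_to_dict), this would give one dictionary per CSV row. Unpacking inside the function body also avoids the lambda (k, v) form from the question, which is a syntax error on Python 3.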
Answered by Pablo on Nov 09 '22 at 01:11