Apache-Beam + Python: Writing JSON (or dictionaries) strings to output file

I am trying to use a Beam pipeline to apply the SequenceMatcher function to a large number of words. I think I have figured out everything except the WriteToText part.

I have defined a custom DoFn (called ProcessDataDoFn below) that takes main_input and side_input, processes them, and outputs dictionaries like this one:

{u'key': (u'string', float)}

My pipeline is quite simple:

import apache_beam as beam

class ProcessDataDoFn(beam.DoFn):
    def process(self, element, side_input):

        # ... Series of operations ...

        return output_dictionary

with beam.Pipeline(options=options) as p:

    # Main input
    main_input = p | 'ReadMainInput' >> beam.io.Read(
        beam.io.BigQuerySource(
            query=CUSTOM_SQL,
            use_standard_sql=True
        ))

    # Side input
    side_input = p | 'ReadSideInput' >> beam.io.Read(
        beam.io.BigQuerySource(
            project=PROJECT_ID,
            dataset=DATASET,
            table=TABLE
        ))

    output = (
        main_input
        | 'ProcessData' >> beam.ParDo(
            ProcessDataDoFn(),
            side_input=beam.pvalue.AsList(side_input))
        | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
    )

The problem is that if I leave the pipeline like this, it only outputs the keys of output_dictionary. If I change the return value of ProcessDataDoFn to json.dumps(output_dictionary), the JSON is written correctly, but with one character per line, like this:

{
'
k
e
y
'

:

[
'
s
t
r
i
n
g
'

,

f
l
o
a
t
]

How can I correctly output the results?

Michele 'Ubik' De Simoni asked Jul 21 '17 16:07



1 Answer

It's unusual that your output looks like that. json.dumps should serialize the JSON onto a single line, and each element should be written to the output files as its own line.
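
As a quick sanity check outside of Beam, here is a minimal sketch of what json.dumps produces for a dictionary shaped like the one in the question. The 0.87 score is a made-up sample value, not data from the original pipeline.

```python
import json

# Hypothetical sample record, shaped like {u'key': (u'string', float)}
record = {u'key': (u'string', 0.87)}

# json.dumps returns one string with no newlines (no indent argument given);
# the tuple is serialized as a JSON array.
line = json.dumps(record)
print(line)  # {"key": ["string", 0.87]}
```

So the serialization step by itself should not introduce any line breaks.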

For cleaner code, you could add an extra Map step that formats the output however you need. Something like this:

output = (
  main_input
  | 'ProcessData' >> beam.ParDo(
        ProcessDataDoFn(),
        side_input=beam.pvalue.AsList(side_input))
  | 'FormatOutput' >> beam.Map(json.dumps)
  | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
)
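
One possible explanation for the symptoms, assuming ProcessDataDoFn.process returns the dictionary (or the JSON string) directly as shown in the question: Beam treats the return value of process as an iterable of output elements. Iterating over a dict gives its keys, and iterating over a string gives single characters. The sketch below imitates that behaviour in plain Python, without Beam; simulate_runner and the sample value are hypothetical names used only for illustration.

```python
import json

def simulate_runner(process_result):
    """Imitate how a DoFn result is consumed: each item of the iterable becomes one element."""
    return [element for element in process_result]

# Hypothetical sample value standing in for output_dictionary
output_dictionary = {'key': ('string', 0.87)}

# Returning the dict directly: iteration yields only its keys.
returned_dict = simulate_runner(output_dictionary)

# Returning the JSON string directly: iteration yields one character at a time.
returned_string = simulate_runner(json.dumps(output_dictionary))

def process(element):
    # Yielding (or returning a one-item list) emits the whole string as a single element.
    yield json.dumps(element)

fixed = simulate_runner(process(output_dictionary))

print(returned_dict)        # ['key']
print(returned_string[:3])  # ['{', '"', 'k']
print(fixed)                # ['{"key": ["string", 0.87]}']
```

If that is what is happening, changing the last line of process to yield output_dictionary (or return [output_dictionary]) should make each dictionary arrive as one element, and the 'FormatOutput' step above would then turn it into one JSON line.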
Pablo answered Nov 06 '22 16:11
