I am trying to use a Beam pipeline to apply the SequenceMatcher function to a ton of words. I have (hopefully) figured everything out except the WriteToText part.
I have defined a custom ParDo (hereafter called ProcessDataDoFn) that takes the main_input and the side_input, processes them, and outputs dictionaries like this one:

    {u'key': (u'string', float)}
My pipeline is quite simple:
    class ProcessDataDoFn(beam.DoFn):
        def process(self, element, side_input):
            # ... series of operations ...
            return output_dictionary
    with beam.Pipeline(options=options) as p:
        # Main input
        main_input = p | 'ReadMainInput' >> beam.io.Read(
            beam.io.BigQuerySource(
                query=CUSTOM_SQL,
                use_standard_sql=True
            ))

        # Side input
        side_input = p | 'ReadSideInput' >> beam.io.Read(
            beam.io.BigQuerySource(
                project=PROJECT_ID,
                dataset=DATASET,
                table=TABLE
            ))

        output = (
            main_input
            | 'ProcessData' >> beam.ParDo(
                ProcessDataDoFn(),
                side_input=beam.pvalue.AsList(side_input))
            | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
        )
Now the problem is that if I leave the pipeline like this, it only outputs the keys of the output_dictionary. If I change the return of ProcessDataDoFn to json.dumps(output_dictionary), the JSON is written, but one character per line, like this:
{
'
k
e
y
'
:
[
'
s
t
r
i
n
g
'
,
f
l
o
a
t
]
How can I correctly output the results?
json.dumps does produce the JSON on a single line, and WriteToText writes one element per line, so those aren't the problem. The output looks like that because a DoFn's process method must return an iterable of output elements, and Beam iterates whatever you return: returning the dictionary emits only its keys, and returning a json.dumps string emits it one character at a time.
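Plain Python shows the same behaviour Beam applies to the return value of process (the dictionary below is just a stand-in for your output_dictionary):

    import json

    output_dictionary = {u'key': (u'string', 0.5)}

    # Iterating a dict yields its keys -- this is why only keys were written:
    print(list(output_dictionary))                    # ['key']

    # Iterating a JSON string yields one character at a time -- this is the
    # character-per-line output you saw:
    print(list(json.dumps(output_dictionary))[:3])    # ['{', '"', 'k']
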
Perhaps, to have cleaner code, you can add an extra Map operation that does your formatting however you need. Something like so:
    output = (
        main_input
        | 'ProcessData' >> beam.ParDo(
            ProcessDataDoFn(),
            side_input=beam.pvalue.AsList(side_input))
        | 'FormatOutput' >> beam.Map(json.dumps)
        | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
    )
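For that to work, the DoFn itself should also yield its dictionaries (or return them inside a list) rather than returning them directly, so each dict reaches the Map step as one whole element. A minimal sketch of the corrected method (the beam.DoFn base class is omitted so the snippet runs standalone, and the actual processing is replaced by a placeholder; in the real pipeline keep class ProcessDataDoFn(beam.DoFn)):

    import json

    class ProcessDataDoFn:  # in the real pipeline: class ProcessDataDoFn(beam.DoFn)
        def process(self, element, side_input):
            # ... series of operations producing output_dictionary ...
            output_dictionary = {u'key': (element, 0.5)}  # placeholder result
            # Yielding (instead of returning) emits the dict as ONE element.
            yield output_dictionary

    # Calling process() directly shows a single dict element is produced,
    # which the downstream beam.Map(json.dumps) turns into one JSON line:
    outputs = list(ProcessDataDoFn().process(u'string', []))
    print(json.dumps(outputs[0]))  # {"key": ["string", 0.5]}
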