I'm using Dataflow 0.5.5 (Python) and ran into the following error in very simple code: print(len(row_list)), where row_list is a list. Exactly the same code, with the same data and the same pipeline, runs perfectly fine on DirectRunner but throws the following exception on DataflowRunner. What does it mean, and how can I solve it?
job name: `beamapp-root-0216042234-124125`
(f14756f20f567f62): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work
work_executor.execute()
File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547)
with op.scoped_metrics_container:
File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495)
op.start()
File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149)
def start(self):
File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053)
with self.scoped_start_state:
File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968)
with self.shuffle_source.reader() as reader:
File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912)
self.output(windowed_value)
File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317)
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021)
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/executor.py", line 766, in dataflow_worker.executor.BatchGroupAlsoByWindowsOperation.process (dataflow_worker/executor.c:25558)
self.output(wvalue.with_value((k, wvalue.value)))
File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317)
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021)
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/executor.py", line 545, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18474)
with self.scoped_process_state:
File "dataflow_worker/executor.py", line 546, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18428)
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 195, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:5137)
self.process(windowed_value)
File "apache_beam/runners/common.py", line 262, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7078)
self.reraise_augmented(exn)
File "apache_beam/runners/common.py", line 274, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:7467)
raise type(exn), args, sys.exc_info()[2]
File "apache_beam/runners/common.py", line 258, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:6967)
self._dofn_simple_invoker(element)
File "apache_beam/runners/common.py", line 198, in apache_beam.runners.common.DoFnRunner._dofn_simple_invoker (apache_beam/runners/common.c:5283)
self._process_outputs(element, self.dofn_process(element.value))
File "apache_beam/runners/common.py", line 286, in apache_beam.runners.common.DoFnRunner._process_outputs (apache_beam/runners/common.c:7678)
for result in results:
File "trip_augmentation_test.py", line 120, in get_osm_way
TypeError: object of type '_UnwindowedValues' has no len() [while running 'Pull way info from mapserver']
Code here (trip_augmentation_test.py):
#!/usr/bin/env python
# coding: utf-8
from __future__ import absolute_import

import argparse
import logging
import json

import apache_beam as beam
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.options import SetupOptions


def get_osm_way(pairs_same_group):
    import requests
    from requests.adapters import HTTPAdapter
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    from multiprocessing.pool import ThreadPool
    import time

    # disable InsecureRequestWarning for a cleaner output
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    print('processing hardwareid={} trips'.format(pairs_same_group[0]))
    row_list = pairs_same_group[1]
    print(row_list)
    http_request_num = len(row_list)  ######### this line ran into the above error ##########
    with requests.Session() as s:
        # a host name is needed for this http persistent connection
        s.mount('https://<ip address>', HTTPAdapter(pool_maxsize=http_request_num))
        pool = ThreadPool(processes=1)
        for row in row_list:
            hardwareid = row['HardwareId']
            tripid = row['TripId']
            latlonArr = row['LatLonStrArr'].split(',')
            print('gps points num: {}'.format(len(latlonArr)))
            cor_array = []
            for latlon in latlonArr:
                lat = latlon.split(';')[0]
                lon = latlon.split(';')[1]
                cor_array.append('{{"x":"{}","y":"{}"}}'.format(lon, lat))
            url = 'https://<ip address>/functionname?coordinates=[{}]'.format(','.join(cor_array))
            print(url)
            print("Requesting")
            r = pool.apply_async(thread_get, (s, url)).get()
            print("Got response")
            print(r)
            if r.status_code == 200:
                yield (hardwareid, tripid, r.text)
            else:
                yield (hardwareid, tripid, None)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        help=('Input BigQuery table to process specified as: '
                              'PROJECT:DATASET.TABLE or DATASET.TABLE.'))
    parser.add_argument('--output',
                        required=True,
                        help=('Output BigQuery table for results specified as: '
                              'PROJECT:DATASET.TABLE or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)
    (p
     | 'Read trip from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=known_args.input))
     | 'Convert' >> beam.Map(lambda row: (row['HardwareId'], row))
     | 'Group devices' >> beam.GroupByKey()
     | 'Pull way info from mapserver' >> beam.FlatMap(get_osm_way)
     | 'Map way info to dictionary' >> beam.FlatMap(convert_to_dict)
     | 'Save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
         known_args.output,
         schema='HardwareId:INTEGER,TripId:INTEGER,OrderBy:INTEGER,IndexRatio:FLOAT,IsEstimate:BOOLEAN,IsOverRide:BOOLEAN,MaxSpeed:FLOAT,Provider:STRING,RoadName:STRING,WayId:STRING,LastEdited:TIMESTAMP,WayLatLons:STRING,BigDataComment:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )
    # Run the pipeline (all operations are deferred until run() is called).
    p.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Pipeline call here (I'm using Google Cloud Datalab):
!python trip_augmentation_test.py \
--output 'my-project:my-dataset.mytable' \
--input 'SELECT HardwareId,TripId, LatLonStrArr FROM [my-project:my-dataset.mytable] ' \
--project 'my-project' \
--runner 'DataflowRunner' \ ### if I just change this to DirectRunner, everything's fine
--temp_location 'gs://mybucket/tripway_temp' \
--staging_location 'gs://mybucket/tripway_staging' \
--worker_machine_type 'n1-standard-2' \
--profile_cpu True \
--profile_memory True
Follow up

I logged the type of row_list. It turns out that on DataflowRunner it's <class 'apache_beam.transforms.trigger._UnwindowedValues'>, while on DirectRunner it's list. Is this an expected inconsistency?
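For reference, a check along these lines inside get_osm_way is what surfaced the difference (the printed types are from the runs described above):

row_list = pairs_same_group[1]
print(type(row_list))
# DataflowRunner: <class 'apache_beam.transforms.trigger._UnwindowedValues'>
# DirectRunner:   <type 'list'>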
This kind of abstraction is necessary in Big Data systems like Beam/Dataflow (and others): the number of elements in a group can be arbitrarily large. _UnwindowedValues provides an iterable interface to that set of elements, which could be of any size and may not fit in memory as a whole.

The fact that the DirectRunner returned a list is an inconsistency that was fixed a couple of versions of Beam ago. In Dataflow, the result of GroupByKey does not come in the form of a list and does not support len() - but it is iterable.
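A minimal sketch of that behavior, reusing pairs_same_group from the code above:

grouped_values = pairs_same_group[1]  # _UnwindowedValues on DataflowRunner

for row in grouped_values:  # iterating is supported
    pass

len(grouped_values)  # TypeError: object of type '_UnwindowedValues' has no len()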
In short, before doing http_request_num = len(row_list), you can coerce it into a type that supports len(), e.g.:
row_list = list(pairs_same_group[1])
http_request_num = len(row_list)
But consider that the materialized list may be very large, and it has to fit in the worker's memory.
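If a group could be too large to materialize, a sketch of an alternative is to avoid needing len() at all, e.g. by capping the connection pool at a fixed size and streaming over the rows in a single pass (MAX_POOL_SIZE is a hypothetical tuning knob, not part of the original code):

MAX_POOL_SIZE = 20  # hypothetical cap; not from the original code

def get_osm_way(pairs_same_group):
    import requests
    from requests.adapters import HTTPAdapter

    with requests.Session() as s:
        # Fixed pool size instead of one derived from len(row_list),
        # so the grouped values only need to be traversed once.
        s.mount('https://<ip address>', HTTPAdapter(pool_maxsize=MAX_POOL_SIZE))
        for row in pairs_same_group[1]:
            # ... build the URL and issue the request exactly as in the
            # question's get_osm_way, yielding (hardwareid, tripid, ...) ...
            pass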