I am trying to use Apache Beam's beam.GroupBy(...) construct, and I am running into a problem: it does not know how to encode a standard Python datetime.date.
Here's a simplified code chunk demonstrating the issue:
import logging
import random
import apache_beam as beam
from apache_beam.io import WriteToText
from datetime import date

def random_record():
    return {
        'account_id': random.randint(1004, 1009),
        'activity_date': date(2021, 10, random.randint(1, 4)),
        'region': random.randint(30, 40),
        'largest_sale': random.randint(10000, 40000),
        'total_sales': random.randint(100000, 900000)
    }

def main(argv=None):
    random.seed(2349090823434)
    records = [random_record() for i in range(100)]
    with beam.Pipeline() as p:
        output = (
            p
            | "Source" >> beam.Create(records)
            | "Typed" >> beam.Map(lambda d: beam.Row(**d))
            | "Rollup" >> (beam.GroupBy('account_id', 'activity_date')
                           .aggregate_field('total_sales', sum, 'total_sales')
                           .aggregate_field('largest_sale', max, 'largest_sale')
                           )
            | "Output" >> WriteToText('learn-output', file_name_suffix='.csv')
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
When I run this:
% python learn.py
it results in the error:
WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '<class 'apache_beam.transforms.core.Key'>' in 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'.
Traceback (most recent call last):
...
TypeError: Unable to deterministically encode '2021-10-01' of type '<class 'datetime.date'>', please provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'
...
TypeError: Unable to deterministically encode 'Key(account_id=1007, activity_date=datetime.date(2021, 10, 1))' of type '<class 'apache_beam.transforms.core.Key'>',
please provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'
I am wondering how to actually "provide a type hint for the input of 'Rollup/CombinePerKey(TupleCombineFn)/GroupByKey'".
It seems like the GroupByKey is being called within GroupBy. So I either need to tell GroupBy how to handle my 2-tuple key, or perhaps register some coder to support datetime.date...
Generally you should be able to do something like
from datetime import date
import apache_beam as beam

class DateCoder(beam.coders.Coder):
    def encode(self, d):
        # ISO 8601 text (e.g. b'2021-10-01') gives the same bytes for equal dates.
        return d.isoformat().encode('ascii')

    def decode(self, bs):
        return date.fromisoformat(bs.decode('ascii'))

    def is_deterministic(self):
        return True

beam.coders.registry.register_coder(date, DateCoder)
to handle this, but there are some poor interactions with schemas here that make it harder to use beam.GroupBy in this case. One issue is that beam.Row(**d) doesn't actually let Beam figure out the column names and types (the schema) of these Row objects at construction time; for that you need to pass the keywords explicitly. (Making this work better is future work.)
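As a quick sanity check that does not need Beam at all, the byte encoding used by the DateCoder above can be round-tripped in plain Python. This is only a sketch of the encoding scheme; encode_date and decode_date are hypothetical helper names that mirror the coder's two methods, not Beam APIs.

```python
from datetime import date


def encode_date(d: date) -> bytes:
    """Mirror DateCoder.encode: ISO 8601 text as ASCII bytes."""
    return d.isoformat().encode('ascii')


def decode_date(bs: bytes) -> date:
    """Mirror DateCoder.decode: parse the ISO 8601 text back into a date."""
    return date.fromisoformat(bs.decode('ascii'))


original = date(2021, 10, 1)
encoded = encode_date(original)

# Equal dates always produce identical bytes, which is what a
# deterministic coder has to guarantee for grouping keys.
assert encoded == encode_date(date(2021, 10, 1))

# Decoding gives back the original value.
assert decode_date(encoded) == original
```

Because equal dates map to equal bytes, returning True from is_deterministic is justified for this encoding.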
Converting the dates to strings, as the comment suggested, is definitely a simple option here.
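A minimal sketch of the string-conversion approach, assuming the records are plain dicts like the ones in the question. The helper name stringify_dates is made up for illustration; it replaces every datetime.date value with its ISO 8601 string and leaves the other fields untouched.

```python
from datetime import date


def stringify_dates(record: dict) -> dict:
    """Return a copy of record with each datetime.date value as an ISO string."""
    return {
        key: value.isoformat() if isinstance(value, date) else value
        for key, value in record.items()
    }


# Example record shaped like the ones produced by random_record().
record = {
    'account_id': 1007,
    'activity_date': date(2021, 10, 1),
    'total_sales': 250000,
}
converted = stringify_dates(record)
```

In the pipeline from the question, this would presumably be applied as an extra step, for example beam.Map(stringify_dates) between "Source" and "Typed", so that the grouping key holds only ints and strings. The step label and placement are assumptions, and the output will then contain date strings rather than date objects.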