I'm trying to read from a PostgreSQL table using Apache Beam's Python SDK. I have installed the Java SDK, as the documentation says, and I'm using the latest release. My code is as follows:
import logging
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    beam_options = PipelineOptions(argv)
    # Expected row schema; on Python 3 plain str replaces past.builtins.unicode.
    ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', str)])
    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with beam.Pipeline(options=beam_options) as p:
        result = (
            p
            | 'Read from jdbc' >> ReadFromJdbc(
                table_name='jdbc_external_test_read',
                driver_class_name='org.postgresql.Driver',
                jdbc_url='jdbc:postgresql://localhost:5432/example',
                username='postgres',
                password='postgres')
        )


if __name__ == '__main__':
    logging.getLogger(__name__).setLevel(logging.INFO)
    run()
But when I run it, I get this error:
ValueError: No logical type registered for URN 'beam:logical_type:javasdk:v1'
This issue occurs because the VARCHAR field is returned as an Apache Beam logical type in its schema. Logical types are identified by their URNs, in this case beam:logical_type:javasdk:v1. To read the value of a logical type, a "decoder" has to be registered for the corresponding URN. You can do that as follows:
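from apache_beam.typehints.schemas import LogicalType


@LogicalType.register_logical_type
class db_str(LogicalType):
    # Decodes values carrying the Java-SDK logical type URN as Python str.
    # The URN is generic for Java-side logical types, so every such field
    # in the schema (not only VARCHAR) will be treated as a string.

    @classmethod
    def urn(cls):
        return "beam:logical_type:javasdk:v1"

    @classmethod
    def language_type(cls):
        return str

    def to_language_type(self, value):
        return str(value)

    def to_representation_type(self, value):
        return str(value)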
This registration has to run before the pipeline is built (for example, at module level), so that the logical type is recognized and its values are converted to str.