
Error beam:logical_type:javasdk:v1 while using Apache Beam io.jdbc.ReadFromJdbc

I'm trying to read from a PostgreSQL table using Apache Beam's Python SDK. I have installed the Java SDK as the documentation says, and I'm using the latest release. My code is as follows:

import logging
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    beam_options = PipelineOptions()

    # In Python 3, `str` replaces the Python 2 `unicode` alias from past.builtins.
    ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', str)])

    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with beam.Pipeline(options=beam_options) as p:
        result = (
            p
            | 'Read from jdbc' >> ReadFromJdbc(
                                    table_name='jdbc_external_test_read',
                                    driver_class_name='org.postgresql.Driver',
                                    jdbc_url='jdbc:postgresql://localhost:5432/example',
                                    username='postgres',
                                    password='postgres')
        )


if __name__ == '__main__':
    logging.getLogger(__name__).setLevel(logging.INFO)
    run()

When I run it, I get the following error: ValueError: No logical type registered for URN 'beam:logical_type:javasdk:v1'

nicolasassi asked Oct 16 '25 17:10



1 Answer

This error occurs because the VARCHAR field is returned as an Apache Beam logical type in its schema. Logical types are identified by their URNs, in this case beam:logical_type:javasdk:v1. To read a value of a logical type, a "decoder" has to be registered for the corresponding URN. You can do that as follows:

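from apache_beam.typehints.schemas import LogicalType

# Register a handler for the URN reported in the error message.
@LogicalType.register_logical_type
class db_str(LogicalType):
    @classmethod
    def urn(cls):
        # URN that identifies this logical type in the schema.
        return "beam:logical_type:javasdk:v1"

    @classmethod
    def language_type(cls):
        # Python type that pipeline code sees for this field.
        return str

    def to_language_type(self, value):
        # Convert the stored representation to the Python type.
        return str(value)

    def to_representation_type(self, value):
        # Convert the Python value back to the stored representation.
        return str(value)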

This registration has to happen before the pipeline runs, so that values of this logical type are recognized and converted to Python strings.
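
To illustrate why the lookup fails until a class is registered, here is a simplified sketch of a URN-keyed registry. It is not Beam's implementation and does not use Beam at all; the names REGISTRY, register, decode, and DbStr are hypothetical and exist only for this illustration.

```python
# Hypothetical, simplified model of a URN-keyed logical-type registry.
# None of these names come from Apache Beam.
REGISTRY = {}


def register(cls):
    """Class decorator: map the class's URN to the class itself."""
    REGISTRY[cls.urn()] = cls
    return cls


def decode(urn, value):
    """Look up the handler registered for `urn` and convert `value` with it."""
    if urn not in REGISTRY:
        raise ValueError(f"No logical type registered for URN {urn!r}")
    return REGISTRY[urn]().to_language_type(value)


URN = "beam:logical_type:javasdk:v1"

# Before registration the lookup fails, mirroring the error in the question.
try:
    decode(URN, "alice")
    failed = False
except ValueError as exc:
    failed = True
    message = str(exc)


@register
class DbStr:
    @classmethod
    def urn(cls):
        return URN

    def to_language_type(self, value):
        return str(value)


# After registration the same lookup succeeds.
decoded = decode(URN, "alice")
```

The point of the sketch is the ordering: the handler must be in the registry before anything tries to decode a value carrying that URN, which is why the registration needs to come before the pipeline runs.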

Ilango Rajagopal answered Oct 19 '25 05:10
