I'm getting the error 'PBegin' object has no attribute 'windowing' while running my Dataflow job. I'm calling the Connector class in a ParDo.
I'm trying to connect to NoSQL databases from the Beam Python SDK and run a query to extract data from a table, then write the output into a separate file using another ParDo.
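import apache_beam as beam
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy

class Connector(beam.DoFn):
    def __init__(self, username, seeds, keyspace, password, datacenter=None):
        super().__init__()
        self.username = username
        self.password = password
        self.seeds = seeds
        self.keyspace = keyspace
        self.datacenter = datacenter

    def process(self, element):
        # Pin to a local data center only if one was given; None keeps the driver default.
        load_balancing_policy = None
        if self.datacenter:
            load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=self.datacenter)
        auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)
        cluster = Cluster(contact_points=self.seeds,
                          load_balancing_policy=load_balancing_policy,
                          auth_provider=auth_provider)
        try:
            # Contact points and credentials belong to Cluster; connect() only takes the keyspace.
            session = cluster.connect(self.keyspace)
            rows = session.execute("SQL Query")  # placeholder for the actual CQL query
            # Emit each row as its own element instead of the whole result set.
            for row in rows:
                yield row
        finally:
            cluster.shutdown()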
Just stumbled upon the same issue. I tried to connect to an RDBMS source, but I guess that, in terms of implementation design, there is no difference between NoSQL and SQL databases.
Other than what Jayadeep Jayaraman suggests, this can IMHO be achieved with a ParDo. In fact, using a ParDo for the connection is what the Beam documentation recommends, if the restrictions of doing so are acceptable for your use case:
For bounded (batch) sources, there are currently two options for creating a Beam source:
Use ParDo and GroupByKey.
Use the Source interface and extend the BoundedSource abstract subclass.
ParDo is the recommended option, as implementing a Source can be tricky. See "When to use the Source interface" for a list of some use cases where you might want to use a Source (such as dynamic work rebalancing).
You do not show how you apply your DoFn. For me, it was helpful to keep in mind that a DoFn acts on elements of an already existing PCollection; it cannot create a PCollection from scratch by itself. Applying a ParDo directly to the pipeline object (p | beam.ParDo(...)) hands it a PBegin instead of a PCollection, which is what produces the 'PBegin' object has no attribute 'windowing' error. So, to overcome the issue, you might want to create a PCollection from memory containing one element for the query you use to retrieve data from your source, and then apply the ParDo that reads from your source to this PCollection.
BTW: I ended up putting one element per partition I want to read from my RDBMS into the PCollection, so that data can be read in parallel from my SQL database.
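A minimal sketch of the one-element-per-partition idea, assuming the table has an integer key column that can be split into ranges. partition_queries, the table orders, and the column id are hypothetical names for illustration, not part of Beam or of the setup above. Each returned query string is meant to become one element of the PCollection, so each range can be read by a separate DoFn call.

```python
from typing import List


def partition_queries(table: str, key_column: str, min_key: int,
                      max_key: int, num_partitions: int) -> List[str]:
    """Split the key range [min_key, max_key) into contiguous range queries.

    Hypothetical helper: table and key_column are interpolated directly,
    so they must come from trusted configuration, never from user input.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    if max_key <= min_key:
        raise ValueError("max_key must be greater than min_key")
    span = max_key - min_key
    queries = []
    for i in range(num_partitions):
        # Integer boundaries spread any remainder across partitions; adjacent
        # ranges share an endpoint, so no key is skipped or read twice.
        lo = min_key + span * i // num_partitions
        hi = min_key + span * (i + 1) // num_partitions
        if lo == hi:
            continue  # skip empty ranges when num_partitions > span
        queries.append(
            f"SELECT * FROM {table} "
            f"WHERE {key_column} >= {lo} AND {key_column} < {hi}"
        )
    return queries
```

The resulting list could then be passed to beam.Create(partition_queries("orders", "id", 0, 1000000, 8)) in place of a single query string.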
The solution might look like this:
(p
 | beam.Create(["Your Query / source object qualifier goes here"])
 | "Read from Database" >> beam.ParDo(YourConnector()))
It might also be a good idea to set up and tear down the connection in the start_bundle and finish_bundle methods of your DoFn (or in setup and teardown, which run once per DoFn instance rather than once per bundle).