
PySpark dataframe.foreach() with HappyBase connection pool returns 'TypeError: can't pickle thread.lock objects'

I have a PySpark job that updates some objects in HBase (Spark v1.6.0; happybase v0.9).

It sort-of works if I open/close an HBase connection for each row:

def process_row(row):
    conn = happybase.Connection(host=[hbase_master])
    # update HBase record with data from row
    conn.close()

my_dataframe.foreach(process_row)

After a few thousand upserts, we start to see errors like this:

TTransportException: Could not connect to [hbase_master]:9090

Obviously, it's inefficient to open/close a connection for each upsert. This function is really just a placeholder for a proper solution.

I then tried to create a version of the process_row function that uses a connection pool:

pool = happybase.ConnectionPool(size=20, host=[hbase_master])

def process_row(row):
    with pool.connection() as conn:
        # update HBase record with data from row
        pass

For some reason, the connection pool version of this function returns an error (see complete error message):

TypeError: can't pickle thread.lock objects

Can you see what I'm doing wrong?

Update

I saw this post and suspect I'm experiencing the same issue: Spark attempts to serialize the pool object and distribute it to each of the executors, but this connection pool object cannot be shared across multiple executors.

It sounds like I need to split the dataset into partitions and use one connection per partition (see "Design Patterns for using foreachRDD"). I tried this, based on an example in the documentation:

def persist_to_hbase(dataframe_partition):
    hbase_connection = happybase.Connection(host=[hbase_master])
    for row in dataframe_partition:
        # persist data
        pass
    hbase_connection.close()

my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))

Unfortunately, it still returns a "can't pickle thread.lock objects" error.

Alex Woolford asked Apr 06 '16 15:04

1 Answer

Under the hood, HappyBase connections are just TCP connections, so they cannot be shared between processes. A connection pool is primarily useful for multi-threaded applications. It is also useful for single-threaded applications, which can use the pool as a global "connection factory" with connection reuse; this may simplify code because no connection objects need to be passed around. It also makes error recovery a bit easier.

In any case, a pool (which is just a group of connections) cannot be shared between processes, so trying to serialise it does not make sense. (Pools use locks, and those locks are what make serialisation fail, but that is just a symptom.)
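
To make the "symptom" concrete, here is a minimal sketch that needs neither Spark nor HappyBase. `TinyPool` and `try_pickle` are hypothetical names invented for this illustration; the only assumption is that the object holds a `threading.Lock`, as a real pool does internally. The exact wording of the error differs between Python versions.

```python
import pickle
import threading


class TinyPool(object):
    """Hypothetical stand-in for a connection pool: it only holds a lock."""

    def __init__(self):
        self._lock = threading.Lock()


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the error as a string."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:  # exception text varies by Python version
        return "{}: {}".format(type(exc).__name__, exc)


print(try_pickle({"host": "example"}))  # a plain dict pickles without error
print(try_pickle(TinyPool()))           # the lock attribute makes pickling fail
```

Any function that refers to such an object at module level drags it into the serialised closure, which is why the error appears as soon as the function is shipped to the executors.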

Perhaps you can use a helper that creates a pool (or connection) on demand and stores it in a module-level variable, instead of instantiating it at import time, e.g.:

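import happybase

_pool = None  # created lazily, once per worker process

def get_pool():
    global _pool
    if _pool is None:
        # first call in this process: build the pool here, not at import time
        _pool = happybase.ConnectionPool(size=1, host=[hbase_master])
    return _pool

def process(...):
    with get_pool().connection() as connection:
        connection.table(...).put(...)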

This instantiates the pool/connection upon first use instead of at import time.
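
The same idea can be tried out without a cluster. The sketch below is only an illustration under stated assumptions: `FakeConnection` is a hypothetical stand-in for a real HappyBase connection, and `get_connection` and `persist_partition` are made-up names. It shows that a lazily created, module-level connection is opened once and then reused when the same process handles several partitions.

```python
_connection = None  # per-process cache, filled on first use


class FakeConnection(object):
    """Hypothetical stand-in for happybase.Connection (not a real API)."""

    opened = 0  # counts how many connections were ever created

    def __init__(self):
        FakeConnection.opened += 1
        self.written = []

    def put(self, row):
        # A real connection would write to an HBase table here.
        self.written.append(row)


def get_connection():
    """Create the connection on first use, then keep returning the same one."""
    global _connection
    if _connection is None:
        _connection = FakeConnection()
    return _connection


def persist_partition(rows):
    """Sketch of a callable that could be handed to foreachPartition."""
    connection = get_connection()
    for row in rows:
        connection.put(row)


# Simulate one worker process receiving two partitions in turn.
persist_partition(["row-1", "row-2"])
persist_partition(["row-3"])
print(FakeConnection.opened)      # how many connections were opened
print(get_connection().written)   # every row went through the same connection
```

Because `_connection` is still `None` when the function is serialised, nothing unpicklable travels with it; each worker process builds its own connection the first time it runs the function.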

wouter bolsterlee answered Nov 11 '22 10:11