I have a PySpark job that updates some objects in HBase (Spark v1.6.0; happybase v0.9).
It sort-of works if I open/close an HBase connection for each row:
def process_row(row):
    conn = happybase.Connection(host=[hbase_master])
    # update HBase record with data from row
    conn.close()

my_dataframe.foreach(process_row)
After a few thousand upserts, we start to see errors like this:
TTransportException: Could not connect to [hbase_master]:9090
Obviously, it's inefficient to open/close a connection for each upsert. This function is really just a placeholder for a proper solution.
I then tried to create a version of the process_row function that uses a connection pool:
pool = happybase.ConnectionPool(size=20, host=[hbase_master])

def process_row(row):
    with pool.connection() as conn:
        # update HBase record with data from row
        pass
For some reason, the connection pool version of this function returns an error (see complete error message):
TypeError: can't pickle thread.lock objects
Can you see what I'm doing wrong?
I saw this post and suspect I'm experiencing the same issue: Spark attempts to serialize the pool object and distribute it to each of the executors, but this connection pool object cannot be shared across multiple executors.
It sounds like I need to split the dataset into partitions and use one connection per partition (see design patterns for using foreachRDD). I tried this, based on an example in the documentation:
def persist_to_hbase(dataframe_partition):
    hbase_connection = happybase.Connection(host=[hbase_master])
    for row in dataframe_partition:
        # persist data
        pass
    hbase_connection.close()

my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))
Unfortunately, it still returns a "can't pickle thread.lock objects" error.
Down the line, happybase connections are just TCP connections, so they cannot be shared between processes. A connection pool is primarily useful for multi-threaded applications, and also proves useful for single-threaded applications that can use the pool as a global "connection factory" with connection reuse, which may simplify code because no "connection" objects need to be passed around. It also makes error recovery a bit easier.
In any case, a pool (which is just a group of connections) cannot be shared between processes, so trying to serialise it does not make sense. (Pools use locks, which causes serialisation to fail, but that is just a symptom.)
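The serialisation symptom can be reproduced without Spark or HBase. The following is a minimal sketch, assuming a hypothetical `FakePool` class that merely holds a lock the way a pool might; it is not happybase's actual implementation, and `try_pickle` is an illustrative helper.

```python
import pickle
import threading


class FakePool:
    """Hypothetical stand-in for a connection pool that guards its state with a lock."""

    def __init__(self, size):
        self.size = size
        self._lock = threading.Lock()


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


# The lock inside the object is what makes pickling fail.
error = try_pickle(FakePool(size=20))
print(error)
```

The exact wording of the message differs between Python versions, but the cause is the same: the lock cannot be pickled, so any object that holds one cannot be pickled either.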
Perhaps you can use a helper that conditionally creates a pool (or connection) and stores it as a module-local variable, instead of instantiating it upon import, e.g.
import happybase

_pool = None

def get_pool():
    global _pool
    if _pool is None:
        _pool = happybase.ConnectionPool(size=1, host=[hbase_master])
    return _pool

def process(...):
    with get_pool().connection() as connection:
        connection.table(...).put(...)
This instantiates the pool/connection upon first use instead of at import time.