I have some ETL that saves data to ClickHouse with clickhouse-driver.
The save function looks exactly like this:
def insert_data(data: Iterable[Dict], table: str, client: Client = None):
    columns = get_table_cols(table)
    client = client or get_ch_client(0)
    query = f"insert into {table} ({', '.join(columns)}) values"
    data = map(lambda row: {key: row[key] for key in columns}, data)
    client.execute(query, data)
The function that calls insert_data looks like this:
def save_data(data: DataFrame, client: Client):
    mapper = get_mapper(my_table_map)
    data = map(lambda x: {col_new: getattr(x, col_old)
                          for col_old, col_new in map_dataframe_to_ch.items()},
               data.collect())
    data = map(mapper, data)
    insert_data(data, 'my_table_name', client)
get_mapper returns a map function that looks like this:
def map_row(row: Dict[str, Any]) -> Dict[str, Any]:
    nonlocal map_
    return {key: map_[key](val) for key, val in row.items()}
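The enclosing get_mapper itself is not shown above; for context, a minimal sketch of what it presumably looks like, assuming map_ is a dict of per-column converter functions (that assumption is inferred from map_row's body):

```python
from typing import Any, Callable, Dict


def get_mapper(map_: Dict[str, Callable[[Any], Any]]) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    # Hypothetical enclosing function: it closes over the per-column
    # converter dict and returns the map_row closure shown above.
    def map_row(row: Dict[str, Any]) -> Dict[str, Any]:
        nonlocal map_
        return {key: map_[key](val) for key, val in row.items()}

    return map_row


# Usage sketch with trivial converters:
mapper = get_mapper({'event_time': str, 'country': str})
print(mapper({'event_time': 123, 'country': 'CN'}))
# -> {'event_time': '123', 'country': 'CN'}
```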
So basically, in the end I have some nested generators that produce dictionaries. To confirm this, if I put print(next(data)) just before client.execute, I get exactly the dict I expect. Here is an example with sensitive info hidden:
{'account_currency': '***',
'instrument': '***',
'operation': 'open',
'event_time': datetime.datetime(2020, 7, 7, 19, 11, 49),
'country': 'CN',
'region': 'Asia and Pacific',
'registration_source': '***',
'account_type': '***',
'platform': '***',
'server_key': '***'}
Table schema is as follows:
"account_currency": "String",
"instrument": "String",
"operation": "String",
"event_time": "DateTime",
"country": "String",
"region": "String",
"registration_source": "String",
"account_type": "String",
"platform": "String",
"server_key": "String"
But for whatever reason I get this error:
  File "src/etl/usd_volume/prepare_users.py", line 356, in <module>
    main()
  File "src/etl/usd_volume/prepare_users.py", line 348, in main
    save_data(data, client)
  File "src/etl/usd_volume/prepare_users.py", line 302, in save_data
    insert_data(data, 'report_traded_volume_usd', client)
  File "/root/data/src/common/clickhouse_helper.py", line 14, in insert_data
    client.execute(query, data)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 224, in execute
    columnar=columnar
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 341, in process_ordinary_query
    query = self.substitute_params(query, params)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 422, in substitute_params
    raise ValueError('Parameters are expected in dict form')
According to the docs:
:param params: substitution parameters for SELECT queries and data for
    INSERT queries. Data for INSERT can be `list`, `tuple` or
    :data:`~types.GeneratorType`. Defaults to `None` (no parameters or data).
So clearly my data fits these requirements.
However in the source code there is only this check:
def substitute_params(self, query, params):
    if not isinstance(params, dict):
        raise ValueError('Parameters are expected in dict form')

    escaped = escape_params(params)
    return query % escaped
I couldn't find where they check for it being a generator. The clickhouse-driver version is 0.1.4.
Any help on this problem is greatly appreciated.
Okay, further research into the source code revealed the root cause.
The function that throws the error, substitute_params, is called within the process_ordinary_query method of the Client class. This method is called for any query other than INSERT.
Whether a query is treated as an INSERT or not is decided by this part of the execute method:
is_insert = isinstance(params, (list, tuple, types.GeneratorType))

if is_insert:
    rv = self.process_insert_query(
        query, params, external_tables=external_tables,
        query_id=query_id, types_check=types_check,
        columnar=columnar
    )
else:
    rv = self.process_ordinary_query(
        query, params=params, with_column_types=with_column_types,
        external_tables=external_tables,
        query_id=query_id, types_check=types_check,
        columnar=columnar
    )
The crux is isinstance(params, (list, tuple, types.GeneratorType))
types.GeneratorType is defined as follows:
def _g():
    yield 1

GeneratorType = type(_g())
Which leads to this:
>>> GeneratorType
<class 'generator'>
Obviously, for my data, which is a map object:
>>> type(map(...))
<class 'map'>
>>> isinstance(map(...), GeneratorType)
False
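The same pitfall applies to other lazy iterables, not just map. A quick check (my own illustration, not from the original code):

```python
import itertools
import types

rows = [{'a': 1}, {'a': 2}]

for candidate in (map(dict, rows), filter(None, rows), itertools.chain(rows)):
    # None of these is a GeneratorType, so clickhouse-driver would treat
    # them as SELECT substitution params and raise the same ValueError.
    print(type(candidate).__name__, isinstance(candidate, types.GeneratorType))
# -> map False
# -> filter False
# -> chain False
```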
So the simplest way to avoid this problem is to convert the data into a generator with a generator expression, and that solves the problem completely.
>>> data = (i for i in data)
>>> isinstance(data, GeneratorType)
True
Alternatively, if you execute INSERT queries exclusively, you could call process_insert_query directly, which lifts the need to convert the data to a generator.
I think this type check in clickhouse-driver is a bit ambiguous, but this is what we have.
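If you don't control every call site, a small defensive wrapper inside insert_data works too. This is my own sketch (the helper name ensure_generator is not part of the original code):

```python
import types
from typing import Any, Dict, Iterable, Iterator


def ensure_generator(rows: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    # A generator expression is a true types.GeneratorType, so
    # client.execute() will route it to the INSERT code path.
    return (row for row in rows)


data = map(dict, [{'a': 1}])
assert not isinstance(data, types.GeneratorType)   # map fails the check
assert isinstance(ensure_generator(data), types.GeneratorType)
```

Calling ensure_generator on the data right before client.execute(query, data) keeps the rest of the pipeline lazy while satisfying the driver's isinstance check.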