I have an Airflow task that extracts data with this flow:
PostgreSQL -> Google Cloud Storage -> BigQuery
The problem I have is that not all of the data seems to be ingested into BigQuery. On the PostgreSQL source, the table has 18M+ rows, but after ingestion BigQuery only has 4M+ rows.
When I check on production, this query returns 18M+ rows:
SELECT COUNT(1) FROM my_table
-- This returns 18M+ rows
But after the DAG finishes running, when I check on BigQuery:
SELECT COUNT(1) FROM data_lake.my_table
-- This returns 4M+ rows
Note that not all of the tables I ingest end up like this. All of the smaller tables are ingested just fine, but once a table exceeds a certain number of rows it behaves this way.
My suspicion is that the problem happens when the data is extracted from PostgreSQL to Google Cloud Storage, so I'll provide my function here:
def create_operator_write_append_init(self, worker=10):
    worker_var = dict()
    with TaskGroup(group_id=self.task_id_init) as tg1:
        for i in range(worker):
            worker_var[f'worker_{i}'] = PostgresToGCSOperator(
                task_id = f'worker_{i}',
                postgres_conn_id = self.conn_id,
                sql = 'extract_init.sql',
                bucket = self.bucket,
                filename = f'{self.filename_init}_{i}.{self.export_format}',
                export_format = self.export_format, # the export format is json
                gzip = True,
                params = {
                    'worker': i
                }
            )
    return tg1
And here is the SQL file:
SELECT id,
name,
created_at,
updated_at,
deleted_at
FROM my_table
WHERE 1=1
AND ABS(MOD(hashtext(id::TEXT), 10)) = {{params.worker}};
What I did is chunk the data and split it across several workers, hence the TaskGroup.
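For reference, this is the kind of sanity check that can be run to confirm that the ten hash buckets together cover all of the rows (a hypothetical verification snippet using PostgresHook; the connection id is an assumption and this is not part of the DAG):

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Hypothetical check: the per-bucket counts should sum to the total row
# count of my_table if the hash partitioning covers every row.
hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # assumed connection id
rows = hook.get_records("""
    SELECT ABS(MOD(hashtext(id::TEXT), 10)) AS bucket, COUNT(*) AS cnt
    FROM my_table
    GROUP BY 1
    ORDER BY 1
""")
print(rows, "total:", sum(cnt for _, cnt in rows))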
To provide more information, I use Composer:
composer-2.0.32-airflow-2.3.4
Large instance
Worker: 8 CPU
Worker: 32 GB memory
Worker: 2 GB storage
Workers: between 1 and 16
What are the possible causes of this?
PostgresToGCSOperator inherits from BaseSQLToGCSOperator (https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/sql_to_gcs/index.html).
According to the source code, approx_max_file_size_bytes=1900000000, so when you split your table into 10 parts (or workers, let's say), each file written by a worker can be at most about 1.9 GB. If a worker's data is bigger than that, the operator splits it into several files, and because you did not tell PostgresToGCSOperator to create "chunks of your chunk" (via a placeholder in the filename), each new file overwrites the previous one in GCS.
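For intuition, the overwrite comes down to how Python's str.format() behaves when the template has no placeholder; a minimal standalone sketch (not the actual provider code):

# Minimal sketch: str.format() with no '{}' returns the string unchanged,
# so every ~1.9 GB chunk is uploaded under the same object name and
# overwrites the previous upload; with '{}' each chunk gets its own name.
filename_without_placeholder = "my_table_0.json"
filename_with_placeholder = "my_table_0_part_{}.json"

for file_no in range(3):
    print(filename_without_placeholder.format(file_no))  # my_table_0.json every time
    print(filename_with_placeholder.format(file_no))     # my_table_0_part_0.json, _1.json, _2.json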
You can do this by adding a {} placeholder to the filename, and the operator will handle the rest:
def create_operator_write_append_init(self, worker=10):
    worker_var = dict()
    with TaskGroup(group_id=self.task_id_init) as tg1:
        for i in range(worker):
            worker_var[f'worker_{i}'] = PostgresToGCSOperator(
                task_id = f'worker_{i}',
                postgres_conn_id = self.conn_id,
                sql = 'extract_init.sql',
                bucket = self.bucket,
                filename = f'{self.filename_init}_{i}_part_{{}}.{self.export_format}',
                export_format = self.export_format, # the export format is json
                gzip = True,
                params = {
                    'worker': i
                }
            )
    return tg1
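To verify the fix, you can list the objects each worker actually produced in the bucket: with the original filename you would see a single (overwritten) object per worker, while with the placeholder you should see one object per ~1.9 GB chunk. A minimal sketch using the google-cloud-storage client (the bucket name and prefix are assumptions):

from google.cloud import storage

# List the objects each worker produced; 'my-bucket' and the prefix are
# placeholders for your actual bucket and filename_init.
client = storage.Client()
for blob in client.list_blobs("my-bucket", prefix="my_table_"):
    print(blob.name, blob.size)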
You can also explore the Apache 2.0-licensed Astro SDK maintained by Astronomer, which allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
In this case, aql.transform_file can be used to run the SQL query from the .sql file and select the data from Postgres, aql.export_to_file() would export the data from the Postgres table to the GCS bucket, and finally aql.load_file() can be used to load the data from the GCS file into BigQuery. The following is an example DAG:
from airflow.models.dag import DAG
from astro.files import File
from astro.constants import FileType
from astro.table import Table
from astro.sql.operators.load_file import load_file
from astro.sql.operators.export_to_file import export_to_file
from astro.sql.operators.transform import transform_file
from datetime import datetime
from pathlib import Path

POSTGRES_CONN_ID = "postgres_conn"

with DAG(
    dag_id="sample-dag",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    postgres_table = Table(name="my_table", temp=True, conn_id=POSTGRES_CONN_ID)
    postgres_data = transform_file(
        file_path=f"{Path(__file__).parent.as_posix()}/transform.sql",
        parameters={"input_table": postgres_table},
    )
    save_file_to_gcs = export_to_file(
        task_id="save_file_to_gcs",
        input_data=postgres_data,
        output_file=File(
            path="gs://astro-sdk/all_postgres_data.csv",
            conn_id="gcp_conn",
        ),
        if_exists="replace",
    )
    load_data_to_bq = load_file(
        input_file=File(
            "gs://astro-sdk/all_postgres_data.csv",
            conn_id="gcp_conn",
            filetype=FileType.CSV,
        ),
        output_table=Table(conn_id="gcp_conn"),
        use_native_support=False,
        native_support_kwargs={
            "ignore_unknown_values": True,
            "allow_jagged_rows": True,
            "skip_leading_rows": "1",
        },
        enable_native_fallback=True,
    )
    load_data_to_bq.set_upstream(save_file_to_gcs)
Adding the screenshot for the DAG run: DAG screenshot
Hence, using astro-sdk-python instead would simplify the approach.
We have various operators and decorators as part of this project, which are described here: https://astro-sdk-python.readthedocs.io/
Disclaimer: I work at Astronomer, which develops Astro SDK as an Open Source project.