I have imported a dataset of 100,000 raw JSON files (about 100 GB in total) into Foundry through Data Connection. I want to use the Python Transforms raw file access
API to read the files, flatten the structs and arrays of structs into a dataframe, and apply the result as an incremental update to the output dataframe.
I want to use something like the example below from the documentation, adapted for *.json files, and also make it incremental using the @incremental()
decorator.
>>> import csv
>>> from pyspark.sql import Row
>>> from transforms.api import transform, Input, Output
>>>
>>> @transform(
...     processed=Output('/examples/hair_eye_color_processed'),
...     hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
... )
... def example_computation(hair_eye_color, processed):
...
...     def process_file(file_status):
...         with hair_eye_color.filesystem().open(file_status.path) as f:
...             r = csv.reader(f)
...
...             # Construct a pyspark.Row from our header row
...             header = next(r)
...             MyRow = Row(*header)
...
...             for row in csv.reader(f):
...                 yield MyRow(*row)
...
...     files_df = hair_eye_color.filesystem().files('**/*.csv')
...     processed_df = files_df.rdd.flatMap(process_file).toDF()
...     processed.write_dataframe(processed_df)
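One way to adapt the example above to *.json files is to keep the same flatMap shape and swap the CSV reader for `json.load`. The sketch below is a minimal illustration that has not been run on Foundry: `make_process_file`, `FileStatus`, `FAKE_FILES` and `fake_open` are hypothetical names, and the in-memory stand-ins exist only so the parsing logic can run outside a transform. It assumes each file holds either one JSON object or a list of objects.

```python
import io
import json


def make_process_file(open_file):
    """Build a per-file parser with the shape expected by rdd.flatMap.

    open_file is a hypothetical callable that takes a path and returns a
    text file object; in a transform it would wrap inpt.filesystem().open.
    """
    def process_file(file_status):
        with open_file(file_status.path) as f:
            data = json.load(f)
        # A file may hold one object or a list of objects
        records = data if isinstance(data, list) else [data]
        for record in records:
            # Emit one JSON string per record (JSON Lines style)
            yield json.dumps(record)
    return process_file


# Stand-ins so the sketch runs without Foundry (all hypothetical)
class FileStatus:
    def __init__(self, path):
        self.path = path


FAKE_FILES = {
    "a.json": '{"id": 1, "tags": ["x", "y"]}',
    "b.json": '[{"id": 2}, {"id": 3}]',
}


def fake_open(path):
    return io.StringIO(FAKE_FILES[path])


process_file = make_process_file(fake_open)
parsed = [
    line
    for name in sorted(FAKE_FILES)
    for line in process_file(FileStatus(name))
]
print(parsed)
```

Inside a transform, `open_file` would presumably wrap `inpt.filesystem().open`, and the function would be passed to `inpt.filesystem().files('**/*.json').rdd.flatMap(...)`, so that files are parsed on the executors instead of being collected into one list on the driver. Because each yielded item is a JSON string, the resulting RDD can be handed to `spark.read.json`, which accepts an RDD of strings.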
With the help of @Jeremy David Gamet, I was able to develop code that produces the dataset I want:
from transforms.api import transform, Input, Output
import json


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    sc = spark.sparkContext
    filesystem = list(inpt.filesystem().ls())
    file_dates = []
    # Every file is parsed on the driver and accumulated in one Python list
    for files in filesystem:
        with inpt.filesystem().open(files.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
            file_dates.append(data)
    json_object = json.dumps(file_dates)
    df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
    # drop_duplicates() returns a new DataFrame, so the result is reassigned
    df_2 = df_2.drop_duplicates()
    # flatten() is defined separately (the linked "Flatten array column" code)
    df_2 = flatten(df_2)
    out.write_dataframe(df_2)
code to flatten__df
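The `flatten` helper itself is not shown here. As a rough illustration of what flattening structs and arrays of structs means, the sketch below does it per record in plain Python rather than on a DataFrame, so it is a different technique from the linked DataFrame-level code. `flatten_record` and the `_` separator are assumptions made for this example.

```python
def flatten_record(record, prefix=""):
    """Flatten nested dicts (structs) and explode lists of dicts (arrays of structs).

    Returns a list of flat dicts; nested keys are joined with "_".
    """
    rows = [{}]
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # Struct: flatten recursively under the parent key
            children = flatten_record(value, prefix=f"{name}_")
        elif isinstance(value, list) and value and all(isinstance(v, dict) for v in value):
            # Array of structs: each element produces its own row(s)
            children = [
                row
                for element in value
                for row in flatten_record(element, prefix=f"{name}_")
            ]
        else:
            # Scalars, empty lists and arrays of scalars are kept as-is
            children = [{name: value}]
        # Combine every row built so far with every child row
        rows = [{**row, **child} for row in rows for child in children]
    return rows


example = {
    "id": 1,
    "user": {"name": "a", "geo": {"lat": 1.0}},
    "items": [{"sku": "x"}, {"sku": "y"}],
}
flat = flatten_record(example)
print(flat)
```

Note that a record with two separate arrays of structs yields the cross product of their elements, which is also what two consecutive explodes would give on a DataFrame; whether that is desirable depends on the data.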
The above code works for a few files, but with more than 100,000 files I hit the following error:
Connection To Driver Lost
This error indicates that connection to the driver was lost unexpectedly, which is often caused by the driver being terminated due to running out of memory. Common reasons for driver out-of-memory (OOM) errors include functions that materialize data to the driver such as .collect(), broadcasted joins, and using Pandas dataframes.
Is there any way around this?
I gave an example of how this can be done dynamically in an answer to another question.
Here is the link to that answer, "How to union multiple dynamic inputs in Palantir Foundry?", followed by a copy of the same code:
from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    transf_dict = {}  # enter your dynamic mappings here

    for value in transf_dict:
        @transform(
            out=Output(' path to your output here '.format(val=value)),
            inpt=Input(" path to input here ".format(val=value)),
        )
        def update_set(ctx, inpt, out):
            spark = ctx.spark_session
            sc = spark.sparkContext

            filesystem = list(inpt.filesystem().ls())
            file_dates = []
            for files in filesystem:
                with inpt.filesystem().open(files.path) as fi:
                    data = json.load(fi)
                    file_dates.append(data)

            logging.info('info logs:')
            logging.info(file_dates)
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date', F.current_date())

            # drop_duplicates() returns a new DataFrame, so the result is reassigned
            df_2 = df_2.drop_duplicates()
            out.write_dataframe(df_2)

        # Register the transform defined above (the original appended an undefined name)
        transforms.append(update_set)
    return transforms


TRANSFORMS = transform_generator()
Please let me know if there is anything I can clarify.