I have a time series currently stored as a graph (using a time tree structure, similar to this) in a Neo4j server instance, version 2.3.6 (so REST interface only, no Bolt). What I am trying to do is to perform some analytics of these time series in a distributed way, using PySpark.
Now, I am aware of existing projects to connect Spark with Neo4j, especially the ones listed here. The problem with these is that they focus on creating an interface to work with graphs. In my case graphs are not relevant, since my Neo4j Cypher queries are meant to produce arrays of values. Everything downstream is about handling these arrays as time series; again, not as graphs.
My question is:
has anybody successfully queried a REST-only Neo4j instance in parallel using PySpark, and if yes, how did you do it?
The py2neo library seemed like a good candidate until I realized the connection object could not be shared across partitions (or if it can, I do not know how). Right now I'm considering having my Spark jobs run independent REST queries on the Neo4j server, but I want to see how the community may have solved this problem.
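For reference, the rough shape of what I am considering is sketched below. This is only a sketch: the host, node label, and Cypher query are placeholders for my actual time-tree model, and it assumes the plain requests library talking to the 2.3 transactional Cypher endpoint (/db/data/transaction/commit).

import requests
from pyspark import SparkContext

# Placeholders for my actual setup.
NEO4J_URL = "http://neo4j-host:7474/db/data/transaction/commit"
CYPHER = ("MATCH (v:Value {series_id: {sid}}) "
          "RETURN v.time AS time, v.value AS value ORDER BY time")

def fetch_series(series_ids):
    # One HTTP session per partition; only plain strings are shipped to the
    # workers, so there is no connection object to pickle.
    session = requests.Session()
    for sid in series_ids:
        payload = {"statements": [{"statement": CYPHER,
                                   "parameters": {"sid": sid}}]}
        # Add auth=(user, password) here if authentication is enabled.
        resp = session.post(NEO4J_URL, json=payload)
        rows = resp.json()["results"][0]["data"]
        yield (sid, [r["row"] for r in rows])

sc = SparkContext.getOrCreate()
series_rdd = sc.parallelize(["series-1", "series-2", "series-3"], numSlices=3)
result = series_rdd.mapPartitions(fetch_series).collect()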
I am not sure I fully understand the problem, but as I read it, you want each partition to run its own Cypher queries and work with the returned arrays of values rather than with graph objects. If my understanding is correct, a pandas UDF may be what you need. From the documentation:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()
# root
# |-- long_col: long (nullable = true)
# |-- string_col: string (nullable = true)
# |-- struct_col: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)
To be clear, those pd.Series and pd.DataFrame annotations are not pandas objects you have to create yourself. By decorating your function with @pandas_udf, you pass in Spark columns, and on each partition they are handed to your function as pandas objects. I do not know the technical details beyond this, other than that it has worked for every weird UDF I have ever needed to try (as long as the output schema is known a priori!).
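To connect this back to your Neo4j setup, a minimal sketch of the same idea might look like the following. It assumes Spark 3.x with pyarrow installed; the endpoint, label, and Cypher below are placeholders you would adapt to your time tree, and each batch opens its own HTTP session on the worker, which sidesteps the py2neo connection-sharing issue.

import pandas as pd
import requests
from pyspark.sql.functions import pandas_udf

# Placeholders: adjust the endpoint and Cypher to your own time-tree model.
NEO4J_URL = "http://neo4j-host:7474/db/data/transaction/commit"
CYPHER = "MATCH (v:Value {series_id: {sid}}) RETURN v.value AS value ORDER BY v.time"

@pandas_udf("array<double>")
def fetch_values(series_ids: pd.Series) -> pd.Series:
    # Called once per batch on each partition; the HTTP session lives on the
    # worker, so nothing unpicklable crosses the driver/worker boundary.
    session = requests.Session()

    def query(sid):
        payload = {"statements": [{"statement": CYPHER,
                                   "parameters": {"sid": sid}}]}
        resp = session.post(NEO4J_URL, json=payload)
        data = resp.json()["results"][0]["data"]
        return [float(row["row"][0]) for row in data]

    return series_ids.apply(query)

df = spark.createDataFrame([("series-1",), ("series-2",)], "sid string")
df.select("sid", fetch_values("sid").alias("values")).show(truncate=False)

Each row of the resulting values column is then a plain array of doubles, so everything downstream can treat it as a time series rather than as a graph.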