
Pulling data from Neo4j using PySpark

I have time series currently stored as a graph (using a time tree structure, similar to this) in a Neo4j server instance, version 2.3.6 (so REST interface only, no Bolt). What I am trying to do is to perform some analytics on these time series in a distributed way, using PySpark.

Now, I am aware of existing projects to connect Spark with Neo4j, especially the ones listed here. The problem with these is that they focus on creating an interface to work with graphs. In my case graphs are not relevant, since my Neo4j Cypher queries are meant to produce arrays of values. Everything downstream is about handling these arrays as time series, not as graphs.

My question is:

has anybody successfully queried a REST-only Neo4j instance in parallel using PySpark, and if yes, how did you do it?

The py2neo library seemed like a good candidate until I realized the connection object could not be shared across partitions (or if it can, I do not know how). Right now I'm considering having my Spark jobs run independent REST queries on the Neo4j server, but I want to see how the community may have solved this problem.
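
To make that last option concrete, here is the kind of sketch I have in mind (untested; the host, credentials, and Cypher query are placeholders for my actual time tree): each partition opens its own HTTP session and posts Cypher to Neo4j's transactional endpoint (/db/data/transaction/commit, which is available in 2.3), so no connection object ever has to be pickled or shared between partitions.

import requests   # plain HTTP client, so there is no py2neo connection object to share

NEO4J_TX_URL = "http://neo4j-host:7474/db/data/transaction/commit"   # placeholder host
CYPHER = ("MATCH (y:Year {value: {year}})-[:HAS_MONTH]->()-[:HAS_DAY]->()-[:RECORDED]->(v) "
          "RETURN v.timestamp AS t, v.value AS x ORDER BY t")        # placeholder time tree query

def fetch_partition(years):
    # One HTTP session per partition; nothing is serialized across executors.
    session = requests.Session()
    session.auth = ("neo4j", "password")                             # placeholder credentials
    for year in years:
        payload = {"statements": [{"statement": CYPHER,
                                   "parameters": {"year": year}}]}
        resp = session.post(NEO4J_TX_URL, json=payload)
        resp.raise_for_status()
        rows = resp.json()["results"][0]["data"]
        yield (year, [row["row"] for row in rows])

# Each element is a query parameter; each partition issues its own REST calls.
series = sc.parallelize(range(2000, 2016), 8).mapPartitions(fetch_partition)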

asked Jul 21 '16 by ajmazurie


1 Answer

I am not sure I fully understand the problem. But as I read it:

  • You have no problems connecting to Neo4J or handling any of the Neo4J-specific elements.
  • You are not able to properly manage multiple partitions: there is no way to persist a connection, or to ensure that communication with Neo4J is handled correctly across partitions.

If my understanding is correct:

  • I suggest looking at (pandas) Spark UDFs.
    • These have nothing to do with Neo4J, but I have found them to be a great way to handle any ad hoc ETL work where any particular summarization or calculation can be done on a single partition.
    • It is a surprisingly easy-to-use API, except that having to define the UDF's output schema a priori can be a pain.

From the documentation:

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()
# root
# |-- long_col: long (nullable = true)
# |-- string_col: string (nullable = true)
# |-- struct_col: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)

To be clear, you do not have to create those pd.Series and pd.DataFrame objects yourself. By decorating your function with @pandas_udf, you pass in Spark columns, and Spark hands your function the corresponding data as pandas objects, batch by batch, within each partition.

I do not know the technical details beyond this, other than to say it has worked for any weird UDF I have ever needed to try (if the schema can be known a priori!).
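
Applied to your Neo4J case, an (untested) adaptation of the same pattern could look like the sketch below. The host, credentials, node labels, and Cypher query are placeholders; the only assumption is that each batch of series identifiers can be turned into REST calls against the transactional Cypher endpoint, with the fetched values returned as an array column.

import pandas as pd
import requests
from pyspark.sql.functions import pandas_udf

NEO4J_TX_URL = "http://neo4j-host:7474/db/data/transaction/commit"   # placeholder host

@pandas_udf("array<double>")
def fetch_series(series_ids: pd.Series) -> pd.Series:
    # Called once per Arrow batch, inside a single partition.
    session = requests.Session()                 # one HTTP session per batch
    session.auth = ("neo4j", "password")         # placeholder credentials
    values = []
    for sid in series_ids:
        payload = {"statements": [{
            "statement": ("MATCH (s:Series {id: {sid}})-[:HAS_POINT]->(p) "
                          "RETURN p.value ORDER BY p.timestamp"),     # placeholder query
            "parameters": {"sid": sid}}]}
        resp = session.post(NEO4J_TX_URL, json=payload)
        resp.raise_for_status()
        rows = resp.json()["results"][0]["data"]
        values.append([float(r["row"][0]) for r in rows])
    return pd.Series(values)

df = spark.createDataFrame([("sensor-1",), ("sensor-2",)], "series_id string")
df.select(fetch_series("series_id").alias("series")).show(truncate=False)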

answered Nov 14 '22 by Mike Williamson