I am trying to write a Spark SQL query that joins three tables, but the query output is null. It works fine for a single table. I believe my join query is correct, as I have already executed it against an Oracle database. What do I need to correct here? The Spark version is 2.0.0.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
lines = sc.textFile("/Users/Hadoop_IPFile/purchase")
lines2 = sc.textFile("/Users/Hadoop_IPFile/customer")
lines3 = sc.textFile("/Users/Hadoop_IPFile/book")
parts = lines.map(lambda l: l.split("\t"))
purchase = parts.map(lambda p: Row(year=p[0],cid=p[1],isbn=p[2],seller=p[3],price=int(p[4])))
schemapurchase = sqlContext.createDataFrame(purchase)
schemapurchase.registerTempTable("purchase")
parts2 = lines.map(lambda l: l.split("\t"))
customer = parts2.map(lambda p: Row(cid=p[0],name=p[1],age=p[2],city=p[3],sex=p[4]))
schemacustomer = sqlContext.createDataFrame(customer)
schemacustomer.registerTempTable("customer")
parts3 = lines.map(lambda l: l.split("\t"))
book = parts3.map(lambda p: Row(isbn=p[0],name=p[1]))
schemabook = sqlContext.createDataFrame(book)
schemabook.registerTempTable("book")
result_purchase = sqlContext.sql("""SELECT DISTINCT customer.name AS name FROM purchase JOIN book ON purchase.isbn = book.isbn JOIN customer ON customer.cid = purchase.cid WHERE customer.name != 'Harry Smith' AND purchase.isbn IN (SELECT purchase.isbn FROM customer JOIN purchase ON customer.cid = purchase.cid WHERE customer.name = 'Harry Smith')""")
result = result_purchase.rdd.map(lambda p: "name: " + p.name).collect()
for name in result:
    print(name)
DataSet
---------
Purchase
1999 C1 B1 Amazon 90
2001 C1 B2 Amazon 20
2008 C2 B2 Barnes Noble 30
2008 C3 B3 Amazon 28
2009 C2 B1 Borders 90
2010 C4 B3 Barnes Noble 26
Customer
C1 Jackie Chan 50 Dayton M
C2 Harry Smith 30 Beavercreek M
C3 Ellen Smith 28 Beavercreek F
C4 John Chan 20 Dayton M
Book
B1 Novel
B2 Drama
B3 Poem
I found the following instructions on a web page, but they still do not work:
schemapurchase.join(schemabook, schemapurchase.isbn == schemabook.isbn)
schemapurchase.join(schemacustomer, schemapurchase.cid == schemacustomer.cid)
Given input DataFrames like the ones in your example (sorry if some column names are wrong; I guessed them):
purchase:
+----+---+----+------------+-----+
|year|cid|isbn|        shop|price|
+----+---+----+------------+-----+
|1999| C1|  B1|      Amazon|   90|
|2001| C1|  B2|      Amazon|   20|
|2008| C2|  B2|Barnes Noble|   30|
|2008| C3|  B3|      Amazon|   28|
|2009| C2|  B1|     Borders|   90|
|2010| C4|  B3|Barnes Noble|   26|
+----+---+----+------------+-----+
customer:
+---+-----------+---+-----------+-----+
|cid|       name|age|       city|genre|
+---+-----------+---+-----------+-----+
| C1|Jackie Chan| 50|     Dayton|    M|
| C2|Harry Smith| 30|Beavercreek|    M|
| C3|Ellen Smith| 28|Beavercreek|    F|
| C4|  John Chan| 20|     Dayton|    M|
+---+-----------+---+-----------+-----+
book:
+----+-----+
|isbn|genre|
+----+-----+
|  B1|Novel|
|  B2|Drama|
|  B3| Poem|
+----+-----+
You can translate that SQL query into DataFrame functions, as follows:
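// "temp" holds the ISBNs bought by Harry Smith; it must be defined before this runs.
// =!= is used for inequality because !== is deprecated as of Spark 2.0.
val result = purchase.join(book, purchase("isbn") === book("isbn"))
  .join(customer, customer("cid") === purchase("cid"))
  .where(customer("name") =!= "Harry Smith")
  .join(temp, purchase("isbn") === temp("purchase_isbn"))
  .select(customer("name").as("NAME")).distinct()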
where "temp" is the result of the subquery in the IN clause, which can itself be written as another join:
val temp = customer.join(purchase, customer("cid") === purchase("cid"))
  .where(customer("name") === "Harry Smith")
  .select(purchase("isbn").as("purchase_isbn"))
+-------------+
|purchase_isbn|
+-------------+
|           B2|
|           B1|
+-------------+
So the final result is:
+-----------+
|       NAME|
+-----------+
|Jackie Chan|
+-----------+
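As a cross-check outside Spark, the SQL query from the question can be run unchanged against the sample data in an in-memory SQLite database. This is only a sketch for verifying the expected result: the table layouts follow the column names used in the question's `Row(...)` calls, and the column types are my assumption.

```python
import sqlite3

# In-memory database; column names follow the Row(...) definitions in the question.
# The column types are an assumption made for this sketch.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE purchase (year TEXT, cid TEXT, isbn TEXT, seller TEXT, price INTEGER);
CREATE TABLE customer (cid TEXT, name TEXT, age TEXT, city TEXT, sex TEXT);
CREATE TABLE book (isbn TEXT, name TEXT);
""")

# Sample data copied from the question.
conn.executemany("INSERT INTO purchase VALUES (?, ?, ?, ?, ?)", [
    ("1999", "C1", "B1", "Amazon", 90),
    ("2001", "C1", "B2", "Amazon", 20),
    ("2008", "C2", "B2", "Barnes Noble", 30),
    ("2008", "C3", "B3", "Amazon", 28),
    ("2009", "C2", "B1", "Borders", 90),
    ("2010", "C4", "B3", "Barnes Noble", 26),
])
conn.executemany("INSERT INTO customer VALUES (?, ?, ?, ?, ?)", [
    ("C1", "Jackie Chan", "50", "Dayton", "M"),
    ("C2", "Harry Smith", "30", "Beavercreek", "M"),
    ("C3", "Ellen Smith", "28", "Beavercreek", "F"),
    ("C4", "John Chan", "20", "Dayton", "M"),
])
conn.executemany("INSERT INTO book VALUES (?, ?)", [
    ("B1", "Novel"),
    ("B2", "Drama"),
    ("B3", "Poem"),
])

# The query text is the one from the question, only split across lines.
query = """
SELECT DISTINCT customer.name AS name
FROM purchase
JOIN book ON purchase.isbn = book.isbn
JOIN customer ON customer.cid = purchase.cid
WHERE customer.name != 'Harry Smith'
  AND purchase.isbn IN (
      SELECT purchase.isbn
      FROM customer
      JOIN purchase ON customer.cid = purchase.cid
      WHERE customer.name = 'Harry Smith')
"""
names = [row[0] for row in conn.execute(query)]
print(names)
```

If this prints the same single name as the table above, the query logic is sound and the problem more likely lies in how the tables are loaded into Spark.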
Treat this answer as a starting point rather than a finished solution (too many joins can hurt performance, for example).
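One thing that stands out in the question's code and may be worth checking: `parts2` and `parts3` are both built from `lines` (the purchase file) rather than from `lines2` and `lines3`. If that is not just a copy-paste slip in the post, the "customer" table would hold purchase rows, and its `cid` column would contain years. The plain-Python sketch below mimics only the key extraction, with no Spark involved, and assumes the files are tab-separated as the `split("\t")` calls imply.

```python
# Sample lines from the question, assumed tab-separated as split("\t") implies.
purchase_lines = [
    "1999\tC1\tB1\tAmazon\t90",
    "2001\tC1\tB2\tAmazon\t20",
    "2008\tC2\tB2\tBarnes Noble\t30",
    "2008\tC3\tB3\tAmazon\t28",
    "2009\tC2\tB1\tBorders\t90",
    "2010\tC4\tB3\tBarnes Noble\t26",
]
customer_lines = [
    "C1\tJackie Chan\t50\tDayton\tM",
    "C2\tHarry Smith\t30\tBeavercreek\tM",
    "C3\tEllen Smith\t28\tBeavercreek\tF",
    "C4\tJohn Chan\t20\tDayton\tM",
]


def customer_cids(lines):
    """Take the first field of each line as the customer id, like Row(cid=p[0], ...)."""
    return {line.split("\t")[0] for line in lines}


# The purchase table takes cid from the second field: Row(year=p[0], cid=p[1], ...).
purchase_cids = {line.split("\t")[1] for line in purchase_lines}

# As written in the question: parts2 = lines.map(...), i.e. the purchase file.
overlap_wrong_file = sorted(purchase_cids & customer_cids(purchase_lines))
# Presumably intended: parts2 = lines2.map(...), i.e. the customer file.
overlap_right_file = sorted(purchase_cids & customer_cids(customer_lines))

print(overlap_wrong_file)   # keys shared with purchase.cid when the wrong file is used
print(overlap_right_file)   # keys shared with purchase.cid when the customer file is used
```

With the wrong source file, no customer key matches any `purchase.cid`, so the join on `cid` has nothing to return. Building `parts2` from `lines2` and `parts3` from `lines3` would be the first thing I would try.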