Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python send csv data to spark streaming

I would like to try and load a csv data in python and stream each row spark via SPark Streaming.

Im pretty new to network stuff. Im not exactly if Im supposed to create a server python script that once it establishes a connection(with spark streaming) it will start sending each row. In the Spark Streaming Documentation they do a nc -l 9999 which is a netcat server listening on port 9999 if im correct. So I tried creating a python script similar that parses a csv and sends on port 60000

import socket                   # Import socket module
import csv

 port = 60000                    # Reserve a port for your service.
 s = socket.socket()             # Create a socket object
 host = socket.gethostname()     # Get local machine name
 s.bind((host, port))            # Bind to the port
 s.listen(5)                     # Now wait for client connection.

 print('Server listening....')

 while True:
     conn, addr = s.accept()     # Establish connection with client.
     print('Got connection from', addr)



     csvfile = open('Titantic.csv', 'rb')

     reader = csv.reader(csvfile, delimiter = ',')
     for row in reader:
         line = ','.join(row)

         conn.send(line)
         print(line)

     csvfile.close()

     print('Done sending')
     conn.send('Thank you for connecting')
     conn.close()

SPark Streaming Script -

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines_RDD = ssc.socketTextStream("localhost", 60000)

# Split each line into words
data_RDD = lines_RDD.flatMap(lambda line: line.split(","))

data_RDD.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

When run the spark script(This is in Jupyter Notebooks btw) I get this error - IllegalArgumentException: 'requirement failed: No output operations registered, so nothing to execute'

I' dont think I am doing my socket script properly but im not really sure what to do Im basically trying to replicate what nc -lk 9999 does so I can send text data over the port and then spark streaming is listening to it and receives the data and processes it.

Any help would be greatly appreciated

like image 433
Neil Avatar asked Oct 31 '22 02:10

Neil


1 Answers

I'm trying to do something similar, but I want to stream a row every 10 seconds. I solved with this script:

import socket
from time import sleep

host = 'localhost'
port = 12345

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
while True:
    print('\nListening for a client at',host , port)
    conn, addr = s.accept()
    print('\nConnected by', addr)
    try:
        print('\nReading file...\n')
        with open('iris_test.csv') as f:
            for line in f:
                out = line.encode('utf-8')
                print('Sending line',line)
                conn.send(out)
                sleep(10)
            print('End Of Stream.')
    except socket.error:
        print ('Error Occured.\n\nClient disconnected.\n')
conn.close()

Hope this helps.

like image 91
ggagliano Avatar answered Nov 15 '22 05:11

ggagliano