Sockets - Python: send CSV data to Spark Streaming


I'm trying to load CSV data in Python and then stream each row to Spark via Spark Streaming.

I'm pretty new to network stuff. I'm not sure if I'm supposed to create a server with a Python script that, once it establishes a connection (with Spark Streaming), starts sending each row. In the Spark Streaming documentation they use nc -lk 9999, which is a netcat server listening on port 9999, if I'm correct. So I tried creating a similar Python script that parses a CSV file and sends it on port 60000:

    import socket                   # Import socket module
    import csv

    port = 60000                    # Reserve a port for your service
    s = socket.socket()             # Create a socket object
    host = 'localhost'              # Spark's socketTextStream below connects to "localhost"
    s.bind((host, port))            # Bind to the port
    s.listen(5)                     # Now wait for client connection

    print('Server listening....')

    while True:
        conn, addr = s.accept()     # Establish connection with client
        print('Got connection from', addr)
        # Python 3: open CSV files in text mode with newline=''
        with open('titantic.csv', newline='') as csvfile:
            reader = csv.reader(csvfile, delimiter=',')
            for row in reader:
                line = ','.join(row)
                # socketTextStream splits on newlines and needs bytes: terminate and encode
                conn.sendall((line + '\n').encode('utf-8'))
                print(line)
        print('Done sending')
        conn.sendall(b'Thank you for connecting\n')
        conn.close()
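Spark's socketTextStream decodes the incoming bytes as UTF-8 and emits one record per newline-terminated line, so each row has to end with a newline and, in Python 3, be sent as bytes. A minimal sketch of a framing helper, assuming the rows come from csv.reader as in the script above; frame_row is a hypothetical name, and using csv.writer instead of ','.join is a swapped-in choice that keeps quoted fields containing commas intact:

```python
import csv
import io


def frame_row(row):
    """Encode one CSV row as a newline-terminated UTF-8 record."""
    buf = io.StringIO()
    # lineterminator="\n" emits exactly the record separator the receiver splits on
    csv.writer(buf, lineterminator="\n").writerow(row)
    return buf.getvalue().encode("utf-8")


if __name__ == "__main__":
    # Hypothetical row; the field with a comma gets quoted
    print(frame_row(["1", "Smith, John", "22"]))  # prints: b'1,"Smith, John",22\n'
```

In the server loop, conn.sendall(frame_row(row)) could take the place of the ','.join(row) and send pair. If fields can contain commas, the Spark-side line.split(",") would also need a real CSV parser.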

Spark Streaming script:

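    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    # `sc` is the SparkContext the PySpark notebook/shell already provides;
    # the socket receiver needs at least 2 cores (e.g. master "local[2]")
    ssc = StreamingContext(sc, 1)   # 1-second batch interval

    # Create a DStream that connects to hostname:port, here localhost:60000
    lines = ssc.socketTextStream("localhost", 60000)

    # Split each line into its comma-separated fields
    data_rdd = lines.flatMap(lambda line: line.split(","))

    # Output operations must be registered before ssc.start()
    data_rdd.pprint()

    ssc.start()             # Start the computation
    ssc.awaitTermination()  # Wait for the computation to terminate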

When I run the Spark script (this is in a Jupyter notebook, by the way) I get this error: IllegalArgumentException: 'requirement failed: No output operations registered, so nothing to execute'

I don't think I'm doing the socket script right, but I'm not really sure what to do. I'm basically trying to replicate nc -lk 9999 so that I can send text data over a port, and Spark Streaming reads from that port, receives the data and processes it.
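To check the server without Spark, a plain Python client can play the role socketTextStream plays. As with nc -lk, the Python script is the listening server and Spark connects to it as a client, decodes UTF-8 and splits the stream on newlines. A minimal sketch; read_lines and its timeout parameter are hypothetical names for illustration:

```python
import socket


def read_lines(host, port, timeout=5.0):
    """Connect to host:port and return every line received until the server closes.

    Roughly mimics Spark's socket receiver: it is the client, it decodes
    UTF-8, and it treats each newline-terminated chunk as one record.
    """
    with socket.create_connection((host, port), timeout=timeout) as sock:
        # makefile() wraps the socket in a buffered text reader
        with sock.makefile("r", encoding="utf-8", newline="") as stream:
            # Strip "\n" or "\r\n" so each entry is just the record text
            return [line.rstrip("\r\n") for line in stream]
```

With the CSV server running, for line in read_lines("localhost", 60000): print(line) should print one row per line; if everything arrives as one long line, the rows are missing their newline terminators.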

Any help would be appreciated.

I'm trying to do something similar, but I want to stream a row every 10 seconds. I solved it with this script:

    import socket
    from time import sleep

    host = 'localhost'
    port = 12345

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(1)
    while True:
        print('\nListening for a client at', host, port)
        conn, addr = s.accept()
        print('\nConnected by', addr)
        try:
            print('\nReading file...\n')
            with open('iris_test.csv') as f:
                for line in f:
                    # Each line keeps its trailing '\n', which socketTextStream splits on
                    out = line.encode('utf-8')
                    print('Sending line', line)
                    conn.sendall(out)
                    sleep(10)
                print('End of stream.')
        except socket.error:
            print('Error occurred.\n\nClient disconnected.\n')
        finally:
            conn.close()  # Close this client before waiting for the next one
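The inner loop above can also be factored into a function that takes any connected socket, which makes the interval configurable and lets the sending logic be tried without a real client. A minimal sketch; send_lines, interval and the injectable sleep parameter are hypothetical additions, not part of the script above:

```python
import socket
import time


def send_lines(conn, lines, interval=10.0, sleep=time.sleep):
    """Send each line over a connected socket, pausing `interval` seconds after each.

    Every line is newline-terminated before sending, so Spark's
    socketTextStream sees one record per line. Returns the number sent.
    """
    sent = 0
    for line in lines:
        conn.sendall((line.rstrip("\r\n") + "\n").encode("utf-8"))
        sent += 1
        sleep(interval)
    return sent


if __name__ == "__main__":
    # socket.socketpair() gives two connected sockets standing in for server and client
    server_side, client_side = socket.socketpair()
    with server_side, client_side:
        count = send_lines(server_side, ["a,1", "b,2\n"], interval=0)
        server_side.shutdown(socket.SHUT_WR)  # signal end of stream to the reader
        with client_side.makefile("r", encoding="utf-8") as stream:
            print(count, stream.read().splitlines())  # prints: 2 ['a,1', 'b,2']
```

Inside the accept loop, the for line in f block would then become send_lines(conn, f), since iterating the file object yields its lines.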

Hope this helps.