linux - Python 2.7 architecture: How to use persistent HTTP/S connections with multiple servers without gathering the data to send multiple times?
This is a somewhat complicated question, so I'll do my best to be plain in my explanation and not give too many unnecessary details.
I developed a Python script that has been working for the last year. It grabs basic system data and sends it to an HTTP/S server, which can send back commands if the user chooses. It's kind of been a big experiment for the last year, seeing what works and what doesn't, testing different needs within the company, etc. I now have a pretty solid understanding of what is needed, and I am starting the journey on version 2.
The aim of the new version is to maintain functionality while reducing system/CPU load and bandwidth. After this Python script is developed, the rest of the work will be done on the HTTP/S server. My question is specifically about the client side, the Python script. I am using Python 2.7.x on Debian-based systems.
The v1 script grabs system data, reads a config file containing the servers to send the data to, and uses threads to send to each server. Each server can return one or more commands, which are also processed through their own threads. The script is run once a minute via crontab. I can have 5 or more servers sending 10 commands each and the script still executes smoothly, without taking a long time to finish the commands issued by the servers.
In the v2 script, I am seeking to make the following required changes:
It will run as a system service. Instead of the code being run by cron every minute, the script will loop every few seconds.
The loop needs to gather data once each time through the loop, then send it to each web server (as defined in the configuration file).
I want persistent HTTP/S connections for performance and bandwidth optimization.
I don't want to gather the data each time through the loop for each HTTP/S server. I want to gather the data only once per iteration through the main loop that drives the service, and then send that data to the threads managing the established persistent HTTP/S connections.
Herein lies my problem. How do I keep the persistent connections inside their respective threads and get data to those threads while collecting the data only once?
From "Does httplib reuse TCP connections?" I see that persistent connections can be done in this manner (thank you, Corey Goldberg):
con = httplib.HTTPConnection("myweb.com")
while True:
    con.request("GET", "/x.css", headers={"Connection": "keep-alive"})
    result = con.getresponse()
    result.read()
    print result.reason, result.getheaders()
The data gathering needs to happen inside that loop. But I need this to happen in multiple threads talking to various servers at the same time, and I don't want to waste resources by going and fetching the data more than once. I just don't see how it is possible, given my relatively limited knowledge of Python.
Basically, as I see it right now, there needs to be a loop driving the HTTP/S connections inside their threads. Then I need some kind of loop to gather the data and prepare it to go to the HTTP/S connections. But how do I get the first loops inside of the second loops? It's as if I need the HTTP/S persistent connection loop inside the data gathering loop, but I also need the data gathering loop inside the HTTP/S loop. A sketch of the current approach follows.
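To make the problem concrete, here is roughly what the v1 approach looks like (a rough sketch only; gather_data(), the host names, and the path are placeholders, not my real code). It gathers once and fans out in threads, but every run rebuilds the TCP connections:

import threading
import httplib
import urllib

def gather_data():
    # Placeholder for the real system data collection
    return urllib.urlencode({"data": "values"})

def send_once(host, payload):
    # A brand new TCP connection on every run - this is the waste I want to avoid
    try:
        con = httplib.HTTPConnection(host, timeout=10)
        con.request("POST", "/", payload)
        print host, con.getresponse().status
        con.close()
    except Exception, e:
        print host, "failed:", e

payload = gather_data() # Gathered once...
threads = [threading.Thread(target=send_once, args=(h, payload))
           for h in ["server.com", "otherserver.net"]]
for t in threads: t.start()
for t in threads: t.join()
# ...but the connections are torn down again as soon as the script exits.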
I would like to explore pure 2.7.x Pythonic ways this might be accomplished. Depending on outside utilities may be problematic for various reasons. This script, when finished, will be deployed to 150+ Linux systems, and the less that can go wrong, the better.
Thank you for your time and consideration!
I am going to leave this here for others who, like me, were searching to expand their Python understanding. It took me a while to figure out how to approach this problem, but the solution was made clear after talking with a coworker who understood this kind of issue.
So in short, the answer that worked for me used Python 2.7.x's native threading and Queue modules.
I have my main program, which manages the various threads and the queue setup. The NetworkWorker class, which extends the threading module, spins up a new Queue for each instance upon init. The Queue reference/handler is stored in a global list variable. I simply loop through the Queue list and send data to each thread's Queue from my main thread (main.py). Then each thread gets its data and does what it's supposed to do. And the data received back from each HTTP connection is loaded into another Queue, which is processed by a single command execution thread in main.py.
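Before the full code, here is the pattern distilled to its bones (a minimal sketch under the same assumptions; names are simplified and the POST over the persistent connection is replaced with a print):

import Queue
import threading
from time import sleep

queues = [] # One Queue per worker thread, like shared_globals.network_active_queues

def worker(q):
    # Each worker would own its persistent connection and block on its own Queue
    while True:
        data = q.get() # Sleeps here until the main loop sends data
        print "would POST over the persistent connection:", data

for name in ["ServerName1", "ServerName2"]:
    q = Queue.Queue()
    queues.append(q)
    t = threading.Thread(target=worker, args=(q,))
    t.daemon = True
    t.start()

for _ in range(3):                # The main service loop
    data = {"data": "values"}     # Gather the data once per iteration...
    for q in queues:              # ...then fan it out to every worker's Queue
        q.put(data)
    sleep(2)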
The following code has been modified/extracted from its original context. I have tested it and it works, as long as you correctly configure the servers in the self.conf dict, located in main.py > my_service > __init__, and the server responds with valid JSON. Honestly, it could use some cleanup. To ensure the code remains public and accessible, I added a Creative Commons license. Anyone who feels this code resembles their own may contact me for proper attribution.
Except for main.py, the names of the other two files are important. The shared_globals.py and workerThread.py filenames are case sensitive and must be in the same folder as main.py.
Main executable: main.py
#!/usr/bin/python
# encoding=utf8

from time import sleep, time
import subprocess, sys, os  # Used for IP, system calls, etc.
import json

# Web support
import httplib
import urllib
import zlib
import base64

# workerThread dependency
import shared_globals
from workerThread import NetworkWorker

import Queue
import threading

'''
This work, Python NetworkWorker Queue / Threading, is licensed under the
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

class my_service:

    # * * * *
    def __init__(self):

        # Manually list off the servers I want to talk to

        self.conf = {}
        self.conf['servers'] = {}

        self.conf['servers']['ServerName1'] = {}
        self.conf['servers']['ServerName1']['protocol'] = "http"
        self.conf['servers']['ServerName1']['url'] = "server.com"
        self.conf['servers']['ServerName1']['port'] = "80"
        self.conf['servers']['ServerName1']['path'] = "/somefile.php"
        self.conf['servers']['ServerName1']['timeout'] = "10" # Seconds. Make sure this is long enough for your largest or most mission-critical HTTP/S transaction to finish, plus the time it takes for the data to reach the persistent HTTP/S thread. Data comes in every 2 seconds, so 5-10 seconds should be fine. Anything that takes too long will cause the Queue to back up.

        self.conf['servers']['ServerName2'] = {}
        self.conf['servers']['ServerName2']['protocol'] = "http"
        self.conf['servers']['ServerName2']['url'] = "otherserver.net"
        self.conf['servers']['ServerName2']['port'] = "80"
        self.conf['servers']['ServerName2']['path'] = "/dataio.php"
        self.conf['servers']['ServerName2']['timeout'] = "5"

        # Start the threading manager, which will manage the various threads and their components.
        # All cross-thread communication needs to be managed with Queues.
        self.threadManager()


    def threadManager(self):

        # A place to reference all threads
        self.threads = []

        print "Loading shared globals"
        # This is the 3rd file in this project. I would not need to do this if the
        # NetworkWorker thread was inside of this same file. But since it is in
        # another file, I use the shared_globals file to make the Queue list and
        # other shared resources available between the main thread and the NetworkWorker threads.
        shared_globals.init()

        # Keep track of all the threads / classes we are initializing
        self.workers = {} # Keep track of all the worker threads

        print "Initializing network worker threads from config"
        # For each server we want to talk to, start a worker thread.
        # Read the servers from self.conf and init the threads / workers.
        for t in self.conf['servers']: # Loop through the servers in the config
            # t = server name
            #print "t: ", self.conf['servers'][t]
            self.workers[t] = NetworkWorker() # Save the worker handlers to the workers dict

            # Set the server data for each NetworkWorker thread
            self.workers[t].set_server(self.conf['servers'][t]['url'], self.conf['servers'][t]['port'], self.conf['servers'][t]['timeout'], self.conf['servers'][t]['path'])

        print "Initializing command processing queue"
        cmd_q = Queue.Queue()
        cmd_q.daemon = True
        shared_globals.cmd_active_queue = cmd_q

        print "Starting command processing thread"
        # Start the command processing thread
        t_cmd = threading.Thread(target=self.command_que_thread_manager)
        t_cmd.daemon = True
        self.threads.append(t_cmd)
        t_cmd.start()

        print "Starting data gathering thread"
        # Start the data gathering thread
        t = threading.Thread(target=self.data_collector_thread)
        t.daemon = True
        self.threads.append(t)
        t.start()

        print "Starting worker threads"
        for w in self.workers:      # Loop through all the worker handlers
            self.workers[w].start() # Start the jobs

        # We now have our NetworkWorker threads running, each with its own Queue.
        # We send data to them using the def below titled self.send_data_to_networkWorkers.
        print "Service started\n\n\n"

        # This keeps the main thread listening so you can perform actions like killing the application with CTRL+C
        while threading.active_count() > 0:
            try:
                sleep(0.1)
            except (KeyboardInterrupt, SystemExit): # Exits the main thread without complaint!
                print "\n"
                os._exit(0)
        os._exit(0)

    def data_collector_thread(self):
        '''
        Gather all the data we want to send to each server.
        Send the data to the Queues of each NetworkWorker thread we init'd above.
        '''
        # Loop indefinitely
        while True:

            # Gather your data and load it into a data dict
            data = {"data": "values"}
            print "\n\nData to be sent to all NetworkWorker threads: ", data, "\n\n"

            # Prep the data for HTTP/S.
            # If you need to do something else with the data besides sending it to the threads, do it here.
            data = self.prep_data_for_HTTP(data) # Do any pre-HTTP/S processing here
            self.send_data_to_networkWorkers(data) # Send the data out to all the threads' Queues
            sleep(2) # Wait a little bit and iterate through the loop again. This is the main loop timer.

    def prep_data_for_HTTP(self, data):
        '''
        I am converting the data from a Python dict to JSON,
        then compressing the JSON,
        then loading the compressed string into a dict, because the HTTP/S object (in the NetworkWorker thread) expects a dict,
        then URL-encoding the dict for HTTP/S POST transit.
        Return the manipulated data object, ready for HTTP/S.
        '''
        data = json.dumps(data, encoding='utf8') # Now continue preparing for HTTP/S
        data = zlib.compress(data, 8)
        # In PHP, the data will be found in the $_POST['data'] key
        data = {"data": data}
        data = urllib.urlencode(data)
        return data
    # end def

    def command_que_thread_manager(self):
        '''
        Run as a thread.
        Send data to this thread via its Queue, which we init'd above in the thread manager.
        Grab the data, and process it.
        '''
        while True:
            data = shared_globals.cmd_active_queue.get()
            print "Processing command: ", data
    # end def

    def send_data_to_networkWorkers(self, data):
        '''
        Send data to all the NetworkWorker threads
        '''
        for q in shared_globals.network_active_queues:
            q.put(data)

    def clean_exit(self):
        '''
        Run this when exiting the program for a clean exit.
        I don't think I actually call this in this example,
        but calling it upon main thread exit would be a good idea.
        '''
        for w in self.workers:     # Loop through all the worker handlers
            self.workers[w].stop() # Stop the jobs
    # end def

# end class

if __name__ == "__main__":
    my_service = my_service()
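If you want to sanity-check the json -> zlib -> urlencode pipeline in prep_data_for_HTTP, the inverse transform round-trips like this (a standalone check added for illustration; on the PHP side the equivalent would presumably be gzuncompress() and json_decode(), but that is my assumption, not part of the code above):

import json
import zlib
import urllib
import urlparse

original = {"data": "values"}

# Forward pass, exactly as prep_data_for_HTTP does it
wire = urllib.urlencode({"data": zlib.compress(json.dumps(original, encoding='utf8'), 8)})

# Inverse pass, as the receiving server would perform it
compressed = urlparse.parse_qs(wire)["data"][0]
assert json.loads(zlib.decompress(compressed)) == original
print "Round trip OK"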
Shared globals file: shared_globals.py
#!/usr/bin/python
# encoding=utf8

'''
This work, Python NetworkWorker Queue / Threading, is licensed under the
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

def init():

    global network_active_queues
    global cmd_active_queue
    global cmd_q

    # Keep track of the data going to the network worker threads
    print "Initializing network active queues"
    network_active_queues = []

    # Keep track of the commands
    print "Initializing command active queue"
    cmd_active_queue = ""

    # ?
    #cmd_q = []
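The reason this file works as a cross-module global store is that Python caches imported modules in sys.modules, so main.py and workerThread.py both see the same shared_globals module object and therefore the same network_active_queues list. A throwaway check, if you want to convince yourself (not part of the project):

import sys
import shared_globals

shared_globals.init()
cached = sys.modules["shared_globals"]
assert cached is shared_globals # One module object, however many importers
cached.network_active_queues.append("marker")
assert shared_globals.network_active_queues == ["marker"]
print "One shared module, one shared list"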
NetworkWorker class: workerThread.py
#!/usr/bin/python
# encoding=utf8

'''
This work, Python NetworkWorker Queue / Threading, is licensed under the
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

import Queue
import threading

import httplib
import urllib
import json

# workerThread dependency
# Adds a Queue to the list of Queues for the HTTP/S responses
import shared_globals

class NetworkWorker(threading.Thread):

    def __init__(self):
        '''
        Extend the threading module.
        Start a new Queue for this instance of the class.
        Run the thread as a daemon.
        shared_globals is an external file for the globals shared between the main script and this class.
        Append this Queue to the list of Queues in shared_globals.network_active_queues.
        Loop through shared_globals.network_active_queues to send data to all the Queues started by this class.
        '''
        threading.Thread.__init__(self)
        self.q = Queue.Queue()
        self.q.daemon = True
        shared_globals.network_active_queues.append(self.q)
        # Init the Queue for processing commands

    def run(self):
        '''
        Establish a persistent HTTP connection.
        Pull data from the Queue.
        When data comes in, send it to the server.
        Put the response from the HTTP server onto the command Queue.
        Do whatever you want with the responses from the HTTP server.
        '''
        # Set the headers
        headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain", "Connection": "keep-alive"} # "Connection": "keep-alive" is for the persistence
        # Init the persistent HTTP connection
        http_request = httplib.HTTPConnection(self.url, int(self.port), timeout=int(self.timeout))
        # Init response_data
        response_data = str()
        # Start the loop
        while True:
            # The code waits here for the Queue to have data. If there is no data, it just sleeps until you send it data via its Queue.
            data = self.q.get()
            # .... When it gets data, we proceed with the data variable.
            try:
                http_request.request("POST", self.path, data, headers)
                response = http_request.getresponse()
                response_data = response.read()
                # This is the response from the HTTP/S server
                print "Response: ", response_data
            except Exception, e:
                # In the event something goes wrong, we can try to re-establish the HTTP connection
                print e, "Re-establishing HTTP/S connection"
                http_request = httplib.HTTPConnection(self.url, int(self.port), timeout=int(self.timeout))

            # If the HTTP transaction was successful, we will have our HTTP response data in the response_data variable
            if response_data:
                # try / except will fail on a bad JSON object
                try:
                    # Validate the JSON & convert it to a native Python dict
                    json_data = json.loads(response_data)

                    # Send the response from the server to the command thread manager
                    shared_globals.cmd_active_queue.put(json_data)

                except ValueError, e:
                    print "Bad server response: discarding invalid JSON"
                    # Repackage the invalid JSON, or some identifier thereof, and send it to the command processing thread.
                    # Load this NetworkWorker's thread Queue with a new data object to tell the server it sent malformed JSON and to resend the data.
                    #http_request.request("POST", self.path, data, headers)
                    #response = http_request.getresponse()
                    #response_data = response.read()

        # Place this here as a precaution, so if we ever exit the while loop we close the HTTP/S connection
        http_request.close()
    # end def

    def set_server(self, url, port, timeout, path):
        '''
        Use this to set the server for this class / thread instance.
        The variables passed in are translated to class instance variables (self).
        '''
        self.url = url
        self.port = port
        self.timeout = timeout
        self.path = path
    # end def

    def stop(self):
        '''
        Stop this Queue.
        Stop this thread.
        Clean up anything else as needed - tell other threads / Queues to shut down.
        '''
        shared_globals.network_active_queues.remove(self.q)
        #self.q.put("shutdown") # Do we need to tell the threads to shut down? Perhaps when reloading the config.
        self.join()
    # end def

# end class
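One caveat on stop() above: self.join() is called while run() may still be blocked forever in self.q.get(), so the join can hang. The commented-out self.q.put("shutdown") line hints at the usual fix, a poison pill. Here is a minimal sketch of that pattern (my addition for illustration, not part of the original code):

import Queue
import threading

class StoppableWorker(threading.Thread):
    '''A hypothetical illustration of the poison-pill shutdown that
    NetworkWorker.stop() hints at with its commented-out put().'''

    _SHUTDOWN = object() # Unique sentinel; no real payload can be this object

    def __init__(self):
        threading.Thread.__init__(self)
        self.q = Queue.Queue()

    def run(self):
        while True:
            data = self.q.get()       # Blocks until data or the sentinel arrives
            if data is self._SHUTDOWN:
                break                 # Leave the loop so join() can return
            print "working on:", data

    def stop(self):
        self.q.put(self._SHUTDOWN)    # Wakes the blocked q.get()
        self.join()                   # Safe now: run() is guaranteed to exit

if __name__ == "__main__":
    w = StoppableWorker()
    w.start()
    w.q.put("some data")
    w.stop()
    print "Worker joined cleanly"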