linux - Py 2.7 arch: How to maintain persistent HTTP/S connections with multiple servers without gathering the data to send multiple times?


This is kind of a complicated question, so I'll do my best to be plain in my explanation and not give too many unnecessary details.

I developed a Python script last year. It grabs basic system data and sends it to an HTTP/S server, which can send commands back if the user chooses. It's kind of been a big experiment for the last year, seeing what works and what doesn't, testing different needs within the company, etc. I now have a pretty solid understanding of what we need, and I am starting the journey on version 2.

The aim of the new version is to maintain functionality while reducing system/CPU load and bandwidth. After developing this Python script out, the rest of the work will be done on the HTTP/S server. My question is specifically about the client side, the Python script. I am using Python 2.7.x on Debian-based systems.

The v1 script grabs system data, reads a config file that contains the servers to send the data to, and uses threads to send the data to each server. Each server can return 1 or more commands, which are in turn processed through their own threads. The script is run once a minute via crontab. You can have 5 or more servers sending 10 commands each and the script still executes smoothly, without taking a long time to finish the commands issued by the servers.

In the v2 script, I am seeking to make the following required changes:

  • It will run as a system service. Instead of the code being run by cron every minute, the script will loop every few seconds.

  • The loop needs to gather the data once each time through the loop, then send it to each web server (as defined in the configuration file).

  • I want persistent HTTP/S connections, for performance and bandwidth optimization.

  • I don't want to gather the data each time through the loop for every HTTP/S server. I want to gather it once per iteration of the main loop that drives the service, and then send that data to the threads managing the established persistent HTTP/S connections.

Herein lies the problem. How do I keep the persistent connections inside their respective threads, and get data to those threads, while only collecting the data once?

From Does httplib reuse TCP connections? I see that persistent connections can be done in this manner (thank you Corey Goldberg):

con = httplib.HTTPConnection("myweb.com")
while True:
    con.request("GET", "/x.css", headers={"Connection":" keep-alive"})
    result = con.getresponse()
    result.read()
    print result.reason, result.getheaders()
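From what I can tell, httplib speaks HTTP/1.1, where connections are persistent by default as long as every response is read to completion before the next request, so the explicit keep-alive header is mostly belt-and-braces. Here is a small sketch of my own (the helper name is mine, purely illustrative) of how I imagine wrapping a request so a dropped idle socket gets one reconnect attempt:

import httplib

def post_once(con, path, body, headers):
    '''POST over an already-open connection; if the server dropped the
    idle socket between requests, re-open it and retry once.'''
    try:
        con.request("POST", path, body, headers)
        return con.getresponse().read()
    except (httplib.HTTPException, IOError):
        con.close()
        con.connect()  # re-establish the TCP connection
        con.request("POST", path, body, headers)
        return con.getresponse().read()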

The data gathering needs to happen inside this loop. But it needs to happen in multiple threads talking to various servers at the same time, and I don't want to waste resources by going and fetching the data more than once. I just don't see how it is possible, given my relatively limited knowledge of Python.

Basically, as I see it right now, there needs to be a loop driving the HTTP/S connection inside each of the threads. Then I need some kind of loop gathering the data and preparing it to go out to the HTTP/S connections. But how do I get the first loops inside the second loops in such a way? It's like I need the persistent HTTP/S connection loop inside the data-gathering loop, but I also need the data-gathering loop inside the HTTP/S loop.

I would like to explore pure Python 2.7.x ways this might be accomplished. Depending on outside utilities may be problematic for various reasons. This script, when finished, will be deployed to 150+ Linux systems, and the less that can go wrong, the better.

Thank you for your time and consideration!

I am going to leave this here for others who, like me, are searching to expand their Python understanding. It took me a while to figure out how to approach this problem, but the solution was made clear after talking with a coworker who understood this kind of issue.

So, in short, the answer that worked for me used Python 2.7.x's native threading and Queue modules.

I have a main program that manages the various threads and the Queue setup. The NetworkWorker class, which extends the threading module, spins up a new Queue for each instance upon init. The Queue reference / handler is stored in a global list variable. In the main thread (main.py), I simply loop through the Queue list and send the data to each thread's Queue. Each thread then gets its data and does what it's supposed to. And the data received back from each HTTP connection is loaded into another Queue that is processed by a single command execution thread in main.py.
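Stripped of all the application details, the pattern looks like this. This is a minimal, self-contained sketch with made-up payloads (it is not the production code below); the key point is that each worker blocks on q.get(), so the single data-gathering loop is what drives every per-server loop:

import Queue
import threading
from time import sleep

out_queues = []            # one Queue per server thread (fan-out)
cmd_queue = Queue.Queue()  # every response funnels back here (fan-in)

def worker(q, name):
    while True:
        data = q.get()  # blocks until the main loop sends data
        cmd_queue.put("%s processed %r" % (name, data))  # stand-in for the HTTP POST

def command_processor():
    while True:
        print "Processing:", cmd_queue.get()

for name in ("ServerName1", "ServerName2"):
    q = Queue.Queue()
    out_queues.append(q)
    t = threading.Thread(target=worker, args=(q, name))
    t.daemon = True
    t.start()

t = threading.Thread(target=command_processor)
t.daemon = True
t.start()

for i in range(3):         # gather the data exactly once per iteration ...
    data = {"sample": i}
    for q in out_queues:   # ... then hand the same data to every worker's Queue
        q.put(data)
    sleep(2)
sleep(1)                   # let the last responses print before exiting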

The following code has been modified / extracted from its original context. I have tested it, and it works as long as you correctly configure the servers in the self.conf dict, located in main.py > MyService > __init__, and the server responds with valid JSON. Honestly it could use some cleaning up. To ensure the code remains public and accessible, I added a Creative Commons license. If you feel this code resembles your own code, you may contact me for proper attribution.

Except for main.py, the names of the other 2 files are important. The shared_globals.py and workerThread.py filenames are case sensitive and must be in the same folder as main.py.

Main executable: main.py

#!/usr/bin/python
# encoding=utf8

from time import sleep, time
import subprocess, sys, os # used for IP, system calls, etc
import json

# web support
import httplib
import urllib
import zlib
import base64

# workerThread dependency
import shared_globals
from workerThread import NetworkWorker

import Queue
import threading

'''
This work, Python NetworkWorker Queue / Threading, is licensed under a
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

class MyService:

    # * * * *
    def __init__(self):

        # Manually list off the servers I want to talk to

        self.conf = {}
        self.conf['servers'] = {}

        self.conf['servers']['ServerName1'] = {}
        self.conf['servers']['ServerName1']['protocol'] = "http"
        self.conf['servers']['ServerName1']['url'] = "server.com"
        self.conf['servers']['ServerName1']['port'] = "80"
        self.conf['servers']['ServerName1']['path'] = "/somefile.php"
        # Seconds. Make sure this is long enough for your largest or most mission
        # critical HTTP/S transaction to finish, plus the time it takes to wait for
        # the data to come into the persistent HTTP/S thread. Data comes in every
        # 2 seconds, so 5-10 seconds should be fine. Anything that takes too long
        # will cause the Queue to back up too much.
        self.conf['servers']['ServerName1']['timeout'] = "10"

        self.conf['servers']['ServerName2'] = {}
        self.conf['servers']['ServerName2']['protocol'] = "http"
        self.conf['servers']['ServerName2']['url'] = "otherserver.net"
        self.conf['servers']['ServerName2']['port'] = "80"
        self.conf['servers']['ServerName2']['path'] = "/dataio.php"
        self.conf['servers']['ServerName2']['timeout'] = "5"

        # Start the threading manager, which manages the various threads and
        # their components. All cross-thread communication is managed with Queues.
        self.threadManager()

    def threadManager(self):

        # A place to reference all threads
        self.threads = []

        print "Loading shared globals"
        # shared_globals is the 3rd file in this project. I would not need it if the
        # NetworkWorker thread were inside this same file. Since it is in another
        # file, I use the shared_globals file to make the Queue list and other shared
        # resources available between the main thread and the NetworkWorker threads.
        shared_globals.init()

        # Keep track of all the worker threads / classes we are initializing
        self.workers = {}

        print "Initializing network worker threads from config"
        # For each server we want to talk to, start a worker thread.
        # Read the servers from self.conf and init the threads / workers.
        for t in self.conf['servers']: # Loop through the servers in the config; t = server name
            #print "T: ", self.conf['servers'][t]
            self.workers[t] = NetworkWorker() # Save the worker handler in the workers dict

            # Set the server data for each NetworkWorker thread
            self.workers[t].set_server(self.conf['servers'][t]['url'], self.conf['servers'][t]['port'], self.conf['servers'][t]['timeout'], self.conf['servers'][t]['path'])

        print "Initializing command processing queue"
        cmd_q = Queue.Queue()
        shared_globals.cmd_active_queue = cmd_q

        print "Starting command processing thread"
        t_cmd = threading.Thread(target=self.command_que_thread_manager)
        t_cmd.daemon = True
        self.threads.append(t_cmd)
        t_cmd.start()

        print "Starting data gathering thread"
        t = threading.Thread(target=self.data_collector_thread)
        t.daemon = True
        self.threads.append(t)
        t.start()

        print "Starting worker threads"
        for w in self.workers:      # Loop through all worker handlers
            self.workers[w].start() # Start the jobs

        # We now have our NetworkWorker threads running, each with its own Queue,
        # which we send data to using self.send_data_to_NetworkWorkers below.

        print "Service started\n\n\n"

        # Keep the main thread listening so you can perform actions like
        # killing the application with CTRL+C
        while threading.active_count() > 0:
            try:
                sleep(0.1)
            except (KeyboardInterrupt, SystemExit): # Exits the main thread without complaint!
                print "\n"
                os._exit(0)
        os._exit(0)

    def data_collector_thread(self):
        '''
        Gather all the data we want to send to each server.
        Send the data to the Queue of each NetworkWorker thread init'd above.
        '''
        # Loop indefinitely
        while True:

            # Gather your data and load it into a data dict
            data = {"data": "values"}
            print "\n\nData being sent to all NetworkWorker threads: ", data, "\n\n"

            # Prep the data for HTTP/S. If you need to do something else with
            # the data besides sending it to the threads, do it here.
            data = self.prep_data_for_HTTP(data)    # Any pre-HTTP/S processing happens here
            self.send_data_to_NetworkWorkers(data)  # Send the data out to all the threads' Queues
            sleep(2) # Wait a little bit and iterate through the loop again. This is the main loop timer.

    def prep_data_for_HTTP(self, data):
        '''
        Convert the data from a python dict to a JSON string to start.
        Then compress the JSON string.
        Then load the compressed string into another dict, because the HTTP/S
        object (in the NetworkWorker thread) expects a dict.
        URL-encode that dict for HTTP/S POST transit.
        Return the manipulated data object, now ready for HTTP/S.
        '''
        data = json.dumps(data, encoding='utf8')
        data = zlib.compress(data, 8)
        # In PHP, the data is under the $_POST['data'] key
        data = {"data": data}
        data = urllib.urlencode(data)
        return data
    # end def

    def command_que_thread_manager(self):
        '''
        Runs as a thread.
        Send data to this thread via its Queue, init'd above in the thread manager.
        It grabs the data and does whatever it needs to do to process it.
        '''
        while True:
            data = shared_globals.cmd_active_queue.get()
            print "Processing command: ", data
    # end def

    def send_data_to_NetworkWorkers(self, data):
        '''
        Send data to all the NetworkWorker threads
        '''
        for q in shared_globals.network_active_queues:
            q.put(data)

    def clean_exit(self):
        '''
        Run this when exiting the program for a clean exit.
        I don't think I actually call this in this example,
        but doing so upon main thread exit would be a good idea.
        '''
        for w in self.workers:      # Loop through all worker handlers
            self.workers[w].stop()  # Stop the jobs
    # end def

# end class

if __name__ == "__main__":
    service = MyService()
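For reference, the receiving side has to undo prep_data_for_HTTP in reverse order. My servers run PHP, but here is a sketch of what the decode would look like in Python, assuming the raw POST body arrives as the urlencoded form string with the single 'data' field:

import json
import urlparse
import zlib

def decode_posted_data(post_body):
    '''Reverse prep_data_for_HTTP: urldecode -> decompress -> parse JSON.'''
    fields = urlparse.parse_qs(post_body)      # undo urllib.urlencode
    compressed = fields['data'][0]             # the single 'data' form field
    json_string = zlib.decompress(compressed)  # undo zlib.compress
    return json.loads(json_string)             # back to a native python dict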

Shared globals file: shared_globals.py

#!/usr/bin/python
# encoding=utf8

'''
This work, Python NetworkWorker Queue / Threading, is licensed under a
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

def init():

    global network_active_queues
    global cmd_active_queue
    global cmd_q

    # Keep track of the data going to the network worker threads
    print "Initializing network active queues"
    network_active_queues = []

    # Keep track of the commands
    print "Initializing command active queue"
    cmd_active_queue = ""

    # ?
    #cmd_q = []
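In case it is not obvious why this file works as shared state: Python caches every imported module in sys.modules, so each "import shared_globals" in every file and thread hands back the same module object, and an attribute set through one reference is visible through all of them. A quick demonstration, assuming shared_globals.py is in the same folder:

import sys
import shared_globals

shared_globals.init()
alias = sys.modules['shared_globals']  # the one cached module object
alias.network_active_queues.append("demo")

print alias is shared_globals               # -> True
print shared_globals.network_active_queues  # -> ['demo']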

NetworkWorker class: workerThread.py

#!/usr/bin/python
# encoding=utf8

'''
This work, Python NetworkWorker Queue / Threading, is licensed under a
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

import Queue
import threading

import httplib
import urllib
import json

# workerThread dependency
# Adds a Queue to the list of Queues for HTTP/S responses
import shared_globals

class NetworkWorker(threading.Thread):

    def __init__(self):
        '''
        Extend the threading module.
        Start a new Queue for this instance of the class.
        shared_globals is an external file holding the globals shared between
        the main script and this class.
        Append this instance's Queue to shared_globals.network_active_queues;
        the main thread loops through that list to send data to every Queue
        started by this class.
        '''
        threading.Thread.__init__(self)
        self.q = Queue.Queue()
        shared_globals.network_active_queues.append(self.q)

    def run(self):
        '''
        Establish a persistent HTTP connection.
        Pull data from this thread's Queue; when data comes in, send it to the server.
        Put the response from the HTTP server onto the command queue / thread.
        You can do whatever you want with the responses from the HTTP server.
        '''
        # Set the headers; "Connection": "keep-alive" is for persistence
        headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain", "Connection": "keep-alive"}
        # Init the persistent HTTP connection
        http_request = httplib.HTTPConnection( self.url, int(self.port), timeout=int(self.timeout) )
        # Start the loop
        while True:
            # The code waits here for the Queue to have data. If there is no
            # data, it just sleeps until you send some via the Queue.
            data = self.q.get()
            # .... and when it gets data, we proceed with the data variable.
            response_data = str() # Reset so a failed request can't re-process the previous response
            try:
                http_request.request( "POST", self.path, data, headers )
                response = http_request.getresponse()
                response_data = response.read()
                # This is the response from the HTTP/S server
                print "Response: ", response_data
            except Exception, e:
                # In the event something goes wrong, try to re-establish the HTTP connection
                print e, "Re-establishing HTTP/S connection"
                http_request = httplib.HTTPConnection( self.url, int(self.port), timeout=int(self.timeout) )

            # If the HTTP transaction was successful, we have the HTTP response
            # data in the response_data variable
            if response_data:
                # The try / except will fail on a bad JSON object
                try:
                    # Validate the JSON & convert from JSON to a native python dict
                    json_data = json.loads(response_data)

                    # Send the response from the server to the command thread manager
                    shared_globals.cmd_active_queue.put(json_data)

                except ValueError, e:
                    print "Bad server response: Discarding invalid JSON"
                    # Repackage the invalid JSON, or some identifier thereof, and send
                    # it to the command processing thread.
                    # Load this NetworkWorker's Queue with a new data object to tell
                    # the server it returned malformed JSON and to resend the data.
                    #http_request.request( "POST", self.path, data, headers )
                    #response = http_request.getresponse()
                    #response_data = response.read()

        # Placed here as a measure: if we ever exit the while loop, close the HTTP/S connection
        http_request.close()
    # end def

    def set_server(self, url, port, timeout, path):
        '''
        Set the server for this class / thread instance.
        The variables passed in are translated to class instance variables (self).
        '''
        self.url = url
        self.port = port
        self.timeout = timeout
        self.path = path
    # end def

    def stop(self):
        '''
        Stop this Queue.
        Stop this thread.
        Clean up anything else as needed - e.g. tell other threads / queues to shut down.
        '''
        shared_globals.network_active_queues.remove(self.q)
        #self.q.put("shutdown") # Do we need to tell the threads to shut down? Perhaps when reloading the config
        self.join()
    # end def

# end class
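On the shutdown question left in stop(): since run() blocks forever in self.q.get(), calling self.join() on its own would hang. One common approach (a sketch of mine, not part of the tested code above) is a sentinel object that wakes the worker and tells it to exit:

_SHUTDOWN = object()  # module-level sentinel; can never collide with real data

class NetworkWorker(threading.Thread):
    # ... __init__ and set_server exactly as above ...

    def run(self):
        # ... connection setup as above ...
        while True:
            data = self.q.get()
            if data is _SHUTDOWN:
                break              # leaves the loop, so http_request.close() runs
            # ... POST the data and process the response as above ...
        http_request.close()

    def stop(self):
        shared_globals.network_active_queues.remove(self.q)
        self.q.put(_SHUTDOWN)      # wakes the blocking self.q.get() in run()
        self.join()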