终于改成zmq通信,但是一个循环两对send,recv,开多个客户端时会混乱。
server
#! /usr/bin/env python#coding=utf-8'''fileName: server.py数据发送方式:zmq'''import zmq import timeimport jsonfrom Queue import Queueimport threadingimport pymongoclass Producer(threading.Thread): def __init__(self, t_name, processdata): threading.Thread.__init__(self, name=t_name) self.data=processdata def run(self): recvMassage()def recvMassage(): context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while (True): message = socket.recv() print 'The message from the client is : ',message global sign if message == '*': # 前端要求更改周期 socket.send("*") # "@"表示更改周期 sign = 1 # 设置标志位表示要更改周期 c = socket.recv() global cycle cycle = c elif: if sign == 1: # 更改周期 socket.send("@") # "@"表示更改周期 socket.recv() #没用 socket.send(cycle) sign = 0 else : socket.send("#") # "#"表示发送监控数据 message = socket.recv() print "message from client:", message load = json.loads(message) info = dict(load) print type(info) handleData(info) socket.send("#")def handleData(info): conn = pymongo.Connection("localhost", 27017) db = conn['networkSecurity'] systemInfo = db['systemInfo'] # 构造dict数据 message = { 'IP':info[u'IP'],'CPUstate':info[u'CPUstate'],'Memorystate':info[u'Memorystate'], 'PortState':info[u'PortState'],'ProcessName':info[u'ProcessName']} print 'Client said :\nIP:%s\nCPUstate:%s\nMemorystate:%s\nPortState:%s\nProcessName:%s'%(message['IP'],message['CPUstate'],message['Memorystate'],message['PortState'],message['ProcessName']) # 将数据放入队列 processdata.put(message) # 将数据存入数据库 systemInfo.insert(message) print 'success to store the data!'#消费者class Consumer(threading.Thread): def __init__(self, t_name,processdata): threading.Thread.__init__(self, name=t_name) def run(self): print "%s: %s is consuming in the queue!/n" %(time.ctime(), self.getName()) message = processdata.get() print 'the message in the queue is : ',message print type(message) monitorSystem(message)# 黑白名单匹配,info为字典def monitorSystem(info): warning = 0 whiteList = ['cmd.exe'] blackList = ['sublime_text.exe'] # for info in systemInfo.find(): # print info IP = info['IP'] processName = info['ProcessName'] for process in processName: if process in blackList: warning = 1 print 'Process %s in black list is running in IP %s ! '%(process,IP) for process in whiteList: if process not in processName: warning = 1 print 'Process %s in white list is not running in IP %s ! '%(process,IP) if warning == 0: print 'Host %s is running legally ! '%IPif __name__ == '__main__': # 处理队列 processdata=Queue() # 生产进程:接受数据 producer = Producer('Pro.', processdata) # 消费进程:处理数据,黑白名单匹配 consumer = Consumer('Con.', processdata) producer.start() consumer.start() producer.join() consumer.join()
client
#! /usr/bin/env python#coding=utf-8'''fileName:client.py监控windows信息:CPU占有率,内存占有率,端口开放情况,当前进程名称数据格式: {'IP':getIp(),'CPUstate':getCPUState(),'Memorystate':getMemoryState(), 'PortState':getPortState(),'ProcessName':getProcessName()}数据发送方式:zmq'''import zmq import psutilimport jsonimport socketimport threadimport timedef sendMessage(portState): context = zmq.Context() print "Connecting to server..." socket = context.socket(zmq.REQ) socket.connect ("tcp://192.168.111.135:5555") ip = json.dumps(getIp()) socket.send(ip) # print '#######################' message = socket.recv() print "Received reply: ", message if message =='@': # "@"表示更改周期 socket.send('@') global cycle info = socket.recv() json.loads(info) cycle = int(info) print 'type(cycle)',type(cycle) elif message == '#': # "#"表示发送监控数据 mymessage = json.dumps(packMessage(portState)) socket.send(mymessage) socket.recv()# 构造数据包def packMessage(portState): message = { 'IP':getIp(),'CPUstate':getCPUState(),'Memorystate':getMemoryState(), 'PortState':portState,'ProcessName':getProcessName()} print 'My message is :\nIP:%s\nCPUstate:%s\nMemorystate:%s\nPortState:%s\nProcessName:%s'%(message['IP'],message['CPUstate'],message['Memorystate'],message['PortState'],message['ProcessName']) return message# 获取本机IPdef getIp(): myname = socket.getfqdn(socket.gethostname()) myaddr = socket.gethostbyname(myname) return myaddr# 获取CPU使用率 def getCPUState(interval=1): return (str(psutil.cpu_percent(interval)) + "%")# 获取内存使用率 def getMemoryState(): mem_rate = 0 for pnum in psutil.pids(): p = psutil.Process(pnum) mem_rate = mem_rate + p.memory_percent() return "%.2f%%"%mem_rate# 输入IP和端口号,扫描判断端口是否开放def socket_port(ip,port,portList): try: if port >= 65535: print u'端口扫描结束' s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = s.connect_ex((ip,port)) if result == 0: lock.acquire() portList.append(port) lock.release() s.close() except: print u'端口扫描异常'# 输入IP,扫描IP的0-65534端口情况 def ip_scan(ip): portList = [] socket.setdefaulttimeout(3) try: for i in range(0,65534): thread.start_new_thread(socket_port,(ip,int(i),portList)) # 返回所有开放的端口号 return portList except: print u'扫描ip出错'# 获取正在运行的进程名称def getProcessName(): ProcessNameList = [] for pnum in psutil.pids(): p = psutil.Process(pnum) ProcessNameList.append(p.name()) return ProcessNameList if __name__ == '__main__': global cycle cycle = 60 while(True): myIP = getIp() lock = thread.allocate_lock() portState = ip_scan(myIP) print portState sendMessage(portState) print 'cycle is',cycle time.sleep(cycle)