A机器负责发送任务和接收结果:
#task_master.py
"""Task master: serve a task queue and a result queue over the network.

The master puts ten random integers on the task queue, then waits for ten
results that remote workers are expected to put on the result queue.
"""
import queue
import random
import time
from multiprocessing.managers import BaseManager

# Plain in-process queues; the manager's server process exposes them to
# remote clients through proxies.
task_queue = queue.Queue()
result_queue = queue.Queue()


class QueueManager(BaseManager):
    """Manager subclass used only as a namespace for the registered queues."""


def return_task_queue():
    """Return the module-level task queue (callable for the manager)."""
    return task_queue


def return_result_queue():
    """Return the module-level result queue (callable for the manager)."""
    return result_queue


if __name__ == '__main__':
    print("master start.")
    # Module-level functions are used instead of lambdas: the registry is
    # pickled when the server process is created with the "spawn" or
    # "forkserver" start method, and lambdas cannot be pickled.
    QueueManager.register('get_task_queue', callable=return_task_queue)
    QueueManager.register('get_result_queue', callable=return_result_queue)
    manager = QueueManager(address=('10.10.100.11', 9833), authkey=b'abc')
    manager.start()
    try:
        # Always go through the manager proxies, not the raw queues, so the
        # objects used here are the ones living in the server process.
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        for i in range(10):
            n = random.randint(0, 1000)
            print('put task %d ...' % n)
            task.put(n)
        print('try get results...')
        for i in range(10):
            # Raises queue.Empty if no worker answers within 100 seconds.
            r = result.get(timeout=100)
            print('Result:%s' % r)
    finally:
        # Stop the server process even when waiting for a result fails.
        manager.shutdown()
    print('master exit.')
B机器负责处理任务和发送结果:
#task_worker.py
"""Task worker: fetch numbers from the master's task queue and send back squares.

Connects to the manager served by the master, makes up to ten attempts to take
a task, and puts a formatted "n * n = n*n" string on the result queue for
each task received.
"""
import queue
import sys
import time
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    """Manager subclass used only as a namespace for the registered queues."""


# Only the names are registered here; no callable is given because the
# queues are provided by the manager this script connects to.
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

server_addr = '10.10.100.11'
print('connect to server %s...' % server_addr)
# Port and authkey must match the values used when the manager was started.
m = QueueManager(address=(server_addr, 9833), authkey=b'abc')
m.connect()
task = m.get_task_queue()
result = m.get_result_queue()
for i in range(10):
    try:
        n = task.get(timeout=10)
    except queue.Empty:
        # The module is imported as ``queue``; the previous ``Queue.Empty``
        # raised NameError as soon as the task queue ran dry.
        print('task queue is empty')
    else:
        print('run task %d * %d' % (n, n))
        r = '%d * %d = %d' % (n, n, n * n)
        time.sleep(1)
        result.put(r)
print('worker exit')