From efa5e0fa4df6ef03e132eae0c2a06ce38cafff1f Mon Sep 17 00:00:00 2001 From: Pepping <pepping> Date: Wed, 4 Jun 2014 09:10:01 +0000 Subject: [PATCH] Replaced queues by pipes. --- tools/oneclick/prestudy/multithreads.py | 59 ++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/tools/oneclick/prestudy/multithreads.py b/tools/oneclick/prestudy/multithreads.py index ce730cee36..ee96b97a9f 100644 --- a/tools/oneclick/prestudy/multithreads.py +++ b/tools/oneclick/prestudy/multithreads.py @@ -32,6 +32,7 @@ ############################################################################### # System imports +import multiprocessing import threading import Queue import time @@ -47,7 +48,8 @@ class base_component(object): self.packetsize = packetsize self.in_q = [] for i in range(nof_inputs): - self.in_q.append(Queue.Queue(packetsize)) + #self.in_q.append(Queue.Queue(packetsize)) + self.in_q.append(multiprocessing.Pipe(duplex=False)) self.out_q = [] print 'Instantiating', self.name @@ -77,7 +79,8 @@ class blockGen(threading.Thread, base_component): def run(self): for i in xrange(len(self.data)): for j in xrange(self.nof_outputs): - self.out_q[j].put(self.data[i][j]) + #self.out_q[j].put(self.data[i][j]) + self.out_q[j][1].send(self.data[i][j]) print "Sending packet " + str(i) + " on stream " + str(j) # time.sleep(1) @@ -91,9 +94,11 @@ class dp_mux(threading.Thread, base_component): while True: data = [] for i in xrange(self.nof_inputs): - data.append(self.in_q[i].get()) +# data.append(self.in_q[i].get()) + data.append(self.in_q[i][0].recv()) for j in xrange(self.nof_inputs): - self.out_q[0].put(data[j]) +# self.out_q[0].put(data[j]) + self.out_q[0][1].send(data[j]) class dataBuffer(threading.Thread, base_component): @@ -106,7 +111,8 @@ class dataBuffer(threading.Thread, base_component): data = [] for j in xrange(self.nof_packets): for i in xrange(self.nof_inputs): - data.append(self.in_q[i].get()) +# data.append(self.in_q[i].get()) + data.append(self.in_q[i][0].recv()) for i in data: print i @@ -149,6 +155,49 @@ bg.start() mux.start() db.start() + + + +#import multiprocessing +# +#NOF_PARALLEL_STREAMS = range(1000) +# +#def sink(inst_number, p_snk,p_src): +# p_src.close() # Close this local connection to source end of the pipe +# my_list = [] +# while True: +# try: +# item = p_snk.recv() +# #print inst_number, item +# except EOFError: +# break +# my_list.append(item**3) +# +# +#def source(p_snk,p_src): +# p_snk.close() # Close this local connection to the sink end of the pipe +# for item in xrange(2000000): +# p_src.send(item) +# p_src.close() +# +#if __name__ == '__main__': +# +# pipes = [] +# sinks = [] +# sources = [] +# +# for i in NOF_PARALLEL_STREAMS: +# +# # Create pipe and return connections to the source and sink sides of the pipe +# pipes.append( multiprocessing.Pipe(duplex=False) ) +# +# # Separate sink process +# sinks.append( multiprocessing.Process(target=sink, args=(i, pipes[i][0], pipes[i][1])) ) +# sinks[i].start() +# +# # Separate source process +# sources.append( multiprocessing.Process(target=source, args=(pipes[i][0], pipes[i][1])) ) +# sources[i].start() -- GitLab