From ef28c9dfc07c0eb3fa12180e84a54e3301c2b965 Mon Sep 17 00:00:00 2001 From: Pepping <pepping> Date: Wed, 4 Jun 2014 10:18:05 +0000 Subject: [PATCH] - Added a second databuffer and support for multiple fanout. --- tools/oneclick/prestudy/multithreads.py | 97 ++++++++----------------- 1 file changed, 29 insertions(+), 68 deletions(-) diff --git a/tools/oneclick/prestudy/multithreads.py b/tools/oneclick/prestudy/multithreads.py index ee96b97a9f..12d85b8cde 100644 --- a/tools/oneclick/prestudy/multithreads.py +++ b/tools/oneclick/prestudy/multithreads.py @@ -25,7 +25,7 @@ Remark: This example shows how a simple sysmtem of a block generator, a multiplexer and a databuffer can be modelled using threads - with queues. + with pipes. """ @@ -48,7 +48,6 @@ class base_component(object): self.packetsize = packetsize self.in_q = [] for i in range(nof_inputs): - #self.in_q.append(Queue.Queue(packetsize)) self.in_q.append(multiprocessing.Pipe(duplex=False)) self.out_q = [] print 'Instantiating', self.name @@ -57,9 +56,13 @@ class base_component(object): def __gt__(self, other): return self.__connect__(other) def __connect__(self, other): - if hasattr(other, 'in_q'): + if hasattr(other, 'in_q'): + # Create a list for all outputs. + other_in_q = [] for j in xrange(self.nof_outputs): - self.out_q.append(other.in_q[j]) + other_in_q.append(other.in_q[j]) + # Add to the list to support multiple fanout + self.out_q.append(other_in_q) print 'connecting', self.name, 'to', other.name return other else: @@ -77,12 +80,12 @@ class blockGen(threading.Thread, base_component): self.data = data def run(self): - for i in xrange(len(self.data)): - for j in xrange(self.nof_outputs): - #self.out_q[j].put(self.data[i][j]) - self.out_q[j][1].send(self.data[i][j]) - print "Sending packet " + str(i) + " on stream " + str(j) -# time.sleep(1) + for i in xrange(len(self.data)): # Iterate over the packets in time + for h in xrange(len(self.out_q)): # Iterate over the fanouts + for j in xrange(self.nof_outputs): # Iterate over the outputs + self.out_q[h][j][1].send(self.data[i][j]) + print "Sending packet " + str(i) + " on stream " + str(j) +# time.sleep(1) class dp_mux(threading.Thread, base_component): @@ -94,28 +97,29 @@ class dp_mux(threading.Thread, base_component): while True: data = [] for i in xrange(self.nof_inputs): -# data.append(self.in_q[i].get()) data.append(self.in_q[i][0].recv()) - for j in xrange(self.nof_inputs): -# self.out_q[0].put(data[j]) - self.out_q[0][1].send(data[j]) + for i in xrange(self.nof_inputs): + for j in xrange(len(self.out_q)): # Iterate over the fanouts + self.out_q[j][0][1].send(data[i]) # DP_MUX has only one output. + class dataBuffer(threading.Thread, base_component): - def __init__(self, nof_inputs=1, nof_outputs=0, packetsize=10, nof_packets=12 ): + def __init__(self, nof_inputs=1, nof_outputs=0, packetsize=10, nof_packets=12, index=0 ): threading.Thread.__init__(self) - base_component.__init__(self, 'dataBuffer', nof_inputs, nof_outputs, packetsize) - self.nof_packets = nof_packets + base_component.__init__(self, 'dataBuffer' + str(index), nof_inputs, nof_outputs, packetsize) + self.nof_packets = nof_packets + self.index = index def run(self): data = [] for j in xrange(self.nof_packets): for i in xrange(self.nof_inputs): -# data.append(self.in_q[i].get()) data.append(self.in_q[i][0].recv()) + time.sleep(self.index) + print self.name for i in data: print i - # Some definitions packetsize = 10 @@ -145,59 +149,16 @@ for h in xrange(nof_transm_packets): # Instantiate the components for the system bg = blockGen(bg_nof_outputs, bg_data) mux = dp_mux(bg_nof_outputs, 1, packetsize) -db = dataBuffer(1, 0 , packetsize, nof_receiv_packets) +db1 = dataBuffer(1, 0 , packetsize, nof_receiv_packets, 1) +db2 = dataBuffer(1, 0 , packetsize, nof_receiv_packets, 2) # Connect the components -bg > mux > db +bg > mux > db1 +mux > db2 # Start the threads bg.start() mux.start() -db.start() - - - - -#import multiprocessing -# -#NOF_PARALLEL_STREAMS = range(1000) -# -#def sink(inst_number, p_snk,p_src): -# p_src.close() # Close this local connection to source end of the pipe -# my_list = [] -# while True: -# try: -# item = p_snk.recv() -# #print inst_number, item -# except EOFError: -# break -# my_list.append(item**3) -# -# -#def source(p_snk,p_src): -# p_snk.close() # Close this local connection to the sink end of the pipe -# for item in xrange(2000000): -# p_src.send(item) -# p_src.close() -# -#if __name__ == '__main__': -# -# pipes = [] -# sinks = [] -# sources = [] -# -# for i in NOF_PARALLEL_STREAMS: -# -# # Create pipe and return connections to the source and sink sides of the pipe -# pipes.append( multiprocessing.Pipe(duplex=False) ) -# -# # Separate sink process -# sinks.append( multiprocessing.Process(target=sink, args=(i, pipes[i][0], pipes[i][1])) ) -# sinks[i].start() -# -# # Separate source process -# sources.append( multiprocessing.Process(target=source, args=(pipes[i][0], pipes[i][1])) ) -# sources[i].start() +db1.start() +db2.start() - - -- GitLab