Commit ef28c9df authored by Pepping

- Added a second databuffer and support for multiple fanout.

parent 51f075dc
@@ -25,7 +25,7 @@
 Remark: This example shows how a simple system of a block generator,
         a multiplexer and a databuffer can be modelled using threads
-        with queues.
+        with pipes.
 """
@@ -48,7 +48,6 @@ class base_component(object):
         self.packetsize = packetsize
         self.in_q = []
         for i in range(nof_inputs):
-            #self.in_q.append(Queue.Queue(packetsize))
             self.in_q.append(multiprocessing.Pipe(duplex=False))
         self.out_q = []
         print 'Instantiating', self.name
@@ -57,9 +56,13 @@ class base_component(object):
     def __gt__(self, other):
         return self.__connect__(other)
     def __connect__(self, other):
-        if hasattr(other, 'in_q'):
+        if hasattr(other, 'in_q'):
+            # Create a list for all outputs.
+            other_in_q = []
             for j in xrange(self.nof_outputs):
-                self.out_q.append(other.in_q[j])
+                other_in_q.append(other.in_q[j])
+            # Add to the list to support multiple fanout
+            self.out_q.append(other_in_q)
             print 'connecting', self.name, 'to', other.name
             return other
         else:
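The new __connect__ above no longer appends the destination's pipes directly to out_q; it appends one list per connected component, which is what makes a second fanout (the second dataBuffer) possible. A condensed illustration of that bookkeeping, using hypothetical stand-in classes rather than the real components, might be:

import multiprocessing

class Sink(object):
    def __init__(self, nof_inputs=1):
        # Each input is a (recv_end, send_end) pair, as in base_component.
        self.in_q = [multiprocessing.Pipe(duplex=False) for i in range(nof_inputs)]

class Source(object):
    def __init__(self, nof_outputs=1):
        self.nof_outputs = nof_outputs
        self.out_q = []
    def connect(self, other):
        # Gather the destination's input pipes, as the new __connect__ does ...
        other_in_q = []
        for j in xrange(self.nof_outputs):
            other_in_q.append(other.in_q[j])
        # ... and append them as one fanout entry, so a later connect() call
        # adds a second destination instead of overwriting the first.
        self.out_q.append(other_in_q)
        return other

src = Source()
src.connect(Sink())
src.connect(Sink())
print len(src.out_q)  # 2: one fanout entry per connected sink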
@@ -77,12 +80,12 @@ class blockGen(threading.Thread, base_component):
         self.data = data

     def run(self):
-        for i in xrange(len(self.data)):
-            for j in xrange(self.nof_outputs):
-                #self.out_q[j].put(self.data[i][j])
-                self.out_q[j][1].send(self.data[i][j])
-                print "Sending packet " + str(i) + " on stream " + str(j)
-                # time.sleep(1)
+        for i in xrange(len(self.data)):            # Iterate over the packets in time
+            for h in xrange(len(self.out_q)):       # Iterate over the fanouts
+                for j in xrange(self.nof_outputs):  # Iterate over the outputs
+                    self.out_q[h][j][1].send(self.data[i][j])
+                    print "Sending packet " + str(i) + " on stream " + str(j)
+                    # time.sleep(1)

 class dp_mux(threading.Thread, base_component):
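With the fanout list in place, the send in blockGen.run() above needs three indices. A self-contained sketch (hypothetical sizes and data, same indexing convention as the diff) of why out_q[h][j][1] is the send end:

import multiprocessing

nof_outputs = 2
# Two fanout destinations, each exposing nof_outputs simplex input pipes.
out_q = [[multiprocessing.Pipe(duplex=False) for j in range(nof_outputs)]
         for h in range(2)]
packet = ['a', 'b']

for h in xrange(len(out_q)):         # iterate over the fanouts (destinations)
    for j in xrange(nof_outputs):    # iterate over the output streams
        out_q[h][j][1].send(packet[j])   # [1]: send end of Pipe(duplex=False)

# Every destination received a copy of every stream:
for h in xrange(len(out_q)):
    print [out_q[h][j][0].recv() for j in range(nof_outputs)]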
@@ -94,28 +97,29 @@ class dp_mux(threading.Thread, base_component):
         while True:
             data = []
             for i in xrange(self.nof_inputs):
-                # data.append(self.in_q[i].get())
                 data.append(self.in_q[i][0].recv())
-            for j in xrange(self.nof_inputs):
-                # self.out_q[0].put(data[j])
-                self.out_q[0][1].send(data[j])
+            for i in xrange(self.nof_inputs):
+                for j in xrange(len(self.out_q)):  # Iterate over the fanouts
+                    self.out_q[j][0][1].send(data[i])  # DP_MUX has only one output.

 class dataBuffer(threading.Thread, base_component):
-    def __init__(self, nof_inputs=1, nof_outputs=0, packetsize=10, nof_packets=12 ):
+    def __init__(self, nof_inputs=1, nof_outputs=0, packetsize=10, nof_packets=12, index=0 ):
         threading.Thread.__init__(self)
-        base_component.__init__(self, 'dataBuffer', nof_inputs, nof_outputs, packetsize)
-        self.nof_packets = nof_packets
+        base_component.__init__(self, 'dataBuffer' + str(index), nof_inputs, nof_outputs, packetsize)
+        self.nof_packets = nof_packets
+        self.index = index

     def run(self):
         data = []
         for j in xrange(self.nof_packets):
             for i in xrange(self.nof_inputs):
-                # data.append(self.in_q[i].get())
                 data.append(self.in_q[i][0].recv())
+        time.sleep(self.index)
+        print self.name
         for i in data:
             print i

 # Some definitions
 packetsize = 10
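On the receive side of the hunk above, dp_mux gathers one packet from each input via in_q[i][0].recv() ([0] is the receive end) and then forwards every packet to each fanout of its single output, while the renamed dataBuffers stagger their printouts with time.sleep(self.index). A condensed, hypothetical walk-through of that forwarding order (not the author's code, just the same indexing) could be:

import multiprocessing

nof_inputs = 2
in_q = [multiprocessing.Pipe(duplex=False) for i in range(nof_inputs)]
out_q = [[multiprocessing.Pipe(duplex=False)],   # fanout 0: the single mux output
         [multiprocessing.Pipe(duplex=False)]]   # fanout 1: a second destination

# Upstream writes one packet per input stream.
in_q[0][1].send('stream0')
in_q[1][1].send('stream1')

# Gather one packet per input, as dp_mux.run() does ...
data = [in_q[i][0].recv() for i in range(nof_inputs)]
# ... then forward each packet to every fanout of the single output.
for i in xrange(nof_inputs):
    for j in xrange(len(out_q)):
        out_q[j][0][1].send(data[i])   # out_q[j][0]: the mux has only one output

# Each fanout destination sees the streams interleaved in input order.
for j in xrange(len(out_q)):
    print [out_q[j][0][0].recv() for k in range(nof_inputs)]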
@@ -145,59 +149,16 @@ for h in xrange(nof_transm_packets):
 # Instantiate the components for the system
 bg = blockGen(bg_nof_outputs, bg_data)
 mux = dp_mux(bg_nof_outputs, 1, packetsize)
-db = dataBuffer(1, 0 , packetsize, nof_receiv_packets)
+db1 = dataBuffer(1, 0 , packetsize, nof_receiv_packets, 1)
+db2 = dataBuffer(1, 0 , packetsize, nof_receiv_packets, 2)

 # Connect the components
-bg > mux > db
+bg > mux > db1
+mux > db2

 # Start the threads
 bg.start()
 mux.start()
-db.start()
-
-#import multiprocessing
-#
-#NOF_PARALLEL_STREAMS = range(1000)
-#
-#def sink(inst_number, p_snk,p_src):
-#    p_src.close() # Close this local connection to source end of the pipe
-#    my_list = []
-#    while True:
-#        try:
-#            item = p_snk.recv()
-#            #print inst_number, item
-#        except EOFError:
-#            break
-#        my_list.append(item**3)
-#
-#
-#def source(p_snk,p_src):
-#    p_snk.close() # Close this local connection to the sink end of the pipe
-#    for item in xrange(2000000):
-#        p_src.send(item)
-#    p_src.close()
-#
-#if __name__ == '__main__':
-#
-#    pipes = []
-#    sinks = []
-#    sources = []
-#
-#    for i in NOF_PARALLEL_STREAMS:
-#
-#        # Create pipe and return connections to the source and sink sides of the pipe
-#        pipes.append( multiprocessing.Pipe(duplex=False) )
-#
-#        # Separate sink process
-#        sinks.append( multiprocessing.Process(target=sink, args=(i, pipes[i][0], pipes[i][1])) )
-#        sinks[i].start()
-#
-#        # Separate source process
-#        sources.append( multiprocessing.Process(target=source, args=(pipes[i][0], pipes[i][1])) )
-#        sources[i].start()
+db1.start()
+db2.start()