Skip to content
Snippets Groups Projects
Commit efa5e0fa authored by Pepping's avatar Pepping
Browse files

Replaced queues by pipes.

parent 92e81728
No related branches found
No related tags found
No related merge requests found
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
############################################################################### ###############################################################################
# System imports # System imports
import multiprocessing
import threading import threading
import Queue import Queue
import time import time
...@@ -47,7 +48,8 @@ class base_component(object): ...@@ -47,7 +48,8 @@ class base_component(object):
self.packetsize = packetsize self.packetsize = packetsize
self.in_q = [] self.in_q = []
for i in range(nof_inputs): for i in range(nof_inputs):
self.in_q.append(Queue.Queue(packetsize)) #self.in_q.append(Queue.Queue(packetsize))
self.in_q.append(multiprocessing.Pipe(duplex=False))
self.out_q = [] self.out_q = []
print 'Instantiating', self.name print 'Instantiating', self.name
...@@ -77,7 +79,8 @@ class blockGen(threading.Thread, base_component): ...@@ -77,7 +79,8 @@ class blockGen(threading.Thread, base_component):
def run(self): def run(self):
for i in xrange(len(self.data)): for i in xrange(len(self.data)):
for j in xrange(self.nof_outputs): for j in xrange(self.nof_outputs):
self.out_q[j].put(self.data[i][j]) #self.out_q[j].put(self.data[i][j])
self.out_q[j][1].send(self.data[i][j])
print "Sending packet " + str(i) + " on stream " + str(j) print "Sending packet " + str(i) + " on stream " + str(j)
# time.sleep(1) # time.sleep(1)
...@@ -91,9 +94,11 @@ class dp_mux(threading.Thread, base_component): ...@@ -91,9 +94,11 @@ class dp_mux(threading.Thread, base_component):
while True: while True:
data = [] data = []
for i in xrange(self.nof_inputs): for i in xrange(self.nof_inputs):
data.append(self.in_q[i].get()) # data.append(self.in_q[i].get())
data.append(self.in_q[i][0].recv())
for j in xrange(self.nof_inputs): for j in xrange(self.nof_inputs):
self.out_q[0].put(data[j]) # self.out_q[0].put(data[j])
self.out_q[0][1].send(data[j])
class dataBuffer(threading.Thread, base_component): class dataBuffer(threading.Thread, base_component):
...@@ -106,7 +111,8 @@ class dataBuffer(threading.Thread, base_component): ...@@ -106,7 +111,8 @@ class dataBuffer(threading.Thread, base_component):
data = [] data = []
for j in xrange(self.nof_packets): for j in xrange(self.nof_packets):
for i in xrange(self.nof_inputs): for i in xrange(self.nof_inputs):
data.append(self.in_q[i].get()) # data.append(self.in_q[i].get())
data.append(self.in_q[i][0].recv())
for i in data: for i in data:
print i print i
...@@ -149,6 +155,49 @@ bg.start() ...@@ -149,6 +155,49 @@ bg.start()
mux.start() mux.start()
db.start() db.start()
#import multiprocessing
#
#NOF_PARALLEL_STREAMS = range(1000)
#
#def sink(inst_number, p_snk,p_src):
# p_src.close() # Close this local connection to source end of the pipe
# my_list = []
# while True:
# try:
# item = p_snk.recv()
# #print inst_number, item
# except EOFError:
# break
# my_list.append(item**3)
#
#
#def source(p_snk,p_src):
# p_snk.close() # Close this local connection to the sink end of the pipe
# for item in xrange(2000000):
# p_src.send(item)
# p_src.close()
#
#if __name__ == '__main__':
#
# pipes = []
# sinks = []
# sources = []
#
# for i in NOF_PARALLEL_STREAMS:
#
# # Create pipe and return connections to the source and sink sides of the pipe
# pipes.append( multiprocessing.Pipe(duplex=False) )
#
# # Separate sink process
# sinks.append( multiprocessing.Process(target=sink, args=(i, pipes[i][0], pipes[i][1])) )
# sinks[i].start()
#
# # Separate source process
# sources.append( multiprocessing.Process(target=source, args=(pipes[i][0], pipes[i][1])) )
# sources[i].start()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment