multithreads.py
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
"""
One-Click prototype model "X"
Remark: This example shows how a simple sysmtem of a block generator,
a multiplexer and a databuffer can be modelled using threads
with queues.
"""
###############################################################################
# System imports
import multiprocessing
import threading
import Queue
import time
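# NOTE: this is Python 2 code (print statements, xrange and the Queue module);
# the Queue and time imports are only used by code that is currently
# commented out.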
###############################################################################
# Base class for all components
class base_component(object):
    def __init__(self, name, nof_inputs=0, nof_outputs=0, packetsize=10):
        self.name = name
        self.nof_inputs = nof_inputs
        self.nof_outputs = nof_outputs
        self.packetsize = packetsize
        self.in_q = []
        for i in range(nof_inputs):
            #self.in_q.append(Queue.Queue(packetsize))
            self.in_q.append(multiprocessing.Pipe(duplex=False))
        self.out_q = []
        print 'Instantiating', self.name

    # '>' operator: Connect
    def __gt__(self, other):
        return self.__connect__(other)

    def __connect__(self, other):
        if hasattr(other, 'in_q'):
            for j in xrange(self.nof_outputs):
                self.out_q.append(other.in_q[j])
            print 'connecting', self.name, 'to', other.name
            return other
        else:
            print 'Error: downstream component', other.name, 'does not have snk'

    def run(self):
        return None
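# Note: because __connect__ returns the downstream component, '>' connections
# can be chained, e.g.
#
#   bg > mux > db
#
# evaluates as (bg > mux) > db: each step hooks the upstream component's
# out_q onto the downstream component's in_q pipes and then hands the
# downstream component to the next '>' in the chain.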
# Components inheriting the base component class
class blockGen(threading.Thread, base_component):
    def __init__(self, nof_outputs=1, data=[]):
        threading.Thread.__init__(self)
        base_component.__init__(self, 'blockGen', 0, nof_outputs, len(data[0][0]))
        self.data = data

    def run(self):
        for i in xrange(len(self.data)):
            for j in xrange(self.nof_outputs):
                #self.out_q[j].put(self.data[i][j])
                self.out_q[j][1].send(self.data[i][j])
                print "Sending packet " + str(i) + " on stream " + str(j)
                # time.sleep(1)
class dp_mux(threading.Thread, base_component):
    def __init__(self, nof_inputs=2, nof_outputs=1, packetsize=10):
        threading.Thread.__init__(self)
        base_component.__init__(self, 'dp_mux', nof_inputs, nof_outputs, packetsize)

    def run(self):
        while True:
            data = []
            for i in xrange(self.nof_inputs):
                # data.append(self.in_q[i].get())
                data.append(self.in_q[i][0].recv())
            for j in xrange(self.nof_inputs):
                # self.out_q[0].put(data[j])
                self.out_q[0][1].send(data[j])
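# The mux reads one packet from each of its inputs in order and forwards them
# on its single output, so the output stream interleaves the input streams
# packet by packet (input 0, input 1, input 0, ...).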
class dataBuffer(threading.Thread, base_component):
    def __init__(self, nof_inputs=1, nof_outputs=0, packetsize=10, nof_packets=12):
        threading.Thread.__init__(self)
        base_component.__init__(self, 'dataBuffer', nof_inputs, nof_outputs, packetsize)
        self.nof_packets = nof_packets

    def run(self):
        data = []
        for j in xrange(self.nof_packets):
            for i in xrange(self.nof_inputs):
                # data.append(self.in_q[i].get())
                data.append(self.in_q[i][0].recv())
        for i in data:
            print i
# Some definitions
packetsize = 10
bg_nof_outputs = 2
nof_transm_packets = 12
nof_receiv_packets = 24
# Create data for the block generator
#
# bg_data=[h][i][j]
# h = [0:nof_transm_packets-1]
# i = [0:nof_outputs-1]
# j = [0:packetsize-1]
#
# The size of bg_data determines the various parameters of the model.
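#
# For the settings above (nof_transm_packets=12, bg_nof_outputs=2,
# packetsize=10) this yields, for example:
#   bg_data[0][0] = [0, 1, ..., 9]      (packet 0 on stream 0)
#   bg_data[0][1] = [10, 11, ..., 19]   (packet 0 on stream 1)
#   bg_data[1][0] = [20, 21, ..., 29]   (packet 1 on stream 0)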
bg_data = []
for h in xrange(nof_transm_packets):
    packets = []
    for i in xrange(bg_nof_outputs):
        packet = []
        for j in xrange(packetsize):
            packet.append(j + i*packetsize + h*bg_nof_outputs*packetsize)
        packets.append(packet)
    bg_data.append(packets)
# Instantiate the components for the system
bg = blockGen(bg_nof_outputs, bg_data)
mux = dp_mux(bg_nof_outputs, 1, packetsize)
db = dataBuffer(1, 0, packetsize, nof_receiv_packets)
# Connect the components
bg > mux > db
# Start the threads
bg.start()
mux.start()
db.start()
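# Note: dp_mux.run() loops forever; once blockGen has sent all its packets and
# dataBuffer has printed the received data, the mux thread keeps blocking on
# recv(), so the script does not exit on its own and has to be interrupted.
#
# The commented-out code below is an alternative sketch of the same
# source/sink idea using separate processes (multiprocessing.Process) instead
# of threads. It is kept here for reference and is not executed.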
#import multiprocessing
#
#NOF_PARALLEL_STREAMS = range(1000)
#
#def sink(inst_number, p_snk, p_src):
#    p_src.close()      # Close this local connection to source end of the pipe
#    my_list = []
#    while True:
#        try:
#            item = p_snk.recv()
#            #print inst_number, item
#        except EOFError:
#            break
#        my_list.append(item**3)
#
#
#def source(p_snk, p_src):
#    p_snk.close()      # Close this local connection to the sink end of the pipe
#    for item in xrange(2000000):
#        p_src.send(item)
#    p_src.close()
#
#if __name__ == '__main__':
#
#    pipes = []
#    sinks = []
#    sources = []
#
#    for i in NOF_PARALLEL_STREAMS:
#
#        # Create pipe and return connections to the source and sink sides of the pipe
#        pipes.append(multiprocessing.Pipe(duplex=False))
#
#        # Separate sink process
#        sinks.append(multiprocessing.Process(target=sink, args=(i, pipes[i][0], pipes[i][1])))
#        sinks[i].start()
#
#        # Separate source process
#        sources.append(multiprocessing.Process(target=source, args=(pipes[i][0], pipes[i][1])))
#        sources[i].start()