Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
multithreads.py 6.09 KiB
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################

""" 

   One-Click prototype model "X"

   Remark: This example shows how a simple system of a block generator,
           a multiplexer and a databuffer can be modelled using threads
           with queues. 
   
"""


###############################################################################
# System imports
import multiprocessing
import threading
import Queue
import time 


###############################################################################
# Functions
class base_component(object):
    """Common base for the components of the model.

    A component owns one unidirectional pipe per input (``in_q``) and keeps
    references to the downstream pipes it writes to (``out_q``).  Every
    entry of both lists is the ``(receive_end, send_end)`` tuple returned by
    ``multiprocessing.Pipe(duplex=False)``.

    Components are wired together with the ``>`` operator, e.g. ``a > b``.
    """

    def __init__(self, name, nof_inputs=0, nof_outputs=0, packetsize=10):
        """Create the component and its input pipes.

        name        -- label used in the printed messages
        nof_inputs  -- number of input pipes to create
        nof_outputs -- number of downstream inputs claimed when connecting
        packetsize  -- stored on the instance; the pipes themselves are not
                       sized by it
        """
        self.name = name
        self.nof_inputs = nof_inputs
        self.nof_outputs = nof_outputs
        self.packetsize = packetsize
        # One (receive_end, send_end) pair per input.
        self.in_q = [multiprocessing.Pipe(duplex=False)
                     for _ in range(nof_inputs)]
        # Filled by __connect__ with the downstream component's input pipes.
        self.out_q = []
        print('Instantiating %s' % (self.name,))

    # '>' operator: Connect
    def __gt__(self, other):
        """Connect this component to ``other``; see ``__connect__``."""
        return self.__connect__(other)

    def __connect__(self, other):
        """Attach the first ``nof_outputs`` inputs of ``other`` as outputs.

        Returns ``other`` on success so that connections can be chained.
        Prints an error and returns None when ``other`` has no ``in_q``.
        Raises IndexError when ``other`` has fewer inputs than this
        component has outputs; ``out_q`` is left untouched in that case.
        """
        # 'other' may be an arbitrary object, so do not rely on it having
        # a name when building messages.
        other_name = getattr(other, 'name', repr(other))
        if not hasattr(other, 'in_q'):
            print('Error: downstream component %s does not have snk'
                  % (other_name,))
            return None
        if len(other.in_q) < self.nof_outputs:
            # Check up front so a failed connect does not leave a partially
            # filled out_q behind.
            raise IndexError(
                'cannot connect %s (%d outputs) to %s (%d inputs)'
                % (self.name, self.nof_outputs, other_name, len(other.in_q)))
        self.out_q.extend(other.in_q[:self.nof_outputs])
        print('connecting %s to %s' % (self.name, other_name))
        return other

    def run(self):
        """Do nothing; concrete components override this."""
        return None

# Components inheriting the base component class
class blockGen(threading.Thread, base_component):
    """Source thread that sends pre-computed packets on its outputs.

    ``data`` is indexed as ``data[packet_index][stream][sample]``; for every
    packet index one packet is sent on each of the ``nof_outputs`` streams.
    """

    def __init__(self, nof_outputs=1, data=None):
        """Create the generator.

        nof_outputs -- number of output streams
        data        -- nested list of packets (see class docstring); None
                       or an empty list means there is nothing to send
        """
        threading.Thread.__init__(self)
        # None instead of a shared mutable [] default.
        self.data = [] if data is None else data
        try:
            packetsize = len(self.data[0][0])
        except IndexError:
            # No packets at all: there is no packet size to derive.
            packetsize = 0
        base_component.__init__(self, 'blockGen', 0, nof_outputs, packetsize)

    def run(self):
        """Send all packets, then close the send ends of the output pipes."""
        try:
            for i, packets in enumerate(self.data):
                for j in range(self.nof_outputs):
                    # self.out_q[j].put(packets[j])
                    self.out_q[j][1].send(packets[j])
                    print("Sending packet %d on stream %d" % (i, j))
        finally:
            # Closing the send end makes the reader's recv() raise EOFError
            # once the buffered packets are consumed, so downstream threads
            # can terminate instead of blocking forever.
            for _reader, writer in self.out_q:
                writer.close()


class dp_mux(threading.Thread, base_component):
    """Multiplexer thread: serialises all input streams onto output 0."""

    def __init__(self, nof_inputs=2, nof_outputs=1, packetsize=10):
        threading.Thread.__init__(self)
        base_component.__init__(self, 'dp_mux', nof_inputs, nof_outputs,
                                packetsize)

    def run(self):
        """Forward packets until an upstream pipe is closed.

        Each round receives one packet from every input, in input order,
        and sends them in that order on output 0.  When an input reports
        end-of-stream the packets already received in that round are still
        forwarded, after which the output pipes are closed.
        """
        try:
            # Without inputs there is nothing to forward; do not spin.
            upstream_open = bool(self.in_q)
            while upstream_open:
                data = []
                try:
                    for reader, _writer in self.in_q:
                        # data.append(self.in_q[i].get())
                        data.append(reader.recv())
                except EOFError:
                    upstream_open = False
                for packet in data:
                    # self.out_q[0].put(packet)
                    self.out_q[0][1].send(packet)
        finally:
            # Propagate end-of-stream to the downstream component.
            for _reader, writer in self.out_q:
                writer.close()


class dataBuffer(threading.Thread, base_component):
    """Sink thread that collects packets and prints them."""

    def __init__(self, nof_inputs=1, nof_outputs=0, packetsize=10,
                 nof_packets=12):
        """Create the buffer.

        nof_packets -- number of packets to receive from each input
        """
        threading.Thread.__init__(self)
        base_component.__init__(self, 'dataBuffer', nof_inputs, nof_outputs,
                                packetsize)
        self.nof_packets = nof_packets

    def run(self):
        """Receive ``nof_packets`` packets per input, then print them all.

        If an upstream pipe is closed before all packets have arrived, a
        notice is printed and the packets received so far are printed.
        """
        data = []
        try:
            for _ in range(self.nof_packets):
                for reader, _writer in self.in_q:
                    # data.append(self.in_q[i].get())
                    data.append(reader.recv())
        except EOFError:
            print('%s: upstream closed after %d of %d packets'
                  % (self.name, len(data),
                     self.nof_packets * self.nof_inputs))
        for packet in data:
            print(packet)
       

# Some definitions
packetsize = 10
bg_nof_outputs = 2
nof_transm_packets = 12
# The multiplexer serialises all streams onto one output, so the data buffer
# has to expect one packet per stream for every transmitted packet index.
nof_receiv_packets = nof_transm_packets * bg_nof_outputs

# Create data for the block generator
#
# bg_data=[h][i][j]
#   h = [0:nof_transm_packets-1]
#   i = [0:nof_outputs-1]
#   j = [0:packetsize-1]
#
# The size of bg_data determines the various parameters of the model.
# Every sample gets a unique, monotonically increasing value.

bg_data = [
    [
        [j + i * packetsize + h * bg_nof_outputs * packetsize
         for j in range(packetsize)]
        for i in range(bg_nof_outputs)
    ]
    for h in range(nof_transm_packets)
]

# Instantiate the components for the system
bg  = blockGen(bg_nof_outputs, bg_data)
mux = dp_mux(bg_nof_outputs, 1, packetsize)
db  = dataBuffer(1, 0, packetsize, nof_receiv_packets)

# Connect the components
# Python evaluates this chained comparison as (bg > mux) and (mux > db).
# Each '>' returns the downstream component on success, which is truthy,
# so both links are made.
bg > mux > db

# Start the threads
for component in (bg, mux, db):
    component.start()
        
 
 

#import multiprocessing
#
#NOF_PARALLEL_STREAMS = range(1000)
#
#def sink(inst_number, p_snk,p_src):
#    p_src.close() # Close this local connection to source end of the pipe
#    my_list = []
#    while True:
#        try:
#            item = p_snk.recv()
#            #print inst_number, item
#        except EOFError:
#            break
#        my_list.append(item**3)
#
#
#def source(p_snk,p_src):
#    p_snk.close() # Close this local connection to the sink end of the pipe
#    for item in xrange(2000000):
#        p_src.send(item)
#    p_src.close()
#
#if __name__ == '__main__':
#
#    pipes = []
#    sinks = []
#    sources = []
#
#    for i in NOF_PARALLEL_STREAMS:
#
#        # Create pipe and return connections to the source and sink sides of the pipe
#        pipes.append( multiprocessing.Pipe(duplex=False) )
#    
#        # Separate sink process
#        sinks.append( multiprocessing.Process(target=sink, args=(i, pipes[i][0], pipes[i][1])) )
#        sinks[i].start()
#    
#        # Separate source process
#        sources.append( multiprocessing.Process(target=source, args=(pipes[i][0], pipes[i][1])) )
#        sources[i].start()