Skip to content
Snippets Groups Projects
Commit 00b6207f authored by Pieter Donker's avatar Pieter Donker
Browse files

Initial commit

parents
Branches
No related tags found
No related merge requests found
Showing
with 1756 additions and 0 deletions
*log
*pyc
File added
#! /usr/bin/env python3
###############################################################################
#
# Copyright (C) 2016
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author Date
# PD mrt 2017
#
###############################################################################
import os
import sys
import argparse
import yaml
import traceback
from subprocess import CalledProcessError
from py_args_lib import *
#from py_args_lib import gen_slave
def main():
""" main """
# show info for all requested peripheral
for peripheralname in args.peripheral:
peripheral_filename = "./peripherals/{}.peripheral.yaml".format(peripheralname)
# show_overview(peripheral_filename)
generate_fw(peripheral_filename)
# show info for all requested fpgas
for fpganame in args.fpga:
fpga_filename = "./fpgas/{}.fpga.yaml".format(fpganame)
show_overview(fpga_filename)
def show_overview(filename):
""" show overview of loaded file """
try:
config = yaml.load(open(filename, "r"))
name = config['hdl_library_name']
print(name)
#settings = config[name]
if '.fpga.yaml' in filename:
fpga = FPGA(filename)
fpga.show_overview()
elif '.peripheral.yaml' in filename:
logger.info("Load peripheral(s) from '%s'", filename)
library_config = yaml.load(open(filename, 'r'))
for peripheral_config in library_config['peripherals']:
peripheral = Peripheral(peripheral_config)
logger.info(" read peripheral '%s'" % peripheral.component_name())
peripheral.eval_peripheral()
peripheral.show_overview()
except IOError:
logger.error("config file '{}' does not exist".format(filename))
def generate_fw(filename):
""" generate vhd reg files for loaded file. """
try:
config = yaml.load(open(filename,"r"))
peripherals = config['peripherals']
# genSlave = gen_slave.Slave()
if '.fpga.yaml' in filename:
# not supported yet
print("not supported yet")
elif '.peripheral.yaml' in filename:
for periph in peripherals:
periph['lib'] = config['hdl_library_name']
peripheral = Peripheral(periph)
peripheral.eval_peripheral()
genSlave = gen_slave.Slave(peripheral)
genSlave.generate_regs(peripheral)
for key in peripheral.rams:
genSlave.generate_mem(peripheral.rams[key],'ram')
for key in peripheral.fifos:
genSlave.generate_mem(peripheral.fifos[key],'fifo')
except IOError:
logger.error("config file '{}' does not exist" .format(filename))
except CalledProcessError:
pass
if __name__ == "__main__":
# setup first log system before importing other user libraries
PROGRAM_NAME = __file__.split('/')[-1].split('.')[0]
unit_logger.set_logfile_name(name=PROGRAM_NAME)
unit_logger.set_file_log_level('DEBUG')
# Parse command line arguments
parser = argparse.ArgumentParser(description="""
=Args demo=
fpga and peripheral config command line parser arguments
""")
parser.add_argument('-p','--peripheral', nargs='*', default=[], help="peripheral names separated by spaces")
parser.add_argument('-f','--fpga', nargs='*', default=[], help="fpga names separated by spaces")
parser.add_argument('-v','--verbosity', default='INFO', help="verbosity level can be [ERROR | WARNING | INFO | DEBUG]")
args = parser.parse_args()
if not args.peripheral and not args.fpga:
parser.print_help()
unit_logger.set_stdout_log_level(args.verbosity)
logger.debug("Used arguments: {}".format(args))
try:
main()
except:
logger.error('Program fault, reporting and cleanup')
logger.error('Caught %s', str(sys.exc_info()[0]))
logger.error(str(sys.exc_info()[1]))
logger.error('TRACEBACK:\n%s', traceback.format_exc())
logger.error('Aborting NOW')
sys.exit("ERROR")
sys.exit("Normal Exit")
#! /usr/bin/env python3
###############################################################################
#
# Copyright (C) 2016
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author Date
# PD feb 2017
#
###############################################################################
"""
Make automatic documentation
"""
import sys
import os
import argparse
import subprocess
import traceback
import yaml
import time
from subprocess import CalledProcessError
from pylatex import Document, Section, Subsection, Subsubsection, Command, Package, Tabular, MultiColumn, MultiRow, NewLine
from pylatex import SmallText, MediumText, LargeText, HugeText
from pylatex.utils import italic, bold, NoEscape, verbatim, escape_latex
from py_args_lib import *
import common as cm
def main():
try:
for systemname in args.system:
doc = SystemDocumentation(systemname)
doc.fill()
doc.make_pdf()
del doc
time.sleep(0.5)
for peripheralname in args.peripheral:
doc = PeripheralDocumentation(peripheralname)
doc.fill()
doc.make_pdf()
del doc
time.sleep(0.5)
except IOError:
logger.error("config file '{}' does not exist".format(filename))
class SystemDocumentation(object):
def __init__(self, systemname):
self.systemname = systemname
self.system_filename = "./systems/{}.system.yaml".format(systemname)
self.config = yaml.load(open(self.system_filename, "r"))
geometry_options = {"tmargin": "1cm", "lmargin": "2.5cm"}
self.doc = Document(geometry_options=geometry_options)
self.doc.preamble.append(Command('title', 'ARGS Documentation for {}'.format(self.systemname)))
self.doc.preamble.append(Command('author', 'oneclick-args-documentation-script'))
self.doc.preamble.append(Command('date', NoEscape(r'\today')))
self.doc.append(NoEscape(r'\maketitle'))
#self.doc = Documentation(self.config['fpga_name'])
self.system = System(self.system_filename)
def __del__(self):
del self.doc
del self.system
time.sleep(0.5)
def fill(self):
with self.doc.create(Section("{} system.".format(self.config['fpga_name']))):
self.doc.append(self.system.fpga_description)
with self.doc.create(Section("Peripherals.")):
added_instances = []
for peri_name, peri_class in sorted(self.system.peripherals.items()):
if peri_class.name() in added_instances:
continue
added_instances.append(peri_class.name())
with self.doc.create(Section(peri_class.name(), numbering=True)):
self.doc.append(peri_class.get_kv('peripheral_description').replace('"', ''))
self.doc.append(NewLine())
#self.doc.append(MediumText(bold("slave ports.")))
for val_info, val_type in ((peri_class.registers, 'Registers'),
(peri_class.rams, 'Rams'),
(peri_class.fifos, 'Fifos')):
if len(val_info) == 0:
continue
#self.doc.add(text=val_type, size="medium")
added_val_types = []
for key, val in sorted(val_info.items()):
if val.name() in added_val_types:
continue
added_val_types.append(val.name())
with self.doc.create(Subsection("{} slave.".format(val.name().lower()), numbering=True)):
if val.get_kv('slave_description') is not None:
self.doc.append(val.get_kv('slave_description').replace('"', ''))
added_fields = []
for field_key, field_val in sorted(val.fields.items()):
real_name = field_val.name().strip().split('.')[0]
if real_name in added_fields:
continue
added_fields.append(real_name)
with self.doc.create(Subsubsection("{} field.".format("{}".format(real_name)), numbering=True)):
self.doc.append(field_val.get_kv('field_description').replace('"', '') )
#self.doc.append(NewLine())
def make_pdf(self):
try:
self.doc.generate_pdf('{}'.format(self.systemname), clean_tex=True)
time.sleep(0.5)
except CalledProcessError:
pass
class PeripheralDocumentation(object):
def __init__(self, peripheralname):
self.peripheralname = peripheralname
self.peripheral_filename = "./peripherals/{}.peripheral.yaml".format(peripheralname)
self.config = yaml.load(open(self.peripheral_filename, "r"))
geometry_options = {"tmargin": "1cm", "lmargin": "2.5cm"}
self.doc = Document(geometry_options=geometry_options)
self.doc.preamble.append(Command('title', 'ARGS Documentation for {}'.format(self.peripheralname)))
self.doc.preamble.append(Command('author', 'oneclick-args-documentation-script'))
self.doc.preamble.append(Command('date', NoEscape(r'\today')))
self.doc.append(NoEscape(r'\maketitle'))
def fill(self):
with self.doc.create(Section("{} library.".format(self.config['hdl_library_name']))):
self.doc.append(self.config['hdl_library_description'])
with self.doc.create(Section("Peripherals.")):
added_instances = []
for peri_info in self.config['peripherals']:
peri_class = Peripheral(peri_info)
if peri_class.name() in added_instances:
continue
added_instances.append(peri_class.name())
with self.doc.create(Section(peri_class.name(), numbering=True)):
self.doc.append(peri_class.get_kv('peripheral_description').replace('"', ''))
self.doc.append(NewLine())
#self.doc.append(MediumText(bold("slave ports.")))
for val_info, val_type in ((peri_class.registers, 'Registers'),
(peri_class.rams, 'Rams'),
(peri_class.fifos, 'Fifos')):
if len(val_info) == 0:
continue
#self.doc.add(text=val_type, size="medium")
added_val_types = []
for key, val in sorted(val_info.items()):
if val.name() in added_val_types:
continue
added_val_types.append(val.name())
with self.doc.create(Subsection("{} slave.".format(val.name().lower()), numbering=True)):
if val.get_kv('slave_description') is not None:
self.doc.append(val.get_kv('slave_description').replace('"', ''))
added_fields = []
for field_key, field_val in sorted(val.fields.items()):
real_name = field_val.name().strip().split('.')[0]
if real_name in added_fields:
continue
added_fields.append(real_name)
with self.doc.create(Subsubsection("{} field.".format("{}".format(real_name)), numbering=True)):
self.doc.append(field_val.get_kv('field_description').replace('"', '') )
#self.doc.append(NewLine())
def make_pdf(self):
try:
self.doc.generate_pdf('{}'.format(self.peripheralname), clean_tex=True)
time.sleep(0.5)
except CalledProcessError:
pass
if __name__ == "__main__":
# setup first log system before importing other user libraries
PROGRAM_NAME = __file__.split('/')[-1].split('.')[0]
unit_logger.set_logfile_name(name=PROGRAM_NAME)
unit_logger.set_file_log_level('DEBUG')
# Parse command line arguments
parser = argparse.ArgumentParser(description="System and peripheral config command line parser arguments")
parser.add_argument('-s','--system', nargs='*', default=[], help="system names separated by spaces")
parser.add_argument('-p','--peripheral', nargs='*', default=[], help="peripheral names separated by spaces")
parser.add_argument('-v','--verbosity', default='INFO', help="verbosity level can be [ERROR | WARNING | INFO | DEBUG]")
args = parser.parse_args()
if not args.peripheral and not args.system:
parser.print_help()
unit_logger.set_stdout_log_level(args.verbosity)
logger.debug("Used arguments: {}".format(args))
try:
main()
except:
logger.error('Program fault, reporting and cleanup')
logger.error('Caught %s', str(sys.exc_info()[0]))
logger.error(str(sys.exc_info()[1]))
logger.error('TRACEBACK:\n%s', traceback.format_exc())
logger.error('Aborting NOW')
sys.exit("ERROR")
sys.exit("Normal Exit")
\ No newline at end of file
This readme is an introduction to py_args_lib.
Contents:
1) Demo of py_args_lib
2) How to develop new tool scripts that use py_args_lib
3) Examples of tool scripts that use py_args_lib
1) Demo of py_args_lib
./args_demo.py -h # help
./args_demo.py -s unb1_minimal_sopc -v INFO # show system.yaml contents
./args_demo.py -p unb1_board dp -v INFO # show peripheral.yaml contents
2) How to develop new tool scripts that use py_args_lib
# import all from py_args_lib
from py_args_lib import *
# logger is automaticly included (unit_logger), only logfile and level must be set,
# if 'unit_logger.set_logfile_name()' is not called no log file is made,
# in the calling directory there must be a directory called log
# valid levels are ERROR, WARNING, INFO or DEBUG
unit_logger.set_logfile_name(name=[program_name])
unit_logger.set_file_log_level('DEBUG') # if not called the default is 'DEBUG'
unit_logger.set_stdout_log_level('INFO') # if not called the default is 'INFO'
# assign System class with requested *.system.yaml filename
# this will load all existing peripherals and include the system peripherals modified with the
# system settings to the system class
system = System(filename=[*.system.yaml])
# now the dictonary system.peripherals holds all the peripheral classes
peripheral = system.peripherals['peripheral_name']
3) Examples of tool scripts that use py_args_lib
a) Create pdf documentation (very draft)
./args_documentation.py -s unb1_minimal_sopc # system with all it peripherals
okular unb1_minimal_sopc.pdf
./args_documentation.py -p unb1_board dp # only the peripheral
okular unb1_board.pdf
b) Create ROM system info for Uniboard
./uniboard_rom_system_info.py -s unb1_minimal_sopc # with self generated base addresses
more unb1_minimal_sopc.reg
./uniboard_rom_system_info.py -s unb1_minimal_sopc -q # using base addresses from sopc file (via -q)
more unb1_minimal_sopc.reg
#################################################################################
#
#
# Bus assumptions:
# - optional user signals not used
# - Has BURST 1
# - Has LOCK 0 (AXI4 does not support locked transactions)
# - Has CACHE 0
# - Has PROT 0
#
#################################################################################
import os
import logging
import common as cm
import collections
from py_args_lib import *
from fpga import FPGA
import numpy as np
from gen_slave import tab_aligned
logger = logging.getLogger('main.gen_bus')
def get_cmd(raw_line):
return raw_line.split('}>')[-1]
class Bus(object):
"""
A bus is generated based on a FPGA object which is a collection of peripherals from different peripheral libraries
"""
def __init__(self, fpga):
self.fpga = fpga
self.root_dir = os.path.expandvars('$RADIOHDL/tools/args')
self.out_dir = os.path.expandvars('$HDL_BUILD_DIR/ARGS/{}'.format(self.fpga.fpga_name))
self.tmpl_mstr_port_full = os.path.join(self.root_dir, 'templates/template_bus_master_axi_full.vho')
self.tmpl_mstr_port_lite = os.path.join(self.root_dir, 'templates/template_bus_master_axi_lite.vho')
self.tmpl_mstr_port_cast = os.path.join(self.root_dir, 'templates/template_bus_master_port_casting.vho')
self.tmpl_bus_pkg = os.path.join(self.root_dir, 'templates/template_bus_pkg.vhd')
self.tmpl_bus_top = os.path.join(self.root_dir, 'templates/template_bus_top.vhd')
self.tmpl_tcl = os.path.join(self.root_dir, 'templates/template_create_bd.tcl')
self.bus_config = {'burst':'1','lock':'1', 'cache':'1', 'prot':'1', 'qos':'0','region':'0','wstrb':'1'}
self.lite_slaves = []
self.full_slaves = []
self.nof_slaves = self.fpga.nof_lite + self.fpga.nof_full
self.output_files = []
self.vhd_replace_dict = {'<nof_lite_slaves>' : str(self.fpga.nof_lite), '<nof_full_slaves>' : str(self.fpga.nof_full),
'<fpga_name>':self.fpga.fpga_name }
self.nof_interconnects = int(np.ceil((max(1,self.nof_slaves-1))/15))
self.indexes = self.calc_indexes()
self.tcl_replace_dict = {'<fpga_name>':self.fpga.fpga_name, '<nof_slaves>' : str(self.nof_slaves)}
def gen_tcl(self):
lines = []
with open(self.tmpl_tcl, 'r') as infile:
input = list(infile)
for line_num, line in enumerate(input):
if '<{' not in line:
for tag, replace_string in self.tcl_replace_dict.items():
line = line.replace(tag, replace_string)
lines.append(line)
else :
if 'create_interconnects' in line:
for i in range(self.nof_interconnects):
lines.append(get_cmd(line).format(i))
nof_slaves = 16 if i < (self.nof_interconnects - 1) else (self.nof_slaves - i*15)
lines.append(get_cmd(input[line_num+1]).replace('<i_nof_slaves>', str(nof_slaves)))
lines.extend([get_cmd(input[line_num+2]).replace('<i>',str(i).zfill(2)) for i in range(nof_slaves)])
lines.append(get_cmd(input[line_num+3]).format(i))
if 'connect_clock_pins' in line:
for i in range(self.nof_interconnects):
lines.append(get_cmd(line).format(i,0, 'S').replace('S00_',''))
lines.append(get_cmd(line).format(i,0, 'S'))
if i < (self.nof_interconnects-1):
lines.append(get_cmd(line).format(i,15,'M'))
for i in range(self.nof_slaves):
lines.append(get_cmd(line).format(self.indexes[i][0],self.indexes[i][1], 'M'))
if 'connect_reset_pins' in line:
for i in range(self.nof_interconnects):
lines.append(get_cmd(line).format(i,0, 'S').replace('S00_',''))
lines.append(get_cmd(line).format(i,0, 'S'))
if i < (self.nof_interconnects-1):
lines.append(get_cmd(line).format(i,15,'M'))
for i in range(self.nof_slaves):
lines.append(get_cmd(line).format(self.indexes[i][0],self.indexes[i][1], 'M'))
if 'create_master_ports' in line:
last_port = -1
last_prot = None
i = -1
# for i, slave_attr in enumerate(self.fpga.address_map.values()):
for slave_attr in self.fpga.address_map.values():
if slave_attr['port_index'] == last_port and slave_attr['type'] == last_prot:
continue; # skip port indexes already dealt with
i = i+1
last_port = slave_attr['port_index']
last_prot = slave_attr['type']
if np.mod(i, 15) == 0 and self.indexes[i][1] == 0 and i != 0:
lines.append(get_cmd(input[line_num+4])) # daisy chain interconnects
lines.append(get_cmd(input[line_num+5]).format(self.indexes[i][0]-1, self.indexes[i][0]))
lines.append(get_cmd(line).replace('<i>',str(i).zfill(2)))
_protocol = 'AXI4LITE' if slave_attr['type'] == 'LITE' else 'AXI4'
lines.append(get_cmd(input[line_num+1]).format(self.indexes[i][0],self.indexes[i][1], i))
_line = get_cmd(input[line_num+2]).replace('<protocol>', _protocol)
lines.append(_line.replace('<i>',str(i).zfill(2)))
fifo_access = slave_attr.get('access',None)
if fifo_access is not None:
_line = get_cmd(input[line_num+6]).replace('<access_mode>', 'READ' if fifo_access == 'RO' else 'WRITE')
lines.append(_line.replace('<i>',str(i).zfill(2)))
lines.append(get_cmd(input[line_num+3]).format(self.indexes[i][0],self.indexes[i][1], i))
if 'set_address_small_range' in line:
last_port = -1
last_prot = None
i = -1
for slave_attr in self.fpga.address_map.values():
if slave_attr['port_index'] == last_port and slave_attr['type'] == last_prot:
continue
i = i+1
last_port = slave_attr['port_index']
last_prot = slave_attr['type']
if isinstance(slave_attr['slave'], Register):
if not getattr(slave_attr['slave'], 'isIP', False):
span = cm.ceil_pow2(max(slave_attr['peripheral'].reg_len, 4096))
else :
span = cm.ceil_pow2(max(slave_attr['slave'].address_length(), 4096))
else :
span = slave_attr['span']
_line = get_cmd(line).replace('<range>', '4') # 4k is minimum settable size for vivado
lines.append(_line.replace('<i>',str(i).zfill(2)))
if 'set_address_map' in line:
last_port = -1
last_prot = None
i = -1
for slave_attr in self.fpga.address_map.values():
if slave_attr['port_index'] == last_port and slave_attr['type'] == last_prot:
continue
i = i+1
last_port = slave_attr['port_index']
last_prot = slave_attr['type']
_line = get_cmd(line).replace('<address>', "{:08x}".format(slave_attr['base']))
lines.append(_line.replace('<i>',str(i).zfill(2)))
if 'set_address_range' in line:
last_port = -1
last_prot = None
i = -1
for slave_attr in self.fpga.address_map.values():
if slave_attr['port_index'] == last_port and slave_attr['type'] == last_prot:
continue
i = i+1
last_port = slave_attr['port_index']
last_prot = slave_attr['type']
if isinstance(slave_attr['slave'], Register):
if not getattr(slave_attr['slave'], 'isIP', False):
span = cm.ceil_pow2(max(slave_attr['peripheral'].reg_len, 4096))
else :
span = cm.ceil_pow2(max(slave_attr['slave'].address_length(), 4096))
else :
span = slave_attr['span']
lines.append('# interconnect[{}] <{}> base: 0x{:08x} span: 0x{:06x}\n'.format(i, slave_attr['peripheral'].name(), slave_attr['base'], span))
_line = get_cmd(line).replace('<range>', str(int(span/1024)))
lines.append(_line.replace('<i>',str(i).zfill(2)))
return lines
def gen_vhdl(self):
lines = []
slave_index = -1
add_lines = []
# if self.fpga.nof_full == 0 or self.nof_slaves == 1 :
# self.vhd_replace_dict.update({'<sla_in_vec>':'SLA_IN','<sla_out_vec>':'SLA_OUT','<(0)>':' '})
# else :
# self.vhd_replace_dict.update({'<sla_in_vec>':'sla_in_vec','<sla_out_vec>':'sla_out_vec','<(0)>':'(0) '})
with open(self.tmpl_bus_top, 'r') as infile:
for line in infile:
# if '<{std_ulogic_casting}>' in line:
# input = list(open(self.tmpl_mstr_port_cast,'r'))
# last_port = -1
# last_prot = None
# for slave_port, slave_dict in self.fpga.address_map.items():
# if slave_dict['port_index'] == last_port and slave_dict['type'] == last_prot:
# continue
# last_port = slave_dict['port_index']
# last_prot = slave_dict['type']
# if self.fpga.nof_full == 0 or slave_dict['type'] == 'FULL':
# bad_tags = ['lock','last'] if slave_dict['type'] == 'LITE' else []
# _input = [_line for _line in input if not any([bad_tag in _line for bad_tag in bad_tags])]
# add_lines = [line.format(slave_dict['type'],slave_dict['port_index'], slave_dict['type'].lower()) for line in _input]
# lines.extend(add_lines)
if '<{master_interfaces}>' in line:
last_port = -1
last_prot = None
for slave_port, slave_dict in self.fpga.address_map.items():
if slave_dict['port_index'] == last_port and slave_dict['type'] == last_prot:
continue
last_port = slave_dict['port_index']
last_prot = slave_dict['type']
slave_index = slave_index + 1
if self.fpga.nof_full == 0 and self.fpga.nof_lite > 1 :
zero_index = '(0)'
# sig_out = 'mstr_out_lite_vec'
# sig_in = 'mstr_in_lite_vec'
else :
zero_index = ''
# sig_out = 'MSTR_OUT_LITE'
# sig_in = 'MSTR_IN_LITE'
template_file = self.tmpl_mstr_port_lite if slave_dict['type'] == 'LITE' else self.tmpl_mstr_port_full
input = list(open(template_file, 'r'))
# # If we have more than 16 slaves, lite interfaces >= 15 use vector for std_logic
# if (self.fpga.nof_full + self.fpga.nof_lite) > 16:
# if slave_index > 14 and slave_dict['type'] == 'LITE':
# zero_index = '(0)'
add_lines = [line.format(slave_index, slave_dict['port_index'], zero_index) for line in input]
if slave_index == (self.nof_slaves-1): # remove last comma to avoid vhdl syntax error
for line_no, line in enumerate(add_lines[::-1]):
if ',' in line and '--' not in line:
add_lines[len(add_lines)-line_no-1] = line.replace(',','')
break
lines.extend(add_lines)
else:
for key in self.vhd_replace_dict.keys():
if key in line:
line = line.replace(key, str(self.vhd_replace_dict[key]))
lines.append(line)
# Strip out full (or liet interfaces & correct syntax)
if self.fpga.nof_full == 0:
bad_tags = ['mstr_out_full','mstr_in_full']
for line_num, line in enumerate(lines):
if 'MSTR_OUT_LITE' in line:
lines[line_num] = line.replace(';','')
break
else :
bad_tags = ['region']
if self.fpga.nof_lite == 0:
bad_tags.extend(['mstr_in_lite', 'mstr_out_lite'])
lines = [line for line in lines if not any([bad_tag in line.lower() for bad_tag in bad_tags])]
return lines
def gen_pkg(self):
lines = []
input = list(open(self.tmpl_bus_pkg, 'r'))
for line in input:
if '<{' not in line:
for tag, replace_string in self.vhd_replace_dict.items():
line = line.replace(tag, replace_string) if tag in line else line
lines.append(line)
else :
sublines = []
type = line.split('<{')[-1].split('}>')[0]
line = line.split('<{')[-1].split('}>')[-1]
last_port = -1
last_prot = None
for slave_port, slave_dict in self.fpga.address_map.items():
if slave_dict['port_index'] == last_port and slave_dict['type'] == last_prot:
continue
last_port = slave_dict['port_index']
last_prot = slave_dict['type']
if slave_dict['type'] == type:
sublines.append(line.format(slave_dict['peripheral'].name(), slave_dict['port_index']))
lines.extend(tab_aligned(sublines))
return lines
def gen_file(self, file_type):
lines = []
if file_type == 'vhd':
lines = self.gen_vhdl()
out_file = self.fpga.fpga_name + '_bus_top.vhd'
if file_type == 'tcl':
lines = self.gen_tcl()
out_file = self.fpga.fpga_name + '_bd.tcl'
if file_type == 'pkg':
lines = self.gen_pkg()
out_file = self.fpga.fpga_name + '_bus_pkg.vhd'
try:
os.stat(self.out_dir)
except:
os.mkdir(self.out_dir)
file_name = os.path.join(self.out_dir, out_file)
with open(file_name, 'w') as out_file:
for line in lines:
out_file.write(line)
logger.info('Generated ARGS output %s', file_name)
self.output_files.append(file_name)
def gen_firmware(self):
self.gen_file('pkg')
self.gen_file('vhd')
self.gen_file('tcl')
return self.output_files
def calc_indexes(self):
"""
Calculate interconnects and local slave port numbers for all slaves
"""
index_list = []
for i in range(self.nof_slaves):
if np.floor(i/15) < self.nof_interconnects:
interconnect_num = int(np.floor(i/15))
else :
interconnect_num = int(np.floor(i/15)-1)
if np.mod(i,15) == 0 and i == (self.nof_slaves-1) and self.nof_interconnects > 1 :
local_slave_num = 15
else :
local_slave_num = np.mod(i,15)
logger.debug("slave_{} on interconnect {} with local port {} nof_interconnects {}".format(i, interconnect_num, local_slave_num, self.nof_interconnects))
index_list.append((interconnect_num, local_slave_num))
return index_list
#! /usr/bin/env python3
###############################################################################
#
# Copyright (C) 2017
# CSIRO (Commonwealth Scientific and Industrial Research Organization) <http://www.csiro.au/>
# GPO Box 1700, Canberra, ACT 2601, Australia
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author Date Version comments
# Keith Bengsotn Nov 2018 Original
#
###############################################################################
"""
Generate Address Map for usage in C software.
Lists every 'field' in the fpga.yaml and peripheral.yaml files
ordered by address in the address map.
"""
import sys
import os
#import logging
from argparse import ArgumentParser
#from py_args_lib import FPGA, RAM, FIFO, Register, PeripheralLibrary, FPGALibrary
from py_args_lib import RAM, FIFO, Register, FPGALibrary
#from common import ceil_pow2
#import pprint
#import code
#logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def gen_c_config(fpga, fpga_name, out_dir):
"""
Generate adderss map
fpga: all fpga.yaml files found in system
fpga_name: the particular fpga we're interested in
out_dir: the directory in which output should be written
"""
out = [] # list to hold text lines to be written to output .ccfg file
out.append("# Peripherals for {}:\n".format(fpga_name))
field_count = 0 # count of number of fields found in FPGA address map
# for periph_name, periph_info in fpga['fpga'].address_map.items():
for periph_info in fpga['fpga'].address_map.values():
peripheral = periph_info['peripheral']
p_num = int(periph_info['periph_num'])
pname = peripheral.name()
count = peripheral.number_of_peripherals()
base = int(periph_info['base']/4)
span = int(periph_info['span']/4) # span can be incorrect (bug in pyargslib?)
typ = periph_info['type']
txt = ('# peripheral={} start=0x{:04X} span=0x{:04X} type={}'
' count={:02} idx={} stop=0x{:04X}').format(
pname, base, span, typ, count, p_num, base+count*span)
out.append(txt+'\n')
slave = periph_info['slave']
num_slaves = slave.number_of_slaves()
slave_name = slave.name()
if isinstance(slave, RAM):
out.append('# RAM-SLAVE={:20}\n'.format(slave.name()))
ram_base = base
ram_len = int(slave.address_length())
ram_name = 'data'
access = slave.access_mode()
# width = slave.width() # Should be slave.user_width() ??
for i in range(0, num_slaves):
txt = ' BlockRAM 0x{:08X} len={} {}'.format(
ram_base + i*ram_len, ram_len, access)
if p_num == 0:
txt += ' {}'.format(pname)
else:
txt += ' {}[{}]'.format(pname, p_num)
if num_slaves == 1:
txt += ' {} {}'.format(slave_name, ram_name)
out.append(txt+'\n')
else:
out.append(txt + ' {}[{}] {}\n'.format(slave_name, i, ram_name))
field_count += 1
elif isinstance(slave, FIFO):
out.append('# FIFO-SLAVE={:20}\n'.format(slave.name()))
fifo_base = base
fifo_len = int(slave.address_length())
fifo_name = 'data'
access = slave.access_mode()
# width = slave.width() # Should be slave.user_width() ??
for i in range(0, num_slaves):
txt = ' FIFO 0x{:08X} len={} {}'.format(
fifo_base + i*fifo_len, fifo_len, access)
if p_num == 0:
txt += ' {}'.format(pname)
else:
txt += ' {}[{}]'.format(pname, p_num)
if num_slaves == 1:
txt += ' {} {}'.format(slave_name, fifo_name)
out.append(txt + '\n')
else:
out.append(txt + ' {}[{}] {}\n'.format(slave_name, i, fifo_name))
field_count += 1
elif isinstance(slave, Register):
out.append('# REG-SLAVE={} no.slaves={} len={} (base=0x{:X})\n'.format(
slave.name(), slave.number_of_slaves(), int(slave.address_length()),
int(slave.base_address()/4)))
# Fields that have a non-unity 'number_of_fields' specifier may
# become RAM at the start of the slave instances
for ram in slave.rams:
ram_base = int(ram.base_address()/4)+base
ram_len = int(ram.number_of_fields())
ram_name = ram.name()
access = ram.access_mode()
for i in range(0, num_slaves):
txt = ' DistrRAM 0x{:08X} len={} {}'.format(
ram_base + i*ram_len, ram_len, access)
if p_num == 0:
txt += ' {}'.format(pname)
else:
txt += ' {}[{}]'.format(pname, p_num)
if num_slaves == 1:
txt += ' {} {}'.format(slave_name, ram_name)
out.append(txt + '\n')
else:
out.append(txt + ' {}[{}] {}\n'.format(slave_name, i, ram_name))
field_count += 1
# if num_slaves == 1:
# out.append(' DistrRAM 0x{:08X} len={} {} {} {} {}'.format(
# ram_base, ram_len,access,pname,slave_name, ram_name))
# else:
# for i in range(0,num_slaves):
# out.append(' DistrRAM 0x{:08X} len={} {} {} {} {}[{}]'.format(
# ram_base+i*ram_len, ram_len,access,pname, slave_name, ram_name, i))
field_base = base
if slave.rams:
field_base += int(slave.base_address()/4)
# All other fields (with unity 'number_of_fields' attribute)
slave_length = int(slave.address_length()/4)
for i in range(0, num_slaves):
for fld in slave.fields:
bit_lo = fld.bit_offset()
bit_hi = bit_lo+fld.width()-1
#field_base = f.base_address
field_offset = int(fld.address_offset()/4)
field_name = fld.name()
access = fld.access_mode()
txt = ' BitField 0x{:08X} b[{}:{}] {}'.format(
field_offset+field_base+i*slave_length, bit_hi, bit_lo, access)
if p_num == 0:
txt += ' {}'.format(pname)
else:
txt += ' {}[{}]'.format(pname, p_num)
if num_slaves == 1:
txt += ' {} {}'.format(slave_name, field_name)
else:
txt += ' {}[{}] {}'.format(slave_name, i, field_name)
out.append(txt+'\n')
field_count += 1
# Write all text lines held in list 'out' to file
output_filename = out_dir + fpga_name + '.ccfg'
with open(output_filename, 'w') as out_file:
for line in out:
out_file.write(line)
print("Found {} fields in FPGA '{}'".format(field_count, fpga_name))
print('Wrote: {}'.format(output_filename))
if __name__ == '__main__':
parser = ArgumentParser(
description='ARGS tool script to generate fpgamap.py M&C Python client include file')
parser.add_argument('-f', '--fpga', required=True, help='ARGS fpga_name')
fpga_name = parser.parse_args().fpga
# Find and parse all *.fpga.yaml YAML files under root directory for RADIOHDL
# For each FPGA file it extracts a list of the peripherals that are used by that FPGA
# Here we select the FPGA YAML that matches up with the supplied fpga command line argument
libRootDir = os.path.expandvars('$RADIOHDL')
fpga = FPGALibrary(root_dir=libRootDir).library[fpga_name]
# Check that the output directory exists
out_dir = os.path.expandvars('$HDL_BUILD_DIR/ARGS/{}/'.format(fpga_name))
try:
os.stat(out_dir)
except:
print("Error output directory '{}' does not exist".format(out_dir))
sys.exit()
# create the summary file in the output directory
gen_c_config(fpga, fpga_name, out_dir)
This diff is collapsed.
#! /usr/bin/env python3
###############################################################################
#
# Copyright (C) 2017
# CSIRO (Commonwealth Scientific and Industrial Research Organization) <http://www.csiro.au/>
# GPO Box 1700, Canberra, ACT 2601, Australia
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author Date Version comments
# John Matthews Dec 2017 Original
#
###############################################################################
"""
Generate fpgamap.py M&C Python client include file
"""
import sys
import os
import logging
from argparse import ArgumentParser
from py_args_lib import FPGA, RAM, FIFO, Register, PeripheralLibrary, FPGALibrary
from common import ceil_pow2
import pprint
import code
#logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def genPython(fpga, fpgaName, readable):
slavePorts={}
print("Including slave ports for {}:".format(fpgaName))
for slavePortName, slavePortInfo in fpga['fpga'].address_map.items():
if slavePortInfo['periph_num'] > 0: continue
peripheral = slavePortInfo['peripheral']
slave = slavePortInfo['slave']
base = int(slavePortInfo['base']/4) # Convert from AXI byte address to register address
if peripheral.name() not in slavePorts:
slavePorts[peripheral.name()] = { 'slaves':{}, 'start':base, 'span':int(slavePortInfo['span']/4), 'count':peripheral.number_of_peripherals() }
else:
slavePorts[peripheral.name()]['span'] += int(slavePortInfo['span']/4)
slaves = slavePorts[peripheral.name()]['slaves']
slaveOffset = base - slavePorts[peripheral.name()]['start']
if isinstance(slave, RAM):
print(' {} at 0x{:X}'.format(slavePortName,base))
# Note py_args_lib quirk. RAM slaves include a single named field. There doesn't seem to be any way to recover this
# from py_args_lib at present. I shall assume that this field is always called "data"
slaves[slave.name()] = { 'type':'RAM', 'start':slaveOffset, 'step':slave.number_of_fields(), 'stop':slaveOffset+slave.number_of_fields()*slave.number_of_slaves(), 'fields':{ 'data':{ 'start':0, 'step':1, 'stop':slave.number_of_fields(), 'access_mode':slave.access_mode(), 'width':slave.width(), 'default':slave.reset_value(), 'description':slave.field_description(), 'bit_offset':slave.bit_offset() } } }
#code.interact(local=locals())
elif isinstance(slave,FIFO):
print(' {} at 0x{:X}'.format(slavePortName,base))
# Assume one field per FIFO slave
# Assume one field per word
slaves[slave.name()] = { 'type':'FIFO', 'start':slaveOffset, 'step':1, 'stop':slaveOffset+slave.number_of_slaves(), 'fields':{ 'data':{ 'start':0, 'step':1, 'stop':1, 'access_mode':slave.access_mode(), 'width':slave.width(), 'default':slave.reset_value(), 'description':slave.field_description(), 'bit_offset':slave.bit_offset(), 'depth':slave.number_of_fields() } } }
elif isinstance(slave,Register):
print(' {} at 0x{:X}'.format(slavePortName,base))
fields = {}
minOffset = 2**32
maxOffset = 0
for r in slave.rams:
offset = int(r.base_address()/4) - slaveOffset
stop_address = offset + (slave.number_of_slaves()-1)*ceil_pow2(r.number_of_fields())+r.number_of_fields()
fields[r.name()] = {'start':offset, 'step':1, 'stop':stop_address, 'access_mode': r.access_mode(), 'width' : r.width(), 'default':r.reset_value(), 'description':r.field_description(), 'bit_offset':0}
minOffset = min(minOffset, offset)
maxOffset = max(maxOffset, stop_address)
for f in slave.fields:
# Note py_args_lib quirk. When number_of_fields > 1 then numeric field id is added to string field name
offset = int(f.address_offset()/4) + int(slave.base_address()/4) - slaveOffset
fields[f.name()] = { 'start':offset, 'step':1, 'stop':offset+1, 'access_mode':f.access_mode(), 'width':f.width(), 'default':f.reset_value(), 'description':f.field_description(), 'bit_offset':f.bit_offset() }
minOffset = min(minOffset,offset)
maxOffset = max(maxOffset,offset+1)
slaves[slave.name()] = { 'type':'REG', 'start':slaveOffset, 'step':maxOffset, 'stop':slaveOffset+slave.number_of_slaves()*(maxOffset), 'fields':fields }
map={}
for k, v in slavePorts.items():
map[k] = { 'start':v['start'], 'step':v['span'], 'stop':v['start']+v['count']*v['span'], 'slaves':v['slaves'] }
pp = pprint.PrettyPrinter(width=300)
mapStr = pp.pformat(map) if readable else str(map)
args_dir = os.path.expandvars('$HDL_BUILD_DIR/ARGS')
try: os.stat(args_dir)
except: os.mkdir(args_dir)
out_dir = os.path.join(args_dir, 'py')
try: os.stat(out_dir)
except: os.mkdir(out_dir)
out_dir = os.path.join(out_dir, fpgaName)
try: os.stat(out_dir)
except: os.mkdir(out_dir)
with open(os.path.join(out_dir, 'fpgamap.py'),'wt') as file:
file.write("# M&C Python client include file for {} FPGA\n".format(fpgaName))
file.write("# Note that this file uses register (not AXI byte) addresses and offsets\n".format(fpgaName))
file.write("FPGAMAP = " + mapStr + "\n")
if __name__ == '__main__':
parser = ArgumentParser(description='ARGS tool script to generate fpgamap.py M&C Python client include file')
parser.add_argument('-f','--fpga', required=True, help='ARGS fpga_name')
parser.add_argument('-r','--readable', action='store_true', help='Generate human readable map file')
fpgaName = parser.parse_args().fpga
readable = parser.parse_args().readable
libRootDir = os.path.expandvars('$RADIOHDL')
# Find and parse all *.fpga.yaml YAML files under libRootDir
# For each FPGA file it extracts a list of the peripherals that are used by that FPGA
# Here we select the FPGA YAML that matches up with the supplied fpga command line argument
fpga = FPGALibrary(root_dir=libRootDir).library[fpgaName]
genPython(fpga, fpgaName, readable)
This diff is collapsed.
schema_name : args
schema_version: 1.0
schema_type : peripheral
hdl_library_name : demo
hdl_library_description: " Gemini Publish/Subscribe Service Registers "
peripherals:
- peripheral_name : periph1
peripheral_description : "Registers associated with Publish/Subscribe protocol block"
slave_ports:
- slave_name : client
slave_type : FIFO
fields:
- - field_name : data
width : 32
number_of_fields : 1000
access_mode : WO
- slave_name : test
slave_type : RAM
fields:
- - field_name : weight
width : 32
user_width : 64
access_mode : RW
number_of_fields : 2000
- slave_name : client
slave_type : reg
number_of_slaves : 4
slave_description : "Registers associated with Subscription client"
fields:
#################################
- - field_name : destination_ip
width : 32
access_mode : RW
default : 0xffffffff
field_description : "Destination IP Address"
#################################
- - field_name : destination_mac_lower
width : 32
access_mode : RW
default : 0xffffffff
field_description : "Destination MAC Lower Address"
#################################
- - field_name : destination_mac_upper
width : 16
access_mode : RW
default : 0xffff
field_description : "Destination MAC Upper Address"
#################################
- - field_name : destination_port
width : 16
access_mode : RW
default : 0x7531
field_description : "Destination UDP port"
#################################
- - field_name : event_mask
width : 32
access_mode : RW
default : 0x0
field_description : "Publish Event Mask"
#################################
- - field_name : event
width : 32
access_mode : RO
default : 0x0
field_description : "Current Pending or last event"
#################################
- - field_name : delivery_interval
width : 14
access_mode : RW
default : 0x7D0
number_of_fields : 3
field_description : "Delivery Interval in mS"
#################################
- control:
- field_name : acknowledge
width : 1
access_mode : RW
default : 1
#address_offset : 16
side_effect : PW
number_of_fields : 4
bit_offset : 3
field_description : "Acknowledge pulse event for reliable delivery"
#################################
- status:
- field_name : event_overflow
width : 1
access_mode : RO
default : 1
field_description : "An event queue overflow conditon has occured. Reset on acknowledge"
#################################
- slave_name : broadcast
slave_type : reg
number_of_slaves : 3
slave_description : "Control Register for broadcast client"
fields:
- control:
- field_name : acknowledge
width : 1
access_mode : RW
default : 0
side_effect : PW
field_description : "Acknowledge pulse event for reliable delivery"
# - - field_name : ram1
# access_mode : RW
# number_of_fields : 50
# width : 12
# - - field_name : ram2
# number_of_fields : 100
# - - field_name : ram3
# number_of_fields : 50
# - - field_name : ram4
# number_of_fields : 100
# - - field_name : ram5
# number_of_fields : 500
\ No newline at end of file
schema_name : args
schema_version: 1.0
schema_type : peripheral
hdl_library_name : example
hdl_library_description: "Example of an ARGS peripheral library. Comments in-line with key-value pairs should be replaced with actual values "
peripherals:
- peripheral_name : periph1 # name used in register module and documentation
peripheral_description : "periph1 is the 1st peripheral in the example peripheral library"
slave_ports:
- slave_name :
slave_type : REG # or REG_IP when firmware generation not desired
slave_protocol : # optional, use intended for slave type REG_IP ONLY
number_of_slaves : # optional, default: 1
slave_description : # Custom string for documentation, contained in quotation marks
fields:
################################# Fully specified field with all key options
- - field_name : # custom string, limit length where possible
width : # optional, default: 32
bit_offset : # optional, default: 0 or next available
address_offset : # optional, default: 0 or next available #check this
access_mode : # optional, default: RW
side_effect : # optional, default: None, eg. PR eg2. PR,PW
number_of_fields : # optional, default: 1
reset_value : # optional, default 0
simulation_value : # optional, default: reset_value
radix : # optional, default: signed
field_description : # custom string, include as much detail as possible for documentation purposes
################################# Field with recommended minimum key options
- <field_group_name> :
- field_name :
width :
access_mode :
field_description :
- field_name :
width :
access_mode :
field_description :
#################################
- slave_name : # custom string
slave_type : RAM
number_of_slaves : # optional, default: 1
slave_description : # custom string in quotation marks
fields :
################################# RAM is described by only one field
- - field_name : data # field name not important
width : # optional, resulting BRAM will always have data width of 32
user_width : # optional, default: 32
access_mode : # optional, default: RW
number_of_fields : # data depth, user set value gets rounded up to power of 2 from 1k up to 256k
reset_value : # can be specified by integer, equation or absolute path to .coe file
simulation_value : # optional, default: reset_value
radix : # optional, default: signed
field_description : # custom string
#################################
- slave_name : # custom string
slave_type : FIFO
number_of_slaves : # optional, default: 1
slave_description : # custom string in quotation marks
fields :
################################# FIFO is described by only one field
- - field_name : data # field name not important
width : # optional, resulting FIFO will always have data width of 32
access_mode : # mandatory, RO or WO
number_of_fields : # data depth, user set value gets rounded up to power of 2 from 512 up to 128k
reset_value : # can be specified by integer, equation or absolute path to .coe file
simulation_value : # optional, default: reset_value
radix : # optional, default: signed
field_description : # custom string
schema_name : args
schema_version: 1.0
schema_type : peripheral
hdl_library_name : Template
hdl_library_description: "Template for an ARGS peripheral library. Comments in-line with key-value pairs should be replaced with actual values "
peripherals:
- peripheral_name : periph1 # name used in register module and documentation
peripheral_description : "periph1 is the 1st peripheral in the example peripheral library"
slave_ports:
- slave_name : # custom string
slave_type : REG # or REG_IP when firmware generation not desired
slave_protocol : # optional, use intended for slave type REG_IP ONLY
number_of_slaves : # optional, default: 1
slave_description : # Custom string for documentation, contained in quotation marks
fields:
################################# Fully specified field with all key options
- - field_name : # custom string, limit length where possible
width : # optional, default: 32
bit_offset : # optional, default: 0 or next available
address_offset : # optional, default: 0 or next available #check this
access_mode : # optional, default: RW
side_effect : # optional, default: None, eg. PR eg2. PR,PW
number_of_fields : # optional, default: 1
reset_value : # optional, default 0
simulation_value : # optional, default: reset_value
radix : # optional, default: signed
field_description : # custom string, include as much detail as possible for documentation purposes
################################# Field with recommended minimum key options
- <field_group_name> :
- field_name :
width :
access_mode :
field_description :
- field_name :
width :
access_mode :
field_description :
#################################
- slave_name : # custom string
slave_type : RAM
number_of_slaves : # optional, default: 1
slave_description : # custom string in quotation marks
fields :
################################# RAM is described by only one field
- - field_name : data # field name not important
width : # optional, resulting BRAM will always have data width of 32
user_width : # optional, default: 32
access_mode : # optional, default: RW
number_of_fields : # data depth, user set value gets rounded up to power of 2 from 1k up to 256k
reset_value : # can be specified by integer, equation or absolute path to .coe file
simulation_value : # optional, default: reset_value
radix : # optional, default: signed
field_description : # custom string
#################################
- slave_name : # custom string
slave_type : FIFO
number_of_slaves : # optional, default: 1
slave_description : # custom string in quotation marks
fields :
################################# FIFO is described by only one field
- - field_name : data # field name not important
width : # optional, resulting FIFO will always have data width of 32
access_mode : # mandatory, RO or WO
number_of_fields : # data depth, user set value gets rounded up to power of 2 from 512 up to 128k
reset_value : # can be specified by integer, equation or absolute path to .coe file
simulation_value : # optional, default: reset_value
radix : # optional, default: signed
field_description : # custom string
__pycache__/
""" init file for py_args_lib
"""
import os
import sys
cwd = __file__[:__file__.rfind('/')]
# cwd = os.path.dirname(os.path.realpath(__file__))
sys.path.append(cwd)
from unit_logger import UnitLogger
unit_logger = UnitLogger(os.path.join(cwd, '../log'))
logger = unit_logger.logger
from peripheral import PeripheralLibrary, Peripheral
from fpga import FPGA, FPGALibrary
from peripheral_lib import *
###############################################################################
#
# Copyright (C) 2016
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author Date
# HJ jan 2017 Original
# EK feb 2017
# PD feb 2017
#
###############################################################################
import os
import copy
import logging
import yaml
import time
import collections
import common as cm
from py_args_lib import *
from peripheral_lib import *
import numpy as np
logger = logging.getLogger('main.fpga')
class FPGA(object):
""" A System consist of a set of one or more Peripherals.
"""
def __init__(self, file_path_name=None, periph_lib=None):
self.file_path_name = file_path_name
self.root_dir = os.environ['RADIOHDL']
if periph_lib is None:
self.peri_lib = PeripheralLibrary(self.root_dir)
else:
self.peri_lib = periph_lib
self.fpga_name = ""
self.fpga_description = ""
self.parameters = {}
# self.peripherals = {}
self.peripherals = collections.OrderedDict()
self.valid_file_type = False
self.nof_lite = 0
self.nof_full = 0
self.address_map = collections.OrderedDict()
logger.debug("***FPGA object instantiation: creating for {}".format(file_path_name))
if file_path_name is None:
logger.debug("No system configuration file specified")
self.system_config = None
self.system = None
else:
# list of peripheral configurations that are read from the available peripheral files
self.system_config = self.read_system_file(file_path_name)
self.create_system()
def is_valid(self):
""" return False or True if the given file is a valid system file """
return self.valid_file_type
def read_system_file(self, file_path_name):
"""Read the system information from the file_path_name file."""
logger.info("Load system from '%s'", file_path_name)
system_config = yaml.load(open(file_path_name, 'r'))
self.valid_file_type = True
return system_config
def create_system(self):
""" Create a system object based on the information in the system_config """
logger.debug("Creating system")
logger.debug("Instantiating the peripherals from the peripheral Library")
# self.fpga_name = self.system_config['hdl_library_name']
self.fpga_name = self.system_config['fpga_name']
config = self.system_config
if "fpga_description" in config:
self.fpga_description = copy.deepcopy(config['fpga_description'])
if "parameters" in config:
_parameters = copy.deepcopy(config['parameters'])
# keys with '.' in the name indicate the use of a structs
# inside this class it is a dict in a dict
for parameter_set in _parameters:
name = parameter_set['name']
value = parameter_set['value']
if '.' in name:
_struct, _key = name.split('.')
if _struct not in self.parameters:
self.parameters[_struct] = {}
remove_list = []
for parameter_set_nr, parameter_set in enumerate(_parameters):
name = parameter_set['name']
value = parameter_set['value']
if '.' in name: # struct notation with dot
_struct, _name = name.split('.')
self.parameters[_struct][_name] = self._eval(str(value))
else:
self.parameters[name] = self._eval(str(value))
remove_list.append((parameter_set_nr, name))
for parameter_set_nr, name in sorted(remove_list, reverse=True):
logger.debug("delete '%s' from parameters", name)
del _parameters[parameter_set_nr]
for parameter_set in _parameters:
name = parameter_set['name']
value = parameter_set['value']
logger.debug("eval of name=%s and value=%s not posible", name, value)
logger.debug("parameters={}".format(self.parameters))
for peripheral_config in config['peripherals']:
# (Deep)Copy the peripheral from the library in order to avoid creating a reference
component_name = peripheral_config['peripheral_name'] if '/' not in peripheral_config['peripheral_name'] else peripheral_config['peripheral_name'].split('/')[1]
component_lib = None if '/' not in peripheral_config['peripheral_name'] else peripheral_config['peripheral_name'].split('/')[0]
component_prefix = peripheral_config.get('peripheral_group',None)
number_of_peripherals = int(self._eval(peripheral_config.get('number_of_peripherals', 1)))
peripheral_from_lib = copy.deepcopy(self.peri_lib.find_peripheral(component_name, component_lib, self.fpga_name))
if peripheral_from_lib is None:
logger.error("Peripheral component '{}' referenced in {}.fpga.yaml not found in "
"peripheral library {}".format(component_name, self.fpga_name, '\'' + component_lib + '\'' if component_lib is not None else ''))
sys.exit()
logger.debug(" Finding %s", peripheral_from_lib.name())
if 'parameter_overrides' in peripheral_config:
logger.debug("parameters={}".format(peripheral_config['parameter_overrides']))
for parameter_set in peripheral_config['parameter_overrides']:
name = parameter_set['name']
value = parameter_set['value']
peripheral_from_lib.parameter(key=name, val=self._eval(value))
if 'slave_port_names' in peripheral_config:
logger.debug("slave_port_names={}".format(peripheral_config['slave_port_names']))
for slave_nr, slave_port_name in enumerate(peripheral_config['slave_port_names']):
peripheral_from_lib.set_user_defined_slavename(slave_nr, slave_port_name)
peripheral_from_lib.number_of_peripherals(number_of_peripherals)
peripheral_from_lib.prefix(component_prefix)
peripheral_name = []
if component_prefix not in (None, ''):
peripheral_name.append(component_prefix)
peripheral_name.append(component_name)
peripheral_from_lib.name('_'.join(peripheral_name))
if peripheral_from_lib.name() not in self.peripherals:
self.peripherals[peripheral_from_lib.name()] = copy.deepcopy(peripheral_from_lib)
else:
logger.error(" Duplicate found: use unique labels per instance in %s.fpga.yaml to distinguish "
"between multiple instances of the same peripheral.\n"
" Cannot add a second instance of peripheral: %s",
self.fpga_name, peripheral_from_lib.name())
sys.exit()
logger.debug("Start evaluating the peripherals")
for peripheral_config in self.peripherals.values():
peripheral_config.eval_peripheral()
self.create_address_map()
return
def _eval(self, val):
""" evaluate val.
1) trying to parse values of known parameter into the value
2) eval the value, known imported function are also evaluated """
_val = str(val)
# first replace all knowns parameter names with its assigned value
for key1, val1 in iter(self.parameters.items()):
# if val is a dict, in vhdl it's a struct
if isinstance(val1, dict):
for key2, val2 in iter(val1.items()):
key = "{}.{}".format(key1, key2)
# logger.debug("replace %s with %s", key, str(val2))
_val = _val.replace(key, str(val2))
else:
# logger.debug("replace %s with %s", key1, str(val1))
_val = _val.replace(key1, str(val1))
result = eval(_val)
logger.debug("_eval(%s) returns eval(%s) = %s", str(val), _val, str(result))
return result
def create_address_map(self):
""" Preserves order of entry from fpga.yaml
Based on vivado limitations, minimum span is 4kB
Configurable ranges are 4k, 8k, 16k, 32k, 64k i.e. 2^(12,13,14,15,16)
There is a maximum of one register group per peripheral
"""
print("[fpga.py] create_address_map('{:s}')".format(self.fpga_name))
# Largest peripheral will determine spacing between peripheral base addresses
largest_addr_range = 4096 # minimal allowed address-decode spacing with Xilinx interconnect
for peripheral in self.peripherals.values():
if peripheral.reg_len > largest_addr_range:
largest_addr_range = peripheral.reg_len
peripheral_spacing = cm.ceil_pow2(largest_addr_range)
lowest_free_addr = 0
for peripheral in self.peripherals.values():
# Ensure peripheral base is aligned to address decode
lowest_free_addr = int(np.ceil(lowest_free_addr/peripheral_spacing)*peripheral_spacing)
p_nam = peripheral.name()
if len(p_nam) > 20:
p_nam = p_nam[0:20]
pad = ' ' * (21-len(peripheral.name()))
print('** PERIPHERAL: {} {}base_addr=0x{:08x} [occupied size=0x{:x}]'.format(p_nam, pad, lowest_free_addr, peripheral.reg_len))
# assigned_reg = False
# _nof_regs = sum([isinstance(slave, Register) for slave in peripheral.slaves])
# _minus_regs = _nof_regs - 1 if _nof_regs > 0 else 0
# _nof_slaves = len(peripheral.slaves) - _minus_regs
for periph_num in range(peripheral.number_of_peripherals()):
assigned_reg = False
for slave in peripheral.slaves:
if isinstance(slave, Register) and not getattr(slave, 'isIP', False):
if assigned_reg == False: #calc for entire register slave port
reg_span = cm.ceil_pow2(max(peripheral.reg_len, 4096))
lowest_free_addr = int(np.ceil(lowest_free_addr/reg_span)*reg_span)
register_base = lowest_free_addr
else :
self.nof_lite = self.nof_lite - 1
lowest_free_addr = register_base + (slave.base_address() if not any(slave.rams) else slave.rams[0].base_address())
ram_span = slave.base_address() - slave.rams[0].base_address() if any(slave.rams) else 0
slave_span = slave.address_length()*slave.number_of_slaves()+ram_span
slave_port_name = peripheral.name() + '_' + slave.name() + (('_' + str(periph_num)) if peripheral.number_of_peripherals() > 1 else '')
self.address_map[slave_port_name] = {'base':lowest_free_addr,'span':slave_span,'type':'LITE','port_index':self.nof_lite,'peripheral':peripheral,'periph_num':periph_num,'slave':slave}
logger.info("Register for %s has span 0x%x", peripheral.name(), slave_span)
lowest_free_addr = lowest_free_addr + int(slave_span)
self.nof_lite = self.nof_lite + 1
assigned_reg = True
elif isinstance(slave, Register) and getattr(slave, 'isIP', False):
slave_span = cm.ceil_pow2(max(slave.address_length()*slave.number_of_slaves(), 4096)) #slave.address_length()*slave.number_of_slaves()#
lowest_free_addr = int(np.ceil(lowest_free_addr/slave_span)*slave_span)
slave_port_name = peripheral.name() + '_' + slave.name() #+ '_reg_ip'
self.address_map[slave_port_name] = {'base':lowest_free_addr, 'span':slave_span, 'type':slave.protocol, 'port_index':eval("self.nof_{}".format(slave.protocol.lower())),'peripheral':peripheral,'periph_num':periph_num,'slave':slave}
if slave.protocol.lower() == 'lite':
self.nof_lite = self.nof_lite + 1
else :
self.nof_full = self.nof_full + 1
lowest_free_addr = lowest_free_addr + slave_span
elif isinstance(slave, RAM):
slave_type = 'ram'
size_in_bytes = np.ceil(slave.width()/8)*slave.address_length()*cm.ceil_pow2(slave.number_of_slaves())
slave_span = cm.ceil_pow2(max(size_in_bytes, 4096))
logger.info("Slave %s has span 0x%x", peripheral.name() + '_' + slave.name() , slave_span)
# slave_name = slave.name() + ('_{}'.format(slave_no) if slave.number_of_slaves() >1 else '')
lowest_free_addr = int(np.ceil(lowest_free_addr/slave_span)*slave_span)
slave_port_name = peripheral.name() + '_' + slave.name() + '_' + slave_type+ (('_' + str(periph_num)) if peripheral.number_of_peripherals() > 1 else '')
self.address_map[slave_port_name] = {'base':lowest_free_addr,'span':slave_span,'type':'FULL','port_index':self.nof_full,'peripheral':peripheral,'periph_num':periph_num,'slave':slave}
self.nof_full = self.nof_full + 1
lowest_free_addr = lowest_free_addr + slave_span
elif isinstance(slave, FIFO):
slave_type = 'fifo'
size_in_bytes = np.ceil(slave.width()/8)*slave.address_length()
slave_span = cm.ceil_pow2(max(size_in_bytes, 4096))
for i in range(slave.number_of_slaves()):
lowest_free_addr = int(np.ceil(lowest_free_addr/slave_span)*slave_span)
slave_port_name = peripheral.name() + '_' + slave.name() + '_' + slave_type+ (('_' + str(periph_num)) if peripheral.number_of_peripherals() > 1 else '') + ('_{}'.format(i) if slave.number_of_slaves() > 1 else '')
self.address_map[slave_port_name] = {'base':lowest_free_addr,'span':slave_span,'type':'FULL','port_index':self.nof_full,'access':slave.access_mode(),'peripheral':peripheral,'periph_num':periph_num,'slave':slave}
self.nof_full = self.nof_full + 1
lowest_free_addr = lowest_free_addr + slave_span
if slave_span > 65536:
logger.error("Slave %s has slave span %d. Maximum slave span in Vivado Address Editor is 64kB", slave.name(), slave_span)
sys.exit()
def show_overview(self):
""" print system overview
"""
logger.info("----------------")
logger.info("SYSTEM OVERVIEW:")
logger.info("----------------")
logger.info("System name '%s'", self.fpga_name)
logger.info("- System parameters:")
for key, val in iter(self.parameters.items()):
if isinstance(val, dict):
for _key, _val in val.items():
logger.info(" %-25s %s", "{}.{}".format(key, _key), str(_val))
else:
logger.info(" %-20s %s", key, str(val))
logger.info("- System Address Map:")
for slave_name, attributes in self.address_map.items():
logger.info(" %-30s 0x%x, range %dkB at port MSTR_%s[%d]", slave_name, attributes['base'], attributes['span']/1024, attributes['type'],attributes['port_index'])
logger.info("- Peripherals:")
for peripheral in sorted(self.peripherals):
self.peripherals[peripheral].show_overview(header=False)
class FPGALibrary(object):
"""
List of all information for FPGA config files in the root dir, intended for use in hdl_config.py
"""
def __init__(self, root_dir=None):
self.root_dir = root_dir
self.file_extension = ".fpga.yaml"
self.library = {}
self.nof_peripherals = 0
tic = time.time()
if root_dir is not None:
for root, dirs, files in os.walk(self.root_dir, topdown = True):
if 'tools' not in root:
for name in files:
if self.file_extension in name:
try :
library_config = yaml.load(open(os.path.join(root,name),'r'))
except:
logger.error('Failed parsing YAML in {}. Check file for YAML syntax errors'.format(name))
print('ERROR:\n' + sys.exc_info()[1])
sys.exit()
if not isinstance(library_config, dict):
logger.warning('File {} is not readable as a dictionary, it will not be'
' included in the FPGA library of peripherals'.format(name))
continue
lib_name = name.replace(self.file_extension, '')
logger.info("Found fpga.yaml file {}".format(lib_name))
if lib_name in self.library:
logger.warning("{} library already exists in FPGALibrary, being overwritten".format(lib_name))
self.library[lib_name] = {'file_path':root, 'file_path_name':os.path.join(root,name), 'peripherals':{}}
toc = time.time()
logger.debug("FPGALibrary os.walk took %.4f seconds" %(toc-tic))
self.read_all_fpga_files()
def read_all_fpga_files(self, file_path_names=None):
"""
Read the information from all FPGA files that were found in the root_dir tree
"""
self.fpgas = {}
periph_lib = PeripheralLibrary(self.root_dir)
if file_path_names is None:
file_path_names = [lib_dict['file_path_name'] for lib_dict in self.library.values()]
for fpn in file_path_names:
logger.info("Creating ARGS FPGA object from {}".format(fpn))
tic = time.time()
fpga = FPGA(fpn, periph_lib=periph_lib)
toc = time.time()
logger.debug("fpga creation for %s took %.4f seconds" %(fpn, toc-tic))
fpga.show_overview()
self.library[fpga.fpga_name].update({'fpga':fpga})
for lib, peripheral in fpga.peripherals.items():
self.library[fpga.fpga_name]['peripherals'].update({lib:peripheral})
This diff is collapsed.
__pycache__/
import os
import sys
# cwd = __file__[:__file__.rfind('/')]
cwd = os.path.dirname(os.path.realpath(__file__))
sys.path.append(cwd)
from constants import *
from base_object import BaseObject
from field import Field
from register import Register
from ram import RAM
from fifo import FIFO
from args_errors import *
class ARGSNameError(Exception):
pass
class ARGSModeError(Exception):
pass
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment