Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
S
sdptr
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
LOFAR2.0
sdptr
Commits
225b620d
Commit
225b620d
authored
Oct 7, 2021
by
Pieter Donker
Browse files
Options
Downloads
Patches
Plain Diff
add test script
parent
cd5e2895
Branches
Branches containing commit
No related tags found
No related merge requests found
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
test/py/Client.py
+209
-52
209 additions, 52 deletions
test/py/Client.py
test/py/tools.py
+55
-0
55 additions, 0 deletions
test/py/tools.py
with
264 additions
and
52 deletions
test/py/Client.py
+
209
−
52
View file @
225b620d
...
...
@@ -33,11 +33,15 @@ import logging
import
traceback
import
argparse
from
functools
import
wraps
from
tools
import
*
sys
.
path
.
insert
(
0
,
"
..
"
)
Temp_only
=
True
N_NODES
=
16
N_BEAMSETS
=
1
S_PN
=
12
N_SUB
=
512
...
...
@@ -123,7 +127,6 @@ def explore_temp(node):
print
(
"
no variables to read
"
)
explore
(
children
[
i
])
@timing
def
check_get_all_R_time
(
obj
):
# '2:' = space 2 is "http://lofar.eu".
...
...
@@ -181,6 +184,69 @@ def check_get_all_R_time(obj):
# print('\n'.join(info))
def
plot_histogram
(
obj
,
nodes
,
inputs
):
import
numpy
as
np
import
matplotlib
# matplotlib.use('Agg')
import
matplotlib.pyplot
as
plt
var
=
obj
.
get_child
(
"
2:FPGA_signal_input_histogram_R
"
)
data
=
var
.
get_value
()
np_data
=
np
.
array
(
data
)
N_HISTO
=
512
# for i in range(N_NODES):
for
i
in
list
(
nodes
):
for
j
in
list
(
inputs
):
s1
=
(
i
*
S_PN
*
N_HISTO
)
+
(
j
*
N_HISTO
)
s2
=
s1
+
N_HISTO
print
(
"
i={}, j={}, start={}, stop={}
"
.
format
(
i
,
j
,
s1
,
s2
))
_data
=
np_data
[
s1
:
s2
]
# _data = np.ma.masked_where(_data < 1, _data)
# _data = np.ma.masked_where(_data > 2**28-1, _data)
print
(
"
used data : {}
"
.
format
(
_data
))
print
(
"
min value : {}
"
.
format
(
_data
.
min
()))
print
(
"
max value : {}
"
.
format
(
_data
.
max
()))
print
(
"
sum values: {}
"
.
format
(
_data
.
sum
()))
print
(
"
skip first and last value from array
"
)
bin_range
=
range
(
-
255
,
255
)
plt
.
bar
(
bin_range
,
height
=
_data
[
1
:
-
1
],
width
=
1.0
)
plt
.
xlim
([
-
256
,
255
])
# plt.ylim([0, 200e6])
plt
.
tight_layout
()
plt
.
show
()
return
True
def
plot_input_data
(
obj
,
nodes
,
inputs
):
import
numpy
as
np
import
matplotlib
# matplotlib.use('Agg')
import
matplotlib.pyplot
as
plt
var
=
obj
.
get_child
(
"
2:FPGA_signal_input_data_buffer_R
"
)
data
=
var
.
get_value
()
np_data
=
np
.
array
(
data
)
N_SIGNALS
=
1024
# for i in range(N_NODES):
for
i
in
list
(
nodes
):
for
j
in
list
(
inputs
):
s1
=
(
i
*
S_PN
*
N_SIGNALS
)
+
(
j
*
N_SIGNALS
)
s2
=
s1
+
N_SIGNALS
print
(
"
i={}, j={}, start={}, stop={}
"
.
format
(
i
,
j
,
s1
,
s2
))
_data
=
np_data
[
s1
:
s2
]
print
(
"
{}
"
.
format
(
_data
.
shape
))
print
(
"
{}
"
.
format
(
_data
))
plt
.
plot
(
range
(
N_SIGNALS
),
_data
)
plt
.
tight_layout
()
plt
.
show
()
print
(
"
size data={}
"
.
format
(
len
(
data
)))
return
True
def
write_fpga_mask
(
obj
,
nodes
=
None
,
mask
=
None
):
enable_mask
=
[
False
]
*
N_NODES
if
mask
is
not
None
:
...
...
@@ -200,12 +266,48 @@ def get_fpga_mask(obj):
return
enable_mask
@timing
def
read_subband_weights
(
obj
):
var
=
obj
.
get_child
(
"
2:FPGA_subband_weights_R
"
)
vals
=
var
.
get_value
()
def
write_subband_weights
(
obj
):
weights
=
[
8192
]
*
N_SUB
*
N_NODES
*
S_PN
# set weights for subbands to default
var
=
obj
.
get_child
(
"
2:FPGA_subband_weights_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
weights
),
varianttype
=
ua
.
VariantType
.
UInt32
))
def
setup_wg_sinus
(
obj
):
'''
setup wg for sinus stream
'''
# Write WG configuration with phases from 0 - 360 and 1 to 1/12 amplitudes
print
(
"
wg on
"
)
enable
=
[
False
]
*
(
S_PN
*
N_NODES
)
# enable wg
var
=
obj
.
get_child
(
"
2:FPGA_wg_enable_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
_ampl
=
[
wg_ampl
]
*
(
S_PN
*
N_NODES
)
var
=
obj
.
get_child
(
"
2:FPGA_wg_amplitude_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
_ampl
),
varianttype
=
ua
.
VariantType
.
Double
))
_phase
=
[
wg_phase
]
*
(
S_PN
*
N_NODES
)
var
=
obj
.
get_child
(
"
2:FPGA_wg_phase_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
_phase
),
varianttype
=
ua
.
VariantType
.
Double
))
_freq
=
[
wg_freq
]
*
(
S_PN
*
N_NODES
)
var
=
obj
.
get_child
(
"
2:FPGA_wg_frequency_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
_freq
),
varianttype
=
ua
.
VariantType
.
Double
))
print
(
"
wg on
"
)
enable
=
[
True
]
*
(
S_PN
*
N_NODES
)
# enable wg
var
=
obj
.
get_child
(
"
2:FPGA_wg_enable_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
print
(
'
wait until wg active again
'
)
time
.
sleep
(
2.0
)
# wain until active
def
setup_wg_xst_mode
(
obj
):
'''
setup wg for xst stream
...
...
@@ -236,6 +338,23 @@ def turn_wg_off(obj):
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
def
setup_beamlet_stream
(
obj
):
'''
setup output stream
'''
port
=
[
5000
]
*
N_NODES
*
N_BEAMSETS
# use port 5002
var
=
obj
.
get_child
(
"
2:FPGA_beamlet_output_hdr_udp_destination_port_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
port
),
varianttype
=
ua
.
VariantType
.
UInt16
))
dest_mac
=
[
"
00:07:43:06:c7:00
"
]
*
N_NODES
*
N_BEAMSETS
# use mac of ??
var
=
obj
.
get_child
(
"
2:FPGA_beamlet_output_hdr_eth_destination_mac_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
dest_mac
),
varianttype
=
ua
.
VariantType
.
String
))
dest_addr
=
[
"
192.168.0.1
"
]
*
N_NODES
*
N_BEAMSETS
# use addr of ??
var
=
obj
.
get_child
(
"
2:FPGA_beamlet_output_hdr_ip_destination_address_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
dest_addr
),
varianttype
=
ua
.
VariantType
.
String
))
def
setup_sst_stream
(
obj
):
'''
setup bst stream, use git/upe_gear/peripherals/pi_satistics_stream.py for recording and plotting.
...
...
@@ -253,10 +372,6 @@ def setup_sst_stream(obj):
var
=
obj
.
get_child
(
"
2:FPGA_sst_offload_hdr_ip_destination_address_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
dest_addr
),
varianttype
=
ua
.
VariantType
.
String
))
weights
=
[
8192
]
*
N_SUB
*
N_NODES
*
S_PN
# set weights for subbands to default
var
=
obj
.
get_child
(
"
2:FPGA_subband_weights_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
weights
),
varianttype
=
ua
.
VariantType
.
UInt32
))
enable
=
[
True
]
*
N_NODES
# enable weighted subbands
var
=
obj
.
get_child
(
"
2:FPGA_sst_offload_weighted_subbands_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
...
...
@@ -267,15 +382,15 @@ def setup_bst_stream(obj):
setup bst stream, use git/upe_gear/peripherals/pi_satistics_stream.py for recording and plotting.
- pi_statistic_stream.py --unb2 2 --pn2 0 --cmd 4 -s=BST
'''
port
=
[
5002
]
*
N_NODES
# use port 5002
port
=
[
5002
]
*
N_NODES
*
N_BEAMSETS
# use port 5002
var
=
obj
.
get_child
(
"
2:FPGA_bst_offload_hdr_udp_destination_port_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
port
),
varianttype
=
ua
.
VariantType
.
UInt16
))
dest_mac
=
[
"
00:1B:21:71:76:B9
"
]
*
N_NODES
# use mac of dop36
dest_mac
=
[
"
00:1B:21:71:76:B9
"
]
*
N_NODES
*
N_BEAMSETS
# use mac of dop36
var
=
obj
.
get_child
(
"
2:FPGA_bst_offload_hdr_eth_destination_mac_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
dest_mac
),
varianttype
=
ua
.
VariantType
.
String
))
dest_addr
=
[
"
10.99.0.254
"
]
*
N_NODES
# use addr of dop36
dest_addr
=
[
"
10.99.0.254
"
]
*
N_NODES
*
N_BEAMSETS
# use addr of dop36
var
=
obj
.
get_child
(
"
2:FPGA_bst_offload_hdr_ip_destination_address_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
dest_addr
),
varianttype
=
ua
.
VariantType
.
String
))
...
...
@@ -301,7 +416,7 @@ def setup_xst_stream(obj):
var
=
obj
.
get_child
(
"
2:FPGA_xst_subband_select_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
subsel
),
varianttype
=
ua
.
VariantType
.
UInt32
))
interval
=
[
1.0
]
*
N_NODES
# use fixed interval 1.0 second
interval
=
[
0.1
]
*
N_NODES
# use fixed interval 1.0 second
var
=
obj
.
get_child
(
"
2:FPGA_xst_integration_interval_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
interval
),
varianttype
=
ua
.
VariantType
.
Double
))
...
...
@@ -310,42 +425,54 @@ def setup_xst_stream(obj):
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
processing
),
varianttype
=
ua
.
VariantType
.
Boolean
))
def
set_enable_stream
(
obj
,
mode
):
if
mode
==
'
OFF
'
:
enable
=
[
False
]
*
N_NODES
# disable offload
def
set_sst_stream
(
obj
,
stream_on
=
False
):
var
=
obj
.
get_child
(
"
2:FPGA_sst_offload_enable_RW
"
)
enable
=
[
bool
(
stream_on
)]
*
N_NODES
# enable offload
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
def
set_bst_stream
(
obj
,
stream_on
=
False
):
var
=
obj
.
get_child
(
"
2:FPGA_bst_offload_enable_RW
"
)
enable
=
[
bool
(
stream_on
)]
*
N_NODES
*
N_BEAMSETS
# enable offload
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
def
set_xst_stream
(
obj
,
stream_on
=
False
):
var
=
obj
.
get_child
(
"
2:FPGA_xst_offload_enable_RW
"
)
enable
=
[
bool
(
stream_on
)]
*
N_NODES
# enable offload
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
return
enable
=
[
True
]
*
N_NODES
# enable offload
if
mode
==
'
SST
'
:
var
=
obj
.
get_child
(
"
2:FPGA_sst_offload_enable_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
elif
mode
==
'
BST
'
:
var
=
obj
.
get_child
(
"
2:FPGA_bst_offload_enable_RW
"
)
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
elif
mode
==
'
XST
'
:
var
=
obj
.
get_child
(
"
2:FPGA_xst_offload_enable_RW
"
)
def
set_beamlet_stream
(
obj
,
stream_on
=
False
):
var
=
obj
.
get_child
(
"
2:FPGA_beamlet_output_enable_RW
"
)
enable
=
[
bool
(
stream_on
)]
*
N_NODES
*
N_BEAMSETS
# enable offload
var
.
set_value
(
ua
.
Variant
(
value
=
list
(
enable
),
varianttype
=
ua
.
VariantType
.
Boolean
))
else
:
print
(
'
Wrong mode
"
{}
"'
.
format
(
mode
))
return
if
__name__
==
"
__main__
"
:
# Parse command line arguments
parser
=
argparse
.
ArgumentParser
(
description
=
"
opcua client command line argument parser
"
)
parser
.
add_argument
(
'
-n
'
,
'
--nodes
'
,
dest
=
'
nodes
'
,
type
=
int
,
nargs
=
'
+
'
,
help
=
"
nodes to use
"
)
parser
.
add_argument
(
'
-i
'
,
'
--info
'
,
dest
=
'
info
'
,
action
=
'
store_true
'
,
help
=
"
print point infor from server
"
)
parser
.
add_argument
(
'
-a
'
,
'
--all
'
,
dest
=
'
all_r
'
,
action
=
'
store_true
'
,
help
=
"
recv all*_R points and show time
"
)
parser
.
add_argument
(
'
--wg
'
,
dest
=
'
wg_mode
'
,
type
=
str
,
choices
=
[
'
OFF
'
,
'
XST
'
],
help
=
"
turn wg off/on for xst
"
)
parser
.
add_argument
(
'
--setup
'
,
dest
=
'
offload_setup
'
,
type
=
str
,
choices
=
[
'
SST
'
,
'
BST
'
,
'
XST
'
],
help
=
"
setup offload for selected mode
"
)
parser
.
add_argument
(
'
--stream
'
,
dest
=
'
stream
'
,
type
=
str
,
choices
=
[
'
OFF
'
,
'
SST
'
,
'
BST
'
,
'
XST
'
],
help
=
"
turn off/on selected offload stream
"
)
parser
.
add_argument
(
'
--host
'
,
dest
=
'
host
'
,
type
=
str
,
default
=
'
dop36
'
,
help
=
"
host to connect to
"
)
parser
.
add_argument
(
'
-n
'
,
'
--nodes
'
,
dest
=
'
nodes
'
,
type
=
str
,
help
=
"
nodes to use
"
)
parser
.
add_argument
(
'
-i
'
,
'
--inputs
'
,
dest
=
'
inputs
'
,
type
=
str
,
help
=
"
inputs to use
"
)
parser
.
add_argument
(
'
--info
'
,
dest
=
'
info
'
,
action
=
'
store_true
'
,
help
=
"
print point infor from server
"
)
parser
.
add_argument
(
'
--all
'
,
dest
=
'
all_r
'
,
action
=
'
store_true
'
,
help
=
"
recv all*_R points and show time
"
)
parser
.
add_argument
(
'
--wg
'
,
dest
=
'
wg_mode
'
,
type
=
str
,
choices
=
[
'
OFF
'
,
'
XST
'
,
'
SIN
'
],
help
=
"
turn wg off/on for xst
"
)
parser
.
add_argument
(
'
--freq
'
,
dest
=
'
wg_freq
'
,
type
=
float
,
default
=
1e6
,
help
=
"
set wg freq
"
)
parser
.
add_argument
(
'
--ampl
'
,
dest
=
'
wg_ampl
'
,
type
=
float
,
default
=
1.0
,
help
=
"
set wg ampl
"
)
parser
.
add_argument
(
'
--phase
'
,
dest
=
'
wg_phase
'
,
type
=
float
,
default
=
0.0
,
help
=
"
set wg phase
"
)
parser
.
add_argument
(
'
--setup
'
,
dest
=
'
offload_setup
'
,
type
=
str
,
choices
=
[
'
SST
'
,
'
BST
'
,
'
XST
'
,
'
BEAMLET
'
],
help
=
"
setup offload for selected mode
"
)
parser
.
add_argument
(
'
--stream
'
,
dest
=
'
stream
'
,
type
=
str
,
choices
=
[
'
OFF
'
,
'
SST
'
,
'
BST
'
,
'
XST
'
,
'
BEAMLET
'
],
help
=
"
turn off/on selected offload stream
"
)
parser
.
add_argument
(
'
--plot
'
,
dest
=
'
plot
'
,
type
=
str
,
choices
=
[
'
INP
'
,
'
HIST
'
],
help
=
"
plot selected type
"
)
parser
.
add_argument
(
'
--verbosity
'
,
default
=
'
INFO
'
,
help
=
"
stdout log level can be [ERROR | WARNING | INFO | DEBUG]
"
)
args
=
parser
.
parse_args
()
# print(args)
node_list
=
arg_str_to_list
(
args
.
nodes
)
if
args
.
nodes
else
range
(
16
)
input_list
=
arg_str_to_list
(
args
.
inputs
)
if
args
.
inputs
else
range
(
12
)
wg_freq
=
args
.
wg_freq
wg_ampl
=
args
.
wg_ampl
wg_phase
=
args
.
wg_phase
print
(
args
)
...
...
@@ -353,9 +480,8 @@ if __name__ == "__main__":
# logger = logging.getLogger("KeepAlive")
# logger.setLevel(logging.DEBUG)
host
=
'
dop36
'
port
=
4840
client
=
Client
(
"
opc.tcp://{}:{}/
"
.
format
(
host
,
port
))
client
=
Client
(
"
opc.tcp://{}:{}/
"
.
format
(
args
.
host
,
port
))
# client = Client("opc.tcp://LAPTOP-N0VQ3UDT:4840/")
# client = Client("opc.tcp://192.168.137.102:4840/")
...
...
@@ -375,7 +501,7 @@ if __name__ == "__main__":
while
clientRunning
:
try
:
client
.
connect
()
print
(
"
Connected to {}:{}
"
.
format
(
host
,
port
))
print
(
"
Connected to {}:{}
"
.
format
(
args
.
host
,
port
))
################
# this section contains some code about navigating around the address space
...
...
@@ -387,7 +513,7 @@ if __name__ == "__main__":
Object
=
client
.
get_objects_node
()
fpga_mask
=
get_fpga_mask
(
Object
)
# get active fpga_mask
write_fpga_mask
(
Object
,
nodes
=
args
.
nodes
)
write_fpga_mask
(
Object
,
nodes
=
node_list
)
if
args
.
info
:
root
=
client
.
get_root_node
()
...
...
@@ -402,13 +528,21 @@ if __name__ == "__main__":
children
=
Object
.
get_children
()
print
(
"
\t
children of Object:
"
,
children
)
for
i
in
range
(
len
(
children
)):
print
(
"
\t\t
child
"
,
i
,
"
:
"
,
children
[
i
].
get_browse_name
())
browse_name
=
children
[
i
].
get_browse_name
().
to_string
()
print
(
"
\t\t
child
"
,
i
,
"
:
"
,
browse_name
)
if
browse_name
in
[
'
0:Server
'
,
'
0:Aliases
'
]:
continue
print
(
"
\t\t
child
"
,
i
,
"
:
"
,
children
[
i
].
get_array_dimensions
())
print
(
"
\t\t
child
"
,
i
,
"
:
"
,
children
[
i
].
get_data_type_as_variant_type
())
print
(
"
\t\t
child
"
,
i
,
"
:
"
,
children
[
i
].
get_node_class
())
if
args
.
wg_mode
is
not
None
:
if
args
.
wg_mode
==
'
OFF
'
:
turn_wg_off
(
Object
)
elif
args
.
wg_mode
==
'
XST
'
:
setup_wg_xst_mode
(
Object
)
elif
args
.
wg_mode
==
'
SIN
'
:
setup_wg_sinus
(
Object
)
if
args
.
offload_setup
is
not
None
:
if
args
.
offload_setup
==
'
SST
'
:
...
...
@@ -417,25 +551,48 @@ if __name__ == "__main__":
setup_bst_stream
(
Object
)
elif
args
.
offload_setup
==
'
XST
'
:
setup_xst_stream
(
Object
)
elif
args
.
offload_setup
==
'
BEAMLET
'
:
setup_beamlet_stream
(
Object
)
if
args
.
stream
is
not
None
:
set_enable_stream
(
Object
,
mode
=
args
.
stream
)
if
args
.
stream
==
'
OFF
'
:
set_sst_stream
(
Object
,
False
)
set_bst_stream
(
Object
,
False
)
set_xst_stream
(
Object
,
False
)
set_beamlet_stream
(
Object
,
False
)
elif
args
.
stream
==
'
SST
'
:
set_sst_stream
(
Object
,
True
)
elif
args
.
stream
==
'
BST
'
:
set_bst_stream
(
Object
,
True
)
elif
args
.
stream
==
'
XST
'
:
set_xst_stream
(
Object
,
True
)
elif
args
.
stream
==
'
BEAMLET
'
:
set_beamlet_stream
(
Object
,
True
)
if
args
.
all_r
is
True
:
check_get_all_R_time
(
Object
)
if
args
.
plot
==
'
INP
'
:
plot_input_data
(
Object
,
nodes
=
node_list
,
inputs
=
input_list
)
if
args
.
plot
==
'
HIST
'
:
plot_histogram
(
Object
,
nodes
=
node_list
,
inputs
=
input_list
)
#write_xst_subband_select(Object)
#write_subband_weights(Object)
# ##############################
# TEST AREA (for new functions)
#write_subband_weights(Object)
#read_subband_weights(Object)
# END TEST AREA
# ##############################
write_fpga_mask
(
Object
,
mask
=
fpga_mask
)
# write back start mask
clientRunning
=
False
break
;
break
time
.
sleep
(
1
)
...
...
This diff is collapsed.
Click to expand it.
test/py/tools.py
0 → 100644
+
55
−
0
View file @
225b620d
import
string
def
arg_str_to_list
(
arg_str
,
sort
=
"
sort
"
):
"""
= Convert argument string to (sorted)list =
def arg_str_to_list(arg_str, [sort=
"
sort
"
])
- arg_str = argument string to convert
- sort =
"
sort
"
|
"
nosort
"
(optional)
arg_str_to_list(
"
0,1,2,3,10:13,9
"
) ==> [0,1,2,3,9,10,11,12,13]
arg_str_to_list(
"
0,1,2,3,10:13,9
"
,
"
nosort
"
) ==> [0,1,2,3,10,11,12,13,9]
"""
_arg_list
=
[]
_range
=
False
_nr_str
=
''
_last_nr
=
0
if
arg_str
:
for
ch
in
arg_str
:
if
ch
in
string
.
digits
or
ch
==
'
-
'
:
_nr_str
+=
ch
elif
ch
==
'
,
'
:
_nr
=
int
(
_nr_str
)
if
_range
:
for
i
in
range
(
_last_nr
,
_nr
+
1
,
1
):
_arg_list
.
append
(
i
)
_range
=
False
else
:
_arg_list
.
append
(
_nr
)
_last_nr
=
_nr
_nr_str
=
''
elif
ch
==
'
:
'
:
_nr
=
int
(
_nr_str
)
_last_nr
=
_nr
_nr_str
=
''
_range
=
True
else
:
pass
# process last number
_nr
=
int
(
_nr_str
)
if
_range
:
for
i
in
range
(
_last_nr
,
_nr
+
1
,
1
):
_arg_list
.
append
(
i
)
_range
=
False
else
:
_arg_list
.
append
(
_nr
)
# return sorted or unsorted
if
sort
==
"
sort
"
:
return
sorted
(
_arg_list
)
return
_arg_list
\ No newline at end of file
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment