Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
P
PyPCC
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
LOFAR2.0
PyPCC
Merge requests
!11
Pypcc2
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Pypcc2
pypcc2
into
master
Overview
0
Commits
34
Pipelines
0
Changes
52
Merged
Paulus Kruger
requested to merge
pypcc2
into
master
4 years ago
Overview
0
Commits
34
Pipelines
0
Changes
52
Expand
OPC-UA server and i2c drivers running as seperare processes. Use YAML config file
0
0
Merge request reports
Compare
master
master (base)
and
latest version
latest version
604840c4
34 commits,
4 years ago
52 files
+
3123
−
1910
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
52
Search (e.g. *.vue) (Ctrl+P)
clk/CLK.py deleted
100644 → 0
+
0
−
414
Options
from
.
import
Vars
import
numpy
as
np
import
logging
from
.spibitbang2
import
*
import
threading
def
ApplyMask
(
value
,
width
=
8
,
bitoffset
=
0
,
previous
=
0
):
mask
=
(
1
<<
width
)
-
1
if
bitoffset
>
0
:
value
<<=
bitoffset
;
mask
<<=
bitoffset
;
return
(
value
&
mask
)
+
(
previous
-
(
previous
&
mask
));
def
UnMask
(
value
,
width
=
8
,
bitoffset
=
0
):
mask
=
(
1
<<
width
)
-
1
if
bitoffset
>
0
:
value
>>=
bitoffset
;
value
=
value
&
mask
;
return
value
;
def
bytes2int
(
bts
):
x
=
0
;
for
b
in
bts
:
x
=
x
*
256
+
b
;
return
x
;
def
int2bytes
(
i
):
b
=
[];
while
i
>
255
:
b
=
[
i
%
256
]
+
b
;
i
>>=
8
;
return
[
i
]
+
b
;
def
strs2bytes
(
var
):
# print("str2bytes",var)
if
len
(
var
)
==
0
:
return
var
;
if
isinstance
(
var
[
0
],
str
):
#make string a byte array
# return [ord(c.encode('utf-8')[0]) for s in var for c in s]
return
[
c
.
encode
(
'
utf-8
'
)[
0
]
for
s
in
var
for
c
in
s
]
return
var
def
bytes2strs
(
var
,
step
,
dtype
):
if
not
(
dtype
==
Vars
.
datatype
.
dstring
):
return
var
cnt
=
int
(
len
(
var
)
/
step
)
print
(
var
)
return
[(
bytearray
(
var
[
i
*
step
:(
i
+
1
)
*
step
]).
decode
(
"
utf-8
"
))
for
i
in
range
(
cnt
)]
class
RCU1
():
def
__init__
(
self
,
number
,
I2Ccallback
,
Switchcallback
):
self
.
N
=
number
;
self
.
I2Ccallback
=
I2Ccallback
self
.
SWcallback
=
Switchcallback
self
.
previous
=
np
.
zeros
([
number
,
Vars
.
RCU_storeReg
],
dtype
=
'
int
'
)
self
.
previousHBA
=
np
.
zeros
([
number
,
3
,
32
],
dtype
=
'
int
'
)
def
load
(
self
):
Inst1
=
Vars
.
Instr
(
Vars
.
DevType
.
Instr
,
Vars
.
RCU_init
,
0
,[])
#Read the current status of GPIO IOs
self
.
SetVar
(
Inst1
)
#Vars.RCU_mask.OPCW.get_data_value().Value.Value=[1,1,0,0]
#Vars.Ant_mask.OPCW.get_data_value().Value.Value=[1,1,0,0,0,0,0,0,0,0,1,0]
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Var,Vars.RCU_att,12,[0,1,2,3,4,5,6,7,8,9,11])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_off,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_on,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.ADC1_on,0,[])
#RCU.SetVar(Inst1)
#print(Vars.RCU)
def
SetVar
(
self
,
Instr
,
Mask
=
[]):
# Instr.value=strs2bytes(Instr.value) #Alwast be an array of bytes
if
(
self
.
N
==
1
):
Mask
=
[
True
]
if
Instr
.
type
==
Vars
.
DevType
.
Instr
:
#Execute instructions
Iset
=
Instr
.
dev
;
if
len
(
Mask
)
==
0
:
Mask
=
Vars
.
RCU_mask
.
OPCW
.
get_value
()
for
i
in
Iset
.
instr
:
logging
.
debug
(
str
((
"
Inst
"
,
i
)))
self
.
SetVar
(
i
,
Mask
=
Mask
)
return
;
if
Instr
.
type
in
[
Vars
.
DevType
.
I2C
,
Vars
.
DevType
.
SPIbb
,
Vars
.
DevType
.
I2Cbb
]:
if
len
(
Mask
)
==
0
:
Mask
=
Vars
.
RCU_mask
.
OPCW
.
get_value
()
mask
=
0
;
RCU0
=-
1
;
for
RCUi
in
range
(
self
.
N
):
if
(
Mask
[
RCUi
]):
mask
|=
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
]
if
RCU0
<
0
:
RCU0
=
RCUi
;
if
RCU0
<
0
:
return
;
#Mask all zero
self
.
SWcallback
(
mask
)
if
Instr
.
type
==
Vars
.
DevType
.
I2C
:
logging
.
info
(
str
((
'
** Set I2C:
'
,
Instr
.
dev
,
Instr
.
value
)))
self
.
SetI2C
(
RCU0
,
Instr
.
dev
,
8
,
0
,
strs2bytes
(
Instr
.
value
))
return
;
elif
Instr
.
type
==
Vars
.
DevType
.
SPIbb
:
logging
.
debug
(
str
((
'
** Set SPIbb:
'
,
Instr
.
dev
,
Instr
.
value
)))
SetSPIbb
(
self
.
SetI2C
,
RCU0
,
Instr
.
dev
,
strs2bytes
(
Instr
.
value
))
return
;
elif
Instr
.
type
==
Vars
.
DevType
.
I2Cbb
:
logging
.
info
(
str
((
'
** Set I2Cbb:
'
,
Instr
.
dev
,
Instr
.
value
)))
# self.SetI2C(RCUi,Instr.dev,8,0,strs2bytes(Instr.value))
return
;
V1
=
Instr
.
dev
if
not
((
Instr
.
nvalue
==
V1
.
nVars
)
and
(
Instr
.
type
==
Vars
.
DevType
.
VarUpdate
))
and
not
(
Instr
.
nvalue
==
V1
.
size
*
self
.
N
):
logging
.
error
(
"
Wrong size of value
"
)
return
False
if
V1
.
Vars
[
0
].
type
==
Vars
.
DevType
.
Internal
:
Instr
.
dev
.
OPCW
.
set_value
(
Instr
.
value
)
logging
.
debug
(
"
Update internal variable
"
)
return
# if V1.Vars[0].type==Vars.DevType.Internal: return;
Step
=
V1
.
nVars
Step2
=
int
(
V1
.
size
/
V1
.
nVars
)
if
(
self
.
N
>
1
):
Mask
=
(
Vars
.
RCU_mask
if
Step
==
1
else
Vars
.
Ant_mask
)
Mask
=
Mask
.
OPCW
.
get_value
()
value1
=
strs2bytes
(
Instr
.
value
)
if
V1
.
OPCR
is
None
else
strs2bytes
(
V1
.
OPCR
.
get_value
())
if
(
len
(
value1
)
==
V1
.
nVars
)
and
(
self
.
N
>
1
):
value1
=
(
value1
*
self
.
N
);
if
(
Instr
.
type
==
Vars
.
DevType
.
Var
):
logging
.
info
(
str
((
'
** Set Var:
'
,
V1
.
name
,
value1
)))
for
RCUi
in
range
(
self
.
N
):
for
Vari
in
range
(
Step
):
if
not
(
Mask
[
RCUi
*
Step
+
Vari
]):
continue
i0
=
(
RCUi
*
Step
+
Vari
)
*
Step2
i1
=
(
RCUi
*
Step
+
(
Vari
+
1
))
*
Step2
self
.
SetVarValue
(
RCUi
,
V1
.
Vars
[
Vari
],
Instr
.
value
[
i0
:
i1
])
value2
=
value1
[
i0
:
i1
]
self
.
GetVarValue
(
RCUi
,
V1
.
Vars
[
Vari
],
value2
)
value1
[
i0
:
i1
]
=
value2
if
not
(
V1
.
OPCR
is
None
):
V1
.
OPCR
.
set_value
(
bytes2strs
(
value1
,
Step2
,
V1
.
type
))
elif
Instr
.
type
==
Vars
.
DevType
.
VarUpdate
:
if
self
.
N
==
1
:
for
Vari
in
range
(
Step
):
i0
=
(
Vari
)
*
Step2
i1
=
(
Vari
+
1
)
*
Step2
value2
=
value1
[
i0
:
i1
]
self
.
GetVarValue
(
0
,
V1
.
Vars
[
Vari
],
value2
)
value1
[
i0
:
i1
]
=
value2
if
not
(
V1
.
OPCR
is
None
):
V1
.
OPCR
.
set_value
(
bytes2strs
(
value1
,
Step2
,
V1
.
type
))
else
:
self
.
GetVarValueAll
(
V1
,
value1
)
# if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
if
not
(
V1
.
OPCR
is
None
):
V1
.
OPCR
.
set_value
(
bytes2strs
(
value1
,
Step2
,
V1
.
type
))
# V1.OPCR.get_data_value().Value.Value=value1
logging
.
info
(
str
((
'
** Readback:
'
,
V1
.
name
,
value1
)))
def
GetVarValue
(
self
,
RCUi
,
var
,
value
):
logging
.
info
(
str
((
"
RCU1 Get
"
,
RCUi
,
var
,
value
)))
if
var
.
type
==
Vars
.
DevType
.
I2C
:
self
.
SWcallback
(
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
])
self
.
GetI2C
(
RCUi
,
var
.
devreg
,
var
.
width
,
var
.
bitoffset
,
value
)
elif
var
.
type
==
Vars
.
DevType
.
HBA1
:
self
.
SWcallback
(
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
])
self
.
GetI2CHBA1
(
RCUi
,
var
.
devreg
,
var
.
width
,
var
.
bitoffset
,
value
)
elif
var
.
type
==
Vars
.
DevType
.
I2Cbb
:
logging
.
error
(
"
I2Cbb Implemented
"
)
elif
var
.
type
==
Vars
.
DevType
.
SPIbb
:
self
.
SWcallback
(
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
])
GetSPIbb
(
self
.
SetI2C
,
self
.
GetI2C
,
RCUi
,
var
.
devreg
,
value
)
else
:
logging
.
error
(
"
Not Implemented
"
)
def
GetVarValueAll
(
self
,
V1
,
value1
):
mask
=
0
;
for
RCUi
in
range
(
self
.
N
):
mask
|=
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
]
Step
=
V1
.
nVars
Step2
=
int
(
V1
.
size
/
V1
.
nVars
)
if
V1
.
Vars
[
0
].
type
==
Vars
.
DevType
.
I2C
:
for
Vari
in
range
(
Step
):
DevReg
=
V1
.
Vars
[
Vari
].
devreg
self
.
SWcallback
(
mask
)
self
.
SetI2CAddr
(
self
,
DevReg
)
if
DevReg
.
Register_R
>
255
:
self
.
I2Ccallback
(
DevReg
.
Addr
,[
250
],
read
=
3
)
#Wait for ADC
for
RCUi
in
range
(
self
.
N
):
self
.
SWcallback
(
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
])
i0
=
(
RCUi
*
Step
+
Vari
)
*
Step2
i1
=
(
RCUi
*
Step
+
(
Vari
+
1
))
*
Step2
value2
=
value1
[
i0
:
i1
]
var
=
V1
.
Vars
[
Vari
]
# print(Step,Step2,i0,i1,value2,len(value1))
self
.
GetI2Cnoreg
(
RCUi
,
var
.
devreg
,
var
.
width
,
var
.
bitoffset
,
value2
)
# print(value2)
if
(
var
.
Scale
!=
0
):
value2
[
0
]
*=
var
.
Scale
;
value1
[
i0
:
i1
]
=
value2
elif
V1
.
Vars
[
0
].
type
==
Vars
.
DevType
.
SPIbb
:
self
.
GetBBValueAll
(
V1
,
value1
,
mask
)
# logging.info("SPIbb all not implemented yet")
elif
V1
.
Vars
[
0
].
type
==
Vars
.
DevType
.
HBA1
:
for
Vari
in
range
(
Step
):
var
=
V1
.
Vars
[
Vari
]
for
RCUi
in
range
(
self
.
N
):
self
.
SWcallback
(
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
])
# print(Step,Step2,len(value1),V1.size)
i0
=
(
RCUi
*
Step
+
Vari
)
*
Step2
i1
=
(
RCUi
*
Step
+
(
Vari
+
1
))
*
Step2
value2
=
value1
[
i0
:
i1
]
self
.
GetI2CHBA1
(
RCUi
,
var
.
devreg
,
var
.
width
,
var
.
bitoffset
,
value2
)
value1
[
i0
:
i1
]
=
value2
# i0=(RCUi*Step+ Vari)*Step2+16
# i1=(RCUi*Step+(Vari+1))*Step2
# value2=value1[i0:i1]
# self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# value1[i0:i1]=value2
else
:
logging
.
error
(
"
Type not implemented
"
)
# print(value1)
def
GetBBValueAll
(
self
,
V1
,
value1
,
mask
):
def
SetBit
(
RCUi
,
dev
,
width
,
bitoffset
,
value
,
buffer
=
False
):
if
not
(
buffer
):
self
.
SWcallback
(
mask
)
self
.
SetI2C
(
RCUi
,
dev
,
width
,
bitoffset
,
value
,
buffer
=
buffer
)
def
GetBit
(
RCUixx
,
dev
,
width
,
bitoffset
,
buffer
=
False
):
value
=
[
0
for
RCUi
in
range
(
self
.
N
)]
value2
=
[
0
]
if
buffer
:
for
RCUi
in
range
(
self
.
N
):
self
.
GetI2Cbuffer
(
RCUi
,
dev
,
width
,
bitoffset
,
value2
)
value
[
RCUi
]
=
value2
[
0
]
return
value
self
.
SWcallback
(
mask
)
self
.
SetI2CAddr
(
self
,
dev
)
for
RCUi
in
range
(
self
.
N
):
self
.
SWcallback
(
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
])
#for i in range(len(value2)): value2[i]*=0;
value2
[
0
]
=
0
self
.
GetI2Cnoreg
(
RCUi
,
dev
,
width
,
bitoffset
,
value2
)
value
[
RCUi
]
=
value2
[
0
]
return
value
;
devs
=
[
V1
.
Vars
[
Vari
].
devreg
for
Vari
in
range
(
V1
.
nVars
)]
GetSPIbb2
(
SetBit
,
GetBit
,
0
,
devs
,
value1
)
def
SetVarValue
(
self
,
RCUi
,
var
,
value
):
if
var
.
devreg
.
Register_W
==-
1
:
return
True
;
#We can not set it, only read it e.g. temperature
logging
.
debug
(
str
((
"
RCU1 Set
"
,
RCUi
,
var
,
value
)))
if
var
.
type
==
Vars
.
DevType
.
I2C
:
self
.
SWcallback
(
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
])
self
.
SetI2C
(
RCUi
,
var
.
devreg
,
var
.
width
,
var
.
bitoffset
,
value
)
elif
var
.
type
==
Vars
.
DevType
.
HBA1
:
self
.
SWcallback
(
1
<<
Vars
.
RCU_MPaddr
.
Switch
[
RCUi
])
self
.
SetHBA1I2C
(
RCUi
,
var
.
devreg
,
var
.
width
,
var
.
bitoffset
,
value
)
# print("RCU1 Set",RCUi,var,value)
elif
var
.
type
==
Vars
.
DevType
.
I2Cbb
:
logging
.
error
(
"
I2Cbb Implemented
"
)
elif
var
.
type
==
Vars
.
DevType
.
SPIbb
:
logging
.
error
(
"
SPIbb Implemented
"
)
else
:
logging
.
error
(
"
Not Implemented
"
)
def
SetI2C
(
self
,
RCUi
,
dev
,
width
,
bitoffset
,
value
,
buffer
=
False
):
if
dev
.
store
>
0
:
previous
=
self
.
previous
[
RCUi
,
dev
.
store
-
1
];
value
[
0
]
=
ApplyMask
(
value
[
0
],
width
,
bitoffset
,
previous
);
self
.
previous
[
RCUi
,
dev
.
store
-
1
]
=
value
[
0
]
# logging.debug(str(('masked to',value)))
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if
buffer
:
return
True
;
return
self
.
I2Ccallback
(
dev
.
Addr
,
value
,
reg
=
dev
.
Register_W
)
def
SetHBA1I2C
(
self
,
RCUi
,
dev
,
width
,
bitoffset
,
value
,
buffer
=
False
):
# if dev.store>0:
previous
=
self
.
previousHBA
[
RCUi
,
dev
.
store
-
1
];
L
=
len
(
value
)
for
i
in
range
(
L
):
value
[
i
]
=
ApplyMask
(
value
[
i
],
width
,
bitoffset
,
previous
[
i
]);
self
.
previousHBA
[
RCUi
,
dev
.
store
-
1
]
=
value
# if buffer: return True;
self
.
I2Ccallback
(
dev
.
Addr
,
value
[:
16
],
reg
=
dev
.
Register_W
)
if
L
>
16
:
self
.
I2Ccallback
(
dev
.
Addr
,
value
[
16
:],
reg
=
dev
.
Register_W
+
16
)
self
.
I2Ccallback
(
dev
.
Addr
,[
500
],
reg
=
dev
.
Register_W
,
read
=
3
)
#Wait 500ms
return
True
# return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def
GetI2Cbuffer
(
self
,
RCUi
,
dev
,
width
,
bitoffset
,
value
):
if
not
(
dev
.
store
>
0
):
return
False
;
value
[
0
]
=
self
.
previous
[
RCUi
,
dev
.
store
-
1
];
# logging.debug(str(("GetI2Cbuffer",RCUi,dev.store,value)))
l1
=
int
(
np
.
floor
((
width
+
bitoffset
+
7
)
/
8
))
if
(
width
!=
l1
*
8
)
or
(
bitoffset
>
0
):
for
i
in
range
(
len
(
value
)):
value
[
i
]
=
UnMask
(
value
[
i
],
width
,
bitoffset
)
return
True
def
GetI2C
(
self
,
RCUi
,
dev
,
width
,
bitoffset
,
value
):
# if dev.store>0:
# value[0]=self.previous[RCUi,dev.store-1]
# return True
l1
=
int
(
np
.
floor
((
width
+
bitoffset
+
7
)
/
8
))
# print(width,bitoffset,l1)
makesinglevalue
=
((
len
(
value
)
==
1
)
and
(
l1
>
1
));
if
makesinglevalue
:
value2
=
[
0
for
x
in
range
(
l1
)]
else
:
value2
=
value
reg
=
dev
.
Register_R
if
reg
>
255
:
#This is for the monitor ADC
if
not
(
self
.
I2Ccallback
(
dev
.
Addr
,
int2bytes
(
reg
),
read
=
2
)):
return
False
;
I2Ccallback
(
Addr
,[
250
],
read
=
3
)
if
not
(
self
.
I2Ccallback
(
dev
.
Addr
,
value2
,
read
=
1
)):
return
False
;
else
:
if
not
(
self
.
I2Ccallback
(
dev
.
Addr
,
value2
,
reg
=
reg
,
read
=
1
)):
return
False
;
if
value2
[
0
]
is
None
:
return
False
if
makesinglevalue
:
value
[
0
]
=
bytes2int
(
value2
)
else
:
value
[:]
=
value2
[:];
if
dev
.
store
>
0
:
self
.
previous
[
RCUi
,
dev
.
store
-
1
]
=
value
[
0
]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if
(
width
!=
l1
*
8
)
or
(
bitoffset
>
0
):
for
i
in
range
(
len
(
value
)):
value
[
i
]
=
UnMask
(
value
[
i
],
width
,
bitoffset
)
else
:
value
[
0
]
=
value2
[
0
]
return
True
;
def
GetI2Cbit
(
self
,
RCUi
,
dev
,
pin
):
value2
=
[
value
]
self
.
I2CGet
(
RCUi
,
dev
,
1
,
1
<<
pin
,
value2
)
return
value2
[
0
]
def
SetI2CAddr
(
self
,
RCUi
,
dev
):
return
self
.
I2Ccallback
(
dev
.
Addr
,
int2bytes
(
dev
.
Register_R
),
read
=
2
)
def
GetI2Cnoreg
(
self
,
RCUi
,
dev
,
width
,
bitoffset
,
value
):
#print(width,len(value))
l1
=
int
(
np
.
floor
((
width
+
bitoffset
+
7
)
/
8
))
makesinglevalue
=
((
len
(
value
)
==
1
)
and
(
l1
>
1
));
if
makesinglevalue
:
value2
=
[
0
for
x
in
range
(
l1
)]
else
:
value2
=
value
reg
=
dev
.
Register_R
if
not
(
self
.
I2Ccallback
(
dev
.
Addr
,
value2
,
read
=
1
)):
return
False
;
if
value2
[
0
]
is
None
:
return
False
if
makesinglevalue
:
value
[
0
]
=
bytes2int
(
value2
)
else
:
value
[:]
=
value2
[:];
if
dev
.
store
>
0
:
self
.
previous
[
RCUi
,
dev
.
store
-
1
]
=
value
[
0
]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
# if width<8:
if
(
width
!=
l1
*
8
)
or
(
bitoffset
>
0
):
for
i
in
range
(
len
(
value
)):
value
[
i
]
=
UnMask
(
value
[
i
],
width
,
bitoffset
)
# value[0]=UnMask(value[0],width,bitoffset)
#else: value[0]=value2[0]
#if (len(value)>1) and (width<8): print value
return
True
;
def
GetI2CHBA1
(
self
,
RCUi
,
dev
,
width
,
bitoffset
,
value
):
#print(width,len(value))
value2
=
value
reg
=
dev
.
Register_R
if
not
(
self
.
I2Ccallback
(
dev
.
Addr
,
value2
,
read
=
1
)):
return
False
;
if
value2
[
0
]
is
None
:
return
False
value
[:]
=
value2
[:];
if
dev
.
store
>
0
:
self
.
previousHBA
[
RCUi
,
dev
.
store
-
1
]
=
value
for
i
in
range
(
len
(
value
)):
value
[
i
]
=
UnMask
(
value
[
i
],
width
,
bitoffset
)
return
True
;
def
start
(
self
,
Q1
):
def
RCUthread
(
Q1
):
while
True
:
item
=
Q1
.
get
()
if
item
is
None
:
break
;
self
.
SetVar
(
item
)
logging
.
info
(
"
End RCU thread
"
)
RCUthread1
=
threading
.
Thread
(
target
=
RCUthread
,
args
=
(
Q1
,))
RCUthread1
.
start
()
return
RCUthread1
def
Queue_Monitor
(
self
,
Q1
,
NRCU
):
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,NRCU,[0]*NRCU)
# Q1.put(Inst1)
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
# Q1.put(Inst1)
return
def
AddVars
(
self
,
Q1
,
AddVarR
,
AddVarW
):
for
v
in
Vars
.
OPC_devvars
:
dim1
=
Vars
.
RCU_MPaddr
.
nI2C
*
Vars
.
RCU_MPaddr
.
nSwitch
*
v
.
nVars
dim2
=
Vars
.
RCU_MPaddr
.
nI2C
*
Vars
.
RCU_MPaddr
.
nSwitch
*
v
.
size
dim3
=
int
(
v
.
size
/
v
.
nVars
)
#print(v.name,dim)
varvalue2
=
0
if
v
.
type
==
Vars
.
datatype
.
dInt
:
varvalue2
=
dim2
*
[
0
]
elif
v
.
type
==
Vars
.
datatype
.
dbool
:
varvalue2
=
dim2
*
[
False
]
elif
v
.
type
==
Vars
.
datatype
.
dfloat
:
varvalue2
=
dim2
*
[
0.0
]
elif
v
.
type
==
Vars
.
datatype
.
dstring
:
varvalue2
=
dim1
*
[
"
"
*
dim3
]
print
(
len
(
varvalue2
),
varvalue2
)
if
v
.
RW
in
[
Vars
.
RW
.
ReadOnly
,
Vars
.
RW
.
ReadWrite
]:
var1
=
AddVarR
(
v
.
name
+
"
_R
"
,
varvalue2
,
v
)
# print(len(varvalue1),len(varvalue2),v.size,dim2)
v
.
OPCR
=
var1
Inst
=
Vars
.
Instr
(
Vars
.
DevType
.
VarUpdate
,
v
,
dim2
,
varvalue2
)
Q1
.
put
(
Inst
)
if
v
.
RW
in
[
Vars
.
RW
.
WriteOnly
,
Vars
.
RW
.
ReadWrite
]:
var1
=
AddVarW
(
v
.
name
+
"
_RW
"
,
varvalue2
,
v
,
Q1
)
v
.
OPCW
=
var1
def
AddMethod
(
self
,
Q1
,
Addmethod
):
for
v
in
Vars
.
OPC_methods
:
Inst1
=
Vars
.
Instr
(
Vars
.
DevType
.
Instr
,
v
,
0
,[])
Addmethod
(
v
.
name
,
Inst1
,
Q1
)
Loading