Skip to content
Snippets Groups Projects
Commit 8eb97a9c authored by Paulus Kruger's avatar Paulus Kruger
Browse files

pitr added

parent 9ec0408f
No related branches found
No related tags found
No related merge requests found
Pipeline #69660 failed
[Unit]
Description=pi translator
After=multi-user.target
#After=network-online.target
[Service]
Type=simple
ExecStart=pitr
[Install]
WantedBy=multi-user.target
#!/bin/bash
sudo cp bin/*.service /lib/systemd/system/.
sudo systemctl daemon-reload
sudo systemctl enable recvtr.service
sudo systemctl enable apscttr.service
sudo systemctl enable apsputr.service
sudo systemctl enable unb2tr.service
sudo systemctl enable pitr.service
#sudo systemctl enable recvtr.service
#sudo systemctl enable apscttr.service
#sudo systemctl enable apsputr.service
#sudo systemctl enable unb2tr.service
#!/bin/bash
sudo cp bin/*.service /lib/systemd/system/.
sudo systemctl daemon-reload
sudo systemctl enable pitr.service
sudo systemctl enable ccdtr.service
IP_CCD= '10.99.250.90'
IP_APSCT_TEST='10.99.100.100'
#IP_APSCT_subrack='10.99.x.100'
import logging
import RPi.GPIO as GPIO
import subprocess
def get_LMP_ID():
pins=[21,20,16,12,7,8]
Npins=len(pins);
GPIO.setmode(GPIO.BCM)
for i,pin in enumerate(pins):
GPIO.setup(pin,GPIO.IN)
value=0;
for pin in pins:
value=2*value+1 if GPIO.input(pin) else 2*value
return value
def get_eth0_ip_ifconfig():
result=subprocess.run(['ifconfig','eth0'],stdout=subprocess.PIPE).stdout.decode()
result=result.split('\n')[1].split()
# print(result)
if result[0]=='inet': return result[1]
return None
def get_eth0_ip():
result=subprocess.run(['grep','static ip_address=10.99','/etc/dhcpcd.conf'],stdout=subprocess.PIPE).stdout.decode()
for line in result.split('\n'):
line=line.split(' ')
if (len(line)==0) or not(line[0]=='static'): continue
line=line[1].split('=');
if len(line)<2: continue
line=line[1].split('/')[0]
return line
# if result[0]=='inet': return result[1]
return None
def replace_IP(old,new):
s="sudo sed -i ""s/%s/%s/g"" /etc/dhcpcd.conf" % (old,new)
logging.info(s)
subprocess.run(s.split(' '))
result=subprocess.run(['grep',new,'/etc/dhcpcd.conf'],stdout=subprocess.PIPE).stdout.decode()
if len(result.split('\n'))!=2:
logging.error("IP replacement error")
return
logging.info("Restart eth0")
subprocess.run(['sudo','ifconfig','eth0','down'])
# sleep(1)
subprocess.run(['sudo','ifconfig','eth0','up'])
def check_ip():
ip_current=get_eth0_ip()
logging.info("current IP="+str(ip_current))
ID=get_LMP_ID()
logging.info("LMP id="+hex(ID))
ip_new=IP_CCD #default (CCD)
if ID==0x3F: #APSCT in test setup
ip_new=IP_APSCT_TEST
logging.info("APSCT test setup, ip=%s"%ip_new)
if ID in [0,1,2,3]: #APSCT in subrack
ip_new='10.99.%i.100'%ID
logging.info("APSCT subrack, ip=%s"%ip_new)
if (ip_new!=ip_current) and not(ip_current is None):
logging.warning("Change IP to %s"%ip_new)
replace_IP(ip_current,ip_new)
#print(get_value());
if __name__=='__main__':
logging.getLogger().setLevel('INFO')
check_ip()
version: "1.0"
description: "1234"
name: "APSCTTR"
drivers:
- name: I2C
......
version: "0.0"
description: "UNB2 DTS first draft"
name: "UNB2TR"
drivers:
- name: I2C1
......
......@@ -4,11 +4,16 @@ import subprocess
import signal
import logging
from queue import Queue
from pypcc.check_ip import check_ip
port=4899
logging.basicConfig(encoding='utf-8', level=logging.INFO)
if True:
check_ip()
#stop program neatly when stopped
#global running
#running=True;
......@@ -97,6 +102,11 @@ def EEPROM_code_changed(value):
# except:
# logging.error("Get ID failed")
def update_temperature():
temp=subprocess.run(['/usr/bin/vcgencmd','measure_temp'],stdout=subprocess.PIPE).stdout.decode()
# print(temp[5:-3])
pi_temp.set_value(float(temp[5:-3]))
if True:
# global server,running,PCCobj,DEBUGobj,idx,sub;
......@@ -142,11 +152,19 @@ if True:
EEPROM_version_new = obj.add_variable(idx, 'pytr_EEPROM_version_RW', 'None')
EEPROM_version_new.set_writable()
pi_temp = obj.add_variable(idx, 'pytr_pi_temperature_R', 0.0)
update_temperature()
# logging.info("Add variables:")
nodeid=0
while nodeid!=-1:
nodeid=datachanged.get()
try:
nodeid=datachanged.get(timeout=10)
except:
update_temperature()
continue
if (nodeid==EEPROM_code.nodeid.Identifier): EEPROM_code_changed(EEPROM_code.get_value())
# sleep(10)
#P1.GetVarNames("",AddVar);
server.stop()
......
from test_common import *
connect("opc.tcp://localhost:4899/")
#set_value("pytr_translator_select","apscttr")
#callmethod("pytr_start")
#callmethod("pytr_stop")
set_value("pytr_EEPROM_passcode_RW","4899")
disconnect();
from test_common import *
from time import sleep
connect("opc.tcp://localhost:4843/")
names=get_all_variables()
for name in names:
att=get_value(name)
print(name,'=',att)
disconnect();
from test_common import *
from time import sleep
connect("opc.tcp://localhost:4841/")
names=get_all_variables()
for name in names:
att=get_value(name)
print(name,'=',att)
disconnect();
......@@ -25,6 +25,7 @@ where=.
[options.entry_points]
console_scripts =
hwtr = pypcc.pypcc
pitr = pypcc.pitr
[options.package_data]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment