#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@File    : bms_pcs_ctl.py
@Time    : 2022/02/21 16:56:26
@Author  : None
@Version : 1.0
@Desc    : Main page controller (CTL)
'''
- from sys import argv
- from utils.globalvar import SD
- from utils.qt import QApplication, QThread, QTimer
- from controller.pcs_status_ctl import PcsStatusControll
- from widget.pcs_home import Win_Pcs_Home
- from worker.pcs_work import PcsCanReceived, PcsComWork
- import serial.tools.list_ports
- import utils.modbus.defines as cst
class PcsControll:
    """Controller for the PCS main window.

    Creates the Qt application and the ``Win_Pcs_Home`` view, connects the
    view's signals to connect/disconnect and relay actions, and writes values
    received from the CAN receiver or the serial (COM) worker into the view.

    ``SD.CAN_ON_OFF`` is used as the "link is open" flag for both the CAN and
    the COM interface.
    """

    # CAN frame id used for every relay command sent by this controller.
    _RELAY_CAN_ID = 0x182B003F
    # First argument of every SD.COM_CONTROL.send() call in this class;
    # presumably the Modbus slave address -- TODO confirm.
    _COM_UNIT = 1
    # Register written by the "all on / all off" buttons on the COM link.
    _RELAY_ALL_REGISTER = 0x1900
    # Register written by the single-relay on/off buttons on the COM link.
    _RELAY_SINGLE_REGISTER = 0x1901
    # Suffixes of the worker signals show_XX_com_signal and of the matching
    # handler methods _pcs_XX_com_signal.
    _COM_SIGNAL_CODES = (
        "00", "01", "02", "03", "04", "05", "06", "07",
        "08", "09", "0A", "0B", "0C", "0D", "0E", "0F",
    )
    # Text shown for byte 2 of the 0x90 frame (charge/discharge permission).
    _CHARGE_PERMISSION_TEXT = {
        0: "允充允放",
        1: "允充禁放",
        2: "禁充允放",
        3: "禁充禁放",
    }
    # Line edits that are blanked when the link is closed.
    _TEXT_FIELDS = (
        "edt_soc",
        "edt_bms_chg_enabled",
        "edt_bms_stop",
        "edt_bms_soh",
        "edt_bms_high_temp",
        "edt_bms_low_temp",
        "edt_bms_dischg_power_max",
        "edt_bms_chg_power_max",
        "edt_bms_chg_cur_max",
        "edt_bms_dischg_cur_max",
        "edt_bms_high_volt",
        "edt_bms_low_volt",
        "edt_bms_chg_volt_high",
        "edt_bms_dischg_volt_high",
        "edt_bms_volt",
        "edt_bms_cur",
        "edt_bms_chg_power_real",
        "edt_bms_dischg_power_real",
    )

    def __init__(self):
        """Create the Qt application and the main window, then wire them up."""
        self._app = QApplication(argv)
        self._view = Win_Pcs_Home()
        self.init()

    def init(self):
        """Fill the port list, install the status bar and connect view signals."""
        self.port_check()
        self.pcs_status = PcsStatusControll()

        view = self._view
        view.setStatusBar(self.pcs_status._view.statusbar)
        view.interface_signal.connect(self.interface_chose)
        view.connect_signal.connect(self.pcs_connect)
        view.disconnect_signal.connect(self.pcs_disconnect)
        view.btn_all_on.clicked.connect(self._relay_all_on)
        view.btn_all_off.clicked.connect(self._relay_all_off)
        view.btn_cls_on.clicked.connect(self._relay_on)
        view.btn_cls_off.clicked.connect(self._relay_off)

    # ------------------------------------------------------------------
    # Relay commands
    # ------------------------------------------------------------------
    def _active_interface(self):
        """Return the selected interface index, or None when no link is open."""
        if not SD.CAN_ON_OFF:
            return None
        return self._view.cb_interface.currentIndex()

    def _send_relay_can(self, payload):
        """Send an 8-byte relay command frame on the CAN link."""
        SD.CAN_CONTROL.send(self._RELAY_CAN_ID, payload, extern_flag=True)

    def _write_relay_register(self, register, value):
        """Write ``value`` to ``register`` on the COM link."""
        SD.COM_CONTROL.send(
            self._COM_UNIT, cst.WRITE_MULTIPLE_REGISTERS, register, value
        )

    def _relay_all_on(self):
        """Send the "all on" relay command over the active interface."""
        link = self._active_interface()
        if link == 0:
            self._send_relay_can([1, 0, 0, 0, 0, 0, 0, 0])
        elif link == 1:
            self._write_relay_register(self._RELAY_ALL_REGISTER, 0x01)

    def _relay_all_off(self):
        """Send the "all off" relay command over the active interface."""
        link = self._active_interface()
        if link == 0:
            self._send_relay_can([0, 0, 0, 0, 0, 0, 0, 0])
        elif link == 1:
            self._write_relay_register(self._RELAY_ALL_REGISTER, 0x00)

    def _relay_on(self):
        """Switch on the relay selected in ``cb_cls``.

        CAN: byte 2 carries the 1-based relay number.  COM: the register
        receives a value with only the selected relay's bit set.
        """
        link = self._active_interface()
        if link == 0:
            relay_no = self._view.cb_cls.currentIndex() + 1
            self._send_relay_can([0xFF, 1, relay_no, 0, 0, 0, 0, 0])
        elif link == 1:
            bit = 1 << self._view.cb_cls.currentIndex()
            self._write_relay_register(self._RELAY_SINGLE_REGISTER, bit)

    def _relay_off(self):
        """Switch off the relay selected in ``cb_cls``.

        CAN: byte 2 carries the 1-based relay number.  COM: the register
        receives a 16-bit value with every bit set except the selected one.
        """
        link = self._active_interface()
        if link == 0:
            relay_no = self._view.cb_cls.currentIndex() + 1
            self._send_relay_can([0xFF, 0, relay_no, 0, 0, 0, 0, 0])
        elif link == 1:
            bit = 1 << self._view.cb_cls.currentIndex()
            self._write_relay_register(
                self._RELAY_SINGLE_REGISTER, ~bit & 0xFFFF
            )

    def com_work(self):
        """Read 16 holding registers starting at 0x0100 and print them.

        Debug aid; nothing in this class connects it to a signal.
        """
        ret = SD.COM_CONTROL.send(
            self._COM_UNIT, cst.READ_HOLDING_REGISTERS, 0x0100, 16
        )
        print(list(ret))

    def port_check(self):
        """Refresh the serial-port combo box with the ports currently present."""
        ports = list(serial.tools.list_ports.comports())
        combo = self._view.cb_com_interface_port
        combo.clear()
        for port in ports:
            # Element 0 of a port entry is the device name.
            combo.addItem(port[0])

    # ------------------------------------------------------------------
    # Connect / disconnect
    # ------------------------------------------------------------------
    def pcs_connect(self):
        """Open the link for whichever interface is selected in the view."""
        selected = self._view.cb_interface.currentIndex()
        if selected == 0:
            self.pcs_can_connect()
        elif selected == 1:
            self.pcs_com_connect()

    def pcs_disconnect(self):
        """Close the link for whichever interface is selected in the view."""
        selected = self._view.cb_interface.currentIndex()
        if selected == 0:
            self.pcs_can_disconnect()
        elif selected == 1:
            self.pcs_com_disconnect()

    def pcs_can_connect(self):
        """Open and initialise the CAN device using the settings in the view.

        On success the receiver thread is started and the interface group
        boxes are disabled.  If the device cannot be opened or initialised
        the view's connect-error dialog is shown; an ``AttributeError`` raised
        anywhere in the sequence is reported through ``no_can_device``.
        """
        self._can_index_var = self._view.cb_can_interface_index.currentText()
        self._can_chanel_var = self._view.cb_can_interface_channel.currentText()
        self._can_baudrate_var = (
            self._view.cb_can_interface_baudrate.currentText()
        )
        SD.CAN_CONTROL.set_can_board(
            int(self._can_index_var),
            int(self._can_chanel_var),
            self._can_baudrate_var,
        )
        try:
            opened = SD.CAN_CONTROL.open_device()
            if opened:
                SD.CAN_ON_OFF = SD.CAN_CONTROL.init_can(0x00000000, 0xFFFFFFFF)
            if opened and SD.CAN_ON_OFF:
                self.pcs_can_connected()
                self._view.groupbox_can_interface.setDisabled(True)
                self._view.groupbox_interface.setDisabled(True)
            else:
                # Give feedback for both failure modes (open or init failed).
                self._view.can_connect_error()
        except AttributeError:
            self._view.no_can_device()

    def pcs_can_disconnect(self):
        """Close the CAN link, stop the receiver thread and reset the view."""
        if not SD.CAN_ON_OFF:
            return
        SD.CAN_ON_OFF = False
        SD.CAN_CONTROL.clear_buffer()
        SD.CAN_CONTROL.close_can()

        thread = self.pcs_can_received_thread
        if thread.isRunning():
            thread.quit()
            thread.wait()
        if thread.isFinished():
            # The worker is scheduled for deletion via the thread's
            # finished signal; drop our references as well.
            del self.pcs_can_received
            del self.pcs_can_received_thread

        self._view.groupbox_can_interface.setDisabled(False)
        self._view.groupbox_interface.setDisabled(False)
        self.pcs_data_recovery()
        self.pcs_status.pcs_disconnect()

    def pcs_com_disconnect(self):
        """Stop the COM worker thread and reset the view.

        NOTE(review): the serial device is not explicitly closed here --
        confirm that the worker or ``SD.COM_CONTROL`` takes care of it.
        """
        if not SD.CAN_ON_OFF:
            return
        SD.CAN_ON_OFF = False

        thread = self.pcs_com_thread
        if thread.isRunning():
            thread.quit()
            thread.wait()
        if thread.isFinished():
            del self.pcs_com_work
            del self.pcs_com_thread

        self._view.groupbox_com_interface.setDisabled(False)
        self._view.groupbox_interface.setDisabled(False)
        self.pcs_data_recovery()
        self.pcs_status.pcs_disconnect()

    def pcs_com_connect(self):
        """Open the serial device using the settings chosen in the view.

        Parity combo index 0/1/2 maps to 'N'/'E'/'O'; any other index maps
        to 'M'.  On success the COM worker is started and the interface group
        boxes are disabled, otherwise ``no_com_device`` is shown.
        """
        view = self._view
        self._port = view.cb_com_interface_port.currentText()
        self._baudrate = int(view.cb_com_interface_baudrate.currentText())
        self._bytesize = int(view.cb_com_interface_word_length.currentText())
        self._stopbits = int(view.cb_com_interface_stop.currentText())

        parity_codes = ('N', 'E', 'O')
        parity_index = view.cb_com_interface_parity.currentIndex()
        if 0 <= parity_index < len(parity_codes):
            self._parity = parity_codes[parity_index]
        else:
            self._parity = 'M'

        SD.COM_CONTROL.set_com(
            self._port,
            self._baudrate,
            self._bytesize,
            self._parity,
            self._stopbits,
        )
        SD.CAN_ON_OFF = SD.COM_CONTROL.open_device()
        if SD.CAN_ON_OFF:
            self.pcs_com_connected()
            view.groupbox_com_interface.setDisabled(True)
            view.groupbox_interface.setDisabled(True)
        else:
            view.no_com_device()

    def pcs_com_connected(self):
        """Create the COM worker, move it to its own thread and start it."""
        # COM worker and the thread it lives in.
        self.pcs_com_thread = QThread()
        self.pcs_com_work = PcsComWork()
        self.pcs_com_work.moveToThread(self.pcs_com_thread)
        self.pcs_com_thread.finished.connect(self.pcs_com_work.deleteLater)
        self.pcs_com_thread.started.connect(self.pcs_com_work.work)

        # show_XX_com_signal -> _pcs_XX_com_signal for XX in 00..0F.
        for code in self._COM_SIGNAL_CODES:
            signal = getattr(self.pcs_com_work, "show_%s_com_signal" % code)
            signal.connect(getattr(self, "_pcs_%s_com_signal" % code))

        self.pcs_status.pcs_connect()
        # Start the thread.
        self.pcs_com_thread.start()

    # ------------------------------------------------------------------
    # COM value handlers
    # ------------------------------------------------------------------
    @staticmethod
    def _tenths_text(raw, offset=0):
        """Return ``raw * 0.1 - offset`` as text, rounded to one decimal.

        Rounding keeps binary floating-point noise out of the display
        (e.g. ``3 * 0.1`` would otherwise show as 0.30000000000000004).
        """
        return str(round(raw * 0.1 - offset, 1))

    def _pcs_00_com_signal(self, data):
        """Print value 0x00 in hexadecimal."""
        print(hex(data))

    def _pcs_01_com_signal(self, data):
        """Print value 0x01 in hexadecimal."""
        print(hex(data))

    def _pcs_02_com_signal(self, data):
        """Show value 0x02 (scaled by 0.1) in ``edt_bms_volt``."""
        self._view.edt_bms_volt.setText(self._tenths_text(data))

    def _pcs_03_com_signal(self, data):
        """Show value 0x03 (scaled by 0.1, offset -2000) in ``edt_bms_cur``."""
        self._view.edt_bms_cur.setText(self._tenths_text(data, 2000))

    def _pcs_04_com_signal(self, data):
        """Show value 0x04 (scaled by 0.1) in ``edt_soc``."""
        self._view.edt_soc.setText(self._tenths_text(data))

    def _pcs_05_com_signal(self, data):
        """Show value 0x05 (scaled by 0.1) in ``edt_bms_soh``."""
        self._view.edt_bms_soh.setText(self._tenths_text(data))

    def _pcs_06_com_signal(self, data):
        """Show value 0x06 (scaled by 0.1) in ``edt_bms_chg_cur_max``."""
        self._view.edt_bms_chg_cur_max.setText(self._tenths_text(data))

    def _pcs_07_com_signal(self, data):
        """Show value 0x07 (scaled by 0.1) in ``edt_bms_dischg_cur_max``."""
        self._view.edt_bms_dischg_cur_max.setText(self._tenths_text(data))

    def _pcs_08_com_signal(self, data):
        """Show value 0x08 (scaled by 0.1) in ``edt_bms_chg_volt_high``."""
        self._view.edt_bms_chg_volt_high.setText(self._tenths_text(data))

    def _pcs_09_com_signal(self, data):
        """Show value 0x09 (scaled by 0.1) in ``edt_bms_dischg_volt_high``."""
        self._view.edt_bms_dischg_volt_high.setText(self._tenths_text(data))

    def _pcs_0A_com_signal(self, data):
        """Show value 0x0A (scaled by 0.1) in ``edt_bms_chg_power_real``."""
        self._view.edt_bms_chg_power_real.setText(self._tenths_text(data))

    def _pcs_0B_com_signal(self, data):
        """Show value 0x0B (scaled by 0.1) in ``edt_bms_dischg_power_real``."""
        self._view.edt_bms_dischg_power_real.setText(self._tenths_text(data))

    def _pcs_0C_com_signal(self, data):
        """Show value 0x0C (scaled by 0.1) in ``edt_bms_high_volt``."""
        self._view.edt_bms_high_volt.setText(self._tenths_text(data))

    def _pcs_0D_com_signal(self, data):
        """Show value 0x0D (scaled by 0.1) in ``edt_bms_low_volt``."""
        self._view.edt_bms_low_volt.setText(self._tenths_text(data))

    def _pcs_0E_com_signal(self, data):
        """Show value 0x0E (scaled by 0.1) in ``edt_bms_dischg_power_max``."""
        self._view.edt_bms_dischg_power_max.setText(self._tenths_text(data))

    def _pcs_0F_com_signal(self, data):
        """Show value 0x0F (scaled by 0.1) in ``edt_bms_chg_power_max``."""
        self._view.edt_bms_chg_power_max.setText(self._tenths_text(data))

    # ------------------------------------------------------------------
    # CAN frame handlers
    # ------------------------------------------------------------------
    def pcs_can_connected(self):
        """Create the CAN receiver, move it to its own thread and start it."""
        # CAN data receiver and the thread it lives in.
        self.pcs_can_received_thread = QThread()
        self.pcs_can_received = PcsCanReceived()
        self.pcs_can_received.moveToThread(self.pcs_can_received_thread)
        self.pcs_can_received_thread.finished.connect(
            self.pcs_can_received.deleteLater
        )
        self.pcs_can_received_thread.started.connect(
            self.pcs_can_received.received
        )
        self.pcs_can_received.show_90_signal.connect(self._90_signal)
        self.pcs_can_received.show_91_signal.connect(self._91_signal)
        self.pcs_can_received.show_92_signal.connect(self._92_signal)
        self.pcs_can_received.show_93_signal.connect(self._93_signal)
        self.pcs_status.pcs_connect()
        # Start the thread.
        self.pcs_can_received_thread.start()

    @staticmethod
    def _u16le(data, offset):
        """Combine ``data[offset]`` (low byte) and ``data[offset + 1]`` (high)."""
        return data[offset + 1] << 8 | data[offset]

    def _90_signal(self, data):
        """Display the fields of a 0x90 frame.

        Bytes 0 and 5 are scaled by 0.4, byte 1 drives the ready LED, byte 2
        selects the charge/discharge permission text (values 0-3; other
        values leave the field untouched), byte 4 == 1 sets the fault LED to
        status 2, bytes 6 and 7 are shown with an offset of -40.
        """
        view = self._view
        view.edt_soc.setText(str(round(data[0] * 0.4, 1)))
        view.led_bms_ready.set_status(data[1])
        permission = self._CHARGE_PERMISSION_TEXT.get(data[2])
        if permission is not None:
            view.edt_bms_chg_enabled.setText(permission)
        view.edt_bms_stop.setText("")
        view.led_bms_fault.set_status(2 if data[4] == 1 else 0)
        view.edt_bms_soh.setText(str(round(data[5] * 0.4, 1)))
        view.edt_bms_high_temp.setText(str(data[6] - 40))
        view.edt_bms_low_temp.setText(str(data[7] - 40))

    def _91_signal(self, data):
        """Display the four little-endian 16-bit fields of a 0x91 frame.

        Words 0 and 1 are scaled by 0.1; words 2 and 3 are scaled by 0.1
        with an offset of -3200.
        """
        view = self._view
        u16 = self._u16le
        view.edt_bms_dischg_power_max.setText(
            str(round(u16(data, 0) * 0.1, 1)))
        view.edt_bms_chg_power_max.setText(
            str(round(u16(data, 2) * 0.1, 1)))
        view.edt_bms_chg_cur_max.setText(
            str(round(u16(data, 4) * 0.1 - 3200, 1)))
        view.edt_bms_dischg_cur_max.setText(
            str(round(u16(data, 6) * 0.1 - 3200, 1)))

    def _92_signal(self, data):
        """Display the four little-endian 16-bit fields of a 0x92 frame.

        Words 0 and 1 are scaled by 0.001; words 2 and 3 by 0.1.
        """
        view = self._view
        u16 = self._u16le
        view.edt_bms_high_volt.setText(
            str(round(u16(data, 0) * 0.001, 3)))
        view.edt_bms_low_volt.setText(
            str(round(u16(data, 2) * 0.001, 3)))
        view.edt_bms_chg_volt_high.setText(
            str(round(u16(data, 4) * 0.1, 1)))
        view.edt_bms_dischg_volt_high.setText(
            str(round(u16(data, 6) * 0.1, 1)))

    def _93_signal(self, data):
        """Display the four little-endian 16-bit fields of a 0x93 frame.

        All words are scaled by 0.1; word 1 additionally has an offset of
        -3200.
        """
        view = self._view
        u16 = self._u16le
        view.edt_bms_volt.setText(
            str(round(u16(data, 0) * 0.1, 2)))
        view.edt_bms_cur.setText(
            str(round(u16(data, 2) * 0.1 - 3200, 2)))
        view.edt_bms_chg_power_real.setText(
            str(round(u16(data, 4) * 0.1, 1)))
        view.edt_bms_dischg_power_real.setText(
            str(round(u16(data, 6) * 0.1, 1)))

    # ------------------------------------------------------------------
    # Window handling
    # ------------------------------------------------------------------
    def run(self):
        """Show the main window and run the Qt event loop; return its exit code."""
        self._view.show()
        return self._app.exec_()

    def interface_chose(self, current_interface):
        """Show the settings group for the chosen interface and hide the other.

        ``current_interface == 1`` selects the COM group; any other value
        selects the CAN group.
        """
        can_box = self._view.groupbox_can_interface
        com_box = self._view.groupbox_com_interface
        if current_interface == 1:
            can_box.hide()
            com_box.show()
        else:
            com_box.hide()
            can_box.show()

    def pcs_data_recovery(self):
        """Blank every value field and switch both status LEDs to 0."""
        for field_name in self._TEXT_FIELDS:
            getattr(self._view, field_name).setText("")
        self._view.led_bms_ready.set_status(0)
        self._view.led_bms_fault.set_status(0)
|