#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@File    :bms_pcs_ctl.py
@Time    :2022/02/21 16:56:26
@Author  :None
@Version :1.0
@Desc    :Main page controller (CTL)
'''

from sys import argv
from utils.globalvar import SD
from utils.qt import QApplication, QThread, QTimer
from controller.pcs_status_ctl import PcsStatusControll
from widget.pcs_home import Win_Pcs_Home
from worker.pcs_work import PcsCanReceived, PcsComWork
import serial.tools.list_ports
import utils.modbus.defines as cst

# Values of ``cb_interface.currentIndex()`` that this controller dispatches on.
_INTERFACE_CAN = 0
_INTERFACE_COM = 1

# CAN frame ID used by every relay command sent from this controller.
_RELAY_CAN_ID = 0x182B003F
# Register addresses written over the COM link by the relay commands.
_RELAY_ALL_REGISTER = 0x1900
_RELAY_SINGLE_REGISTER = 0x1901
# First argument of every SD.COM_CONTROL.send() call.
# NOTE(review): presumably the Modbus slave address -- confirm against COM_CONTROL.
_COM_SLAVE = 1

# Parity letter for each index of ``cb_com_interface_parity``; any other
# index falls back to _PARITY_FALLBACK.
_PARITY_BY_INDEX = ('N', 'E', 'O')
_PARITY_FALLBACK = 'M'

# Text shown in ``edt_bms_chg_enabled`` for each value of byte 2 of a "90" frame.
_CHG_PERMISSION_TEXT = {
    0: "允充允放",
    1: "允充禁放",
    2: "禁充允放",
    3: "禁充禁放",
}

# Text widgets of the home view that are blanked on disconnect.
_TEXT_FIELDS = (
    "edt_soc",
    "edt_bms_chg_enabled",
    "edt_bms_stop",
    "edt_bms_soh",
    "edt_bms_high_temp",
    "edt_bms_low_temp",
    "edt_bms_dischg_power_max",
    "edt_bms_chg_power_max",
    "edt_bms_chg_cur_max",
    "edt_bms_dischg_cur_max",
    "edt_bms_high_volt",
    "edt_bms_low_volt",
    "edt_bms_chg_volt_high",
    "edt_bms_dischg_volt_high",
    "edt_bms_volt",
    "edt_bms_cur",
    "edt_bms_chg_power_real",
    "edt_bms_dischg_power_real",
)

# LED widgets of the home view that are reset to status 0 on disconnect.
_LED_FIELDS = ("led_bms_ready", "led_bms_fault")


class PcsControll:
    """Controller that wires the PCS home window to the CAN / COM back ends.

    It owns the ``QApplication`` and the ``Win_Pcs_Home`` view, opens and
    closes the selected link through ``SD.CAN_CONTROL`` / ``SD.COM_CONTROL``,
    starts a worker object on a ``QThread`` while a link is open, and copies
    the values delivered by the worker signals into the view widgets.

    ``SD.CAN_ON_OFF`` is used as the "a link is open" flag for both the CAN
    and the COM interface.
    """

    def __init__(self):
        self._app = QApplication(argv)
        self._view = Win_Pcs_Home()
        self.init()

    def init(self):
        """Populate the port list and connect the view signals to their handlers."""
        self.port_check()
        self.pcs_status = PcsStatusControll()
        self._view.setStatusBar(self.pcs_status._view.statusbar)
        self._view.interface_signal.connect(self.interface_chose)
        self._view.connect_signal.connect(self.pcs_connect)
        self._view.disconnect_signal.connect(self.pcs_disconnect)
        self._view.btn_all_on.clicked.connect(self._relay_all_on)
        self._view.btn_all_off.clicked.connect(self._relay_all_off)
        self._view.btn_cls_on.clicked.connect(self._relay_on)
        self._view.btn_cls_off.clicked.connect(self._relay_off)

    # ------------------------------------------------------------------
    # Relay commands
    # ------------------------------------------------------------------
    def _connected_interface(self):
        """Return the selected interface index, or ``None`` when no link is open."""
        if not SD.CAN_ON_OFF:
            return None
        return self._view.cb_interface.currentIndex()

    def _relay_all_on(self):
        """Send the "all relays on" command over the open link, if any."""
        interface = self._connected_interface()
        if interface == _INTERFACE_CAN:
            SD.CAN_CONTROL.send(_RELAY_CAN_ID, [1, 0, 0, 0, 0, 0, 0, 0], extern_flag=True)
        elif interface == _INTERFACE_COM:
            SD.COM_CONTROL.send(_COM_SLAVE, cst.WRITE_MULTIPLE_REGISTERS, _RELAY_ALL_REGISTER, 0x01)

    def _relay_all_off(self):
        """Send the "all relays off" command over the open link, if any."""
        interface = self._connected_interface()
        if interface == _INTERFACE_CAN:
            SD.CAN_CONTROL.send(_RELAY_CAN_ID, [0, 0, 0, 0, 0, 0, 0, 0], extern_flag=True)
        elif interface == _INTERFACE_COM:
            SD.COM_CONTROL.send(_COM_SLAVE, cst.WRITE_MULTIPLE_REGISTERS, _RELAY_ALL_REGISTER, 0x00)

    def _switch_relay(self, turn_on):
        """Send the on/off command for the relay selected in ``cb_cls``.

        Over CAN the frame carries ``0xff``, the on/off flag and the 1-based
        relay number.  Over COM a 16-bit value is written to the single-relay
        register: the relay's bit alone when switching on, every bit except
        the relay's bit when switching off.
        """
        interface = self._connected_interface()
        if interface == _INTERFACE_CAN:
            relay_number = self._view.cb_cls.currentIndex() + 1
            payload = [0xff, 1 if turn_on else 0, relay_number, 0, 0, 0, 0, 0]
            SD.CAN_CONTROL.send(_RELAY_CAN_ID, payload, extern_flag=True)
        elif interface == _INTERFACE_COM:
            relay_index = self._view.cb_cls.currentIndex()
            if relay_index < 0:
                # Nothing selected: a negative shift count would raise ValueError.
                return
            relay_bit = 1 << relay_index
            value = relay_bit if turn_on else (~relay_bit) & 0xFFFF
            SD.COM_CONTROL.send(_COM_SLAVE, cst.WRITE_MULTIPLE_REGISTERS, _RELAY_SINGLE_REGISTER, value)

    def _relay_on(self):
        """Switch the relay selected in ``cb_cls`` on."""
        self._switch_relay(True)

    def _relay_off(self):
        """Switch the relay selected in ``cb_cls`` off."""
        self._switch_relay(False)

    def com_work(self):
        """Read 16 holding registers starting at 0x0100 over COM and print them.

        Not connected to any signal or timer in this class.
        """
        ret = SD.COM_CONTROL.send(_COM_SLAVE, cst.READ_HOLDING_REGISTERS, 0x0100, 16)
        print(list(ret))

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def port_check(self):
        """Refill ``cb_com_interface_port`` with the serial ports found on this machine."""
        self._view.cb_com_interface_port.clear()
        for port in serial.tools.list_ports.comports():
            # Element 0 of each entry is what the combo box lists.
            self._view.cb_com_interface_port.addItem(port[0])

    def pcs_connect(self):
        """Open the link that matches the interface selected in the view."""
        interface = self._view.cb_interface.currentIndex()
        if interface == _INTERFACE_CAN:
            self.pcs_can_connect()
        elif interface == _INTERFACE_COM:
            self.pcs_com_connect()

    def pcs_disconnect(self):
        """Close the link that matches the interface selected in the view."""
        interface = self._view.cb_interface.currentIndex()
        if interface == _INTERFACE_CAN:
            self.pcs_can_disconnect()
        elif interface == _INTERFACE_COM:
            self.pcs_com_disconnect()

    def pcs_can_connect(self):
        """Configure and open the CAN board chosen in the view.

        On success the receive worker is started and the interface selection
        widgets are disabled.  If ``init_can`` reports failure the view's
        ``can_connect_error`` is shown; an ``AttributeError`` raised anywhere
        in the open sequence is reported through ``no_can_device``.
        """
        self._can_index_var = self._view.cb_can_interface_index.currentText()
        self._can_chanel_var = self._view.cb_can_interface_channel.currentText()
        self._can_baudrate_var = self._view.cb_can_interface_baudrate.currentText()
        can_index = int(self._can_index_var)
        can_channel = int(self._can_chanel_var)
        can_baudrate = self._can_baudrate_var
        SD.CAN_CONTROL.set_can_board(can_index, can_channel, can_baudrate)
        try:
            if SD.CAN_CONTROL.open_device():
                SD.CAN_ON_OFF = SD.CAN_CONTROL.init_can(0x00000000, 0xFFFFFFFF)
                if SD.CAN_ON_OFF:
                    self.pcs_can_connected()
                    self._view.groupbox_can_interface.setDisabled(True)
                    self._view.groupbox_interface.setDisabled(True)
                else:
                    self._view.can_connect_error()
        except AttributeError:
            # NOTE(review): presumably raised when the CAN driver is not
            # available -- confirm against SD.CAN_CONTROL.
            self._view.no_can_device()

    def pcs_can_disconnect(self):
        """Close the CAN link, stop the receive thread and reset the view."""
        if not SD.CAN_ON_OFF:
            return
        SD.CAN_ON_OFF = False
        SD.CAN_CONTROL.clear_buffer()
        SD.CAN_CONTROL.close_can()
        if self.pcs_can_received_thread.isRunning():
            self.pcs_can_received_thread.quit()
            self.pcs_can_received_thread.wait()
        if self.pcs_can_received_thread.isFinished():
            del self.pcs_can_received
            del self.pcs_can_received_thread
        self._view.groupbox_can_interface.setDisabled(False)
        self._view.groupbox_interface.setDisabled(False)
        self.pcs_data_recovery()
        self.pcs_status.pcs_disconnect()

    def pcs_com_disconnect(self):
        """Stop the COM worker thread and reset the view."""
        if not SD.CAN_ON_OFF:
            return
        SD.CAN_ON_OFF = False
        if self.pcs_com_thread.isRunning():
            self.pcs_com_thread.quit()
            self.pcs_com_thread.wait()
        if self.pcs_com_thread.isFinished():
            del self.pcs_com_work
            del self.pcs_com_thread
        self._view.groupbox_com_interface.setDisabled(False)
        self._view.groupbox_interface.setDisabled(False)
        self.pcs_data_recovery()
        self.pcs_status.pcs_disconnect()

    def pcs_com_connect(self):
        """Configure and open the serial port chosen in the view.

        On success the COM worker is started and the interface selection
        widgets are disabled; otherwise the view's ``no_com_device`` is shown.
        """
        self._port = self._view.cb_com_interface_port.currentText()
        self._baudrate = int(self._view.cb_com_interface_baudrate.currentText())
        self._bytesize = int(self._view.cb_com_interface_word_length.currentText())
        self._stopbits = int(self._view.cb_com_interface_stop.currentText())
        parity_index = self._view.cb_com_interface_parity.currentIndex()
        if 0 <= parity_index < len(_PARITY_BY_INDEX):
            self._parity = _PARITY_BY_INDEX[parity_index]
        else:
            self._parity = _PARITY_FALLBACK
        SD.COM_CONTROL.set_com(self._port, self._baudrate, self._bytesize, self._parity, self._stopbits)
        SD.CAN_ON_OFF = SD.COM_CONTROL.open_device()
        if SD.CAN_ON_OFF:
            self.pcs_com_connected()
            self._view.groupbox_com_interface.setDisabled(True)
            self._view.groupbox_interface.setDisabled(True)
        else:
            self._view.no_com_device()

    # ------------------------------------------------------------------
    # COM worker and its signal handlers
    # ------------------------------------------------------------------
    def pcs_com_connected(self):
        """Create the COM worker, move it to its own thread and start it."""
        # COM worker thread
        self.pcs_com_thread = QThread()
        self.pcs_com_work = PcsComWork()
        self.pcs_com_work.moveToThread(self.pcs_com_thread)
        self.pcs_com_thread.finished.connect(self.pcs_com_work.deleteLater)
        self.pcs_com_thread.started.connect(self.pcs_com_work.work)
        # show_00_com_signal .. show_0F_com_signal -> _pcs_00_com_signal .. _pcs_0F_com_signal
        for register in range(0x10):
            tag = "%02X" % register
            signal = getattr(self.pcs_com_work, "show_%s_com_signal" % tag)
            signal.connect(getattr(self, "_pcs_%s_com_signal" % tag))
        self.pcs_status.pcs_connect()
        # Start the thread
        self.pcs_com_thread.start()

    @staticmethod
    def _scaled_text(raw, scale=0.1, offset=0.0, digits=1):
        """Return ``raw * scale + offset`` rounded to ``digits`` places, as text.

        Rounding keeps binary floating-point noise (for example
        ``3 * 0.1 == 0.30000000000000004``) out of the displayed value.
        """
        return str(round(raw * scale + offset, digits))

    def _pcs_00_com_signal(self, data):
        """Print the value of COM signal 00 in hexadecimal."""
        print(hex(data))

    def _pcs_01_com_signal(self, data):
        """Print the value of COM signal 01 in hexadecimal."""
        print(hex(data))

    def _pcs_02_com_signal(self, data):
        """Show COM signal 02 (scaled by 0.1) in ``edt_bms_volt``."""
        self._view.edt_bms_volt.setText(self._scaled_text(data))

    def _pcs_03_com_signal(self, data):
        """Show COM signal 03 (scaled by 0.1, offset -2000) in ``edt_bms_cur``."""
        self._view.edt_bms_cur.setText(self._scaled_text(data, offset=-2000))

    def _pcs_04_com_signal(self, data):
        """Show COM signal 04 (scaled by 0.1) in ``edt_soc``."""
        self._view.edt_soc.setText(self._scaled_text(data))

    def _pcs_05_com_signal(self, data):
        """Show COM signal 05 (scaled by 0.1) in ``edt_bms_soh``."""
        self._view.edt_bms_soh.setText(self._scaled_text(data))

    def _pcs_06_com_signal(self, data):
        """Show COM signal 06 (scaled by 0.1) in ``edt_bms_chg_cur_max``."""
        self._view.edt_bms_chg_cur_max.setText(self._scaled_text(data))

    def _pcs_07_com_signal(self, data):
        """Show COM signal 07 (scaled by 0.1) in ``edt_bms_dischg_cur_max``."""
        self._view.edt_bms_dischg_cur_max.setText(self._scaled_text(data))

    def _pcs_08_com_signal(self, data):
        """Show COM signal 08 (scaled by 0.1) in ``edt_bms_chg_volt_high``."""
        self._view.edt_bms_chg_volt_high.setText(self._scaled_text(data))

    def _pcs_09_com_signal(self, data):
        """Show COM signal 09 (scaled by 0.1) in ``edt_bms_dischg_volt_high``."""
        self._view.edt_bms_dischg_volt_high.setText(self._scaled_text(data))

    def _pcs_0A_com_signal(self, data):
        """Show COM signal 0A (scaled by 0.1) in ``edt_bms_chg_power_real``."""
        self._view.edt_bms_chg_power_real.setText(self._scaled_text(data))

    def _pcs_0B_com_signal(self, data):
        """Show COM signal 0B (scaled by 0.1) in ``edt_bms_dischg_power_real``."""
        self._view.edt_bms_dischg_power_real.setText(self._scaled_text(data))

    def _pcs_0C_com_signal(self, data):
        """Show COM signal 0C (scaled by 0.1) in ``edt_bms_high_volt``."""
        self._view.edt_bms_high_volt.setText(self._scaled_text(data))

    def _pcs_0D_com_signal(self, data):
        """Show COM signal 0D (scaled by 0.1) in ``edt_bms_low_volt``."""
        self._view.edt_bms_low_volt.setText(self._scaled_text(data))

    def _pcs_0E_com_signal(self, data):
        """Show COM signal 0E (scaled by 0.1) in ``edt_bms_dischg_power_max``."""
        self._view.edt_bms_dischg_power_max.setText(self._scaled_text(data))

    def _pcs_0F_com_signal(self, data):
        """Show COM signal 0F (scaled by 0.1) in ``edt_bms_chg_power_max``."""
        self._view.edt_bms_chg_power_max.setText(self._scaled_text(data))

    # ------------------------------------------------------------------
    # CAN worker and its signal handlers
    # ------------------------------------------------------------------
    def pcs_can_connected(self):
        """Create the CAN receive worker, move it to its own thread and start it."""
        # CAN data receiver
        self.pcs_can_received_thread = QThread()
        self.pcs_can_received = PcsCanReceived()
        self.pcs_can_received.moveToThread(self.pcs_can_received_thread)
        self.pcs_can_received_thread.finished.connect(self.pcs_can_received.deleteLater)
        self.pcs_can_received_thread.started.connect(self.pcs_can_received.received)
        # show_90_signal .. show_93_signal -> _90_signal .. _93_signal
        for tag in ("90", "91", "92", "93"):
            signal = getattr(self.pcs_can_received, "show_%s_signal" % tag)
            signal.connect(getattr(self, "_%s_signal" % tag))
        self.pcs_status.pcs_connect()
        # Start the thread
        self.pcs_can_received_thread.start()

    @staticmethod
    def _word_le(data, offset):
        """Combine ``data[offset]`` (low byte) and ``data[offset + 1]`` (high byte)."""
        return data[offset + 1] << 8 | data[offset]

    def _90_signal(self, data):
        """Show the bytes of a "90" frame.

        ``data[0]`` and ``data[5]`` are scaled by 0.4 into the SOC / SOH
        fields, ``data[1]`` drives the ready LED, ``data[2]`` selects the
        charge/discharge permission text, ``data[4] == 1`` sets the fault LED
        to status 2, and ``data[6]`` / ``data[7]`` minus 40 go to the
        high / low temperature fields.
        """
        view = self._view
        view.edt_soc.setText(self._scaled_text(data[0], scale=0.4))
        view.led_bms_ready.set_status(data[1])
        permission = _CHG_PERMISSION_TEXT.get(data[2])
        if permission is not None:
            # Values outside the table leave the previous text in place.
            view.edt_bms_chg_enabled.setText(permission)
        view.edt_bms_stop.setText("")
        view.led_bms_fault.set_status(2 if data[4] == 1 else 0)
        view.edt_bms_soh.setText(self._scaled_text(data[5], scale=0.4))
        view.edt_bms_high_temp.setText(str(data[6] - 40))
        view.edt_bms_low_temp.setText(str(data[7] - 40))

    def _91_signal(self, data):
        """Show the four little-endian words of a "91" frame (power and current limits)."""
        view = self._view
        view.edt_bms_dischg_power_max.setText(self._scaled_text(self._word_le(data, 0)))
        view.edt_bms_chg_power_max.setText(self._scaled_text(self._word_le(data, 2)))
        view.edt_bms_chg_cur_max.setText(self._scaled_text(self._word_le(data, 4), offset=-3200))
        view.edt_bms_dischg_cur_max.setText(self._scaled_text(self._word_le(data, 6), offset=-3200))

    def _92_signal(self, data):
        """Show the four little-endian words of a "92" frame (voltage fields)."""
        view = self._view
        view.edt_bms_high_volt.setText(self._scaled_text(self._word_le(data, 0), scale=0.001, digits=3))
        view.edt_bms_low_volt.setText(self._scaled_text(self._word_le(data, 2), scale=0.001, digits=3))
        view.edt_bms_chg_volt_high.setText(self._scaled_text(self._word_le(data, 4)))
        view.edt_bms_dischg_volt_high.setText(self._scaled_text(self._word_le(data, 6)))

    def _93_signal(self, data):
        """Show the four little-endian words of a "93" frame (voltage, current, real power)."""
        view = self._view
        view.edt_bms_volt.setText(self._scaled_text(self._word_le(data, 0), digits=2))
        view.edt_bms_cur.setText(self._scaled_text(self._word_le(data, 2), offset=-3200, digits=2))
        view.edt_bms_chg_power_real.setText(self._scaled_text(self._word_le(data, 4)))
        view.edt_bms_dischg_power_real.setText(self._scaled_text(self._word_le(data, 6)))

    # ------------------------------------------------------------------
    # Application / view helpers
    # ------------------------------------------------------------------
    def run(self):
        """Show the home window and run the Qt event loop; return its exit code."""
        self._view.show()
        return self._app.exec_()

    def interface_chose(self, current_interface):
        """Show the settings group of the chosen interface and hide the other.

        ``current_interface == 1`` selects the COM group; any other value
        selects the CAN group.
        """
        if current_interface == _INTERFACE_COM:
            self._view.groupbox_can_interface.hide()
            self._view.groupbox_com_interface.show()
        else:
            self._view.groupbox_com_interface.hide()
            self._view.groupbox_can_interface.show()

    def pcs_data_recovery(self):
        """Blank every value field and reset both LEDs to status 0."""
        for field_name in _TEXT_FIELDS:
            getattr(self._view, field_name).setText("")
        for led_name in _LED_FIELDS:
            getattr(self._view, led_name).set_status(0)