#!/usr/bin/env python # -*- encoding: utf-8 -*- ''' @文件 :bms_pcs_ctl.py @时间 :2022/02/21 16:56:26 @作者 :None @版本 :1.0 @说明 :主页面 CTL ''' from sys import argv from utils.globalvar import SD, EmvState from utils.qt import QApplication, QThread from controller.emv_status_ctl import EmvStatusControll from widget.emv_home import Win_Emv_Home from worker.emv_work import EmvCanReceived, EmvComWork from utils.utils import calculate_crc_8, a_bit import serial.tools.list_ports class EmvControll: def __init__(self): self._app = QApplication(argv) self._view = Win_Emv_Home() self._model = EmvState() self.init() def init(self): self.port_check() self.emv_status = EmvStatusControll() self._view.setStatusBar(self.emv_status._view.statusbar) self._view.interface_signal.connect(self.interface_chose) self._view.cb_bms.currentIndexChanged.connect(self._bms_id) self._view.connect_signal.connect(self.emv_connect) self._view.disconnect_signal.connect(self.emv_disconnect) self._view.emv_on_signal.connect(self.emv_on) self._view.emv_off_signal.connect(self.emv_off) def _bms_id(self): SD.EMV_ID = int(self._view.cb_bms.currentText()) def emv_on(self): if SD.CAN_ON_OFF: data = [0, 0, 0, 0, 0, 0, 0, 0xAA] if self._view.cb_cls.currentIndex() == 0: data[1] = 0xff data[2] = 0x01 elif self._view.cb_cls.currentIndex() <= 8: data[1] = 1 << self._view.cb_cls.currentIndex() - 1 else: data[2] = 1 << (self._view.cb_cls.currentIndex() - 9) self._model.emv_1_state = a_bit(data[1], 1) self._model.emv_2_state = a_bit(data[1], 2) self._model.emv_3_state = a_bit(data[1], 3) self._model.emv_4_state = a_bit(data[1], 4) self._model.emv_5_state = a_bit(data[1], 5) self._model.emv_6_state = a_bit(data[1], 6) self._model.emv_7_state = a_bit(data[1], 7) self._model.emv_8_state = a_bit(data[1], 8) self._model.emv_9_state = a_bit(data[2], 1) if self._view.cb_interface.currentIndex() == 0: data[0] = calculate_crc_8(0x18CF0000 | SD.EMV_ID, data[1:]) SD.CAN_CONTROL.send(0x18CF0000 | SD.EMV_ID, data, extern_flag=True) elif self._view.cb_interface.currentIndex() == 1: SD.COM_CONTROL.send_16h(0x01, 0x0064, 0x0003, 3, [data[1], data[2], data[7]]) self._emv_ce_signal(data, 1) else: return def emv_off(self): if SD.CAN_ON_OFF: data = [0, 0xFF, 1, 0, 0, 0, 0, 0xFF] if self._view.cb_cls.currentIndex() == 0: data[1] = 0 data[2] = 0 elif self._view.cb_cls.currentIndex() <= 8: data[1] = (~ (1 << self._view.cb_cls.currentIndex() - 1)) & 0xFF else: data[2] = 0 self._model.emv_1_state = a_bit(data[1], 1) self._model.emv_2_state = a_bit(data[1], 2) self._model.emv_3_state = a_bit(data[1], 3) self._model.emv_4_state = a_bit(data[1], 4) self._model.emv_5_state = a_bit(data[1], 5) self._model.emv_6_state = a_bit(data[1], 6) self._model.emv_7_state = a_bit(data[1], 7) self._model.emv_8_state = a_bit(data[1], 8) self._model.emv_9_state = a_bit(data[2], 1) if self._view.cb_interface.currentIndex() == 0: data[0] = calculate_crc_8(0x18CF0000 | SD.EMV_ID, data[1:]) SD.CAN_CONTROL.send(0x18CF0000 | SD.EMV_ID, data, extern_flag=True) elif self._view.cb_interface.currentIndex() == 1: SD.COM_CONTROL.send_16h(0x01, 0x0064, 0x0003, 3, [data[1], data[2], data[7]]) self._emv_ce_signal(data, 1) else: return def port_check(self): com_port = {} port_list = list(serial.tools.list_ports.comports()) self._view.cb_com_interface_port.clear() for port in port_list: com_port["%s" % port_list[0]] = "%s" % port[1] self._view.cb_com_interface_port.addItem(port[0]) def emv_connect(self): if self._view.cb_interface.currentIndex() == 0: self.emv_can_connect() elif self._view.cb_interface.currentIndex() == 1: self.emv_com_connect() else: return def emv_com_connect(self): self._port = self._view.cb_com_interface_port.currentText() self._baudrate = int(self._view.cb_com_interface_baudrate.currentText()) self._bytesize = int(self._view.cb_com_interface_word_length.currentText()) self._stopbits = int(self._view.cb_com_interface_stop.currentText()) if self._view.cb_com_interface_parity.currentIndex() == 0: self._parity = 'N' elif self._view.cb_com_interface_parity.currentIndex() == 1: self._parity = 'E' elif self._view.cb_com_interface_parity.currentIndex() == 2: self._parity = 'O' else: self._parity = 'M' SD.COM_CONTROL.set_com(self._port, self._baudrate, self._bytesize, self._parity, self._stopbits) SD.CAN_ON_OFF = SD.COM_CONTROL.open_device() if SD.CAN_ON_OFF: self.emv_com_connected() self._view.groupbox_com_interface.setDisabled(True) self._view.groupbox_interface.setDisabled(True) else: self._view.no_com_device() def emv_com_connected(self): # Com线程类 self.emv_com_thread = QThread() self.emv_com_work = EmvComWork() self.emv_com_work.moveToThread(self.emv_com_thread) self.emv_com_thread.finished.connect(self.emv_com_work.deleteLater) self.emv_com_thread.started.connect(self.emv_com_work.work) self.emv_com_work.show_com_signal.connect(self.emv_com_signal) # 启动线程 self.emv_com_thread.start() self.emv_status.emv_connect() def emv_com_signal(self, v1, v2, v3): self._view.led_emv_total.set_status(1) ver = "v" + str(v1) + "." + str(v2) + "." + str(v3) self._view.edt_emv_version.setText(ver) def emv_can_connect(self): self._can_index_var = self._view.cb_can_interface_index.currentText() self._can_chanel_var = self._view.cb_can_interface_channel.currentText() self._can_baudrate_var = self._view.cb_can_interface_baudrate.currentText() can_index = int(self._can_index_var) can_channel = int(self._can_chanel_var) can_baudrate = self._can_baudrate_var SD.CAN_CONTROL.set_can_board(can_index, can_channel, can_baudrate) try: if SD.CAN_CONTROL.open_device(): SD.CAN_ON_OFF = SD.CAN_CONTROL.init_can(0x00000000, 0xFFFFFFFF) if SD.CAN_ON_OFF: self.emv_can_connected() self._view.groupbox_can_interface.setDisabled(True) self._view.groupbox_interface.setDisabled(True) else: self._view.can_connect_error() except AttributeError: self._view.no_can_device() def emv_disconnect(self): if self._view.cb_interface.currentIndex() == 0: self.emv_can_disconnect() elif self._view.cb_interface.currentIndex() == 1: self.emv_com_disconnect() else: return def emv_com_disconnect(self): if SD.CAN_ON_OFF: SD.CAN_ON_OFF = False if self.emv_com_thread.isRunning(): self.emv_com_thread.quit() self.emv_com_thread.wait() if self.emv_com_thread.isFinished(): del self.emv_com_work del self.emv_com_thread self._view.groupbox_com_interface.setDisabled(False) self._view.groupbox_interface.setDisabled(False) self.emv_data_recovery() self.emv_status.emv_disconnect() def emv_can_disconnect(self): if SD.CAN_ON_OFF: SD.CAN_ON_OFF = False SD.CAN_CONTROL.clear_buffer() SD.CAN_CONTROL.close_can() if self.emv_can_received_thread.isRunning(): self.emv_can_received_thread.quit() self.emv_can_received_thread.wait() if self.emv_can_received_thread.isFinished(): del self.emv_can_received del self.emv_can_received_thread self._view.groupbox_can_interface.setDisabled(False) self._view.groupbox_interface.setDisabled(False) self.emv_status.emv_disconnect() self.emv_data_recovery() def emv_can_connected(self): # CAN数据接收类 self.emv_can_received_thread = QThread() self.emv_can_received = EmvCanReceived() self.emv_can_received.moveToThread(self.emv_can_received_thread) self.emv_can_received_thread.finished.connect(self.emv_can_received.deleteLater) self.emv_can_received_thread.started.connect(self.emv_can_received.received) self.emv_can_received.show_emv_ce_signal.connect(self._emv_ce_signal) self.emv_can_received.show_emv_cf_signal.connect(self._emv_cf_signal) # 启动线程 self.emv_can_received_thread.start() self.emv_status.emv_connect() def _emv_ce_signal(self, data, con_type): if (data[0] == 0xaa and con_type == 0) or (data[7] == 0xaa and con_type == 1): if self._view.cb_cls.currentIndex() == 0: self._view.led_emv_1.set_status(self._model.emv_1_state) self._view.led_emv_2.set_status(self._model.emv_2_state) self._view.led_emv_3.set_status(self._model.emv_3_state) self._view.led_emv_4.set_status(self._model.emv_4_state) self._view.led_emv_5.set_status(self._model.emv_5_state) self._view.led_emv_6.set_status(self._model.emv_6_state) self._view.led_emv_7.set_status(self._model.emv_7_state) self._view.led_emv_8.set_status(self._model.emv_8_state) self._view.led_emv_9.set_status(self._model.emv_9_state) elif self._view.cb_cls.currentIndex() == 1: self._view.led_emv_1.set_status(self._model.emv_1_state) elif self._view.cb_cls.currentIndex() == 2: self._view.led_emv_2.set_status(self._model.emv_2_state) elif self._view.cb_cls.currentIndex() == 3: self._view.led_emv_3.set_status(self._model.emv_3_state) elif self._view.cb_cls.currentIndex() == 4: self._view.led_emv_4.set_status(self._model.emv_4_state) elif self._view.cb_cls.currentIndex() == 5: self._view.led_emv_5.set_status(self._model.emv_5_state) elif self._view.cb_cls.currentIndex() == 6: self._view.led_emv_6.set_status(self._model.emv_6_state) elif self._view.cb_cls.currentIndex() == 7: self._view.led_emv_7.set_status(self._model.emv_7_state) elif self._view.cb_cls.currentIndex() == 8: self._view.led_emv_8.set_status(self._model.emv_8_state) elif self._view.cb_cls.currentIndex() == 9: self._view.led_emv_9.set_status(self._model.emv_9_state) else: return elif (data[0] == 0xff and con_type == 0) or (data[7] == 0xff and con_type == 1): if self._view.cb_cls.currentIndex() == 0: self._view.led_emv_1.set_status(self._model.emv_1_state) self._view.led_emv_2.set_status(self._model.emv_2_state) self._view.led_emv_3.set_status(self._model.emv_3_state) self._view.led_emv_4.set_status(self._model.emv_4_state) self._view.led_emv_5.set_status(self._model.emv_5_state) self._view.led_emv_6.set_status(self._model.emv_6_state) self._view.led_emv_7.set_status(self._model.emv_7_state) self._view.led_emv_8.set_status(self._model.emv_8_state) self._view.led_emv_9.set_status(self._model.emv_9_state) elif self._view.cb_cls.currentIndex() == 1: self._view.led_emv_1.set_status(self._model.emv_1_state) elif self._view.cb_cls.currentIndex() == 2: self._view.led_emv_2.set_status(self._model.emv_2_state) elif self._view.cb_cls.currentIndex() == 3: self._view.led_emv_3.set_status(self._model.emv_3_state) elif self._view.cb_cls.currentIndex() == 4: self._view.led_emv_4.set_status(self._model.emv_4_state) elif self._view.cb_cls.currentIndex() == 5: self._view.led_emv_5.set_status(self._model.emv_5_state) elif self._view.cb_cls.currentIndex() == 6: self._view.led_emv_6.set_status(self._model.emv_6_state) elif self._view.cb_cls.currentIndex() == 7: self._view.led_emv_7.set_status(self._model.emv_7_state) elif self._view.cb_cls.currentIndex() == 8: self._view.led_emv_8.set_status(self._model.emv_8_state) elif self._view.cb_cls.currentIndex() == 9: self._view.led_emv_9.set_status(self._model.emv_9_state) else: return else: return def _emv_cf_signal(self, data): self._view.led_emv_total.set_status(1) emv_ver = "v" + str(a_bit(data[0], 1)) + "." + str(a_bit(data[0], 2)) + "." + str(a_bit(data[0], 3)) self._view.edt_emv_version.setText(emv_ver) def run(self): self._view.show() return self._app.exec_() def interface_chose(self, current_interface): if current_interface == 1: self._view.groupbox_can_interface.hide() self._view.groupbox_com_interface.show() else: self._view.groupbox_com_interface.hide() self._view.groupbox_can_interface.show() def emv_data_recovery(self): self._view.cb_bms.setCurrentIndex(0) self._view.led_emv_total.set_status(0) self._view.led_emv_1.set_status(0) self._view.led_emv_2.set_status(0) self._view.led_emv_3.set_status(0) self._view.led_emv_4.set_status(0) self._view.led_emv_5.set_status(0) self._view.led_emv_6.set_status(0) self._view.led_emv_7.set_status(0) self._view.led_emv_8.set_status(0) self._view.led_emv_9.set_status(0) self._view.led_emv_10.set_status(0) SD.EMV_ID = 1