#!/usr/bin/env python # -*- encoding: utf-8 -*- ''' @文件 :canConnect.py @时间 :2021/12/09 13:47:58 @作者 :None @版本 :1.0 @说明 :CAN连接驱动 ''' from ctypes.util import find_library from sys import platform import ctypes from ctypes import c_ubyte, c_uint from utils.log_signal import LogSignal from utils.resource import resource_path, resource_path_extend PYINSTALLER = 1 if PYINSTALLER: if platform == "win64": _CanDLLName = resource_path("config\\ControlCANx64\\ControlCAN.dll") ZLGCAN = ctypes.windll.LoadLibrary(_CanDLLName) elif platform == "win32": _CanDLLName = resource_path("config\\ControlCANx86\\ControlCAN.dll") ZLGCAN = ctypes.windll.LoadLibrary(_CanDLLName) elif platform == "linux": _CanDLLName = resource_path("config/linux/libusbcan.so") ZLGCAN = ctypes.cdll.LoadLibrary(_CanDLLName) else: ZLGCAN = ctypes.cdll.LoadLibrary(find_library("libPCBUSB.dylib")) else: if platform == "win64": resource_path_extend("config") ZLGCAN = ctypes.windll.LoadLibrary("ControlCAN.dll") elif platform == "win32": resource_path_extend("config") ZLGCAN = ctypes.windll.LoadLibrary("ControlCAN.dll") else: ZLGCAN = "" ubyte_array = c_ubyte * 8 ubyte_3array = c_ubyte * 3 # can type CANTYPE = { 'USBCAN-I': 3, 'USBCAN-II': 4, } # can mode NORMAL_MODE = 0 LISTEN_MODE = 1 # filter type SINGLE_FILTER = 0 DOUBLE_FILTER = 1 # status STATUS_OK = 1 # sendtype SEND_NORMAL = 0 SEND_SINGLE = 1 SELF_SEND_RECV = 2 SELF_SEND_RECV_SINGLE = 3 class VCI_INIT_CONFIG(ctypes.Structure): _fields_ = [("AccCode", c_uint), # 验收码。SJA1000的帧过滤验收码。对经过屏蔽码过滤为“有关位”进行匹配,全部匹配成功后,此帧可以被接收。 # 屏蔽码。SJA1000的帧过滤屏蔽码。对接收的CAN帧ID进行过滤,对应位为0的是“有关位”,对应位为1的是“无关位”。屏蔽码推荐设置为0xFFFFFFFF,即全部接收。 ("AccMask", c_uint), # 保留 ("Reserved", c_uint), # 滤波方式 ("Filter", c_ubyte), # 波特率定时器 0 ("Timing0", c_ubyte), # 波特率定时器 1 ("Timing1", c_ubyte), # 模式。=0表示正常模式(相当于正常节点),=1表示只听模式(只接收,不影响总线),=2表示自发自收模式(环回模式)。 ("Mode", c_ubyte) ] # VCI_CAN_OBJ结构体是CAN帧结构体,即1个结构体表示一个帧的数据结构。在发送函数VCI_Transmit和接收函数VCI_Receive中,被用来传送CAN信息帧。 class VCI_CAN_OBJ(ctypes.Structure): _fields_ = [("ID", c_uint), # 帧ID。32位变量,数据格式为靠右对齐 # 设备接收到某一帧的时间标识。时间标示从CAN卡上电开始计时,计时单位为0.1ms。 ("TimeStamp", c_uint), # 是否使用时间标识,为1时TimeStamp有效,TimeFlag和TimeStamp只在此帧为接收帧时有意义。 ("TimeFlag", c_ubyte), # 发送帧类型。=0时为正常发送(发送失败会自动重发,重发时间为4秒,4秒内没有发出则取消);=1时为单次发送(只发送一次,发送失败不会自动重发,总线只产生一帧数据);其它值无效。 ("SendType", c_ubyte), # 是否是远程帧。=0时为为数据帧,=1时为远程帧(数据段空)。 ("RemoteFlag", c_ubyte), # 是否是扩展帧。=0时为标准帧(11位ID),=1时为扩展帧(29位ID)。 ("ExternFlag", c_ubyte), # 数据长度 DLC (<=8),即CAN帧Data有几个字节。约束了后面Data[8]中的有效字节 ("DataLen", c_ubyte), ("Data", c_ubyte * 8), # CAN帧的数据。由于CAN规定了最大是8个字节,所以这里预留了8个字节的空间,受DataLen约束。如DataLen定义为3,即Data[0]、Data[1]、Data[2]是有效的 ("Reserved", c_ubyte * 3) # 系统保留 ] class PVCI_ERR_INFO(ctypes.Structure): _fields_ = [("ErrorCode", c_uint), ("PassiveErrData", c_ubyte * 3), ("ArLostErrData", c_ubyte) ] baudRateConfig = { '5Kbps': {'time0': 0xBF, 'time1': 0xFF}, '10Kbps': {'time0': 0x31, 'time1': 0x1C}, '20Kbps': {'time0': 0x18, 'time1': 0x1C}, '40Kbps': {'time0': 0x87, 'time1': 0xFF}, '50Kbps': {'time0': 0x09, 'time1': 0x1C}, '80Kbps': {'time0': 0x83, 'time1': 0xFF}, '100Kbps': {'time0': 0x04, 'time1': 0x1C}, '125Kbps': {'time0': 0x03, 'time1': 0x1C}, '200Kbps': {'time0': 0x81, 'time1': 0xFA}, '250Kbps': {'time0': 0x01, 'time1': 0x1C}, '400Kbps': {'time0': 0x80, 'time1': 0xFA}, '500Kbps': {'time0': 0x00, 'time1': 0x1C}, '666Kbps': {'time0': 0x80, 'time1': 0xB6}, '800Kbps': {'time0': 0x00, 'time1': 0x16}, '1000Kbps': {'time0': 0x00, 'time1': 0x14}, } class MessageDeal: def __init__(self): self.canType = CANTYPE['USBCAN-II'] def set_can_board(self, canIndex, canChannel, canBaudrate): self.canIndex = canIndex self.canChannel = canChannel self.canBaudrate = canBaudrate def open_device(self): ret = ZLGCAN.VCI_OpenDevice(self.canType, self.canChannel, self.canChannel) if ret != STATUS_OK: # LogSignal.print_log_signal().log_emit('打开CAN卡失败: {}'.format(str(self.canChannel)) if SD.SYSTEM_LANGUAGE == 0 else 'CAN Device Error: {}'.format(str(self.canChannel))) return False return True def init_can(self, accCode, accMask): # 初始化通道 _vci_initconfig = VCI_INIT_CONFIG(accCode, accMask, 0, DOUBLE_FILTER, baudRateConfig[self.canBaudrate]['time0'], baudRateConfig[self.canBaudrate]['time1'], NORMAL_MODE) ret = ZLGCAN.VCI_InitCAN(self.canType, self.canIndex, self.canChannel, ctypes.byref(_vci_initconfig)) if ret != STATUS_OK: # LogSignal.print_log_signal().log_emit('初始化CAN卡失败: {}'.format(str(self.canChannel)) if SD.SYSTEM_LANGUAGE == 0 else 'CAN Open Error: {}'.format(str(self.canChannel))) return False ret = ZLGCAN.VCI_StartCAN(self.canType, self.canIndex, self.canChannel) if ret != STATUS_OK: # LogSignal.print_log_signal().log_emit('启动CAN卡失败: {}'.format(str(self.canChannel))if SD.SYSTEM_LANGUAGE == 0 else 'CAN Start Error: {}'.format(str(self.canChannel))) return False return True def get_undeal_number(self): return ZLGCAN.VCI_GetReceiveNum(self.canType, self.canIndex, self.canChannel) def receive(self, number=1): objs = (VCI_CAN_OBJ * number)() ret = ZLGCAN.VCI_Receive(self.canType, self.canIndex, self.canChannel, ctypes.byref(objs), number, 10) if ret == 0xFFFFFFFF: return None else: return objs[:ret] def send(self, ID, data, remote_flag=False, extern_flag=False, data_len=8): vci_can_obj = VCI_CAN_OBJ() vci_can_obj.ID = ID vci_can_obj.SendType = SEND_NORMAL if remote_flag: vci_can_obj.RemoteFlag = 1 else: vci_can_obj.RemoteFlag = 0 if extern_flag: vci_can_obj.ExternFlag = 1 else: vci_can_obj.ExternFlag = 0 vci_can_obj.DataLen = data_len if len(data) < 8: data += (8 - len(data)) * [0] vci_can_obj.Data = (c_ubyte * 8)(data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]) ret = ZLGCAN.VCI_Transmit(self.canType, self.canIndex, self.canChannel, ctypes.byref(vci_can_obj), 1) if ret != STATUS_OK: LogSignal.print_log_signal().log_emit("CAN Send Error!") return False else: return True def read_err_info(self): errInfo = PVCI_ERR_INFO(0, ubyte_3array(0, 0, 0), 0) ZLGCAN.VCI_ReadErrInfo(self.canType, self.canIndex, self.canChannel, ctypes.byref(errInfo)) LogSignal.print_log_signal().log_emit(errInfo.ErrorCode, errInfo.PassiveErrData[0], errInfo.PassiveErrData[1], errInfo.PassiveErrData[2], errInfo.ArLostErrData) def clear_buffer(self): return ZLGCAN.VCI_ClearBuffer(self.canType, self.canIndex, self.canChannel) def close_can(self): return ZLGCAN.VCI_CloseDevice(self.canType, self.canIndex)