123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- #!/usr/bin/env python
- # -*- encoding: utf-8 -*-
- '''
- @文件 :canConnect.py
- @时间 :2021/12/09 13:47:58
- @作者 :None
- @版本 :1.0
- @说明 :CAN连接驱动
- '''
- from ctypes.util import find_library
- from sys import platform
- import ctypes
- from ctypes import c_ubyte, c_uint
- from utils.log_signal import LogSignal
- from utils.resource import resource_path, resource_path_extend
- PYINSTALLER = 1
- if PYINSTALLER:
- if platform == "win64":
- _CanDLLName = resource_path("config\\ControlCANx64\\ControlCAN.dll")
- ZLGCAN = ctypes.windll.LoadLibrary(_CanDLLName)
- elif platform == "win32":
- _CanDLLName = resource_path("config\\ControlCANx86\\ControlCAN.dll")
- ZLGCAN = ctypes.windll.LoadLibrary(_CanDLLName)
- elif platform == "linux":
- _CanDLLName = resource_path("config/linux/libusbcan.so")
- ZLGCAN = ctypes.cdll.LoadLibrary(_CanDLLName)
- else:
- ZLGCAN = ctypes.cdll.LoadLibrary(find_library("libPCBUSB.dylib"))
- else:
- if platform == "win64":
- resource_path_extend("config")
- ZLGCAN = ctypes.windll.LoadLibrary("ControlCAN.dll")
- elif platform == "win32":
- resource_path_extend("config")
- ZLGCAN = ctypes.windll.LoadLibrary("ControlCAN.dll")
- else:
- ZLGCAN = ""
- ubyte_array = c_ubyte * 8
- ubyte_3array = c_ubyte * 3
- # can type
- CANTYPE = {
- 'USBCAN-I': 3,
- 'USBCAN-II': 4,
- }
- # can mode
- NORMAL_MODE = 0
- LISTEN_MODE = 1
- # filter type
- SINGLE_FILTER = 0
- DOUBLE_FILTER = 1
- # status
- STATUS_OK = 1
- # sendtype
- SEND_NORMAL = 0
- SEND_SINGLE = 1
- SELF_SEND_RECV = 2
- SELF_SEND_RECV_SINGLE = 3
- class VCI_INIT_CONFIG(ctypes.Structure):
- _fields_ = [("AccCode", c_uint), # 验收码。SJA1000的帧过滤验收码。对经过屏蔽码过滤为“有关位”进行匹配,全部匹配成功后,此帧可以被接收。
- # 屏蔽码。SJA1000的帧过滤屏蔽码。对接收的CAN帧ID进行过滤,对应位为0的是“有关位”,对应位为1的是“无关位”。屏蔽码推荐设置为0xFFFFFFFF,即全部接收。
- ("AccMask", c_uint),
- # 保留
- ("Reserved", c_uint),
- # 滤波方式
- ("Filter", c_ubyte),
- # 波特率定时器 0
- ("Timing0", c_ubyte),
- # 波特率定时器 1
- ("Timing1", c_ubyte),
- # 模式。=0表示正常模式(相当于正常节点),=1表示只听模式(只接收,不影响总线),=2表示自发自收模式(环回模式)。
- ("Mode", c_ubyte)
- ]
- # VCI_CAN_OBJ结构体是CAN帧结构体,即1个结构体表示一个帧的数据结构。在发送函数VCI_Transmit和接收函数VCI_Receive中,被用来传送CAN信息帧。
- class VCI_CAN_OBJ(ctypes.Structure):
- _fields_ = [("ID", c_uint), # 帧ID。32位变量,数据格式为靠右对齐
- # 设备接收到某一帧的时间标识。时间标示从CAN卡上电开始计时,计时单位为0.1ms。
- ("TimeStamp", c_uint),
- # 是否使用时间标识,为1时TimeStamp有效,TimeFlag和TimeStamp只在此帧为接收帧时有意义。
- ("TimeFlag", c_ubyte),
- # 发送帧类型。=0时为正常发送(发送失败会自动重发,重发时间为4秒,4秒内没有发出则取消);=1时为单次发送(只发送一次,发送失败不会自动重发,总线只产生一帧数据);其它值无效。
- ("SendType", c_ubyte),
- # 是否是远程帧。=0时为为数据帧,=1时为远程帧(数据段空)。
- ("RemoteFlag", c_ubyte),
- # 是否是扩展帧。=0时为标准帧(11位ID),=1时为扩展帧(29位ID)。
- ("ExternFlag", c_ubyte),
- # 数据长度 DLC (<=8),即CAN帧Data有几个字节。约束了后面Data[8]中的有效字节
- ("DataLen", c_ubyte),
- ("Data", c_ubyte * 8),
- # CAN帧的数据。由于CAN规定了最大是8个字节,所以这里预留了8个字节的空间,受DataLen约束。如DataLen定义为3,即Data[0]、Data[1]、Data[2]是有效的
- ("Reserved", c_ubyte * 3) # 系统保留
- ]
- class PVCI_ERR_INFO(ctypes.Structure):
- _fields_ = [("ErrorCode", c_uint),
- ("PassiveErrData", c_ubyte * 3),
- ("ArLostErrData", c_ubyte)
- ]
- baudRateConfig = {
- '5Kbps': {'time0': 0xBF, 'time1': 0xFF},
- '10Kbps': {'time0': 0x31, 'time1': 0x1C},
- '20Kbps': {'time0': 0x18, 'time1': 0x1C},
- '40Kbps': {'time0': 0x87, 'time1': 0xFF},
- '50Kbps': {'time0': 0x09, 'time1': 0x1C},
- '80Kbps': {'time0': 0x83, 'time1': 0xFF},
- '100Kbps': {'time0': 0x04, 'time1': 0x1C},
- '125Kbps': {'time0': 0x03, 'time1': 0x1C},
- '200Kbps': {'time0': 0x81, 'time1': 0xFA},
- '250Kbps': {'time0': 0x01, 'time1': 0x1C},
- '400Kbps': {'time0': 0x80, 'time1': 0xFA},
- '500Kbps': {'time0': 0x00, 'time1': 0x1C},
- '666Kbps': {'time0': 0x80, 'time1': 0xB6},
- '800Kbps': {'time0': 0x00, 'time1': 0x16},
- '1000Kbps': {'time0': 0x00, 'time1': 0x14},
- }
- class MessageDeal:
- def __init__(self):
- self.canType = CANTYPE['USBCAN-II']
- def set_can_board(self, canIndex, canChannel, canBaudrate):
- self.canIndex = canIndex
- self.canChannel = canChannel
- self.canBaudrate = canBaudrate
- def open_device(self):
- ret = ZLGCAN.VCI_OpenDevice(self.canType, self.canChannel, self.canChannel)
- if ret != STATUS_OK:
- # LogSignal.print_log_signal().log_emit('打开CAN卡失败: {}'.format(str(self.canChannel)) if SD.SYSTEM_LANGUAGE == 0 else 'CAN Device Error: {}'.format(str(self.canChannel)))
- return False
- return True
- def init_can(self, accCode, accMask):
- # 初始化通道
- _vci_initconfig = VCI_INIT_CONFIG(accCode, accMask, 0, DOUBLE_FILTER,
- baudRateConfig[self.canBaudrate]['time0'],
- baudRateConfig[self.canBaudrate]['time1'],
- NORMAL_MODE)
- ret = ZLGCAN.VCI_InitCAN(self.canType, self.canIndex, self.canChannel, ctypes.byref(_vci_initconfig))
- if ret != STATUS_OK:
- # LogSignal.print_log_signal().log_emit('初始化CAN卡失败: {}'.format(str(self.canChannel)) if SD.SYSTEM_LANGUAGE == 0 else 'CAN Open Error: {}'.format(str(self.canChannel)))
- return False
- ret = ZLGCAN.VCI_StartCAN(self.canType, self.canIndex, self.canChannel)
- if ret != STATUS_OK:
- # LogSignal.print_log_signal().log_emit('启动CAN卡失败: {}'.format(str(self.canChannel))if SD.SYSTEM_LANGUAGE == 0 else 'CAN Start Error: {}'.format(str(self.canChannel)))
- return False
- return True
- def get_undeal_number(self):
- return ZLGCAN.VCI_GetReceiveNum(self.canType, self.canIndex, self.canChannel)
- def receive(self, number=1):
- objs = (VCI_CAN_OBJ * number)()
- ret = ZLGCAN.VCI_Receive(self.canType, self.canIndex, self.canChannel, ctypes.byref(objs), number, 10)
- if ret == 0xFFFFFFFF:
- return None
- else:
- return objs[:ret]
- def send(self, ID, data, remote_flag=False, extern_flag=False, data_len=8):
- vci_can_obj = VCI_CAN_OBJ()
- vci_can_obj.ID = ID
- vci_can_obj.SendType = SEND_NORMAL
- if remote_flag:
- vci_can_obj.RemoteFlag = 1
- else:
- vci_can_obj.RemoteFlag = 0
- if extern_flag:
- vci_can_obj.ExternFlag = 1
- else:
- vci_can_obj.ExternFlag = 0
- vci_can_obj.DataLen = data_len
- if len(data) < 8:
- data += (8 - len(data)) * [0]
- vci_can_obj.Data = (c_ubyte * 8)(data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7])
- ret = ZLGCAN.VCI_Transmit(self.canType, self.canIndex, self.canChannel, ctypes.byref(vci_can_obj), 1)
- if ret != STATUS_OK:
- LogSignal.print_log_signal().log_emit("CAN Send Error!")
- return False
- else:
- return True
- def read_err_info(self):
- errInfo = PVCI_ERR_INFO(0, ubyte_3array(0, 0, 0), 0)
- ZLGCAN.VCI_ReadErrInfo(self.canType, self.canIndex, self.canChannel, ctypes.byref(errInfo))
- LogSignal.print_log_signal().log_emit(errInfo.ErrorCode, errInfo.PassiveErrData[0], errInfo.PassiveErrData[1], errInfo.PassiveErrData[2], errInfo.ArLostErrData)
- def clear_buffer(self):
- return ZLGCAN.VCI_ClearBuffer(self.canType, self.canIndex, self.canChannel)
- def close_can(self):
- return ZLGCAN.VCI_CloseDevice(self.canType, self.canIndex)
|