can.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. #!/usr/bin/env python
  2. # -*- encoding: utf-8 -*-
  3. '''
  4. @文件 :canConnect.py
  5. @时间 :2021/12/09 13:47:58
  6. @作者 :None
  7. @版本 :1.0
  8. @说明 :CAN连接驱动
  9. '''
  10. from ctypes.util import find_library
  11. from sys import platform
  12. import ctypes
  13. from ctypes import c_ubyte, c_uint
  14. from utils.log_signal import LogSignal
  15. from utils.resource import resource_path, resource_path_extend
  16. PYINSTALLER = 1
  17. if PYINSTALLER:
  18. if platform == "win64":
  19. _CanDLLName = resource_path("config\\ControlCANx64\\ControlCAN.dll")
  20. ZLGCAN = ctypes.windll.LoadLibrary(_CanDLLName)
  21. elif platform == "win32":
  22. _CanDLLName = resource_path("config\\ControlCANx86\\ControlCAN.dll")
  23. ZLGCAN = ctypes.windll.LoadLibrary(_CanDLLName)
  24. elif platform == "linux":
  25. _CanDLLName = resource_path("config/linux/libusbcan.so")
  26. ZLGCAN = ctypes.cdll.LoadLibrary(_CanDLLName)
  27. else:
  28. ZLGCAN = ctypes.cdll.LoadLibrary(find_library("libPCBUSB.dylib"))
  29. else:
  30. if platform == "win64":
  31. resource_path_extend("config")
  32. ZLGCAN = ctypes.windll.LoadLibrary("ControlCAN.dll")
  33. elif platform == "win32":
  34. resource_path_extend("config")
  35. ZLGCAN = ctypes.windll.LoadLibrary("ControlCAN.dll")
  36. else:
  37. ZLGCAN = ""
  38. ubyte_array = c_ubyte * 8
  39. ubyte_3array = c_ubyte * 3
  40. # can type
  41. CANTYPE = {
  42. 'USBCAN-I': 3,
  43. 'USBCAN-II': 4,
  44. }
  45. # can mode
  46. NORMAL_MODE = 0
  47. LISTEN_MODE = 1
  48. # filter type
  49. SINGLE_FILTER = 0
  50. DOUBLE_FILTER = 1
  51. # status
  52. STATUS_OK = 1
  53. # sendtype
  54. SEND_NORMAL = 0
  55. SEND_SINGLE = 1
  56. SELF_SEND_RECV = 2
  57. SELF_SEND_RECV_SINGLE = 3
  58. class VCI_INIT_CONFIG(ctypes.Structure):
  59. _fields_ = [("AccCode", c_uint), # 验收码。SJA1000的帧过滤验收码。对经过屏蔽码过滤为“有关位”进行匹配,全部匹配成功后,此帧可以被接收。
  60. # 屏蔽码。SJA1000的帧过滤屏蔽码。对接收的CAN帧ID进行过滤,对应位为0的是“有关位”,对应位为1的是“无关位”。屏蔽码推荐设置为0xFFFFFFFF,即全部接收。
  61. ("AccMask", c_uint),
  62. # 保留
  63. ("Reserved", c_uint),
  64. # 滤波方式
  65. ("Filter", c_ubyte),
  66. # 波特率定时器 0
  67. ("Timing0", c_ubyte),
  68. # 波特率定时器 1
  69. ("Timing1", c_ubyte),
  70. # 模式。=0表示正常模式(相当于正常节点),=1表示只听模式(只接收,不影响总线),=2表示自发自收模式(环回模式)。
  71. ("Mode", c_ubyte)
  72. ]
  73. # VCI_CAN_OBJ结构体是CAN帧结构体,即1个结构体表示一个帧的数据结构。在发送函数VCI_Transmit和接收函数VCI_Receive中,被用来传送CAN信息帧。
  74. class VCI_CAN_OBJ(ctypes.Structure):
  75. _fields_ = [("ID", c_uint), # 帧ID。32位变量,数据格式为靠右对齐
  76. # 设备接收到某一帧的时间标识。时间标示从CAN卡上电开始计时,计时单位为0.1ms。
  77. ("TimeStamp", c_uint),
  78. # 是否使用时间标识,为1时TimeStamp有效,TimeFlag和TimeStamp只在此帧为接收帧时有意义。
  79. ("TimeFlag", c_ubyte),
  80. # 发送帧类型。=0时为正常发送(发送失败会自动重发,重发时间为4秒,4秒内没有发出则取消);=1时为单次发送(只发送一次,发送失败不会自动重发,总线只产生一帧数据);其它值无效。
  81. ("SendType", c_ubyte),
  82. # 是否是远程帧。=0时为为数据帧,=1时为远程帧(数据段空)。
  83. ("RemoteFlag", c_ubyte),
  84. # 是否是扩展帧。=0时为标准帧(11位ID),=1时为扩展帧(29位ID)。
  85. ("ExternFlag", c_ubyte),
  86. # 数据长度 DLC (<=8),即CAN帧Data有几个字节。约束了后面Data[8]中的有效字节
  87. ("DataLen", c_ubyte),
  88. ("Data", c_ubyte * 8),
  89. # CAN帧的数据。由于CAN规定了最大是8个字节,所以这里预留了8个字节的空间,受DataLen约束。如DataLen定义为3,即Data[0]、Data[1]、Data[2]是有效的
  90. ("Reserved", c_ubyte * 3) # 系统保留
  91. ]
  92. class PVCI_ERR_INFO(ctypes.Structure):
  93. _fields_ = [("ErrorCode", c_uint),
  94. ("PassiveErrData", c_ubyte * 3),
  95. ("ArLostErrData", c_ubyte)
  96. ]
  97. baudRateConfig = {
  98. '5Kbps': {'time0': 0xBF, 'time1': 0xFF},
  99. '10Kbps': {'time0': 0x31, 'time1': 0x1C},
  100. '20Kbps': {'time0': 0x18, 'time1': 0x1C},
  101. '40Kbps': {'time0': 0x87, 'time1': 0xFF},
  102. '50Kbps': {'time0': 0x09, 'time1': 0x1C},
  103. '80Kbps': {'time0': 0x83, 'time1': 0xFF},
  104. '100Kbps': {'time0': 0x04, 'time1': 0x1C},
  105. '125Kbps': {'time0': 0x03, 'time1': 0x1C},
  106. '200Kbps': {'time0': 0x81, 'time1': 0xFA},
  107. '250Kbps': {'time0': 0x01, 'time1': 0x1C},
  108. '400Kbps': {'time0': 0x80, 'time1': 0xFA},
  109. '500Kbps': {'time0': 0x00, 'time1': 0x1C},
  110. '666Kbps': {'time0': 0x80, 'time1': 0xB6},
  111. '800Kbps': {'time0': 0x00, 'time1': 0x16},
  112. '1000Kbps': {'time0': 0x00, 'time1': 0x14},
  113. }
  114. class MessageDeal:
  115. def __init__(self):
  116. self.canType = CANTYPE['USBCAN-II']
  117. def set_can_board(self, canIndex, canChannel, canBaudrate):
  118. self.canIndex = canIndex
  119. self.canChannel = canChannel
  120. self.canBaudrate = canBaudrate
  121. def open_device(self):
  122. ret = ZLGCAN.VCI_OpenDevice(self.canType, self.canChannel, self.canChannel)
  123. if ret != STATUS_OK:
  124. # LogSignal.print_log_signal().log_emit('打开CAN卡失败: {}'.format(str(self.canChannel)) if SD.SYSTEM_LANGUAGE == 0 else 'CAN Device Error: {}'.format(str(self.canChannel)))
  125. return False
  126. return True
  127. def init_can(self, accCode, accMask):
  128. # 初始化通道
  129. _vci_initconfig = VCI_INIT_CONFIG(accCode, accMask, 0, DOUBLE_FILTER,
  130. baudRateConfig[self.canBaudrate]['time0'],
  131. baudRateConfig[self.canBaudrate]['time1'],
  132. NORMAL_MODE)
  133. ret = ZLGCAN.VCI_InitCAN(self.canType, self.canIndex, self.canChannel, ctypes.byref(_vci_initconfig))
  134. if ret != STATUS_OK:
  135. # LogSignal.print_log_signal().log_emit('初始化CAN卡失败: {}'.format(str(self.canChannel)) if SD.SYSTEM_LANGUAGE == 0 else 'CAN Open Error: {}'.format(str(self.canChannel)))
  136. return False
  137. ret = ZLGCAN.VCI_StartCAN(self.canType, self.canIndex, self.canChannel)
  138. if ret != STATUS_OK:
  139. # LogSignal.print_log_signal().log_emit('启动CAN卡失败: {}'.format(str(self.canChannel))if SD.SYSTEM_LANGUAGE == 0 else 'CAN Start Error: {}'.format(str(self.canChannel)))
  140. return False
  141. return True
  142. def get_undeal_number(self):
  143. return ZLGCAN.VCI_GetReceiveNum(self.canType, self.canIndex, self.canChannel)
  144. def receive(self, number=1):
  145. objs = (VCI_CAN_OBJ * number)()
  146. ret = ZLGCAN.VCI_Receive(self.canType, self.canIndex, self.canChannel, ctypes.byref(objs), number, 10)
  147. if ret == 0xFFFFFFFF:
  148. return None
  149. else:
  150. return objs[:ret]
  151. def send(self, ID, data, remote_flag=False, extern_flag=False, data_len=8):
  152. vci_can_obj = VCI_CAN_OBJ()
  153. vci_can_obj.ID = ID
  154. vci_can_obj.SendType = SEND_NORMAL
  155. if remote_flag:
  156. vci_can_obj.RemoteFlag = 1
  157. else:
  158. vci_can_obj.RemoteFlag = 0
  159. if extern_flag:
  160. vci_can_obj.ExternFlag = 1
  161. else:
  162. vci_can_obj.ExternFlag = 0
  163. vci_can_obj.DataLen = data_len
  164. if len(data) < 8:
  165. data += (8 - len(data)) * [0]
  166. vci_can_obj.Data = (c_ubyte * 8)(data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7])
  167. ret = ZLGCAN.VCI_Transmit(self.canType, self.canIndex, self.canChannel, ctypes.byref(vci_can_obj), 1)
  168. if ret != STATUS_OK:
  169. LogSignal.print_log_signal().log_emit("CAN Send Error!")
  170. return False
  171. else:
  172. return True
  173. def read_err_info(self):
  174. errInfo = PVCI_ERR_INFO(0, ubyte_3array(0, 0, 0), 0)
  175. ZLGCAN.VCI_ReadErrInfo(self.canType, self.canIndex, self.canChannel, ctypes.byref(errInfo))
  176. LogSignal.print_log_signal().log_emit(errInfo.ErrorCode, errInfo.PassiveErrData[0], errInfo.PassiveErrData[1], errInfo.PassiveErrData[2], errInfo.ArLostErrData)
  177. def clear_buffer(self):
  178. return ZLGCAN.VCI_ClearBuffer(self.canType, self.canIndex, self.canChannel)
  179. def close_can(self):
  180. return ZLGCAN.VCI_CloseDevice(self.canType, self.canIndex)