12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- #!/usr/bin/env python
- # -*- encoding: utf-8 -*-
- '''
- @文件 :com.py
- @时间 :2022/11/14 17:28:23
- @作者 :None
- @版本 :1.0
- @说明 :modbus 485驱动
- '''
- import serial.tools.list_ports
- import serial
- from serial.serialutil import SerialException
- from utils.utils import calculate_crc_16
- class ComMaster:
- def __init__(self):
- super(ComMaster, self).__init__()
- self.master = None
- def set_com(self, port, baudrate, bytesize, parity, stopbits):
- self.port = port
- self.baudrate = baudrate
- self.bytesize = bytesize
- self.parity = parity
- self.stopbits = stopbits
- def open_device(self):
- try:
- self.com_manager = serial.Serial(port=self.port, baudrate=self.baudrate, bytesize=self.bytesize, parity=self.parity, stopbits=self.stopbits, xonxoff=0)
- # if (self.com_manager is open):
- return True
- except SerialException:
- return False
- def send_16h(self, slave, addr, num, number_write, data):
- buf = [slave, 0x16, (addr >> 8) & 0xFF, addr & 0xFF, (num >> 8) & 0xFF, num & 0xFF, number_write] + data
- crc = calculate_crc_16(buf, len(buf))
- buf = buf + [crc & 0xFF, (crc >> 8) & 0xFF]
- self.com_manager.write(buf)
- def send(self, addr=0, command=0, param1=0, param0=0, data3=0, data2=0, data1=0, data0=0):
- buf = [addr, command, param1, param0, data3, data2, data1, data0]
- self.com_manager.write(buf)
- def reveive_env_version(self):
- sub_seq = list()
- try:
- data_count = self.com_manager.inWaiting()
- if data_count == 5:
- result = self.com_manager.read(data_count).hex()
- ptr = 0
- while ptr < len(result):
- sub_seq += [int('0x' + result[ptr:ptr + 2], 16)]
- ptr += 2
- else:
- return [0]
- except Exception as e:
- print(str(e))
- return sub_seq
|