#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""Modbus RTU framing (RtuQuery) and serial master/server (RtuMaster, RtuServer)."""

import struct
import time

from utils.modbus import LOGGER
from utils.modbus.modbus import (
    Databank, Query, Master, Server,
    InvalidArgumentError, ModbusInvalidResponseError, ModbusInvalidRequestError
)
from utils.modbus.hooks import call_hooks
from utils.modbus import utils


class RtuQuery(Query):
    """Build and parse RTU frames.

    A frame, as produced and consumed here, is one address byte, followed by
    the PDU, followed by a 2-byte CRC packed with struct format ">H".
    """

    def __init__(self):
        """Create a query with request and response addresses set to 0."""
        super(RtuQuery, self).__init__()
        self._request_address = 0
        self._response_address = 0

    @staticmethod
    def _add_crc(data):
        """Return `data` with the CRC of `data` appended (2 bytes, ">H")."""
        return data + struct.pack(">H", utils.calculate_crc(data))

    def build_request(self, pdu, slave):
        """Return the full request frame for `pdu` addressed to `slave`.

        The address is remembered so that parse_response can check it.
        Raises InvalidArgumentError when `slave` is outside 0..255.
        """
        self._request_address = slave
        if (self._request_address < 0) or (self._request_address > 255):
            raise InvalidArgumentError("Invalid address {0}".format(self._request_address))
        return self._add_crc(struct.pack(">B", self._request_address) + pdu)

    def parse_response(self, response):
        """Validate a response frame and return its PDU (address and CRC removed).

        Raises ModbusInvalidResponseError when the frame is shorter than
        3 bytes, when its address differs from the address of the last built
        request, or when the trailing CRC does not match.
        """
        if len(response) < 3:
            raise ModbusInvalidResponseError("Response length is invalid {0}".format(len(response)))

        (self._response_address, ) = struct.unpack(">B", response[0:1])
        if self._request_address != self._response_address:
            raise ModbusInvalidResponseError(
                "Response address {0} is different from request address {1}".format(
                    self._response_address, self._request_address
                )
            )

        (crc, ) = struct.unpack(">H", response[-2:])
        if crc != utils.calculate_crc(response[:-2]):
            raise ModbusInvalidResponseError("Invalid CRC in response")

        return response[1:-2]

    def parse_request(self, request):
        """Validate a request frame and return `(address, pdu)`.

        The address is remembered so that build_response can echo it.
        Raises ModbusInvalidRequestError when the frame is shorter than
        3 bytes or when the trailing CRC does not match.
        """
        if len(request) < 3:
            raise ModbusInvalidRequestError("Request length is invalid {0}".format(len(request)))

        (self._request_address, ) = struct.unpack(">B", request[0:1])

        (crc, ) = struct.unpack(">H", request[-2:])
        if crc != utils.calculate_crc(request[:-2]):
            raise ModbusInvalidRequestError("Invalid CRC in request")

        return self._request_address, request[1:-2]

    def build_response(self, response_pdu):
        """Return the full response frame, addressed like the last parsed request."""
        self._response_address = self._request_address
        return self._add_crc(struct.pack(">B", self._response_address) + response_pdu)


class RtuMaster(Master):
    """Master that exchanges RTU frames over a serial port object."""

    def __init__(self, serial, interchar_multiplier=1.5, interframe_multiplier=3.5, t0=None):
        """Wrap `serial` and derive its timeouts from a base time `t0`.

        `serial` must expose the attributes used in this class (name, is_open,
        timeout, baudrate, inter_byte_timeout, read/write/flush, ...).
        NOTE(review): this looks like a pySerial `Serial` instance -- confirm.

        When `t0` is falsy it is computed with
        utils.calculate_rtu_inter_char(serial.baudrate). The serial
        inter-byte timeout is set to `interchar_multiplier * t0` and the
        read timeout to `interframe_multiplier * t0`.
        """
        self._serial = serial
        self.use_sw_timeout = False
        LOGGER.debug(
            "RtuMaster %s is %s",
            self._serial.name, "opened" if self._serial.is_open else "closed"
        )
        super(RtuMaster, self).__init__(self._serial.timeout)

        if t0:
            self._t0 = t0
        else:
            self._t0 = utils.calculate_rtu_inter_char(self._serial.baudrate)
        self._serial.inter_byte_timeout = interchar_multiplier * self._t0
        self.set_timeout(interframe_multiplier * self._t0)

        # When True, _send reads back and discards as many bytes as it wrote.
        self.handle_local_echo = False

    def _do_open(self):
        """Open the serial port if it is not already open."""
        if not self._serial.is_open:
            call_hooks("modbus_rtu.RtuMaster.before_open", (self, ))
            self._serial.open()

    def _do_close(self):
        """Close the serial port if it is open.

        Returns True when the port was closed by this call, None otherwise.
        """
        if self._serial.is_open:
            self._serial.close()
            call_hooks("modbus_rtu.RtuMaster.after_close", (self, ))
            return True

    def set_timeout(self, timeout_in_sec, use_sw_timeout=False):
        """Set the timeout on both the master and the serial port.

        `use_sw_timeout` additionally makes _recv measure the elapsed time
        itself and stop once it exceeds the serial timeout.
        """
        Master.set_timeout(self, timeout_in_sec)
        self._serial.timeout = timeout_in_sec
        self.use_sw_timeout = use_sw_timeout

    def _send(self, request):
        """Write `request` to the serial port.

        The "before_send" hook may substitute the request by returning a
        non-None value. Both serial buffers are reset before writing.
        """
        retval = call_hooks("modbus_rtu.RtuMaster.before_send", (self, request))
        if retval is not None:
            request = retval

        self._serial.reset_input_buffer()
        self._serial.reset_output_buffer()

        self._serial.write(request)
        self._serial.flush()

        # Read back our own transmission and discard it.
        if self.handle_local_echo:
            self._serial.read(len(request))

    def _recv(self, expected_length=-1):
        """Read a response from the serial port and return it.

        Reading stops when a read returns nothing, when `expected_length`
        (if >= 0) bytes have been collected, or -- with the software timeout
        enabled -- when the elapsed time exceeds the serial timeout. The
        "after_recv" hook may substitute the response by returning a
        non-None value.
        """
        response = utils.to_data("")
        start_time = time.time() if self.use_sw_timeout else 0

        while True:
            if self._serial.timeout:
                # Ask only for what is still missing, at least one byte.
                remaining = expected_length - len(response)
                read_bytes = self._serial.read(remaining if remaining > 0 else 1)
            else:
                read_bytes = self._serial.read(expected_length if expected_length > 0 else 1)

            if not read_bytes:
                break

            # The deadline only applies when the software timeout is enabled
            # and a numeric timeout is set; comparing against None would raise
            # TypeError on Python 3. The chunk read past the deadline is
            # discarded, as before.
            timeout = self._serial.timeout
            if self.use_sw_timeout and timeout is not None:
                if (time.time() - start_time) > timeout:
                    break

            response += read_bytes
            if expected_length >= 0 and len(response) >= expected_length:
                # The expected number of bytes has been received.
                break

        retval = call_hooks("modbus_rtu.RtuMaster.after_recv", (self, response))
        if retval is not None:
            return retval
        return response

    def _make_query(self):
        """Return a new RtuQuery."""
        return RtuQuery()


class RtuServer(Server):
    """Server that reads RTU requests from a serial port and answers them."""

    _timeout = 0

    def __init__(self, serial, databank=None, error_on_missing_slave=True, **kwargs):
        """Wrap `serial` and derive its timeouts from the baudrate.

        A Databank is created (with `error_on_missing_slave`) when `databank`
        is falsy. Optional keyword arguments `interframe_multiplier`
        (default 3.5) and `interchar_multiplier` (default 1.5) scale the base
        time returned by utils.calculate_rtu_inter_char(serial.baudrate) into
        the read timeout and the inter-byte timeout respectively.
        """
        interframe_multiplier = kwargs.pop('interframe_multiplier', 3.5)
        interchar_multiplier = kwargs.pop('interchar_multiplier', 1.5)

        databank = databank if databank else Databank(error_on_missing_slave=error_on_missing_slave)
        super(RtuServer, self).__init__(databank)

        self._serial = serial
        LOGGER.debug(
            "RtuServer %s is %s",
            self._serial.name, "opened" if self._serial.is_open else "closed"
        )

        self._t0 = utils.calculate_rtu_inter_char(self._serial.baudrate)
        self._serial.inter_byte_timeout = interchar_multiplier * self._t0
        self.set_timeout(interframe_multiplier * self._t0)

        # Set by start() / cleared by stop(): makes _do_run wait without
        # timeout for the first byte of a request.
        self._block_on_first_byte = False

    def close(self):
        """Close the serial port if it is open, calling the close hooks around it."""
        if self._serial.is_open:
            call_hooks("modbus_rtu.RtuServer.before_close", (self, ))
            self._serial.close()
            call_hooks("modbus_rtu.RtuServer.after_close", (self, ))

    def set_timeout(self, timeout):
        """Store `timeout` and apply it to the serial port."""
        self._timeout = timeout
        self._serial.timeout = timeout

    def get_timeout(self):
        """Return the timeout last passed to set_timeout."""
        return self._timeout

    def __del__(self):
        """Close the serial port when the server is garbage collected."""
        # __init__ can fail before _serial is assigned; do not raise from here.
        if getattr(self, "_serial", None) is not None:
            self.close()

    def _make_query(self):
        """Return a new RtuQuery."""
        return RtuQuery()

    def start(self):
        """Enable blocking on the first request byte, then start the server."""
        self._block_on_first_byte = True
        super(RtuServer, self).start()

    def stop(self):
        """Stop the server, cancelling a pending serial read first."""
        self._block_on_first_byte = False
        if self._serial.is_open:
            # A read without timeout may be pending in _do_run; cancel it
            # before asking the base class to stop.
            self._serial.cancel_read()
        super(RtuServer, self).stop()

    def _do_init(self):
        """Open the serial port if needed, calling the open hooks around it."""
        if not self._serial.is_open:
            call_hooks("modbus_rtu.RtuServer.before_open", (self, ))
            self._serial.open()
            call_hooks("modbus_rtu.RtuServer.after_open", (self, ))

    def _do_exit(self):
        """Close the serial port."""
        self.close()

    def _reopen_port(self):
        """Close and reopen the serial port (no hooks are called)."""
        self._serial.close()
        self._serial.open()

    def _do_run(self):
        """Read one request from the serial port, handle it and write the response.

        Any exception is logged and passed to the "on_error" hook instead of
        being propagated.
        """
        try:
            request = utils.to_data('')

            if self._block_on_first_byte:
                # First read blocks without timeout until a byte arrives.
                self._serial.timeout = None
                try:
                    read_bytes = self._serial.read(1)
                    request += read_bytes
                except Exception as excpt:
                    LOGGER.warning("Error while waiting for a request, reopening serial port: %s", excpt)
                    self._reopen_port()
                self._serial.timeout = self._timeout

            # Read the rest of the request until a read comes back empty.
            while True:
                try:
                    read_bytes = self._serial.read(128)
                except Exception as excpt:
                    LOGGER.warning("Error while reading a request, reopening serial port: %s", excpt)
                    self._reopen_port()
                    break
                if not read_bytes:
                    break
                request += read_bytes

            if request:
                retval = call_hooks("modbus_rtu.RtuServer.after_read", (self, request))
                if retval is not None:
                    request = retval

                response = self._handle(request)

                retval = call_hooks("modbus_rtu.RtuServer.before_write", (self, response))
                if retval is not None:
                    response = retval

                if response:
                    if self._serial.in_waiting > 0:
                        # Bytes of a new request are already buffered, so the
                        # response to this one is not sent.
                        LOGGER.warning("Not sending response because there is new request pending")
                    else:
                        self._serial.write(response)
                        self._serial.flush()
                        time.sleep(self.get_timeout())

                    call_hooks("modbus_rtu.RtuServer.after_write", (self, response))

        except Exception as excpt:
            LOGGER.error("Error while handling request, Exception occurred: %s", excpt)
            call_hooks("modbus_rtu.RtuServer.on_error", (self, excpt))