123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- #!/usr/bin/env python
- # -*- encoding: utf-8 -*-
- import struct
- import time
- from utils.modbus import LOGGER
- from utils.modbus.modbus import (
- Databank, Query, Master, Server,
- InvalidArgumentError, ModbusInvalidResponseError, ModbusInvalidRequestError
- )
- from utils.modbus.hooks import call_hooks
- from utils.modbus import utils
class RtuQuery(Query):
    """Build and parse Modbus RTU frames.

    A frame handled here is one address byte, followed by the PDU, followed
    by a two-byte CRC (packed with ``">H"``) computed by
    ``utils.calculate_crc`` over the address byte and the PDU.
    """

    def __init__(self):
        """Create a query whose request and response addresses are both 0."""
        super(RtuQuery, self).__init__()
        self._request_address = 0
        self._response_address = 0

    def build_request(self, pdu, slave):
        """Return the full request frame for ``pdu`` addressed to ``slave``.

        The address is remembered so that parse_response can check it later.

        Raises:
            InvalidArgumentError: if ``slave`` is not within 0..255.
        """
        address = self._request_address = slave
        if address < 0 or address > 255:
            raise InvalidArgumentError("Invalid address {0}".format(address))
        frame = struct.pack(">B", address) + pdu
        return frame + struct.pack(">H", utils.calculate_crc(frame))

    def parse_response(self, response):
        """Validate a response frame and return the PDU it carries.

        Raises:
            ModbusInvalidResponseError: if the frame is shorter than 3 bytes,
                if its address differs from the one used in the request, or
                if its CRC does not match.
        """
        size = len(response)
        if size < 3:
            raise ModbusInvalidResponseError("Response length is invalid {0}".format(size))

        # The first byte is the address of the slave that answered.
        self._response_address = struct.unpack(">B", response[0:1])[0]
        if self._request_address != self._response_address:
            message = "Response address {0} is different from request address {1}".format(
                self._response_address, self._request_address
            )
            raise ModbusInvalidResponseError(message)

        # The last two bytes are the CRC of everything that precedes them.
        received_crc = struct.unpack(">H", response[-2:])[0]
        if received_crc != utils.calculate_crc(response[:-2]):
            raise ModbusInvalidResponseError("Invalid CRC in response")
        return response[1:-2]

    def parse_request(self, request):
        """Validate a request frame and return ``(address, pdu)``.

        The address found in the frame is stored so that build_response can
        reuse it.

        Raises:
            ModbusInvalidRequestError: if the frame is shorter than 3 bytes
                or if its CRC does not match.
        """
        size = len(request)
        if size < 3:
            raise ModbusInvalidRequestError("Request length is invalid {0}".format(size))

        self._request_address = struct.unpack(">B", request[0:1])[0]
        received_crc = struct.unpack(">H", request[-2:])[0]
        if received_crc != utils.calculate_crc(request[:-2]):
            raise ModbusInvalidRequestError("Invalid CRC in request")
        return self._request_address, request[1:-2]

    def build_response(self, response_pdu):
        """Return the full response frame for ``response_pdu``.

        The response is addressed with the address of the last request seen.
        """
        self._response_address = self._request_address
        frame = struct.pack(">B", self._response_address) + response_pdu
        return frame + struct.pack(">H", utils.calculate_crc(frame))
class RtuMaster(Master):
    """Modbus RTU master that talks to slaves through a serial port object.

    The ``serial`` object must provide the attributes and methods used below:
    ``name``, ``is_open``, ``baudrate``, ``timeout``, ``inter_byte_timeout``,
    ``open``, ``close``, ``read``, ``write``, ``flush``,
    ``reset_input_buffer`` and ``reset_output_buffer``
    (presumably a pySerial ``Serial`` instance -- confirm against callers).
    """

    def __init__(self, serial, interchar_multiplier=1.5, interframe_multiplier=3.5, t0=None):
        """Bind the master to ``serial`` and derive its timeouts.

        Args:
            serial: the serial port object used for all I/O.
            interchar_multiplier: multiplied by ``t0`` to set
                ``serial.inter_byte_timeout``.
            interframe_multiplier: multiplied by ``t0`` to get the timeout
                passed to set_timeout.
            t0: base time; when falsy it is computed from the port's baudrate
                by ``utils.calculate_rtu_inter_char``.
        """
        self._serial = serial
        self.use_sw_timeout = False
        LOGGER.debug("RtuMaster %s is %s", self._serial.name, "opened" if self._serial.is_open else "closed")
        super(RtuMaster, self).__init__(self._serial.timeout)

        if t0:
            self._t0 = t0
        else:
            self._t0 = utils.calculate_rtu_inter_char(self._serial.baudrate)
        self._serial.inter_byte_timeout = interchar_multiplier * self._t0
        self.set_timeout(interframe_multiplier * self._t0)

        # When True, _send reads back (and discards) as many bytes as it wrote.
        self.handle_local_echo = False

    def _do_open(self):
        """Open the serial port if it is not already open."""
        if not self._serial.is_open:
            call_hooks("modbus_rtu.RtuMaster.before_open", (self, ))
            self._serial.open()

    def _do_close(self):
        """Close the serial port if it is open.

        Returns:
            True when the port was open and has been closed; None otherwise.
        """
        if self._serial.is_open:
            self._serial.close()
            call_hooks("modbus_rtu.RtuMaster.after_close", (self, ))
            return True

    def set_timeout(self, timeout_in_sec, use_sw_timeout=False):
        """Set the timeout on both the master and the serial port.

        Args:
            timeout_in_sec: timeout forwarded to ``Master.set_timeout`` and
                assigned to ``serial.timeout``.
            use_sw_timeout: when True, _recv also measures the elapsed time
                itself and stops once it exceeds the serial timeout.
        """
        Master.set_timeout(self, timeout_in_sec)
        self._serial.timeout = timeout_in_sec
        self.use_sw_timeout = use_sw_timeout

    def _send(self, request):
        """Write ``request`` to the serial port.

        The ``before_send`` hook may replace the request by returning a value
        other than None. Both serial buffers are reset before writing.
        """
        retval = call_hooks("modbus_rtu.RtuMaster.before_send", (self, request))
        if retval is not None:
            request = retval

        self._serial.reset_input_buffer()
        self._serial.reset_output_buffer()

        self._serial.write(request)
        self._serial.flush()

        # Read back the echoed request and discard it.
        if self.handle_local_echo:
            self._serial.read(len(request))

    def _recv(self, expected_length=-1):
        """Read a response from the serial port.

        Args:
            expected_length: number of bytes expected; when it is not
                positive, bytes are read one at a time until a read returns
                nothing.

        Returns:
            The bytes collected, or the value returned by the ``after_recv``
            hook when that value is not None.
        """
        response = utils.to_data("")
        start_time = time.time() if self.use_sw_timeout else 0
        readed_len = 0
        while True:
            timeout = self._serial.timeout
            if timeout:
                # With a timeout set, a read may return fewer bytes than
                # requested, so only ask for what is still missing.
                remaining = expected_length - readed_len
                read_bytes = self._serial.read(remaining if remaining > 0 else 1)
            else:
                read_bytes = self._serial.read(expected_length if expected_length > 0 else 1)

            if not read_bytes:
                break

            # A timeout of None means "no deadline". Comparing a duration
            # against None raises TypeError on Python 3, so the software
            # timeout is only evaluated when there is an actual deadline.
            if self.use_sw_timeout and timeout is not None:
                # NOTE: bytes from a read that finishes after the deadline
                # are dropped, as they were before this check was guarded.
                if (time.time() - start_time) > timeout:
                    break

            response += read_bytes
            if expected_length >= 0 and len(response) >= expected_length:
                # Everything expected has arrived; no need to wait for a
                # read to time out to detect the end of the response.
                break
            readed_len += len(read_bytes)

        retval = call_hooks("modbus_rtu.RtuMaster.after_recv", (self, response))
        if retval is not None:
            return retval
        return response

    def _make_query(self):
        """Return a new RtuQuery used to frame requests and responses."""
        return RtuQuery()
class RtuServer(Server):
    """Modbus RTU server (slave side) reading requests from a serial port.

    The ``serial`` object must provide the attributes and methods used below:
    ``name``, ``is_open``, ``baudrate``, ``timeout``, ``inter_byte_timeout``,
    ``in_waiting``, ``open``, ``close``, ``read``, ``write``, ``flush`` and
    ``cancel_read`` (presumably a pySerial ``Serial`` instance -- confirm).
    """

    # Class-level default, replaced per instance by set_timeout.
    _timeout = 0

    def __init__(self, serial, databank=None, error_on_missing_slave=True, **kwargs):
        """Bind the server to ``serial`` and derive its timeouts.

        Args:
            serial: the serial port object used for all I/O.
            databank: data bank handed to the base class; when falsy a new
                ``Databank`` is created with ``error_on_missing_slave``.
            error_on_missing_slave: forwarded to the ``Databank`` created
                when none is supplied.
            **kwargs: ``interframe_multiplier`` (default 3.5) and
                ``interchar_multiplier`` (default 1.5); each is multiplied
                by the inter-character time computed from the baudrate.
        """
        interframe_multiplier = kwargs.pop('interframe_multiplier', 3.5)
        interchar_multiplier = kwargs.pop('interchar_multiplier', 1.5)

        if not databank:
            databank = Databank(error_on_missing_slave=error_on_missing_slave)
        super(RtuServer, self).__init__(databank)

        self._serial = serial
        LOGGER.debug("RtuServer %s is %s", self._serial.name, "opened" if self._serial.is_open else "closed")

        self._t0 = utils.calculate_rtu_inter_char(self._serial.baudrate)
        self._serial.inter_byte_timeout = interchar_multiplier * self._t0
        self.set_timeout(interframe_multiplier * self._t0)

        # Turned on by start() and off by stop(); see _do_run.
        self._block_on_first_byte = False

    def close(self):
        """Close the serial port if it is open, firing the close hooks."""
        if self._serial.is_open:
            call_hooks("modbus_rtu.RtuServer.before_close", (self, ))
            self._serial.close()
            call_hooks("modbus_rtu.RtuServer.after_close", (self, ))

    def set_timeout(self, timeout):
        """Store ``timeout`` and apply it to the serial port."""
        self._timeout = timeout
        self._serial.timeout = timeout

    def get_timeout(self):
        """Return the timeout last given to set_timeout."""
        return self._timeout

    def __del__(self):
        """Close the serial port when the server object is destroyed."""
        self.close()

    def _make_query(self):
        """Return a new RtuQuery used to frame requests and responses."""
        return RtuQuery()

    def start(self):
        """Enable the blocking read of the first byte, then start the base server."""
        self._block_on_first_byte = True
        super(RtuServer, self).start()

    def stop(self):
        """Disable the blocking read, cancel a pending read, then stop the base server."""
        self._block_on_first_byte = False
        if self._serial.is_open:
            # Wake up a read that may be pending in _do_run.
            self._serial.cancel_read()
        super(RtuServer, self).stop()

    def _do_init(self):
        """Open the serial port if it is not already open, firing the open hooks."""
        if not self._serial.is_open:
            call_hooks("modbus_rtu.RtuServer.before_open", (self, ))
            self._serial.open()
            call_hooks("modbus_rtu.RtuServer.after_open", (self, ))

    def _do_exit(self):
        """Close the serial port."""
        self.close()

    def _do_run(self):
        """Read one request, handle it and write back the response.

        Any exception raised along the way is logged and passed to the
        ``on_error`` hook instead of propagating.
        """
        try:
            request = utils.to_data('')

            if self._block_on_first_byte:
                # Wait without timeout until the first byte shows up.
                self._serial.timeout = None
                try:
                    first_byte = self._serial.read(1)
                    request += first_byte
                except Exception:
                    # Reading failed: reopen the port and carry on.
                    self._serial.close()
                    self._serial.open()
                self._serial.timeout = self._timeout

            # Collect the rest of the request until a read returns nothing.
            receiving = True
            while receiving:
                try:
                    chunk = self._serial.read(128)
                except Exception:
                    # Reading failed: reopen the port and stop collecting.
                    self._serial.close()
                    self._serial.open()
                    chunk = None
                if chunk:
                    request += chunk
                else:
                    receiving = False

            if request:
                # The hook may substitute the request.
                replacement = call_hooks("modbus_rtu.RtuServer.after_read", (self, request))
                if replacement is not None:
                    request = replacement

                response = self._handle(request)

                # The hook may substitute the response.
                replacement = call_hooks("modbus_rtu.RtuServer.before_write", (self, response))
                if replacement is not None:
                    response = replacement

                if response:
                    if self._serial.in_waiting > 0:
                        # Bytes are already waiting in the input buffer, so
                        # this response is not sent.
                        LOGGER.warning("Not sending response because there is new request pending")
                    else:
                        self._serial.write(response)
                        self._serial.flush()
                        time.sleep(self.get_timeout())

                    call_hooks("modbus_rtu.RtuServer.after_write", (self, response))

        except Exception as excpt:
            LOGGER.error("Error while handling request, Exception occurred: %s", excpt)
            call_hooks("modbus_rtu.RtuServer.on_error", (self, excpt))
|