bparzella / secsgem

Simple Python SECS/GEM implementation
GNU Lesser General Public License v2.1
170 stars, 85 forks

TCP / IP communication #1

Closed: wolfc01 closed this issue 9 years ago

wolfc01 commented 9 years ago

Hello,

I'm interested in your efforts and looked into your code.

For handling sockets we have done extensive research, and it turned out that using blocking sockets adds complexity to the code and causes situations where sockets cannot be closed (especially when the other end of the connection is doing strange things).

This is also in your code, e.g.

    # this is evil madness
    socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect(("localhost", self.port))

The answer is to use select calls in combination with non-blocking sockets. If you are interested, I can send you a TCP library we use, just for inspiration.
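A minimal sketch of that approach, not code from this thread (it is Python 3, while the thread's code is Python 2): the listening socket is non-blocking and `accept` waits in `select` with a short timeout, so another thread can end the wait by setting a flag instead of connecting to the server itself. The class `StoppableServer` and its methods are hypothetical names chosen for illustration.

```python
import select
import socket
import threading


class StoppableServer:
    """Hypothetical listening socket whose accept() can be ended from another thread."""

    def __init__(self, host="127.0.0.1", port=0, poll_interval=0.1):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind((host, port))      # port 0: let the OS pick a free port
        self._sock.listen(5)
        self._sock.setblocking(False)      # accept() must never block
        self._poll_interval = poll_interval
        self._stop = threading.Event()

    @property
    def address(self):
        return self._sock.getsockname()

    def accept(self):
        """Return a non-blocking connected socket, or None once stop() is called."""
        while not self._stop.is_set():
            # Wake up at least every poll_interval seconds to re-check the flag.
            readable, _, _ = select.select([self._sock], [], [], self._poll_interval)
            if not readable:
                continue
            try:
                conn, _ = self._sock.accept()
            except BlockingIOError:
                continue  # select said "ready" but the pending client went away
            conn.setblocking(False)
            return conn
        return None

    def stop(self):
        """Make a running (or future) accept() return None within poll_interval."""
        self._stop.set()

    def close(self):
        # Call only after accept() has returned: closing a socket that another
        # thread is still waiting on in select() is not portable.
        self._sock.close()
```

The short `select` timeout trades a little idle CPU for the guarantee that `stop()` takes effect within one poll interval, which is the same idea the attached library uses with its 0.1 s polls.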

Best regards Carl.

bparzella commented 9 years ago

Hi Carl,

thanks for your input.

You are right, the blocking sockets are quite a mess. The code began as a simple command-line script and grew from there without getting the rework it needed.

The TCP library sounds interesting; I'd like to take a look, if that is okay.

Regards Benjamin

wolfc01 commented 9 years ago

Hello Benjamin,

See the attached file, which contains a number of useful classes and functions for working with TCP from Python. Note that this functionality has been researched extensively and is running on several semi-production machines used worldwide, as well as in aerodynamic wind tunnels, so it is used in serious projects.

I stripped the library out of the framework it lives in and added some TODOs resulting from the stripping (mostly logging instead of printing). I replaced all log statements with print calls, because the library used the logging of that framework. I did not retest it after the stripping.

I think this library will be useful and will eliminate some lines of code in your SECS/GEM library.

Best regards Carl.

In reply to bparzella, 2015-04-07 20:15 GMT+02:00.

    '''
    TCP

    Contains primitives for TCP communication. One only needs to create a
    connection object, which has an interface that supports sending any Python
    object. To create a connection in a client program, use the following code::

        connection = Client((host, port))

    The server program can create a server as follows::

        server = Server((host, port))
        connection = server.accept()

    Once a connection exists, it can be used through its
    :meth:`send <_Connection.send>` and :meth:`receive <_Connection.receive>` methods::

        connection.send(some_object)
        some_object = connection.receive()

    accept, send and receive block until a new connection is created, the object
    is sent, and an object is received, respectively. In any other case an
    exception is raised. All three methods can be unblocked by calling the close
    method of the :class:`Server` or :class:`_Connection`.

    Note: sendBuffer, receiveBuffer and receiveLine are only to be used to
    communicate with something outside Python (no Python objects).
    '''

    import socket
    import select
    import cPickle
    import struct
    import threading
    import traceback
    import sys
    import time

    SEND_CHUNK_SIZE = int(1E6)  # Send large messages in 1 MB chunks
    MARKER = 'DEAD'             # Marks the size of the message
    SIZE_FMT = '>Q'             # Size is sent as a 64-bit unsigned integer, big endian
    HEADER_SIZE = struct.calcsize(SIZE_FMT) + 2 * len(MARKER)  # Size is surrounded by MARKERs

    class Platform(object):
        '''Platform'''
        WINDOWS = 0 == sys.platform.find('win')
        LINUX = 0 == sys.platform.find('linux')

    PRINTING_CHARACTERS = ''.join([chr(c) for c in xrange(32, 127)])

    def hexify(message, exclude=None):
        '''Returns string hexified, i.e. hexify('\r\nokko') => \x0d\x0aokko. Note that the
        resulting string is valid Python code if placed in quotes.'''
        exclude = exclude if exclude else []
        # '\\x' yields a literal backslash-x; a bare '\x%s' is an invalid escape.
        return ''.join([x if x in exclude else '\\x%s' % x.encode('hex') for x in message])

    class Error(Exception):  # must derive from Exception to be raisable
        #TODO: make proper hierarchy for exceptions
        pass

    class NoData(Error):
        '''Raised when no data is available for a receive with timeout.'''
        def __init__(self):
            # log error here
            pass

    class TCPError(Error):
        '''Raised when:
        - the connection is closed when sending, receiving or accepting
        - invalid data is received
        - the connection is denied
        '''
        def __init__(self, _id, message=''):
            if not message:
                message = traceback.format_exc()
            Error.__init__(self, 'TCP Error: %s, %s' % (_id, message))

    class _Connection(object):
        '''A TCP Connection.'''
        def __init__(self, s, side):
            '''Not to be used. Instantiate a Client or call Server::accept to create a Connection.'''
            self.__socket = s
            self.__socket.setblocking(0)
            self.__side = side
            print('TCP::Connection: %s' % self)  #TODO log here

        @property
        def address(self):
            '''Get the (IP address, port number) tuple.'''
            try:
                return self.__socket.getsockname()
            except:
                raise TCPError('')

        def __str__(self):
            '''Returns a string identifying the Connection'''
            s = 'side: ' + self.__side
            s += ', self: '
            try:
                s += str(self.__socket.getsockname())
            except:
                s += 'unknown'
            s += ', peer: '
            try:
                s += str(self.__socket.getpeername())
            except:
                s += 'unknown'
            return s

        def _sendall(self, buffer_):
            '''As socket::sendall but handles EWOULDBLOCK aka EAGAIN error.
            Note that this is fixed in Python 2.7+, see http://bugs.python.org/issue9090'''
            # Split up buffer_ to prevent 10055 'No buffer space available.' on some Windows machines.
            for i in xrange(0, len(buffer_), SEND_CHUNK_SIZE):
                chunk = buffer_[i:i + SEND_CHUNK_SIZE]
                send = 0
                while send < len(chunk):
                    log_flood = True
                    # Wait until the socket is writable; report flooding only once.
                    while not select.select([], [self.__socket], [], 0.1)[1]:
                        if log_flood:
                            log_flood = False
                            print('TCP Flooding detected for: %s' % self)  #TODO: log here
                    try:
                        send += self.__socket.send(chunk[send:])
                    except socket.error as inst:
                        # 10035 (Windows) / 11 (Linux): the send would block, so retry.
                        if not ((Platform.WINDOWS and 10035 == inst.args[0]) or
                                (Platform.LINUX and 11 == inst.args[0])):
                            raise

        def send(self, _object):
            '''Send serialized python object to peer. Raises TCPError when connection is
            closed or when pickling _object fails. Note that both this end and peer can
            close connection.'''
            try:
                payload = cPickle.dumps(_object, cPickle.HIGHEST_PROTOCOL)
                self._sendall(MARKER + struct.pack(SIZE_FMT, len(payload)) + MARKER + payload)
            except:
                raise TCPError(self)

        def sendBuffer(self, buffer_):
            '''Send buffer to peer. Raises TCPError when connection is closed.
            Note that both this end and peer can close connection.'''
            try:
                self._sendall(buffer_)
            except:
                raise TCPError(self)

        def _waitForData(self, timeout=None):
            '''If optional argument 'timeout' is None (the default), block if necessary
            until data is available. Raises TCPError when connection is closed.
            If 'timeout' is a positive number, it blocks at most 'timeout' seconds and
            raises the NoData exception if no data was available within that time.'''
            if None is timeout or timeout < 0:
                # Block until data becomes available.
                try:
                    while not select.select([self.__socket], [], [], 0.1)[0]:
                        pass
                except:
                    raise TCPError(self)
            else:
                # Wait at most timeout seconds for data to become available.
                try:
                    for _ in xrange(int(timeout * 10) + 1):
                        s = select.select([self.__socket], [], [], 0.1 if timeout > 0.1 else timeout)[0]
                        if s:
                            return
                except:
                    raise TCPError(self)
                if not s:
                    raise NoData

        def _recv(self, length):
            '''Receive > 0 bytes from peer. Raise NoData exception if no data is available.
            Raises TCPError when connection is closed.'''
            try:
                buffer_ = self.__socket.recv(length)
            except socket.error as inst:
                # Refer to the select man page in the BUGS section:
                #   Under Linux, select() may report a socket file descriptor as 'ready for
                #   reading', while nevertheless a subsequent read blocks. This could for
                #   example happen when data has arrived but upon examination has wrong
                #   checksum and is discarded. There may be other circumstances in which a
                #   file descriptor is spuriously reported as ready. Thus it may be safer
                #   to use O_NONBLOCK on sockets that should not block.
                # This is also seen under Windows.
                # The exceptions below are raised when recv (on a non-blocking socket) is
                # called while there is no data in the socket:
                #   10035 on Windows: A non-blocking socket operation could not be completed immediately
                #   11 on Linux: Resource temporarily unavailable
                # Note that when select falsely reports the socket to be ready for reading,
                # other exceptions may be thrown as well.
                if (Platform.WINDOWS and 10035 == inst.args[0]) or \
                   (Platform.LINUX and 11 == inst.args[0]):
                    raise NoData
                raise TCPError(self)
            except:
                raise TCPError(self)
            if len(buffer_) == 0:
                raise TCPError(self, 'Connection was closed')
            return buffer_

        def _receive(self, length, timeout):
            '''Receive length bytes from peer.'''
            buffer_ = ''
            while len(buffer_) < length:
                self._waitForData(timeout)
                try:
                    portion = self._recv(length - len(buffer_))
                except NoData:
                    print('Select falsely indicated that the socket was ready for reading')  #TODO log error here
                    continue
                buffer_ += portion
            return buffer_

        def receive(self, timeout=None):
            '''Receive object from peer. Raises TCPError when connection is closed or when
            invalid data is received. Note that both this end and peer can close
            connection. For optional argument timeout see _waitForData method.'''
            try:
                header = self._receive(HEADER_SIZE, timeout)
                assert header.startswith(MARKER) and header.endswith(MARKER), \
                    'Invalid header %s' % hexify(header, PRINTING_CHARACTERS)
                payload_length = struct.unpack(SIZE_FMT, header[len(MARKER):-len(MARKER)])[0]
                return cPickle.loads(self._receive(payload_length, timeout))
            except (NoData, TCPError):
                raise
            except:
                raise TCPError(self)

        def receiveBuffer(self, length, timeout=None, waitall=True):
            '''Receive buffer with length from peer. Raises TCPError when connection is
            closed. Note that both this end and peer can close connection. For optional
            argument timeout see _waitForData method. If waitall is False, a buffer
            shorter than length can be returned.
            USE THIS ONLY AND REALLY ONLY TO COMMUNICATE WITH SOMETHING OUTSIDE PYTHON.'''
            if waitall:
                return self._receive(length, timeout)
            else:
                self._waitForData(timeout)
                try:
                    return self._recv(length)
                except NoData:
                    print('Select falsely indicated that the socket was ready for reading')  #TODO log error here
                    raise NoData

        def receiveLine(self, timeout=None, line_terminator='\n'):
            '''Receive one entire line (terminated with line_terminator) from peer.
            A trailing line_terminator is kept in the string. Raises TCPError when
            connection is closed. Note that both this end and peer can close connection.
            For optional argument timeout see _waitForData method.
            USE THIS ONLY AND REALLY ONLY TO COMMUNICATE WITH SOMETHING OUTSIDE PYTHON.
            Note that this performs badly: about 100 times slower than receive.'''
            line = ''
            while True:
                line += self._receive(1, timeout)
                if line.endswith(line_terminator):
                    break
            return line

        def close(self):
            '''Close the Connection and notify the peer that we stop.'''
            print('TCP::Connection: %s closing.' % self)  #TODO: log information here
            try:
                self.__socket.shutdown(socket.SHUT_RDWR)
            except:
                # Note that shutdown may fail when other side is down
                pass
            self.__socket.close()

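        def __del__(self):
            # self.__socket is name-mangled to _Connection__socket, so hasattr must use that name.
            if hasattr(self, '_Connection__socket'):
                self.close()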

    class Server(object):
        '''A TCP server.'''
        def __init__(self, address):
            try:
                if address[1] != 0:
                    address = (address[0], address[1])
                self.__server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                # Disable Nagle's algorithm (i.e. do not combine a number of small outgoing
                # messages in 1 transmission) to optimize for low-bandwidth, time-sensitive,
                # big message transmissions.
                self.__server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                if Platform.LINUX:
                    # SO_REUSEADDR on Linux:
                    # This socket option tells the kernel that even if this port is busy (in
                    # the TIME_WAIT state), go ahead and reuse it anyway. If it is busy,
                    # but with another state, you will still get an address already in use
                    # error. It is useful if your server has been shut down, and then
                    # restarted right away while sockets are still active on its port. You
                    # should be aware that if any unexpected data comes in, it may confuse
                    # your server, but while this is possible, it is not likely.
                    # SO_REUSEADDR on Windows:
                    # The SO_REUSEADDR socket option allows a socket to forcibly bind to a
                    # port in use by another socket. .... Once the second socket has
                    # successfully bound, the behavior for all sockets bound to that port is
                    # indeterminate. For example, if all of the sockets on the same port
                    # provide TCP service, any incoming TCP connection requests over the port
                    # cannot be guaranteed to be handled by the correct socket;
                    # the behavior is non-deterministic. A malicious program can use SO_REUSEADDR
                    # to forcibly bind sockets already in use for standard network protocol
                    # services in order to deny access to those services. No special privileges
                    # are required to use this option.
                    # Conclusion: on Linux SO_REUSEADDR must be used; on Windows SO_REUSEADDR
                    # must not be used.
                    self.__server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.__server_socket.bind(address)
                self.__server_socket.listen(5)
                self.__server_socket.setblocking(0)
            except:
                raise TCPError(address)
            #log info here
            print('TCP::Server: %s' % self)

        def __str__(self):
            '''Returns a string identifying the Server'''
            s = 'self: '
            try:
                s += str(self.address)
            except TCPError:
                s += 'unknown'
            return s

        @property
        def address(self):
            '''Get the (IP address, port number) tuple on which the server serves.'''
            try:
                return self.__server_socket.getsockname()
            except:
                raise TCPError('')

        def accept(self):
            '''As socket::accept but handles EWOULDBLOCK aka EAGAIN error. Note that this
            is fixed in Python 2.7+, see http://bugs.python.org/issue9090.
            Returns a Connection. Raises exception when server is closed.'''
            try:
                while True:
                    while not select.select([self.__server_socket], [], [], 0.1)[0]:
                        pass
                    try:
                        return _Connection(self.__server_socket.accept()[0], 'server')
                    except socket.error as inst:
                        if not ((Platform.WINDOWS and 10035 == inst.args[0]) or
                                (Platform.LINUX and 11 == inst.args[0])):
                            raise TCPError(self)
            except:
                raise TCPError(self)

        def close(self):
            '''Close the Server.'''
            #log info here
            print('TCP::Server: %s closing.' % self)
            self.__server_socket.close()

        def __del__(self):
            # self.__server_socket is name-mangled to _Server__server_socket.
            if hasattr(self, '_Server__server_socket'):
                self.close()

    class _Client(_Connection):
        '''A TCP client'''
        def __init__(self, address):
            try:
                client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                # Disable Nagle's algorithm (i.e. do not combine a number of small outgoing
                # messages in 1 transmission) to optimize for low-bandwidth, time-sensitive,
                # big message transmissions.
                client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                # Note that server side can close this connection after connect returns
                # when maximum number of queued connections is exceeded. Also note that
                # in that case accept on the server side does not return. Also note that
                # socket peer host name is '0.0.0.0' in that case.
                client_socket.connect(address)
                _Connection.__init__(self, client_socket, 'client')
            except:
                raise TCPError(address)

    class Client(object):
        '''Client that implicitly connects to server.'''
        def __init__(self, address=None, addressTopic=None):
            self.__stop = False
            self.__addressTopic = addressTopic
            self.__address = address
            self.__connection = None
            self.__connect_lock = threading.Lock()
            self.__connection_lock = threading.Lock()
            self.__log_connect_error = True

        @property
        def address(self):
            '''Get the (IP address, port number) tuple.'''
            return self.__address

        def connect(self):
            '''Connect to the server.'''
            with self.__connect_lock:
                while not self.__stop and self.__connection is None:
                    address = self.address
                    try:
                        with self.__connection_lock:
                            self.__connection = _Client(tuple(address))
                        self.__log_connect_error = True
                    except:
                        if not self.__stop and self.__log_connect_error:
                            #log error here
                            print('Connecting failed. Can be perfectly normal. For instance when '
                                  'the server is not ready yet.')
                            self.__log_connect_error = False
                        time.sleep(0.1)  #TODO make this not interruptable by KeyboardInterrupt

        def __call_connection(self, member, *args, **kwargs):
            '''Call connection after making sure that it is made.'''
            self.connect()
            if self.__stop:
                raise TCPError('Connection closed.')
            try:
                return getattr(self.__connection, member)(*args, **kwargs)
            except NoData:
                raise
            except AttributeError:
                raise TCPError('Connection closed.')
            except:
                with self.__connection_lock:
                    self.__connection = None
                raise

        def receive(self, *args, **kwargs):
            '''Overwritten'''
            return self.__call_connection('receive', *args, **kwargs)

        def receiveBuffer(self, *args, **kwargs):
            '''Overwritten'''
            return self.__call_connection('receiveBuffer', *args, **kwargs)

        def receiveLine(self, *args, **kwargs):
            '''Overwritten'''
            return self.__call_connection('receiveLine', *args, **kwargs)

        def send(self, *args, **kwargs):
            '''Overwritten'''
            return self.__call_connection('send', *args, **kwargs)

        def sendBuffer(self, *args, **kwargs):
            '''Overwritten'''
            return self.__call_connection('sendBuffer', *args, **kwargs)

        receive.__doc__ = _Client.receive.__doc__
        receiveBuffer.__doc__ = _Client.receiveBuffer.__doc__
        receiveLine.__doc__ = _Client.receiveLine.__doc__
        send.__doc__ = _Client.send.__doc__
        sendBuffer.__doc__ = _Client.sendBuffer.__doc__

        def __str__(self):
            return self.__call_connection('__str__')

        def close(self):
            '''Closes connection and unblocks any connection call.'''
            self.__stop = True
            with self.__connection_lock:
                if self.__connection:
                    self.__connection.close()
                    self.__connection = None

        def connected(self):
            '''Returns True if connected, False otherwise.'''
            return self.__connection is not None
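In the library above, every object passed to `send` travels as `MARKER`, an 8-byte big-endian length (`SIZE_FMT = '>Q'`), `MARKER` again, and then the pickled payload; `receive` checks the markers before unpickling. The following Python 3 sketch reproduces that framing on in-memory bytes so the format can be inspected without sockets. `frame`, `parse_header` and `unframe` are hypothetical helper names, not functions of the attached library.

```python
import pickle
import struct

MARKER = b"DEAD"   # same 4-byte marker as the library
SIZE_FMT = ">Q"    # 64-bit unsigned length, big endian
HEADER_SIZE = struct.calcsize(SIZE_FMT) + 2 * len(MARKER)  # 16 bytes


def frame(obj):
    """Pickle obj and prepend the MARKER/length/MARKER header."""
    payload = pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
    return MARKER + struct.pack(SIZE_FMT, len(payload)) + MARKER + payload


def parse_header(header):
    """Validate a 16-byte header and return the payload length it announces."""
    if len(header) != HEADER_SIZE or not (
            header.startswith(MARKER) and header.endswith(MARKER)):
        raise ValueError("invalid header %r" % (header,))
    return struct.unpack(SIZE_FMT, header[len(MARKER):-len(MARKER)])[0]


def unframe(data):
    """Decode one complete frame from data; return (obj, remaining_bytes)."""
    length = parse_header(data[:HEADER_SIZE])
    end = HEADER_SIZE + length
    if len(data) < end:
        raise ValueError("incomplete frame")
    return pickle.loads(data[HEADER_SIZE:end]), data[end:]
```

Because the receiving side unpickles whatever arrives, this format is only suitable between trusted peers: unpickling data from an untrusted source can execute arbitrary code.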

bparzella commented 9 years ago

Hey Carl,

I switched the hsmsConnection class from blocking to non-blocking sockets, using your code as inspiration.
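Switching the client side to non-blocking sockets also affects `connect`, which blocks by default. A common pattern, sketched here in Python 3 and not taken from secsgem, is `connect_ex` on a non-blocking socket, then `select` until the socket becomes writable (Windows reports a failed attempt in the exception set instead), then a check of `SO_ERROR`. The helper name `connect_nonblocking` and its default timeout are assumptions.

```python
import errno
import os
import select
import socket


def connect_nonblocking(address, timeout=5.0):
    """Hypothetical helper: connect a non-blocking TCP socket without blocking forever.

    Returns the connected socket; raises TimeoutError or another OSError.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    err = sock.connect_ex(address)
    # 0: connected at once; EINPROGRESS (POSIX) / EWOULDBLOCK (Windows): pending.
    if err not in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
        sock.close()
        raise OSError(err, os.strerror(err))
    if err != 0:
        # A pending connect finishes when the socket becomes writable; Windows
        # signals a failed attempt through the exception set.
        _, writable, failed = select.select([], [sock], [sock], timeout)
        if not writable and not failed:
            sock.close()
            raise TimeoutError("connect to %r timed out" % (address,))
        # SO_ERROR holds the outcome of the attempt (0 on success).
        err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            sock.close()
            raise OSError(err, os.strerror(err))
    return sock
```

With a timeout on the `select` call, an unreachable equipment address can no longer hang the caller indefinitely, which is the same failure mode as the blocking connect quoted at the top of this issue.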

Splitting outbound packets into chunks and handling false "ready" reports from select are still open. The server classes also still use blocking sockets.
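The two open items map to two small loops. This Python 3 sketch uses hypothetical helper names `send_all` and `recv_some`, modelled on `_sendall` and `_recv` in the attached library rather than copied from secsgem. It sends in chunks of `SEND_CHUNK_SIZE` bytes, waiting for writability before each attempt, and treats a `BlockingIOError` after a positive `select` as a spurious wake-up to retry instead of an error.

```python
import select
import socket

SEND_CHUNK_SIZE = 1_000_000  # 1 MB, as in the library's SEND_CHUNK_SIZE


def send_all(sock, data, poll_interval=0.1):
    """Send all of data on a non-blocking socket, one chunk at a time."""
    view = memoryview(data)
    for start in range(0, len(view), SEND_CHUNK_SIZE):
        chunk = view[start:start + SEND_CHUNK_SIZE]
        sent = 0
        while sent < len(chunk):
            # Wait until the kernel send buffer has room.
            _, writable, _ = select.select([], [sock], [], poll_interval)
            if not writable:
                continue
            try:
                sent += sock.send(chunk[sent:])
            except BlockingIOError:
                continue  # reported writable, but the buffer is still full


def recv_some(sock, length, poll_interval=0.1):
    """Receive 1..length bytes, retrying when select reported readiness falsely."""
    while True:
        readable, _, _ = select.select([sock], [], [], poll_interval)
        if not readable:
            continue
        try:
            data = sock.recv(length)
        except BlockingIOError:
            continue  # spurious wake-up: nothing to read after all
        if not data:
            raise ConnectionError("connection closed by peer")
        return data
```

Both loops run until they succeed or the peer disconnects; a production version would also check a stop flag or deadline in them so that closing the connection can interrupt a pending call.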

Thank you so much for your help.

Regards Benjamin

wolfc01 commented 9 years ago

Hello Benjamin,

You're welcome.

Can you tell me about further plans you have with the code?

Best regards, Carl.

In reply to Benjamin Parzella, 2015-04-08 23:09 GMT+02:00.

bparzella commented 9 years ago

My idea is to have a library for quickly communicating with a host or equipment on any level (HSMS/SECS/GEM). The use cases I see are automated regression tests (for example with malformed HSMS messages), simulation of any endpoint during development, and use as a simple host system in test environments. It is not directly targeted at production environments, though I won't stop anybody from using it there.

There are still a lot of scenarios and streams/functions missing, but I'd like to implement as many features as possible.

I currently use it in combination with my other project, https://github.com/bparzella/gemma , a simple host system with a web front-end. Developing that is an easy way for me to get features into the library, because it gives me a practical application to test them with. I currently develop equipment software for a customer and have the machine software available, so the equipment side is my current point of view.

As for the history of the code: it started as a script for simple regression tests against one customer's equipment software. For another customer, I used it in combination with a CAN bus simulation to simulate the environment of their equipment software. Currently I use it to test new factory automation features.

Wow, that's a lot of text for such a simple question, maybe I should put it in the Readme.