+++ /dev/null
-#!/usr/bin/env python
-# Copyright 2007 Greg Darke <gdar9540@usyd.edu.au>
-# Licensed for distribution under the GPL version 2, check COPYING for details
-# A async framework for sockets and fds (fds only supported under unix operating systems)
-# NOTE: Orig version submitted for NETS3603 assignment 1 (Semester 1 - 2007)
-
-
-from __future__ import division
-import os, sys, select, socket, bisect, fcntl
-from time import time
-
-class Callback(object):
- __slots__ = ['callback', 'callback_time']
- def __init__(self, callback_time, callback):
- self.callback_time = callback_time
- self.callback = callback
- def __call__(self):
- return self.callback()
- def __lt__(self, other):
- if hasattr(other, 'callback_time'):
- return self.callback_time < other.callback_time
- else:
- return NotImplemented
-
-class AsyncSocketOwner(object):
- """This is the object contains the 'main loop' of the application"""
- def __init__(self):
- self.sockets_input = []
- self.socket_callbacks = {}
- self.timer_callbacks = []
- self._exit = False
- self.state = {}
-
- def print_state(self):
-### sys.stdout.write('\033[H\033[2J')
- print "\n".join(['%s: %s' % v for v in self.state.items()])
- self.addCallback(1.0, self.print_state)
-
- def _check_timers_callbacks(self):
- now = time()
- i = bisect.bisect(self.timer_callbacks, Callback(now, None))
- self.state['Processing Callbacks'] = '%d of %d' % (i,
- len(self.timer_callbacks))
- needCall = self.timer_callbacks[0:i]
- self.timer_callbacks = self.timer_callbacks[i:]
-
- for callback in needCall:
- callback()
-
- def exit(self):
- self._exit = True
-
- def mainLoop(self):
- try:
- while not self._exit:
- if len(self.timer_callbacks) > 0:
- timeout = max(self.timer_callbacks[0].callback_time - time(), 0)
- # Wait until the next timer expires for input
- inputready, outputready, exceptready = \
- select.select(self.sockets_input, [], [], timeout)
- else:
- # Wait forever for input
- inputready, outputready, exceptready = \
- select.select(self.sockets_input, [], [])
-
- # Handle any data received
- self.state['Waiting sockets'] = len(inputready)
- self.state['Socket count'] = len(self.sockets_input)
- for s in inputready:
- self.socket_callbacks[s](s)
-
- # Handle timers:
- if len(self.timer_callbacks) > 0 and \
- self.timer_callbacks[0].callback_time < time():
- self._check_timers_callbacks()
- except KeyboardInterrupt:
- pass
-
- def _addFDCallback(self, fd, callback):
- """Add a callback for a file descriptor, also add it to the select call"""
- self.sockets_input.append(fd)
- self.socket_callbacks[fd] = callback
-
- def removeSocket(self, fd):
- """Removes the sepecified fd from the event loop"""
- self.sockets_input.remove(fd)
- del self.socket_callbacks[fd]
-
- def addFD(self, fd, callback):
- """Adds a file descriptor to the event loop"""
- # Turn blocking off
- flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
- fcntl.fcntl(fd, fcntl.F_SETFL, flags)
- self._addFDCallback(fd, callback)
-
- def addSocket(self, s, callback):
- """Adds a socket to the event loop"""
- # Disable blocking - So now we have an async socket
- s.setblocking(False)
- self._addFDCallback(s, callback)
-
- def addLineBufferedSocket(self, s, callback):
- sockWrapper = LineBufferedAsyncClientConnection(s, callback, self)
- s.setblocking(False)
- self._addFDCallback(s, sockWrapper._dataArrived)
-
- def addCallback(self, seconds, callback):
- """Add a timer callback"""
- # Keep the list of callbacks sorted to keep things more efficient (Note: This would be better with a heap)
- cb = Callback(time() + seconds, callback)
- bisect.insort(self.timer_callbacks, cb)
- return cb
-
- def removeCallback(self, callback_object):
- """Remove a callback from the list. NB: If the time has fired/is in the list to be
- fired, the outcome is undefined (currently it will be called - but this may change)"""
- if callback_object in self.timer_callbacks:
- self.timer_callbacks.remove(callback_object)
-
-class LineBufferedAsyncClientConnection(object):
- __slots__ = ['sock', 'callback', 'delim', 'eventLoop', 'linesBuffer', 'lineBuffer', 'closed']
- def __init__(self, sock, callback, eventLoop, delim = '\n'):
- self.sock = sock
- self.callback = callback
- self.delim = delim
- self.eventLoop = eventLoop
- self.linesBuffer = []
- self.lineBuffer = ''
-
- def _dataArrived(self, *args, **kwargs):
- data = self.sock.recv(65535)
- if not data:
- self.closed = True
- self.eventLoop.removeSocket(self.sock)
- return
-
- self.lineBuffer += data
- newLinePos = self.lineBuffer.rfind(self.delim)
- if newLinePos >= 0:
- self.linesBuffer += self.lineBuffer[:newLinePos].split(self.delim)
- self.lineBuffer = self.lineBuffer[newLinePos+1:]
- self.callback(self)
-
- def fileno(self):
- """Return the encapsulated socket's fileno (used for select.select)"""
- return self.sock.fileno()
-
- def readline(self):
- if not self.hasLine():
- raise Exception('No data in buffer')
- ret = self.linesBuffer[0]
- del self.linesBuffer[0]
- return ret
-
- def write(self, data):
- self.sock.write(data)
- send = write
-
- def hasLine(self):
- return len(self.linesBuffer) > 0