#!/usr/bin/env python
# Copyright 2007 Greg Darke
# Licensed for distribution under the GPL version 2, check COPYING for details
# An async framework for sockets and fds (fds only supported under unix operating systems)
# NOTE: Orig version submitted for NETS3603 assignment 1 (Semester 1 - 2007)
#
# X-Git-Url: https://code.delx.au/bg-scripts/blobdiff_plain/7fef5e45b01eb71bba28aefb7a50be3fcca9345c..f667bb5991af1eb85cb9585b4dfbdf7ea80f6024:/lib/AsyncSocket.py

from __future__ import division

import bisect
import fcntl
import os
import select
import socket
import sys
from time import time


class Callback(object):
    """A timer entry: a callable paired with the absolute time it is due.

    Instances compare by ``callback_time`` so that a list of them can be kept
    sorted (and searched) with the ``bisect`` module.
    """
    __slots__ = ['callback', 'callback_time']

    def __init__(self, callback_time, callback):
        self.callback_time = callback_time
        self.callback = callback

    def __call__(self):
        """Invoke the wrapped callable and return its result."""
        return self.callback()

    def __lt__(self, other):
        """Order by due time; defer to ``other`` if it has no due time."""
        if hasattr(other, 'callback_time'):
            return self.callback_time < other.callback_time
        return NotImplemented


class AsyncSocketOwner(object):
    """This is the object that contains the 'main loop' of the application.

    It multiplexes readable sockets/file descriptors with ``select.select``
    and runs timer callbacks kept in a list sorted by due time.
    """

    def __init__(self):
        self.sockets_input = []      # objects handed to select() for reading
        self.socket_callbacks = {}   # registered object -> handler(object)
        self.timer_callbacks = []    # Callback instances, sorted by due time
        self._exit = False
        self.state = {}              # free-form counters shown by print_state

    def print_state(self):
        """Print the ``state`` dict and reschedule itself one second later."""
        # Optional clear-screen, left disabled as in the original:
        ### sys.stdout.write('\033[H\033[2J')
        # A parenthesised single argument prints identically on Python 2 and 3.
        print("\n".join(['%s: %s' % v for v in self.state.items()]))
        self.addCallback(1.0, self.print_state)

    def _check_timers_callbacks(self):
        """Run every timer callback whose due time is now or in the past."""
        now = time()
        due_count = bisect.bisect(self.timer_callbacks, Callback(now, None))
        self.state['Processing Callbacks'] = '%d of %d' % (
            due_count, len(self.timer_callbacks))
        due = self.timer_callbacks[:due_count]
        # Drop the due entries before calling them so that a callback which
        # re-registers itself (e.g. print_state) is not run twice in one pass.
        del self.timer_callbacks[:due_count]

        for callback in due:
            callback()

    def exit(self):
        """Ask mainLoop to return once the current iteration has finished."""
        self._exit = True

    def mainLoop(self):
        """Dispatch socket and timer events until exit() is called.

        A KeyboardInterrupt (Ctrl-C) ends the loop quietly instead of
        propagating to the caller.
        """
        try:
            while not self._exit:
                if self.timer_callbacks:
                    # Wait for input only until the next timer expires
                    timeout = max(
                        self.timer_callbacks[0].callback_time - time(), 0)
                    inputready, outputready, exceptready = \
                        select.select(self.sockets_input, [], [], timeout)
                else:
                    # Wait forever for input
                    inputready, outputready, exceptready = \
                        select.select(self.sockets_input, [], [])

                # Handle any data received
                self.state['Waiting sockets'] = len(inputready)
                self.state['Socket count'] = len(self.sockets_input)
                for s in inputready:
                    # A handler run earlier in this pass may have removed `s`.
                    handler = self.socket_callbacks.get(s)
                    if handler is not None:
                        handler(s)

                # Handle timers. `<=` matches the inclusive bisect used by
                # _check_timers_callbacks.
                if self.timer_callbacks and \
                        self.timer_callbacks[0].callback_time <= time():
                    self._check_timers_callbacks()
        except KeyboardInterrupt:
            pass

    def _addFDCallback(self, fd, callback):
        """Add a callback for a file descriptor, also add it to the select call"""
        self.sockets_input.append(fd)
        self.socket_callbacks[fd] = callback

    def removeSocket(self, fd):
        """Removes the specified fd from the event loop.

        Raises ValueError if ``fd`` was never registered.
        """
        self.sockets_input.remove(fd)
        del self.socket_callbacks[fd]

    def addFD(self, fd, callback):
        """Adds a file descriptor to the event loop"""
        # Turn blocking off
        flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
        self._addFDCallback(fd, callback)

    def addSocket(self, s, callback):
        """Adds a socket to the event loop"""
        # Disable blocking - So now we have an async socket
        s.setblocking(False)
        self._addFDCallback(s, callback)

    def addLineBufferedSocket(self, s, callback, delim='\n'):
        """Adds a socket whose input is split into ``delim``-terminated lines.

        ``callback`` is called with the LineBufferedAsyncClientConnection
        wrapper, which is also returned so the caller can write to it.
        """
        sockWrapper = LineBufferedAsyncClientConnection(
            s, callback, self, delim)
        s.setblocking(False)
        self._addFDCallback(s, sockWrapper._dataArrived)
        return sockWrapper

    def addCallback(self, seconds, callback):
        """Add a timer callback due ``seconds`` from now.

        Returns the Callback object, which can be passed to removeCallback.
        """
        # Keep the list of callbacks sorted to keep things more efficient
        # (Note: This would be better with a heap)
        cb = Callback(time() + seconds, callback)
        bisect.insort(self.timer_callbacks, cb)
        return cb

    def removeCallback(self, callback_object):
        """Remove a callback from the list. NB: If the time has fired/is in the
        list to be fired, the outcome is undefined (currently it will be called
        - but this may change)"""
        try:
            self.timer_callbacks.remove(callback_object)
        except ValueError:
            # Already fired or never registered: nothing to do.
            pass


class LineBufferedAsyncClientConnection(object):
    """Wraps a socket and splits the incoming byte stream into lines.

    Data is handled as ``bytes`` (``str`` on Python 2), the type that
    ``socket.recv`` returns; lines are returned without the delimiter.
    """
    __slots__ = ['sock', 'callback', 'delim', 'eventLoop', 'linesBuffer',
                 'lineBuffer', 'closed']

    def __init__(self, sock, callback, eventLoop, delim='\n'):
        if not isinstance(delim, bytes):
            # Python 3 text delimiter: recv() yields bytes, so match its type.
            delim = delim.encode('utf-8')
        if not delim:
            raise ValueError('delim must not be empty')
        self.sock = sock
        self.callback = callback
        self.delim = delim
        self.eventLoop = eventLoop
        self.linesBuffer = []         # complete lines not yet read
        self.lineBuffer = delim[:0]   # trailing partial line (empty bytes)
        self.closed = False

    def _dataArrived(self, *args, **kwargs):
        """Read pending data; on EOF mark closed and leave the event loop."""
        data = self.sock.recv(65535)
        if not data:
            self.closed = True
            self.eventLoop.removeSocket(self.sock)
            return

        self.lineBuffer += data
        newLinePos = self.lineBuffer.rfind(self.delim)
        if newLinePos >= 0:
            self.linesBuffer += self.lineBuffer[:newLinePos].split(self.delim)
            # Skip the whole delimiter, which may be longer than one byte.
            self.lineBuffer = self.lineBuffer[newLinePos + len(self.delim):]
            # NOTE(review): indentation was lost in the recovered source; the
            # callback is assumed to fire only when a complete line arrived.
            self.callback(self)

    def fileno(self):
        """Return the encapsulated socket's fileno (used for select.select)"""
        return self.sock.fileno()

    def readline(self):
        """Return the oldest buffered line; raise Exception if there is none."""
        if not self.hasLine():
            raise Exception('No data in buffer')
        return self.linesBuffer.pop(0)

    def write(self, data):
        """Send ``data`` (bytes) to the peer.

        NOTE(review): the socket is non-blocking, so sendall may raise if the
        kernel send buffer is full - confirm callers only send small payloads.
        """
        # Socket objects have no write() method; sendall is the equivalent.
        self.sock.sendall(data)
    send = write

    def hasLine(self):
        """Return True when at least one complete line is buffered."""
        return len(self.linesBuffer) > 0