]>
code.delx.au - bg-scripts/blob - lib/AsyncSocket.py
76f10f19f7cdebbe3b9cf90db23e590d4c09d22c
2 # Copyright 2007 Greg Darke <gdar9540@usyd.edu.au>
3 # Licensed for distribution under the GPL version 2, check COPYING for details
4 # An async framework for sockets and fds (fds are only supported under Unix operating systems)
5 # NOTE: Orig version submitted for NETS3603 assignment 1 (Semester 1 - 2007)
8 from __future__
import division
9 import os
, sys
, select
, socket
, bisect
, fcntl
12 class Callback(object):
13 __slots__
= ['callback', 'callback_time']
14 def __init__(self
, callback_time
, callback
):
15 self
.callback_time
= callback_time
16 self
.callback
= callback
18 return self
.callback()
19 def __lt__(self
, other
):
20 if hasattr(other
, 'callback_time'):
21 return self
.callback_time
< other
.callback_time
25 class AsyncSocketOwner(object):
26 """This is the object contains the 'main loop' of the application"""
28 self
.sockets_input
= []
29 self
.socket_callbacks
= {}
30 self
.timer_callbacks
= []
def print_state(self):
    """Print every entry of self.state, then schedule the next refresh.

    Each (key, value) pair of self.state is written to stdout as
    "key: value", one pair per line.  The method then re-registers itself
    through self.addCallback with a delay of 1.0 seconds, so once it has
    been called it keeps re-arming itself after every run.
    """
    ### sys.stdout.write('\033[H\033[2J')
    # (the disabled line above would emit the ANSI "cursor home + clear
    # screen" sequence before each refresh)
    # A parenthesised single argument prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print("\n".join(['%s: %s' % entry for entry in self.state.items()]))
    self.addCallback(1.0, self.print_state)
39 def _check_timers_callbacks(self
):
41 i
= bisect
.bisect(self
.timer_callbacks
, Callback(now
, None))
42 self
.state
['Processing Callbacks'] = '%d of %d' % (i
,
43 len(self
.timer_callbacks
))
44 needCall
= self
.timer_callbacks
[0:i
]
45 self
.timer_callbacks
= self
.timer_callbacks
[i
:]
47 for callback
in needCall
:
56 if len(self
.timer_callbacks
) > 0:
57 timeout
= max(self
.timer_callbacks
[0].callback_time
- time(), 0)
58 # Wait until the next timer expires for input
59 inputready
, outputready
, exceptready
= \
60 select
.select(self
.sockets_input
, [], [], timeout
)
62 # Wait forever for input
63 inputready
, outputready
, exceptready
= \
64 select
.select(self
.sockets_input
, [], [])
66 # Handle any data received
67 self
.state
['Waiting sockets'] = len(inputready
)
68 self
.state
['Socket count'] = len(self
.sockets_input
)
70 self
.socket_callbacks
[s
](s
)
73 if len(self
.timer_callbacks
) > 0 and \
74 self
.timer_callbacks
[0].callback_time
< time():
75 self
._check
_timers
_callbacks
()
76 except KeyboardInterrupt:
79 def _addFDCallback(self
, fd
, callback
):
80 """Add a callback for a file descriptor, also add it to the select call"""
81 self
.sockets_input
.append(fd
)
82 self
.socket_callbacks
[fd
] = callback
def removeSocket(self, fd):
    """Remove the specified fd from the event loop.

    Every occurrence of fd is dropped from self.sockets_input and its entry
    is deleted from self.socket_callbacks, so the fd is no longer passed to
    select and can no longer be dispatched.

    Removing an fd that is not (or no longer) registered is a no-op, which
    matches how removeCallback treats an unknown timer.  This lets a
    connection be torn down from more than one place without raising.
    """
    watched = self.sockets_input
    # Loop rather than a single remove(): an fd that was added twice must
    # not stay in the select list once its callback is gone.
    while fd in watched:
        watched.remove(fd)
    self.socket_callbacks.pop(fd, None)
def addFD(self, fd, callback):
    """Adds a file descriptor to the event loop.

    The descriptor is first switched to non-blocking mode by OR-ing
    os.O_NONBLOCK into its current file status flags, and is then
    registered together with callback through self._addFDCallback.
    This relies on the fcntl module.
    """
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
    self._addFDCallback(fd, callback)
def addSocket(self, s, callback):
    """Adds a socket to the event loop.

    s        -- socket to watch for incoming data; it is put into
                non-blocking mode before being registered.
    callback -- callable registered for s through self._addFDCallback.
    """
    # Disable blocking - So now we have an async socket
    s.setblocking(False)
    self._addFDCallback(s, callback)
def addLineBufferedSocket(self, s, callback):
    """Adds a socket to the event loop behind a line-buffering wrapper.

    A LineBufferedAsyncClientConnection is built from s, callback and this
    event loop, and the wrapper's _dataArrived method (not callback itself)
    is what gets registered as the read handler for s.
    NOTE(review): when and with what arguments the wrapper invokes callback
    is not visible in this block - check the wrapper class.
    """
    connection = LineBufferedAsyncClientConnection(s, callback, self)
    on_readable = connection._dataArrived
    self._addFDCallback(s, on_readable)
def addCallback(self, seconds, callback):
    """Add a timer callback.

    seconds  -- delay from now (measured with time()) until the callback
                becomes due.
    callback -- callable wrapped in a Callback object together with its
                due time.

    Returns the Callback object that was scheduled.  It is the handle that
    removeCallback expects, so keep it if the timer may need cancelling.
    """
    # Keep the list of callbacks sorted to keep things more efficient (Note: This would be better with a heap)
    cb = Callback(time() + seconds, callback)
    bisect.insort(self.timer_callbacks, cb)
    return cb
def removeCallback(self, callback_object):
    """Remove a callback from the list of pending timers.

    callback_object -- the entry to drop from self.timer_callbacks.

    NB: If the timer has fired/is in the list to be fired, the outcome is
    undefined (currently it will be called - but this may change).
    Asking to remove an object that is not in the list is silently ignored.
    """
    try:
        # Single pass: remove() already searches the list, so a separate
        # membership test beforehand would only scan it twice.
        self.timer_callbacks.remove(callback_object)
    except ValueError:
        # Not (or no longer) scheduled - nothing to cancel.
        pass
120 class LineBufferedAsyncClientConnection(object):
121 __slots__
= ['sock', 'callback', 'delim', 'eventLoop', 'linesBuffer', 'lineBuffer', 'closed']
122 def __init__(self
, sock
, callback
, eventLoop
, delim
= '\n'):
124 self
.callback
= callback
126 self
.eventLoop
= eventLoop
127 self
.linesBuffer
= []
130 def _dataArrived(self
, *args
, **kwargs
):
131 data
= self
.sock
.recv(65535)
134 self
.eventLoop
.removeSocket(self
.sock
)
137 self
.lineBuffer
+= data
138 newLinePos
= self
.lineBuffer
.rfind(self
.delim
)
140 self
.linesBuffer
+= self
.lineBuffer
[:newLinePos
].split(self
.delim
)
141 self
.lineBuffer
= self
.lineBuffer
[newLinePos
+1:]
145 """Return the encapsulated socket's fileno (used for select.select)"""
146 return self
.sock
.fileno()
149 if not self
.hasLine():
150 raise Exception('No data in buffer')
151 ret
= self
.linesBuffer
[0]
152 del self
.linesBuffer
[0]
def write(self, data):
    """Send data over the wrapped connection.

    data -- the payload to transmit, passed through unchanged.

    self.sock is read with recv() elsewhere in this class, i.e. it is used
    as a socket, and Python socket objects provide send()/sendall() but no
    write().  sendall() is therefore used when the object offers it; an
    object that only exposes write() (a file-like wrapper) is still
    supported through the fallback.
    NOTE(review): on a non-blocking socket sendall() can raise once the send
    buffer is full - confirm callers only write small messages.
    """
    send = getattr(self.sock, 'sendall', None)
    if send is None:
        # Not a plain socket: keep working with file-like objects.
        send = self.sock.write
    send(data)
160 return len(self
.linesBuffer
) > 0