]> code.delx.au - offlineimap/blob - offlineimap/threadutil.py
Removed debug statements
[offlineimap] / offlineimap / threadutil.py
1 # Copyright (C) 2002, 2003 John Goerzen
2 # Thread support module
3 # <jgoerzen@complete.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 from threading import *
20 from StringIO import StringIO
21 from Queue import Queue, Empty
22 import sys, traceback, thread, time
23 from offlineimap.ui import UIBase # for getglobalui()
24
# Directory that receives per-thread profiler dumps.  ExitNotifyThread.run
# only profiles when this holds a true value.
profiledir = None


def setprofiledir(newdir):
    """Store newdir as the module-wide profiling output directory.

    Passing None (or any false value) leaves profiling switched off.
    """
    global profiledir
    profiledir = newdir
30
31 ######################################################################
32 # General utilities
33 ######################################################################
34
def semaphorereset(semaphore, originalstate):
    """Block until every permit of the semaphore has been handed back.

    originalstate is the number of permits the semaphore started with.
    All of them are acquired (which can only complete once every other
    holder has released its permit) and are then returned, so the
    semaphore ends up in its original state.
    """
    permits = range(originalstate)
    # Drain the semaphore completely ...
    for _ in permits:
        semaphore.acquire()
    # ... and then give every permit back.
    for _ in permits:
        semaphore.release()
43
def semaphorewait(semaphore):
    """Wait until one permit of the semaphore is free.

    The permit is taken and handed straight back, so the semaphore's
    count is unchanged once this returns.
    """
    semaphore.acquire()
    semaphore.release()
47
def threadsreset(threadlist):
    """Join every thread in threadlist, in iteration order."""
    for worker in threadlist:
        worker.join()
51
class threadlist:
    """A list of threads whose mutations are guarded by a lock."""

    def __init__(self):
        self.list = []
        self.lock = Lock()

    def add(self, thread):
        """Append thread to the end of the list."""
        self.lock.acquire()
        try:
            self.list.append(thread)
        finally:
            self.lock.release()

    def remove(self, thread):
        """Remove the first occurrence of thread.

        Raises ValueError (from list.remove) if it is not present.
        """
        self.lock.acquire()
        try:
            self.list.remove(thread)
        finally:
            self.lock.release()

    def pop(self):
        """Remove and return the most recently added thread.

        Returns None when the list is empty.
        """
        self.lock.acquire()
        try:
            if self.list:
                return self.list.pop()
            return None
        finally:
            self.lock.release()

    def reset(self):
        """Pop and join threads until the list is empty.

        The lock is held only while popping, not while joining.  The loop
        also stops early if a popped entry evaluates as false.
        """
        pending = self.pop()
        while pending:
            pending.join()
            pending = self.pop()
86
87
88 ######################################################################
89 # Exit-notify threads
90 ######################################################################
91
# Finished ExitNotifyThreads are put here by ExitNotifyThread.run and taken
# off again by exitnotifymonitorloop.  At most 100 entries are held; further
# puts block until the monitor catches up.
exitthreads = Queue(maxsize=100)

# Not read by any of the code in this module that is visible here.
inited = 0
94
def initexitnotify():
    """Initialize the exit-notify system (currently does nothing).

    Calling convention, kept for existing callers: invoke this from the
    SAME THREAD that will later run the monitor loop, BEFORE that loop is
    entered, and preferably before any ExitNotifyThread is started so
    that no exit status can be missed.
    """
    return None
102
def exitnotifymonitorloop(callback):
    """Run forever, reporting each terminated ExitNotifyThread to callback.

    callback is invoked with one argument: the ExitNotifyThread that has
    just finished.  It runs synchronously inside this loop, so no further
    thread is reported until it returns.  A callback with long-running
    work should hand it off to a new thread -- but NOT an
    ExitNotifyThread, or an infinite loop may result.

    The queue is polled without blocking; when it is empty the loop
    sleeps for one second before looking again.  The loop ends only if
    an exception other than Queue.Empty propagates out of it, for
    example one raised by callback.
    """
    while True:
        try:
            finished = exitthreads.get(block=False)
            # callback stays inside the try block: an Empty raised by it
            # is treated exactly like an empty queue.
            callback(finished)
            exitthreads.task_done()
        except Empty:
            time.sleep(1)
121
def threadexited(thread):
    """Handle the termination of an ExitNotifyThread.

    thread is the thread that has exited.  What happens depends on how it
    ended:

    * It died with SystemExit: a fresh SystemExit is raised in the
      calling thread; the UI is not told.
    * It died with any other exception: ui.threadException(thread) is
      called, then the process exits with status 100 through sys.exit.
    * Its exit message is 'SYNC_WITH_TIMER_TERMINATE': ui.terminate() is
      called, then the process exits with status 100 through sys.exit.
    * Otherwise: ui.threadExited(thread) is called and this returns.

    Raises SystemExit in the first three cases.
    """
    ui = UIBase.getglobalui()
    if thread.getExitCause() == 'EXCEPTION':
        if isinstance(thread.getExitException(), SystemExit):
            # Bring a SystemExit into the main thread.
            # Do not send it back to UI layer right now.
            # Maybe later send it to ui.terminate?
            raise SystemExit
        ui.threadException(thread)      # Expected to terminate
        # Just in case the UI call returned.  sys.exit always raises, so
        # nothing placed after it would run; the former os._exit(100)
        # fallback was unreachable and named a module that this file
        # never imports.
        sys.exit(100)
    elif thread.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
        ui.terminate()
        # Just in case...
        sys.exit(100)
    else:
        ui.threadExited(thread)
141
class ExitNotifyThread(Thread):
    """A thread that reports its own termination to a monitor.

    When run() finishes, the thread records why it ended and puts itself
    on the module-level exitthreads queue, where the monitor can collect
    it and query the accessors below.
    """

    def run(self):
        """Run the thread body, record the outcome and queue this thread.

        With a profiling directory configured, the body runs under the
        profiler and the statistics are dumped to
        <profiledir>/<thread id>_<thread name>.prof.  A SystemExit raised
        by the body is swallowed in that mode so the dump still happens.
        """
        self.threadid = thread.get_ident()
        try:
            if profiledir:
                import profile
                profiler = profile.Profile()
                try:
                    profiler = profiler.runctx("Thread.run(self)",
                                               globals(), locals())
                except SystemExit:
                    pass
                statsfile = profiledir + "/" + str(self.threadid) + "_" + \
                            self.getName() + ".prof"
                profiler.dump_stats(statsfile)
            else:
                # normal case
                Thread.run(self)
        except:
            # Deliberately catches everything, SystemExit included, so the
            # monitor always learns why the thread died.
            self.setExitCause('EXCEPTION')
            self.setExitException(sys.exc_info()[1])
            tracebuf = StringIO()
            traceback.print_exc(file=tracebuf)
            self.setExitStackTrace(tracebuf.getvalue())
        else:
            self.setExitCause('NORMAL')

        # Make sure getExitMessage() works even if nobody set a message.
        if not hasattr(self, 'exitmessage'):
            self.setExitMessage(None)

        # Blocking put: waits for room if the queue is full.
        exitthreads.put(self, True)

    def setExitCause(self, cause):
        """Record why the thread ended ('EXCEPTION' or 'NORMAL')."""
        self.exitcause = cause

    def getExitCause(self):
        """Return the recorded exit cause, one of:
        'EXCEPTION' -- the thread aborted because of an exception
        'NORMAL' -- normal termination."""
        return self.exitcause

    def setExitException(self, exc):
        """Record the exception object that ended the thread."""
        self.exitexception = exc

    def getExitException(self):
        """Return the exception value (sys.exc_info()[1]) recorded by
        run() when the exit cause is 'EXCEPTION'."""
        return self.exitexception

    def setExitStackTrace(self, st):
        """Record the formatted stack trace of the fatal exception."""
        self.exitstacktrace = st

    def getExitStackTrace(self):
        """Return the stack trace string recorded by run() when the exit
        cause is 'EXCEPTION'."""
        return self.exitstacktrace

    def setExitMessage(self, msg):
        """Store a message for a later getExitMessage() call.  It may be
        any object or type except None."""
        self.exitmessage = msg

    def getExitMessage(self):
        """Return the message stored with setExitMessage().  Once run()
        has finished this is None if no message was ever set."""
        return self.exitmessage
203
204
205 ######################################################################
206 # Instance-limited threads
207 ######################################################################
208
# Maps an instance name to the BoundedSemaphore that caps how many threads
# with that name may run at once.
instancelimitedsems = {}
# Guards the creation of new entries in instancelimitedsems.
instancelimitedlock = Lock()


def initInstanceLimit(instancename, instancemax):
    """Initialize the instance-limited thread implementation to permit
    up to instancemax threads with the given instancename.

    Only the first call for a given instancename has any effect; later
    calls keep the existing semaphore and ignore instancemax.

    Any exception raised while building the semaphore (for example for a
    negative instancemax) propagates to the caller.
    """
    instancelimitedlock.acquire()
    try:
        if instancename not in instancelimitedsems:
            instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
    finally:
        # Release even when BoundedSemaphore() raises; otherwise every
        # later call would block forever on this lock.
        instancelimitedlock.release()
219
class InstanceLimitedThread(ExitNotifyThread):
    """An ExitNotifyThread that counts against a per-name thread limit.

    start() takes one slot from the semaphore registered under
    instancename in instancelimitedsems, blocking until one is free.
    The slot is handed back when run() finishes.  The name must already
    have been registered with initInstanceLimit, otherwise start() raises
    KeyError.
    """

    def __init__(self, instancename, *args, **kwargs):
        """Remember instancename; all remaining arguments are passed on
        unchanged to ExitNotifyThread.__init__."""
        self.instancename = instancename

        ExitNotifyThread.__init__(self, *args, **kwargs)

    def start(self):
        """Wait for a free slot, then start the thread.

        If the underlying start() fails, the slot is released again
        before the exception is re-raised.
        """
        slots = instancelimitedsems[self.instancename]
        slots.acquire()
        try:
            ExitNotifyThread.start(self)
        except Exception:
            # run() will never execute for this attempt, so nothing else
            # would ever give this slot back.
            slots.release()
            raise

    def run(self):
        """Run the thread body and always free the slot afterwards."""
        try:
            ExitNotifyThread.run(self)
        finally:
            instancelimitedsems[self.instancename].release()
235
236
237 ######################################################################
238 # Multi-lock -- capable of handling a single thread requesting a lock
239 # multiple times
240 ######################################################################
241
class MultiLock:
    """A lock that one thread may request several times over.

    locksheld maps a thread id to that thread's outstanding request
    count.  The real lock is taken on a thread's first request and given
    up when its count drops back to zero.  statuslock protects locksheld.
    """

    def __init__(self):
        self.locksheld = {}
        self.statuslock = Lock()
        self.lock = Lock()

    def acquire(self):
        """Obtain the lock, blocking until it is available.

        Supports a single thread locking several times -- as may happen
        when one I/O-using object calls others while wanting the whole
        sequence to be atomic.  A nested request from the thread that
        already holds the lock only bumps that thread's request count
        and returns at once.
        """
        me = thread.get_ident()
        self.statuslock.acquire()
        try:
            depth = self.locksheld.get(me, 0)
            # Only this thread ever touches its own entry.
            self.locksheld[me] = depth + 1
        finally:
            self.statuslock.release()

        if depth == 0:
            # First request from this thread: take the real lock, after
            # statuslock has been released.
            self.lock.acquire()

    def release(self):
        """Undo one acquire() made by the calling thread.

        The real lock is released once the thread's request count reaches
        zero.  Raises KeyError if the calling thread has no outstanding
        request.
        """
        self.statuslock.acquire()
        try:
            me = thread.get_ident()
            remaining = self.locksheld[me] - 1
            if remaining >= 1:
                self.locksheld[me] = remaining
            else:
                del self.locksheld[me]
                self.lock.release()
        finally:
            self.statuslock.release()
286
287