]>
code.delx.au - offlineimap/blob - offlineimap/threadutil.py
1 # Copyright (C) 2002, 2003 John Goerzen
2 # Thread support module
3 # <jgoerzen@complete.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 from threading
import *
20 from StringIO
import StringIO
21 import sys
, traceback
, thread
22 from offlineimap
.ui
import UIBase
# for getglobalui()
26 def setprofiledir(newdir
):
30 ######################################################################
32 ######################################################################
34 def semaphorereset(semaphore
, originalstate
):
35 """Wait until the semaphore gets back to its original state -- all acquired
36 resources released."""
37 for i
in range(originalstate
):
40 for i
in range(originalstate
):
43 def semaphorewait(semaphore
):
47 def threadsreset(threadlist
):
48 for thr
in threadlist
:
56 def add(self
, thread
):
59 self
.list.append(thread
)
63 def remove(self
, thread
):
66 self
.list.remove(thread
)
73 if not len(self
.list):
75 return self
.list.pop()
87 ######################################################################
89 ######################################################################
91 exitcondition
= Condition(Lock())
96 """Initialize the exit notify system. This MUST be called from the
97 SAME THREAD that will call monitorloop BEFORE it calls monitorloop.
98 This SHOULD be called before the main thread starts any other
99 ExitNotifyThreads, or else it may miss the ability to catch the exit
103 def exitnotifymonitorloop(callback
):
104 """Enter an infinite "monitoring" loop. The argument, callback,
105 defines the function to call when an ExitNotifyThread has terminated.
106 That function is called with a single argument -- the ExitNotifyThread
107 that has terminated. The monitor will not continue to monitor for
108 other threads until the function returns, so if it intends to perform
109 long calculations, it should start a new thread itself -- but NOT
110 an ExitNotifyThread, or else an infinite loop may result. Furthermore,
111 the monitor will hold the lock all the while the other thread is waiting.
113 global exitcondition
, exitthreads
114 while 1: # Loop forever.
115 exitcondition
.acquire()
117 while not len(exitthreads
):
118 exitcondition
.wait(1)
120 while len(exitthreads
):
121 callback(exitthreads
.pop(0)) # Pull off in order added!
123 exitcondition
.release()
125 def threadexited(thread
):
126 """Called when a thread exits."""
127 ui
= UIBase
.getglobalui()
128 if thread
.getExitCause() == 'EXCEPTION':
129 if isinstance(thread
.getExitException(), SystemExit):
130 # Bring a SystemExit into the main thread.
131 # Do not send it back to UI layer right now.
132 # Maybe later send it to ui.terminate?
134 ui
.threadException(thread
) # Expected to terminate
135 sys
.exit(100) # Just in case...
137 elif thread
.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
143 ui
.threadExited(thread
)
145 class ExitNotifyThread(Thread
):
146 """This class is designed to alert a "monitor" to the fact that a thread has
147 exited and to provide for the ability for it to find out why."""
149 global exitcondition
, exitthreads
, profiledir
150 self
.threadid
= thread
.get_ident()
152 if not profiledir
: # normal case
156 prof
= profile
.Profile()
158 prof
= prof
.runctx("Thread.run(self)", globals(), locals())
162 profiledir
+ "/" + str(self
.threadid
) + "_" + \
163 self
.getName() + ".prof")
165 self
.setExitCause('EXCEPTION')
166 self
.setExitException(sys
.exc_info()[1])
168 traceback
.print_exc(file = sbuf
)
169 self
.setExitStackTrace(sbuf
.getvalue())
171 self
.setExitCause('NORMAL')
172 if not hasattr(self
, 'exitmessage'):
173 self
.setExitMessage(None)
174 exitcondition
.acquire()
175 exitthreads
.append(self
)
176 exitcondition
.notify()
177 exitcondition
.release()
179 def setExitCause(self
, cause
):
180 self
.exitcause
= cause
181 def getExitCause(self
):
182 """Returns the cause of the exit, one of:
183 'EXCEPTION' -- the thread aborted because of an exception
184 'NORMAL' -- normal termination."""
185 return self
.exitcause
186 def setExitException(self
, exc
):
187 self
.exitexception
= exc
188 def getExitException(self
):
189 """If getExitCause() is 'EXCEPTION', holds the value from
190 sys.exc_info()[1] for this exception."""
191 return self
.exitexception
192 def setExitStackTrace(self
, st
):
193 self
.exitstacktrace
= st
194 def getExitStackTrace(self
):
195 """If getExitCause() is 'EXCEPTION', returns a string representing
196 the stack trace for this exception."""
197 return self
.exitstacktrace
198 def setExitMessage(self
, msg
):
199 """Sets the exit message to be fetched by a subsequent call to
200 getExitMessage. This message may be any object or type except
202 self
.exitmessage
= msg
203 def getExitMessage(self
):
204 """For any exit cause, returns the message previously set by
205 a call to setExitMessage(), or None if there was no such message
207 return self
.exitmessage
210 ######################################################################
211 # Instance-limited threads
212 ######################################################################
214 instancelimitedsems
= {}
215 instancelimitedlock
= Lock()
217 def initInstanceLimit(instancename
, instancemax
):
218 """Initialize the instance-limited thread implementation to permit
219 up to intancemax threads with the given instancename."""
220 instancelimitedlock
.acquire()
221 if not instancelimitedsems
.has_key(instancename
):
222 instancelimitedsems
[instancename
] = BoundedSemaphore(instancemax
)
223 instancelimitedlock
.release()
225 class InstanceLimitedThread(ExitNotifyThread
):
226 def __init__(self
, instancename
, *args
, **kwargs
):
227 self
.instancename
= instancename
229 apply(ExitNotifyThread
.__init
__, (self
,) + args
, kwargs
)
232 instancelimitedsems
[self
.instancename
].acquire()
233 ExitNotifyThread
.start(self
)
237 ExitNotifyThread
.run(self
)
239 instancelimitedsems
[self
.instancename
].release()
242 ######################################################################
243 # Multi-lock -- capable of handling a single thread requesting a lock
245 ######################################################################
250 self
.statuslock
= Lock()
254 """Obtain a lock. Provides nice support for a single
255 thread trying to lock it several times -- as may be the case
256 if one I/O-using object calls others, while wanting to make it all
257 an atomic operation. Keeps a "lock request count" for the current
258 thread, and acquires the lock when it goes above zero, releases when
261 This call is always blocking."""
263 # First, check to see if this thread already has a lock.
264 # If so, increment the lock count and just return.
265 self
.statuslock
.acquire()
267 threadid
= thread
.get_ident()
269 if threadid
in self
.locksheld
:
270 self
.locksheld
[threadid
] += 1
273 # This is safe because it is a per-thread structure
274 self
.locksheld
[threadid
] = 1
276 self
.statuslock
.release()
280 self
.statuslock
.acquire()
282 threadid
= thread
.get_ident()
283 if self
.locksheld
[threadid
] > 1:
284 self
.locksheld
[threadid
] -= 1
287 del self
.locksheld
[threadid
]
290 self
.statuslock
.release()