# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from offlineimap import imaplib, imaputil, imapserver, repository, folder, mbnames, threadutil
+from offlineimap.threadutil import InstanceLimitedThread
import re, os, os.path, offlineimap, sys
from ConfigParser import ConfigParser
from threading import *
remoterepos = None
localrepos = None
passwords = {}
-accountsemaphore = BoundedSemaphore(config.getint("general", "maxsyncaccounts"))
+
+threadutil.initInstanceLimit("ACCOUNTLIMIT", config.getint("general",
+ "maxsyncaccounts"))
# We have to gather passwords here -- don't want to have two threads
# asking for passwords simultaneously.
passwords[account] = config.get(account, "remotepass")
else:
passwords[account] = ui.getpass(account, config)
+ for instancename in ["FOLDER_" + account, "MSGCOPY_" + account]:
+ threadutil.initInstanceLimit(instancename,
+ config.getint(account, "maxconnections"))
mailboxes = []
mailboxlock = Lock()
print args
# We don't need an account lock because syncitall() goes through
# each account once, then waits for all to finish.
- accountsemaphore.acquire()
try:
ui.acct(accountname)
accountmetadata = os.path.join(metadatadir, accountname)
folderthreads = []
for remotefolder in remoterepos.getfolders():
server.connectionwait()
- thread = Thread(target = syncfolder,
- name = "syncfolder-%s-%s" % \
- (accountname, remotefolder.getvisiblename()),
- args = (accountname, remoterepos,
- remotefolder, localrepos, statusrepos))
+ thread = InstanceLimitedThread(\
+ instancename = 'FOLDER_' + accountname,
+ target = syncfolder,
+ name = "syncfolder-%s-%s" % \
+ (accountname, remotefolder.getvisiblename()),
+ args = (accountname, remoterepos, remotefolder, localrepos,
+ statusrepos))
thread.start()
folderthreads.append(thread)
threadutil.threadsreset(folderthreads)
server.close()
finally:
- accountsemaphore.release()
+ pass
def syncfolder(accountname, remoterepos, remotefolder, localrepos,
statusrepos):
mailboxes = [] # Reset.
threads = []
for accountname in accounts:
- threadutil.semaphorewait(accountsemaphore)
- thread = Thread(target = syncaccount,
- name = "syncaccount-%s" % accountname,
- args = (accountname,))
+ thread = InstanceLimitedThread(instancename = 'ACCOUNTLIMIT',
+ target = syncaccount,
+ name = "syncaccount-%s" % accountname,
+ args = (accountname,))
thread.start()
threads.append(thread)
# Wait for the threads to finish.
import __main__
from threading import *
+from offlineimap import threadutil
+from offlineimap.threadutil import InstanceLimitedThread
class BaseFolder:
def getname(self):
before firing off a thread. For all others, returns immediately."""
pass
+    def getcopyinstancelimit(self):
+        """Return the instance-limit name used for message-copy threads.
+
+        Folders that suggest threading must override this; the returned
+        name is handed to InstanceLimitedThread as its instancename.
+        The base implementation always raises NotImplementedError.
+        """
+        # NotImplementedError is the built-in for unimplemented hooks;
+        # "NotImplementedException" is not defined anywhere and would
+        # surface as a NameError instead.
+        raise NotImplementedError
+
def getvisiblename(self):
return self.name
if not uid in dest.getmessagelist():
if self.suggeststhreads():
self.waitforthread()
- thread = Thread(target = self.copymessageto,
- args = (uid, applyto))
+ thread = InstanceLimitedThread(\
+ self.getcopyinstancelimit(),
+ target = self.copymessageto,
+ args = (uid, applyto))
thread.start()
threads.append(thread)
else:
from StringIO import StringIO
class IMAPFolder(BaseFolder):
- def __init__(self, imapserver, name, visiblename):
+ def __init__(self, imapserver, name, visiblename, accountname):
self.name = imaputil.dequote(name)
self.root = imapserver.root
self.sep = imapserver.delim
self.imapserver = imapserver
self.messagelist = None
self.visiblename = visiblename
+ self.accountname = accountname
def suggeststhreads(self):
return 1
def waitforthread(self):
self.imapserver.connectionwait()
+    def getcopyinstancelimit(self):
+        """Return the instance-limit name for this account's copy threads.
+
+        The name is the fixed prefix 'MSGCOPY_' followed by the account
+        name this folder was constructed with.
+        """
+        prefix = 'MSGCOPY_'
+        return prefix + self.accountname
+
def getvisiblename(self):
return self.visiblename
self.imapserver.releaseconnection(imapobj)
def savemessageflags(self, uid, flags):
- imapobj = self.imapserver.acquireconnection(imapobj)
+ imapobj = self.imapserver.acquireconnection()
try:
imapobj.select(self.getfullname())
result = imapobj.uid('store', '%d' % uid, 'FLAGS',
def getfolder(self, foldername):
+ """Return an IMAPFolder for foldername, built with this repository's account name."""
return folder.IMAP.IMAPFolder(self.imapserver, foldername,
- self.nametrans(foldername))
+ self.nametrans(foldername),
+ self.accountname)
def getfolders(self):
if self.folders != None:
if '\\Noselect' in imaputil.flagsplit(flags):
continue
retval.append(folder.IMAP.IMAPFolder(self.imapserver, name,
- self.nametrans(imaputil.dequote(name))))
+ self.nametrans(imaputil.dequote(name)),
+ self.accountname))
retval.sort(lambda x, y: cmp(x.getvisiblename(), y.getvisiblename()))
self.folders = retval
return retval
def threadsreset(threadlist):
for thread in threadlist:
thread.join()
+
+instancelimitedsems = {}
+instancelimitedlock = Lock()
+
+def initInstanceLimit(instancename, instancemax):
+    """Register a cap on concurrently running threads for instancename.
+
+    Creates a BoundedSemaphore(instancemax) for instancename the first
+    time the name is seen.  Later calls with the same name leave the
+    existing semaphore untouched, whatever instancemax they pass.
+
+    instancename -- key under which the semaphore is stored in
+                    instancelimitedsems.
+    instancemax  -- initial value handed to BoundedSemaphore.
+    """
+    instancelimitedlock.acquire()
+    try:
+        if not instancelimitedsems.has_key(instancename):
+            instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
+    finally:
+        # Release even if BoundedSemaphore() rejects instancemax, so a bad
+        # value cannot leave the registry lock held forever.
+        instancelimitedlock.release()
+
+class InstanceLimitedThread(Thread):
+    """Thread whose concurrency is capped per instance name.
+
+    start() blocks until a slot is free on the semaphore stored in
+    instancelimitedsems under instancename.  The slot is handed back
+    when run() finishes, whether it returns normally or raises.
+    """
+
+    def __init__(self, instancename, *args, **kwargs):
+        """Remember instancename and pass everything else to Thread.
+
+        instancename must have an entry in instancelimitedsems by the
+        time start() is called; otherwise start() raises KeyError.
+        """
+        self.instancename = instancename
+        Thread.__init__(self, *args, **kwargs)
+
+    def start(self):
+        """Wait for a free slot, then start the thread."""
+        semaphore = instancelimitedsems[self.instancename]
+        semaphore.acquire()
+        try:
+            Thread.start(self)
+        except:
+            # The thread never started, so run() will not release the
+            # slot for us; hand it back here before propagating.
+            semaphore.release()
+            raise
+
+    def run(self):
+        """Run the thread's target and always free the slot afterwards."""
+        try:
+            Thread.run(self)
+        finally:
+            instancelimitedsems[self.instancename].release()
+
+