]> code.delx.au - offlineimap/commitdiff
/head: changeset 69
authorjgoerzen <jgoerzen>
Thu, 4 Jul 2002 03:59:19 +0000 (04:59 +0100)
committerjgoerzen <jgoerzen>
Thu, 4 Jul 2002 03:59:19 +0000 (04:59 +0100)
More updates

head/offlineimap.py
head/offlineimap/folder/Base.py
head/offlineimap/folder/IMAP.py
head/offlineimap/repository/IMAP.py
head/offlineimap/threadutil.py

index 569017d535b7ba0fd56fa244eec9def7880fe14b..79f970e54c271feeea8dd82cbdc6b84e95dee9f6 100644 (file)
@@ -18,6 +18,7 @@
 #    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 from offlineimap import imaplib, imaputil, imapserver, repository, folder, mbnames, threadutil
+from offlineimap.threadutil import InstanceLimitedThread
 import re, os, os.path, offlineimap, sys
 from ConfigParser import ConfigParser
 from threading import *
@@ -47,7 +48,9 @@ server = None
 remoterepos = None
 localrepos = None
 passwords = {}
-accountsemaphore = BoundedSemaphore(config.getint("general", "maxsyncaccounts"))
+
+threadutil.initInstanceLimit("ACCOUNTLIMIT", config.getint("general",
+                                                           "maxsyncaccounts"))
 
 # We have to gather passwords here -- don't want to have two threads
 # asking for passwords simultaneously.
@@ -57,6 +60,9 @@ for account in accounts:
         passwords[account] = config.get(account, "remotepass")
     else:
         passwords[account] = ui.getpass(account, config)
+    for instancename in ["FOLDER_" + account, "MSGCOPY_" + account]:
+        threadutil.initInstanceLimit(instancename,
+                                     config.getint(account, "maxconnections"))
 
 mailboxes = []
 mailboxlock = Lock()
@@ -70,7 +76,6 @@ def syncaccount(accountname, *args):
     print args
     # We don't need an account lock because syncitall() goes through
     # each account once, then waits for all to finish.
-    accountsemaphore.acquire()
     try:
         ui.acct(accountname)
         accountmetadata = os.path.join(metadatadir, accountname)
@@ -101,17 +106,19 @@ def syncaccount(accountname, *args):
         folderthreads = []
         for remotefolder in remoterepos.getfolders():
             server.connectionwait()
-            thread = Thread(target = syncfolder,
-                            name = "syncfolder-%s-%s" % \
-                            (accountname, remotefolder.getvisiblename()),
-                            args = (accountname, remoterepos,
-                                    remotefolder, localrepos, statusrepos))
+            thread = InstanceLimitedThread(\
+                instancename = 'FOLDER_' + accountname,
+                target = syncfolder,
+                name = "syncfolder-%s-%s" % \
+                (accountname, remotefolder.getvisiblename()),
+                args = (accountname, remoterepos, remotefolder, localrepos,
+                        statusrepos))
             thread.start()
             folderthreads.append(thread)
         threadutil.threadsreset(folderthreads)
         server.close()
     finally:
-        accountsemaphore.release()
+        pass
 
 def syncfolder(accountname, remoterepos, remotefolder, localrepos,
                statusrepos):
@@ -166,10 +173,10 @@ def syncitall():
     mailboxes = []                      # Reset.
     threads = []
     for accountname in accounts:        
-        threadutil.semaphorewait(accountsemaphore)
-        thread = Thread(target = syncaccount,
-                        name = "syncaccount-%s" % accountname,
-                        args = (accountname,))
+        thread = InstanceLimitedThread(instancename = 'ACCOUNTLIMIT',
+                                       target = syncaccount,
+                                       name = "syncaccount-%s" % accountname,
+                                       args = (accountname,))
         thread.start()
         threads.append(thread)
     # Wait for the threads to finish.
index 759f5a644aa4a9a28891dcb4fe26563eef9adaf4..bd05deec4ff52affdfbc14ca918808a478c118b8 100644 (file)
@@ -18,6 +18,8 @@
 
 import __main__
 from threading import *
+from offlineimap import threadutil
+from offlineimap.threadutil import InstanceLimitedThread
 
 class BaseFolder:
     def getname(self):
@@ -34,6 +36,11 @@ class BaseFolder:
         before firing off a thread.  For all others, returns immediately."""
         pass
 
+    def getcopyinstancelimit(self):
+        """For threading folders, returns the instancelimitname for
+        InstanceLimitedThreads."""
+        raise NotImplementedException
+
     def getvisiblename(self):
         return self.name
 
@@ -189,8 +196,10 @@ class BaseFolder:
             if not uid in dest.getmessagelist():
                 if self.suggeststhreads():
                     self.waitforthread()
-                    thread = Thread(target = self.copymessageto,
-                                    args = (uid, applyto))
+                    thread = InstanceLimitedThread(\
+                        self.getcopyinstancelimit(),
+                        target = self.copymessageto,
+                        args = (uid, applyto))
                     thread.start()
                     threads.append(thread)
                 else:
index b612e2ae75e84b96b88e53d7e4ba89681cfada5e..1a007b5918946ff9ed75f0025ffc0a18c1c93d25 100644 (file)
@@ -22,13 +22,14 @@ import rfc822
 from StringIO import StringIO
 
 class IMAPFolder(BaseFolder):
-    def __init__(self, imapserver, name, visiblename):
+    def __init__(self, imapserver, name, visiblename, accountname):
         self.name = imaputil.dequote(name)
         self.root = imapserver.root
         self.sep = imapserver.delim
         self.imapserver = imapserver
         self.messagelist = None
         self.visiblename = visiblename
+        self.accountname = accountname
 
     def suggeststhreads(self):
         return 1
@@ -36,6 +37,9 @@ class IMAPFolder(BaseFolder):
     def waitforthread(self):
         self.imapserver.connectionwait()
 
+    def getcopyinstancelimit(self):
+        return 'MSGCOPY_' + self.accountname
+
     def getvisiblename(self):
         return self.visiblename
 
@@ -116,7 +120,7 @@ class IMAPFolder(BaseFolder):
             self.imapserver.releaseconnection(imapobj)
 
     def savemessageflags(self, uid, flags):
-        imapobj = self.imapserver.acquireconnection(imapobj)
+        imapobj = self.imapserver.acquireconnection()
         try:
             imapobj.select(self.getfullname())
             result = imapobj.uid('store', '%d' % uid, 'FLAGS',
index 272b8a4bfea49d4e3dd761f6c043cfac3a28c43c..6fcb197b387e8444bc688b0d71ddae6304b9a340 100644 (file)
@@ -38,7 +38,8 @@ class IMAPRepository(BaseRepository):
 
     def getfolder(self, foldername):
         return folder.IMAP.IMAPFolder(self.imapserver, foldername,
-                                      self.nametrans(foldername))
+                                      self.nametrans(foldername),
+                                      accountname)
 
     def getfolders(self):
         if self.folders != None:
@@ -54,7 +55,8 @@ class IMAPRepository(BaseRepository):
             if '\\Noselect' in imaputil.flagsplit(flags):
                 continue
             retval.append(folder.IMAP.IMAPFolder(self.imapserver, name,
-                                                 self.nametrans(imaputil.dequote(name))))
+                                                 self.nametrans(imaputil.dequote(name)),
+                                                 self.accountname))
         retval.sort(lambda x, y: cmp(x.getvisiblename(), y.getvisiblename()))
         self.folders = retval
         return retval
index 7ad5a108770ede36f9b9ab60341b827ec607f937..2b20930280fd3dbe88eebf174862769d00b0f99e 100644 (file)
@@ -34,3 +34,30 @@ def semaphorewait(semaphore):
 def threadsreset(threadlist):
     for thread in threadlist:
         thread.join()
+
+instancelimitedsems = {}
+instancelimitedlock = Lock()
+
+def initInstanceLimit(instancename, instancemax):
+    instancelimitedlock.acquire()
+    if not instancelimitedsems.has_key(instancename):
+        instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
+    instancelimitedlock.release()
+
+class InstanceLimitedThread(Thread):
+    def __init__(self, instancename, *args, **kwargs):
+        self.instancename = instancename
+                                                   
+        apply(Thread.__init__, (self,) + args, kwargs)
+
+    def start(self):
+        instancelimitedsems[self.instancename].acquire()
+        Thread.start(self)
+        
+    def run(self):
+        try:
+            Thread.run(self)
+        finally:
+            instancelimitedsems[self.instancename].release()
+        
+