--- /dev/null
+# Copyright 2005 James Bunton <james@delx.cjb.net>
+# Licensed for distribution under the GPL version 2, check COPYING for details
+
+"""
+Test cases for msnw (MSN Wrapper)
+"""
+
+# Twisted imports
+from twisted.internet.defer import Deferred
+from twisted.internet import reactor, error
+from twisted.trial import unittest
+from twisted.python import log
+
+# System imports
+import sys
+
+# Local imports
+import msnw
+
+
+# Settings
+TIMEOUT = 30.0 # Connection timeout in seconds
+LOGGING = True
+USER1 = "messengertest1@hotmail.com"
+PASS1 = "hellohello"
+USER2 = "messengertest2@hotmail.com"
+PASS2 = "hellohello"
+
+
+
+if LOGGING:
+ log.startLogging(sys.stdout)
+
+
+checkCount = 0 # Uck!
+def clearAccount(msncon):
+ """ Clears the contact list of the given MSNConnection. Returns a
+ Deferred which fires when the task is complete.
+ """
+ d = Deferred()
+ count = 0
+ global checkCount
+ checkCount = 0
+ def cb(ignored=None):
+ global checkCount
+ checkCount += 1
+ if checkCount == count:
+ d.callback(None)
+
+ for msnContact in msncon.getContacts().contacts.values():
+ for list in [msnw.FORWARD_LIST, msnw.BLOCK_LIST, msnw.ALLOW_LIST, msnw.PENDING_LIST]:
+ if msnContact.lists & list:
+ msncon.remContact(list, msnContact.userHandle).addCallback(cb)
+ count += 1
+
+ if count == 0:
+ reactor.callLater(0, d.callback, None)
+ return d
+
+
+####################
+# Basic connection #
+####################
+
+class MSNConnection(msnw.MSNConnection):
+ def __init__(self, username, password, ident, testCase):
+ msnw.MSNConnection.__init__(self, username, password, ident)
+ self.testCase = testCase
+
+ def listSynchronized(self):
+ # Now we're fully connected
+ self.testCase.done = "SYNCED"
+
+ def gotMessage(self, userHandle, text):
+ self.testCase.done = "GOTMESSAGE"
+ self.message = (userHandle, text)
+
+ def contactAddedMe(self, userHandle):
+ print "CONTACT ADDED ME"
+ self.contactAdded = userHandle
+
+
+class BasicConnection(unittest.TestCase):
+ def failed(self, why):
+ self.done = True
+
+ def testConnect(self):
+ self.done = False
+ self.timeout = reactor.callLater(TIMEOUT, self.failed)
+ self.user = MSNConnection(USER1, PASS1, "user", self)
+ while not self.done:
+ reactor.iterate(0.1)
+ self.failUnless((self.done == "SYNCED"), "Failed to connect to MSN servers.")
+ self.user.logOut()
+ reactor.iterate(0.1)
+ try:
+ self.timeout.cancel()
+ except (error.AlreadyCancelled, error.AlreadyCalled):
+ pass
+
+class BasicTests(unittest.TestCase):
+ def setUp(self):
+ self.failure = None
+ self.timeout = None
+ self.done = False
+
+ # Login both accounts
+ self.user1 = MSNConnection(USER1, PASS1, "user1", self)
+ self.loop("Logging in.", cond="SYNCED")
+ self.user2 = MSNConnection(USER2, PASS2, "user2", self)
+ self.loop("Logging in.", cond="SYNCED")
+
+ # Purge both contact lists
+ clearAccount(self.user1).addCallback(self.cb)
+ self.loop("Purging user1 contact list.")
+ clearAccount(self.user2).addCallback(self.cb)
+ self.loop("Purging user2 contact list.")
+
+ # Adding users to each other's lists
+ self.user1.addContact(msnw.FORWARD_LIST, USER2).addCallback(self.cb)
+ self.loop("Adding user2 to user1's forward list.")
+ self.user1.addContact(msnw.ALLOW_LIST, USER2).addCallback(self.cb)
+ self.loop("Adding user2 to user1's allow list.")
+ self.user2.addContact(msnw.FORWARD_LIST, USER1).addCallback(self.cb)
+ self.loop("Adding user1 to user2's forward list.")
+ self.user2.addContact(msnw.ALLOW_LIST, USER1).addCallback(self.cb)
+ self.loop("Adding user1 to user2's allow list.")
+
+ # Check the contacts have seen each other
+ self.loop("Waiting...")
+ self.failUnless((self.user1.contactAdded == USER2 and self.user2.contactAdded == USER1), "Contacts can't see each other.")
+
+
+ def tearDown(self):
+ self.user1.logOut()
+ self.user2.logOut()
+ reactor.iterate(0.1)
+
+ def cb(self, ignored=None):
+ self.done = True
+
+ def loop(self, failMsg, cond=None):
+ # Loops with a timeout
+ self.done = False
+ self.timeout = reactor.callLater(TIMEOUT, self.failed, "Timeout: " + failMsg)
+ while not self.done:
+ reactor.iterate(0.1)
+ try:
+ self.timeout.cancel()
+ except (error.AlreadyCancelled, error.AlreadyCalled):
+ pass
+ if self.failure:
+ self.fail(self.failure)
+ if cond:
+ self.failUnless((self.done == cond), "Failed: " + failMsg)
+
+ def failed(self, why):
+ self.failure = why
+ self.done = True
+
+ # The tests!
+ def testMessageExchange(self):
+ self.user1.sendMessage(USER2, "Hi user2")
+ self.loop("Timeout exchanging message 1.", cond="GOTMESSAGE")
+ self.failUnless((self.user1.message == (USER2, "Hi user1")), "Failed to transfer message 1.")
+
+