code.delx.au - pymsnt/commitdiff
Start of MSNSLP tests
author jamesbunton <jamesbunton@55fbd22a-6204-0410-b2f0-b6c764c7e90a>
Sun, 20 Nov 2005 13:46:21 +0000 (13:46 +0000)
committer jamesbunton <jamesbunton@55fbd22a-6204-0410-b2f0-b6c764c7e90a>
Sun, 20 Nov 2005 13:46:21 +0000 (13:46 +0000)
git-svn-id: http://delx.cjb.net/svn/pymsnt/trunk@35 55fbd22a-6204-0410-b2f0-b6c764c7e90a

committer: jamesbunton <jamesbunton@55fbd22a-6204-0410-b2f0-b6c764c7e90a>

src/tlib/msn/test_msn.py

index ff998845739996d403201ac4c57f78f02992134b..fb3e0930b159116fbaf8ebc57e8f9e4ab0e97966 100644 (file)
@@ -10,7 +10,8 @@ Test cases for msn.
 from twisted.protocols import loopback
 from twisted.protocols.basic import LineReceiver
 from twisted.internet.defer import Deferred
-from twisted.internet import reactor
+from twisted.internet import reactor, main
+from twisted.python import failure
 from twisted.trial import unittest
 
 # System imports
@@ -28,6 +29,45 @@ class StringIOWithoutClosing(StringIO.StringIO):
     def close(self): pass
     def loseConnection(self): pass
 
+class LoopbackCon:
+    """ Connects two protocols through a pair of loopback.LoopbackRelay
+    transports and lets a test pump them a few reactor iterations at a time. """
+
+    def __init__(self, con1, con2):
+        """ Build one relay per protocol, hand each protocol the other's
+        relay as its transport, and mark the pair as connected. """
+        self.con1 = con1
+        self.con2 = con2
+        self.con1ToCon2 = loopback.LoopbackRelay(con1)
+        self.con2ToCon1 = loopback.LoopbackRelay(con2)
+        con2.makeConnection(self.con1ToCon2)
+        con1.makeConnection(self.con2ToCon1)
+        self.connected = True
+
+    def doSteps(self, steps=1):
+        """ Returns true if the connection finished
+
+        Runs at most `steps` rounds of: one reactor iteration, then a
+        clearBuffer() on each relay. Stops early and disconnects as soon
+        as either relay reports shouldLose.
+        """
+        for _ in range(steps):
+            reactor.iterate(0.01)
+            self.con1ToCon2.clearBuffer()
+            self.con2ToCon1.clearBuffer()
+            if self.con1ToCon2.shouldLose:
+                # Cleared once more before disconnecting; presumably this
+                # delivers data queued while the second relay was being
+                # cleared -- TODO confirm against LoopbackRelay.
+                self.con1ToCon2.clearBuffer()
+                self.disconnect()
+                return True
+            if self.con2ToCon1.shouldLose:
+                self.disconnect()
+                return True
+        return False
+
+    def disconnect(self):
+        """ Report CONNECTION_DONE to both protocols and iterate the reactor
+        once. Only the first call has any effect. """
+        if not self.connected:
+            return
+        # Clear the flag before notifying anyone: doSteps() disconnects on
+        # its own and the tests' tearDown() calls disconnect() again, which
+        # used to deliver connectionLost to both protocols a second time.
+        self.connected = False
+        self.con1.connectionLost(failure.Failure(main.CONNECTION_DONE))
+        self.con2.connectionLost(failure.Failure(main.CONNECTION_DONE))
+        reactor.iterate()
+
+
+
 
 ##################
 # Passport tests #
@@ -538,76 +578,77 @@ class FakeServerNotificationTests(unittest.TestCase):
         self.client.factory = msn.NotificationFactory()
         self.client.test = 'FAIL'
         self.server = FakeNotificationServer()
+        self.loop = LoopbackCon(self.client, self.server)
 
     def tearDown(self):
-        pass
+        self.loop.disconnect()
 
     def testChangeStatus(self):
-        reactor.callLater(0, self.client.doStatusChange)
-        loopback.loopback(self.client, self.server)
+        self.client.doStatusChange()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to change status properly')
 
     def testSetPrivacyMode(self):
         self.client.factory.contacts = msn.MSNContactList()
-        reactor.callLater(0, self.client.doPrivacyMode)
-        loopback.loopback(self.client, self.server)
+        self.client.doPrivacyMode()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to change privacy mode')
 
     def testSyncList(self):
-        reactor.callLater(0, self.client.doSyncList)
-        loopback.loopback(self.client, self.server)
+        self.client.doSyncList()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to synchronise list')
     testSyncList.skip = "Will do after list versions."
 
     def testAddContactFL(self):
         self.client.factory.contacts = msn.MSNContactList()
-        reactor.callLater(0, self.client.doAddContactFL)
-        loopback.loopback(self.client, self.server)
+        self.client.doAddContactFL()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to add contact to forward list')
 
     def testAddContactAL(self):
         self.client.factory.contacts = msn.MSNContactList()
-        reactor.callLater(0, self.client.doAddContactAL)
-        loopback.loopback(self.client, self.server)
+        self.client.doAddContactAL()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to add contact to allow list')
 
     def testRemContactFL(self):
         self.client.factory.contacts = msn.MSNContactList()
         self.client.factory.contacts.addContact(msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.FORWARD_LIST))
-        reactor.callLater(0, self.client.doRemContactFL)
-        loopback.loopback(self.client, self.server)
+        self.client.doRemContactFL()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to remove contact from forward list')
 
     def testRemContactAL(self):
         self.client.factory.contacts = msn.MSNContactList()
         self.client.factory.contacts.addContact(msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.ALLOW_LIST))
-        reactor.callLater(0, self.client.doRemContactAL)
-        loopback.loopback(self.client, self.server)
+        self.client.doRemContactAL()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to remove contact from allow list')
 
     def testChangedScreenName(self):
-        reactor.callLater(0, self.client.doScreenNameChange)
-        loopback.loopback(self.client, self.server)
+        self.client.doScreenNameChange()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to change screen name properly')
 
     def testChangePersonal1(self):
-        reactor.callLater(0, lambda: self.client.doPersonalChange("Some personal message"))
-        loopback.loopback(self.client, self.server)
+        self.client.doPersonalChange("Some personal message")
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to change personal message properly')
 
     def testChangePersonal2(self):
-        reactor.callLater(0, lambda: self.client.doPersonalChange(""))
-        loopback.loopback(self.client, self.server)
+        self.client.doPersonalChange("")
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to change personal message properly')
 
     def testChangeAvatar(self):
-        reactor.callLater(0, lambda: self.client.doAvatarChange("DATADATADATADATA"))
-        loopback.loopback(self.client, self.server)
+        self.client.doAvatarChange("DATADATADATADATA")
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to change avatar properly')
 
     def testRequestSwitchboard(self):
-        reactor.callLater(0, self.client.doRequestSwitchboard)
-        loopback.loopback(self.client, self.server)
+        self.client.doRequestSwitchboard()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.test == 'PASS'), 'Failed to request switchboard')
 
 
@@ -651,9 +692,10 @@ class NotificationChallengeTests(unittest.TestCase):
     def setUp(self):
         self.client = DummyChallengeNotificationClient()
         self.server = DummyChallengeNotificationServer()
+        self.loop = LoopbackCon(self.client, self.server)
     
     def tearDown(self):
-        pass
+        self.loop.disconnect()
     
     def testChallenges(self):
         challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
@@ -661,8 +703,8 @@ class NotificationChallengeTests(unittest.TestCase):
                       ('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
                       ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
         for challenge, response in challenges:
-            reactor.callLater(0, lambda: self.server.doChallenge(challenge, response))
-            loopback.loopback(self.client, self.server)
+            self.server.doChallenge(challenge, response)
+            self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
             self.failUnless((self.server.state == 'PASS'), 'Incorrect challenge response.')
         
 
@@ -698,19 +740,21 @@ class NotificationPingTests(unittest.TestCase):
         self.server = DummyPingNotificationServer()
         self.client.state = 'CONNECTED'
         self.client.count = 0
+        self.loop = LoopbackCon(self.client, self.server)
 
     def tearDown(self):
         msn.PINGSPEED = 50.0
         self.client.logOut()
+        self.loop.disconnect()
 
     def testPingGood(self):
         self.server.good = True
-        loopback.loopback(self.client, self.server)
+        self.loop.doSteps(100)
         self.failUnless((self.client.state == 'CONNECTED'), 'Should be connected.')
 
     def testPingBad(self):
         self.server.good = False
-        loopback.loopback(self.client, self.server)
+        self.loop.doSteps(100)
         self.failUnless((self.client.state == 'DISCONNECTED'), 'Should be disconnected.')
 
 
@@ -804,13 +848,14 @@ class SwitchboardBasicTests(unittest.TestCase):
         self.client.key = 'somekey'
         self.client.sessionID = 'someSID'
         self.server = DummySwitchboardServer()
+        self.loop = LoopbackCon(self.client, self.server)
  
     def tearDown(self):
-        pass
+        self.loop.disconnect()
  
     def _testSB(self, reply):
         self.client.reply = reply
-        loopback.loopback(self.client, self.server)
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.state == 'LOGGEDIN'), 'Failed to login with reply='+str(reply))
 
     def testReply(self):
@@ -853,17 +898,95 @@ class SwitchboardBasicTests(unittest.TestCase):
 
     def testInviteUser(self):
         self.client.connectionMade = lambda: None
-        reactor.callLater(0, self.client.doSendInvite)
-        loopback.loopback(self.client, self.server)
+        self.client.doSendInvite()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.state == 'INVITESUCCESS'), 'Failed to invite user')
 
     def testSendMessage(self):
         self.client.connectionMade = lambda: None
-        reactor.callLater(0, self.client.doSendMessage)
-        loopback.loopback(self.client, self.server)
+        self.client.doSendMessage()
+        self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
         self.failUnless((self.client.state == 'MESSAGESUCCESS'), 'Failed to send message')
 
 
+################
+# MSNP2P tests #
+################
+
+class DummySwitchboardP2PServerHelper(msn.MSNEventBase):
+    """ Server-side end of one client connection; passes every received
+    message to the owning DummySwitchboardP2PServer. """
+
+    def __init__(self, server):
+        """ Remember the server object that gotMessage() reports to. """
+        msn.MSNEventBase.__init__(self)
+        self.server = server
+
+    def handle_USR(self, params):
+        """ Answer a USR command.
+
+        params must have exactly three items; params[1] is stored as
+        userHandle and params[2] is the key. Anything else drops the
+        connection without a reply. A known handle/key pair is answered
+        with "USR <params[0]> OK <handle> <handle>".
+        """
+        if len(params) != 3:
+            self.transport.loseConnection()
+            # Stop here: indexing params below would raise on short input,
+            # and a closed connection must not be sent an OK.
+            return
+        self.userHandle = params[1]
+        accepted = (('foo1@bar.com', 'somekey1'), ('foo2@bar.com', 'somekey2'))
+        if (params[1], params[2]) in accepted:
+            self.sendLine("USR %s OK %s %s" % (params[0], params[1], params[1]))
+
+    def checkMessage(self, message):
+        """ Always returns 1. """
+        return 1
+
+    def gotMessage(self, message):
+        """ Stamp the message with this connection's userHandle (also used
+        as the screen name) and pass it to the server. """
+        message.userHandle = self.userHandle
+        message.screenName = self.userHandle
+        self.server.gotMessage(message, self)
+
+    def sendMessage(self, message):
+        """ Write message to this connection as a MSG command: the MSG line,
+        MIME-Version and Content-Type first, the remaining headers, a blank
+        line, then the body. """
+        if message.length == 0: message.length = message._calcMessageLen()
+        self.sendLine("MSG %s %s %s" % (message.userHandle, message.screenName, message.length))
+        self.sendLine('MIME-Version: %s' % message.getHeader('MIME-Version'))
+        self.sendLine('Content-Type: %s' % message.getHeader('Content-Type'))
+        for name, value in message.headers.items():
+            # These two were already written above.
+            if name.lower() in ('mime-version', 'content-type'):
+                continue
+            self.sendLine("%s: %s" % (name, value))
+        self.transport.write("\r\n")
+        self.transport.write(message.message)
+
+
+class DummySwitchboardP2PServer:
+    """ Fake switchboard that relays each message to every other helper. """
+
+    def __init__(self):
+        self.clients = []
+
+    def newClient(self):
+        """ Create a helper bound to this server, register it, return it. """
+        helper = DummySwitchboardP2PServerHelper(self)
+        self.clients.append(helper)
+        return helper
+
+    def gotMessage(self, message, sender):
+        """ Send message to every registered helper except sender. """
+        for helper in self.clients:
+            if helper != sender:
+                helper.sendMessage(message)
+
+class DummySwitchboardP2PClient(msn.SwitchboardClient):
+    """ Switchboard client that records receipt of the expected test message. """
+
+    def gotMessage(self, message):
+        """ Set status to "GOTMESSAGE" when the body is "Test Message"
+        and the sender is foo1@bar.com; otherwise do nothing. """
+        if message.message != "Test Message":
+            return
+        if message.userHandle != "foo1@bar.com":
+            return
+        self.status = "GOTMESSAGE"
+
+class SwitchboardP2PTests(unittest.TestCase):
+    """ Two switchboard clients talking through the fake P2P server. """
+
+    def _makeClient(self, handle, key):
+        """ Build a DummySwitchboardP2PClient with the given credentials. """
+        client = DummySwitchboardP2PClient()
+        client.key = key
+        client.userHandle = handle
+        return client
+
+    def setUp(self):
+        self.server  = DummySwitchboardP2PServer()
+        self.client1 = self._makeClient('foo1@bar.com', 'somekey1')
+        self.client2 = self._makeClient('foo2@bar.com', 'somekey2')
+        # Only the receiving client's status is checked by the test.
+        self.client2.status = "INIT"
+        self.loop1 = LoopbackCon(self.client1, self.server.newClient())
+        self.loop2 = LoopbackCon(self.client2, self.server.newClient())
+
+    def tearDown(self):
+        for loop in (self.loop1, self.loop2):
+            loop.disconnect()
+
+    def testMessage(self):
+        """ A message sent by client1 must be seen by client2. """
+        outgoing = msn.MSNMessage(message='Test Message')
+        self.client1.sendMessage(outgoing)
+        # Pump the sender's connection first, then the receiver's.
+        self.loop1.doSteps(10)
+        self.loop2.doSteps(10)
+        delivered = (self.client2.status == "GOTMESSAGE")
+        self.failUnless(delivered, "Fake switchboard server not working.")
 
 
 ################
@@ -890,7 +1013,8 @@ class FileTransferTestCase(unittest.TestCase):
         sender.fileSize = 7000
         client = msnft.MSNFTP_FileReceive(auth, "foo@bar.com", self.output)
         client.fileSize = 7000
-        loopback.loopback(sender, client)
+        loop = LoopbackCon(client, sender)
+        loop.doSteps(100)
         self.failUnless((client.completed and sender.completed), "send failed to complete")
         self.failUnless((self.input.getvalue() == self.output.getvalue()), "saved file does not match original")