]> code.delx.au - pymsnt/commitdiff
more tests.
authorjamesbunton <jamesbunton@55fbd22a-6204-0410-b2f0-b6c764c7e90a>
Sun, 6 Nov 2005 02:40:22 +0000 (02:40 +0000)
committerjamesbunton <jamesbunton@55fbd22a-6204-0410-b2f0-b6c764c7e90a>
Sun, 6 Nov 2005 02:40:22 +0000 (02:40 +0000)
git-svn-id: http://delx.cjb.net/svn/pymsnt/trunk@30 55fbd22a-6204-0410-b2f0-b6c764c7e90a

committer: jamesbunton <jamesbunton@55fbd22a-6204-0410-b2f0-b6c764c7e90a>

src/tlib/msn/msn.py
src/tlib/msn/msnp2p.py
src/tlib/msn/test_msn.py

index 62d645b06927943a5efd50e46ca91a125ba1a912..3f5eb27b39f1cdacb325bc84edec1a799ded5c9a 100644 (file)
@@ -898,10 +898,12 @@ class NotificationClient(MSNEventBase):
         p2 = lm.find("</psm>")
         if p1 >= 0 and p2 >= 0:
             personal = unescapeFromXml(message.message[p1:p2])
-            msnContact = self.factory.contacts.getContact(userHandle)
+            msnContact = self.factory.contacts.getContact(message.userHandle)
             if not msnContact: return
             msnContact.personal = personal
             self.contactPersonalChanged(message.userHandle, personal)
+        else:
+            self.contactPersonalChanged(message.userHandle, '')
 
     def checkMessage(self, message):
         """ hook used for detecting specific notification messages """
@@ -966,7 +968,7 @@ class NotificationClient(MSNEventBase):
     def handle_CHG(self, params):
         id = int(params[0])
         if not self._fireCallback(id, params[1]):
-            self.factory.status = params[1]
+            if self.factory: self.factory.status = params[1]
             self.statusChanged(params[1])
 
     def handle_ILN(self, params):
@@ -976,18 +978,25 @@ class NotificationClient(MSNEventBase):
         msnContact.status = params[1]
         msnContact.screenName = unquote(params[3])
         if len(params) > 4: msnContact.caps = int(params[4])
-        if len(params) > 5: self.handleAvatarHelper(msnContact, params[5])
+        if len(params) > 5:
+            self.handleAvatarHelper(msnContact, params[5])
+        else:
+            self.handleAvatarGoneHelper(msnContact)
         self.gotContactStatus(params[1], params[2], unquote(params[3]))
 
+    def handleAvatarGoneHelper(self, msnContact):
+        if msnContact.msnobj:
+            msnContact.msnobj = None
+            msnContact.msnobjGot = True
+            self.contactAvatarChanged(msnContact.userHandle, "")
+
     def handleAvatarHelper(self, msnContact, msnobjStr):
-        s = unquote(msnobjStr)
-        msnobj = msnp2p.MSNOBJ()
-        msnobj.parse(s)
+        msnobj = msnp2p.MSNOBJ(unquote(msnobjStr))
         if not msnContact.msnobj or msnobj.sha1d != msnContact.msnobj.sha1d:
             if msnp2p.MSNP2P_DEBUG: log.msg("Updated MSNOBJ received!" + msnobjStr)
             msnContact.msnobj = msnobj
             msnContact.msnobjGot = False
-            self.avatarHashChanged(msnContact.userHandle, msnContact.msnobj.sha1d)
+            self.contactAvatarChanged(msnContact.userHandle, msnContact.msnobj.sha1d)
 
     def handle_CHL(self, params):
         checkParamLen(len(params), 2, 'CHL')
@@ -1005,7 +1014,10 @@ class NotificationClient(MSNEventBase):
         msnContact.status = params[0]
         msnContact.screenName = unquote(params[2])
         if len(params) > 3: msnContact.caps = int(params[3])
-        if len(params) > 4: self.handleAvatarHelper(msnContact, params[4])
+        if len(params) > 4:
+            self.handleAvatarHelper(msnContact, params[4])
+        else:
+            self.handleAvatarGoneHelper(msnContact)
         self.contactStatusChanged(params[0], params[1], unquote(params[2]))
 
     def handle_FLN(self, params):
@@ -1069,7 +1081,8 @@ class NotificationClient(MSNEventBase):
             self._getStateData('list').privacy = listCodeToID[params[0].lower()]
         else:
             id = int(params[0])
-            self._fireCallback(id, listCodeToID[params[1].lower()])
+            self.factory.contacts.privacy = listCodeToID[params[1].lower()]
+            self._fireCallback(id, params[1])
 
     def handle_GTC(self, params):
         # check to see if this is in response to a SYN
@@ -1218,8 +1231,11 @@ class NotificationClient(MSNEventBase):
         try:
             messageLen = int(params[1])
         except ValueError: raise MSNProtocolError, "Invalid Parameter for UBX length argument"
-        self.currentMessage = MSNMessage(length=messageLen, userHandle=params[0], screenName="UBX", specialMessage=True)
-        self.setRawMode()
+        if messageLen > 0:
+            self.currentMessage = MSNMessage(length=messageLen, userHandle=params[0], screenName="UBX", specialMessage=True)
+            self.setRawMode()
+        else:
+            self.contactPersonalChanged(params[0], '')
 
     def handle_UUX(self, params):
         checkParamLen(len(params), 2, 'UUX')
@@ -1295,7 +1311,7 @@ class NotificationClient(MSNEventBase):
         """
         pass
 
-    def avatarHashChanged(self, userHandle, hash):
+    def contactAvatarChanged(self, userHandle, hash):
         """
         Called when we receive the first, or a new <msnobj/> from a
         contact.
@@ -1306,8 +1322,8 @@ class NotificationClient(MSNEventBase):
 
     def statusChanged(self, statusCode):
         """
-        Called when our status changes and it isn't in response to
-        client command.
+        Called when our status changes and its not in response to a
+        client command.
 
         @param statusCode: 3-letter status code
         """
@@ -1473,7 +1489,7 @@ class NotificationClient(MSNEventBase):
         id, d = self._createIDMapping()
         self.sendLine("CHG %s %s %s %s" % (id, status, str(MSNContact.MSNC1 | MSNContact.MSNC2 | MSNContact.MSNC3 | MSNContact.MSNC4), quote(self.msnobj.text)))
         def _cb(r):
-            if self.factory: self.factory.status = r[0]
+            self.factory.status = r[0]
             return r
         return d.addCallback(_cb)
 
@@ -1498,9 +1514,8 @@ class NotificationClient(MSNEventBase):
                           
         @return: A Deferred, the callback of which will be fired when
                  the server replies with the new privacy setting.
-                 The callback argument will be a tuple, the 2 elements
-                 of which being the list version and either 'al'
-                 or 'bl' (the new privacy setting).
+                 The callback argument will be a tuple, the only element
+                 of which being either 'al' or 'bl' (the new privacy setting).
         """
 
         id, d = self._createIDMapping()
index a696c5208c7d745e265ac3be08abcb6fc301ed57..aa74ee301269f057719523a50a01ce816c86d0d4 100644 (file)
@@ -23,7 +23,7 @@ def random_guid():
 
 
 class MSNOBJ:
-       def __init__(self):
+       def __init__(self, s=""):
                self.creator = ""
                self.imageData = ""
                self.size = 0
@@ -32,6 +32,8 @@ class MSNOBJ:
                self.friendly = ""
                self.sha1d = ""
                self.text = ""
+               if s:
+                       self.parse(s)
        
        def setData(self, creator, imageData):
                self.creator = creator
index e6a03e8c6936c0928e5ed012ecdf41f5a47c0754..9bdc41d3e889ed9a9651ec7af62f9bd2f77865c5 100644 (file)
@@ -27,6 +27,11 @@ class StringIOWithoutClosing(StringIO.StringIO):
     def close(self): pass
     def loseConnection(self): pass
 
+
+##################
+# Passport tests #
+##################
+
 class PassportTests(unittest.TestCase):
 
     def setUp(self):
@@ -87,17 +92,10 @@ class PassportTests(unittest.TestCase):
         self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
 
 
-class DummySwitchboardClient(msn.SwitchboardClient):
-    def userTyping(self, message):
-        self.state = 'TYPING'
-
-    def gotSendRequest(self, fileName, fileSize, cookie, message):
-        if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
-
 
-class DummyNotificationServer(msn.MSNEventBase):
-    def __init__(self):
-        pass
+######################
+# Notification tests #
+######################
 
 class DummyNotificationClient(msn.NotificationClient):
     def loggedIn(self, userHandle, verified):
@@ -117,6 +115,18 @@ class DummyNotificationClient(msn.NotificationClient):
         if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
             self.state = 'NEWSTATUS'
 
+    def contactAvatarChanged(self, userHandle, hash):
+        if userHandle == "foo@bar.com" and hash == "trC8SlFx2sWQxZMIBAWSEnXc8oQ=":
+            self.state = 'NEWAVATAR'
+        elif self.state == 'NEWAVATAR' and hash == "":
+            self.state = 'AVATARGONE'
+    
+    def contactPersonalChanged(self, userHandle, personal):
+        if userHandle == 'foo@bar.com' and personal == 'My Personal Message':
+            self.state = 'GOTPERSONAL'
+        elif userHandle == 'foo@bar.com' and personal == '':
+            self.state = 'PERSONALGONE'
+
     def contactOffline(self, userHandle):
         if userHandle == "foo@bar.com": self.state = 'OFFLINE'
 
@@ -150,30 +160,18 @@ class DummyNotificationClient(msn.NotificationClient):
     def gotMSNAlert(self, body, action, subscr):
         self.state = 'NOTIFICATION'
 
+    def gotInitialEmailNotification(self, inboxunread, foldersunread):
+        if inboxunread == 1 and foldersunread == 0:
+            self.state = 'INITEMAIL1'
+        else:
+            self.state = 'INITEMAIL2'
 
-class DummyPingNotificationServer(LineReceiver):
-    def lineReceived(self, line):
-        if line.startswith("PNG") and self.good:
-            self.sendLine("QNG 50")
-
-class DummyPingNotificationClient(msn.NotificationClient):
-    def connectionMade(self):
-        self.pingCheckerStart()
-
-    def sendLine(self, line):
-        msn.NotificationClient.sendLine(self, line)
-        self.count += 1
-        if self.count > 10:
-            self.transport.loseConnection() # But not for real, just to end the test
-
-    def connectionLost(self, reason):
-        if self.count <= 10:
-            self.state = 'DISCONNECTED'
-
+    def gotRealtimeEmailNotification(self, mailfrom, fromaddr, subject):
+        if mailfrom == 'Some Person' and fromaddr == 'example@passport.com' and subject == 'newsubject':
+            self.state = 'REALTIMEEMAIL'
 
 class NotificationTests(unittest.TestCase):
     """ testing the various events in NotificationClient """
-    #skip = "Not ready"
 
     def setUp(self):
         self.client = DummyNotificationClient()
@@ -192,9 +190,33 @@ class NotificationTests(unittest.TestCase):
         m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
         m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
         m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
-        map(self.client.dataReceived, [x+'\r\n' for x in m.split('\r\n')[:-1]])
+        self.client.dataReceived(m)
         self.failUnless((self.client.state == 'PROFILE'), 'Failed to detect initial profile')
 
+    def testInitialEmailNotification(self):
+        m =  'MIME-Version: 1.0\r\nContent-Type: text/x-msmsgsinitialemailnotification; charset=UTF-8\r\n'
+        m += '\r\nInbox-Unread: 1\r\nFolders-Unread: 0\r\nInbox-URL: /cgi-bin/HoTMaiL\r\n'
+        m += 'Folders-URL: /cgi-bin/folders\r\nPost-URL: http://www.hotmail.com\r\n\r\n'
+        m =  'MSG Hotmail Hotmail %s\r\n' % (str(len(m))) + m
+        self.client.dataReceived(m)
+        self.failUnless((self.client.state == 'INITEMAIL1'), 'Failed to detect initial email notification')
+
+    def testNoInitialEmailNotification(self):
+        m =  'MIME-Version: 1.0\r\nContent-Type: text/x-msmsgsinitialemailnotification; charset=UTF-8\r\n'
+        m += '\r\nInbox-Unread: 0\r\nFolders-Unread: 0\r\nInbox-URL: /cgi-bin/HoTMaiL\r\n'
+        m += 'Folders-URL: /cgi-bin/folders\r\nPost-URL: http://www.hotmail.com\r\n\r\n'
+        m =  'MSG Hotmail Hotmail %s\r\n' % (str(len(m))) + m
+        self.client.dataReceived(m)
+        self.failUnless((self.client.state != 'INITEMAIL2'), 'Detected initial email notification when I should not have')
+
+    def testRealtimeEmailNotification(self):
+        m =  'MSG Hotmail Hotmail 356\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsemailnotification; charset=UTF-8\r\n'
+        m += '\r\nFrom: Some Person\r\nMessage-URL: /cgi-bin/getmsg?msg=MSG1050451140.21&start=2310&len=2059&curmbox=ACTIVE\r\n'
+        m += 'Post-URL: https://loginnet.passport.com/ppsecure/md5auth.srf?lc=1038\r\n'
+        m += 'Subject: =?"us-ascii"?Q?newsubject?=\r\nDest-Folder: ACTIVE\r\nFrom-Addr: example@passport.com\r\nid: 2\r\n'
+        self.client.dataReceived(m)
+        self.failUnless((self.client.state == 'REALTIMEEMAIL'), 'Failed to detect realtime email notification')
+
     def testMSNAlert(self):
         m  = '<NOTIFICATION ver="2" id="1342902633" siteid="199999999" siteurl="http://alerts.msn.com">\r\n'
         m += '<TO pid="0x0006BFFD:0x8582C0FB" name="example@passport.com"/>\r\n'
@@ -253,11 +275,19 @@ class NotificationTests(unittest.TestCase):
         msnobj = urllib.quote('<msnobj Creator="buddy1@hotmail.com" Size="24539" Type="3" Location="TFR2C.tmp" Friendly="AAA=" SHA1D="trC8SlFx2sWQxZMIBAWSEnXc8oQ=" SHA1C="U32o6bosZzluJq82eAtMpx5dIEI="/>')
         t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 268435456 ' + msnobj, 'INITSTATUS', 'Failed to detect initial status report'),
              ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
+             ('NLN AWY foo@bar.com Test%20Name 0 ' + msnobj, 'NEWAVATAR', 'Failed to detect contact avatar change'),
+             ('NLN AWY foo@bar.com Test%20Name 0', 'AVATARGONE', 'Failed to detect contact avatar disappearing'),
              ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
              ('CHG 1 HDN 0 ' + msnobj, 'MYSTATUS', 'Failed to detect my status changing')]
         for i in t:
             self.client.lineReceived(i[0])
             self.failUnless((self.client.state == i[1]), i[2])
+
+        # Test UBX
+        self.client.dataReceived('UBX foo@bar.com 72\r\n<Data><PSM>My Personal Message</PSM><CurrentMedia></CurrentMedia></Data>')
+        self.failUnless((self.client.state == 'GOTPERSONAL'), 'Failed to detect new personal message')
+        self.client.dataReceived('UBX foo@bar.com 0\r\n')
+        self.failUnless((self.client.state == 'PERSONALGONE'), 'Failed to detect personal message disappearing')
         self.client.logOut()
 
     def testAsyncPhoneChange(self):
@@ -307,12 +337,142 @@ class NotificationTests(unittest.TestCase):
 
     def testAsyncSwitchboardInvitation(self):
         self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
-        self.failUnless(self.client.state == "SBINVITED")
+        self.failUnless((self.client.state == 'SBINVITED'), 'Failed to detect switchboard invitation')
+
+
+#######################################
+# Notification with fake server tests #
+#######################################
+
+class FakeNotificationServer(msn.MSNEventBase):
+    def handle_CHG(self, params):
+        if len(params) < 4:
+            params.append('')
+        self.sendLine("CHG %s %s %s %s" % (params[0], params[1], params[2], params[3]))
+
+    def handle_BLP(self, params):
+        self.sendLine("BLP %s %s 100" % (params[0], params[1]))
+
+class FakeNotificationClient(msn.NotificationClient):
+    def doStatusChange(self):
+        def testcb((status,)):
+            if status == msn.STATUS_AWAY:
+                self.test = 'PASS'
+                self.transport.loseConnection()
+        d = self.changeStatus(msn.STATUS_AWAY)
+        d.addCallback(testcb)
+
+    def doPrivacyMode(self):
+        def testcb((priv,)):
+            if priv.upper() == 'AL':
+                self.test = 'PASS'
+                self.transport.loseConnection()
+        d = self.setPrivacyMode(True)
+        d.addCallback(testcb)
+
+class FakeServerNotificationTests(unittest.TestCase):
+    """ tests the NotificationClient against a fake server. """
+
+    def setUp(self):
+        self.client = FakeNotificationClient()
+        self.client.factory = msn.NotificationFactory()
+        self.server = FakeNotificationServer()
+
+    def tearDown(self):
+        pass
+
+    def testChangeStatus(self):
+        reactor.callLater(0, self.client.doStatusChange)
+        loopback.loopback(self.client, self.server)
+        self.failUnless((self.client.test == 'PASS'), 'Failed to change status properly')
+
+    def testSetPrivacyMode(self):
+        self.client.factory.contacts = msn.MSNContactList()
+        reactor.callLater(0, self.client.doPrivacyMode)
+        loopback.loopback(self.client, self.server)
+        self.failUnless((self.client.test == 'PASS'), 'Failed to change privacy mode')
+
+#################################
+# Notification Challenges tests #
+#################################
+
+class DummyChallengeNotificationServer(msn.MSNEventBase):
+    def doChallenge(self, challenge, response):
+        self.state = 0
+        self.response = response
+        self.sendLine("CHL 0 " + challenge)
+
+    def checkMessage(self, message):
+        if message.message == self.response:
+            self.state = "PASS"
+        self.transport.loseConnection()
+        return 0
+
+    def handle_QRY(self, params):
+        self.state = 1
+        if len(params) == 3 and params[1] == "PROD0090YUAUV{2B" and params[2] == "32":
+            self.state = 2
+            self.currentMessage = msn.MSNMessage(length=32, userHandle="QRY", screenName="QRY", specialMessage=True)
+            self.setRawMode()
+        else:
+            self.transport.loseConnection()
+
+class DummyChallengeNotificationClient(msn.NotificationClient):
+    def connectionMade(self):
+        msn.MSNEventBase.connectionMade(self)
+
+    def handle_CHL(self, params):
+        msn.NotificationClient.handle_CHL(self, params)
+        self.transport.loseConnection()
+
+
+class NotificationChallengeTests(unittest.TestCase):
+    """ tests the responses to the CHLs the server sends """
+
+    def setUp(self):
+        self.client = DummyChallengeNotificationClient()
+        self.server = DummyChallengeNotificationServer()
+    
+    def tearDown(self):
+        pass
+    
+    def testChallenges(self):
+        challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
+                      ('23055170411503520698', 'ae906c3f2946d25e7da1b08b0b247659'),
+                      ('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
+                      ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
+        for challenge, response in challenges:
+            reactor.callLater(0, lambda: self.server.doChallenge(challenge, response))
+            loopback.loopback(self.client, self.server)
+            self.failUnless((self.server.state == 'PASS'), 'Incorrect challenge response.')
+        
+
+###########################
+# Notification ping tests #
+###########################
+
+class DummyPingNotificationServer(LineReceiver):
+    def lineReceived(self, line):
+        if line.startswith("PNG") and self.good:
+            self.sendLine("QNG 50")
+
+class DummyPingNotificationClient(msn.NotificationClient):
+    def connectionMade(self):
+        self.pingCheckerStart()
+
+    def sendLine(self, line):
+        msn.NotificationClient.sendLine(self, line)
+        self.count += 1
+        if self.count > 10:
+            self.transport.loseConnection() # But not for real, just to end the test
+
+    def connectionLost(self, reason):
+        if self.count <= 10:
+            self.state = 'DISCONNECTED'
 
 class NotificationPingTests(unittest.TestCase):
     """ tests pinging in the NotificationClient class """
-#    skip = "Not working yet"
-    
+
     def setUp(self):
         msn.PINGSPEED = 0.1
         self.client = DummyPingNotificationClient()
@@ -333,7 +493,21 @@ class NotificationPingTests(unittest.TestCase):
         self.server.good = False
         loopback.loopback(self.client, self.server)
         self.failUnless((self.client.state == 'DISCONNECTED'), 'Should be disconnected.')
-        
+
+
+
+
+#############################
+# Switchboard message tests #
+#############################
+
+class DummySwitchboardClient(msn.SwitchboardClient):
+    def userTyping(self, message):
+        self.state = 'TYPING'
+
+    def gotSendRequest(self, fileName, fileSize, cookie, message):
+        if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
+
 
 class MessageHandlingTests(unittest.TestCase):
     """ testing various message handling methods from SwichboardClient """
@@ -401,6 +575,11 @@ class MessageHandlingTests(unittest.TestCase):
         if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321: self.client.state = 'INFO'
 
 
+
+################
+# MSNFTP tests #
+################
+
 class FileTransferTestCase(unittest.TestCase):
     """ test FileSend against FileReceive """
     skip = "Not ready"