p2 = lm.find("</psm>")
if p1 >= 0 and p2 >= 0:
personal = unescapeFromXml(message.message[p1:p2])
- msnContact = self.factory.contacts.getContact(userHandle)
+ msnContact = self.factory.contacts.getContact(message.userHandle)
if not msnContact: return
msnContact.personal = personal
self.contactPersonalChanged(message.userHandle, personal)
+ else:
+ self.contactPersonalChanged(message.userHandle, '')
def checkMessage(self, message):
""" hook used for detecting specific notification messages """
def handle_CHG(self, params):
id = int(params[0])
if not self._fireCallback(id, params[1]):
- self.factory.status = params[1]
+ if self.factory: self.factory.status = params[1]
self.statusChanged(params[1])
def handle_ILN(self, params):
msnContact.status = params[1]
msnContact.screenName = unquote(params[3])
if len(params) > 4: msnContact.caps = int(params[4])
- if len(params) > 5: self.handleAvatarHelper(msnContact, params[5])
+ if len(params) > 5:
+ self.handleAvatarHelper(msnContact, params[5])
+ else:
+ self.handleAvatarGoneHelper(msnContact)
self.gotContactStatus(params[1], params[2], unquote(params[3]))
+ def handleAvatarGoneHelper(self, msnContact):
+     """ Drop the contact's stored msnobj, if any, and report an empty avatar hash. """
+     if not msnContact.msnobj:
+         # No msnobj is stored for this contact, so there is nothing to withdraw.
+         return
+     msnContact.msnobj = None
+     msnContact.msnobjGot = True
+     self.contactAvatarChanged(msnContact.userHandle, "")
+
def handleAvatarHelper(self, msnContact, msnobjStr):
+ """ Build an MSNOBJ from the quoted msnobjStr and compare its sha1d with the contact's stored msnobj. """
- s = unquote(msnobjStr)
- msnobj = msnp2p.MSNOBJ()
- msnobj.parse(s)
+ msnobj = msnp2p.MSNOBJ(unquote(msnobjStr))
+ # A contact with no stored msnobj, or one whose sha1d differs, has a new avatar.
if not msnContact.msnobj or msnobj.sha1d != msnContact.msnobj.sha1d:
if msnp2p.MSNP2P_DEBUG: log.msg("Updated MSNOBJ received!" + msnobjStr)
msnContact.msnobj = msnobj
msnContact.msnobjGot = False
- self.avatarHashChanged(msnContact.userHandle, msnContact.msnobj.sha1d)
+ self.contactAvatarChanged(msnContact.userHandle, msnContact.msnobj.sha1d)
def handle_CHL(self, params):
checkParamLen(len(params), 2, 'CHL')
msnContact.status = params[0]
msnContact.screenName = unquote(params[2])
if len(params) > 3: msnContact.caps = int(params[3])
- if len(params) > 4: self.handleAvatarHelper(msnContact, params[4])
+ if len(params) > 4:
+ self.handleAvatarHelper(msnContact, params[4])
+ else:
+ self.handleAvatarGoneHelper(msnContact)
self.contactStatusChanged(params[0], params[1], unquote(params[2]))
def handle_FLN(self, params):
self._getStateData('list').privacy = listCodeToID[params[0].lower()]
else:
id = int(params[0])
- self._fireCallback(id, listCodeToID[params[1].lower()])
+ self.factory.contacts.privacy = listCodeToID[params[1].lower()]
+ self._fireCallback(id, params[1])
def handle_GTC(self, params):
# check to see if this is in response to a SYN
try:
messageLen = int(params[1])
except ValueError: raise MSNProtocolError, "Invalid Parameter for UBX length argument"
- self.currentMessage = MSNMessage(length=messageLen, userHandle=params[0], screenName="UBX", specialMessage=True)
- self.setRawMode()
+ if messageLen > 0:
+ self.currentMessage = MSNMessage(length=messageLen, userHandle=params[0], screenName="UBX", specialMessage=True)
+ self.setRawMode()
+ else:
+ self.contactPersonalChanged(params[0], '')
def handle_UUX(self, params):
checkParamLen(len(params), 2, 'UUX')
"""
pass
- def avatarHashChanged(self, userHandle, hash):
+ def contactAvatarChanged(self, userHandle, hash):
"""
Called when we receive the first, or a new <msnobj/> from a
contact.
def statusChanged(self, statusCode):
"""
- Called when our status changes and it isn't in response to
- a client command.
+ Called when our status changes and it's not in response to a
+ client command.
@param statusCode: 3-letter status code
"""
id, d = self._createIDMapping()
self.sendLine("CHG %s %s %s %s" % (id, status, str(MSNContact.MSNC1 | MSNContact.MSNC2 | MSNContact.MSNC3 | MSNContact.MSNC4), quote(self.msnobj.text)))
def _cb(r):
- if self.factory: self.factory.status = r[0]
+ self.factory.status = r[0]
return r
return d.addCallback(_cb)
@return: A Deferred, the callback of which will be fired when
the server replies with the new privacy setting.
- The callback argument will be a tuple, the 2 elements
- of which being the list version and either 'al'
- or 'bl' (the new privacy setting).
+ The callback argument will be a tuple whose only element
+ is either 'al' or 'bl' (the new privacy setting).
"""
id, d = self._createIDMapping()
def close(self): pass
def loseConnection(self): pass
+
+##################
+# Passport tests #
+##################
+
class PassportTests(unittest.TestCase):
def setUp(self):
self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
-class DummySwitchboardClient(msn.SwitchboardClient):
- def userTyping(self, message):
- self.state = 'TYPING'
-
- def gotSendRequest(self, fileName, fileSize, cookie, message):
- if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
-
-class DummyNotificationServer(msn.MSNEventBase):
- def __init__(self):
- pass
+######################
+# Notification tests #
+######################
class DummyNotificationClient(msn.NotificationClient):
def loggedIn(self, userHandle, verified):
if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
self.state = 'NEWSTATUS'
+ def contactAvatarChanged(self, userHandle, hash):
+     """ Record 'NEWAVATAR' for the expected contact/hash pair, then 'AVATARGONE' once the hash is empty. """
+     expected = ("foo@bar.com", "trC8SlFx2sWQxZMIBAWSEnXc8oQ=")
+     if (userHandle, hash) == expected:
+         self.state = 'NEWAVATAR'
+         return
+     # An empty hash only counts as "gone" if an avatar was seen first.
+     if self.state == 'NEWAVATAR' and hash == "":
+         self.state = 'AVATARGONE'
+
+ def contactPersonalChanged(self, userHandle, personal):
+     """ Record whether foo@bar.com set the expected personal message or cleared it. """
+     if userHandle != 'foo@bar.com':
+         return
+     if personal == 'My Personal Message':
+         self.state = 'GOTPERSONAL'
+     elif personal == '':
+         self.state = 'PERSONALGONE'
+
def contactOffline(self, userHandle):
if userHandle == "foo@bar.com": self.state = 'OFFLINE'
def gotMSNAlert(self, body, action, subscr):
self.state = 'NOTIFICATION'
+ def gotInitialEmailNotification(self, inboxunread, foldersunread):
+     """ Record 'INITEMAIL1' for exactly one unread inbox mail and no unread folder mail, else 'INITEMAIL2'. """
+     if inboxunread != 1 or foldersunread != 0:
+         self.state = 'INITEMAIL2'
+         return
+     self.state = 'INITEMAIL1'
-class DummyPingNotificationServer(LineReceiver):
- def lineReceived(self, line):
- if line.startswith("PNG") and self.good:
- self.sendLine("QNG 50")
-
-class DummyPingNotificationClient(msn.NotificationClient):
- def connectionMade(self):
- self.pingCheckerStart()
-
- def sendLine(self, line):
- msn.NotificationClient.sendLine(self, line)
- self.count += 1
- if self.count > 10:
- self.transport.loseConnection() # But not for real, just to end the test
-
- def connectionLost(self, reason):
- if self.count <= 10:
- self.state = 'DISCONNECTED'
-
+ def gotRealtimeEmailNotification(self, mailfrom, fromaddr, subject):
+     """ Record 'REALTIMEEMAIL' when sender, address and subject all match the test message. """
+     expected = ('Some Person', 'example@passport.com', 'newsubject')
+     if (mailfrom, fromaddr, subject) == expected:
+         self.state = 'REALTIMEEMAIL'
class NotificationTests(unittest.TestCase):
""" testing the various events in NotificationClient """
- #skip = "Not ready"
def setUp(self):
self.client = DummyNotificationClient()
m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
- map(self.client.dataReceived, [x+'\r\n' for x in m.split('\r\n')[:-1]])
+ self.client.dataReceived(m)
self.failUnless((self.client.state == 'PROFILE'), 'Failed to detect initial profile')
+ def testInitialEmailNotification(self):
+     """ An initial email notification with one unread inbox mail must reach the client. """
+     chunks = ['MIME-Version: 1.0\r\nContent-Type: text/x-msmsgsinitialemailnotification; charset=UTF-8\r\n',
+               '\r\nInbox-Unread: 1\r\nFolders-Unread: 0\r\nInbox-URL: /cgi-bin/HoTMaiL\r\n',
+               'Folders-URL: /cgi-bin/folders\r\nPost-URL: http://www.hotmail.com\r\n\r\n']
+     payload = ''.join(chunks)
+     # The MSG command line carries the payload length in bytes.
+     header = 'MSG Hotmail Hotmail %s\r\n' % (str(len(payload)))
+     self.client.dataReceived(header + payload)
+     detected = (self.client.state == 'INITEMAIL1')
+     self.failUnless(detected, 'Failed to detect initial email notification')
+
+ def testNoInitialEmailNotification(self):
+     """ An initial email notification with nothing unread must not leave the client in 'INITEMAIL2'. """
+     chunks = ['MIME-Version: 1.0\r\nContent-Type: text/x-msmsgsinitialemailnotification; charset=UTF-8\r\n',
+               '\r\nInbox-Unread: 0\r\nFolders-Unread: 0\r\nInbox-URL: /cgi-bin/HoTMaiL\r\n',
+               'Folders-URL: /cgi-bin/folders\r\nPost-URL: http://www.hotmail.com\r\n\r\n']
+     payload = ''.join(chunks)
+     # The MSG command line carries the payload length in bytes.
+     header = 'MSG Hotmail Hotmail %s\r\n' % (str(len(payload)))
+     self.client.dataReceived(header + payload)
+     notReported = (self.client.state != 'INITEMAIL2')
+     self.failUnless(notReported, 'Detected initial email notification when I should not have')
+
+ def testRealtimeEmailNotification(self):
+     """ A realtime email notification must be reported with its sender, address and subject. """
+     chunks = ['MSG Hotmail Hotmail 356\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsemailnotification; charset=UTF-8\r\n',
+               '\r\nFrom: Some Person\r\nMessage-URL: /cgi-bin/getmsg?msg=MSG1050451140.21&start=2310&len=2059&curmbox=ACTIVE\r\n',
+               'Post-URL: https://loginnet.passport.com/ppsecure/md5auth.srf?lc=1038\r\n',
+               'Subject: =?"us-ascii"?Q?newsubject?=\r\nDest-Folder: ACTIVE\r\nFrom-Addr: example@passport.com\r\nid: 2\r\n']
+     # 356 in the MSG line is the byte length of everything after that line.
+     packet = ''.join(chunks)
+     self.client.dataReceived(packet)
+     detected = (self.client.state == 'REALTIMEEMAIL')
+     self.failUnless(detected, 'Failed to detect realtime email notification')
+
def testMSNAlert(self):
m = '<NOTIFICATION ver="2" id="1342902633" siteid="199999999" siteurl="http://alerts.msn.com">\r\n'
m += '<TO pid="0x0006BFFD:0x8582C0FB" name="example@passport.com"/>\r\n'
msnobj = urllib.quote('<msnobj Creator="buddy1@hotmail.com" Size="24539" Type="3" Location="TFR2C.tmp" Friendly="AAA=" SHA1D="trC8SlFx2sWQxZMIBAWSEnXc8oQ=" SHA1C="U32o6bosZzluJq82eAtMpx5dIEI="/>')
t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 268435456 ' + msnobj, 'INITSTATUS', 'Failed to detect initial status report'),
('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
+ ('NLN AWY foo@bar.com Test%20Name 0 ' + msnobj, 'NEWAVATAR', 'Failed to detect contact avatar change'),
+ ('NLN AWY foo@bar.com Test%20Name 0', 'AVATARGONE', 'Failed to detect contact avatar disappearing'),
('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
('CHG 1 HDN 0 ' + msnobj, 'MYSTATUS', 'Failed to detect my status changing')]
for i in t:
self.client.lineReceived(i[0])
self.failUnless((self.client.state == i[1]), i[2])
+
+ # Test UBX
+ self.client.dataReceived('UBX foo@bar.com 72\r\n<Data><PSM>My Personal Message</PSM><CurrentMedia></CurrentMedia></Data>')
+ self.failUnless((self.client.state == 'GOTPERSONAL'), 'Failed to detect new personal message')
+ self.client.dataReceived('UBX foo@bar.com 0\r\n')
+ self.failUnless((self.client.state == 'PERSONALGONE'), 'Failed to detect personal message disappearing')
self.client.logOut()
def testAsyncPhoneChange(self):
def testAsyncSwitchboardInvitation(self):
self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
- self.failUnless(self.client.state == "SBINVITED")
+ self.failUnless((self.client.state == 'SBINVITED'), 'Failed to detect switchboard invitation')
+
+
+#######################################
+# Notification with fake server tests #
+#######################################
+
+class FakeNotificationServer(msn.MSNEventBase):
+    """ Stand-in server that answers CHG and BLP commands by echoing them back. """
+
+    def handle_CHG(self, params):
+        """ Echo the CHG command, padding a missing fourth field with an empty string. """
+        if len(params) < 4:
+            params.append('')
+        trid, status, caps, extra = params[0], params[1], params[2], params[3]
+        self.sendLine("CHG %s %s %s %s" % (trid, status, caps, extra))
+
+    def handle_BLP(self, params):
+        """ Echo the BLP command with a fixed trailing value of 100. """
+        trid, mode = params[0], params[1]
+        self.sendLine("BLP %s %s 100" % (trid, mode))
+
+class FakeNotificationClient(msn.NotificationClient):
+    """ NotificationClient that issues one command and records 'PASS' in self.test on the expected reply. """
+
+    # Default verdict: without it, a reply that does not match leaves the
+    # attribute unset and the test errors with AttributeError instead of
+    # failing on its own assertion message.
+    test = None
+
+    def doStatusChange(self):
+        """ Request STATUS_AWAY and check that the server confirms it. """
+        def testcb(result):
+            # Unpack in the body rather than in the signature; tuple
+            # parameters are a Python 2-only construct.
+            (status,) = result
+            if status == msn.STATUS_AWAY:
+                self.test = 'PASS'
+            # Always drop the connection so the loopback run terminates.
+            self.transport.loseConnection()
+        d = self.changeStatus(msn.STATUS_AWAY)
+        d.addCallback(testcb)
+
+    def doPrivacyMode(self):
+        """ Request the 'allow' privacy mode and check that the server confirms it. """
+        def testcb(result):
+            (priv,) = result
+            if priv.upper() == 'AL':
+                self.test = 'PASS'
+            # Always drop the connection so the loopback run terminates.
+            self.transport.loseConnection()
+        d = self.setPrivacyMode(True)
+        d.addCallback(testcb)
+
+class FakeServerNotificationTests(unittest.TestCase):
+    """ tests the NotificationClient against a fake server. """
+
+    def setUp(self):
+        client = FakeNotificationClient()
+        client.factory = msn.NotificationFactory()
+        self.client = client
+        self.server = FakeNotificationServer()
+
+    def tearDown(self):
+        """ Nothing to clean up. """
+
+    def _runAgainstServer(self, action):
+        """ Schedule action on the reactor, then run client and server over a loopback connection. """
+        reactor.callLater(0, action)
+        loopback.loopback(self.client, self.server)
+
+    def testChangeStatus(self):
+        self._runAgainstServer(self.client.doStatusChange)
+        passed = (self.client.test == 'PASS')
+        self.failUnless(passed, 'Failed to change status properly')
+
+    def testSetPrivacyMode(self):
+        self.client.factory.contacts = msn.MSNContactList()
+        self._runAgainstServer(self.client.doPrivacyMode)
+        passed = (self.client.test == 'PASS')
+        self.failUnless(passed, 'Failed to change privacy mode')
+
+#################################
+# Notification Challenges tests #
+#################################
+
+class DummyChallengeNotificationServer(msn.MSNEventBase):
+    """ Server side of the challenge test: sends a CHL and checks the QRY reply against the expected response. """
+
+    def doChallenge(self, challenge, response):
+        """ Remember the expected response, reset the progress state and send the challenge. """
+        self.response = response
+        self.state = 0
+        self.sendLine("CHL 0 " + challenge)
+
+    def checkMessage(self, message):
+        """ Mark the test 'PASS' if the QRY payload equals the expected response, then disconnect. """
+        matched = (message.message == self.response)
+        if matched:
+            self.state = "PASS"
+        self.transport.loseConnection()
+        return 0
+
+    def handle_QRY(self, params):
+        """ Accept a well-formed QRY header and switch to raw mode to read its 32-byte payload. """
+        self.state = 1
+        wellFormed = len(params) == 3 and params[1] == "PROD0090YUAUV{2B" and params[2] == "32"
+        if not wellFormed:
+            self.transport.loseConnection()
+            return
+        self.state = 2
+        self.currentMessage = msn.MSNMessage(length=32, userHandle="QRY", screenName="QRY", specialMessage=True)
+        self.setRawMode()
+
+class DummyChallengeNotificationClient(msn.NotificationClient):
+    """ Client side of the challenge test: answers one CHL and then disconnects. """
+
+    def handle_CHL(self, params):
+        """ Let NotificationClient handle the challenge, then drop the connection. """
+        msn.NotificationClient.handle_CHL(self, params)
+        transport = self.transport
+        transport.loseConnection()
+
+    def connectionMade(self):
+        """ Call MSNEventBase.connectionMade directly, bypassing NotificationClient's own override. """
+        msn.MSNEventBase.connectionMade(self)
+
+
+class NotificationChallengeTests(unittest.TestCase):
+    """ tests the responses to the CHLs the server sends """
+
+    def setUp(self):
+        self.client = DummyChallengeNotificationClient()
+        self.server = DummyChallengeNotificationServer()
+
+    def tearDown(self):
+        pass
+
+    def testChallenges(self):
+        """ Send each known challenge and check that the client answers with the matching response. """
+        challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
+                      ('23055170411503520698', 'ae906c3f2946d25e7da1b08b0b247659'),
+                      ('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
+                      ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
+        for challenge, response in challenges:
+            # Pass the arguments to callLater itself rather than through a
+            # lambda: a lambda would look the loop variables up only when
+            # it finally runs (late binding).
+            reactor.callLater(0, self.server.doChallenge, challenge, response)
+            loopback.loopback(self.client, self.server)
+            self.failUnless((self.server.state == 'PASS'), 'Incorrect challenge response.')
+
+
+###########################
+# Notification ping tests #
+###########################
+
+class DummyPingNotificationServer(LineReceiver):
+    """ Line-based server that answers PNG with 'QNG 50' while its `good` flag is set. """
+
+    def lineReceived(self, line):
+        if not line.startswith("PNG"):
+            return
+        if self.good:
+            self.sendLine("QNG 50")
+
+class DummyPingNotificationClient(msn.NotificationClient):
+    """ NotificationClient that counts the lines it sends so the ping test can stop after ten. """
+
+    def connectionMade(self):
+        self.pingCheckerStart()
+
+    def sendLine(self, line):
+        """ Send the line, count it, and close the transport once more than ten lines have gone out. """
+        msn.NotificationClient.sendLine(self, line)
+        self.count = self.count + 1
+        if self.count > 10:
+            # But not for real, just to end the test
+            self.transport.loseConnection()
+
+    def connectionLost(self, reason):
+        """ Record 'DISCONNECTED' only if the connection dropped before the send limit was passed. """
+        if self.count > 10:
+            return
+        self.state = 'DISCONNECTED'
class NotificationPingTests(unittest.TestCase):
""" tests pinging in the NotificationClient class """
-# skip = "Not working yet"
-
+
def setUp(self):
msn.PINGSPEED = 0.1
self.client = DummyPingNotificationClient()
self.server.good = False
loopback.loopback(self.client, self.server)
self.failUnless((self.client.state == 'DISCONNECTED'), 'Should be disconnected.')
-
+
+
+
+
+#############################
+# Switchboard message tests #
+#############################
+
+class DummySwitchboardClient(msn.SwitchboardClient):
+    """ SwitchboardClient that records which event callback fired in self.state. """
+
+    def userTyping(self, message):
+        self.state = 'TYPING'
+
+    def gotSendRequest(self, fileName, fileSize, cookie, message):
+        """ Record 'INVITATION' when the file name, size and cookie match the test invitation. """
+        expected = ('foobar.ext', 31337, 1234)
+        if (fileName, fileSize, cookie) == expected:
+            self.state = 'INVITATION'
+
class MessageHandlingTests(unittest.TestCase):
""" testing various message handling methods from SwitchboardClient """
if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321: self.client.state = 'INFO'
+
+################
+# MSNFTP tests #
+################
+
class FileTransferTestCase(unittest.TestCase):
""" test FileSend against FileReceive """
skip = "Not ready"