from twisted.protocols import loopback
from twisted.protocols.basic import LineReceiver
from twisted.internet.defer import Deferred
-from twisted.internet import reactor
+from twisted.internet import reactor, main
+from twisted.python import failure
from twisted.trial import unittest
# System imports
def close(self): pass  # no-op stub: the body is just `pass`
def loseConnection(self): pass  # no-op stub: the body is just `pass`
+class LoopbackCon:
+    """
+    Join two protocols through a pair of in-memory loopback relays.
+
+    Unlike C{loopback.loopback}, the exchange is driven explicitly by the
+    test: each call to L{doSteps} runs a bounded number of reactor
+    iterations and flushes both relays.
+
+    @ivar con1: the first protocol.
+    @ivar con2: the second protocol.
+    @ivar con1ToCon2: relay constructed around C{con1}; it is handed to
+        C{con2} as its transport.
+    @ivar con2ToCon1: relay constructed around C{con2}; it is handed to
+        C{con1} as its transport.
+    @ivar connected: C{True} until L{disconnect} has run.
+    """
+
+    def __init__(self, con1, con2):
+        """
+        Build both relays and call C{makeConnection} on each protocol.
+
+        @param con1: the first protocol.
+        @param con2: the second protocol; it is connected first.
+        """
+        self.con1 = con1
+        self.con2 = con2
+        self.con1ToCon2 = loopback.LoopbackRelay(con1)
+        self.con2ToCon1 = loopback.LoopbackRelay(con2)
+        con2.makeConnection(self.con1ToCon2)
+        con1.makeConnection(self.con2ToCon1)
+        self.connected = True
+
+    def doSteps(self, steps=1):
+        """
+        Run up to C{steps} reactor iterations, flushing both relays each time.
+
+        @param steps: maximum number of iterations to run.
+        @return: C{True} if one of the relays had C{shouldLose} set (in
+            which case L{disconnect} has been called), C{False} if every
+            step ran without either side asking to close.
+        """
+        done = 0
+        while done < steps:
+            reactor.iterate(0.01)
+            self.con1ToCon2.clearBuffer()
+            self.con2ToCon1.clearBuffer()
+            if self.con1ToCon2.shouldLose:
+                # Flush this relay once more before tearing down.
+                self.con1ToCon2.clearBuffer()
+                self.disconnect()
+                return True
+            if self.con2ToCon1.shouldLose:
+                self.disconnect()
+                return True
+            done += 1
+        return False
+
+    def disconnect(self):
+        """
+        Tell both protocols the connection is done; later calls do nothing.
+
+        L{doSteps} calls this when a relay asks to close and test
+        C{tearDown} methods call it again, so the C{connected} flag has
+        to be cleared or C{connectionLost} would be delivered twice.
+        """
+        if not self.connected:
+            return
+        # Clear the flag before notifying so a re-entrant call from a
+        # connectionLost handler is also a no-op.
+        self.connected = False
+        self.con1.connectionLost(failure.Failure(main.CONNECTION_DONE))
+        self.con2.connectionLost(failure.Failure(main.CONNECTION_DONE))
+        reactor.iterate()
+
+
+
##################
# Passport tests #
self.client.factory = msn.NotificationFactory()
self.client.test = 'FAIL'
self.server = FakeNotificationServer()
+ self.loop = LoopbackCon(self.client, self.server)
def tearDown(self):
- pass
+ self.loop.disconnect()  # deliver connectionLost to both ends of the loopback
def testChangeStatus(self):
- reactor.callLater(0, self.client.doStatusChange)
- loopback.loopback(self.client, self.server)
+ self.client.doStatusChange()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to change status properly')
def testSetPrivacyMode(self):
self.client.factory.contacts = msn.MSNContactList()
- reactor.callLater(0, self.client.doPrivacyMode)
- loopback.loopback(self.client, self.server)
+ self.client.doPrivacyMode()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to change privacy mode')
def testSyncList(self):
- reactor.callLater(0, self.client.doSyncList)
- loopback.loopback(self.client, self.server)
+ self.client.doSyncList()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to synchronise list')
testSyncList.skip = "Will do after list versions."
def testAddContactFL(self):
self.client.factory.contacts = msn.MSNContactList()
- reactor.callLater(0, self.client.doAddContactFL)
- loopback.loopback(self.client, self.server)
+ self.client.doAddContactFL()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to add contact to forward list')
def testAddContactAL(self):
self.client.factory.contacts = msn.MSNContactList()
- reactor.callLater(0, self.client.doAddContactAL)
- loopback.loopback(self.client, self.server)
+ self.client.doAddContactAL()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to add contact to allow list')
def testRemContactFL(self):
self.client.factory.contacts = msn.MSNContactList()
self.client.factory.contacts.addContact(msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.FORWARD_LIST))
- reactor.callLater(0, self.client.doRemContactFL)
- loopback.loopback(self.client, self.server)
+ self.client.doRemContactFL()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to remove contact from forward list')
def testRemContactAL(self):
self.client.factory.contacts = msn.MSNContactList()
self.client.factory.contacts.addContact(msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.ALLOW_LIST))
- reactor.callLater(0, self.client.doRemContactAL)
- loopback.loopback(self.client, self.server)
+ self.client.doRemContactAL()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to remove contact from allow list')
def testChangedScreenName(self):
- reactor.callLater(0, self.client.doScreenNameChange)
- loopback.loopback(self.client, self.server)
+ self.client.doScreenNameChange()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to change screen name properly')
def testChangePersonal1(self):
- reactor.callLater(0, lambda: self.client.doPersonalChange("Some personal message"))
- loopback.loopback(self.client, self.server)
+ self.client.doPersonalChange("Some personal message")  # non-empty personal message case
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to change personal message properly')
def testChangePersonal2(self):
- reactor.callLater(0, lambda: self.client.doPersonalChange(""))
- loopback.loopback(self.client, self.server)
+ self.client.doPersonalChange("")  # empty personal message case
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to change personal message properly')
def testChangeAvatar(self):
- reactor.callLater(0, lambda: self.client.doAvatarChange("DATADATADATADATA"))
- loopback.loopback(self.client, self.server)
+ self.client.doAvatarChange("DATADATADATADATA")  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to change avatar properly')
def testRequestSwitchboard(self):
- reactor.callLater(0, self.client.doRequestSwitchboard)
- loopback.loopback(self.client, self.server)
+ self.client.doRequestSwitchboard()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.test == 'PASS'), 'Failed to request switchboard')
def setUp(self):
self.client = DummyChallengeNotificationClient()
self.server = DummyChallengeNotificationServer()
+ self.loop = LoopbackCon(self.client, self.server)  # connects client and server through in-memory relays
def tearDown(self):
- pass
+ self.loop.disconnect()  # deliver connectionLost to both ends of the loopback
def testChallenges(self):
challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
for challenge, response in challenges:
- reactor.callLater(0, lambda: self.server.doChallenge(challenge, response))
- loopback.loopback(self.client, self.server)
+ self.server.doChallenge(challenge, response)  # server starts each exchange directly; each pair is (challenge, response)
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.server.state == 'PASS'), 'Incorrect challenge response.')
self.server = DummyPingNotificationServer()
self.client.state = 'CONNECTED'
self.client.count = 0
+ self.loop = LoopbackCon(self.client, self.server)
def tearDown(self):
msn.PINGSPEED = 50.0
self.client.logOut()
+ self.loop.disconnect()  # deliver connectionLost to both ends of the loopback
def testPingGood(self):
self.server.good = True
- loopback.loopback(self.client, self.server)
+ self.loop.doSteps(100)  # pump up to 100 steps; the return value is deliberately not asserted here
self.failUnless((self.client.state == 'CONNECTED'), 'Should be connected.')
def testPingBad(self):
self.server.good = False
- loopback.loopback(self.client, self.server)
+ self.loop.doSteps(100)  # pump up to 100 steps; the return value is deliberately not asserted here
self.failUnless((self.client.state == 'DISCONNECTED'), 'Should be disconnected.')
self.client.key = 'somekey'
self.client.sessionID = 'someSID'
self.server = DummySwitchboardServer()
+ self.loop = LoopbackCon(self.client, self.server)
def tearDown(self):
- pass
+ self.loop.disconnect()  # deliver connectionLost to both ends of the loopback
def _testSB(self, reply):
self.client.reply = reply
- loopback.loopback(self.client, self.server)
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.state == 'LOGGEDIN'), 'Failed to login with reply='+str(reply))
def testReply(self):
def testInviteUser(self):
self.client.connectionMade = lambda: None
- reactor.callLater(0, self.client.doSendInvite)
- loopback.loopback(self.client, self.server)
+ self.client.doSendInvite()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.state == 'INVITESUCCESS'), 'Failed to invite user')
def testSendMessage(self):
self.client.connectionMade = lambda: None
- reactor.callLater(0, self.client.doSendMessage)
- loopback.loopback(self.client, self.server)
+ self.client.doSendMessage()  # invoked directly instead of being scheduled with reactor.callLater
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')  # pump up to 10 steps; True only if a relay flagged shouldLose
self.failUnless((self.client.state == 'MESSAGESUCCESS'), 'Failed to send message')
+################
+# MSNP2P tests #
+################
+
+class DummySwitchboardP2PServerHelper(msn.MSNEventBase):
+    """
+    Server-side end of one client connection to the fake P2P switchboard.
+
+    It answers C{USR} for the two hard-coded test accounts and hands every
+    received message to the owning server, which relays it to the other
+    connected helpers.
+
+    @ivar server: the object whose C{gotMessage(message, sender)} receives
+        incoming messages.
+    @ivar userHandle: handle taken from the last well-formed C{USR} command.
+    """
+
+    # (userHandle, key) pairs that handle_USR accepts.
+    _validLogins = (('foo1@bar.com', 'somekey1'),
+                    ('foo2@bar.com', 'somekey2'))
+
+    def __init__(self, server):
+        """
+        @param server: object that incoming messages are forwarded to.
+        """
+        msn.MSNEventBase.__init__(self)
+        self.server = server
+
+    def handle_USR(self, params):
+        """
+        Handle a C{USR} command.
+
+        @param params: command parameters; C{params[0]} is echoed back in
+            the reply, C{params[1]} is the user handle and C{params[2]} the
+            key it is checked against.
+        """
+        if len(params) != 3:
+            self.transport.loseConnection()
+            # Stop here: indexing a short list below would raise
+            # IndexError, and a long one could still be answered with OK
+            # on a connection that was just dropped.
+            return
+        self.userHandle = params[1]
+        if (params[1], params[2]) in self._validLogins:
+            self.sendLine("USR %s OK %s %s" % (params[0], params[1], params[1]))
+
+    def checkMessage(self, message):
+        """
+        Accept every message.
+
+        @return: always C{1}.
+        """
+        # NOTE(review): presumably consulted by msn.MSNEventBase before
+        # gotMessage is called -- confirm against the base class.
+        return 1
+
+    def gotMessage(self, message):
+        """
+        Stamp the message with this connection's handle and pass it on.
+
+        @param message: the received message; its C{userHandle} and
+            C{screenName} are both overwritten with C{self.userHandle}.
+        """
+        message.userHandle = self.userHandle
+        message.screenName = self.userHandle
+        self.server.gotMessage(message, self)
+
+    def sendMessage(self, message):
+        """
+        Write C{message} to this connection as a C{MSG} command.
+
+        @param message: the message to send; a C{length} of 0 is replaced
+            with the value of C{message._calcMessageLen()}.
+        """
+        if message.length == 0:
+            message.length = message._calcMessageLen()
+        self.sendLine("MSG %s %s %s" % (message.userHandle, message.screenName, message.length))
+        # MIME-Version and Content-Type always go first, then the rest.
+        self.sendLine('MIME-Version: %s' % message.getHeader('MIME-Version'))
+        self.sendLine('Content-Type: %s' % message.getHeader('Content-Type'))
+        for name, value in message.headers.items():
+            if name.lower() in ('mime-version', 'content-type'):
+                continue
+            self.sendLine("%s: %s" % (name, value))
+        self.transport.write("\r\n")
+        self.transport.write(message.message)
+
+
+class DummySwitchboardP2PServer:
+    """
+    Minimal fake switchboard: relays each message to every other client.
+
+    @ivar clients: the helpers created so far by L{newClient}.
+    """
+
+    def __init__(self):
+        self.clients = []
+
+    def newClient(self):
+        """
+        Create, register and return a helper for one more connection.
+        """
+        helper = DummySwitchboardP2PServerHelper(self)
+        self.clients.append(helper)
+        return helper
+
+    def gotMessage(self, message, sender):
+        """
+        Forward C{message} to every registered client except C{sender}.
+        """
+        for peer in (client for client in self.clients if client != sender):
+            peer.sendMessage(message)
+
+class DummySwitchboardP2PClient(msn.SwitchboardClient):
+    """
+    Switchboard client that records receipt of the expected test message.
+    """
+
+    def gotMessage(self, message):
+        """
+        Set C{status} to C{"GOTMESSAGE"} for "Test Message" from foo1@bar.com.
+        """
+        if message.message == "Test Message":
+            if message.userHandle == "foo1@bar.com":
+                self.status = "GOTMESSAGE"
+
+class SwitchboardP2PTests(unittest.TestCase):
+    """
+    Check that the fake P2P switchboard relays a message between two clients.
+    """
+
+    def _makeClient(self, key, userHandle):
+        """
+        Build a dummy client carrying the given key and user handle.
+        """
+        client = DummySwitchboardP2PClient()
+        client.key = key
+        client.userHandle = userHandle
+        return client
+
+    def setUp(self):
+        """
+        Connect two clients to one fake server, each over its own loopback.
+        """
+        self.server = DummySwitchboardP2PServer()
+        self.client1 = self._makeClient('somekey1', 'foo1@bar.com')
+        self.client2 = self._makeClient('somekey2', 'foo2@bar.com')
+        self.client2.status = "INIT"
+        self.loop1 = LoopbackCon(self.client1, self.server.newClient())
+        self.loop2 = LoopbackCon(self.client2, self.server.newClient())
+
+    def tearDown(self):
+        """
+        Disconnect both loopbacks, first client's connection first.
+        """
+        for loop in (self.loop1, self.loop2):
+            loop.disconnect()
+
+    def testMessage(self):
+        """
+        A message sent by client1 must reach client2 through the server.
+        """
+        outgoing = msn.MSNMessage(message='Test Message')
+        self.client1.sendMessage(outgoing)
+        # Pump the sender's connection first, then the receiver's.
+        for loop in (self.loop1, self.loop2):
+            loop.doSteps(10)
+        self.failUnless(self.client2.status == "GOTMESSAGE", "Fake switchboard server not working.")
################
sender.fileSize = 7000
client = msnft.MSNFTP_FileReceive(auth, "foo@bar.com", self.output)
client.fileSize = 7000
- loopback.loopback(sender, client)
+ loop = LoopbackCon(client, sender)
+ loop.doSteps(100)
self.failUnless((client.completed and sender.completed), "send failed to complete")
self.failUnless((self.input.getvalue() == self.output.getvalue()), "saved file does not match original")