From: jamesbunton Date: Wed, 7 Dec 2005 13:47:55 +0000 (+0000) Subject: Improved P2P code. X-Git-Url: https://code.delx.au/pymsnt/commitdiff_plain/8ad2d2e8fabb70336e538861ce8cf29d5a80d23d Improved P2P code. git-svn-id: http://delx.cjb.net/svn/pymsnt/trunk@43 55fbd22a-6204-0410-b2f0-b6c764c7e90a committer: jamesbunton --- diff --git a/src/tlib/msn/msn.py b/src/tlib/msn/msn.py index cf50202..818c373 100644 --- a/src/tlib/msn/msn.py +++ b/src/tlib/msn/msn.py @@ -2066,76 +2066,29 @@ class SwitchboardClient(MSNEventBase): slpLink = None if slpMessage.method == "INVITE": if slpMessage.euf_guid == MSN_MSNFTP_GUID: - slpLink = SLPLink_Receive(slpMessage.fro, slpMessage.sessionID, slpMessage.sessionGuid) + #FIXME! + slpLink = SLPLink_Receive(remoteUser=slpMessage.fro, switchboard=self, consumer=FIXME, transferGuid=MSN_MSNFTP_GUID, sessionID=slpMessage.sessionID, sessionGuid=slpMessage.sessionGuid) context = FileContext(slpMessage.context) fileReceive = msnft.MSNP2P_Receive(context.filename, context.filesize, slpMessage.fro) elif slpMessage.euf_guid == MSN_AVATAR_GUID: # Check that we have an avatar to send if self.msnobj: - slpLink = SLPLink_Send(slpMessage.fro, self, self.msnobj.size, slpMessage.sessionID, slpMessage.sessionGuid) - self._sendMSNSLPResponse(slpLink, "200 OK") + slpLink = SLPLink_Send(remoteUser=slpMessage.fro, switchboard=self, length=self.msnobj.size, transferGuid=MSN_AVATAR_GUID, sessionID=slpMessage.sessionID, sessionGuid=slpMessage.sessionGuid) slpLink.write(self.msnobj.imageData) slpLink.finish() else: - pass # FIXME, should send an error response + # They shouldn't have sent a request if we have + # no avatar. So we'll just ignore them. + pass if slpLink: self.slpLinks[slpMessage.sessionID] = slpLink else: - if slpMessage.status != "200": - for slpLink in self.slpLinks: - if slpLink.sessionGuid == slpMessage.sessionGuid: - del self.slpLinks[slpLink.sessionID] - if slpMessage.method != "BYE": - # Must be an error. If its a file transfer we need to signal that it failed - slpLink.transferError() - else: - slpLink = self.slpLinks[slpMessage.sessionID] - slpLink.transferReady() + for slpLink in self.slpLinks.values(): + if slpLink.sessionGuid == slpMessage.sessionGuid: + slpLink.handleSLPMessage(slpMessage) if slpLink: # Always need to ACK these packets if we can - self._sendP2PACK(slpLink, binaryFields) - - - def _sendP2PACK(self, slpLink, ackHeaders): - binaryFields = BinaryFields() - binaryFields[1] = slpLink.seqID.next() - binaryFields[3] = ackHeaders[3] - binaryFields[5] = BinaryFields.ACK - binaryFields[6] = ackHeaders[1] - binaryFields[7] = ackHeaders[6] - binaryFields[8] = ackHeaders[3] - self._sendP2PMessage(binaryFields, "", slpLink.remoteUser) - - def _sendMSNSLPCommand(self, command, slpLink, guid, context): - msg = MSNSLPMessage() - msg.create(method=command, to=slpLink.remoteUser, fro=self.userHandle, cseq=0, sessionGuid=slpLink.sessionGuid) - msg.setData(sessionID=slpLink.sessionID, appID="1", guid=guid, context=b64enc(context + chr(0))) - self._sendMSNSLPMessage(slpLink, msg) - - def _sendMSNSLPResponse(self, slpLink, response): - msg = MSNSLPMessage() - msg.create(status=response, to=slpLink.remoteUser, fro=self.userHandle, cseq=1, sessionGuid=slpLink.sessionGuid) - msg.setData(sessionID=slpLink.sessionID) - self._sendMSNSLPMessage(slpLink, msg) - - def _sendMSNSLPMessage(self, slpLink, msnSLPMessage): - msgStr = str(msnSLPMessage) - binaryFields = BinaryFields() - binaryFields[1] = slpLink.seqID.next() - binaryFields[3] = len(msgStr) - binaryFields[4] = binaryFields[3] - binaryFields[6] = random.randint(0, sys.maxint) - self._sendP2PMessage(binaryFields, msgStr, msnSLPMessage.to) - - def _sendP2PMessage(self, binaryFields, msgStr, to): - packet = binaryFields.packHeaders() + msgStr + binaryFields.packFooter() - - message = MSNMessage(message=packet) - message.setHeader("Content-Type", "application/x-msnmsgrp2p") - message.setHeader("P2P-Dest", to) - message.ack = MSNMessage.MESSAGE_ACK_FAT - self.sendMessage(message) - + slpLink.sendP2PACK(binaryFields) def checkMessage(self, message): @@ -2329,10 +2282,9 @@ class SwitchboardClient(MSNEventBase): def bufferClosed(data): d.callback((data,)) buffer = StringBuffer(bufferClosed) - slpLink = SLPLink_Receive(msnContact.userHandle, self, buffer) + slpLink = SLPLink_Receive(remoteUser=msnContact.userHandle, switchboard=self, consumer=buffer, transferGuid=MSN_AVATAR_GUID, context=msnContact.msnobj.text) slpLink.avatarDataBuffer = buffer self.slpLinks[slpLink.sessionID] = slpLink - self._sendMSNSLPCommand("INVITE", slpLink, MSN_AVATAR_GUID, msnContact.msnobj.text) return d def sendTypingNotification(self): @@ -2548,7 +2500,7 @@ class SeqID: if baseID: self.seqID = baseID else: - self.seqID = random.randint(4, 2**30) + self.seqID = random.randint(1000, sys.maxint) self.pos = -1 def get(self): @@ -2587,29 +2539,83 @@ class SLPLink: self.sessionGuid = sessionGuid self.seqID = SeqID() - def transferReady(self): - pass + def killLink(self): + def kill(): + del self.switchboard.slpLinks[self.sessionID] + self.switchboard = None + # This is so that handleP2PMessage can still use the SLPLink + # one last time, for ACKing BYEs and 601s. + reactor.callLater(0, kill) def setError(self, text): if MSNP2PDEBUG: print "ERROR in avatar transfer: ", self, text, "in state:", self.state - self.state = self.ERROR def warn(self, text): if MSNP2PDEBUG: print "Warning in avatar transfer: ", self, text, "in state:", self.state + + def sendP2PACK(self, ackHeaders): + binaryFields = BinaryFields() + binaryFields[1] = self.seqID.next() + binaryFields[3] = ackHeaders[3] + binaryFields[5] = BinaryFields.ACK + binaryFields[6] = ackHeaders[1] + binaryFields[7] = ackHeaders[6] + binaryFields[8] = ackHeaders[3] + self.sendP2PMessage(binaryFields, "") + + def sendMSNSLPCommand(self, command, guid, context): + msg = MSNSLPMessage() + msg.create(method=command, to=self.remoteUser, fro=self.switchboard.userHandle, cseq=0, sessionGuid=self.sessionGuid) + msg.setData(sessionID=self.sessionID, appID="1", guid=guid, context=b64enc(context + chr(0))) + self.sendMSNSLPMessage(msg) + + def sendMSNSLPResponse(self, response): + msg = MSNSLPMessage() + msg.create(status=response, to=self.remoteUser, fro=self.switchboard.userHandle, cseq=1, sessionGuid=self.sessionGuid) + msg.setData(sessionID=self.sessionID) + self.sendMSNSLPMessage(msg) + + def sendMSNSLPMessage(self, msnSLPMessage): + msgStr = str(msnSLPMessage) + binaryFields = BinaryFields() + binaryFields[1] = self.seqID.next() + binaryFields[3] = len(msgStr) + binaryFields[4] = binaryFields[3] + binaryFields[6] = random.randint(1000, sys.maxint) + self.sendP2PMessage(binaryFields, msgStr) + + def sendP2PMessage(self, binaryFields, msgStr): + packet = binaryFields.packHeaders() + msgStr + binaryFields.packFooter() + + message = MSNMessage(message=packet) + message.setHeader("Content-Type", "application/x-msnmsgrp2p") + message.setHeader("P2P-Dest", self.remoteUser) + message.ack = MSNMessage.MESSAGE_ACK_FAT + self.switchboard.sendMessage(message) + + + class SLPLink_Send(SLPLink): - def __init__(self, remoteUser, switchboard, length, sessionID=None, sessionGuid=None): + def __init__(self, remoteUser, switchboard, length, transferGuid, sessionID=None, sessionGuid=None): SLPLink.__init__(self, remoteUser, switchboard, sessionID, sessionGuid) self.handlePacket = None + self.transferGuid = transferGuid self.offset = 0 self.length = length - self.ready = False # Have we received the go ahead to transmit yet? self.data = "" # Send dataprep - self.send_dataprep() + if transferGuid == MSN_AVATAR_GUID: + self.sendMSNSLPResponse("200 OK") + self.send_dataprep() + elif transferGuid == MSN_MSNFTP_GUID: + # FIXME Send invite & wait for 200OK before sending dataprep + pass + else: + raise "NotSupportedGuid" def send_dataprep(self): binaryFields = BinaryFields() @@ -2617,23 +2623,11 @@ class SLPLink_Send(SLPLink): binaryFields[1] = self.seqID.next() binaryFields[3] = 4 binaryFields[4] = 4 - binaryFields[6] = random.randint(0, sys.maxint) + binaryFields[6] = random.randint(1000, sys.maxint) binaryFields[9] = 1 - - self.switchboard._sendP2PMessage(binaryFields, chr(0) * 4, self.remoteUser) - - def transferReady(self): - self.ready = True - if self.data: - data = self.data - self.data = "" - self.write(data) + self.sendP2PMessage(binaryFields, chr(0) * 4) def write(self, data): - if not self.ready: - self.data += data - return - i = 0 length = len(data) while i < length: @@ -2651,35 +2645,54 @@ class SLPLink_Send(SLPLink): def _writeChunk(self, chunk): binaryFields = BinaryFields() binaryFields[0] = self.sessionID - binaryFields[1] = self.seqID.next() + binaryFields[1] = self.seqID.get() binaryFields[2] = self.offset binaryFields[3] = self.length binaryFields[4] = len(chunk) binaryFields[5] = BinaryFields.DATA - binaryFields[6] = random.randint(0, 2**30) + binaryFields[6] = random.randint(1000, sys.maxint) binaryFields[9] = 1 - self.offset += len(chunk) - self.switchboard._sendP2PMessage(binaryFields, chunk, self.remoteUser) + self.sendP2PMessage(binaryFields, chunk) def finish(self): if self.data: self._writeChunk(self.data) - self.switchboard = None - # FIXME Now tell the switchboard + if self.transferGuid == MSN_AVATAR_GUID: + pass # Keep the link open to wait for a BYE + elif self.transferGuid == MSN_MSNFTP_GUID: + self.sendMSNSLPCommand("BYE", MSN_AVATAR_GUID, "\0") # FIXME, is this right? + self.killLink() def error(self): pass - # Same as above? + # FIXME, should send 601 or something class SLPLink_Receive(SLPLink): - def __init__(self, remoteUser, switchboard, consumer, sessionID=None, sessionGuid=None): + def __init__(self, remoteUser, switchboard, consumer, transferGuid, context=None, sessionID=None, sessionGuid=None): SLPLink.__init__(self, remoteUser, switchboard, sessionID, sessionGuid) + self.handlePacket = None + self.transferGuid = transferGuid self.consumer = consumer self.pos = 0 - self.handlePacket = self.wait_dataprep + if transferGuid == MSN_AVATAR_GUID: + self.sendMSNSLPCommand("INVITE", MSN_AVATAR_GUID, context) + elif transferGuid == MSN_MSNFTP_GUID: + pass # FIXME + else: + raise "NotSupportedGuid" + + def handleSLPMessage(self, slpMessage): + if self.transferGuid == MSN_AVATAR_GUID: + if slpMessage.status == "200": + self.handlePacket = self.wait_dataprep + else: + # SLPLink is over due to error or BYE + self.killLink() + elif self.transferGuid == MSN_MSNFTP_GUID: + pass #FIXME def wait_dataprep(self, packet): binaryFields = BinaryFields() @@ -2698,7 +2711,7 @@ class SLPLink_Receive(SLPLink): self.warn("field9," + str(binaryFields[9])) # return - self.switchboard._sendP2PACK(self, binaryFields) + self.sendP2PACK(binaryFields) self.handlePacket = self.wait_data def wait_data(self, packet): @@ -2732,11 +2745,13 @@ class SLPLink_Receive(SLPLink): self.consumer.write(data) if self.pos == total: - self.switchboard._sendP2PACK(self, binaryFields) + self.sendP2PACK(binaryFields) self.consumer.finish() self.handlePacket = None - del self.switchboard.slpLinks[self.sessionID] - self.switchboard = None + if self.transferGuid == MSN_AVATAR_GUID: + self.sendMSNSLPCommand("BYE", MSN_AVATAR_GUID, "\0") + elif self.transferGuid == MSN_MSNFTP_GUID: + pass # Link is kept around waiting for a BYE diff --git a/src/tlib/msn/test_msn.py b/src/tlib/msn/test_msn.py index 237a594..6009ed9 100644 --- a/src/tlib/msn/test_msn.py +++ b/src/tlib/msn/test_msn.py @@ -1005,7 +1005,7 @@ class SwitchboardP2PTests(unittest.TestCase): d.addCallback(avatarCallback) # Let them do their thing - for i in xrange(100): + for i in xrange(10): self.loop1.doSteps(1) self.loop2.doSteps(1)