]> code.delx.au - pymsnt/blob - src/main.py
Improved reactor autodetection.
[pymsnt] / src / main.py
1 # Copyright 2004-2006 James Bunton <james@delx.cjb.net>
2 # Licensed for distribution under the GPL version 2, check COPYING for details
3
4 import os, os.path, time, sys, codecs, getopt
5 reload(sys)
6 sys.setdefaultencoding("utf-8")
7 sys.stdout = codecs.lookup('utf-8')[-1](sys.stdout)
8
9 # Find the best reactor
10 selectWarning = "Unable to install any good reactors (kqueue, epoll, poll).\nWe fell back to using select. You may have scalability problems.\nThis reactor will not support more than 1024 connections +at a time."
11 try:
12 from twisted.internet import epollreactor as bestreactor
13 except:
14 try:
15 from twisted.internet import kqreactor as bestreactor
16 except:
17 try:
18 from twisted.internet import pollreactor as bestreactor
19 except:
20 try:
21 from twisted.internet import selectreactor as bestreactor
22 print selectWarning
23 except:
24 try:
25 from twisted.internet import default as bestreactor
26 print selectWarning
27 except:
28 print "Unable to find a reactor.\nExiting..."
29 sys.exit(1)
30 bestreactor.install()
31
32
33
34 # Must load config before everything else
35 import config
36 import xmlconfig
37 configFile = "config.xml"
38 configOptions = {}
39 opts, args = getopt.getopt(sys.argv[1:], "bc:o:dDgtlp:h", ["background", "config=", "option=", "debug", "Debug", "garbage", "traceback", "log=", "pid=", "help"])
40 for o, v in opts:
41 if o in ("-c", "--config"):
42 configFile = v
43 elif o in ("-b", "--background"):
44 config.background = True
45 elif o in ("-d", "--debug"):
46 config.debugLevel = "2"
47 elif o in ("-D", "--Debug"):
48 config.debugLevel = "3"
49 elif o in ("-g", "--garbage"):
50 import gc
51 gc.set_debug(gc.DEBUG_LEAK|gc.DEBUG_STATS)
52 elif o in ("-t", "--traceback"):
53 config.debugLevel = "1"
54 elif o in ("-l", "--log"):
55 config.debugFile = v
56 elif o in ("-p", "--pid"):
57 config.pid = v
58 elif o in ("-o", "--option"):
59 var, setting = v.split("=", 2)
60 configOptions[var] = setting
61 elif o in ("-h", "--help"):
62 print "%s [options]" % sys.argv[0]
63 print " -h print this help"
64 print " -b daemonize/background transport"
65 print " -c <file> read configuration from this file"
66 print " -d print debugging output"
67 print " -D print extended debugging output"
68 print " -g print garbage collection output"
69 print " -t print debugging only on traceback"
70 print " -l <file> write debugging output to file"
71 print " -p <file> write process ID to file"
72 print " -o <var>=<setting> set config var to setting"
73 sys.exit(0)
74
75 xmlconfig.reloadConfig(configFile, configOptions)
76
77 if config.reactor:
78 # They picked their own reactor. Lets install it.
79 del sys.modules["twisted.internet.reactor"]
80 if config.reactor == "epoll":
81 from twisted.internet import epollreactor
82 epollreactor.install()
83 elif config.reactor == "poll":
84 from twisted.internet import pollreactor
85 pollreactor.install()
86 elif config.reactor == "kqueue":
87 from twisted.internet import kqreactor
88 kqreactor.install()
89 elif len(config.reactor) > 0:
90 print "Unknown reactor: ", config.reactor, ". Using select(), reactor."
91
92
93 from twisted.internet import reactor, task
94 from twisted.internet.defer import Deferred
95 from tlib.xmlw import Element, jid, component
96 from debug import LogEvent, INFO, WARN, ERROR
97
98 import debug
99 import utils
100 import xdb
101 import avatar
102 import session
103 import jabw
104 import disco
105 import register
106 import misciq
107 import ft
108 import lang
109 import legacy
110 import housekeep
111
112
113
114 class PyTransport(component.Service):
115 def __init__(self):
116 LogEvent(INFO)
117
118 # Discovery, as well as some builtin features
119 self.discovery = disco.ServerDiscovery(self)
120 self.discovery.addIdentity("gateway", legacy.id, legacy.name, config.jid)
121 self.discovery.addIdentity("conference", "text", legacy.name + " Chatrooms", config.jid)
122 self.discovery.addFeature(disco.XCONFERENCE, None, config.jid) # So that clients know you can create groupchat rooms on the server
123 self.discovery.addFeature("jabber:iq:conference", None, config.jid) # We don't actually support this, but Psi has a bug where it looks for this instead of the above
124 self.discovery.addIdentity("client", "pc", "MSN Messenger", "USER")
125
126 self.xdb = xdb.XDB(config.jid, legacy.mangle)
127 self.avatarCache = avatar.AvatarCache()
128 self.registermanager = register.RegisterManager(self)
129 self.gatewayTranslator = misciq.GatewayTranslator(self)
130 self.versionTeller = misciq.VersionTeller(self)
131 self.pingService = misciq.PingService(self)
132 self.adHocCommands = misciq.AdHocCommands(self)
133 self.vCardFactory = misciq.VCardFactory(self)
134 self.iqAvatarFactor = misciq.IqAvatarFactory(self)
135 self.connectUsers = misciq.ConnectUsers(self)
136 if config.ftJabberPort:
137 self.ftSOCKS5Receive = ft.Proxy65(int(config.ftJabberPort))
138 self.ftSOCKS5Send = misciq.Socks5FileTransfer(self)
139 if config.ftOOBPort:
140 self.ftOOBReceive = ft.FileTransferOOBReceive(int(config.ftOOBPort))
141 self.ftOOBSend = misciq.FileTransferOOBSend(self)
142 self.statistics = misciq.Statistics(self)
143 self.startTime = int(time.time())
144
145 self.xmlstream = None
146 self.sessions = {}
147
148 # Groupchat ID handling
149 self.lastID = 0
150 self.reservedIDs = []
151
152 # Message IDs
153 self.messageID = 0
154
155 self.loopTask = task.LoopingCall(self.loopFunc)
156 self.loopTask.start(60.0)
157
158 def removeMe(self):
159 LogEvent(INFO)
160 for session in self.sessions.copy():
161 self.sessions[session].removeMe()
162
163 def makeMessageID(self):
164 self.messageID += 1
165 return str(self.messageID)
166
167 def makeID(self):
168 newID = "r" + str(self.lastID)
169 self.lastID += 1
170 if self.reservedIDs.count(newID) > 0:
171 # Ack, it's already used.. Try again
172 return self.makeID()
173 else:
174 return newID
175
176 def reserveID(self, ID):
177 self.reservedIDs.append(ID)
178
179 def loopFunc(self):
180 numsessions = len(self.sessions)
181
182 #if config.debugOn and numsessions > 0:
183 # print "Sessions:"
184 # for key in self.sessions:
185 # print "\t" + self.sessions[key].jabberID
186
187 self.statistics.stats["Uptime"] = int(time.time()) - self.startTime
188 self.statistics.stats["OnlineUsers"] = numsessions
189 legacy.updateStats(self.statistics)
190 if numsessions > 0:
191 oldDict = self.sessions.copy()
192 self.sessions = {}
193 for key in oldDict:
194 s = oldDict[key]
195 if not s.alive:
196 LogEvent(WARN, "", "Ghost session found.")
197 # Don't add it to the new dictionary. Effectively removing it
198 else:
199 self.sessions[key] = s
200
201 def componentConnected(self, xmlstream):
202 LogEvent(INFO)
203 self.xmlstream = xmlstream
204 self.xmlstream.addObserver("/iq", self.discovery.onIq)
205 self.xmlstream.addObserver("/presence", self.onPresence)
206 self.xmlstream.addObserver("/message", self.onMessage)
207 self.xmlstream.addObserver("/route", self.onRouteMessage)
208 if config.useXCP:
209 pres = Element((None, "presence"))
210 pres.attributes["to"] = "presence@-internal"
211 pres.attributes["from"] = config.compjid
212 x = pres.addElement("x")
213 x.attributes["xmlns"] = "http://www.jabber.com/schemas/component-presence.xsd"
214 x.attributes["xmlns:config"] = "http://www.jabber.com/config"
215 x.attributes["config:version"] = "1"
216 x.attributes["protocol-version"] = "1.0"
217 x.attributes["config-ns"] = legacy.url + "/component"
218 self.send(pres)
219
220 def componentDisconnected(self):
221 LogEvent(INFO)
222 self.xmlstream = None
223
224 def onRouteMessage(self, el):
225 for child in el.elements():
226 if child.name == "message":
227 self.onMessage(child)
228 elif child.name == "presence":
229 # Ignore any presence broadcasts about other XCP components
230 if child.getAttribute("to") and child.getAttribute("to").find("@-internal") > 0: return
231 self.onPresence(child)
232 elif child.name == "iq":
233 self.discovery.onIq(child)
234
235 def onMessage(self, el):
236 fro = el.getAttribute("from")
237 try:
238 froj = jid.intern(fro)
239 except Exception, e:
240 LogEvent(WARN, "", "Failed stringprep.")
241 return
242 mtype = el.getAttribute("type")
243 s = self.sessions.get(froj.userhost(), None)
244 if mtype == "error" and s:
245 s.removeMe()
246 elif s:
247 s.onMessage(el)
248 else:
249 to = el.getAttribute("to")
250 ulang = utils.getLang(el)
251 body = None
252 for child in el.elements():
253 if child.name == "body":
254 body = child.__str__()
255 LogEvent(INFO, "", "Sending error response to a message outside of session.")
256 jabw.sendErrorMessage(self, fro, to, "auth", "not-authorized", lang.get(ulang).notLoggedIn, body)
257
258 def onPresence(self, el):
259 fro = el.getAttribute("from")
260 to = el.getAttribute("to")
261 try:
262 froj = jid.intern(fro)
263 toj = jid.intern(to)
264 except Exception, e:
265 LogEvent(WARN, "", "Failed stringprep.")
266 return
267
268 s = self.sessions.get(froj.userhost())
269 if s:
270 s.onPresence(el)
271 else:
272 ulang = utils.getLang(el)
273 ptype = el.getAttribute("type")
274 if to.find('@') < 0:
275 # If the presence packet is to the transport (not a user) and there isn't already a session
276 if not el.getAttribute("type"): # Don't create a session unless they're sending available presence
277 LogEvent(INFO, "", "Attempting to create a new session.")
278 s = session.makeSession(self, froj.userhost(), ulang)
279 if s:
280 self.statistics.stats["TotalUsers"] += 1
281 self.sessions[froj.userhost()] = s
282 LogEvent(INFO, "", "New session created.")
283 # Send the first presence
284 s.onPresence(el)
285 else:
286 LogEvent(INFO, "", "Failed to create session")
287 jabw.sendMessage(self, to=froj.userhost(), fro=config.jid, body=lang.get(ulang).notRegistered)
288
289 elif el.getAttribute("type") != "error":
290 LogEvent(INFO, "", "Sending unavailable presence to non-logged in user.")
291 pres = Element((None, "presence"))
292 pres.attributes["from"] = to
293 pres.attributes["to"] = fro
294 pres.attributes["type"] = "unavailable"
295 self.send(pres)
296 return
297
298 elif ptype and (ptype.startswith("subscribe") or ptype.startswith("unsubscribe")):
299 # They haven't logged in, and are trying to change subscription to a user
300 # Lets log them in and then do it
301 LogEvent(INFO, "", "Attempting to create a session to do subscription stuff.")
302 s = session.makeSession(self, froj.userhost(), ulang)
303 if s:
304 self.sessions[froj.userhost()] = s
305 LogEvent(INFO, "", "New session created.")
306 # Tell the session there's a new resource
307 s.handleResourcePresence(froj.userhost(), froj.resource, toj.userhost(), toj.resource, 0, None, None, None)
308 # Send this subscription
309 s.onPresence(el)
310
311
312 class App:
313 def __init__(self):
314 # Check for any other instances
315 if config.pid and os.name != "posix":
316 config.pid = ""
317 if config.pid:
318 twistd.checkPID(config.pid)
319
320 # Do any auto-update stuff
321 housekeep.init()
322
323 # Daemonise the process and write the PID file
324 if config.background and os.name == "posix":
325 twistd.daemonize()
326 if config.pid:
327 self.writePID()
328
329 # Initialise debugging, and do the other half of SIGHUPstuff
330 debug.reloadConfig()
331 legacy.reloadConfig()
332
333 # Start the service
334 jid = config.jid
335 if config.useXCP and config.compjid:
336 jid = config.compjid
337 self.c = component.buildServiceManager(jid, config.secret, "tcp:%s:%s" % (config.mainServer, config.port))
338 self.transportSvc = PyTransport()
339 self.transportSvc.setServiceParent(self.c)
340 self.c.startService()
341 reactor.addSystemEventTrigger('before', 'shutdown', self.shuttingDown)
342
343 def writePID(self):
344 # Create a PID file
345 pid = str(os.getpid())
346 pf = open(config.pid, "w")
347 pf.write("%s\n" % pid)
348 pf.close()
349
350 def shuttingDown(self):
351 self.transportSvc.removeMe()
352 # Keep the transport running for another 3 seconds
353 def cb(ignored=None):
354 if config.pid:
355 twistd.removePID(config.pid)
356 d = Deferred()
357 d.addCallback(cb)
358 reactor.callLater(3.0, d.callback, None)
359 return d
360
361
362
363 def SIGHUPstuff(*args):
364 global configFile, configOptions
365 xmlconfig.reloadConfig(configFile, configOptions)
366 if config.pid and os.name != "posix":
367 config.pid = ""
368 debug.reloadConfig()
369 legacy.reloadConfig()
370
371 if os.name == "posix":
372 import signal
373 # Set SIGHUP to reload the config file & close & open debug file
374 signal.signal(signal.SIGHUP, SIGHUPstuff)
375 # Load some scripts for PID and daemonising
376 from twisted.scripts import twistd
377
378
379 def main():
380 # Create the application
381 app = App()
382 reactor.run()
383
384 if __name__ == "__main__":
385 main()
386