Date de création : 22 juin 2008
"""
-import sys, getopt, xmpp
-import urllib
+import sys, getopt, xmpp, urllib
from xml.dom import minidom
DEMLOG_URL="http://bacasable.auf/rest/demlog/%s"
PHONE_URL="http://intranet.auf/intranet_annuaire_telephonique.php3"
def xmlurlopen(url):
- xmlfile = urllib.urlopen(url)
- xml = xmlfile.read()
- if not xml.startswith('<?xml '):
- typeheader = xmlfile.headers.typeheader
- if typeheader and (typeheader.find('=') >= 0):
- (junk,charset) = typeheader.rsplit('=',1)
- else:
- charset = 'utf-8'
- xml = ('<?xml version="1.0" encoding="%s"?>\n' % charset) + xml
- xmlfile.close()
- return xml
+ xmlfile = urllib.urlopen(url)
+ xml = xmlfile.read()
+ if not xml.startswith('<?xml '):
+ typeheader = xmlfile.headers.typeheader
+ if typeheader and (typeheader.find('=') >= 0):
+ (junk,charset) = typeheader.rsplit('=',1)
+ else:
+ charset = 'utf-8'
+ xml = ('<?xml version="1.0" encoding="%s"?>\n' % charset) + xml
+ xmlfile.close()
+ return xml
def _xmltojab_r(node, prefix = ''):
- if node.nodeType == node.TEXT_NODE:
- result = node.nodeValue.strip().replace('\n', '\\n')
- if result <> '': result = prefix + result + '\n'
- return result
- elif node.nodeType == node.ELEMENT_NODE:
- prefix += node.nodeName + ' : '
- if node.nodeValue <> None:
- value = node.nodeValue.strip().replace('\n', '\\n')
- if value <> '': value = prefix + value + '\n'
- result = value
- else:
- result = ''
- for child in node.childNodes:
- result += _xmltojab_r(child, prefix)
- return result
- else:
- return prefix + u'Type de noeud inconnu...\n'
+ if node.nodeType == node.TEXT_NODE:
+ result = node.nodeValue.strip().replace('\n', '\\n')
+ if result <> '': result = prefix + result + '\n'
+ return result
+ elif node.nodeType == node.ELEMENT_NODE:
+ prefix += node.nodeName + ' : '
+ if node.nodeValue <> None:
+ value = node.nodeValue.strip().replace('\n', '\\n')
+ if value <> '': value = prefix + value + '\n'
+ result = value
+ else:
+ result = ''
+ for child in node.childNodes:
+ result += _xmltojab_r(child, prefix)
+ return result
+ else:
+ return prefix + u'Type de noeud inconnu...\n'
def xmltojab(xml, startwith=''):
- try:
- doc = minidom.parseString(xml)
- except:
- return ''
- if startwith <> '':
- nodes = []
- for node in doc.getElementsByTagName(startwith):
- nodes += node.childNodes
- else:
- nodes = doc.documentElement.childNodes
- result=''
- for node in nodes:
- result += _xmltojab_r(node)
- return result
+ try:
+ doc = minidom.parseString(xml)
+ except:
+ return ''
+ if startwith <> '':
+ nodes = []
+ for node in doc.getElementsByTagName(startwith):
+ nodes += node.childNodes
+ else:
+ nodes = doc.documentElement.childNodes
+ result=''
+ for node in nodes:
+ result += _xmltojab_r(node)
+ return result
def _phonefind(xhtml, nom):
- try:
- xmldoc = minidom.parseString(xhtml)
- except:
- return ''
- # recherche de la division contenant les infos
- main_content = None
- for div in xmldoc.getElementsByTagName('div'):
- if div.hasAttribute('id') and div.getAttribute('id') == 'main_content':
- main_content = div
- break
- if main_content == None: return ''
- # parcours des infos pour trouver le nom demandé
- result = u''
- nom = nom.lower()
- for tr in main_content.getElementsByTagName('tr'):
- tds = tr.getElementsByTagName('td')
- if len(tds) == 3:
- # FIXME: gérer les futurs changements de structure !!
- nom_complet = tds[0].firstChild.firstChild.nodeValue
- if nom_complet.lower().find(nom) >= 0:
- result += nom_complet + ':'
- telpub = tds[1].firstChild
- if telpub <> None:
- result += u" %s (public)" % telpub.nodeValue
- telip = tds[2].firstChild
- if telip <> None:
- result += u" %s (VoIP)" % telip.nodeValue
- result += "\n"
- return result
+ try:
+ xmldoc = minidom.parseString(xhtml)
+ except:
+ return ''
+ # recherche de la division contenant les infos
+ main_content = None
+ for div in xmldoc.getElementsByTagName('div'):
+ if div.hasAttribute('id') and div.getAttribute('id') == 'main_content':
+ main_content = div
+ break
+ if main_content == None: return ''
+ # parcours des infos pour trouver le nom demandé
+ result = u''
+ nom = nom.lower()
+ for tr in main_content.getElementsByTagName('tr'):
+ tds = tr.getElementsByTagName('td')
+ if len(tds) == 3:
+ # FIXME: gérer les futurs changements de structure !!
+ nom_complet = tds[0].firstChild.firstChild.nodeValue
+ if nom_complet.lower().find(nom) >= 0:
+ result += nom_complet + ':'
+ telpub = tds[1].firstChild
+ if telpub <> None:
+ result += u" %s (public)" % telpub.nodeValue
+ telip = tds[2].firstChild
+ if telip <> None:
+ result += u" %s (VoIP)" % telip.nodeValue
+ result += "\n"
+ return result
class Bot(object):
- _jid = _password = ''
- _admins = ['jean-christophe.andre@auf.org', 'doan.manh.ha@auf.org', 'thomas.noel@auf.org']
- _salons = {}
- _quitter = False
- _quitter_message = 'Au revoir !'
+ _jid = _password = ''
+ _admins = ['jean-christophe.andre@auf.org', 'doan.manh.ha@auf.org', 'thomas.noel@auf.org']
+ _salons = {}
+ _quitter = False
+ _quitter_message = 'Au revoir !'
- def __init__(self, jid, password):
- self._jid = xmpp.JID(jid)
- self._password = password
- if self._jid.getResource() == '':
- self._jid.setResource('botap')
+ def __init__(self, jid, password):
+ self._jid = xmpp.JID(jid)
+ self._password = password
+ if self._jid.getResource() == '':
+ self._jid.setResource('jabbot')
- self._client = xmpp.Client(self._jid.getDomain(), debug=[])
- try:
- if not self._client.connect():
- raise IOError, u"can't connect to '%s'" % self._jid.getDomain()
- except IOError:
- raise IOError, u"can't connect to '%s'" % self._jid.getDomain()
- self._client.RegisterHandler('presence', self._presenceHandler)
- self._client.RegisterHandler('message', self._messageHandler)
- self._client.RegisterHandler('iq', self._iqHandler)
- # self._client.UnregisterDisconnectHandler(self._client.DisconnectHandler)
+ self._client = xmpp.Client(self._jid.getDomain(), debug=[])
+ try:
+ if not self._client.connect():
+ raise IOError, u"can't connect to '%s'" % self._jid.getDomain()
+ except IOError:
+ raise IOError, u"can't connect to '%s'" % self._jid.getDomain()
+ self._client.RegisterHandler('presence', self._presenceHandler)
+ self._client.RegisterHandler('message', self._messageHandler)
+ self._client.RegisterHandler('iq', self._iqHandler)
+ # self._client.UnregisterDisconnectHandler(self._client.DisconnectHandler)
- auth = self._client.auth(self._jid.getNode(), self._password, self._jid.getResource())
- if not auth:
- raise IOError, u"unable to authenticate '%s'" % self._jid.getNode()
- self._client.sendPresence(jid=self._jid.getDomain())
- if self._jid.getDomain() == 'auf.org':
- self.cmd_join('test@reunion.auf.org/' + self._jid.getResource())
- #self.cmd_join('tap@reunion.auf.org/' + self._jid.getResource())
- if self._jid.getDomain() == 'net127':
- self.cmd_join('test@conf.net127/' + self._jid.getResource())
+ auth = self._client.auth(self._jid.getNode(), self._password, self._jid.getResource())
+ if not auth:
+ raise IOError, u"unable to authenticate '%s'" % self._jid.getNode()
+ self._client.sendPresence(jid=self._jid.getDomain())
+ if self._jid.getDomain() == 'auf.org':
+ self.cmd_join('test@reunion.auf.org/' + self._jid.getResource())
+ #self.cmd_join('tap@reunion.auf.org/' + self._jid.getResource())
+ if self._jid.getDomain() == 'net127':
+ self.cmd_join('test@conf.net127/' + self._jid.getResource())
- def __del__(self):
- if not self._client.isConnected(): return
- if self._jid.getDomain() == 'auf.org':
- self.cmd_part('tap@reunion.auf.org/' + self._jid.getResource())
- self.cmd_part('test@reunion.auf.org/' + self._jid.getResource())
- if self._jid.getDomain() == 'net127':
- self.cmd_part('test@conf.net127/' + self._jid.getResource())
- self._client.sendPresence(typ='unavailable')
- self._client.disconnect()
+ def __del__(self):
+ if not self._client.isConnected(): return
+ if self._jid.getDomain() == 'auf.org':
+ self.cmd_part('tap@reunion.auf.org/' + self._jid.getResource())
+ self.cmd_part('test@reunion.auf.org/' + self._jid.getResource())
+ if self._jid.getDomain() == 'net127':
+ self.cmd_part('test@conf.net127/' + self._jid.getResource())
+ self._client.sendPresence(typ='unavailable')
+ self._client.disconnect()
- def _publicReply(self, message, text):
- reply = message.buildReply(text)
- if message.getType() == 'groupchat':
- reply.setType('groupchat')
- reply.getTo().setResource('')
- self._client.send(reply)
- print u"--> reply=[%s]" % reply
+ def _publicReply(self, message, text):
+ reply = message.buildReply(text)
+ if message.getType() == 'groupchat':
+ reply.setType('groupchat')
+ reply.getTo().setResource('')
+ self._client.send(reply)
+ print u"--> reply=[%s]" % reply
- def _presenceHandler(self, client, presence):
- print u"<-- presence=[%s]" % presence
- # gestion de notre propre présence
- if presence.getTo() <> self._jid: return
- # confirmation de l'entrée dans un salon
- if presence.getFrom() in self._salons:
- self._salons[presence.getFrom()]['joined'] = True
- self._client.send(xmpp.Message(to=presence.getFrom().getStripped(),typ='groupchat',body='Bonjour tout le monde !'))
+ def _presenceHandler(self, client, presence):
+ print u"<-- presence=[%s]" % presence
+ # gestion de notre propre présence
+ if presence.getTo() <> self._jid: return
+ # confirmation de l'entrée dans un salon
+ if presence.getFrom() in self._salons:
+ self._salons[presence.getFrom()]['joined'] = True
+ self._client.send(xmpp.Message(to=presence.getFrom().getStripped(),typ='groupchat',body='Bonjour tout le monde !'))
- def _messageHandler(self, client, message):
- print u"<-- message=[%s]" % message
- jid = message.getFrom()
- text = message.getBody()
- # FIXME: on ne traite pas les messages d'erreur (pour le moment)
- if message.getType() == 'error': return
- # FIXME: on ne traite pas les messages vides de texte (pour le moment)
- # FIXME: du coup on rate les invitations, au moins...
- if text == None: return
- # on ne traite pas les messages qui viennent de nous-même (sinon boucle)
- if (jid == self._jid) or (jid in self._salons): return
- # on ne traite pas les messages des salons hormis les commandes explicites
- if (message.getType() == 'groupchat') and not text.startswith('!'): return
- print u"... on traite..."
- if text.startswith('!'): text = text[1:]
- if text.find(' ') >= 0:
- command,args = text.split(' ',1)
- else:
- command,args = text,''
- if command == 'aide':
- reply = """Aide de %s :
+ def _messageHandler(self, client, message):
+ print u"<-- message=[%s]" % message
+ jid = message.getFrom()
+ text = message.getBody()
+ # FIXME: on ne traite pas les messages d'erreur (pour le moment)
+ if message.getType() == 'error': return
+ # FIXME: on ne traite pas les messages vides de texte (pour le moment)
+ # FIXME: du coup on rate les invitations, au moins...
+ if text == None: return
+ # on ne traite pas les messages qui viennent de nous-même (sinon boucle)
+ if (jid == self._jid) or (jid in self._salons): return
+ # on ne traite pas les messages des salons hormis les commandes explicites
+ if (message.getType() == 'groupchat') and not text.startswith('!'): return
+ print u"... on traite..."
+ if text.startswith('!'): text = text[1:]
+ if text.find(' ') >= 0:
+ command,args = text.split(' ',1)
+ else:
+ command,args = text,''
+ if command == 'aide':
+ reply = """Aide de %s :
aide cette aide
dire <texte> répète ce texte en public
inviter <jid> invite ce contact dans le salon en cours (cassé)
sortir <salon> sort du salon en cours
demlog <numéro> affiche les données de cette DEMLOG
tel <nom> affiche le(s) numéro(s) de téléphone""" % self._jid
- self._publicReply(message, reply)
- elif command == 'dire':
- if args == '':
- self._publicReply(message, "Dire quoi ?")
- else:
- # est-ce une commande publique ?
- if message.getType() == 'groupchat':
- reply = message.buildReply(args)
- reply.setType('groupchat')
- reply.getTo().setResource('')
- self._client.send(reply)
- # est-ce qu'on parle au bot via le contact d'un salon ?
- elif message.getTo() in self._salons:
- reply = message.buildReply(args)
- reply.setTo(message.getTo().getStripped())
- self._client.send(reply)
- # envoyer sur tous les salons... gasp!
- else:
- reply = message.buildReply(args)
- reply.setType('groupchat')
- for salon in self._salons.keys():
- reply.setTo(xmpp.JID(salon).getStripped())
- self._client.send(reply)
- # envoi à une destination précise : trouver à qui on veut parler
- #if (args.find('@') >= 0) and (args.find('@') < args.find(' ')):
- # say_jid,args = args.split(' ',1)
- # reply = message.buildReply(args)
- # reply.setTo(say_jid)
- elif command == 'inviter':
- if args == '':
- self._publicReply(message, "Inviter qui ?")
- else:
- for un_jid in args.split(' '):
- self.cmd_invite(un_jid, jid.getStripped())
- elif command == 'entrer':
- if args == '':
- self._publicReply(message, "Entrer dans quel salon ?")
- else:
- self.cmd_join(args + '/' + self._jid.getResource())
- elif command == 'sortir':
- if args == '' and (message.getType() == "groupchat") :
- args = jid.getStripped()
- if args == '':
- self._publicReply(message, "Sortir de quel salon ?")
- else:
- self.cmd_part(args + '/' + self._jid.getResource())
- elif command == u'numéro':
- self._publicReply(message, """C'est une référence à la série culte « Le Prisonnier ».\nvoir http://fr.wikipedia.org/wiki/Le_Prisonnier""")
- elif command == 'quitter':
- if jid.getStripped() == self._jid.getStripped():
- self._publicReply(message, "D'accord, au revoir !")
- if args <> '': self._quitter_message = args
- self._quitter = True
- else:
- self._publicReply(message, u"Commande '%s' non autorisée." % command)
- #elif command == 'budget':
- # try:
- # xml = urllib.urlopen(BUDGET_URL % args).read()
- # doc = minidom.parseString(xml).documentElement
- # reply = u"Budget pour %s :" % args
- # for node in doc.childNodes:
- # if node.nodeType == node.ELEMENT_NODE:
- # for value in [n.wholeText for n in node.childNodes]:
- # reply += '\n%s : %s' % (node.nodeName, value)
- # except:
- # reply = u"Erreur pour %s." % args
- # self._publicReply(message, reply)
- elif command == 'demlog':
- if args == '':
- reply = u"Quelle numéro de DEMLOG ?"
- else:
- demlog = xmlurlopen(DEMLOG_URL % args)
- reply = xmltojab(demlog, 'demlog').rstrip()
- if reply <> '':
- reply = (u"Données pour la DEMLOG %s :\n" % args) + reply
- else:
- reply = u"Erreur pour la DEMLOG %s." % args
- self._publicReply(message, reply)
- elif command == 'tel':
- if args == '':
- reply = u"Téléphone de qui ?"
- else:
- reply = _phonefind(xmlurlopen(PHONE_URL), args).rstrip()
- if reply <> '':
- reply = u"Recherche de téléphone pour '%s' :\n%s" % (args, reply)
- else:
- reply = u"Pas de téléphone trouvé pour '%s'." % args
- self._publicReply(message, reply)
- else:
- self._publicReply(message, "Commande '%s' inconnue." % command)
+ self._publicReply(message, reply)
+ elif command == 'dire':
+ if args == '':
+ self._publicReply(message, "Dire quoi ?")
+ else:
+ # est-ce une commande publique ?
+ if message.getType() == 'groupchat':
+ reply = message.buildReply(args)
+ reply.setType('groupchat')
+ reply.getTo().setResource('')
+ self._client.send(reply)
+ # est-ce qu'on parle au bot via le contact d'un salon ?
+ elif message.getTo() in self._salons:
+ reply = message.buildReply(args)
+ reply.setTo(message.getTo().getStripped())
+ self._client.send(reply)
+ # envoyer sur tous les salons... gasp!
+ else:
+ reply = message.buildReply(args)
+ reply.setType('groupchat')
+ for salon in self._salons.keys():
+ reply.setTo(xmpp.JID(salon).getStripped())
+ self._client.send(reply)
+ # envoi à une destination précise : trouver à qui on veut parler
+ #if (args.find('@') >= 0) and (args.find('@') < args.find(' ')):
+ # say_jid,args = args.split(' ',1)
+ # reply = message.buildReply(args)
+ # reply.setTo(say_jid)
+ elif command == 'inviter':
+ if args == '':
+ self._publicReply(message, "Inviter qui ?")
+ else:
+ for un_jid in args.split(' '):
+ self.cmd_invite(un_jid, jid.getStripped())
+ elif command == 'entrer':
+ if args == '':
+ self._publicReply(message, "Entrer dans quel salon ?")
+ else:
+ self.cmd_join(args + '/' + self._jid.getResource())
+ elif command == 'sortir':
+ if args == '' and (message.getType() == "groupchat") :
+ args = jid.getStripped()
+ if args == '':
+ self._publicReply(message, "Sortir de quel salon ?")
+ else:
+ self.cmd_part(args + '/' + self._jid.getResource())
+ elif command == u'numéro':
+ self._publicReply(message, """C'est une référence à la série culte « Le Prisonnier ».\nvoir http://fr.wikipedia.org/wiki/Le_Prisonnier""")
+ elif command == 'quitter':
+ if jid.getStripped() == self._jid.getStripped():
+ self._publicReply(message, "D'accord, au revoir !")
+ if args <> '': self._quitter_message = args
+ self._quitter = True
+ else:
+ self._publicReply(message, u"Commande '%s' non autorisée." % command)
+ #elif command == 'budget':
+ # try:
+ # xml = urllib.urlopen(BUDGET_URL % args).read()
+ # doc = minidom.parseString(xml).documentElement
+ # reply = u"Budget pour %s :" % args
+ # for node in doc.childNodes:
+ # if node.nodeType == node.ELEMENT_NODE:
+ # for value in [n.wholeText for n in node.childNodes]:
+ # reply += '\n%s : %s' % (node.nodeName, value)
+ # except:
+ # reply = u"Erreur pour %s." % args
+ # self._publicReply(message, reply)
+ elif command == 'demlog':
+ if args == '':
+ reply = u"Quelle numéro de DEMLOG ?"
+ else:
+ demlog = xmlurlopen(DEMLOG_URL % args)
+ reply = xmltojab(demlog, 'demlog').rstrip()
+ if reply <> '':
+ reply = (u"Données pour la DEMLOG %s :\n" % args) + reply
+ else:
+ reply = u"Erreur pour la DEMLOG %s." % args
+ self._publicReply(message, reply)
+ elif command == 'tel':
+ if args == '':
+ reply = u"Téléphone de qui ?"
+ else:
+ reply = _phonefind(xmlurlopen(PHONE_URL), args).rstrip()
+ if reply <> '':
+ reply = u"Recherche de téléphone pour '%s' :\n%s" % (args, reply)
+ else:
+ reply = u"Pas de téléphone trouvé pour '%s'." % args
+ self._publicReply(message, reply)
+ else:
+ self._publicReply(message, "Commande '%s' inconnue." % command)
- def _iqHandler(self, client, iq):
- print u"<-- iq=[%s]" % iq
+ def _iqHandler(self, client, iq):
+ print u"<-- iq=[%s]" % iq
- def cmd_join(self, room, password=''):
- #if self._salons.has_key(room) and self._salons[room]['joined']: return
- self._salons[room] = { 'joined': False, 'password': password }
- p = xmpp.Presence(to=room, priority='0', show='available', status="Je ne suis pas un numéro, je suis un bot libre !")
- x = p.setTag('x', namespace=xmpp.NS_MUC)
- if password <> '': x.setTagData('password', password)
- x.addChild('history', {'maxchars':'0','maxstanzas':'0'})
- self._client.send(p)
+ def cmd_join(self, room, password=''):
+ #if self._salons.has_key(room) and self._salons[room]['joined']: return
+ self._salons[room] = { 'joined': False, 'password': password }
+ p = xmpp.Presence(to=room, priority='0', show='available', status="Je ne suis pas un numéro, je suis un bot libre !")
+ x = p.setTag('x', namespace=xmpp.NS_MUC)
+ if password <> '': x.setTagData('password', password)
+ x.addChild('history', {'maxchars':'0','maxstanzas':'0'})
+ self._client.send(p)
- def cmd_part(self, room):
- p = xmpp.Presence(to=room, typ='unavailable')
- p.setTag('x', namespace=xmpp.NS_MUC)
- self._client.send(p)
+ def cmd_part(self, room):
+ p = xmpp.Presence(to=room, typ='unavailable')
+ p.setTag('x', namespace=xmpp.NS_MUC)
+ self._client.send(p)
- def cmd_invite(self, jid, room):
- m = xmpp.Message(to=jid,typ='normal',frm=room)
- x = m.setTag('x', namespace=xmpp.NS_MUC_USER)
- invite = x.addChild('invite', {'from':self._jid})
- reason = invite.addChild('reason')
- reason.addData(u"Vous êtes invité sur '%s'." % room)
- m.setTag('x', {'jid':room}, 'jabber:x:conference')
- print "DEBUG: m=[%s]" % m
- self._client.send(m)
+ def cmd_invite(self, jid, room):
+ m = xmpp.Message(to=jid,typ='normal',frm=room)
+ x = m.setTag('x', namespace=xmpp.NS_MUC_USER)
+ invite = x.addChild('invite', {'from':self._jid})
+ reason = invite.addChild('reason')
+ reason.addData(u"Vous êtes invité sur '%s'." % room)
+ m.setTag('x', {'jid':room}, 'jabber:x:conference')
+ print "DEBUG: m=[%s]" % m
+ self._client.send(m)
- def admins(self, new_admins=False):
- old_admins = self._admins
- if new_admins: self.admins = new_admins
- return old_admins
+ def admins(self, new_admins=False):
+ old_admins = self._admins
+ if new_admins: self.admins = new_admins
+ return old_admins
- def run(self):
- while (not self._quitter):
- try:
- self._client.Process(1)
- except KeyboardInterrupt:
- self._quitter = True
- self._client.send(xmpp.Message(to='test@reunion.auf.org', body=self._quitter_message, typ='groupchat'))
+ def run(self):
+ while (not self._quitter):
+ try:
+ self._client.Process(1)
+ except KeyboardInterrupt:
+ self._quitter = True
+ self._client.send(xmpp.Message(to='test@reunion.auf.org', body=self._quitter_message, typ='groupchat'))
-if __name__ == "__main__":
- if len(sys.argv)<>3:
- print "Usage: %s username@domain.tld password" % __file__
- sys.exit(-1)
- try:
- bot = Bot(jid=sys.argv[1], password=sys.argv[2])
- except IOError, msg:
- print "ERROR: %s" % msg
- sys.exit(-1)
- bot.run()
+if __name__ == '__main__':
+ if len(sys.argv) != 3:
+ print "Usage: %s jid password" % __file__
+ sys.exit(-1)
+ try:
+ bot = Bot(jid=sys.argv[1], password=sys.argv[2])
+ except IOError, msg:
+ print "ERROR: %s" % msg
+ sys.exit(-1)
+ bot.run()
# vim: ts=4 sw=4 et