--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# slapd-backend-shell - backend shell pour slapd (OpenLDAP)
+# Copyright ©2010 Agence universitaire de la Francophonie
+# http://www.auf.org/
+# Licence : GNU General Public License, version 3
+# Auteur : Progfou <jean-christophe.andre@auf.org>
+# Création : 2010-03-23
+#
+# Documentation de base : voir "man slapd-shell"
+#
+# Mise en place :
+# - placer ce script dans /usr/local/sbin/slapd-backend-shell
+# - le rendre exécutable
+# - ajouter les lignes suivantes dans /etc/ldap/slapd.conf
+# moduleload back_shell
+# database shell
+# suffix "o=shell"
+# bind /usr/local/sbin/slapd-shell-backend
+# unbind /usr/local/sbin/slapd-shell-backend
+# search /usr/local/sbin/slapd-shell-backend
+# - ajouter éventuellement "loglevel stats shell" et surveiller /var/log/debug
+# - créer un dossier /var/log/ldap et l'attribuer à openldap:openldap en 0700
+# - relancer le service slapd
+# - tester avec (le mot de passe est test1) :
+# ldapsearch -x -b o=shell -D mail=test1@example.com,o=shell -W
+#
+
+import os
+import sys
+import logging
+from crypt import crypt
+
+LDAP_BASE = 'o=shell'
+LOGFILE = '/var/log/ldap/backend-shell.log'
+
+#=============================================================================
+# CONSTANTES
+#=============================================================================
+
+# BIND OPERATION, voir http://tools.ietf.org/html/rfc2251#section-4.2
+LDAP_AUTH_SIMPLE = 128
+
+# BIND RESPONSE, voir http://tools.ietf.org/html/rfc2251#section-4.2.3
+# et http://tools.ietf.org/html/rfc2251#section-4.1.10
+# quand l'opération se termine avec succès
+LDAP_SUCCESS = 0
+# quand ce serveur n'est pas approprié
+LDAP_REFERRAL = 10
+# quand c'est un méchanisme SASL inconnu
+LDAP_AUTH_METHOD_NOT_SUPPORTED = 7
+# quand on veut forcer une authentification (interdir l'accès anonyme) :
+LDAP_INAPPROPRIATE_AUHTENTICATION = 48
+# quand le mot de passe est incorrect :
+LDAP_INVALID_CREDENTIALS = 49
+
+#=============================================================================
+# OUTILS
+#=============================================================================
+
+_cache = {
+ 'test1@example.com': '$1$2JF5hLaZ$tZz4RXau.GCTqUsAmCeXM/',
+ 'test2@example.com': '$1$o2YURjVS$Qd02DXFpGqPtfy1yGH1CM0',
+}
+
+def find_cred(key):
+ """Recherche du mot de passe crypté en fonction de l'utilisateur.
+ On cherche d'abord dans le cache... et un jour dans la vraie base. ;-)
+ """
+ if key in _cache:
+ return _cache[key]
+ return None
+
+def search_object(filter=None):
+ """On recherche les objects correspondant aux filtre.
+ """
+ result = [ ]
+ # FIXME: tout pour le moment...
+ # TODO: tenir compte de filter
+ for key in _cache:
+ dn = 'mail=%s,%s' % (key, LDAP_BASE)
+ obj = { 'dn': [ dn ] }
+ for k,v in (x.split('=') for x in dn.split(',')):
+ if k not in obj:
+ obj[k] = [ v ]
+ else:
+ obj[k].append(v)
+ obj.update({'mail': [ key ] })
+ obj.update({'userPassword': [ '{CRYPT}' + _cache[key] ] })
+ result.append(obj)
+ return result
+
+def get_params():
+ params = { }
+ line = sys.stdin.readline().rstrip('\n').rstrip('\r')
+ while line != '':
+ key,value = line.split(': ')
+ if key not in params:
+ params[key] = [ value ]
+ else:
+ params[key].append(value)
+ line = sys.stdin.readline().rstrip('\n').rstrip('\r')
+ return params
+
+def ldap_result(code=None, matched=None, info=None):
+ lines = [ 'RESULT' ]
+ if code != None:
+ lines.append('code: %s' % code)
+ if matched != None:
+ lines.append('matched: %s' % matched)
+ if info != None:
+ lines.append('info: %s' % info)
+ return '\n'.join(lines) + '\n'
+
+#=============================================================================
+# PROGRAMME PRINCIPAL
+#=============================================================================
+
+logging.basicConfig(level=logging.DEBUG, filename=LOGFILE,
+ format="%(asctime)s %(levelname)s %(message)s")
+logging.info('Starting.')
+
+command = sys.stdin.readline().rstrip('\n').rstrip('\r')
+logging.debug("Got command '%s'." % command)
+
+if command == 'UNBIND':
+ # msgid: <message id>
+ # <repeat { "suffix:" <database suffix DN> }>
+ # dn: <bound DN>
+ params = get_params()
+ logging.debug("Got params '%s'." % params)
+ # FIXME: on devrait vérifier les suffixes, le dn, noter le unbind, ...
+ print ldap_result(code=LDAP_SUCCESS)
+ sys.exit(0)
+
+if command == 'BIND':
+ # msgid: <message id>
+ # <repeat { "suffix:" <database suffix DN> }>
+ # dn: <DN>
+ # method: <method number>
+ # credlen: <length of <credentials>>
+ # cred: <credentials>
+ params = get_params()
+ logging.debug("Got params '%s'." % params)
+ # on vérifie qu'on supporte bien la méthode demandée
+ method = int(params['method'][0])
+ if method != LDAP_AUTH_SIMPLE:
+ logging.info("Unsupported auth method '%s'." % method)
+ print ldap_result(code=LDAP_AUTH_METHOD_NOT_SUPPORTED)
+ sys.exit(0)
+ # on vérifie que la demande concerne bien notre base
+ if LDAP_BASE not in params['suffix']:
+ logging.warning("'%s' not in requested suffixes." % LDAP_BASE)
+ print ldap_result(code=LDAP_REFERRAL)
+ sys.exit(0)
+ # on vérifie que le 'dn' n'est pas vide (pas de connexion anonyme)
+ dn = params['dn'][0]
+ if dn == '':
+ logging.info("Rejecting anonymous connection.")
+ print ldap_result(code=LDAP_INAPPROPRIATE_AUTHENTICATION)
+ sys.exit(0)
+ # on vérifie que le 'dn' demandé est valide (contient 'mail')
+ dn_dict = dict([x.split('=') for x in dn.split(',')])
+ if 'mail' not in dn_dict:
+ logging.info("Invalid dn '%s' (missing 'mail' component)." % dn)
+ print ldap_result(code=LDAP_INVALID_CREDENTIALS)
+ sys.exit(0)
+ # on vérifie que l'objet demandé existe bien dans la base
+ mail = dn_dict['mail']
+ cred = find_cred(mail)
+ if not cred:
+ logging.info("Can't find credential for '%s'." % mail)
+ print ldap_result(code=LDAP_INVALID_CREDENTIALS)
+ sys.exit(0)
+ # on vérifie que le mot de passe correspond bien
+ if crypt(params['cred'][0], cred) != cred:
+ logging.info("Credential mismatch for '%s'." % mail)
+ print ldap_result(code=LDAP_INVALID_CREDENTIALS)
+ sys.exit(0)
+ # si on a passé tout ça, c'est tout bon ! :-)
+ logging.debug("Successful bind for '%s'." % mail)
+ print ldap_result(code=LDAP_SUCCESS)
+ sys.exit(0)
+
+if command == 'SEARCH':
+ # msgid: <message id>
+ # <repeat { "suffix:" <database suffix DN> }>
+ # base: <base DN>
+ # scope: <0-2, see ldap.h>
+ # deref: <0-3, see ldap.h>
+ # sizelimit: <size limit>
+ # timelimit: <time limit>
+ # filter: <filter>
+ # attrsonly: <0 or 1>
+ # attrs: <"all" or space-separated attribute list>
+ params = get_params()
+ logging.debug("Got params '%s'." % params)
+ # on vérifie que la demande concerne bien notre base
+ if LDAP_BASE not in params['suffix']:
+ logging.warning("'%s' not in requested suffixes." % LDAP_BASE)
+ print ldap_result(code=LDAP_REFERRAL)
+ sys.exit(0)
+ base = params['base'][0]
+ if base != LDAP_BASE:
+ logging.warning("Invalid base '%s'." % base)
+ print ldap_result(code=LDAP_REFERRAL)
+ sys.exit(0)
+ # on affiche le résultat
+ sizelimit = int(params['sizelimit'][0])
+ filter = params['filter'][0]
+ attrs = params['attrs'][0].split(' ')
+ logging.debug("Start of search for '%s' (sizelimit=%s, attrs='%s')." \
+ % (filter, sizelimit, attrs))
+ for obj in search_object(filter)[0:sizelimit]:
+ if attrs[0] == 'all':
+ attr_list = obj.keys()
+ else:
+ attr_list = attrs
+ print 'dn: %s' % obj['dn'][0]
+ attr_list = set(attr_list) - set(['dn','userPassword'])
+ for attr in attr_list:
+ try:
+ for value in obj[attr]:
+ print '%s: %s' % (attr, value)
+ except:
+ pass
+ print ""
+ # si on a passé tout ça, c'est tout bon ! :-)
+ logging.debug("End of search.")
+ print ldap_result(code=LDAP_SUCCESS)
+ sys.exit(0)
+
+sys.exit(1)