#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Export tool for w.c.s. form data.

Copyright: Agence universitaire de la Francophonie — www.auf.org
Licence: GNU General Public Licence, version 2
Author: Jean Christophe André
Creation date: 15 October 2009

Depends: wcs, python-simplejson, python-magic

Targets Python 2 (2.6 or later): the code relies on the ``unicode`` builtin.
"""

import os
import os.path
import shutil
import logging
import sys
import time
from time import gmtime, strftime
import simplejson as json
import magic
import mimetypes

from wcs import publisher
from wcs.formdef import FormDef
from wcs.fields import TitleField, CommentField, TextField, \
    StringField, ItemField, ItemsField, EmailField, \
    DateField, FileField, BoolField, TableField

# Accented characters and, at the same index, their unaccented replacement.
_ACCENTED = u'çÇáàâÁÀÂéèêëÉÈÊËíìîïÍÌÎÏóòôöÓÒÔÖúùûüÚÙÛÜýỳyÿÝỲYŸ'
_UNACCENTED = u'cCaaaAAAeeeeEEEEiiiiIIIIooooOOOOuuuuUUUUyyyyYYYY'
_ACCENT_MAP = dict(zip(_ACCENTED, _UNACCENTED))

# Field classes that carry no user data and are never exported.
_SKIPPED_FIELDS = (TitleField, CommentField)
# Field classes whose stored value is exported unchanged.
_PLAIN_FIELDS = (StringField, TextField, EmailField, ItemField)
# Field classes whose stored value is a list, exported unchanged
# (joining the items on ';' was considered by the original author).
_LIST_FIELDS = (ItemsField, TableField)


def _encode_utf8(text):
    """Return *text* as a UTF-8 byte string; byte strings pass through."""
    if isinstance(text, unicode):
        return text.encode('utf-8')
    return text


def _write_json(path, payload):
    """Serialise *payload* as JSON into the file *path*, encoded as UTF-8.

    ``json.dumps(..., ensure_ascii=False)`` may return either a byte string
    or a unicode string; only the latter needs encoding before being written
    to a binary file.
    """
    with open(path, 'wb') as stream:
        stream.write(_encode_utf8(json.dumps(payload, ensure_ascii=False)))


def reduce_to_alnum(s, replacement_char='-'):
    """Reduce a string to ASCII letters and digits.

    Known accented characters are replaced by their unaccented equivalent,
    ASCII letters and digits are kept, and every run of other characters is
    collapsed into a single *replacement_char*. Leading and trailing
    replacement characters are stripped.

    :param s: unicode string, or byte string encoded as UTF-8
    :param replacement_char: separator substituted for non-alphanumeric runs
    :returns: the reduced string, unicode if *s* was unicode, otherwise a
        UTF-8 encoded byte string
    """
    was_unicode = isinstance(s, unicode)
    if not was_unicode:
        s = unicode(s, 'utf-8')
    pieces = []
    for c in s:
        if c in _ACCENT_MAP:
            pieces.append(_ACCENT_MAP[c])
        elif ('a' <= c.lower() <= 'z') or ('0' <= c <= '9'):
            pieces.append(c)
        elif pieces and pieces[-1] != replacement_char:
            # first character of a non-alphanumeric run
            pieces.append(replacement_char)
        # otherwise: nothing emitted yet, or already inside a run
    reduced = u''.join(pieces).strip(replacement_char)
    if was_unicode:
        return reduced
    return reduced.encode('utf-8')


def extract_fields(formdef, output_directory):
    """Give every data field of the form a unique name.

    Writes ``field-names.txt`` (one ``id:name:label`` line per field) and
    ``field-names.json`` (mapping of field id to name) in *output_directory*.

    The mapping is also stored in the module-level ``field_names`` global,
    which ``extract_data`` reads (temporary hack kept from the original
    implementation), and is returned to the caller.

    :param formdef: form definition exposing a ``fields`` list
    :param output_directory: existing directory receiving the two files
    :returns: dict mapping field id to unique field name
    """
    global field_names
    field_names = {}
    used_names = set()
    duplicates = {}
    listing = os.path.join(output_directory, 'field-names.txt')
    with open(listing, 'w') as stream:
        for field in formdef.fields:
            if isinstance(field, _SKIPPED_FIELDS):
                continue
            if field.varname:
                name = field.varname
            else:
                name = reduce_to_alnum(field.label, '_').lower()
            if name in used_names:
                # Duplicate: append _2, _3, ... and keep going until the
                # suffixed name is itself unused.
                base = name
                counter = duplicates.get(base, 1)
                while name in used_names:
                    counter += 1
                    name = '%s_%d' % (base, counter)
                duplicates[base] = counter
            used_names.add(name)
            field_names[field.id] = name
            line = "%s:%s:%s\n" % (field.id, name, field.label)
            stream.write(_encode_utf8(line))
    _write_json(os.path.join(output_directory, 'field-names.json'),
                field_names)
    return field_names


def _file_extension(data, magicmime):
    """Return the extension (without dot) to use for an uploaded file.

    The extension of the original file name is used when there is one;
    otherwise the MIME type detected by libmagic is mapped to an extension,
    falling back to ``'unknown'``.
    """
    if '.' in data.orig_filename:
        return data.orig_filename.rpartition('.')[2].lower()
    # the original file name has no extension
    path = os.path.join(pub.app_dir, 'uploads', data.qfilename)
    try:
        # NOTE: the newer python-magic API (magic.Magic(mime=True) and
        # from_file) is left for later, as in the original code.
        mime_type = magicmime.file(path).split()[0].strip(';')
        guessed = mimetypes.guess_extension(mime_type)
    except Exception:
        logging.warning("Type de fichier inconnu pour '%s'.", path)
        guessed = None
    if guessed is None:
        return 'unknown'
    return guessed[1:]  # drop the leading dot


def extract_data(formdef, output_directory):
    """Export every submitted form ("dossier") of *formdef*.

    For each dossier that has an associated user, writes
    ``data/<name>.json`` in *output_directory* and copies its uploaded files
    next to it; finally writes ``liste-dossiers.json`` with the sorted list
    of generated JSON file names.

    Reads the module-level globals ``pub`` (the w.c.s. publisher) and
    ``field_names`` (set by ``extract_fields``) — temporary hack kept from
    the original implementation.

    :param formdef: form definition to export
    :param output_directory: directory containing an existing ``data``
        sub-directory
    """
    # load the MIME type database once and for all
    magicmime = magic.open(magic.MAGIC_MIME)
    magicmime.load()

    data_directory = os.path.join(output_directory, 'data')
    liste_dossiers = []
    for formdata in formdef.data_class().select():
        if formdata.user is None:
            logging.warning("Dossier '%s' sans utilisateur associé ?!?"
                            " On ignore...", formdata.id)
            continue

        if formdata.status.startswith('wf-'):
            workflow_status = formdata.get_workflow_status().name or None
        else:
            workflow_status = None
        result = {
            'num_dossier': formdata.id,
            'wcs_status': formdata.status,
            'wcs_workflow_status': workflow_status,
            'wcs_user_email': formdata.user.email,
            'wcs_user_display_name': formdata.user.display_name,
            # currently disabled:
            #'wcs_last_modified': strftime('%Y-%m-%d %H:%M:%S', gmtime(formdata.last_modified())),
            'wcs_comments': [],
        }

        if formdata.evolution is not None:
            for e in formdata.evolution:
                if e.comment is None:
                    continue
                who = pub.user_class.get(e.who).display_name
                # Must not be named ``time``: that would shadow the module
                # needed below for ``time.struct_time``.
                stamp = strftime('%Y-%m-%d %H:%M:%S', e.time)
                result['wcs_comments'].append(
                    '%s -- %s %s' % (e.comment, who, stamp))

        qfiles = {}
        for field in formdef.fields:
            field_id = str(field.id)
            if field_id not in formdata.data:
                continue
            if isinstance(field, _SKIPPED_FIELDS):
                continue
            field_name = field_names[field_id]
            data = formdata.data.get(field_id)
            if data is None:
                result[field_name] = None
            elif isinstance(field, _PLAIN_FIELDS):
                result[field_name] = data
            elif isinstance(field, _LIST_FIELDS):
                result[field_name] = data
            elif isinstance(field, BoolField):
                # accept both the string form and a real boolean
                result[field_name] = data in (True, 'True')
            elif isinstance(field, DateField):
                if isinstance(data, time.struct_time):
                    result[field_name] = '%04d-%02d-%02d' % (
                        data.tm_year, data.tm_mon, data.tm_mday)
                else:
                    result[field_name] = data
            elif isinstance(field, FileField):
                extension = _file_extension(data, magicmime)
                result[field_name] = "%s.%s" % (field_name, extension)
                qfiles[field_name] = data.qfilename
            else:
                logging.warning("Type de champ inconnu '%s' pour '%s' (%s).",
                                field.__class__.__name__, field_name,
                                field.label)

        # ``or`` rather than a .get() default: the key may be present with a
        # None value when the field was left empty.
        num_dossier = result['num_dossier']
        nom = reduce_to_alnum(result.get('nom') or 'sans-nom').upper()
        prenom = reduce_to_alnum(result.get('prenom') or 'sans-prenom').upper()
        adel = result.get('adresse_electronique') or 'sans-adel'
        # '/' is replaced too so the value cannot point outside ``data``.
        adel = adel.replace('@', '-').replace('/', '-').lower()
        filename = "%04d-%s-%s-%s" % (num_dossier, nom, prenom, adel)
        liste_dossiers.append(filename + '.json')

        # copy the attached files, only when missing or outdated
        for field_name, qfilename in qfiles.items():
            result[field_name] = filename + '_' + result[field_name]
            src = os.path.join(pub.app_dir, 'uploads', qfilename)
            dst = os.path.join(data_directory, result[field_name])
            if not os.path.exists(dst) \
                    or os.path.getmtime(src) > os.path.getmtime(dst):
                shutil.copy2(src, dst)
                os.chmod(dst, 0o644)

        # generate the JSON file of the dossier
        _write_json(os.path.join(data_directory, filename + '.json'), result)
        logging.info("Dossier '%s' : %s.", filename,
                     result['wcs_workflow_status'])

    liste_dossiers.sort()
    _write_json(os.path.join(output_directory, 'liste-dossiers.json'),
                liste_dossiers)


def main(argv=None):
    """Command-line entry point.

    Expects three arguments: the base output directory, the virtual host and
    the form URL name. The export goes to
    ``<output_directory>/<vhost>/<form_name>``, with a ``last-run.log`` file
    rewritten on every run.

    :param argv: argument list, defaults to ``sys.argv``
    :returns: process exit status
    """
    global pub  # read by extract_data and its helpers

    if argv is None:
        argv = sys.argv
    if len(argv) != 4:
        sys.stderr.write(
            "Usage : %s <output_directory> <vhost> <form_name>\n" % argv[0])
        return 1

    vhost = argv[2]
    form_name = argv[3]
    output_directory = os.path.join(argv[1], vhost, form_name)

    os.umask(0o022)
    # create the extraction directory if needed
    data_directory = os.path.join(output_directory, 'data')
    if not os.path.isdir(data_directory):
        os.makedirs(data_directory, 0o755)

    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(message)s',
                        filename=os.path.join(output_directory,
                                              'last-run.log'),
                        filemode='w')

    logging.info('Début.')

    pub = publisher.WcsPublisher.create_publisher()
    pub.app_dir = os.path.join(pub.app_dir, vhost)

    formdef = FormDef.get_by_urlname(form_name)

    extract_fields(formdef, output_directory)

    try:
        extract_data(formdef, output_directory)
    except Exception:
        # Best effort, as in the original: the failure is recorded in the
        # log file and the run still ends normally.
        logging.exception("Interruption du traitement pour cause d'erreur !")

    logging.info('Fin.')
    return 0


if __name__ == '__main__':
    sys.exit(main())