+# -*- coding: utf-8 -*-
+
import time, datetime
import operator
annees.sort(reverse=True)
return annees
- def get_q_inconnu(self, prefix):
- date_debut_nulle = Q(**{"%s%s__isnull" % (prefix, KEY_DATE_DEBUT) : True})
- date_fin_nulle = Q(**{"%s%s__isnull" % (prefix, KEY_DATE_FIN) : True})
- return Q(date_debut_nulle & date_fin_nulle)
-
- def get_q_range(self, prefix, borne_gauche=None, borne_droite=None):
-
- date_debut_nulle = Q(**{"%s%s__isnull" % (prefix, KEY_DATE_DEBUT) : True})
- date_fin_nulle = Q(**{"%s%s__isnull" % (prefix, KEY_DATE_FIN) : True})
- date_debut_superieure_ou_egale_a_borne_gauche = Q(**{"%s%s__gte" % (prefix, KEY_DATE_DEBUT) : borne_gauche})
- date_debut_strict_superieure_ou_egale_a_borne_gauche = Q(**{"%s%s__gt" % (prefix, KEY_DATE_DEBUT) : borne_gauche})
- date_debut_inferieure_ou_egale_a_borne_gauche = Q(**{"%s%s__lte" % (prefix, KEY_DATE_DEBUT) : borne_gauche})
- date_fin_superieure_ou_egale_a_borne_gauche = Q(**{"%s%s__gte" % (prefix, KEY_DATE_FIN) : borne_gauche})
- date_fin_inferieure_ou_egale_a_borne_droite = Q(**{"%s%s__lte" % (prefix, KEY_DATE_FIN) : borne_droite})
- date_fin_strict_inferieure_ou_egale_a_borne_droite = Q(**{"%s%s__lt" % (prefix, KEY_DATE_FIN) : borne_droite})
- date_debut_inferieure_ou_egale_a_borne_droite = Q(**{"%s%s__lte" % (prefix, KEY_DATE_DEBUT) : borne_droite})
- date_fin_superieure_ou_egale_a_borne_droite = Q(**{"%s%s__gte" % (prefix, KEY_DATE_FIN) : borne_droite})
-
- if borne_droite is None:
- q_range = date_debut_strict_superieure_ou_egale_a_borne_gauche
+ def get_q_actifs(self):
+ qs = self.model.objects.get_query_set()
+ q = qs.get_q_actifs()
+ return q
- if borne_gauche is None:
- q_range = date_fin_strict_inferieure_ou_egale_a_borne_droite
+ def get_q_inactifs(self):
+ qs = self.model.objects.get_query_set()
+ q = qs.get_q_inactifs()
+ return q
- if borne_droite is not None and borne_gauche is not None:
- q_range = (date_debut_superieure_ou_egale_a_borne_gauche & date_fin_inferieure_ou_egale_a_borne_droite) | \
- ((date_debut_inferieure_ou_egale_a_borne_gauche | date_debut_nulle) & date_fin_superieure_ou_egale_a_borne_gauche & date_fin_inferieure_ou_egale_a_borne_droite) | \
- ((date_fin_superieure_ou_egale_a_borne_droite | date_fin_nulle) & date_debut_inferieure_ou_egale_a_borne_droite) | \
- (date_debut_inferieure_ou_egale_a_borne_gauche & date_fin_superieure_ou_egale_a_borne_droite)
+ def get_q_futurs(self):
+ qs = self.model.objects.get_query_set()
+ q = qs.get_q_futurs()
+ return q
- if borne_droite is None and borne_gauche is None:
- q_range = Q()
+ def get_q_inconnus(self):
+ qs = self.model.objects.get_query_set()
+ q = qs.get_q_inconnus()
+ return q
- return q_range
+ def get_q_range(self, prefix, borne_gauche=None, borne_droite=None):
+ # on filtre si on a au moins une borne, car par défaut q_actifs est
+ # un filtre sur actif à l'instant présent
+ if borne_gauche is not None and borne_droite is not None:
+ qs = self.model.objects.get_query_set()
+ q = qs.get_q_actifs(borne_gauche, borne_droite)
+ return q
+ else:
+ return Q()
def purge_params(self, lookup_params):
self.lookup_recherche_temporelle = {}
prefix = self.get_prefix()
borne_gauche = None
borne_droite = None
+
for k, v in self.lookup_recherche_temporelle.items():
if k.endswith(KEY_ANNEE):
if k.endswith(KEY_STATUT):
aujourdhui = datetime.date.today()
if v == STATUT_ACTIF:
- borne_gauche = aujourdhui
- borne_droite = aujourdhui
+ q = q & self.get_q_actifs()
elif v == STATUT_INACTIF:
- # dans le cas d'une FK, on retire des inactifs ceux qui ont une FK active
- if prefix != "":
- q_range = self.get_q_range(prefix, aujourdhui, aujourdhui)
- id_actifs = [o.id for o in qs.filter(q_range).distinct()]
- qs = qs.exclude(id__in=id_actifs)
+ q = q & self.get_q_inactifs()
borne_droite = aujourdhui
elif v == STATUT_FUTUR:
- borne_gauche = aujourdhui
+ q = q & self.get_q_futurs()
elif v == STATUT_INCONNU:
- q = q & self.get_q_inconnu(prefix)
+ q = q & self.get_q_inconnus()
q_range = self.get_q_range(prefix, borne_gauche, borne_droite)
return q & q_range
use_distinct = True
qs = self.root_query_set
+
lookup_params = self.params.copy() # a dictionary of the query string
for i in (ALL_VAR, ORDER_VAR, ORDER_TYPE_VAR, SEARCH_VAR, IS_POPUP_VAR, TO_FIELD_VAR):
if i in lookup_params:
q_temporel = self.get_q_temporel(qs)
q = Q(**lookup_params) & q_temporel
-
+
# Apply lookup parameters from the query string.
try:
qs = qs.filter(q)
class ActifsQuerySet(QuerySet):
+ debut_field = 'date_debut'
+ fin_field = 'date_fin'
- def _actifs(self, debut_field, fin_field, date_min=None, date_max=None,
- annee=None):
+ def get_q_actifs(self, date_min=None, date_max=None, annee=None):
+ q = Q()
+ if annee:
+ janvier = date(annee, 1, 1)
+ decembre = date(annee, 12, 31)
+ date_min = max(janvier, date_min) if date_min else janvier
+ date_max = min(decembre, date_max) if date_max else decembre
+ if not date_min and not date_max:
+ date_min = date_max = date.today()
+ if date_min:
+ q = q & (
+ Q(**{self.fin_field + '__gte': date_min}) |
+ Q(**{self.fin_field: None}))
+ if date_max:
+ q = q & (
+ Q(**{self.debut_field + '__lte': date_max}) |
+ Q(**{self.debut_field: None}))
+
+ q_inconnus = self.get_q_inconnus()
+ return q & ~q_inconnus
+
+ def actifs(self, date_min=None, date_max=None, annee=None):
qs = self
+ q = self.get_q_actifs(date_min, date_max, annee)
+ return qs.filter(q)
+
+ def get_q_inconnus(self, date_min=None, date_max=None, annee=None):
+ q = Q(**{
+ self.debut_field + '__isnull': True,
+ self.fin_field + '__isnull': True,
+ })
+ return q
+
+ def inconnus(self):
+ qs = self
+ q = self.get_q_inconnus()
+ return qs.filter(q)
+
+ def get_q_inactifs(self, date_min=None, date_max=None, annee=None):
+ actifs_ids = [e['id'] for e in self.actifs(date_min, date_max,
+ annee).values('id')]
+ q_non_actifs = ~Q(id__in=actifs_ids)
+ q_inconnus = self.get_q_inconnus()
+ q_futurs = self.get_q_futurs(date_min, date_max, annee)
+ return q_non_actifs & ~q_inconnus & ~q_futurs
+
+ def inactifs(self, date_min=None, date_max=None, annee=None):
+ return self.get_q_inactifs(date_min, date_max, annee)
+
+ def get_q_futurs(self, date_min=None, date_max=None, annee=None):
+ q = Q()
if annee:
janvier = date(annee, 1, 1)
decembre = date(annee, 12, 31)
if not date_min and not date_max:
date_min = date_max = date.today()
if date_min:
- qs = qs.filter(
- Q(**{fin_field + '__gte': date_min}) |
- Q(**{fin_field: None}))
+ q = q & Q(**{self.debut_field + '__gt': date_min})
if date_max:
- qs = qs.filter(
- Q(**{debut_field + '__lte': date_max}) |
- Q(**{debut_field: None}))
- return qs
+ q = q & Q(**{self.debut_field + '__gt': date_max})
+ return q
- def actifs(self, *args, **kwargs):
- return self._actifs('date_debut', 'date_fin', *args, **kwargs)
+ def futurs(self, date_min=None, date_max=None, annee=None):
+ return self.get_q_futurs(date_min, date_max, annee)
class PosteQuerySet(ActifsQuerySet):
def get_query_set(self):
return PosteQuerySet(self.model).select_related('type_poste')
- def actifs(self, *args, **kwargs):
- return self.get_query_set().actifs(*args, **kwargs)
-
def ma_region_ou_service(self, user):
return super(PosteManager, self).ma_region_ou_service(user)
return DossierQuerySet(self.model) \
.select_related('poste', 'employe')
- def actifs(self, *args, **kwargs):
- return self.get_query_set().actifs(*args, **kwargs)
-
def sans_contrats_ou_echus(self, *args, **kwargs):
app = self.model._meta.app_label
sql = """
class EmployeQuerySet(ActifsQuerySet):
-
- def actifs(self, date_min=None, date_max=None, annee=None):
- return self \
- ._actifs('rh_dossiers__date_debut', 'rh_dossiers__date_fin') \
- .distinct()
+ debut_field = 'rh_dossiers__date_debut'
+ fin_field = 'rh_dossiers__date_fin'
class EmployeManager(models.Manager):
return self.get_query_set().actifs(*args, **kwargs)
+class ContratQuerySet(ActifsQuerySet):
+ pass
+
+class ContratManager(models.Manager):
+
+ def get_query_set(self):
+ return ContratQuerySet(self.model)\
+ .select_related('dossier', 'dossier__poste')
+
+
class PosteComparaisonManager(SecurityManager):
use_for_related_fields = True
prefixe_implantation = "implantation__region"
DossierComparaisonManager, \
PosteComparaisonManager, \
TypeRemunerationManager, \
+ ContratManager, \
RemunerationManager
from project.rh.validators import validate_date_passee
### CONTRATS
-class ContratManager(models.Manager):
-
- def get_query_set(self):
- return super(ContratManager, self).get_query_set() \
- .select_related('dossier', 'dossier__poste')
-
-
class Contrat_(models.Model):
"""
Document juridique qui encadre la relation de travail d'un Employe
# on le porte dans le référentiel employé
ref_e = ref.Employe(id=u.id,
nom=e.nom, prenom=e.prenom,
- implantation=e.poste_principal().implantation,
- implantation_physique=e.poste_principal().implantation,
+ implantation=e.dossier_principal().poste.implantation,
+ implantation_physique=e.dossier_principal().poste.implantation,
service=ref.Service.objects.get(id=1))
ref_e.save()
# -*- coding: utf-8 -*-
+import datetime
from django.core.urlresolvers import reverse
from project.rh.test.common import RhTest
+from project.rh.change_list import KEY_STATUT, STATUT_ACTIF, STATUT_INACTIF,\
+ STATUT_FUTUR
+
+
+class EmployeVolumetrieTest(RhTest):
+ """
+ Test les notions d'actifs / inactifs
+ """
+
+ today = datetime.today()
+ annee_moins_2 = datetime.date(today.year - 2, 1, 1)
+ annee_moins_1 = datetime.date(today.year - 1, 1, 1)
+ annee_plus_2 = datetime.date(today.year + 2, 1, 1)
+ annee_plus_1 = datetime.date(today.year + 1, 1, 1)
+
+ def test_actifs(self):
+ """
+ Test la remontée des dossiers actifs pour 4 employés différents
+ avec chacun un dossier.
+ """
+ # 2 dossiers actifs
+ self.dossier_cnf_ngaoundere.date_debut = self.annee_moins_1
+ self.dossier_cnf_ngaoundere.date_fin = self.annee_plus_1
+ self.dossier_cnf_ngaoundere.save()
+
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_1
+ self.dossier_cnf_bangui.date_fin = self.annee_plus_1
+ self.dossier_cnf_bangui.save()
+
+ # 1 dossier inactif
+ self.dossier_bap_bureau.date_debut = self.annee_moins_2
+ self.dossier_bap_bureau.date_fin = self.annee_moins_1
+ self.dossier_bap_bureau.save()
+
+ # 1 dossier futur
+ self.dossier_bap_ifi.date_debut = self.annee_plus_1
+ self.dossier_bap_ifi.date_fin = self.annee_plus_2
+ self.dossier_bap_ifi.save()
+
+ self._test_drh()
+ url = reverse('admin:rh_employe_changelist')
+ params = {KEY_STATUT: STATUT_ACTIF}
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 2)
+
+ # Puis 1 seul dossier actif
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_2
+ self.dossier_cnf_bangui.date_fin = self.annee_moins_1
+ self.dossier_cnf_bangui.save()
+
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 1)
+
+ def test_inactifs(self):
+ """
+ Test la remontée des dossiers inactifs de 4 employés différents
+ avec chacun un dossier
+ """
+ # 2 dossiers actifs
+ self.dossier_cnf_ngaoundere.date_debut = self.annee_moins_1
+ self.dossier_cnf_ngaoundere.date_fin = self.annee_plus_1
+ self.dossier_cnf_ngaoundere.save()
+
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_1
+ self.dossier_cnf_bangui.date_fin = self.annee_plus_1
+ self.dossier_cnf_bangui.save()
+
+ # 1 dossier inactif
+ self.dossier_bap_bureau.date_debut = self.annee_moins_2
+ self.dossier_bap_bureau.date_fin = self.annee_moins_1
+ self.dossier_bap_bureau.save()
+
+ # 1 dossier futur
+ self.dossier_bap_ifi.date_debut = self.annee_plus_1
+ self.dossier_bap_ifi.date_fin = self.annee_plus_2
+ self.dossier_bap_ifi.save()
+
+ self._test_drh()
+ url = reverse('admin:rh_employe_changelist')
+ params = {KEY_STATUT: STATUT_INACTIF}
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 1)
+
+ # Puis 1 seul dossier actif
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_2
+ self.dossier_cnf_bangui.date_fin = self.annee_moins_1
+ self.dossier_cnf_bangui.save()
+
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 2)
+
+ def test_futurs(self):
+ """
+ Test la remontée des dossiers futurs pour 4 employés différents
+ avec chacun un dossier.
+ """
+ # 2 dossiers actifs
+ self.dossier_cnf_ngaoundere.date_debut = self.annee_moins_1
+ self.dossier_cnf_ngaoundere.date_fin = self.annee_plus_1
+ self.dossier_cnf_ngaoundere.save()
+
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_1
+ self.dossier_cnf_bangui.date_fin = self.annee_plus_1
+ self.dossier_cnf_bangui.save()
+
+ # 1 dossier inactif
+ self.dossier_bap_bureau.date_debut = self.annee_moins_2
+ self.dossier_bap_bureau.date_fin = self.annee_moins_1
+ self.dossier_bap_bureau.save()
+
+ # 1 dossier futur
+ self.dossier_bap_ifi.date_debut = self.annee_plus_1
+ self.dossier_bap_ifi.date_fin = self.annee_plus_2
+ self.dossier_bap_ifi.save()
+
+ self._test_drh()
+ url = reverse('admin:rh_employe_changelist')
+ params = {KEY_STATUT: STATUT_FUTUR}
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 1)
+
+ # Puis 1 seul dossier actif
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_2
+ self.dossier_cnf_bangui.date_fin = self.annee_moins_1
+ self.dossier_cnf_bangui.save()
+
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 1)
+
+ def test_2_dossiers_actifs(self):
+ """
+ Test employe qui a 2 dossiers actifs.
+ """
+ self.dossier_bap_bureau.delete()
+ self.dossier_bap_ifi.delete()
+
+ # 2 dossiers actifs
+ self.dossier_cnf_ngaoundere.date_debut = self.annee_moins_1
+ self.dossier_cnf_ngaoundere.date_fin = self.annee_plus_1
+ self.dossier_cnf_ngaoundere.save()
+
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_1
+ self.dossier_cnf_bangui.date_fin = self.annee_plus_1
+ self.dossier_cnf_bangui.save()
+
+ self._test_drh()
+ url = reverse('admin:rh_employe_changelist')
+ params = {KEY_STATUT: STATUT_ACTIF}
+
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 1)
+
+ def test_1_dossier_actif(self):
+ """
+ Test employe qui a 1 dossier actifs et 1 inactif.
+ """
+ self.dossier_bap_bureau.delete()
+ self.dossier_bap_ifi.delete()
+
+ self.dossier_cnf_ngaoundere = self.employe_cnf_ngaoundere
+ self.dossier_cnf_ngaoundere.date_debut = self.annee_moins_2
+ self.dossier_cnf_ngaoundere.date_fin = None
+ self.dossier_cnf_ngaoundere.save()
+
+ self.dossier_cnf_bangui.employe = self.employe_cnf_ngaoundere
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_2
+ self.dossier_cnf_bangui.date_fin = self.annee_moins_1
+ self.dossier_cnf_bangui.save()
+
+ self._test_drh()
+ url = reverse('admin:rh_employe_changelist')
+ params = {KEY_STATUT: STATUT_ACTIF}
+
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 1)
+
+ def test_2_dossiers_inactifs(self):
+ """
+ Test employe qui a 2 dossiers inactifs.
+ """
+ self.dossier_bap_bureau.delete()
+ self.dossier_bap_ifi.delete()
+
+ self.dossier_cnf_ngaoundere.employe = self.employe_cnf_ngaoundere
+ self.dossier_cnf_ngaoundere.date_debut = self.annee_moins_2
+ self.dossier_cnf_ngaoundere.date_fin = self.annee_moins__1
+ self.dossier_cnf_ngaoundere.save()
+
+ self.dossier_cnf_bangui.employe = self.employe_cnf_ngaoundere
+ self.dossier_cnf_bangui.date_debut = self.annee_moins_2
+ self.dossier_cnf_bangui.date_fin = self.annee_moins__1
+ self.dossier_cnf_bangui.save()
+
+ self._test_drh()
+ url = reverse('admin:rh_employe_changelist')
+ params = {KEY_STATUT: STATUT_ACTIF}
+
+ qs = self.client.get(url, params).context['cl'].query_set
+ self.assertEqual(len(qs), 0)
+
class EmployeAddTest(RhTest):
"""
def _test_grp_service_utilisateurs(self):
"""
- Un membre du groupe service utilisateur ne peut pas supprimer un employé
+ Un membre du groupe service utilisateur
+ ne peut pas supprimer un employé
"""
self._test_grp_service_utilisateurs()
self._test_acces_ko(self.url)
def test_drh2(self):
"""
- Un DRH (2ieme niveau) peut voir tous les aperçus d'employés avec tous les dossiers
+ Un DRH (2ieme niveau) peut voir tous les aperçus d'employés
+ avec tous les dossiers
"""
self._test_drh2()
self._test_acces_ok(self.url)
def _test_grp_haute_direction(self):
"""
- Un membre de la haute direction ne peut pas voir les aperçus d'employés
+ Un membre de la haute direction ne peut pas voir
+ les aperçus d'employés
"""
self._test_grp_haute_direction()
self._test_acces_ko(self.url)
def _test_grp_service_utilisateurs(self):
"""
- Un membre du groupe service utilisateur ne pas pas voir les aperçus d'employés
+ Un membre du groupe service utilisateur ne pas pas voir
+ les aperçus d'employés
"""
self._test_grp_service_utilisateurs()
self._test_acces_ko(self.url)
USE_L10N = True
USE_THOUSAND_SEPARATOR = False
DATE_FORMAT = 'd-m-Y'
+DATE_INPUT_FORMATS = ('%d-%m-%Y', )
SESSION_SAVE_EVERY_REQUEST = True
SESSION_EXPIRE_AT_BROWSER_CLOSE = True