123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- import datetime as dt
- import re
- from typing import List
- from million.model.facebook_schema import Message
- from million.model.member_movement import MemberMovement, MovementType
-
- #########################
- # CONSTANTS
- #########################
-
# Patterns for the (French) system-generated messages describing group
# membership changes. They are meant to be used with re.fullmatch.

# "You added <names> to the group." -> group 1: added names
_AUTHOR_ADDED_MEMBERS = r"Vous avez ajouté (.+?) au groupe\."
# "<name> and <rest>" -> group 1: first name, group 2: rest of the names group
_SEVERAL_PEOPLE = r"(.+?) et (.+?)"
# "<N> other people" -> group 1: N
_N_OTHER_PEOPLE = r"(\d+) autres personnes"
# "<member> added <names> to the group." -> group 1: member, group 2: names
_MEMBER_ADDED_MEMBERS = r"(.+?) a ajouté (.+?) au groupe\."
# "<member> left the group." -> group 1: member
_MEMBER_LEFT_GROUP = r"(.+) a quitté le groupe\."
# "You removed <name> from the group." -> group 1: removed name
_AUTHOR_FIRED_MEMBERS = r"Vous avez retiré (.+?) du groupe\."
# "<member> removed <name> from the group." -> group 1: member, group 2: name
_MEMBER_FIRED_MEMBERS = r"(.+?) a retiré (.+?) du groupe\."
-
- #########################
- # GLOBALS
- #########################
-
# The names below are module-level state. They are only declared here;
# _initialize() binds them at the start of each computation.

# Names of the members currently considered to be in the group, kept up
# to date while the message history is walked through
_tracked_members: List[str]
# Datetime of the earliest message, treated as the datetime of the
# group creation
_initial_date: dt.datetime
# Movements for people added to the conversation through bulk adds
# without being named
_unnamed_adds: List[MemberMovement]
# History of the arrivals and departures inferred so far from the
# group message history
_movements: List[MemberMovement]
-
- #########################
- # PRIVATE FUNC
- #########################
-
def _add_unnamed_adds(times: int, member_adding: str, addition_date: dt.datetime):
    """Queue ``times`` anonymous arrivals initiated by ``member_adding``.

    One distinct ``MemberMovement`` of type ``IN`` with ``None`` in place of
    the member name is appended to ``_unnamed_adds`` per arrival. Nothing is
    queued when ``times`` is zero or negative.
    """
    _unnamed_adds.extend(
        MemberMovement(None, member_adding, MovementType.IN, addition_date)
        for _ in range(times)
    )
-
def _add_unnamed_add(member_adding: str, addition_date: dt.datetime):
    """Queue a single anonymous arrival initiated by ``member_adding``.

    The queued ``MemberMovement`` has type ``IN`` and ``None`` in place of
    the member name; it is appended to ``_unnamed_adds``.
    """
    _unnamed_adds.append(
        MemberMovement(None, member_adding, MovementType.IN, addition_date)
    )
-
def _add_member(
    new_member: str,
    member_adding: str,
    addition_date: dt.datetime,
    estimated: bool = False,
):
    """Record the arrival of ``new_member`` and start tracking them.

    An ``IN`` movement is appended to ``_movements`` and the name is appended
    to ``_tracked_members``. ``estimated`` is forwarded as-is to
    ``MemberMovement``; callers set it when the arrival was inferred rather
    than read from an explicit system message.
    """
    arrival = MemberMovement(
        new_member, member_adding, MovementType.IN, addition_date, estimated
    )
    _movements.append(arrival)
    _tracked_members.append(new_member)
-
def _remove_member(removed_member: str, member_removing: str, deletion_date: dt.datetime):
    """Record the departure of ``removed_member`` and stop tracking them.

    An ``OUT`` movement initiated by ``member_removing`` at ``deletion_date``
    is appended to ``_movements`` and the name is removed from
    ``_tracked_members``.
    """
    # Make sure the member is tracked *before* logging the departure. For a
    # member that was not tracked yet this appends an estimated arrival to
    # ``_movements``; appending it first keeps the arrival ahead of the
    # departure when both carry the same timestamp, because the final
    # ``sorted`` call is stable and ties keep their insertion order.
    _check_member_tracked(removed_member)

    _movements.append(
        MemberMovement(removed_member, member_removing, MovementType.OUT, deletion_date)
    )
    _tracked_members.remove(removed_member)
-
def _check_member_tracked(member_name: str):
    """Ensure ``member_name`` is tracked, inferring an arrival if needed.

    Nothing happens for an already tracked member. Otherwise the member is
    assumed to have joined anonymously through a bulk add (the most recently
    queued unnamed add supplies the initiator and the date), or, when no
    unnamed add is pending, to have been there since the beginning (the
    member is their own initiator and the date is ``_initial_date``).
    The resulting arrival is flagged as estimated.
    """
    if member_name in _tracked_members:
        return

    if _unnamed_adds:
        pending = _unnamed_adds.pop()
        added_by, added_time = pending.initiator, pending.date_time
    else:
        added_by, added_time = member_name, _initial_date

    _add_member(member_name, added_by, added_time, estimated=True)
-
- # Returns a list of all people names found inside a group of names
- # unnamed people will be returned as None
- # examples :
- # "Jean Dupont et Marie Buffet" -> ["Jean Dupont", "Marie Buffet"]
- # "Pierre Martin et 3 autres personnes" -> ["Pierre Martin", None, None, None]
- def _resolve_names_group(names_group: str) -> List[str | None]:
- match = re.fullmatch(_SEVERAL_PEOPLE, names_group)
- if not match: return [names_group]
-
- names = [match.group(1)]
- other_people = match.group(2)
-
- if match := re.fullmatch(_N_OTHER_PEOPLE, other_people):
- n = int(match.group(1))
- names.extend([None] * n)
- else:
- names.append(other_people)
-
- return names
-
def _handle_author_added_members(match, message_date, sender_name):
    """Handle a "Vous avez ajouté ... au groupe." message.

    Group 1 of ``match`` is the names group; ``sender_name`` is used as the
    initiator. Named people are added as members, unnamed ones are queued
    as anonymous arrivals.
    """
    for added_name in _resolve_names_group(match.group(1)):
        if added_name is not None:
            _add_member(added_name, sender_name, message_date)
        else:
            _add_unnamed_add(sender_name, message_date)
-
def _handle_member_added_members(match, message_date):
    """Handle a "<member> a ajouté ... au groupe." message.

    Group 1 of ``match`` is the initiator and group 2 the names group.
    Named people are added as members, unnamed ones are queued as anonymous
    arrivals.
    """
    added_by, names_group = match.group(1), match.group(2)
    for added_name in _resolve_names_group(names_group):
        if added_name is not None:
            _add_member(added_name, added_by, message_date)
        else:
            _add_unnamed_add(added_by, message_date)
-
def _handle_member_left_group(match, message_date):
    """Handle a "<member> a quitté le groupe." message.

    Group 1 of ``match`` is both the leaving member and the initiator of
    the departure.
    """
    leaver = match.group(1)
    _remove_member(leaver, leaver, message_date)
-
def _handle_author_fired_members(match, message_date, sender_name):
    """Handle a "Vous avez retiré ... du groupe." message.

    Group 1 of ``match`` is removed from the group, with ``sender_name``
    as the initiator.
    """
    _remove_member(match.group(1), sender_name, message_date)
-
def _handle_member_fired_members(match, message_date):
    """Handle a "<member> a retiré ... du groupe." message.

    Group 1 of ``match`` is the initiator and group 2 the removed member.
    """
    removed_by, removed_name = match.group(1), match.group(2)
    _remove_member(removed_name, removed_by, message_date)
-
def _handle_message_content(message: Message):
    """Detect group arrivals and departures from a message's content.

    The content is compared against the system-generated message patterns.
    Patterns are tried in a fixed order and only the first one matching the
    whole content is acted upon. Messages without content are ignored.
    """
    text = message.content
    if text is None:
        return

    author = message.sender_name
    when = message.date_time

    if found := re.fullmatch(_AUTHOR_ADDED_MEMBERS, text):
        _handle_author_added_members(found, when, author)
    elif found := re.fullmatch(_MEMBER_ADDED_MEMBERS, text):
        _handle_member_added_members(found, when)
    elif found := re.fullmatch(_MEMBER_LEFT_GROUP, text):
        _handle_member_left_group(found, when)
    elif found := re.fullmatch(_AUTHOR_FIRED_MEMBERS, text):
        _handle_author_fired_members(found, when, author)
    elif found := re.fullmatch(_MEMBER_FIRED_MEMBERS, text):
        _handle_member_fired_members(found, when)
-
def _initialize(initial_date: dt.datetime):
    """Reset every module-level state variable for a new computation.

    The three lists are rebound to fresh empty lists and ``_initial_date``
    is set to ``initial_date``.
    """
    global _tracked_members, _initial_date, _movements, _unnamed_adds
    _initial_date = initial_date
    _tracked_members, _movements, _unnamed_adds = [], [], []
-
def compute_members_movements(messages: List[Message], participants: List[str] | None = None) -> List[MemberMovement]:
    """Compute the arrivals and departures of the members of a group.

    The message history is walked in the given order: every sender is made
    sure to be tracked, then the message content is checked against the
    system-generated membership messages. Finally every name in
    ``participants`` is made sure to be tracked as well.

    Args:
        messages: Group message history. The ``date_time`` of the first
            message is used as the group creation date, so the list is
            expected to be in chronological order.
        participants: Optional names that must appear as members even if
            they never sent a message. Defaults to no extra participants.

    Returns:
        The movements sorted by ``date_time``. An empty list is returned
        when ``messages`` is empty, since no date can then be estimated.
    """
    # Without any message there is no initial date to work from
    if not messages:
        return []

    _initialize(messages[0].date_time)

    for message in messages:
        _check_member_tracked(message.sender_name)
        _handle_message_content(message)

    # ``None`` stands for "no extra participants"; this avoids sharing a
    # mutable default list between calls
    for participant in participants or ():
        _check_member_tracked(participant)

    return sorted(_movements, key=lambda move: move.date_time)
|