from typing import List from million.model.hole import Hole from million.model.message import Message from million.model.sequence import Sequence def compute_sequences(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]: sequences: List[Sequence] = [] current_sequence = Sequence( start=messages[0].get_counted_value(), start_message=messages[0], end=messages[0].get_counted_value(), end_message=messages[0] ) for i in range(1, len(messages)): message = messages[i] message_value = message.get_counted_value() if message_value > accepted_max: continue if message_value - current_sequence.end == 1: current_sequence.end = message_value current_sequence.end_message = message else: sequences.append(current_sequence) current_sequence = Sequence( start=message_value, start_message=message, end=message_value, end_message=message ) # order the sequences by start sequences.sort(key=lambda s: s.start) merged_sequences: List[Sequence] = [] current_sequence = sequences[0] for i in range(1, len(sequences)): sequence = sequences[i] sequence_start_is_in_current_sequence = current_sequence.start <= sequence.start and current_sequence.end >= sequence.start sequence_end_is_further = sequence.end > current_sequence.end sequence_start_is_current_end_or_next = sequence.start == current_sequence.end + 1 if sequence_start_is_in_current_sequence or sequence_start_is_current_end_or_next: if sequence_end_is_further: current_sequence.end = sequence.end current_sequence.end_message = sequence.end_message else: merged_sequences.append(current_sequence) current_sequence = sequence # Having merged the sequences once, any sequence having start = end can be removed return [s for s in merged_sequences if s.start != s.end] def find_holes(messages: List[Message], accepted_max: int = 1_000_000) -> List[Hole]: """ Find the holes in the conversation """ merged_sequences = compute_sequences(messages, accepted_max) holes = [] for i in range(1, len(merged_sequences)): previous_sequence = merged_sequences[i - 1] sequence = merged_sequences[i] if sequence.start - previous_sequence.end > 1: holes.append(Hole( start=previous_sequence.end, end=sequence.start, start_message=previous_sequence.end_message, end_message=sequence.start_message )) return holes def find_holesV2(messages: List[Message]) -> List[Hole]: current = 1 msg_idx = 0 threshold = 1000 limitAhead = 100 limitBehind = 20 holes = [] while msg_idx < len(messages): #search value current in messages from msgIdx, with lookahead then lookbehind for i in range(0, limitAhead): msgCurrent = messages[msg_idx + i] if msgCurrent.get_counted_value() == current: break if msgCurrent.get_counted_value() != current: for i in range(1, limitBehind): msgCurrent = messages[msg_idx - i] if msgCurrent.get_counted_value() == current: break if msgCurrent.get_counted_value() == current: # la valeur current a été trouvé dans la zone de recherche print(f"{msgCurrent.sender_name} : {msgCurrent.content}") # si un trou était ouvert il faut le fermer if len(holes) > 0 and holes[-1].end == 0: holes[-1].end = current-1 holes[-1].end_message = msgCurrent print(f"\t{current-1}") msg_idx += 1 else: # la valeur current n'a pas été trouvée # on est dans un trou # si aucun trou n'est ouvert, on en crée un if len(holes) == 0 or holes[-1].end > 0: hole = Hole( start=current, end=0, start_message=messages[msg_idx], end_message=Message(sender_name='',timestamp_ms=0) ) holes.append(hole) print(f"\t HOLE : {hole.start}\n\t\t...") current += 1 return holes