import time
from typing import List

from million.model.hole import Hole
from million.model.message import Message
from million.model.sequence import Sequence


def _singleton_sequence(message: Message, value: int) -> Sequence:
    """Build a sequence that starts and ends on the single given message."""
    return Sequence(
        start=value,
        start_message=message,
        end=value,
        end_message=message
    )


def compute_sequences(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]:
    """
    Group the messages into merged runs of consecutive counted values.

    The messages are first split, in list order, into runs where each counted
    value (as returned by ``get_counted_value()``) is exactly one more than the
    previous one. The runs are then sorted by start value and every run that
    overlaps or directly follows the previous one is merged into it.

    Args:
        messages: the messages to analyse, in conversation order.
        accepted_max: messages whose counted value is greater than this are
            ignored.

    Returns:
        The merged sequences ordered by start value. Sequences that still
        cover a single value after merging (start == end) are left out.
        An empty list is returned when no message is usable.
    """
    # Read every counted value once and discard out-of-range messages up
    # front, so the very first message is filtered like all the others.
    counted = [(message, message.get_counted_value()) for message in messages]
    counted = [(message, value) for message, value in counted if value <= accepted_max]
    if not counted:
        return []

    # First pass: cut the conversation into runs of strictly consecutive values.
    sequences: List[Sequence] = []
    first_message, first_value = counted[0]
    current_sequence = _singleton_sequence(first_message, first_value)
    for message, value in counted[1:]:
        if value - current_sequence.end == 1:
            current_sequence.end = value
            current_sequence.end_message = message
        else:
            sequences.append(current_sequence)
            current_sequence = _singleton_sequence(message, value)
    # The run still being built when the messages run out must be kept too.
    sequences.append(current_sequence)

    # order the sequences by start
    sequences.sort(key=lambda s: s.start)

    # Second pass: merge runs that overlap or touch the current one.
    merged_sequences: List[Sequence] = []
    current_sequence = sequences[0]
    for sequence in sequences[1:]:
        # Because of the sort, sequence.start >= current_sequence.start always
        # holds, so "starts inside the current sequence or right after its
        # end" reduces to this single comparison.
        if sequence.start <= current_sequence.end + 1:
            if sequence.end > current_sequence.end:
                current_sequence.end = sequence.end
                current_sequence.end_message = sequence.end_message
        else:
            merged_sequences.append(current_sequence)
            current_sequence = sequence
    # Same as above: do not lose the last sequence being merged.
    merged_sequences.append(current_sequence)

    # Having merged the sequences once, any sequence having start = end can be removed
    return [s for s in merged_sequences if s.start != s.end]


def find_holes(messages: List[Message], accepted_max: int = 1_000_000) -> List[Hole]:
    """
    Find the holes in the conversation.

    A hole is reported between two neighbouring merged sequences (see
    ``compute_sequences``) whenever the next sequence starts more than one
    value after the previous one ends.

    Args:
        messages: the messages to analyse, in conversation order.
        accepted_max: messages whose counted value is greater than this are
            ignored.

    Returns:
        One ``Hole`` per gap. Its ``start``/``start_message`` are the end of
        the sequence before the gap, its ``end``/``end_message`` are the start
        of the sequence after the gap.
    """
    merged_sequences = compute_sequences(messages, accepted_max)
    holes: List[Hole] = []
    for previous_sequence, sequence in zip(merged_sequences, merged_sequences[1:]):
        if sequence.start - previous_sequence.end > 1:
            holes.append(Hole(
                start=previous_sequence.end,
                end=sequence.start,
                start_message=previous_sequence.end_message,
                end_message=sequence.start_message
            ))
    return holes


def _find_value_around_index(messages: List[Message], value: int, idx: int, amplitude: int) -> int:
    """
    Look for a message counting ``value`` at ``idx`` or close to it.

    ``idx`` itself is checked first, then for each offset from 1 up to
    ``amplitude - 1`` the position after ``idx`` is checked before the
    position before it. Positions outside the list are skipped.

    Returns:
        The index of the first matching message, or -1 when none is found.
    """
    def holds_value(position: int) -> bool:
        # The bounds check prevents both an IndexError past the end of the
        # list and a negative index silently wrapping around to its tail.
        return 0 <= position < len(messages) and messages[position].get_counted_value() == value

    if holds_value(idx):
        return idx
    for offset in range(1, amplitude):
        for candidate in (idx + offset, idx - offset):
            if holds_value(candidate):
                return candidate
    return -1


def _open_sequence(sequences: List[Sequence], msg: Message):
    """Append a new sequence starting at ``msg``; ``end == -1`` marks it as still open."""
    sequence = Sequence(
        start=msg.get_counted_value(),
        start_message=msg,
        end=-1,
        end_message=msg
    )
    sequences.append(sequence)


def _close_sequence(sequences: List[Sequence]):
    """Set the end of the last sequence from its end message; no-op on an empty list."""
    if not sequences:
        return
    sequences[-1].end = sequences[-1].end_message.get_counted_value()


def _opened_sequence(sequences: List[Sequence]):
    """Tell whether the last sequence of the list is still open (``end == -1``)."""
    return len(sequences) > 0 and sequences[-1].end == -1


def find_sequences_v2(messages: List[Message], amplitude: int = 200) -> List[Sequence]:
    """
    Find the sequences by looking for each expected value in turn.

    Starting from the value 1 and the first message, the expected value is
    searched around the current position (see ``_find_value_around_index``).
    When found, it extends the open sequence (or opens a new one) and the
    search moves on to the next value right after the match. When not found,
    the open sequence is closed and either the position or the expected value
    is advanced. Each search step is printed to standard output.

    Args:
        messages: the messages to analyse, in conversation order.
        amplitude: how far around the current position a value is searched.

    Returns:
        The sequences found, in the order they were opened.
    """
    current = 1
    base_idx = 0
    sequences: List[Sequence] = []

    while base_idx < len(messages):
        curr_idx = _find_value_around_index(messages, current, base_idx, amplitude)
        print(f"searching {current} from [{messages[base_idx]}]\t-> {'Not found' if curr_idx == -1 else 'Itself' if curr_idx == base_idx else messages[curr_idx]}")

        if curr_idx != -1:
            # found
            if not _opened_sequence(sequences):
                _open_sequence(sequences, messages[curr_idx])
            else:
                sequences[-1].end_message = messages[curr_idx]
            base_idx = curr_idx + 1
            current += 1
        else:
            # not found: close the sequence if one is open
            if _opened_sequence(sequences):
                _close_sequence(sequences)
            # Skip a message that is behind the expected value, otherwise
            # give up on this value and look for the next one.
            if messages[base_idx].get_counted_value() < current:
                base_idx += 1
            else:
                current += 1

    # A sequence still open when the messages run out would otherwise be
    # returned with the -1 sentinel as its end.
    if _opened_sequence(sequences):
        _close_sequence(sequences)

    return sequences