from typing import List from million.model.hole import Hole from million.model.message import Message from million.model.sequence import Sequence def compute_sequences(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]: sequences: List[Sequence] = [] current_sequence = Sequence( start=messages[0].get_counted_value(), start_message=messages[0], end=messages[0].get_counted_value(), end_message=messages[0] ) for i in range(1, len(messages)): message = messages[i] message_value = message.get_counted_value() if message_value > accepted_max: continue if message_value - current_sequence.end == 1: current_sequence.end = message_value current_sequence.end_message = message else: sequences.append(current_sequence) current_sequence = Sequence( start=message_value, start_message=message, end=message_value, end_message=message ) # order the sequences by start sequences.sort(key=lambda s: s.start) merged_sequences: List[Sequence] = [] current_sequence = sequences[0] for i in range(1, len(sequences)): sequence = sequences[i] sequence_start_is_in_current_sequence = current_sequence.start <= sequence.start and current_sequence.end >= sequence.start sequence_end_is_further = sequence.end > current_sequence.end sequence_start_is_current_end_or_next = sequence.start == current_sequence.end + 1 if sequence_start_is_in_current_sequence or sequence_start_is_current_end_or_next: if sequence_end_is_further: current_sequence.end = sequence.end current_sequence.end_message = sequence.end_message else: merged_sequences.append(current_sequence) current_sequence = sequence # Having merged the sequences once, any sequence having start = end can be removed return [s for s in merged_sequences if s.start != s.end] def find_holes(messages: List[Message], accepted_max: int = 1_000_000) -> List[Hole]: """ Find the holes in the conversation """ merged_sequences = compute_sequences(messages, accepted_max) holes = [] for i in range(1, len(merged_sequences)): previous_sequence = merged_sequences[i - 1] sequence = merged_sequences[i] if sequence.start - previous_sequence.end > 1: holes.append(Hole( start=previous_sequence.end, end=sequence.start, start_message=previous_sequence.end_message, end_message=sequence.start_message )) return holes