from typing import List

from million.model.hole import Hole
from million.model.message import Message
from million.model.sequence import Sequence
import million.analyze.message_evaluation as msg_val


def compute_sequences(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]:
    """
    Group messages into sequences of consecutive values.

    Messages whose evaluated value (``msg_val.get``) is greater than
    ``accepted_max`` are ignored. The remaining messages are split, in input
    order, into runs where each value is exactly one more than the end of the
    current run. The runs are then sorted by start, overlapping runs are
    merged, and only sequences with ``length() > 1`` are returned.

    Returns an empty list when no message is accepted.
    """
    # Apply the accepted_max filter uniformly, including to the first message.
    accepted = [m for m in messages if msg_val.get(m) <= accepted_max]
    if not accepted:
        return []

    # Pass 1: split the accepted messages into runs of consecutive values.
    sequences: List[Sequence] = []
    current = Sequence(start_message=accepted[0])
    for message in accepted[1:]:
        if msg_val.get(message) == current.end() + 1:
            current.end_message = message
        else:
            sequences.append(current)
            current = Sequence(start_message=message)
    # The run still being built when the loop ends must be kept as well.
    sequences.append(current)

    # Order the sequences by start so overlapping ones become neighbours.
    sequences.sort(key=lambda s: s.start())

    # Pass 2: fold each sequence into its predecessor when they overlap.
    merged_sequences: List[Sequence] = []
    previous = sequences[0]
    for sequence in sequences[1:]:
        if previous.overlaps(sequence):
            previous.merge(sequence)
        else:
            merged_sequences.append(previous)
            previous = sequence
    # Flush the last pending sequence, which the loop never appends itself.
    merged_sequences.append(previous)

    # Having merged the sequences once, any sequence having start = end can be removed
    return [s for s in merged_sequences if s.length() > 1]


def find_holes(messages: List[Message], accepted_max: int = 1_000_000) -> List[Hole]:
    """
    Find the holes in the conversation.

    A hole is reported between two neighbouring sequences returned by
    ``compute_sequences`` when the later one starts more than one past the
    end of the earlier one. Each ``Hole`` is built with the earlier
    sequence's end value and end message as its start, and the later
    sequence's start value and start message as its end.
    """
    merged_sequences = compute_sequences(messages, accepted_max)
    holes: List[Hole] = []
    # Walk neighbouring pairs; zip yields nothing for fewer than two sequences.
    for previous_sequence, sequence in zip(merged_sequences, merged_sequences[1:]):
        if sequence.start() - previous_sequence.end() > 1:
            holes.append(Hole(
                start=previous_sequence.end(),
                end=sequence.start(),
                start_message=previous_sequence.end_message,
                end_message=sequence.start_message
            ))
    return holes