from typing import List

from million.model.hole import Hole
from million.model.message import Message
from million.model.sequence import Sequence


def compute_sequences(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]:
    """
    Group the counted values of ``messages`` into merged runs of consecutive values.

    Messages are first split, in their given order, into runs where each
    counted value is exactly one more than the previous one. The runs are
    then sorted by start value, and runs that overlap or directly follow one
    another are merged. Runs that still span a single value after merging
    are discarded.

    Args:
        messages: Messages to analyse, in conversation order.
        accepted_max: Messages whose counted value is greater than this are
            ignored (this applies to every message, including the first).

    Returns:
        The merged sequences ordered by start value. Empty when there are no
        accepted messages.
    """
    # Evaluate each counted value once and drop out-of-range values up front.
    accepted = []
    for message in messages:
        value = message.get_counted_value()
        if value <= accepted_max:
            accepted.append((value, message))

    if not accepted:
        return []

    sequences: List[Sequence] = []
    current_sequence = None
    for value, message in accepted:
        if current_sequence is not None and value - current_sequence.end == 1:
            # The value directly follows the running sequence: extend it.
            current_sequence.end = value
            current_sequence.end_message = message
            continue

        if current_sequence is not None:
            sequences.append(current_sequence)
        current_sequence = Sequence(
            start=value,
            start_message=message,
            end=value,
            end_message=message
        )
    # Flush the run still in progress, otherwise the last run would be lost.
    sequences.append(current_sequence)

    # order the sequences by start
    sequences.sort(key=lambda s: s.start)

    merged_sequences: List[Sequence] = []
    current_sequence = sequences[0]
    for sequence in sequences[1:]:
        starts_inside_current = (
            current_sequence.start <= sequence.start
            and current_sequence.end >= sequence.start
        )
        starts_right_after_current = sequence.start == current_sequence.end + 1

        if starts_inside_current or starts_right_after_current:
            # Overlapping or adjacent: absorb it, extending the end if needed.
            if sequence.end > current_sequence.end:
                current_sequence.end = sequence.end
                current_sequence.end_message = sequence.end_message
        else:
            merged_sequences.append(current_sequence)
            current_sequence = sequence
    # Flush the last merged run as well.
    merged_sequences.append(current_sequence)

    # Having merged the sequences once, any sequence having start = end can be removed
    return [s for s in merged_sequences if s.start != s.end]


def find_holes(messages: List[Message], accepted_max: int = 1_000_000) -> List[Hole]:
    """
    Find the holes in the conversation

    A hole is reported between two neighbouring merged sequences (as returned
    by ``compute_sequences``) when their values are more than one apart.

    Args:
        messages: Messages to analyse, in conversation order.
        accepted_max: Forwarded to ``compute_sequences``.

    Returns:
        One ``Hole`` per gap. ``start`` / ``start_message`` are the end of the
        sequence before the gap, and ``end`` / ``end_message`` are the start
        of the sequence after it, so both bounds are values that are present.
    """
    merged_sequences = compute_sequences(messages, accepted_max)

    holes = []
    for previous_sequence, sequence in zip(merged_sequences, merged_sequences[1:]):
        if sequence.start - previous_sequence.end > 1:
            holes.append(Hole(
                start=previous_sequence.end,
                end=sequence.start,
                start_message=previous_sequence.end_message,
                end_message=sequence.start_message
            ))
    return holes


def find_holesV2(messages: List[Message]) -> List[Hole]:
    """
    Find the holes by walking the counter upwards from 1.

    Every value starting at 1 is looked up among the counted values of
    ``messages``. A hole is opened at the first missing value and closed when
    the next present value is reached. The walk stops once an open hole has
    grown past an internal threshold of 1000 values.

    Unlike ``find_holes``, the bounds are the missing values themselves:
    ``start`` is the first missing value and ``end`` the last missing one.
    ``start_message`` is always a placeholder ``Message``; ``end_message`` is
    the message found right after the hole.

    The input list is not modified.

    Args:
        messages: Messages to analyse.

    Returns:
        The holes in increasing order. The last entry is always the hole on
        which the walk stopped; it is left open (``end == 0``, placeholder
        messages).
    """
    threshold = 1000

    # Index the first message of each counted value once, instead of
    # scanning the whole list for every value.
    first_message_by_value = {}
    for message in messages:
        first_message_by_value.setdefault(message.get_counted_value(), message)

    result: List[Hole] = []
    current = 1
    while True:
        found = first_message_by_value.get(current)
        # A hole whose end is still below 1 has not been closed yet.
        hole_is_open = bool(result) and result[-1].end < 1

        if found is not None:
            if hole_is_open:
                result[-1].end = current - 1
                result[-1].end_message = found
        elif not hole_is_open:
            result.append(Hole(
                start=current,
                end=0,
                start_message=Message(sender_name='', timestamp_ms=0),
                end_message=Message(sender_name='', timestamp_ms=0)
            ))
        elif current - result[-1].start > threshold:
            # Nothing found for more than `threshold` values: stop walking.
            break

        # Advance on every iteration so the walk always makes progress.
        current += 1

    return result