1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
-
-
- from typing import List
- from million.model.hole import Hole
- from million.model.message import Message
- from million.model.sequence import Sequence
- import million.analyze.message_evaluation as msg_ev
-
-
def compute_sequences(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]:
    """
    Group the messages into merged sequences of consecutive values.

    The messages are first cut, in the order given, into runs in which each
    message's value (as returned by ``msg_ev.compute``) is exactly one more
    than the end of the current run. The runs are then sorted by their start
    and any run that overlaps, or directly follows, the previous one is
    merged into it.

    Args:
        messages: The messages to analyze, in conversation order.
        accepted_max: Messages whose value is greater than this are ignored.

    Returns:
        The merged sequences ordered by start. Sequences whose start equals
        their end are left out. An empty list is returned when there is no
        message to build a sequence from.
    """
    # Pass 1: cut the conversation into runs of values increasing by one.
    runs: List[Sequence] = []
    current_run = None
    for message in messages:
        message_value = msg_ev.compute(message)
        if message_value > accepted_max:
            continue
        if current_run is not None and message_value - current_run.end() == 1:
            current_run.end_message = message
            continue
        if current_run is not None:
            runs.append(current_run)
        current_run = Sequence(
            start_message=message,
            end_message=message
        )

    if current_run is None:
        # No message, or every message was above accepted_max.
        return []
    # The run still open when the loop ends must be kept as well.
    runs.append(current_run)

    # Pass 2: order the runs by start, then merge the ones that touch.
    runs.sort(key=lambda s: s.start())

    merged_sequences: List[Sequence] = []
    current_sequence = runs[0]
    for sequence in runs[1:]:
        sequence_start = sequence.start()
        current_end = current_sequence.end()
        starts_inside_current = current_sequence.start() <= sequence_start <= current_end
        starts_right_after_current = sequence_start == current_end + 1

        if starts_inside_current or starts_right_after_current:
            # Only ever extend the current sequence; a run fully contained
            # in it must not pull its end backwards.
            if sequence.end() > current_end:
                current_sequence.end_message = sequence.end_message
        else:
            merged_sequences.append(current_sequence)
            current_sequence = sequence
    # The sequence still open when the loop ends must be kept as well.
    merged_sequences.append(current_sequence)

    # Having merged the sequences once, any sequence having start = end can be removed
    return [s for s in merged_sequences if s.start() != s.end()]
-
-
def find_holes(messages: List[Message], accepted_max: int = 1_000_000) -> List[Hole]:
    """
    Locate the gaps between the sequences of the conversation.

    The sequences come from ``compute_sequences``. Each pair of neighbouring
    sequences whose values are more than one apart produces a ``Hole`` that
    goes from the end of the earlier sequence to the start of the later one.
    """
    runs = compute_sequences(messages, accepted_max)
    # Walk the sequences two by two: (first, second), (second, third), ...
    return [
        Hole(
            start=before.end(),
            end=after.start(),
            start_message=before.end_message,
            end_message=after.start_message
        )
        for before, after in zip(runs, runs[1:])
        if after.start() - before.end() > 1
    ]
|