123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
-
- import time
- from typing import List
- from million.model.hole import Hole
- from million.model.message import Message
- from million.model.sequence import Sequence
-
-
def compute_sequences(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]:
    """Group counted messages into merged runs of consecutive values.

    The work is done in two passes:

    1. Walk ``messages`` in order and cut them into raw runs in which every
       counted value is exactly the previous one plus one.
    2. Sort the raw runs by ``start`` and merge runs that overlap or touch
       (the next run starts inside the current one, or right after its end).

    Args:
        messages: Messages to analyse; each must provide ``get_counted_value()``.
        accepted_max: Messages whose counted value is greater than this are
            ignored (this now also applies to the first message).

    Returns:
        The merged sequences ordered by ``start``. Sequences made of a single
        value (``start == end``) are dropped. An empty list is returned when
        no message is usable.
    """
    # Pass 1: raw runs, in conversation order.
    sequences: List[Sequence] = []
    current_sequence = None
    for message in messages:
        message_value = message.get_counted_value()
        if message_value > accepted_max:
            continue

        if current_sequence is not None and message_value - current_sequence.end == 1:
            # The message continues the current run.
            current_sequence.end = message_value
            current_sequence.end_message = message
            continue

        if current_sequence is not None:
            sequences.append(current_sequence)
        current_sequence = Sequence(
            start=message_value,
            start_message=message,
            end=message_value,
            end_message=message,
        )

    if current_sequence is None:
        # Empty input, or every message was above accepted_max.
        return []
    # The run still in progress when the messages ran out must be kept too;
    # without this a fully consecutive conversation produced no run at all.
    sequences.append(current_sequence)

    # Pass 2: order the runs by start, then merge overlapping/adjacent ones.
    sequences.sort(key=lambda s: s.start)

    merged_sequences: List[Sequence] = []
    current_sequence = sequences[0]
    for sequence in sequences[1:]:
        starts_inside_current = current_sequence.start <= sequence.start <= current_sequence.end
        starts_right_after_current = sequence.start == current_sequence.end + 1

        if starts_inside_current or starts_right_after_current:
            # Only extend; a run fully contained in the current one adds nothing.
            if sequence.end > current_sequence.end:
                current_sequence.end = sequence.end
                current_sequence.end_message = sequence.end_message
        else:
            merged_sequences.append(current_sequence)
            current_sequence = sequence
    # Same as in pass 1: do not lose the last merged run.
    merged_sequences.append(current_sequence)

    # Having merged the sequences once, any sequence having start == end can be removed.
    return [s for s in merged_sequences if s.start != s.end]
-
-
def find_holes(messages: List[Message], accepted_max: int = 1_000_000) -> List[Hole]:
    """
    Find the holes in the conversation.

    A hole is reported between two neighbouring merged sequences (as returned
    by ``compute_sequences``) whenever the later one starts more than one
    value after the earlier one ends. Each hole runs from the earlier
    sequence's end to the later sequence's start and carries the matching
    boundary messages.
    """
    runs = compute_sequences(messages, accepted_max)
    return [
        Hole(
            start=before.end,
            end=after.start,
            start_message=before.end_message,
            end_message=after.start_message,
        )
        for before, after in zip(runs, runs[1:])
        if after.start - before.end > 1
    ]
-
- def _find_value_around_index(messages: List[Message], value, idx, amplitude) -> int:
- check_value = lambda x: messages[x].get_counted_value() == value
-
- if check_value(idx): return idx
-
- for offset in range(1, amplitude):
- o_idx = idx + offset * +1
- if check_value(o_idx):
- return o_idx
-
- o_idx = idx + offset * -1
- if check_value(o_idx):
- return o_idx
-
- return -1
-
def _open_sequence(sequences: List[Sequence], msg: Message):
    """Append a new, still-open sequence that starts at ``msg``.

    The new sequence uses ``msg`` as both its start and end message, takes its
    start from the message's counted value, and has ``end`` set to the ``-1``
    placeholder that ``_opened_sequence`` tests for.
    """
    sequences.append(
        Sequence(
            start=msg.get_counted_value(),
            start_message=msg,
            end=-1,
            end_message=msg,
        )
    )
-
- def _close_sequence(sequences: List[Sequence]):
- if len(sequences) == 0: return
-
- sequences[-1].end = sequences[-1].end_message.get_counted_value()
-
- def _opened_sequence(sequences: List[Sequence]):
- return len(sequences) > 0 and sequences[-1].end == -1
-
def find_sequences_v2(messages: List[Message], amplitude: int = 200) -> List[Sequence]:
    """Build sequences by following the expected next value through the messages.

    Starting from the expected value 1 at position 0, the function repeatedly
    looks for the expected value at the current position or near it (see
    ``_find_value_around_index``). While the expected value keeps being found
    the current sequence grows; as soon as it is not found the open sequence,
    if any, is closed.

    Args:
        messages: Messages to analyse; each must provide ``get_counted_value()``.
        amplitude: Search window handed to ``_find_value_around_index``.
            Defaults to 200, the value that used to be hard-coded.

    Returns:
        The sequences in the order they were opened. Every returned sequence
        is closed, including one that was still growing when the messages
        ran out.
    """
    # Function-scope import so this block does not depend on the module header.
    import logging

    logger = logging.getLogger(__name__)

    current = 1   # value expected next
    base_idx = 0  # position the next search is centred on

    sequences: List[Sequence] = []

    while base_idx < len(messages):
        curr_idx = _find_value_around_index(messages, current, base_idx, amplitude)

        # Trace of the search, formerly an unconditional print().
        if logger.isEnabledFor(logging.DEBUG):
            if curr_idx == -1:
                outcome = 'Not found'
            elif curr_idx == base_idx:
                outcome = 'Itself'
            else:
                outcome = messages[curr_idx]
            logger.debug("searching %s from [%s]\t-> %s", current, messages[base_idx], outcome)

        if curr_idx != -1:  # found
            if not _opened_sequence(sequences):
                _open_sequence(sequences, messages[curr_idx])
            else:
                # Only the end message moves while the sequence is open; the
                # numeric end is filled in when the sequence gets closed.
                sequences[-1].end_message = messages[curr_idx]

            base_idx = curr_idx + 1
            current += 1
        else:  # not found
            # Close the sequence if one is open.
            if _opened_sequence(sequences):
                _close_sequence(sequences)

            if messages[base_idx].get_counted_value() < current:
                # The message here counts less than what is expected: move on
                # to the next message.
                base_idx += 1
            else:
                # The expected value is missing around here: expect the next one.
                current += 1

    # A sequence still open once all messages are consumed would otherwise be
    # returned with its end left at the -1 placeholder.
    if _opened_sequence(sequences):
        _close_sequence(sequences)

    return sequences
|