123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from typing import List
  2. from million.model.message import Message
  3. from million.model.sequence import Sequence
  4. import million.analyze.message_evaluation as msg_val
  5. def compute_sequences(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]:
  6. """
  7. Takes a list of messages as input and returns a list of sequences
  8. for every following messages with following 'counted values'
  9. """
  10. sequences: List[Sequence] = [Sequence(start_message=messages[0])]
  11. for message in messages[1:]:
  12. if msg_val.get(message) > accepted_max: continue
  13. if msg_val.get(message) == sequences[-1].end() + 1:
  14. sequences[-1].end_message = message
  15. else:
  16. sequences.append(Sequence(start_message=message))
  17. return sequences
  18. def merge_duplicates(sequences: List[Sequence]) -> List[Sequence]:
  19. """
  20. Take sequences as an input and returns a list with every
  21. overlapping input sequences merged in one
  22. """
  23. o_sequences = sorted(sequences, key= lambda s : s.start())
  24. current = o_sequences[0]
  25. result = []
  26. for sequence in o_sequences[1:]:
  27. if current.overlaps(sequence):
  28. current.merge(sequence)
  29. else:
  30. result.append(current)
  31. current = sequence
  32. return result
  33. def invert_sequences(sequences: List[Sequence]) -> List[Sequence]:
  34. """
  35. Returns the sequences representing the spaces between
  36. the ones given as input
  37. """
  38. result = []
  39. for previous, current in zip(sequences[:-1],sequences[1:]):
  40. result.append(Sequence(
  41. start_message=previous.end_message,
  42. end_message=current.start_message
  43. ))
  44. return result
  45. def find_holes(messages: List[Message], accepted_max: int = 1_000_000) -> List[Sequence]:
  46. """
  47. Find the holes in the conversation
  48. TODO might need to be moved inside scripts/find_holes
  49. """
  50. sequences = compute_sequences(messages, accepted_max)
  51. merged = merge_duplicates(sequences)
  52. merged = [s for s in merged if s.length() > 1]
  53. return invert_sequences(merged)