12345678910111213141516171819202122232425262728293031323334353637383940414243 |
-
- import json, os, re
- from typing import List
-
- from million.model.fb_export import FacebookExport
-
-
def is_file_valid(file_name: str) -> bool:
    """Return True when *file_name* carries a ``.json`` extension.

    The comparison is case-insensitive; only the final extension reported
    by ``os.path.splitext`` is considered.
    """
    _, extension = os.path.splitext(file_name)
    return extension.lower() == '.json'
-
def valid_dirfiles(file_dir: str) -> List[str]:
    """List the entries of *file_dir* accepted by ``is_file_valid``.

    Each returned path is the directory joined with the entry name, in the
    order produced by ``os.listdir``.
    """
    accepted = []
    for entry in os.listdir(file_dir):
        if is_file_valid(entry):
            accepted.append(os.path.join(file_dir, entry))
    return accepted
-
def parse_file(file_name: str) -> FacebookExport:
    """Load one export file and build a ``FacebookExport`` from it.

    Returns ``None`` when *file_name* is rejected by ``is_file_valid``.
    Otherwise the file is read as raw bytes, passed through
    ``__read_broken_fb_json`` to obtain text, parsed as JSON, and the
    resulting mapping is expanded into ``FacebookExport`` keyword arguments.
    """
    if not is_file_valid(file_name):
        return None

    # Read bytes rather than text: the repair step operates on the raw
    # byte stream before any decoding happens.
    with open(file_name, 'rb') as handle:
        raw_bytes = handle.read()

    payload = json.loads(__read_broken_fb_json(raw_bytes))
    return FacebookExport(**payload)
-
def parse_dirfiles(file_dir: str) -> FacebookExport:
    """Parse every valid file in *file_dir* and merge them into one export.

    The first parsed export is used as the accumulator: messages and
    participants of all the others are appended to it. Afterwards its
    messages are sorted by ``timestamp_ms`` and its participants are
    replaced by a ``set`` of themselves (dropping duplicates).

    Returns ``None`` when the directory holds no valid file.
    """
    parsed = [parse_file(path) for path in valid_dirfiles(file_dir)]
    if not parsed:
        return None

    merged, *remaining = parsed
    for extra in remaining:
        merged.messages.extend(extra.messages)
        merged.participants.extend(extra.participants)

    merged.messages.sort(key=lambda msg: msg.timestamp_ms)
    # NOTE: participants becomes a set here, so its elements must be hashable.
    merged.participants = set(merged.participants)
    return merged
-
def __read_broken_fb_json(binary_data):
    """Repair byte-wise ``\\u00XX`` escapes and return the JSON as text.

    The input is expected to contain UTF-8 byte sequences written out one
    byte at a time as ``\\u00XX`` escapes (presumably how the Facebook
    export serialises non-ASCII text -- confirm against real exports).
    Each such escape is replaced by the raw byte it names, and the result
    is decoded as UTF-8.

    Only escapes in the range 0x80-0xFF are rewritten. ASCII-range escapes
    such as ``\\u0022`` (quote), ``\\u005c`` (backslash) or ``\\u000a``
    (newline) are left untouched: turning them into raw bytes would break
    the surrounding JSON string, whereas the JSON parser decodes them
    correctly on its own.

    Args:
        binary_data: Raw bytes of the JSON document.

    Returns:
        The repaired document as a ``str``.

    Raises:
        UnicodeDecodeError: If the repaired bytes are not valid UTF-8.
    """
    repaired = re.sub(
        # First hex digit 8-f restricts the match to non-ASCII bytes;
        # both hex cases are accepted since JSON allows either.
        rb'\\u00([89a-fA-F][\da-fA-F])',
        lambda m: bytes.fromhex(m.group(1).decode()),
        binary_data,
    )

    return repaired.decode('utf8')
|