Coverage for muutils/logger/log_util.py: 0%

33 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-28 17:24 +0000

from __future__ import annotations

from typing import Any

from muutils.jsonlines import jsonl_load_log

3 

4 

def get_any_from_stream(stream: list[dict], key: str) -> Any:
    """Return the value stored under `key` in the first message that has it.

    Messages are scanned in order and the scan stops at the first message
    containing `key`; the value is returned as-is (including `None` or other
    falsy values).

    # Parameters:
     - `stream : list[dict]`
        messages to search, in order
     - `key : str`
        key to look up in each message

    # Returns:
     - `Any`
        the value associated with `key` in the first message containing it

    # Raises:
     - `KeyError` : if no message in `stream` contains `key`
       (this includes an empty `stream`)
    """
    for msg in stream:
        # membership test rather than `.get()`, so a stored `None` still counts as found
        if key in msg:
            return msg[key]

    raise KeyError(f"key '{key}' not found in stream")

12 

13 

def gather_log(file: str) -> dict[str, list[dict]]:
    """Load a log file and group its entries by stream name.

    Every entry returned by `jsonl_load_log(file)` is filed under the value of
    its `"_stream"` key; entries without that key are filed under `"default"`.
    Within each stream, entries keep the order in which they were loaded.
    """
    by_stream: dict[str, list[dict]] = {}

    for entry in jsonl_load_log(file):
        stream_name: str = entry.get("_stream", "default")
        # create the bucket on first sight of a stream, then append to it
        by_stream.setdefault(stream_name, []).append(entry)

    return by_stream

26 

27 

def gather_stream(
    file: str,
    stream: str,
) -> list[dict]:
    """Return all entries of a log file that belong to one stream.

    An entry is selected only when it has a `"_stream"` key whose value equals
    `stream`; entries lacking the key are never selected. Selected entries are
    returned in the order `jsonl_load_log(file)` produced them.
    """
    return [
        entry
        for entry in jsonl_load_log(file)
        # require the key to be present before comparing its value
        if "_stream" in entry and entry["_stream"] == stream
    ]

42 

43 

def gather_val(
    file: str,
    stream: str,
    keys: tuple[str, ...],
    allow_skip: bool = True,
) -> list[list]:
    """gather specific keys from a specific stream in a log file

    Only entries that have a `"_stream"` key equal to `stream` are considered.
    For each such entry containing every key in `keys`, a list of the
    corresponding values (in the order given by `keys`) is appended to the
    output.

    # Parameters:
     - `file : str`
        log file, passed to `jsonl_load_log`
     - `stream : str`
        stream name to select
     - `keys : tuple[str, ...]`
        keys whose values are collected from each selected entry
     - `allow_skip : bool`
        if `True`, entries of the stream that lack any of `keys` are silently
        skipped; if `False`, the first such entry raises `ValueError`
        (defaults to `True`)

    # Returns:
     - `list[list]`
        one inner list per matching entry, holding the values for `keys`

    # Raises:
     - `ValueError` : if `allow_skip` is `False` and an entry of the selected
       stream is missing at least one of `keys`

    example:
    if "log.jsonl" has contents:
    ```jsonl
    {"a": 1, "b": 2, "c": 3, "_stream": "s1"}
    {"a": 4, "b": 5, "c": 6, "_stream": "s1"}
    {"a": 7, "b": 8, "c": 9, "_stream": "s2"}
    ```
    then `gather_val("log.jsonl", "s1", ("a", "b"))` will return
    ```python
    [
        [1, 2],
        [4, 5]
    ]
    ```

    """
    data: list[dict] = jsonl_load_log(file)

    output: list[list] = []

    for item in data:
        # select for the stream
        if ("_stream" in item) and (item["_stream"] == stream):
            # select for the keys
            if all(k in item for k in keys):
                output.append([item[k] for k in keys])
            elif not allow_skip:
                raise ValueError(f"missing keys '{keys = }' in '{item = }'")

    return output