Coverage for muutils / logger / log_util.py: 100%
35 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-22 18:25 -0700
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-22 18:25 -0700
1from __future__ import annotations
2from typing import Any, TypeVar
3from muutils.jsonlines import jsonl_load_log
# type variable linking the value type of the dicts in a stream to the return type of `get_any_from_stream`
5T_StreamValue = TypeVar("T_StreamValue")
def get_any_from_stream(
    stream: list[dict[str, T_StreamValue]], key: str
) -> T_StreamValue:
    """return the value of `key` from the earliest message in `stream` that has it

    messages are scanned in order and the first one containing `key` wins,
    whatever its value is (including falsy values and `None`).

    raises `KeyError` if no message in `stream` contains `key`
    """
    # look for the message itself rather than its value, so that a stored
    # `None` is still treated as "found"
    holder = next((entry for entry in stream if key in entry), None)
    if holder is None:
        raise KeyError(f"key '{key}' not found in stream")
    return holder[key]
def gather_log(file: str) -> dict[str, list[dict[str, Any]]]:
    """group every entry of a log file by the stream it belongs to

    entries are loaded with `jsonl_load_log`. the stream of an entry is the
    value of its `"_stream"` key, or `"default"` when that key is absent.
    within each stream, entries keep the order in which they were loaded.

    returns a dict mapping stream name to the list of that stream's entries
    """
    by_stream: dict[str, list[dict[str, Any]]] = {}
    for entry in jsonl_load_log(file):
        stream_name: str = entry.get("_stream", "default")
        # `setdefault` creates the stream's list the first time the stream is seen
        by_stream.setdefault(stream_name, []).append(entry)
    return by_stream
def gather_stream(
    file: str,
    stream: str,
) -> list[dict[str, Any]]:
    """return the entries of a log file whose `"_stream"` value equals `stream`

    entries are loaded with `jsonl_load_log` and returned in the order they
    were loaded. entries that have no `"_stream"` key are never selected.
    """
    return [
        entry
        for entry in jsonl_load_log(file)
        # the membership check comes first so entries without a stream are skipped
        if "_stream" in entry and entry["_stream"] == stream
    ]
def gather_val(
    file: str,
    stream: str,
    keys: tuple[str, ...],
    allow_skip: bool = True,
) -> list[list[Any]]:
    """collect the values of `keys` from each entry of one stream in a log file

    entries are loaded with `jsonl_load_log`. only entries whose `"_stream"`
    value equals `stream` are considered. each such entry that contains
    every key in `keys` contributes one row, with values in the order given
    by `keys`.

    an entry of the selected stream that lacks any of the keys is skipped
    when `allow_skip` is true (the default), and otherwise causes a
    `ValueError`.

    example:
    if "log.jsonl" has contents:
    ```jsonl
    {"a": 1, "b": 2, "c": 3, "_stream": "s1"}
    {"a": 4, "b": 5, "c": 6, "_stream": "s1"}
    {"a": 7, "b": 8, "c": 9, "_stream": "s2"}
    ```
    then `gather_val("log.jsonl", "s1", ("a", "b"))` will return
    ```python
    [
        [1, 2],
        [4, 5]
    ]
    ```
    """
    rows: list[list[Any]] = []
    # the loop variable must stay named `item`: the error message below uses
    # `{item = }`, which embeds the variable name in the text
    for item in jsonl_load_log(file):
        # ignore entries that belong to another stream or to no stream at all
        if "_stream" not in item or item["_stream"] != stream:
            continue
        if all(k in item for k in keys):
            rows.append([item[k] for k in keys])
        elif not allow_skip:
            raise ValueError(f"missing keys '{keys = }' in '{item = }'")
    return rows