Coverage for tests/unit/logger/test_log_util.py: 100%
79 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-18 02:51 -0700
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-18 02:51 -0700
1from __future__ import annotations
3import os
4from pathlib import Path
6import pytest
8from muutils.json_serialize import JSONitem
9from muutils.jsonlines import jsonl_write
10from muutils.logger.log_util import (
11 gather_log,
12 gather_stream,
13 gather_val,
14 get_any_from_stream,
15)
# Scratch directory for the jsonl files written by the tests in this module.
# The path is relative, so it resolves against the current working directory.
TEMP_PATH: Path = Path("tests/_temp/logger")
def test_gather_log():
    """Test gathering and sorting all streams from a multi-stream log file.

    Writes six records to a jsonl file (three tagged ``stream1``, two tagged
    ``stream2`` and one with no ``_stream`` key) and asserts that
    ``gather_log`` returns them grouped per stream, in file order, with the
    untagged record under the ``"default"`` key.
    """
    # Create test directory
    os.makedirs(TEMP_PATH, exist_ok=True)
    log_file = TEMP_PATH / "test_gather_log.jsonl"

    # Create test data with multiple streams (interleaved on purpose, so the
    # per-stream ordering checks below are meaningful)
    test_data: list[JSONitem] = [
        {"msg": "stream1_msg1", "value": 1, "_stream": "stream1"},
        {"msg": "stream2_msg1", "value": 10, "_stream": "stream2"},
        {"msg": "stream1_msg2", "value": 2, "_stream": "stream1"},
        {"msg": "default_msg1", "value": 100},  # no _stream key
        {"msg": "stream2_msg2", "value": 20, "_stream": "stream2"},
        {"msg": "stream1_msg3", "value": 3, "_stream": "stream1"},
    ]

    jsonl_write(str(log_file), test_data)

    # Gather all streams
    result = gather_log(str(log_file))

    # Verify correct streams are present
    assert "stream1" in result
    assert "stream2" in result
    assert "default" in result

    # Verify stream separation
    assert len(result["stream1"]) == 3
    assert len(result["stream2"]) == 2
    assert len(result["default"]) == 1

    # Verify data integrity
    assert result["stream1"][0]["msg"] == "stream1_msg1"
    assert result["stream1"][1]["msg"] == "stream1_msg2"
    assert result["stream1"][2]["msg"] == "stream1_msg3"

    assert result["stream2"][0]["msg"] == "stream2_msg1"
    assert result["stream2"][1]["msg"] == "stream2_msg2"

    assert result["default"][0]["msg"] == "default_msg1"
    assert result["default"][0]["value"] == 100
def test_gather_stream():
    """Test extracting a specific stream from a log file.

    Writes records tagged ``target``, tagged ``other`` and untagged, then
    asserts that ``gather_stream`` returns only the ``target`` records in
    file order, and returns an empty result for a stream name that never
    occurs in the file.
    """
    os.makedirs(TEMP_PATH, exist_ok=True)
    log_file = TEMP_PATH / "test_gather_stream.jsonl"

    # Create test data with multiple streams
    test_data: list[JSONitem] = [
        {"msg": "stream1_msg1", "idx": 1, "_stream": "target"},
        {"msg": "stream2_msg1", "idx": 2, "_stream": "other"},
        {"msg": "stream1_msg2", "idx": 3, "_stream": "target"},
        {"msg": "no_stream", "idx": 4},  # no _stream key
        {"msg": "stream2_msg2", "idx": 5, "_stream": "other"},
        {"msg": "stream1_msg3", "idx": 6, "_stream": "target"},
    ]

    jsonl_write(str(log_file), test_data)

    # Gather only the "target" stream
    result = gather_stream(str(log_file), "target")

    # Verify filtering
    assert len(result) == 3

    # Verify correct items were selected (idx pins each item to its
    # position in the written file)
    assert result[0]["msg"] == "stream1_msg1"
    assert result[0]["idx"] == 1
    assert result[1]["msg"] == "stream1_msg2"
    assert result[1]["idx"] == 3
    assert result[2]["msg"] == "stream1_msg3"
    assert result[2]["idx"] == 6

    # Verify all items have the correct stream
    for item in result:
        assert item["_stream"] == "target"

    # Test with non-existent stream
    empty_result = gather_stream(str(log_file), "nonexistent")
    assert len(empty_result) == 0
def test_gather_val():
    """Test extracting specific keys from a specific stream.

    Covers five cases of ``gather_val``:

    - two keys present in every ``s1`` entry: all four entries are returned;
    - three keys where one ``s1`` entry lacks ``c``: that entry is skipped;
    - the same request with ``allow_skip=False``: ``ValueError`` matching
      ``"missing keys"`` is raised;
    - a different stream (``s2``);
    - a stream name that never occurs: the result is empty.
    """
    os.makedirs(TEMP_PATH, exist_ok=True)
    log_file = TEMP_PATH / "test_gather_val.jsonl"

    # Create test data matching the example from the docstring
    test_data: list[JSONitem] = [
        {"a": 1, "b": 2, "c": 3, "_stream": "s1"},
        {"a": 4, "b": 5, "c": 6, "_stream": "s1"},
        {"a": 7, "b": 8, "c": 9, "_stream": "s2"},
        {"a": 10, "b": 11, "_stream": "s1"},  # missing key 'c'
        {"a": 13, "b": 14, "c": 15, "_stream": "s1"},
    ]

    jsonl_write(str(log_file), test_data)

    # Test basic key extraction
    result = gather_val(str(log_file), "s1", ("a", "b"))

    # Verify data structure
    assert len(result) == 4  # s1 has 4 entries
    assert result[0] == [1, 2]
    assert result[1] == [4, 5]
    assert result[2] == [10, 11]
    assert result[3] == [13, 14]

    # Test with three keys (should skip the entry missing 'c')
    result_three_keys = gather_val(str(log_file), "s1", ("a", "b", "c"))
    assert len(result_three_keys) == 3  # one entry missing 'c' is skipped
    assert result_three_keys[0] == [1, 2, 3]
    assert result_three_keys[1] == [4, 5, 6]
    assert result_three_keys[2] == [13, 14, 15]

    # Test with allow_skip=False - should raise error on missing key
    with pytest.raises(ValueError, match="missing keys"):
        gather_val(str(log_file), "s1", ("a", "b", "c"), allow_skip=False)

    # Test with different stream
    result_s2 = gather_val(str(log_file), "s2", ("a", "c"))
    assert len(result_s2) == 1
    assert result_s2[0] == [7, 9]

    # Test with non-existent stream
    empty_result = gather_val(str(log_file), "nonexistent", ("a", "b"))
    assert len(empty_result) == 0
def test_get_any_from_stream():
    """Test extracting first value of a key from stream and KeyError on missing key.

    Uses an in-memory list of dicts (no file I/O) and asserts that
    ``get_any_from_stream`` returns the value from the first entry that
    contains the key, and raises ``KeyError`` with a
    ``"key '<name>' not found in stream"`` message when no entry contains
    it, including when the stream is empty.
    """
    # Test with a list of dicts
    stream = [
        {"foo": "bar", "value": 1},
        {"foo": "baz", "value": 2},
        {"other": "data", "value": 3},
    ]

    # Test successful key extraction (first occurrence)
    result = get_any_from_stream(stream, "foo")
    assert result == "bar"  # should get the first one

    # Test key that is present in every entry: the first entry's value wins
    result_value = get_any_from_stream(stream, "value")
    assert result_value == 1  # first occurrence

    # Test key that appears only in later entry
    result_other = get_any_from_stream(stream, "other")
    assert result_other == "data"

    # Test KeyError on missing key
    with pytest.raises(KeyError, match="key 'nonexistent' not found in stream"):
        get_any_from_stream(stream, "nonexistent")

    # Test with empty stream
    with pytest.raises(KeyError, match="key 'foo' not found in stream"):
        get_any_from_stream([], "foo")