Coverage for tests/unit/test_spinner.py: 97%
295 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-04-04 03:33 -0600
« prev ^ index » next coverage.py v7.6.1, created at 2025-04-04 03:33 -0600
1import io
2import time
3from typing import Callable, Any
5import pytest
7from muutils.spinner import (
8 SPINNERS,
9 spinner_decorator,
10 SpinnerContext,
11 Spinner,
12 SpinnerConfig,
13)
def test_spinner_simple():
    """Run a minimal decorated function and check its return value passes through."""

    @spinner_decorator(update_interval=0.05)
    def long_running_function_simple() -> str:
        """
        An example function decorated with spinner_decorator, using only the spinner and elapsed time.

        Returns:
            str: A completion message.
        """
        for _ in range(1):
            time.sleep(0.1)  # Simulate some work
        return "Simple function completed"

    print("\nRunning simple function with only spinner and elapsed time:")
    result2: str = long_running_function_simple()
    print(result2)
    # Without this check the test could only fail by raising an exception.
    assert result2 == "Simple function completed"
def test_spinner_complex():
    """Exercise the decorator with a message and a status-update callback keyword."""

    # Example usage
    @spinner_decorator(
        message="Current value: ",
        mutable_kwarg_key="update_status",
        update_interval=0.05,
    )
    def long_running_function_with_status(
        normal_arg: int, update_status: Callable[[Any], None]
    ) -> str:
        """
        An example function decorated with spinner_decorator, using all features.

        Args:
            normal_arg (int): A normal argument to demonstrate that other arguments still work.
            update_status (Callable[[Any], None]): Function to update the status displayed by the spinner.

        Returns:
            str: A completion message.
        """
        for i in range(normal_arg):
            time.sleep(0.1)  # Simulate some work
            update_status(f"Step {i+1} of {normal_arg}")
        return "Function with status completed"

    # Run the example functions
    print("Running function with status updates:")
    # update_status is not passed here; presumably the decorator supplies it
    # under the name given by mutable_kwarg_key (hence the type: ignore).
    result1: str = long_running_function_with_status(1)  # type: ignore[call-arg]
    print(result1)
    # Pin the return value so the test verifies something beyond "did not raise".
    assert result1 == "Function with status completed"
def test_spinner_decorator_bare():
    """A function wrapped by an argument-less spinner_decorator() keeps its return value."""

    @spinner_decorator()
    def example_function():
        return "Done"

    assert example_function() == "Done"
def test_spinner_ctx_mgr():
    """Smoke test: push one value update through a SpinnerContext."""
    context = SpinnerContext(message="Current value: ", update_interval=0.05)
    with context as spinner:
        for step in range(1):
            time.sleep(0.1)
            spinner.update_value(f"Step {step+1}")
    print("Done!")
def test_spinner_initialization():
    """Check attribute types and default values of a Spinner built with no arguments."""
    sp = Spinner()

    # Attribute types
    assert isinstance(sp, Spinner)
    assert isinstance(sp.config.working, list)
    assert isinstance(sp.config.success, str)
    assert isinstance(sp.config.fail, str)
    assert isinstance(sp.update_interval, float)
    assert isinstance(sp.current_value, str)
    assert isinstance(sp.message, str)
    assert isinstance(sp.format_string, str)

    # The output stream must be writable and the updater callable
    assert hasattr(sp.output_stream, "write")
    assert callable(sp.output_stream.write)
    assert callable(sp.update_value)

    # Default values
    assert sp.config.working == ["|", "/", "-", "\\"]
    assert sp.format_string == "\r{spinner} ({elapsed_time:.2f}s) {message}{value}"
    assert sp.update_interval == 0.1
def test_spinner_update_value():
    """update_value() stores the given value in current_value."""
    sp = Spinner()
    new_value = "Test"
    sp.update_value(new_value)
    assert sp.current_value == "Test"
def test_spinner_context_manager():
    """An empty SpinnerContext body still leaves output that ends with a newline."""
    buffer = io.StringIO()
    with SpinnerContext(output_stream=buffer):
        pass
    written = buffer.getvalue()
    assert written.endswith("\n")
@spinner_decorator()
def example_function():
    """Module-level decorated helper used by test_spinner_decorator."""
    return "Done"
def test_spinner_decorator():
    """The module-level decorated function returns its original value."""
    assert example_function() == "Done"
def test_spinner_custom_chars():
    """A list passed as config becomes the working frames."""
    sp = Spinner(config=["A", "B", "C"])
    working_frames = sp.config.working
    assert working_frames == ["A", "B", "C"]
def test_spinner_custom_time_format():
    """A custom format_string is stored unchanged."""
    sp = Spinner(format_string="[{elapsed_time:.1f}s]")
    stored_format = sp.format_string
    assert stored_format == "[{elapsed_time:.1f}s]"
def test_spinner_context_manager_with_updates():
    """Both values pushed during the context appear, prefixed by the message, in the output."""
    buffer = io.StringIO()
    context = SpinnerContext(
        message="Status: ", output_stream=buffer, update_interval=0.05
    )
    with context as spinner:
        # Sleep longer than update_interval after each update before moving on.
        spinner.update_value("Working")
        time.sleep(0.1)
        spinner.update_value("Finishing")
        time.sleep(0.1)

    captured = buffer.getvalue()
    print(captured)
    assert "Status: Working" in captured
    assert "Status: Finishing" in captured
def test_spinner_context_exception_handling():
    """An exception raised inside SpinnerContext propagates and the output is still finalized.

    Uses ``pytest.raises`` rather than ``try/except: pass`` so the test fails
    if the context manager ever swallows the exception.
    """
    string_io = io.StringIO()
    with pytest.raises(ValueError):
        with SpinnerContext(output_stream=string_io, update_interval=0.05) as spinner:
            spinner.update_value("Before exception")
            time.sleep(0.1)
            raise ValueError("Test exception")

    output = string_io.getvalue()
    print(output)
    assert "Before exception" in output
    assert output.endswith("\n")
def test_spinner_long_running_task():
    """Every iteration number reported through the update callback shows up in the output."""
    buffer = io.StringIO()

    @spinner_decorator(
        message="Iteration: ",
        output_stream=buffer,
        update_interval=0.05,
        mutable_kwarg_key="update",
    )
    def long_task(iterations, update):
        for idx in range(iterations):
            update(idx + 1)
            time.sleep(0.1)

    long_task(3, update=lambda x: x)

    captured = buffer.getvalue()
    print(captured)
    for number in (1, 2, 3):
        assert f"Iteration: {number}" in captured
def test_spinner_init_from_config():
    """Spinner accepts config as a named style, a list, a dict, or a SpinnerConfig."""
    # Named style
    clock_frames = [
        "🕛",
        "🕐",
        "🕑",
        "🕒",
        "🕓",
        "🕔",
        "🕕",
        "🕖",
        "🕗",
        "🕘",
        "🕙",
        "🕚",
    ]
    from_name = Spinner(config="clock")
    assert from_name.config.working == clock_frames
    assert from_name.config.success == "✔️"
    assert from_name.config.fail == "❌"

    # List of working frames; success/fail fall back to the defaults
    from_list = Spinner(config=["A", "B", "C"])
    assert from_list.config.working == ["A", "B", "C"]
    assert from_list.config.success == "✔️"  # Default
    assert from_list.config.fail == "❌"  # Default

    # Plain dict
    from_dict = Spinner(config={"working": ["1", "2"], "success": "OK", "fail": "NO"})
    assert from_dict.config.working == ["1", "2"]
    assert from_dict.config.success == "OK"
    assert from_dict.config.fail == "NO"

    # Ready-made SpinnerConfig
    ready_config = SpinnerConfig(working=["X", "Y"], success="DONE", fail="FAIL")
    from_object = Spinner(config=ready_config)
    assert from_object.config.working == ["X", "Y"]
    assert from_object.config.success == "DONE"
    assert from_object.config.fail == "FAIL"
def test_spinner_exception_messages():
    """Test that exception messages are preserved during spinner execution.

    Both the context-manager and decorator paths use ``pytest.raises`` so the
    output assertions always run; assertions placed inside an ``except``
    clause would be skipped silently if no exception were raised.
    """
    custom_message = "Custom error occurred!"

    # Test with context manager
    string_io = io.StringIO()
    with pytest.raises(ValueError) as ctx_exc_info:
        with SpinnerContext(output_stream=string_io, message="Processing: ") as spinner:
            spinner.update_value("Step 1")
            time.sleep(0.1)
            raise ValueError(custom_message)

    assert str(ctx_exc_info.value) == custom_message
    output = string_io.getvalue()
    assert "Processing: Step 1" in output
    assert output.count("\n") == 1  # Should have exactly one newline at the end

    # Test with decorator
    string_io = io.StringIO()

    @spinner_decorator(output_stream=string_io, message="Working: ")
    def failing_function():
        time.sleep(0.1)
        raise RuntimeError(custom_message)

    with pytest.raises(RuntimeError) as exc_info:
        failing_function()

    assert str(exc_info.value) == custom_message
    output = string_io.getvalue()
    assert "Working: " in output
    assert output.count("\n") == 1
def test_spinner_state_transitions():
    """The state attribute goes initialized -> running -> success, or fail when stopped as failed."""
    # Success path
    ok_spinner = Spinner()
    assert ok_spinner.state == "initialized"

    ok_spinner.start()
    assert ok_spinner.state == "running"

    ok_spinner.stop()
    assert ok_spinner.state == "success"

    # Failure path uses a fresh spinner
    failed_spinner = Spinner()
    failed_spinner.start()
    failed_spinner.stop(failed=True)
    assert failed_spinner.state == "fail"
def test_spinner_nested():
    """Two nested SpinnerContexts sharing one stream both leave their updates in the output."""
    shared_stream = io.StringIO()

    with SpinnerContext(output_stream=shared_stream, message="Outer: ") as outer:
        outer.update_value("Step 1")
        time.sleep(0.1)

        # The inner spinner writes to the same stream while the outer is still active.
        with SpinnerContext(output_stream=shared_stream, message="Inner: ") as inner:
            inner.update_value("Processing")
            time.sleep(0.1)

        outer.update_value("Step 2")
        time.sleep(0.1)

    captured = shared_stream.getvalue()
    for expected in ("Outer: Step 1", "Inner: Processing", "Outer: Step 2"):
        assert expected in captured
def test_spinner_value_updates():
    """Test that the spinner correctly stores and formats values"""
    sp = Spinner(format_string="{spinner} {value}")

    # A spread of value types, not just strings
    samples = [
        42,
        3.14,
        ["a", "b", "c"],
        {"key": "value"},
        (1, 2, 3),
        None,
        True,
        b"bytes",
    ]

    for sample in samples:
        sp.update_value(sample)
        assert sp.current_value == sample

        # Render the format string directly, without going through an output stream
        rendered = sp.format_string.format(
            spinner=sp.config.working[0], elapsed_time=0.0, message="", value=sample
        )
        assert str(sample) in rendered
@pytest.mark.skip(
    reason="""shows a warning:
tests/unit/test_spinner.py::test_spinner_output_stream_errors
 .venv/Lib/site-packages/_pytest/threadexception.py:82: PytestUnhandledThreadExceptionWarning: Exception in thread Thread-16 (spin)

 Traceback (most recent call last):
 File "Lib/threading.py", line 1052, in _bootstrap_inner
 self.run()
 File "Lib/threading.py", line 989, in run
 self._target(*self._args, **self._kwargs)
 File "muutils/spinner.py", line 369, in spin
 self.output_stream.write(output)
 File "tests/unit/test_spinner.py", line 328, in write
 raise IOError("Write failed")
 OSError: Write failed

 warnings.warn(pytest.PytestUnhandledThreadExceptionWarning(msg))

"""
)
def test_spinner_output_stream_errors():
    """Test spinner behavior with problematic output streams.

    Currently skipped; the skip reason records the warning and traceback.
    """

    class BrokenStream:
        """Stream stand-in whose write and flush always raise IOError."""

        def write(self, _):
            raise IOError("Write failed")

        def flush(self):
            raise IOError("Flush failed")

    # Test with broken stream
    broken = BrokenStream()
    with pytest.raises(IOError):
        with SpinnerContext(output_stream=broken):  # type: ignore
            time.sleep(0.01)
def test_spinner_width_calculations():
    """Test that individual updates are width-compliant"""
    narrow_width = 40
    test_value = "ABC"

    sp = Spinner()
    sp.term_width = narrow_width
    sp.update_value(test_value)

    # Render one update line by hand
    raw_line = sp.format_string.format(
        spinner=sp.config.working[0],
        elapsed_time=0.0,
        message="Test: ",
        value=test_value,
    )
    output_line = raw_line.rstrip()  # Remove any trailing spaces but keep \r

    # Measure the visible content only (carriage returns excluded)
    content_length = len(output_line.replace("\r", ""))
    assert (
        content_length <= narrow_width
    ), f"Line too long: {content_length} > {narrow_width}"
def test_format_string_updates():
    """Test that format_string_when_updated modifies the formatting as expected"""
    sp = Spinner(
        format_string="Normal: {value}", format_string_when_updated="Update: {value}"
    )

    # Regular format string
    normal_text = sp.format_string.format(
        spinner=sp.config.working[0], elapsed_time=0.0, message="", value="test"
    )
    assert normal_text.startswith("Normal: ")

    # Format string used after a value change
    sp.value_changed = True
    updated_format = sp.format_string_when_updated
    if updated_format:
        updated_text = updated_format.format(
            spinner=sp.config.working[0],
            elapsed_time=0.0,
            message="",
            value="test",
        )
        assert updated_text.startswith("Update: ")
def test_spinner_state_handling():
    """Test spinner state transitions and error handling"""
    # Normal flow
    ok_spinner = Spinner()
    assert ok_spinner.state == "initialized"
    ok_spinner.start()
    assert ok_spinner.state == "running"
    ok_spinner.stop(failed=False)
    assert ok_spinner.state == "success"

    # Error flow
    failed_spinner = Spinner()
    failed_spinner.start()
    failed_spinner.stop(failed=True)
    assert failed_spinner.state == "fail"

    # Context manager: the error must escape and the spinner must end in "fail"
    error_caught = False
    try:
        with SpinnerContext() as sp:
            raise ValueError("Test error")
    except ValueError:
        error_caught = True
        assert sp.state == "fail"

    assert error_caught, "Context manager should propagate errors"
@pytest.mark.parametrize(
    ("kwargs", "expected_substrings", "find_all"),
    [
        (
            # Input
            {
                "format_string": "Test: {message}{value}",  # Include message placeholder
                "message": "MSG:",
                "initial_value": "VAL",
            },
            # Expected substrings
            ["Test:", "MSG:", "VAL"],
            True,
        ),
        (
            # Input with empty values
            {"format_string": "{spinner}{value}", "message": "", "initial_value": ""},
            # Should at least contain a spinner char
            list(Spinner().config.working),
            False,
        ),
    ],
)
def test_spinner_output_formatting(kwargs, expected_substrings, find_all):
    """Test that spinner formatting is applied correctly without checking exact output.

    When ``find_all`` is true every expected substring must be present;
    otherwise a single match is enough.
    """
    sp = Spinner(**kwargs)
    # Render the format string by hand
    output = sp.format_string.format(
        spinner=sp.config.working[0],
        elapsed_time=0.0,
        message=sp.message,
        value=sp.current_value,
    )

    # Verify required parts are present
    if not find_all:
        found = any(part in output for part in expected_substrings)
        assert found, f"None of {expected_substrings} found in output: {output}"
        return

    for part in expected_substrings:
        assert part in output, f"Missing {part} in output: {output}"
@pytest.mark.parametrize(
    "config",
    [
        "default",
        "clock",
        ["A", "B"],
        ["A", "B", "C"],
        {},
        {"working": ["X", "Y"]},
        {"success": "OK"},
        {"fail": "NO"},
        {"working": ["1", "2"], "success": "OK"},
        {"working": ["1", "2"], "fail": "NO"},
        {"success": "OK", "fail": "NO"},
        {"working": ["1", "2"], "success": "OK", "fail": "NO"},
        SpinnerConfig(working=["1", "2"]),
        SpinnerConfig(working=["1", "2"], success="OK", fail="NO"),
    ],
)
def test_spinner_config_validation(config):
    """Test that spinner configuration is validated correctly"""
    sp = Spinner(config=config)

    if isinstance(config, str):
        # Named styles only need to construct without error.
        return

    if isinstance(config, list):
        assert sp.config.working == config
    elif isinstance(config, dict):
        # Only the keys actually supplied are compared.
        for field_name in ("working", "success", "fail"):
            if field_name in config:
                assert getattr(sp.config, field_name) == config[field_name]
    elif isinstance(config, SpinnerConfig):
        assert sp.config == config
        assert sp.config.working == config.working
        assert sp.config.success == config.success
        assert sp.config.fail == config.fail
@pytest.mark.parametrize(
    ("kwargs", "exception_type"),
    [
        ({"config": 123}, TypeError),
        ({"config": 123.45}, TypeError),
        ({"config": True}, TypeError),
        ({"config": None}, TypeError),
        ({"config": ""}, KeyError),
        ({"config": "nonexistent_style"}, KeyError),
        ({"config": []}, ValueError),
        ({"config": {"nonexistent_key": "stuff"}}, TypeError),
        ({"format_string": "{invalid_key}"}, ValueError),
        ({"unknown_param": 123}, ValueError),
    ],
)
def test_spinner_init_invalid(kwargs, exception_type):
    """Test invalid initialization scenarios"""
    with pytest.raises(exception_type):
        Spinner(**kwargs)
@pytest.mark.parametrize(
    ("config", "expected"),
    [
        # ASCII-only configurations
        (SpinnerConfig(working=["|", "/", "-", "\\"], success="#", fail="X"), True),
        (SpinnerConfig(working=[".", "o", "O"], success="*", fail="x"), True),
        (SpinnerConfig(working=["[ ]", "[- ]"], success="[--]", fail="[XX]"), True),
        # Non-ASCII configurations
        (SpinnerConfig(working=["←", "↑", "→", "↓"], success="►", fail="✖"), False),
        (SpinnerConfig(working=["🌍", "🌎", "🌏"], success="✔️", fail="❌"), False),
        (SpinnerConfig(working=["⠋", "⠙", "⠹"], success="⣿", fail="⢿"), False),
        # Mixed ASCII and Unicode
        (SpinnerConfig(working=["->", "=>", "→"], success="✓", fail="X"), False),
        # Whitespace
        (SpinnerConfig(working=[" ", " |", "| "], success=" ", fail=" "), True),
    ],
)
def test_is_ascii(config: SpinnerConfig, expected: bool):
    """Test is_ascii() method with various configurations"""
    actual = config.is_ascii()
    assert actual is expected
@pytest.mark.parametrize(
    "config,expected",
    [
        # Equal length configurations
        (SpinnerConfig(working=["abc", "def", "ghi"], success="xyz", fail="err"), True),
        (SpinnerConfig(working=["🌍 ", "🌎 ", "🌏 "], success="✔️ ", fail="❌ "), False),
        (SpinnerConfig(working=["[=]", "[-]", "[|]"], success="[#]", fail="[X]"), True),
        # Varying length configurations
        (SpinnerConfig(working=[".", "..", "..."], success="***", fail="x"), False),
        (
            SpinnerConfig(
                working=["[ ]", "[= ]", "[===]"], success="[==]", fail="[X]"
            ),
            False,
        ),
        (
            SpinnerConfig(working=["📡 ", "📡· ", "📡···"], success="📡", fail="❌"),
            False,
        ),
        # Whitespace variations
        # NOTE(review): the whitespace-only strings must differ in length for
        # the expected False to hold; the exact widths below are a
        # reconstruction -- confirm against the repository copy of this test.
        (SpinnerConfig(working=[" ", "  ", "   "], success="  ", fail=" "), False),
        # Single characters
        (SpinnerConfig(working=["a", "b", "c"], success="x", fail="y"), True),
    ],
)
def test_eq_lens(config: SpinnerConfig, expected: bool):
    """Test eq_lens() method with various configurations"""
    assert config.eq_lens() is expected
@pytest.mark.parametrize(
    ("config",),
    [
        # Valid configurations
        (SpinnerConfig(working=["|", "/", "-", "\\"], success="#", fail="X"),),
        (SpinnerConfig(working=["🌍", "🌎", "🌏"], success="✔️", fail="❌"),),
        (SpinnerConfig(working=["⠋"], success="⣿", fail="⢿"),),  # Single working char
        (SpinnerConfig(working=[""], success="", fail=""),),  # Empty strings
    ],
)
def test_valid_configs(config: SpinnerConfig):
    """Test valid spinner configurations"""
    validity = config.is_valid()
    assert validity is True
@pytest.mark.parametrize(
    ("invalid_config",),
    [
        # Invalid working list type
        pytest.param(
            lambda: SpinnerConfig(working="|/-\\", success="#", fail="X"),  # type: ignore[arg-type]
            id="string-instead-of-list",
        ),
        # Invalid working list contents
        pytest.param(
            lambda: SpinnerConfig(working=[1, 2, 3], success="#", fail="X"),  # type: ignore[list-item]
            id="non-string-items",
        ),
        # Invalid success type
        pytest.param(
            lambda: SpinnerConfig(working=["|", "/"], success=123, fail="X"),  # type: ignore[arg-type]
            id="non-string-success",
        ),
        # Invalid fail type
        pytest.param(
            lambda: SpinnerConfig(working=["|", "/"], success="#", fail=None),  # type: ignore[arg-type]
            id="none-fail",
        ),
        # Empty working list
        pytest.param(
            lambda: SpinnerConfig(working=[], success="#", fail="X"),
            id="empty-working-list",
        ),
    ],
)
def test_invalid_configs(invalid_config):
    """Test that invalid configurations raise ValueError.

    Each case is a zero-argument factory, so the failing construction
    happens inside the ``pytest.raises`` block, not at collection time.
    """
    build = invalid_config
    with pytest.raises(ValueError):
        build()
@pytest.mark.parametrize(
    "config,ascii_result,eq_lens_result",
    [
        # Mixed ASCII/Unicode test cases
        (
            SpinnerConfig(working=["->", "=>", "→"], success="✓", fail="X"),
            False,  # is_ascii
            False,  # eq_lens
        ),
        # Multi-byte Unicode test cases
        (
            SpinnerConfig(working=["🌍", "🌎"], success="🎉", fail="💥"),
            False,  # is_ascii
            True,  # eq_lens
        ),
        # Whitespace test cases
        # The first frame is two spaces so that every string in the config is
        # two characters long, which the expected eq_lens=True requires.
        (
            SpinnerConfig(working=["  ", " |", "| "], success="VV", fail="XX"),
            True,  # is_ascii
            True,  # eq_lens
        ),
        # Empty string test cases
        (
            SpinnerConfig(working=[""], success="", fail=""),
            True,  # is_ascii
            True,  # eq_lens
        ),
    ],
)
def test_combined_validations(
    config: SpinnerConfig, ascii_result: bool, eq_lens_result: bool
):
    """Test multiple validation methods together"""
    assert config.is_valid() is True
    assert config.is_ascii() is ascii_result
    assert config.eq_lens() is eq_lens_result
# Additional edge cases and stress tests
@pytest.mark.parametrize(
    ("test_input", "expected_valid"),
    [
        # Very long strings
        (
            SpinnerConfig(
                working=["a" * 1000, "b" * 1000], success="c" * 1000, fail="d" * 1000
            ),
            True,
        ),
        # Special characters
        (SpinnerConfig(working=["\n", "\t", "\r"], success="\n", fail="\r"), True),
        # Mixed newlines and spaces
        (SpinnerConfig(working=[" \n ", " \t ", " \r "], success=" ", fail=" "), True),
    ],
)
def test_edge_cases(test_input: SpinnerConfig, expected_valid: bool):
    """Test edge cases and special scenarios"""
    validity = test_input.is_valid()
    assert validity is expected_valid
@pytest.mark.parametrize(
    ("name", "config"),
    list(SPINNERS.items()),
)
def test_all(name: str, config: SpinnerConfig):
    """Test all spinner configurations"""
    assert config.is_valid()
    # just test the methods run, they won't all be ASCII or equal length
    config.is_ascii()
    config.eq_lens()

    # Rebuilding via from_any from the name, the config itself, or an
    # equivalent dict must give back an equal config.
    as_dict = {
        "working": config.working,
        "success": config.success,
        "fail": config.fail,
    }
    assert SpinnerConfig.from_any(name) == config
    assert SpinnerConfig.from_any(config) == config
    assert SpinnerConfig.from_any(as_dict) == config