Coverage for tests/unit/benchmark_parallel/benchmark_parallel.py: 92% (181 statements)
1"""Benchmark test comparing run_maybe_parallel with other parallelization techniques.
3Run with: python tests/benchmark_parallel.py
4"""

from __future__ import annotations

import multiprocessing
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd  # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs]

from muutils.parallel import run_maybe_parallel


def cpu_bound_task(x: int) -> int:
    """CPU-intensive task for benchmarking."""
    # Simulate CPU work with a loop
    result = 0
    for i in range(1000):
        result += (x * i) % 1000
    return result


def io_bound_task(x: int) -> int:
    """IO-bound task for benchmarking."""
    time.sleep(0.001)  # Simulate I/O wait
    return x * 2


def light_cpu_task(x: int) -> int:
    """Light CPU task for benchmarking."""
    return x**2 + x * 3 + 7
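

# The three task profiles above exercise different parallelization regimes:
# cpu_bound_task should gain from extra processes, io_bound_task mostly from
# overlapping waits, and light_cpu_task is cheap enough that inter-process
# communication overhead can outweigh any speedup.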


class BenchmarkRunner:
    """Run benchmarks and collect timing data."""

    def __init__(self):
        self.results = defaultdict(list)
        self.cpu_count = multiprocessing.cpu_count()

    def time_execution(self, func: Callable[..., Any], *args, **kwargs) -> float:
        """Time a single execution of func and return the elapsed seconds."""
        start = time.perf_counter()
        func(*args, **kwargs)
        return time.perf_counter() - start

    def benchmark_method(
        self,
        method_name: str,  # kept for call-site readability; not used in the body
        method_func: Callable,
        task_func: Callable,
        data: List[int],
        runs: int = 3,
    ) -> Dict[str, float]:
        """Benchmark a single method multiple times and return timing summary stats."""
        times: List[float] = []
        for _ in range(runs):
            _, duration = method_func(task_func, data)
            times.append(duration)

        return {
            "mean": float(np.mean(times)),
            "std": float(np.std(times)),
            "min": float(np.min(times)),
            "max": float(np.max(times)),
            "median": float(np.median(times)),
        }
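
    # Note: np.std defaults to the population standard deviation (ddof=0); with
    # the default runs=3 this slightly understates run-to-run variance compared
    # to the sample std (ddof=1).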

    def run_benchmark_suite(
        self,
        data_sizes: Sequence[int],
        task_funcs: Dict[str, Callable[[int], int]],
        runs_per_method: int = 3,
    ) -> pd.DataFrame:
        """Run the complete benchmark suite and return results as a DataFrame."""
        for data_size in data_sizes:
            test_data = list(range(data_size))

            for task_name, task_func in task_funcs.items():
                print(f"\nBenchmarking {task_name} with {data_size} items...")

                # Sequential baseline
                stats = self.benchmark_method(
                    "sequential",
                    benchmark_sequential,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result("sequential", task_name, data_size, stats)

                # Pool.map
                stats = self.benchmark_method(
                    "pool_map",
                    benchmark_pool_map,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result("pool_map", task_name, data_size, stats)

                # Pool.imap with a heuristic chunk size: roughly four chunks per
                # worker amortizes IPC overhead while keeping load balanced.
                chunksize = max(1, data_size // (self.cpu_count * 4))
                imap_func = lambda f, d: benchmark_pool_imap(f, d, chunksize=chunksize)  # noqa: E731
                stats = self.benchmark_method(
                    "pool_imap", imap_func, task_func, test_data, runs_per_method
                )
                self._record_result("pool_imap", task_name, data_size, stats)

                # Pool.imap_unordered
                imap_unord_func = lambda f, d: benchmark_pool_imap_unordered(  # noqa: E731
                    f, d, chunksize=chunksize
                )
                stats = self.benchmark_method(
                    "pool_imap_unordered",
                    imap_unord_func,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result("pool_imap_unordered", task_name, data_size, stats)

                # run_maybe_parallel (ordered)
                rmp_func = lambda f, d: benchmark_run_maybe_parallel(  # noqa: E731
                    f, d, parallel=True
                )
                stats = self.benchmark_method(
                    "run_maybe_parallel",
                    rmp_func,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result("run_maybe_parallel", task_name, data_size, stats)

                # run_maybe_parallel (unordered)
                rmp_unord_func = lambda f, d: benchmark_run_maybe_parallel(  # noqa: E731
                    f, d, parallel=True, keep_ordered=False
                )
                stats = self.benchmark_method(
                    "run_maybe_parallel_unordered",
                    rmp_unord_func,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result(
                    "run_maybe_parallel_unordered", task_name, data_size, stats
                )

        return self._create_dataframe()

    def _record_result(
        self, method: str, task: str, data_size: int, stats: Dict[str, float]
    ):
        """Record a single benchmark result."""
        self.results["method"].append(method)
        self.results["task"].append(task)
        self.results["data_size"].append(data_size)
        self.results["mean_time"].append(stats["mean"])
        self.results["std_time"].append(stats["std"])
        self.results["min_time"].append(stats["min"])
        self.results["max_time"].append(stats["max"])
        self.results["median_time"].append(stats["median"])

    def _create_dataframe(self) -> pd.DataFrame:
        """Create a DataFrame from the collected results."""
        df = pd.DataFrame(self.results)

        # Calculate speedup relative to the sequential baseline
        sequential_times = df[df["method"] == "sequential"][
            ["task", "data_size", "mean_time"]
        ]
        sequential_times = sequential_times.rename(
            columns={"mean_time": "sequential_time"}
        )

        df = df.merge(sequential_times, on=["task", "data_size"])
        df["speedup"] = df["sequential_time"] / df["mean_time"]

        return df
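

# A minimal usage sketch of BenchmarkRunner (hypothetical sizes, not executed
# as part of this module):
#
#     runner = BenchmarkRunner()
#     df = runner.run_benchmark_suite(
#         data_sizes=[100, 1000],
#         task_funcs={"light_cpu": light_cpu_task},
#     )
#     print(df[["method", "task", "data_size", "speedup"]])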


def benchmark_sequential(func: Callable, data: List[int]) -> Tuple[List[Any], float]:
    """Benchmark sequential processing."""
    start = time.perf_counter()
    results = [func(x) for x in data]
    end = time.perf_counter()
    return results, end - start


def benchmark_pool_map(
    func: Callable, data: List[int], processes: int | None = None
) -> Tuple[List[Any], float]:
    """Benchmark using multiprocessing.Pool.map."""
    start = time.perf_counter()
    with multiprocessing.Pool(processes) as pool:
        results = pool.map(func, data)
    end = time.perf_counter()
    return results, end - start


def benchmark_pool_imap(
    func: Callable, data: List[int], processes: int | None = None, chunksize: int = 1
) -> Tuple[List[Any], float]:
    """Benchmark using multiprocessing.Pool.imap."""
    start = time.perf_counter()
    with multiprocessing.Pool(processes) as pool:
        results = list(pool.imap(func, data, chunksize=chunksize))
    end = time.perf_counter()
    return results, end - start


def benchmark_pool_imap_unordered(
    func: Callable, data: List[int], processes: int | None = None, chunksize: int = 1
) -> Tuple[List[Any], float]:
    """Benchmark using multiprocessing.Pool.imap_unordered."""
    start = time.perf_counter()
    with multiprocessing.Pool(processes) as pool:
        results = list(pool.imap_unordered(func, data, chunksize=chunksize))
    end = time.perf_counter()
    return results, end - start
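

# chunksize > 1 batches items per inter-process round trip; for very cheap
# per-item work this batching is usually what separates imap throughput from
# Pool.map, which computes a chunk size automatically.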


def benchmark_run_maybe_parallel(
    func: Callable,
    data: List[int],
    parallel: Union[bool, int],
    keep_ordered: bool = True,
    chunksize: int | None = None,
) -> Tuple[List[Any], float]:
    """Benchmark using run_maybe_parallel."""
    start = time.perf_counter()
    results = run_maybe_parallel(
        func=func,
        iterable=data,
        parallel=parallel,
        keep_ordered=keep_ordered,
        chunksize=chunksize,
        pbar="none",  # Disable progress bar for a fair comparison
    )
    end = time.perf_counter()
    return results, end - start
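

# Note: with keep_ordered=False, run_maybe_parallel may return results in
# completion order rather than input order, so the ordered and unordered
# variants are only comparable on timing, not on output equality.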


def plot_speedup_by_data_size(
    df: pd.DataFrame, task_type: str | None = None, save_path: str | Path | None = None
):
    """Plot speedup vs data size for different methods."""
    import matplotlib.pyplot as plt  # type: ignore[import-untyped]

    fig, ax = plt.subplots(figsize=(10, 6))

    # Filter by task type if specified
    plot_df = df[df["task"] == task_type] if task_type else df

    # Group by method and plot
    for method in plot_df["method"].unique():
        if method == "sequential":
            continue
        method_df = plot_df[plot_df["method"] == method]
        ax.plot(method_df["data_size"], method_df["speedup"], marker="o", label=method)

    ax.set_xlabel("Data Size")
    ax.set_ylabel("Speedup (vs Sequential)")
    ax.set_title(f"Speedup by Data Size{f' ({task_type} tasks)' if task_type else ''}")
    ax.set_xscale("log")
    ax.axhline(y=1, color="gray", linestyle="--", alpha=0.5)
    ax.legend()
    ax.grid(True, alpha=0.3)

    if save_path:
        plt.savefig(save_path)
    else:
        plt.show()


def plot_timing_comparison(
    df: pd.DataFrame, data_size: int | None = None, save_path: str | Path | None = None
):
    """Plot timing comparison as a bar chart."""
    import matplotlib.pyplot as plt  # type: ignore[import-untyped]

    # Filter by data size if specified
    plot_df = df[df["data_size"] == data_size] if data_size else df

    # Pivot for easier plotting
    pivot_df = plot_df.pivot_table(index="task", columns="method", values="mean_time")

    ax = pivot_df.plot(kind="bar", figsize=(12, 6), rot=0)
    ax.set_ylabel("Time (seconds)")
    ax.set_title(
        f"Timing Comparison{f' (Data Size: {data_size})' if data_size else ''}"
    )
    ax.legend(title="Method", bbox_to_anchor=(1.05, 1), loc="upper left")

    if save_path:
        plt.tight_layout()
        plt.savefig(save_path)
    else:
        plt.show()


def plot_efficiency_heatmap(df: pd.DataFrame, save_path: str | Path | None = None):
    """Plot efficiency heatmap (speedup across methods and tasks)."""
    import matplotlib.pyplot as plt  # type: ignore[import-untyped]

    # Create pivot table for heatmap
    pivot_df = df.pivot_table(
        index=["task", "data_size"], columns="method", values="speedup"
    )

    # Plain matplotlib heatmap; seaborn's annotated heatmap would be prettier,
    # but plt.imshow avoids the extra dependency.
    plt.figure(figsize=(12, 8))
    plt.imshow(pivot_df, aspect="auto", cmap="YlOrRd", vmin=0)
    plt.colorbar(label="Speedup")
    plt.yticks(range(len(pivot_df.index)), [f"{t[0]}-{t[1]}" for t in pivot_df.index])
    plt.xticks(range(len(pivot_df.columns)), list(pivot_df.columns), rotation=45)
    plt.title("Parallelization Efficiency Heatmap")
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path)
    else:
        plt.show()


def print_summary_stats(df: pd.DataFrame):
    """Print summary statistics from benchmark results."""
    print("\n=== BENCHMARK SUMMARY ===")
    print(f"\nTotal configurations tested: {len(df)}")

    # Best method by task type
    print("\nBest methods by task type (highest speedup across data sizes):")
    best_by_task = (
        df[df["method"] != "sequential"]
        .groupby("task")
        .apply(
            lambda x: x.loc[x["speedup"].idxmax()][["method", "speedup", "data_size"]]
        )
    )
    print(best_by_task)

    # Overall best speedups
    print("\nTop 5 speedups achieved:")
    top_speedups = df[df["method"] != "sequential"].nlargest(5, "speedup")[
        ["method", "task", "data_size", "speedup", "mean_time"]
    ]
    print(top_speedups)

    # Method rankings
    print("\nAverage speedup by method:")
    avg_speedup = (
        df[df["method"] != "sequential"]
        .groupby("method")["speedup"]
        .agg(["mean", "std"])
    )
    print(avg_speedup.sort_values("mean", ascending=False))


_DEFAULT_TASK_FUNCS: dict[str, Callable[[int], int]] = {
    "cpu_bound": cpu_bound_task,
    "io_bound": io_bound_task,
    "light_cpu": light_cpu_task,
}


def main(
    data_sizes: Sequence[int] = (100, 1000, 5000, 10000),
    base_path: Path = Path("."),
    plot: bool = True,
    task_funcs: Optional[Dict[str, Callable[[int], int]]] = None,
):
    """Run benchmarks and display results."""
    print("Starting parallelization benchmark...")

    base_path = Path(base_path)
    base_path.mkdir(parents=True, exist_ok=True)

    # Configure benchmark parameters
    if task_funcs is None:
        task_funcs = _DEFAULT_TASK_FUNCS

    # Run benchmarks
    runner = BenchmarkRunner()
    df = runner.run_benchmark_suite(data_sizes, task_funcs, runs_per_method=3)

    # Save results
    results_csv = base_path / "benchmark_results.csv"
    df.to_csv(results_csv, index=False)
    print(f"\nResults saved to {results_csv}")

    # Display summary
    print_summary_stats(df)

    if plot:
        # Create visualizations
        import matplotlib  # type: ignore[import-untyped]

        matplotlib.use("Agg")  # Use non-interactive backend

        # Plot speedup by data size for each task type
        for task in task_funcs.keys():
            plot_speedup_by_data_size(df, task, base_path / f"speedup_{task}.png")

        # Plot timing comparison for the largest data size
        plot_timing_comparison(df, data_sizes[-1], base_path / "timing_comparison.png")

        # Plot efficiency heatmap
        plot_efficiency_heatmap(df, base_path / "efficiency_heatmap.png")

    return df
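

# Example (hypothetical output directory): a quick smoke run without plots:
#
#     main(data_sizes=(100, 500), base_path=Path("benchmark_out"), plot=False)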


if __name__ == "__main__":
    df = main()
    print("\nDataFrame columns:", df.columns.tolist())
    print("\nFirst few rows:")
    print(df.head(10))