Coverage for tests/unit/benchmark_parallel/benchmark_parallel.py: 92%

181 statements  

coverage.py v7.13.4, created at 2026-02-18 02:51 -0700

1"""Benchmark test comparing run_maybe_parallel with other parallelization techniques. 

2 

3Run with: python tests/benchmark_parallel.py 

4""" 

5 

6from __future__ import annotations 

7 

8from pathlib import Path 

9import time 

10import multiprocessing 

11from typing import List, Callable, Any, Dict, Optional, Sequence, Tuple, Union 

12import pandas as pd # type: ignore[import-untyped] # pyright: ignore[reportMissingTypeStubs] 

13import numpy as np 

14from collections import defaultdict 

15 

16from muutils.parallel import run_maybe_parallel 

17 

18 

def cpu_bound_task(x: int) -> int:
    """CPU-intensive task for benchmarking."""
    # Simulate CPU work with a loop
    result = 0
    for i in range(1000):
        result += (x * i) % 1000
    return result


def io_bound_task(x: int) -> int:
    """IO-bound task for benchmarking."""
    time.sleep(0.001)  # Simulate I/O wait
    return x * 2


def light_cpu_task(x: int) -> int:
    """Light CPU task for benchmarking."""
    return x**2 + x * 3 + 7

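# Added note: the task functions above are deliberately defined at module level.
# multiprocessing.Pool pickles the callable it sends to worker processes, so
# lambdas or locally defined closures could not be used as benchmark tasks here.
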

class BenchmarkRunner:
    """Run benchmarks and collect timing data."""

    def __init__(self):
        self.results = defaultdict(list)
        self.cpu_count = multiprocessing.cpu_count()

    def time_execution(self, func: Callable[..., Any], *args, **kwargs) -> float:
        """Time a single execution."""
        start = time.perf_counter()
        func(*args, **kwargs)
        return time.perf_counter() - start

    def benchmark_method(
        self,
        method_name: str,
        method_func: Callable,
        task_func: Callable,
        data: List[int],
        runs: int = 3,
    ) -> Dict[str, float]:
        """Benchmark a single method multiple times."""
        times = []
        for _ in range(runs):
            _, duration = method_func(task_func, data)
            times.append(duration)

        return {
            "mean": float(np.mean(times)),
            "std": float(np.std(times)),
            "min": float(np.min(times)),
            "max": float(np.max(times)),
            "median": float(np.median(times)),
        }

    def run_benchmark_suite(
        self,
        data_sizes: Sequence[int],
        task_funcs: Dict[str, Callable[[int], int]],
        runs_per_method: int = 3,
    ) -> pd.DataFrame:
        """Run complete benchmark suite and return results as DataFrame."""

        for data_size in data_sizes:
            test_data = list(range(data_size))

            for task_name, task_func in task_funcs.items():
                print(f"\nBenchmarking {task_name} with {data_size} items...")

                # Sequential baseline
                stats = self.benchmark_method(
                    "sequential",
                    benchmark_sequential,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result("sequential", task_name, data_size, stats)

                # Pool.map
                stats = self.benchmark_method(
                    "pool_map",
                    benchmark_pool_map,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result("pool_map", task_name, data_size, stats)

                # Pool.imap with optimal chunk size
                chunksize = max(1, data_size // (self.cpu_count * 4))
                imap_func = lambda f, d: benchmark_pool_imap(f, d, chunksize=chunksize)  # noqa: E731
                stats = self.benchmark_method(
                    "pool_imap", imap_func, task_func, test_data, runs_per_method
                )
                self._record_result("pool_imap", task_name, data_size, stats)

                # Pool.imap_unordered
                imap_unord_func = lambda f, d: benchmark_pool_imap_unordered(  # noqa: E731
                    f, d, chunksize=chunksize
                )
                stats = self.benchmark_method(
                    "pool_imap_unordered",
                    imap_unord_func,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result("pool_imap_unordered", task_name, data_size, stats)

                # run_maybe_parallel (ordered)
                rmp_func = lambda f, d: benchmark_run_maybe_parallel(  # noqa: E731
                    f, d, parallel=True
                )
                stats = self.benchmark_method(
                    "run_maybe_parallel",
                    rmp_func,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result("run_maybe_parallel", task_name, data_size, stats)

                # run_maybe_parallel (unordered)
                rmp_unord_func = lambda f, d: benchmark_run_maybe_parallel(  # noqa: E731
                    f, d, parallel=True, keep_ordered=False
                )
                stats = self.benchmark_method(
                    "run_maybe_parallel_unordered",
                    rmp_unord_func,
                    task_func,
                    test_data,
                    runs_per_method,
                )
                self._record_result(
                    "run_maybe_parallel_unordered", task_name, data_size, stats
                )

        return self._create_dataframe()
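
    # Added note: the chunksize heuristic in run_benchmark_suite above,
    # max(1, data_size // (self.cpu_count * 4)), targets roughly four chunks
    # per worker; e.g. 10_000 items on 8 cores gives chunksize 312, trading
    # per-chunk IPC overhead against load balancing.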

    def _record_result(
        self, method: str, task: str, data_size: int, stats: Dict[str, float]
    ):
        """Record benchmark result."""
        self.results["method"].append(method)
        self.results["task"].append(task)
        self.results["data_size"].append(data_size)
        self.results["mean_time"].append(stats["mean"])
        self.results["std_time"].append(stats["std"])
        self.results["min_time"].append(stats["min"])
        self.results["max_time"].append(stats["max"])
        self.results["median_time"].append(stats["median"])

    def _create_dataframe(self) -> pd.DataFrame:
        """Create DataFrame from results."""
        df = pd.DataFrame(self.results)

        # Calculate speedup relative to sequential
        sequential_times = df[df["method"] == "sequential"][
            ["task", "data_size", "mean_time"]
        ]
        sequential_times = sequential_times.rename(
            columns={"mean_time": "sequential_time"}
        )

        df = df.merge(sequential_times, on=["task", "data_size"])
        df["speedup"] = df["sequential_time"] / df["mean_time"]

        return df

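# Illustrative usage sketch (added; not exercised by this file itself): the
# runner can be driven directly for a quick, small-scale check, using only the
# module-level task functions defined above:
#
#     runner = BenchmarkRunner()
#     df = runner.run_benchmark_suite(
#         data_sizes=[100],
#         task_funcs={"light_cpu": light_cpu_task},
#         runs_per_method=1,
#     )
#     print(df[["method", "task", "data_size", "mean_time", "speedup"]])
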

def benchmark_sequential(func: Callable, data: List[int]) -> Tuple[List[Any], float]:
    """Benchmark sequential processing."""
    start = time.perf_counter()
    results = [func(x) for x in data]
    end = time.perf_counter()
    return results, end - start


def benchmark_pool_map(
    func: Callable, data: List[int], processes: int | None = None
) -> Tuple[List[Any], float]:
    """Benchmark using multiprocessing.Pool.map."""
    start = time.perf_counter()
    with multiprocessing.Pool(processes) as pool:
        results = pool.map(func, data)
    end = time.perf_counter()
    return results, end - start


def benchmark_pool_imap(
    func: Callable, data: List[int], processes: int | None = None, chunksize: int = 1
) -> Tuple[List[Any], float]:
    """Benchmark using multiprocessing.Pool.imap."""
    start = time.perf_counter()
    with multiprocessing.Pool(processes) as pool:
        results = list(pool.imap(func, data, chunksize=chunksize))
    end = time.perf_counter()
    return results, end - start


def benchmark_pool_imap_unordered(
    func: Callable, data: List[int], processes: int | None = None, chunksize: int = 1
) -> Tuple[List[Any], float]:
    """Benchmark using multiprocessing.Pool.imap_unordered."""
    start = time.perf_counter()
    with multiprocessing.Pool(processes) as pool:
        results = list(pool.imap_unordered(func, data, chunksize=chunksize))
    end = time.perf_counter()
    return results, end - start


def benchmark_run_maybe_parallel(
    func: Callable,
    data: List[int],
    parallel: Union[bool, int],
    keep_ordered: bool = True,
    chunksize: int | None = None,
) -> Tuple[List[Any], float]:
    """Benchmark using run_maybe_parallel."""
    start = time.perf_counter()
    results = run_maybe_parallel(
        func=func,
        iterable=data,
        parallel=parallel,
        keep_ordered=keep_ordered,
        chunksize=chunksize,
        pbar="none",  # Disable progress bar for fair comparison
    )
    end = time.perf_counter()
    return results, end - start

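# Added note (illustrative, not part of the original file): the wrapper above
# forwards exactly the keyword arguments exercised by this benchmark. A direct
# call follows the same pattern as the Pool-based helpers, for example:
#
#     results = run_maybe_parallel(
#         func=cpu_bound_task,
#         iterable=list(range(1000)),
#         parallel=True,
#         keep_ordered=True,
#         pbar="none",
#     )
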

def plot_speedup_by_data_size(
    df: pd.DataFrame, task_type: str | None = None, save_path: str | Path | None = None
):
    """Plot speedup vs data size for different methods."""
    import matplotlib.pyplot as plt  # type: ignore[import-untyped]

    fig, ax = plt.subplots(figsize=(10, 6))

    # Filter by task type if specified
    plot_df = df[df["task"] == task_type] if task_type else df

    # Group by method and plot
    for method in plot_df["method"].unique():
        if method == "sequential":
            continue
        method_df = plot_df[plot_df["method"] == method]
        ax.plot(method_df["data_size"], method_df["speedup"], marker="o", label=method)

    ax.set_xlabel("Data Size")
    ax.set_ylabel("Speedup (vs Sequential)")
    ax.set_title(f"Speedup by Data Size{f' ({task_type} tasks)' if task_type else ''}")
    ax.set_xscale("log")
    ax.axhline(y=1, color="gray", linestyle="--", alpha=0.5)
    ax.legend()
    ax.grid(True, alpha=0.3)

    if save_path:
        plt.savefig(save_path)
    else:
        plt.show()


def plot_timing_comparison(
    df: pd.DataFrame, data_size: int | None = None, save_path: str | Path | None = None
):
    """Plot timing comparison as bar chart."""
    import matplotlib.pyplot as plt  # type: ignore[import-untyped]

    # Filter by data size if specified
    plot_df = df[df["data_size"] == data_size] if data_size else df

    # Pivot for easier plotting
    pivot_df = plot_df.pivot_table(index="task", columns="method", values="mean_time")

    ax = pivot_df.plot(kind="bar", figsize=(12, 6), rot=0)
    ax.set_ylabel("Time (seconds)")
    ax.set_title(
        f"Timing Comparison{f' (Data Size: {data_size})' if data_size else ''}"
    )
    ax.legend(title="Method", bbox_to_anchor=(1.05, 1), loc="upper left")

    if save_path:
        plt.tight_layout()
        plt.savefig(save_path)
    else:
        plt.show()


def plot_efficiency_heatmap(df: pd.DataFrame, save_path: str | Path | None = None):
    """Plot efficiency heatmap (speedup across methods and tasks)."""
    import matplotlib.pyplot as plt  # type: ignore[import-untyped]

    # Create pivot table for heatmap
    pivot_df = df.pivot_table(
        index=["task", "data_size"], columns="method", values="speedup"
    )

    # Create heatmap
    plt.figure(figsize=(12, 8))
    # sns.heatmap(
    #     pivot_df,
    #     annot=True,
    #     fmt=".2f",
    #     cmap="YlOrRd",
    #     vmin=0,
    #     center=1,
    #     cbar_kws={"label": "Speedup"},
    # )
    plt.imshow(pivot_df, aspect="auto", cmap="YlOrRd", vmin=0)
    plt.colorbar(label="Speedup")
    plt.yticks(range(len(pivot_df.index)), [f"{t[0]}-{t[1]}" for t in pivot_df.index])
    plt.xticks(range(len(pivot_df.columns)), list(pivot_df.columns), rotation=45)
    plt.title("Parallelization Efficiency Heatmap")
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path)
    else:
        plt.show()

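# Added note: matplotlib is imported lazily inside each plotting helper (and in
# main below) rather than at module level, so the benchmark itself can run on a
# machine without matplotlib installed as long as plotting is skipped.
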

def print_summary_stats(df: pd.DataFrame):
    """Print summary statistics from benchmark results."""
    print("\n=== BENCHMARK SUMMARY ===")
    print(f"\nTotal configurations tested: {len(df)}")

    # Best method by task type
    print("\nBest methods by task type (highest speedup):")
    best_by_task = (
        df[df["method"] != "sequential"]
        .groupby("task")
        .apply(
            lambda x: x.loc[x["speedup"].idxmax()][["method", "speedup", "data_size"]]
        )
    )
    print(best_by_task)

    # Overall best speedups
    print("\nTop 5 speedups achieved:")
    top_speedups = df[df["method"] != "sequential"].nlargest(5, "speedup")[
        ["method", "task", "data_size", "speedup", "mean_time"]
    ]
    print(top_speedups)

    # Method rankings
    print("\nAverage speedup by method:")
    avg_speedup = (
        df[df["method"] != "sequential"]
        .groupby("method")["speedup"]
        .agg(["mean", "std"])
    )
    print(avg_speedup.sort_values("mean", ascending=False))


_DEFAULT_TASK_FUNCS: dict[str, Callable[[int], int]] = {
    "cpu_bound": cpu_bound_task,
    "io_bound": io_bound_task,
    "light_cpu": light_cpu_task,
}


def main(
    data_sizes: Sequence[int] = (100, 1000, 5000, 10000),
    base_path: Path = Path("."),
    plot: bool = True,
    task_funcs: Optional[Dict[str, Callable[[int], int]]] = None,
):
    """Run benchmarks and display results."""
    print("Starting parallelization benchmark...")

    base_path = Path(base_path)
    base_path.mkdir(parents=True, exist_ok=True)

    # Configure benchmark parameters
    if task_funcs is None:
        task_funcs = _DEFAULT_TASK_FUNCS

    # Run benchmarks
    runner = BenchmarkRunner()
    df = runner.run_benchmark_suite(data_sizes, task_funcs, runs_per_method=3)

    # Save results
    df.to_csv(base_path / "benchmark_results.csv", index=False)
    print(f"\nResults saved to {base_path / 'benchmark_results.csv'}")

    # Display summary
    print_summary_stats(df)

    if plot:
        # Create visualizations
        import matplotlib  # type: ignore[import-untyped]

        matplotlib.use("Agg")  # Use non-interactive backend

        # Plot speedup by data size for each task type
        for task in task_funcs.keys():
            plot_speedup_by_data_size(df, task, base_path / f"speedup_{task}.png")

        # Plot timing comparison for largest data size
        plot_timing_comparison(df, data_sizes[-1], base_path / "timing_comparison.png")

        # Plot efficiency heatmap
        plot_efficiency_heatmap(df, base_path / "efficiency_heatmap.png")

    return df


if __name__ == "__main__":
    df = main()
    print("\nDataFrame columns:", df.columns.tolist())
    print("\nFirst few rows:")
    print(df.head(10))
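
# Added usage sketch: when imported from another test or script (rather than run
# as __main__), main() can be driven with smaller sizes and plotting disabled,
# using only the parameters defined above, e.g.
#
#     df = main(
#         data_sizes=(100, 1000),
#         base_path=Path("benchmark_out"),
#         plot=False,
#     )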