Python: Threading and ThreadPoolExecutor Examples
## Basic Usage

```python
from threading import Thread
from time import sleep, time

def slow_function(duration=5):
    """Example of a slow function.

    Note: the parameter is named `duration`, not `time`,
    to avoid shadowing the imported `time()` function.
    """
    sleep(duration)

t0 = time()
threads = []

for i in range(10):
    t = Thread(target=slow_function, args=(1,))
    t.start()
    threads.append(t)

for t in threads:
    t.join(timeout=15)

print(f"Finished in: {time()-t0}")
```

Output:

```
Finished in: 1.0109996795654297
```

## ThreadPoolExecutor
`ThreadPoolExecutor` provides a higher-level interface for managing thread pools, making it easier to parallelize tasks without manually managing threads.
### Basic Example

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def process_task(task_id, duration):
    """Simulate a task that takes some time"""
    sleep(duration)
    return f"Task {task_id} completed in {duration}s"

t0 = time()

# Create a pool with 5 worker threads
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_task, i, 1) for i in range(10)]

    # Wait for all tasks to complete and collect results
    for future in as_completed(futures):
        print(future.result())

print(f"Finished in: {time()-t0:.2f}s")
# Output: Finished in: ~2.00s (10 tasks / 5 workers)
```

### Using map() for Simple Parallelism
```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep, time

def square(n):
    """Calculate square of a number (with simulated delay)"""
    sleep(0.1)
    return n * n

t0 = time()

with ThreadPoolExecutor(max_workers=4) as executor:
    numbers = range(20)
    results = list(executor.map(square, numbers))

print(f"Results: {results}")
print(f"Finished in: {time()-t0:.2f}s")
# Output: Finished in: ~0.50s (20 tasks / 4 workers)
```

### Error Handling with ThreadPoolExecutor
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def risky_operation(value):
    """Operation that might fail"""
    if value % 3 == 0:
        raise ValueError(f"Cannot process {value}")
    return value * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(risky_operation, i): i for i in range(10)}

    for future in as_completed(futures):
        original_value = futures[future]
        try:
            result = future.result()
            print(f"Success: {original_value} -> {result}")
        except Exception as exc:
            print(f"Error processing {original_value}: {exc}")
```

## AsyncIO Examples
`asyncio` is best for high-concurrency I/O workloads (HTTP calls, DB/network waits) without creating many OS threads.
### Basic async gather()

```python
import asyncio
from time import perf_counter

async def fetch(task_id: int, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Task {task_id} finished"

async def main():
    t0 = perf_counter()
    tasks = [fetch(i, 1.0) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results[:3], "...")
    print(f"Finished in: {perf_counter() - t0:.2f}s")  # ~1.00s because waits overlap

asyncio.run(main())
```

### Bounded Concurrency with Semaphore
```python
import asyncio

sem = asyncio.Semaphore(3)

async def process_item(i: int) -> int:
    async with sem:
        await asyncio.sleep(0.5)
        return i * i

async def main():
    tasks = [process_item(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
```

### Timeout and Cancellation
```python
import asyncio

async def slow_call():
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_call(), timeout=1.0)
        print(result)
    except asyncio.TimeoutError:
        print("Timed out")

asyncio.run(main())
```

### Running Blocking Code from AsyncIO
Use this when you already have blocking code and still want async orchestration.
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def blocking_work(x: int) -> int:
    sleep(0.5)
    return x * 10

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        tasks = [loop.run_in_executor(pool, blocking_work, i) for i in range(8)]
        results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
```

## Quick Rule of Thumb
- `ThreadPoolExecutor`: great for existing blocking I/O code.
- `asyncio`: best for native async I/O with high concurrency.
- Mix them when needed: async orchestration plus a thread pool for legacy blocking calls.
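As a minimal sketch of the mixed model, `asyncio.to_thread` (available since Python 3.9) pushes a blocking call onto the default thread pool without managing an executor by hand. The `fetch_legacy` function here is a hypothetical stand-in for any blocking library call:

```python
import asyncio
import time

def fetch_legacy(x: int) -> int:
    """Hypothetical blocking call (e.g. a sync HTTP client)."""
    time.sleep(0.2)
    return x * 2

async def main() -> list[int]:
    # Each blocking call runs in the default thread pool, so the
    # event loop stays free and the calls overlap in time
    return await asyncio.gather(
        *(asyncio.to_thread(fetch_legacy, i) for i in range(5))
    )

print(asyncio.run(main()))  # [0, 2, 4, 6, 8]
```

This is equivalent to the explicit `run_in_executor` pattern above, just with less ceremony; prefer the explicit pool when you need to control `max_workers`.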
## Comparison Table

| Approach | Best For | Strengths | Trade-offs | Typical Interview Use |
|---|---|---|---|---|
| `threading.Thread` | Small number of manual threads | Full control over lifecycle | More boilerplate, easier to make coordination mistakes | "Show basic thread start/join flow" |
| `ThreadPoolExecutor` | Blocking I/O tasks in parallel | Simple API, easy scaling with `max_workers` | Still uses threads, not ideal for very high concurrency counts | "Parallelize N blocking tasks with error handling" |
| `asyncio` | High-concurrency async I/O | Low overhead per task, structured cancellation/timeouts | Requires async-compatible libraries and event-loop thinking | "Design non-blocking fan-out with bounded concurrency" |
| `asyncio` + `run_in_executor` | Async app with legacy blocking code | Lets you keep async orchestration while reusing blocking functions | Adds mixed-model complexity | "Integrate old sync code in async pipeline" |
## Scenario Chooser

- "I need to parallelize existing blocking API calls quickly." Use `ThreadPoolExecutor`.
- "I need thousands of concurrent network waits with strict timeout behavior." Use `asyncio` with `Semaphore` + `wait_for`.
- "My app is already async, but one library is blocking." Use `run_in_executor` for just the blocking boundary.
- "I need low-level thread lifecycle control for a special case." Use raw `threading.Thread`.
## AsyncIO Pattern Cookbook

### 1) Basic async and await with concurrent work

```python
import asyncio
import random

async def worker(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done after {delay:.2f}s"

async def main() -> None:
    jobs = [worker(f"job-{i}", random.uniform(0.2, 1.0)) for i in range(8)]
    results = await asyncio.gather(*jobs)
    print("\n".join(results))

if __name__ == "__main__":
    asyncio.run(main())
```

Interview notes:
- Common bug: forgetting to `await` the `asyncio.gather(...)` call; it returns an awaitable, and without the `await` you never collect results or exceptions.
- Trade-off: `gather` is simple and fast for fan-out, but one failing task fails the whole call unless `return_exceptions=True`.
- Likely follow-up: "How do you keep partial successes when one task fails?"
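A minimal sketch of the partial-success pattern mentioned in the notes: with `return_exceptions=True`, `gather` returns exceptions as values instead of raising, so results can be sorted into buckets afterwards. The `maybe_fail` helper is hypothetical:

```python
import asyncio

async def maybe_fail(i: int) -> int:
    """Hypothetical task that fails on even inputs."""
    if i % 2 == 0:
        raise ValueError(f"bad input {i}")
    return i * 10

async def main() -> tuple[list[int], list[BaseException]]:
    results = await asyncio.gather(
        *(maybe_fail(i) for i in range(4)),
        return_exceptions=True,  # exceptions come back as values, not raised
    )
    oks = [r for r in results if not isinstance(r, BaseException)]
    errs = [r for r in results if isinstance(r, BaseException)]
    return oks, errs

oks, errs = asyncio.run(main())
print(oks)   # [10, 30]
print(errs)  # two ValueError instances, for inputs 0 and 2
```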
### 2) Create tasks, cancel, and handle timeouts

```python
import asyncio

async def long_task() -> None:
    try:
        while True:
            await asyncio.sleep(0.3)
            print("working...")
    except asyncio.CancelledError:
        print("cleanup on cancel")
        raise

async def main() -> None:
    t = asyncio.create_task(long_task())
    try:
        await asyncio.wait_for(asyncio.sleep(1.2), timeout=2)
    finally:
        t.cancel()
        try:
            await t
        except asyncio.CancelledError:
            print("task cancelled")

if __name__ == "__main__":
    asyncio.run(main())
```

Interview notes:
- Common bug: canceling tasks without awaiting them, leaving noisy warnings and incomplete cleanup.
- Trade-off: cancellation improves responsiveness, but requires cleanup discipline in `CancelledError` handlers.
- Likely follow-up: "How do you enforce cancellation budgets across nested tasks?"
### 3) Concurrency limit with Semaphore

```python
import asyncio
import random

async def fetch(i: int, sem: asyncio.Semaphore) -> str:
    async with sem:
        await asyncio.sleep(random.uniform(0.2, 1.0))
        return f"item {i}"

async def main() -> None:
    sem = asyncio.Semaphore(3)
    tasks = [asyncio.create_task(fetch(i, sem)) for i in range(12)]
    for coro in asyncio.as_completed(tasks):
        print(await coro)

if __name__ == "__main__":
    asyncio.run(main())
```

Interview notes:
- Common bug: unbounded task creation even with a semaphore; memory still spikes if the task list is huge.
- Trade-off: a semaphore protects downstream dependencies but can increase end-to-end latency.
- Likely follow-up: "How do you pick semaphore size in production?"
### 4) Producer and consumer with asyncio.Queue

```python
import asyncio
import random

async def producer(q: asyncio.Queue[int], n: int, num_consumers: int) -> None:
    for i in range(n):
        await asyncio.sleep(random.uniform(0.05, 0.2))
        await q.put(i)
    # One sentinel per consumer. Re-putting a single shared sentinel
    # leaves one un-acknowledged item in the queue, so q.join() would
    # block forever after all consumers exit.
    for _ in range(num_consumers):
        await q.put(-1)

async def consumer(q: asyncio.Queue[int]) -> None:
    while True:
        item = await q.get()
        if item == -1:
            q.task_done()
            break
        await asyncio.sleep(random.uniform(0.05, 0.2))
        print(f"processed {item}")
        q.task_done()

async def main() -> None:
    q: asyncio.Queue[int] = asyncio.Queue()
    prod = asyncio.create_task(producer(q, 20, num_consumers=3))
    cons = [asyncio.create_task(consumer(q)) for _ in range(3)]
    await prod
    await q.join()
    await asyncio.gather(*cons)

if __name__ == "__main__":
    asyncio.run(main())
```

Interview notes:
- Common bug: forgetting `q.task_done()`, causing `q.join()` to block forever.
- Trade-off: queue-based decoupling smooths burst traffic but adds buffering delay and backpressure tuning needs.
- Likely follow-up: "How would you shut down cleanly with multiple producers and consumers?"
### 5) Run blocking code without freezing the loop (threads)

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(x: int) -> str:
    time.sleep(0.5)
    return f"blocking {x}"

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        tasks = [loop.run_in_executor(pool, blocking_io, i) for i in range(10)]
        for r in await asyncio.gather(*tasks):
            print(r)

if __name__ == "__main__":
    asyncio.run(main())
```

Interview notes:
- Common bug: sending CPU-heavy work to a thread pool and expecting linear speedup under the GIL.
- Trade-off: `run_in_executor` is great for legacy blocking I/O, but mixed async/thread models add complexity.
- Likely follow-up: "When would you switch from thread pool to process pool?"
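A sketch of the thread-vs-process switch from the follow-up: swapping `ThreadPoolExecutor` for `ProcessPoolExecutor` in `run_in_executor` lets CPU-bound work bypass the GIL, at the cost of pickling arguments/results and process startup overhead. The `cpu_heavy` function is an illustrative placeholder:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    # CPU-bound: threads would serialize on the GIL, processes do not.
    # Must be a picklable top-level function for the process pool.
    return sum(i * i for i in range(n))

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        tasks = [loop.run_in_executor(pool, cpu_heavy, n) for n in (10, 100)]
        return await asyncio.gather(*tasks)

# The __main__ guard is required: worker processes re-import this module
if __name__ == "__main__":
    print(asyncio.run(main()))  # [285, 328350]
```

Note the `if __name__ == "__main__"` guard is mandatory with process pools on spawn-based platforms, and tiny tasks like these would be slower than threads in practice because of serialization overhead.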
### 6) Async generator for streaming results

```python
import asyncio
import random

async def stream(n: int):
    for i in range(n):
        await asyncio.sleep(random.uniform(0.1, 0.4))
        yield i

async def main() -> None:
    async for i in stream(8):
        print("got", i)

if __name__ == "__main__":
    asyncio.run(main())
```

Interview notes:
- Common bug: buffering all stream values before processing, defeating streaming benefits.
- Trade-off: streaming lowers memory footprint and time-to-first-result, but makes retry/idempotency design more complex.
- Likely follow-up: "How do you handle partial failures mid-stream?"
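One hedged way to answer the mid-stream-failure follow-up: have the generator yield `(ok, payload)` event tuples instead of raising, so a single bad item does not kill the stream and the consumer decides per item. The failure injection at `i == 2` is purely illustrative:

```python
import asyncio

async def stream_with_errors(n: int):
    for i in range(n):
        await asyncio.sleep(0.01)
        if i == 2:
            # Surface the failure as an event instead of raising,
            # so the stream continues past it
            yield (False, ValueError(f"item {i} failed"))
        else:
            yield (True, i)

async def main() -> list[int]:
    good: list[int] = []
    async for ok, payload in stream_with_errors(5):
        if ok:
            good.append(payload)
        else:
            print("skipping failed item:", payload)
    return good

print(asyncio.run(main()))  # [0, 1, 3, 4]
```

Alternatives include checkpointing an offset so a restarted stream can resume, which pairs with the idempotent-consumer point above.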
### 7) Async subprocess

```python
import asyncio
import sys

async def main() -> None:
    # sys.executable avoids depending on a "python" name being on PATH
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('hello from child')",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    print("stdout:", out.decode().strip())
    print("stderr:", err.decode().strip())
    print("code:", proc.returncode)

if __name__ == "__main__":
    asyncio.run(main())
```

Interview notes:
- Common bug: ignoring stderr/return code and treating any stdout as success.
- Trade-off: async subprocess keeps the event loop responsive, but process management and security constraints become critical.
- Likely follow-up: "How would you enforce subprocess timeout and kill policy?"
## Rapid-Fire Async Interview Q and A

- **How do you keep partial successes when one task fails in `gather`?** Use `asyncio.gather(..., return_exceptions=True)` and post-process results into success/failure buckets, or use `as_completed` for per-task handling.
- **How do you enforce cancellation budgets across nested tasks?** Pass a deadline via `asyncio.timeout()` or a shared deadline value, then propagate cancellation and require each layer to handle `CancelledError` cleanly.
- **How do you choose semaphore size in production?** Start from downstream capacity and p95 latency targets, then tune using load tests and live error/latency curves to avoid saturation.
- **How do you shut down cleanly with multiple producers and consumers?** Use sentinels per consumer or coordinated cancellation, drain the queue, call `task_done` correctly, and await all task completions before loop exit.
- **When do you switch from a thread pool to a process pool?** Use a thread pool for blocking I/O and a process pool for CPU-bound work where the GIL limits throughput; validate with profiling before changing.
- **How do you handle partial failures mid-stream with async generators?** Emit structured error events or checkpoints, make downstream consumers idempotent, and support resume/retry from the last committed offset.
- **How do you enforce subprocess timeout and kill policy?** Wrap with `asyncio.wait_for`; on timeout, send terminate with a kill fallback, capture stdout/stderr/exit code, and mark the result as a controlled failure.