
Python: Threading and ThreadPoolExecutor Examples

from threading import Thread
from time import sleep, time

def slow_function(delay=5):
    """Example of a slow, blocking function"""
    sleep(delay)

t0 = time()
threads = []
for i in range(10):
    t = Thread(target=slow_function, args=(1,))
    t.start()
    threads.append(t)
for t in threads:
    t.join(timeout=15)
print(f"Finished in: {time()-t0}")
# Output: Finished in: 1.0109996795654297

ThreadPoolExecutor provides a higher-level interface for managing thread pools, making it easier to parallelize tasks without manually managing threads.

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def process_task(task_id, duration):
    """Simulate a task that takes some time"""
    sleep(duration)
    return f"Task {task_id} completed in {duration}s"

t0 = time()
# Create a pool with 5 worker threads
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_task, i, 1) for i in range(10)]
    # Wait for all tasks to complete and collect results
    for future in as_completed(futures):
        print(future.result())
print(f"Finished in: {time()-t0:.2f}s")
# Output: Finished in: ~2.00s (10 tasks / 5 workers)
from concurrent.futures import ThreadPoolExecutor
from time import sleep, time

def square(n):
    """Calculate square of a number (with simulated delay)"""
    sleep(0.1)
    return n * n

t0 = time()
with ThreadPoolExecutor(max_workers=4) as executor:
    numbers = range(20)
    results = list(executor.map(square, numbers))
    print(f"Results: {results}")
print(f"Finished in: {time()-t0:.2f}s")
# Output: Finished in: ~0.50s (20 tasks / 4 workers)
from concurrent.futures import ThreadPoolExecutor, as_completed

def risky_operation(value):
    """Operation that might fail"""
    if value % 3 == 0:
        raise ValueError(f"Cannot process {value}")
    return value * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(risky_operation, i): i for i in range(10)}
    for future in as_completed(futures):
        original_value = futures[future]
        try:
            result = future.result()
            print(f"Success: {original_value} -> {result}")
        except Exception as exc:
            print(f"Error processing {original_value}: {exc}")

asyncio is best for high-concurrency I/O workloads (HTTP calls, DB/network waits) without creating many OS threads.

import asyncio
from time import perf_counter

async def fetch(task_id: int, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Task {task_id} finished"

async def main():
    t0 = perf_counter()
    tasks = [fetch(i, 1.0) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results[:3], "...")
    print(f"Finished in: {perf_counter() - t0:.2f}s")
    # ~1.00s because waits overlap

asyncio.run(main())
import asyncio

sem = asyncio.Semaphore(3)

async def process_item(i: int) -> int:
    async with sem:
        await asyncio.sleep(0.5)
        return i * i

async def main():
    tasks = [process_item(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
import asyncio

async def slow_call():
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_call(), timeout=1.0)
        print(result)
    except asyncio.TimeoutError:
        print("Timed out")

asyncio.run(main())

Use this when you already have blocking code and still want async orchestration.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def blocking_work(x: int) -> int:
    sleep(0.5)
    return x * 10

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        tasks = [loop.run_in_executor(pool, blocking_work, i) for i in range(8)]
        results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
  • ThreadPoolExecutor: great for existing blocking I/O code.
  • asyncio: best for native async I/O with high concurrency.
  • Mix them when needed: async orchestration + thread pool for legacy blocking calls.
| Approach | Best For | Strengths | Trade-offs | Typical Interview Use |
| --- | --- | --- | --- | --- |
| threading.Thread | Small number of manual threads | Full control over lifecycle | More boilerplate, easier to make coordination mistakes | "Show basic thread start/join flow" |
| ThreadPoolExecutor | Blocking I/O tasks in parallel | Simple API, easy scaling with max_workers | Still uses threads, not ideal for very high concurrency counts | "Parallelize N blocking tasks with error handling" |
| asyncio | High-concurrency async I/O | Low overhead per task, structured cancellation/timeouts | Requires async-compatible libraries and event-loop thinking | "Design non-blocking fan-out with bounded concurrency" |
| asyncio + run_in_executor | Async app with legacy blocking code | Lets you keep async orchestration while reusing blocking functions | Adds mixed-model complexity | "Integrate old sync code in async pipeline" |
  1. “I need to parallelize existing blocking API calls quickly.” Use ThreadPoolExecutor.

  2. “I need thousands of concurrent network waits with strict timeout behavior.” Use asyncio with Semaphore + wait_for.

  3. “My app is already async, but one library is blocking.” Use run_in_executor for just the blocking boundary.

  4. “I need low-level thread lifecycle control for a special case.” Use raw threading.Thread.

1) Basic async and await with concurrent work

Section titled “1) Basic async and await with concurrent work”
import asyncio
import random

async def worker(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done after {delay:.2f}s"

async def main() -> None:
    jobs = [worker(f"job-{i}", random.uniform(0.2, 1.0)) for i in range(8)]
    results = await asyncio.gather(*jobs)
    print("\n".join(results))

if __name__ == "__main__":
    asyncio.run(main())

Interview notes:

  • Common bug: forgetting to await asyncio.gather, so coroutines never execute.
  • Trade-off: gather is simple and fast for fan-out, but one failing task can fail the whole call unless return_exceptions=True.
  • Likely follow-up: “How do you keep partial successes when one task fails?”
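The partial-success follow-up is usually answered with return_exceptions=True; a minimal sketch (the failure condition is invented for illustration):

```python
import asyncio

async def task(i: int) -> int:
    if i % 2 == 0:
        raise ValueError(f"task {i} failed")  # made-up failure condition
    return i * 10

async def main() -> list:
    # With return_exceptions=True, exceptions come back as values
    # instead of cancelling the whole gather call.
    results = await asyncio.gather(*(task(i) for i in range(4)),
                                   return_exceptions=True)
    print("ok:", [r for r in results if not isinstance(r, Exception)])
    print("failed:", [r for r in results if isinstance(r, Exception)])
    return results

asyncio.run(main())
```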

2) Create tasks, cancel, and handle timeouts

Section titled “2) Create tasks, cancel, and handle timeouts”
import asyncio

async def long_task() -> None:
    try:
        while True:
            await asyncio.sleep(0.3)
            print("working...")
    except asyncio.CancelledError:
        print("cleanup on cancel")
        raise

async def main() -> None:
    t = asyncio.create_task(long_task())
    try:
        await asyncio.wait_for(asyncio.sleep(1.2), timeout=2)
    finally:
        t.cancel()
        try:
            await t
        except asyncio.CancelledError:
            print("task cancelled")

if __name__ == "__main__":
    asyncio.run(main())

Interview notes:

  • Common bug: canceling tasks without awaiting them, leaving noisy warnings and incomplete cleanup.
  • Trade-off: cancellation improves responsiveness, but requires cleanup discipline in CancelledError handlers.
  • Likely follow-up: “How do you enforce cancellation budgets across nested tasks?”
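One way to enforce a budget across nested awaits: place the whole subtree under a single deadline, so cancellation propagates down automatically. A sketch using wait_for (on Python 3.11+, asyncio.timeout() offers the same effect as a context manager; the 0.2s budget is illustrative):

```python
import asyncio

async def nested() -> str:
    await asyncio.sleep(10)  # stands in for slow nested work
    return "done"

async def parent() -> str:
    # The deadline covers everything awaited inside wait_for;
    # cancellation propagates down into nested().
    try:
        return await asyncio.wait_for(nested(), timeout=0.2)
    except asyncio.TimeoutError:
        return "budget exceeded"

print(asyncio.run(parent()))
```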

3) Limit concurrency with a semaphore

Section titled “3) Limit concurrency with a semaphore”
import asyncio
import random

async def fetch(i: int, sem: asyncio.Semaphore) -> str:
    async with sem:
        await asyncio.sleep(random.uniform(0.2, 1.0))
        return f"item {i}"

async def main() -> None:
    sem = asyncio.Semaphore(3)
    tasks = [asyncio.create_task(fetch(i, sem)) for i in range(12)]
    for coro in asyncio.as_completed(tasks):
        print(await coro)

if __name__ == "__main__":
    asyncio.run(main())

Interview notes:

  • Common bug: unbounded task creation even with semaphore; memory still spikes if task list is huge.
  • Trade-off: semaphore protects downstream dependencies but can increase end-to-end latency.
  • Likely follow-up: “How do you pick semaphore size in production?”
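One way to avoid the unbounded-task-creation bug noted above: create a fixed set of worker tasks fed through a bounded queue, instead of one task per item. A sketch under assumed sizes (the queue size, worker count, and sleep are illustrative):

```python
import asyncio

async def worker(queue: asyncio.Queue) -> int:
    handled = 0
    while True:
        item = await queue.get()
        if item is None:            # sentinel: no more work
            queue.task_done()
            break
        await asyncio.sleep(0.001)  # stand-in for real I/O
        handled += 1
        queue.task_done()
    return handled

async def main(n_items: int = 50, n_workers: int = 3) -> int:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)  # bounded buffer
    workers = [asyncio.create_task(worker(queue)) for _ in range(n_workers)]
    for item in range(n_items):
        await queue.put(item)       # blocks while the buffer is full
    for _ in range(n_workers):
        await queue.put(None)       # one sentinel per worker
    handled = await asyncio.gather(*workers)
    return sum(handled)

print(asyncio.run(main()))
```

Because only n_workers tasks and at most maxsize queued items exist at any moment, memory stays flat no matter how many items flow through.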

4) Producer and consumer with asyncio.Queue

Section titled “4) Producer and consumer with asyncio.Queue”
import asyncio
import random

N_CONSUMERS = 3

async def producer(q: asyncio.Queue, n: int) -> None:
    for i in range(n):
        await asyncio.sleep(random.uniform(0.05, 0.2))
        await q.put(i)
    for _ in range(N_CONSUMERS):
        await q.put(None)  # one sentinel per consumer

async def consumer(q: asyncio.Queue) -> None:
    while True:
        item = await q.get()
        if item is None:  # sentinel: this consumer is done
            q.task_done()
            break
        await asyncio.sleep(random.uniform(0.05, 0.2))
        print(f"processed {item}")
        q.task_done()

async def main() -> None:
    q: asyncio.Queue = asyncio.Queue()
    prod = asyncio.create_task(producer(q, 20))
    cons = [asyncio.create_task(consumer(q)) for _ in range(N_CONSUMERS)]
    await prod
    await q.join()
    await asyncio.gather(*cons)

if __name__ == "__main__":
    asyncio.run(main())

Interview notes:

  • Common bug: forgetting q.task_done(), causing q.join() to block forever.
  • Trade-off: queue-based decoupling smooths burst traffic but adds buffering delay and backpressure tuning needs.
  • Likely follow-up: “How would you shut down cleanly with multiple producers and consumers?”

5) Run blocking code without freezing the loop (threads)

Section titled “5) Run blocking code without freezing the loop (threads)”
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(x: int) -> str:
    time.sleep(0.5)
    return f"blocking {x}"

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        tasks = [loop.run_in_executor(pool, blocking_io, i) for i in range(10)]
        for r in await asyncio.gather(*tasks):
            print(r)

if __name__ == "__main__":
    asyncio.run(main())

Interview notes:

  • Common bug: sending CPU-heavy work to thread pool and expecting linear speedup under GIL.
  • Trade-off: run_in_executor is great for legacy blocking I/O, but mixed async/thread models add complexity.
  • Likely follow-up: “When would you switch from thread pool to process pool?”
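For CPU-bound work, ProcessPoolExecutor shares the same Executor API, so the switch is often a one-line change (a sketch; the workload and pool sizes are illustrative). The __main__ guard matters because worker processes re-import the module:

```python
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    # Pure-Python arithmetic holds the GIL, so threads cannot run it in
    # parallel; separate processes each get their own interpreter.
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    t0 = time.time()
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(cpu_heavy, [200_000] * 8))
    print(f"Finished in: {time.time() - t0:.2f}s")
```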

6) Stream results with an async generator

Section titled “6) Stream results with an async generator”
import asyncio
import random

async def stream(n: int):
    for i in range(n):
        await asyncio.sleep(random.uniform(0.1, 0.4))
        yield i

async def main() -> None:
    async for i in stream(8):
        print("got", i)

if __name__ == "__main__":
    asyncio.run(main())

Interview notes:

  • Common bug: buffering all stream values before processing, defeating streaming benefits.
  • Trade-off: streaming lowers memory footprint and time-to-first-result, but makes retry/idempotency design more complex.
  • Likely follow-up: “How do you handle partial failures mid-stream?”
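One hedge against mid-stream failures: have the generator yield structured (ok, value) pairs instead of raising, so one bad item does not kill the stream and the consumer can skip or retry it. A sketch (the failure condition is invented for illustration):

```python
import asyncio

async def stream_with_errors(n: int):
    # Yield (ok, value) pairs so a failure becomes data, not an exception.
    for i in range(n):
        await asyncio.sleep(0.01)
        if i == 2:
            yield (False, ValueError(f"item {i} failed"))  # made-up failure
        else:
            yield (True, i)

async def main() -> list:
    good = []
    async for ok, value in stream_with_errors(5):
        if ok:
            good.append(value)
        else:
            print("skipping failed item:", value)
    return good

print(asyncio.run(main()))
```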

7) Run a subprocess without blocking the loop

Section titled “7) Run a subprocess without blocking the loop”
import asyncio
import sys

async def main() -> None:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('hello from child')",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    print("stdout:", out.decode().strip())
    print("stderr:", err.decode().strip())
    print("code:", proc.returncode)

if __name__ == "__main__":
    asyncio.run(main())

Interview notes:

  • Common bug: ignoring stderr/return code and treating any stdout as success.
  • Trade-off: async subprocess keeps event loop responsive, but process management and security constraints become critical.
  • Likely follow-up: “How would you enforce subprocess timeout and kill policy?”
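A timeout-and-kill policy can be sketched as: wrap communicate in wait_for, send terminate first, then kill as a fallback (run_with_deadline is a hypothetical helper name; the deadlines are illustrative):

```python
import asyncio
import sys
from typing import Optional

async def run_with_deadline(code: str, timeout: float) -> Optional[int]:
    # Hypothetical helper: run a child Python process under a hard deadline.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", code,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.terminate()  # polite stop first (SIGTERM on POSIX)
        try:
            await asyncio.wait_for(proc.wait(), timeout=1.0)
        except asyncio.TimeoutError:
            proc.kill()   # hard stop as a fallback
            await proc.wait()
    return proc.returncode

print(asyncio.run(run_with_deadline("import time; time.sleep(30)", 0.5)))
```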
  1. How do you keep partial successes when one task fails in gather? Use asyncio.gather(..., return_exceptions=True) and post-process results into success/failure buckets, or use as_completed for per-task handling.

  2. How do you enforce cancellation budgets across nested tasks? Pass a deadline via asyncio.timeout() or shared context-style deadline value, then propagate cancellation and require each layer to handle CancelledError cleanly.

  3. How do you choose semaphore size in production? Start from downstream capacity and p95 latency targets, then tune using load tests and live error/latency curves to avoid saturation.

  4. How do you shut down cleanly with multiple producers and consumers? Use sentinels per consumer or coordinated cancellation, drain queue, call task_done correctly, and await all task completions before loop exit.

  5. When do you switch from thread pool to process pool? Use thread pool for blocking I/O, process pool for CPU-bound work where GIL limits throughput; validate with profiling before changing.

  6. How do you handle partial failures mid-stream with async generators? Emit structured error events or checkpoints, make downstream consumers idempotent, and support resume/retry from last committed offset.

  7. How do you enforce subprocess timeout and kill policy? Wrap with asyncio.wait_for, on timeout send terminate then kill fallback, capture stdout/stderr/exit code, and mark result as controlled failure.