For the last seven years, I kept re-implementing the same pattern: a parallel map loop that divides the work among several processes or threads.
My very first attempts were built on Python’s standard tools, e.g., multiprocessing.Pool.map, and I later explored pqdm (the “Parallel TQDM” library) to simplify the code and to visualize progress on long-running loops when workers were stateless. Those solutions worked but always felt incomplete and required repeated boilerplate. Only much later did I explore tools like Dask and Ray (see the examples at the end of the post) and found that, for this specific pattern, they still required a lot of complexity and infrastructure for something that should be simple: a streaming, stateful, parallel map loop with accurate progress tracking and minimal boilerplate that actually works everywhere.
In 2024, I finally distilled the pattern into a tiny library: mtasklite.
Below are the motivation, the flaws of existing approaches, and what makes mtasklite unique.
Common Pain Points with Existing Tools
Even for simple parallel loops, many libraries fall short:
1. Stateful workers with flexible initialization
Most map APIs assume stateless functions. When workers must load models or context before processing, solutions become awkward or require elaborate hacks.
2. True streaming input/output
Many frameworks materialize outputs in bulk rather than yielding results as soon as they are ready. This breaks bounded-memory pipelines.
3. Size-aware streaming for progress tracking on finite inputs
Tools that process fixed-size input arrays often fail to expose an output with a usable len(...). Without a known length, progress bars like tqdm can’t estimate remaining time.
4. Notebook compatibility
Some parallel tools (e.g., Ray actors, certain ways of using multiprocessing) behave unpredictably in Jupyter/IPython environments.
5. Boilerplate and cognitive overhead
Great libraries like Dask and Ray are powerful, but for a simple map pattern they introduce clusters, schedulers, plugins, actors, futures, and daemons (see the examples at the end of the post).
6. Lightweight API
Many real workloads don’t need a full distributed system — just a clean, streaming, parallel map loop.
A more detailed discussion of these limitations follows later.
What Makes mtasklite Different
Minimal Boilerplate
You don’t need:
- Clusters or schedulers
- Actors or plugins
- Futures or object references, which need to be checked for completion
- Background daemons or services
Stateful Workers with Flexible Initialization
Workers can:
- Initialize once with stateful setup
- Receive per-worker init arguments
- Maintain local state across tasks
This is essential for real workloads — model loading, database connections, tokenizers, GPU context, etc.
True Streaming Input + Output
mtasklite works with:
- Any input iterator
- True streaming of results
⟶ Outputs appear as soon as they’re ready
⟶ Memory stays bounded (unless you explicitly request otherwise)
⟶ No buffering of the entire input and output set
This is crucial for real-world pipelines that process huge datasets.
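To make the bounded-memory idea concrete, here is a minimal sketch built only on the standard library. It is not mtasklite's implementation; the name bounded_stream_map and the parameters n_workers and max_in_flight are made up for illustration. It keeps a limited number of tasks in flight and yields each result as soon as it is ready, in completion order.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def bounded_stream_map(func, inputs, n_workers=4, max_in_flight=8):
    """Yield func(x) for each x in inputs as results complete (unordered).

    Hypothetical helper for illustration; not part of mtasklite.
    """
    it = iter(inputs)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Prime the pool with at most max_in_flight tasks.
        pending = {pool.submit(func, x) for x in islice(it, max_in_flight)}
        while pending:
            # Block until at least one task finishes.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                yield fut.result()
            # Submit one new task for every finished one.
            for x in islice(it, len(done)):
                pending.add(pool.submit(func, x))


# Results arrive in completion order, so sort them for a stable view.
squares = sorted(bounded_stream_map(lambda x: x * x, range(10)))
```

Because new inputs are pulled from the iterator only when earlier tasks finish, neither the input nor the output is ever fully materialized inside the helper.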
Size-Aware Output for Progress Bars
When the input iterator has a known length, the mtasklite output iterator supports len(...) as well. This enables easy and accurate progress bars (tqdm, etc.).
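The general idea of a sized output iterator can be sketched in a few lines of plain Python. This is only an illustration under my own assumptions, not how mtasklite does it internally; SizedIterator and sized_map are hypothetical names. The wrapper reports the total number of items, which is what a progress bar reads to estimate the remaining time.

```python
class SizedIterator:
    """Wrap an iterator and expose a fixed total length via len()."""

    def __init__(self, iterator, length):
        self._iterator = iterator
        self._length = length

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iterator)

    def __len__(self):
        # Total number of items, not the number remaining.
        return self._length


def sized_map(func, inputs):
    """Lazy map whose result supports len() whenever the input does."""
    results = map(func, inputs)
    try:
        return SizedIterator(results, len(inputs))
    except TypeError:
        # Input has no known length (e.g., a generator): return a plain iterator.
        return results


out = sized_map(lambda x: x + 1, [1, 2, 3])
total = len(out)  # known up front, before any item is consumed
```

A plain map object or generator has no len(...), which is why a progress bar wrapped around one shows a running count but no estimate.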
Works Everywhere
No matter the environment:
- Jupyter Notebooks
- Python REPL
- Scripts
- Interactive environments
mtasklite avoids features that frequently misbehave in notebooks or interactive shells.
Lightweight and Elegant API
The API stays close to familiar Python map semantics:
for out in parallel_map(f, xs):
    ...
Easy Migration from pqdm
If you’re already a pqdm user — the “Parallel TQDM” library built on concurrent.futures + tqdm — switching to mtasklite is very easy. The API is nearly 100% compatible, and there’s a dedicated migration guide describing a couple of tiny but necessary differences.
That means you can enjoy:
- Per-worker initialization
- True streaming output
- Sized iterators for progress bars
- Notebook-friendly behavior
…without rewriting your existing loops.
A Motivating Example
Below is a (somewhat contrived) example of a stateful, class-based worker that computes a square. It is implemented as a regular class with a constructor and a __call__ method, with one nuance: it is decorated with the @delayed_init decorator. This decorator "wraps" the actual object inside a shell object, which only stores the object's initialization parameters. The actual instantiation is delayed until a worker process (or thread) starts.
from mtasklite import delayed_init
from mtasklite.processes import pqdm

@delayed_init
class Square:
    def __init__(self, worker_id):
        print(f"Initializing worker {worker_id}")

    def __call__(self, x):
        return x * x

inputs = [1, 2, 3, 4, 5]

with pqdm(inputs, [Square(0), Square(1), Square(2), Square(3)]) as out:
    for y in out:
        print(y)
This provides:
- Stateful worker initialization
- Streaming output
- Sized iterator compatible with progress bars
- Bounded memory
- Works in notebooks and scripts
- No external infrastructure
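The delayed-initialization idea can be illustrated with a rough, self-contained sketch. This is not mtasklite's code: DelayedShell, delayed_init_sketch, and build are hypothetical names, and the sketch ignores details such as pickling, which a process-based implementation has to handle. It only shows that calling the decorated class records the arguments, while the real constructor runs later.

```python
class DelayedShell:
    """Remembers a class and its constructor arguments without instantiating it."""

    def __init__(self, cls, args, kwargs):
        self.cls = cls
        self.args = args
        self.kwargs = kwargs

    def build(self):
        # Meant to be called inside the worker: the real object is created here.
        return self.cls(*self.args, **self.kwargs)


def delayed_init_sketch(cls):
    """Hypothetical stand-in for a delayed-init decorator (not mtasklite's code)."""
    def make_shell(*args, **kwargs):
        return DelayedShell(cls, args, kwargs)
    return make_shell


init_log = []  # records when the real constructor actually runs


@delayed_init_sketch
class Doubler:
    def __init__(self, worker_id):
        init_log.append(worker_id)

    def __call__(self, x):
        return 2 * x


shell = Doubler(7)              # only stores the argument
calls_before = len(init_log)    # constructor has not run yet
worker = shell.build()          # real construction happens here
```

In a real parallel loop, the build step would run once inside each worker process or thread, so expensive state such as a loaded model lives only there.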
Why Other Tools Did Not Fit
Below is a concise explanation of why other popular tools don’t satisfy this pattern without complexity.
multiprocessing + tqdm
Using multiprocessing and tqdm together requires a lot of repetition and manual work:
- Worker init routines are limited
- Integrating with progress bars often requires imap/imap_unordered patterns
- You must manage iterator lengths manually to drive tqdm correctly
- Behavior can be unpredictable in notebooks
Yes, you can piece together something yourself — but you end up rewriting the same wrapper logic repeatedly. Below is an example of how cumbersome it is:
from tqdm.auto import tqdm
from multiprocessing import Pool

def init_worker(proc_id):
    global WORKER_ID
    WORKER_ID = proc_id
    print(f"[Worker init] ID = {WORKER_ID}")

def worker_task(a):
    # Use global WORKER_ID if needed
    return a * a

if __name__ == "__main__":
    proc_ids = [0, 1, 2, 3]
    input_vals = list(range(20))
    chunksize = 1
    result = []
    # initargs is shared by all workers, so every worker is initialized with the same ID
    with Pool(processes=len(proc_ids), initializer=init_worker, initargs=(proc_ids[0],)) as pool:
        # map is a blocking call that returns the result only when all input is processed
        # for ret_val in tqdm(pool.map(worker_task, input_vals, chunksize)):
        # imap yields results lazily but has no length, so tqdm needs an explicit total
        for ret_val in tqdm(pool.imap(worker_task, input_vals, chunksize), total=len(input_vals)):
            result.append(ret_val)
    print(result)
Dask
Dask is powerful for distributed computation, but for this pattern it is overkill (which, again, requires additional boilerplate). Dask excels at DAGs and large dataframes, but doesn’t treat this lightweight map pattern as a first-class use case. In this specific example, it was also 10x slower than mtasklite (on a 6-core Intel Mac).
from dask.distributed import LocalCluster, Client
from tqdm.auto import tqdm

class SquareActor:
    def __init__(self, proc_id):
        import multiprocessing as mp
        print(f"Initialized process {mp.current_process()} with argument = {proc_id}")
        self.proc_id = proc_id

    def compute(self, a):
        # Method to call for each task
        return a * a

def run_with_square_actors(input_arr, proc_ids):
    # Start cluster & client via context managers
    with LocalCluster(n_workers=len(proc_ids), processes=True) as cluster:
        with Client(cluster) as client:
            # Create one actor per proc_id
            actor_futures = [
                client.submit(SquareActor, pid, actor=True)
                for pid in proc_ids
            ]
            actors = [f.result() for f in actor_futures]

            # Dispatch work to actors by calling the method `.compute(...)`
            results = []
            for i, val in enumerate(tqdm(input_arr)):
                actor = actors[i % len(actors)]
                # This returns an actor future; .result() waits for execution
                results.append(actor.compute(val).result())
            return results

if __name__ == "__main__":
    input_arr = list(range(20))
    proc_ids = [0, 1, 2, 3]
    result = run_with_square_actors(input_arr, proc_ids)
    print(result)
Ray
Ray’s actor model solves stateful workers elegantly, but:
- Ray runtime initialization often conflicts with notebooks
- There’s no built-in streaming map API; you write your own loops
- API feels distributed-system-first rather than lightweight
Ray is a powerful platform, but still not “simple map loop first.” It can also be quite heavyweight (due to startup overhead). In this specific example, it is about 40x slower than mtasklite:
#!/usr/bin/env python
import ray
from collections import deque
from tqdm.auto import tqdm

ray.init()

@ray.remote
class Worker:
    def __init__(self, proc_id):
        import multiprocessing as mp
        print(f"Initialized worker {proc_id} in process {mp.current_process()}")
        self.proc_id = proc_id

    def compute(self, x):
        return x * x

def ray_stateful_stream_map(inputs, proc_ids, worker_qty=8):
    """
    A generator that streams outputs as they complete,
    using stateful Ray actors with proc_ids.

    Args:
        inputs: An iterable of inputs.
        proc_ids: A list of init args for each actor.
        worker_qty: Max number of workers
    """
    actors = [Worker.remote(pid) for pid in proc_ids]
    pending = deque()
    it = iter(inputs)

    # Submit initial batch
    for _ in range(min(worker_qty, len(proc_ids))):
        try:
            val = next(it)
        except StopIteration:
            break
        # round-robin
        actor = actors[len(pending) % len(actors)]
        pending.append(actor.compute.remote(val))

    while pending:
        # Wait for the next ready future
        ready, _ = ray.wait(list(pending), num_returns=1)
        ref = ready[0]
        pending.remove(ref)
        yield ray.get(ref)

        # Submit more work
        try:
            val = next(it)
            actor = actors[len(pending) % len(actors)]
            pending.append(actor.compute.remote(val))
        except StopIteration:
            pass

result = []
for ret_val in tqdm(ray_stateful_stream_map(range(1, 21), proc_ids=[0, 1, 2, 3], worker_qty=8)):
    result.append(ret_val)
print(result)
Conclusion
I did not set out to build another parallelism library — I just kept rewriting this same code for six years:
- Stateful workers with init
- Streaming outputs
- Sized outputs for progress bars
- Minimal boilerplate
- Works everywhere
Once I distilled it into a tiny, composable API, it became clear this pattern deserved its own package.
That package is mtasklite — a tiny, elegant, minimal toolkit for real-world parallel map loops.
GitHub:
https://github.com/searchivarius/py_mtasklite