Cwork: A High-Performance Python Library!

“Small tasks, big concurrency”: that phrase is the hallmark of the cwork module.

It packs threads, processes, and coroutines into one “toolbox”: about three lines of code are enough to spread CPU-bound work across all your cores, and another three to keep I/O-bound work running concurrently.

In this article, we will unpack this toolbox to see just how smoothly it operates.

1. Three Lines to Set Up a Thread Pool

The ThreadPool in cwork is like a delivery rider, always ready to respond.

Below, we will concurrently compress 20 images without manually writing a Queue or worrying about joins:

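import io

import requests
from PIL import Image
from cwork import ThreadPool

def compress(job):
    # job is an (index, url) pair; the index gives each thumbnail a unique, valid filename
    i, url = job
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    img = Image.open(io.BytesIO(resp.content)).convert("RGB")  # JPEG has no alpha channel
    # The .jpg extension tells Pillow which format to write
    img.save(f"thumb_{i}.jpg", quality=60)

urls = [f"https://picsum.photos/800/600?random={i}" for i in range(20)]
with ThreadPool(max_workers=8) as pool:
    # Consume the results so that worker errors surface here if map is lazy, as it is in concurrent.futures
    list(pool.map(compress, enumerate(urls)))
print("All thumbnails have been generated")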
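For comparison, here is a minimal sketch of the same fan-out pattern written against the standard library's concurrent.futures.ThreadPoolExecutor rather than cwork. The fake_download helper and its sleep-based delay are stand-ins invented for this illustration, so the sketch runs without network access.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(i):
    # Hypothetical stand-in for an I/O-bound task such as an HTTP request
    time.sleep(0.05)
    return f"thumb_{i}.jpg"

def run_batch(n, max_workers=8):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map yields results in input order; list() also re-raises any worker exception
        return list(pool.map(fake_download, range(n)))

if __name__ == "__main__":
    start = time.perf_counter()
    names = run_batch(20)
    elapsed = time.perf_counter() - start
    print(f"{len(names)} tasks finished in {elapsed:.2f}s")
```

With 8 workers, the 20 simulated downloads run in roughly three waves instead of 20 sequential sleeps, which is where the speedup for I/O-bound work comes from.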

2. One Line to Switch to a Process Pool

CPU-intensive tasks are held back by the GIL when they run in threads. The ProcessPool interface in cwork mirrors that of ThreadPool, so switching engines means changing just one word:

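import math
from cwork import ProcessPool

def is_prime(n):
    if n < 2:
        return False
    # Trial division up to the integer square root of n
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":  # needed on platforms that spawn worker processes (Windows, macOS)
    nums = range(10**6, 10**6 + 10000)
    with ProcessPool() as pool:
        # True counts as 1, so the sum is the number of primes found
        print(sum(pool.map(is_prime, nums)))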
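The "change one word" idea can also be sketched with the standard library, whose ThreadPoolExecutor and ProcessPoolExecutor share the same Executor interface. The ENGINES table and the count_primes helper below are names made up for this illustration and are not part of cwork.

```python
import math
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Hypothetical lookup table: one word selects the engine
ENGINES = {"thread": ThreadPoolExecutor, "process": ProcessPoolExecutor}

def is_prime(n):
    if n < 2:
        return False
    # n is prime if no i in [2, isqrt(n)] divides it
    return all(n % i for i in range(2, math.isqrt(n) + 1))

def count_primes(nums, engine="process", chunksize=500):
    with ENGINES[engine]() as pool:
        # chunksize batches items per message to worker processes; thread pools ignore it
        return sum(pool.map(is_prime, nums, chunksize=chunksize))
```

When calling count_primes(nums, engine="process") from a script, keep the call under an if __name__ == "__main__": guard so that worker processes can import the module safely.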

3. Asynchronous Task Flow: Let Tasks Queue Themselves

cwork.asyncflow uses decorators to declare task dependencies directly in the code, rather than leaving them in documentation.

Below, we simulate a three-stage pipeline: “crawl first, then save, and finally notify”:

import asyncio
import aiohttp
from cwork import asyncflow

@asyncflow.depend_on()
async def crawl(url):
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as s:
        async with s.get(url) as resp:
            # Read the body before the session closes; the response is unusable afterwards
            return await resp.text()

@asyncflow.depend_on(crawl)
async def save(text):
    with open("data.txt", "a", encoding="utf-8") as f:
        f.write(text)

@asyncflow.depend_on(save)
async def notify(_):
    print("New data has been saved, DingTalk bot can be arranged!")

asyncio.run(asyncflow.run("https://httpbin.org/uuid"))
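Since the scheduling behind asyncflow is not shown here, the sketch below spells out the same "crawl, save, notify" chain in plain asyncio, with each stage awaiting the output of the one before it. fake_crawl and the in-memory store list are hypothetical stand-ins so that it runs without network or disk access; it illustrates the dependency order only, not how cwork implements it.

```python
import asyncio

async def fake_crawl(url):
    # Hypothetical stand-in for an HTTP request
    await asyncio.sleep(0.01)
    return f"payload from {url}"

async def save(text, store):
    # Appends to an in-memory list instead of a file
    store.append(text)
    return len(store)

async def notify(count):
    return f"{count} record(s) saved"

async def pipeline(url, store):
    # Each stage runs only after the previous one has produced its result
    text = await fake_crawl(url)
    count = await save(text, store)
    return await notify(count)

if __name__ == "__main__":
    store = []
    print(asyncio.run(pipeline("https://httpbin.org/uuid", store)))
```

A decorator-based flow moves this ordering out of the pipeline function and into the declarations of the tasks themselves.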

4. Advantages Comparison

Compared to concurrent.futures, cwork puts threads, processes, and asynchronous flows behind a single API, so you can switch engines without changing your logic.

Compared to Celery, it has zero dependencies and zero configuration, and can be deployed as a single file. It does have clear drawbacks, though: there is no built-in monitoring, and retries for failed tasks must be added by hand with decorators.
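A hand-written retry decorator of the kind mentioned above might look like the following minimal sketch. The retry name, its parameters, and the flaky demo task are all assumptions made for illustration; none of them come from cwork.

```python
import functools
import time

def retry(times=3, delay=0.1, exceptions=(Exception,)):
    """Call the wrapped function up to `times` times before giving up."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions:
                    if attempt == times:
                        raise  # out of attempts: propagate the last error
                    time.sleep(delay * attempt)  # linear backoff between attempts
        return wrapper
    return decorator

calls = {"n": 0}

@retry(times=3, delay=0.01)
def flaky():
    # Hypothetical task that fails twice, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"

if __name__ == "__main__":
    print(flaky(), "after", calls["n"], "attempts")
```

Because the decorator wraps an ordinary function, the decorated task can be handed to a thread pool's map like any other callable.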

The bottom line: for scripts under a hundred lines or one-off batch jobs, cwork is the handiest option; for long-running tasks that need queue persistence, Celery or RQ is still the way to go.

5. Conclusion

After seeing these three tools, are you itching to try them out?

Share your use cases in the comments, and let’s explore cwork together!
