Complete Guide to Asynchronous Programming in Python

Introduction

Asynchronous programming has become the standard solution for handling high concurrency scenarios in modern Python development. From web services to web scraping, from database operations to network communication, asynchronous programming can significantly enhance the concurrent processing capabilities of programs without increasing thread overhead.

With the introduction of the <span>async/await</span> syntax in Python 3.5, asynchronous programming has become more intuitive. However, to truly leverage asynchronous programming, one must understand the operation of coroutines, the workings of the event loop, and the various use cases for asynchronous tools. This article will systematically introduce the core knowledge points of Python asynchronous programming, from basic usage to advanced techniques, helping you build high-performance asynchronous applications.

Environment Setup

Python 3.7+ has built-in support for asyncio, so no additional installation is required. However, to demonstrate some practical scenarios, we need to install a few commonly used asynchronous libraries:

pip install aiohttp aiofiles aiomysql redis

Basic Asynchronous Usage

Defining and Executing Coroutines

Asynchronous functions are defined using <span>async def</span>. Calling an asynchronous function returns a coroutine object, which needs to be executed through the event loop:

import asyncio
import time

async def fetch_data(id):
    print(f"Starting to fetch data {id}")
    await asyncio.sleep(2)  # Simulate IO operation
    print(f"Data {id} fetch complete")
    return f"data_{id}"

async def main():
    start = time.time()
    
    # Serial execution
    result1 = await fetch_data(1)
    result2 = await fetch_data(2)
    
    print(f"Serial execution time: {time.time() - start:.2f} seconds")
    
    # Concurrent execution
    start = time.time()
    results = await asyncio.gather(
        fetch_data(3),
        fetch_data(4),
        fetch_data(5)
    )
    print(f"Concurrent execution time: {time.time() - start:.2f} seconds")
    print(f"Results: {results}")

# Python 3.7+
asyncio.run(main())

Output:

Starting to fetch data 1
Data 1 fetch complete
Starting to fetch data 2
Data 2 fetch complete
Serial execution time: 4.00 seconds
Starting to fetch data 3
Starting to fetch data 4
Starting to fetch data 5
Data 3 fetch complete
Data 4 fetch complete
Data 5 fetch complete
Concurrent execution time: 2.00 seconds
Results: ['data_3', 'data_4', 'data_5']

Creating Tasks

Using <span>create_task</span> allows you to schedule a coroutine for execution immediately without waiting for await:

async def process_data(name, delay):
    print(f"{name} starting processing")
    await asyncio.sleep(delay)
    print(f"{name} processing complete")
    return f"{name}_result"

async def main():
    # Creating tasks starts execution immediately
    task1 = asyncio.create_task(process_data("Task 1", 2))
    task2 = asyncio.create_task(process_data("Task 2", 1))
    
    print("Tasks created, doing other things...")
    await asyncio.sleep(0.5)
    
    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2
    print(f"Results: {result1}, {result2}")

asyncio.run(main())

Output:

Tasks created, doing other things...
Task 1 starting processing
Task 2 starting processing
Task 2 processing complete
Task 1 processing complete
Results: Task 1_result, Task 2_result

Timeout Control

Use <span>wait_for</span> to set a timeout:

async def long_running_task():
    await asyncio.sleep(5)
    return "Complete"

async def main():
    try:
        result = await asyncio.wait_for(long_running_task(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())

Asynchronous Context Managers

Asynchronous context managers use <span>async with</span> syntax, suitable for resources that require asynchronous initialization and cleanup:

import asyncio

class AsyncDBConnection:
    def __init__(self, db_name):
        self.db_name = db_name
        self.connection = None
    
    async def __aenter__(self):
        print(f"Connecting to database {self.db_name}...")
        await asyncio.sleep(1)  # Simulate connection delay
        self.connection = f"Connection to {self.db_name}"
        print("Database connection successful")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)
        self.connection = None
        print("Database connection closed")
        return False  # Do not suppress exceptions
    
    async def query(self, sql):
        print(f"Executing query: {sql}")
        await asyncio.sleep(0.3)
        return [{"id": 1, "name": "test"}]

async def main():
    async with AsyncDBConnection("mydb") as db:
        results = await db.query("SELECT * FROM users")
        print(f"Query results: {results}")
    
    print("Context exited")

asyncio.run(main())

Output:

Connecting to database mydb...
Database connection successful
Executing query: SELECT * FROM users
Query results: [{'id': 1, 'name': 'test'}]
Closing database connection...
Database connection closed
Context exited

Asynchronous Decorators

Decorators are also important in asynchronous programming, allowing for logging, retries, caching, and more:

import asyncio
import functools
import time

def async_timer(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        result = await func(*args, **kwargs)
        elapsed = time.time() - start
        print(f"{func.__name__} took: {elapsed:.2f} seconds")
        return result
    return wrapper

def async_retry(max_attempts=3, delay=1):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise
                    print(f"Attempt {attempt + 1} failed: {e}, retrying in {delay} seconds...")
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

@async_timer
@async_retry(max_attempts=3, delay=0.5)
async def unstable_api_call(success_rate=0.3):
    import random
    await asyncio.sleep(0.5)
    if random.random() > success_rate:
        raise Exception("API call failed")
    return "Success"

async def main():
    try:
        result = await unstable_api_call(success_rate=0.9)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Final failure: {e}")

asyncio.run(main())

Callback Functions

Although async/await is mainstream, callback patterns are still useful in certain scenarios:

import asyncio

async def task_with_callback(value, callback):
    await asyncio.sleep(1)
    result = value * 2
    # Call the callback
    await callback(result)
    return result

async def on_complete(result):
    print(f"Task complete, result: {result}")

async def on_error(error):
    print(f"Task failed, error: {error}")

async def main():
    await task_with_callback(10, on_complete)
    
    # Using add_done_callback
    task = asyncio.create_task(task_with_callback(20, on_complete))
    
    def done_callback(future):
        try:
            result = future.result()
            print(f"Result obtained via done_callback: {result}")
        except Exception as e:
            print(f"Task exception: {e}")
    
    task.add_done_callback(done_callback)
    await task

asyncio.run(main())

Output:

Task complete, result: 20
Task complete, result: 40
Result obtained via done_callback: 40

Executing Synchronous Functions in Asynchronous Code

Sometimes it is necessary to call blocking synchronous functions within asynchronous code, which can be done using <span>run_in_executor</span>:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def blocking_io_task(n):
    print(f"Synchronous task {n} starting")
    time.sleep(2)  # Blocking operation
    print(f"Synchronous task {n} complete")
    return f"result_{n}"

def cpu_bound_task(n):
    """CPU intensive task"""
    print(f"CPU task {n} starting")
    total = sum(i * i for i in range(10**7))
    print(f"CPU task {n} complete")
    return total

async def main():
    loop = asyncio.get_event_loop()
    
    # Execute IO intensive tasks in a thread pool
    start = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:
        tasks = [
            loop.run_in_executor(executor, blocking_io_task, i)
            for i in range(3)
        ]
        results = await asyncio.gather(*tasks)
    print(f"IO task time: {time.time() - start:.2f} seconds")
    print(f"Results: {results}")
    
    # Execute CPU intensive tasks in a process pool
    start = time.time()
    with ProcessPoolExecutor(max_workers=2) as executor:
        tasks = [
            loop.run_in_executor(executor, cpu_bound_task, i)
            for i in range(2)
        ]
        results = await asyncio.gather(*tasks)
    print(f"CPU task time: {time.time() - start:.2f} seconds")

asyncio.run(main())

Asynchronous Synchronization Primitives

Asynchronous Lock (Lock)

Used to protect shared resources and prevent race conditions:

import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()
    
    async def increment(self, name):
        async with self.lock:
            print(f"{name} acquired lock")
            temp = self.value
            await asyncio.sleep(0.1)  # Simulate processing time
            self.value = temp + 1
            print(f"{name} released lock, current value: {self.value}")

async def main():
    counter = Counter()
    
    # Without lock (comment out lock-related code to observe differences)
    tasks = [counter.increment(f"Task {i}") for i in range(5)]
    await asyncio.gather(*tasks)
    
    print(f"Final value: {counter.value}")

asyncio.run(main())

Output:

Task 0 acquired lock
Task 0 released lock, current value: 1
Task 1 acquired lock
Task 1 released lock, current value: 2
Task 2 acquired lock
Task 2 released lock, current value: 3
Task 3 acquired lock
Task 3 released lock, current value: 4
Task 4 acquired lock
Task 4 released lock, current value: 5
Final value: 5

Asynchronous Semaphore

Limits the number of concurrent executions:

import asyncio
import time

async def access_resource(sem, task_id):
    async with sem:
        print(f"Task {task_id} acquired resource")
        await asyncio.sleep(1)
        print(f"Task {task_id} released resource")

async def main():
    # Allow a maximum of 3 concurrent executions
    semaphore = asyncio.Semaphore(3)
    
    start = time.time()
    tasks = [access_resource(semaphore, i) for i in range(10)]
    await asyncio.gather(*tasks)
    
    print(f"Total time: {time.time() - start:.2f} seconds")

asyncio.run(main())

Asynchronous Condition Variable

Used for producer-consumer patterns:

import asyncio

class AsyncQueue:
    def __init__(self):
        self.queue = []
        self.condition = asyncio.Condition()
    
    async def produce(self, item):
        async with self.condition:
            self.queue.append(item)
            print(f"Produced: {item}, queue length: {len(self.queue)}")
            self.condition.notify()  # Notify consumers
    
    async def consume(self):
        async with self.condition:
            while not self.queue:
                print("Queue is empty, waiting...")
                await self.condition.wait()  # Wait for notification
            
            item = self.queue.pop(0)
            print(f"Consumed: {item}, queue length: {len(self.queue)}")
            return item

async def producer(queue, n):
    for i in range(n):
        await asyncio.sleep(0.5)
        await queue.produce(f"item_{i}")

async def consumer(queue, n):
    for i in range(n):
        await queue.consume()
        await asyncio.sleep(1)

async def main():
    queue = AsyncQueue()
    
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue, 5)
    )

asyncio.run(main())

Output:

Queue is empty, waiting...
Produced: item_0, queue length: 1
Consumed: item_0, queue length: 0
Produced: item_1, queue length: 1
Produced: item_1, queue length: 0
Consumed: item_2, queue length: 1
Produced: item_3, queue length: 2
Consumed: item_2, queue length: 1
Produced: item_4, queue length: 2
Consumed: item_3, queue length: 1
Consumed: item_4, queue length: 0

Asynchronous Event

Used for signaling between coroutines:

import asyncio

async def waiter(event, name):
    print(f"{name} waiting for event...")
    await event.wait()
    print(f"{name} received event, starting execution")

async def setter(event):
    await asyncio.sleep(2)
    print("Triggering event")
    event.set()

async def main():
    event = asyncio.Event()
    
    await asyncio.gather(
        waiter(event, "Task 1"),
        waiter(event, "Task 2"),
        waiter(event, "Task 3"),
        setter(event)
    )

asyncio.run(main())

Output:

Task 1 waiting for event...
Task 2 waiting for event...
Task 3 waiting for event...
Triggering event
Task 1 received event, starting execution
Task 2 received event, starting execution
Task 3 received event, starting execution

Asynchronous Queue

The asyncio Queue is thread-safe and suitable for communication between coroutines:

import asyncio
import random

async def producer(queue, producer_id):
    for i in range(5):
        item = f"P{producer_id}-Item{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    
    await queue.put(None)  # End signal

async def consumer(queue, consumer_id):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Pass end signal
            break
        
        print(f"Consumer {consumer_id} consumed: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    
    await asyncio.gather(*producers)
    await queue.join()  # Wait for all tasks to complete
    
    # Cancel consumer tasks
    for c in consumers:
        c.cancel()

asyncio.run(main())

Priority Queue

import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PriorityItem:
    priority: int
    data: Any = field(compare=False)

async def main():
    queue = asyncio.PriorityQueue()
    
    # Add tasks with different priorities
    await queue.put(PriorityItem(3, "Low priority"))
    await queue.put(PriorityItem(1, "High priority"))
    await queue.put(PriorityItem(2, "Medium priority"))
    
    while not queue.empty():
        item = await queue.get()
        print(f"Processing: {item.data} (Priority: {item.priority})")

asyncio.run(main())

Output:

Processing: High priority (Priority: 1)
Processing: Medium priority (Priority: 2)
Processing: Low priority (Priority: 3)

Asynchronous Iterators and Generators

Asynchronous Iterators

import asyncio

class AsyncRange:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.current = start
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.1)  # Simulate asynchronous operation
        value = self.current
        self.current += 1
        return value

async def main():
    async for i in AsyncRange(0, 5):
        print(f"Value: {i}")

asyncio.run(main())

Asynchronous Generators

import asyncio

async def async_generator(n):
    for i in range(n):
        await asyncio.sleep(0.5)
        yield f"item_{i}"

async def main():
    async for item in async_generator(5):
        print(f"Received: {item}")

asyncio.run(main())

Exception Handling and Cancellation

Exception Handling

import asyncio

async def task_may_fail(task_id, should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"Task {task_id} failed")
    return f"Task {task_id} succeeded"

async def main():
    tasks = [
        asyncio.create_task(task_may_fail(1, False)),
        asyncio.create_task(task_may_fail(2, True)),
        asyncio.create_task(task_may_fail(3, False)),
    ]
    
    # gather will stop at the first exception unless return_exceptions=True is set
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} exception: {result}")
        else:
            print(f"Task {i} result: {result}")

asyncio.run(main())

Output:

Task 0 result: Task 1 succeeded
Task 1 exception: Task 2 failed
Task 2 result: Task 3 succeeded

Task Cancellation

import asyncio

async def long_task(name):
    try:
        print(f"{name} starting")
        await asyncio.sleep(10)
        print(f"{name} complete")
    except asyncio.CancelledError:
        print(f"{name} was cancelled")
        raise  # Re-raise to let the caller know the task was cancelled

async def main():
    task = asyncio.create_task(long_task("Long Task"))
    
    await asyncio.sleep(2)
    print("Cancelling task")
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task confirmed cancelled")

asyncio.run(main())

Common Asynchronous Libraries

aiohttp – Asynchronous HTTP Client and Server

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://httpbin.org/delay/1',
        'http://httpbin.org/delay/2',
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} responses")

# asyncio.run(main())  # Uncomment to run in actual use

aiofiles – Asynchronous File Operations

import aiofiles
import asyncio

async def process_files():
    # Concurrently read and write multiple files
    async def write_file(filename, content):
        async with aiofiles.open(filename, 'w') as f:
            await f.write(content)
    
    async def read_file(filename):
        async with aiofiles.open(filename, 'r') as f:
            return await f.read()
    
    # Concurrent writing
    await asyncio.gather(
        write_file('file1.txt', 'Content 1'),
        write_file('file2.txt', 'Content 2'),
        write_file('file3.txt', 'Content 3')
    )
    
    # Concurrent reading
    contents = await asyncio.gather(
        read_file('file1.txt'),
        read_file('file2.txt'),
        read_file('file3.txt')
    )
    
    print(contents)

# asyncio.run(process_files())

aiomysql – Asynchronous MySQL Client

import aiomysql
import asyncio

async def mysql_example():
    conn = await aiomysql.connect(
        host='localhost',
        user='root',
        password='password',
        db='testdb'
    )
    
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT * FROM users")
        result = await cursor.fetchall()
        print(result)
    
    conn.close()

# asyncio.run(mysql_example())

Other Common Asynchronous Libraries

  • aioredis / redis[async]: Asynchronous Redis client
  • motor: Asynchronous MongoDB driver
  • asyncpg: High-performance asynchronous PostgreSQL driver
  • httpx: HTTP client supporting async/await
  • websockets: Asynchronous WebSocket library
  • aio-pika: Asynchronous RabbitMQ client
  • aiokafka: Asynchronous Kafka client
  • aiosmtplib: Asynchronous SMTP client
  • aiodns: Asynchronous DNS resolution
  • trio: An alternative asynchronous framework to asyncio with a more modern API

Advanced Topics

Custom Event Loop Policies

In certain scenarios, it may be necessary to customize the behavior of the event loop:

import asyncio
import uvloop  # High-performance event loop

# Use uvloop to replace the default event loop (install with: pip install uvloop)
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def main():
    print(f"Current event loop: {type(asyncio.get_event_loop())}")

asyncio.run(main())

Coroutine Nesting and Task Groups

Python 3.11 introduced TaskGroup, providing better structured concurrency:

import asyncio

async def task_with_exception(name, should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"{name} failed")
    print(f"{name} complete")
    return name

async def main():
    # Python 3.11+
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_with_exception("Task 1", False))
            tg.create_task(task_with_exception("Task 2", False))
            tg.create_task(task_with_exception("Task 3", False))
        
        print("All tasks completed successfully")
    except* ValueError as eg:
        print(f"Caught {len(eg.exceptions)} exceptions")
        for exc in eg.exceptions:
            print(f"  - {exc}")

# Python 3.11+
# asyncio.run(main())

Asynchronous Context Variables

Passing context information in asynchronous environments, similar to thread-local variables but friendly to coroutines:

import asyncio
from contextvars import ContextVar

# Define context variables
request_id = ContextVar('request_id', default='unknown')
user_info = ContextVar('user_info', default=None)

async def log(message):
    rid = request_id.get()
    user = user_info.get()
    print(f"[{rid}] [{user}] {message}")

async def process_request(req_id, user):
    # Set context
    request_id.set(req_id)
    user_info.set(user)
    
    await log("Starting to process request")
    await asyncio.sleep(0.5)
    await log("Processing complete")

async def main():
    # Each request has its own context
    await asyncio.gather(
        process_request("req-001", "Alice"),
        process_request("req-002", "Bob"),
        process_request("req-003", "Charlie")
    )

asyncio.run(main())

Output:

[req-001] [Alice] Starting to process request
[req-002] [Bob] Starting to process request
[req-003] [Charlie] Starting to process request
[req-001] [Alice] Processing complete
[req-002] [Bob] Processing complete
[req-003] [Charlie] Processing complete

Asynchronous Signal Handling

Gracefully shutting down in long-running asynchronous applications:

import asyncio
import signal

async def worker(name):
    try:
        while True:
            print(f"{name} working...")
            await asyncio.sleep(2)
    except asyncio.CancelledError:
        print(f"{name} received cancellation signal, cleaning up...")
        await asyncio.sleep(1)  # Simulate cleanup
        print(f"{name} cleanup complete")
        raise

async def main():
    # Create worker tasks
    tasks = [
        asyncio.create_task(worker("Worker 1")),
        asyncio.create_task(worker("Worker 2"))
    ]
    
    # Register signal handler
    loop = asyncio.get_event_loop()
    stop_event = asyncio.Event()
    
    def signal_handler():
        print("\nReceived stop signal, starting graceful shutdown...")
        stop_event.set()
    
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, signal_handler)
    
    # Wait for stop signal
    await stop_event.wait()
    
    # Cancel all tasks
    for task in tasks:
        task.cancel()
    
    # Wait for all tasks to clean up
    await asyncio.gather(*tasks, return_exceptions=True)
    print("Application closed")

# asyncio.run(main())  # Run and test with Ctrl+C

Performance Optimization Tips

1. Avoid Creating Tasks in Loops

import asyncio

# ❌ Bad practice
async def bad_practice():
    for i in range(1000):
        await asyncio.create_task(some_async_func(i))

# ✅ Good practice
async def good_practice():
    tasks = [asyncio.create_task(some_async_func(i)) for i in range(1000)]
    await asyncio.gather(*tasks)

2. Use Semaphores to Control Concurrency

import asyncio

async def controlled_concurrency():
    semaphore = asyncio.Semaphore(10)  # Maximum 10 concurrent
    
    async def limited_task(i):
        async with semaphore:
            await some_async_func(i)
    
    tasks = [limited_task(i) for i in range(1000)]
    await asyncio.gather(*tasks)

3. Batch Processing to Reduce Context Switching

import asyncio

async def batch_processing(items, batch_size=100):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        tasks = [process_item(item) for item in batch]
        await asyncio.gather(*tasks)
        # Checkpoint or throttling can be done between batches
        await asyncio.sleep(0.1)

Common Pitfalls and Solutions

1. Blocking the Event Loop

import asyncio
import time

# ❌ Error: Using time.sleep blocks the entire event loop
async def bad_sleep():
    time.sleep(1)  # Blocking! Other coroutines cannot execute

# ✅ Correct: Use asyncio.sleep
async def good_sleep():
    await asyncio.sleep(1)  # Yield control

2. Forgetting to Await

# ❌ Error: Forgetting to await, coroutine will not execute
async def bad_call():
    result = some_async_func()  # Just getting coroutine object

# ✅ Correct
async def good_call():
    result = await some_async_func()

3. Calling Asynchronous Functions in Synchronous Functions

# ❌ Error
def sync_function():
    result = await async_function()  # SyntaxError

# ✅ Correct Method 1: Use asyncio.run
def sync_function():
    result = asyncio.run(async_function())

# ✅ Correct Method 2: Get Existing Loop
def sync_function():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(async_function())

Debugging and Monitoring

Enable asyncio Debug Mode

import asyncio
import warnings

# Enable debug mode
asyncio.run(main(), debug=True)

# Or
# import os
# os.environ['PYTHONASYNCIODEBUG'] = '1'

# Show warnings for unawaited coroutines
warnings.simplefilter('always', ResourceWarning)

Monitor the Event Loop

import asyncio

async def monitor_loop():
    loop = asyncio.get_event_loop()
    
    while True:
        # Get all current tasks
        tasks = asyncio.all_tasks(loop)
        print(f"Current task count: {len(tasks)}")
        
        # Check for long-running tasks
        for task in tasks:
            if not task.done():
                print(f"  Incomplete task: {task.get_name()}")
        
        await asyncio.sleep(5)

async def main():
    monitor = asyncio.create_task(monitor_loop())
    
    # Your other tasks
    await asyncio.sleep(20)
    
    monitor.cancel()

asyncio.run(main())

Conclusion

Asynchronous programming is a powerful tool for handling high concurrency scenarios in Python, but it also requires an understanding of its principles and best practices. This article covers everything from the basic async/await syntax to advanced concurrency control, from synchronization primitives to collaboration with multithreading and multiprocessing, hoping to help you build high-performance asynchronous applications.

Key Points Review

  1. 1. Basic Concepts: Coroutines are functions that can be paused and resumed, scheduled for execution by the event loop
  2. 2. Concurrency Control: Use Lock, Semaphore, Event, and other synchronization primitives to ensure thread safety
  3. 3. Resource Management: Asynchronous context managers ensure resources are released correctly
  4. 4. Performance Optimization: Control concurrency, use batching, and avoid blocking operations
  5. 5. Error Handling: Use try-except to catch exceptions and handle cancellation signals correctly
  6. 6. Mixed Programming: Execute synchronous code in asynchronous contexts, combining thread pools and process pools

When to Choose Asynchronous

  • • ✅ IO-intensive tasks (network requests, file reading/writing, database queries)
  • • ✅ Need to handle a large number of concurrent connections (WebSocket, long polling)
  • • ✅ Asynchronous communication between microservices
  • • ❌ CPU-intensive computations (consider multiprocessing)
  • • ❌ Simple script tasks (increased complexity not worth it)

Further Reading

  • Official Python asyncio documentation
  • aiohttp documentation
  • Real Python’s asynchronous tutorial
  • asyncio source code

Mastering asynchronous programming requires practice, starting with simple asynchronous HTTP requests and gradually transitioning to complex production applications. Remember, asynchronous is not a silver bullet; using the right tools in the right scenarios is key.

Leave a Comment