Multithreading and Twisted Framework in Python

Application of Multithreading in Multi-User Applications

Multithreading is particularly suitable for multi-user applications as it can handle multiple client requests simultaneously without blocking the entire application.

Multithreaded Server Example

import threading
import socket
import time
import logging
from queue import Queue

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MultiThreadedServer')

class ThreadPool:
    """Simple thread pool implementation"""
    def __init__(self, num_threads):
        self.tasks = Queue()
        self.threads = []
        self.is_running = True
        
        # Create worker threads
        for i in range(num_threads):
            thread = threading.Thread(target=self._worker, args=(i,))
            thread.daemon = True
            thread.start()
            self.threads.append(thread)
    
    def _worker(self, thread_id):
        """Worker thread function"""
        logger.info(f"Worker thread {thread_id} started")
        
        while self.is_running:
            try:
                # Get task
                task = self.tasks.get(timeout=1)
                if task is None:
                    break
                
                # Execute task
                func, args, kwargs = task
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    logger.error(f"Task execution error: {e}")
                
                # Mark task as done
                self.tasks.task_done()
                
            except:
                continue
        
        logger.info(f"Worker thread {thread_id} ended")
    
    def add_task(self, func, *args, **kwargs):
        """Add task to thread pool"""
        if self.is_running:
            self.tasks.put((func, args, kwargs))
    
    def wait_completion(self):
        """Wait for all tasks to complete"""
        self.tasks.join()
    
    def shutdown(self):
        """Shutdown thread pool"""
        self.is_running = False
        for thread in self.threads:
            thread.join()

class MultiThreadedServer:
    """Multithreaded TCP server"""  
    
    def __init__(self, host='localhost', port=8888, max_threads=10):
        self.host = host
        self.port = port
        self.thread_pool = ThreadPool(max_threads)
        self.is_running = False
        self.client_count = 0
        self.lock = threading.Lock()
    
    def handle_client(self, client_socket, client_address):
        """Handle client connection"""
        client_id = 0
        
        with self.lock:
            self.client_count += 1
            client_id = self.client_count
        
        logger.info(f"Client {client_id} connected: {client_address}")
        
        try:
            # Send welcome message
            welcome_msg = f"Welcome! You are client number {client_id}\n"
            client_socket.send(welcome_msg.encode('utf-8'))
            
            # Handle client requests
            while True:
                # Receive data
                data = client_socket.recv(1024).decode('utf-8').strip()
                
                if not data:
                    break
                
                logger.info(f"Client {client_id} sent: {data}")
                
                # Process command
                if data.lower() == 'time':
                    response = f"Current time: {time.ctime()}\n"
                elif data.lower() == 'count':
                    response = f"Current client count: {self.client_count}\n"
                elif data.lower() == 'quit':
                    response = "Goodbye!\n"
                    client_socket.send(response.encode('utf-8'))
                    break
                else:
                    response = f"Echo: {data}\n"
                
                # Send response
                client_socket.send(response.encode('utf-8'))
                
        except Exception as e:
            logger.error(f"Error handling client {client_id}: {e}")
        
        finally:
            # Close connection
            client_socket.close()
            with self.lock:
                self.client_count -= 1
            logger.info(f"Client {client_id} disconnected")
    
    def start(self):
        """Start server"""
        self.is_running = True
        
        # Create server socket
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        
        try:
            server_socket.bind((self.host, self.port))
            server_socket.listen(5)
            logger.info(f"Server started on {self.host}:{self.port}")
            
            while self.is_running:
                try:
                    # Accept client connection
                    server_socket.settimeout(1)  # Timeout to check stop flag
                    client_socket, client_address = server_socket.accept()
                    
                    # Submit client handling task to thread pool
                    self.thread_pool.add_task(
                        self.handle_client, 
                        client_socket, 
                        client_address
                    )
                    
                except socket.timeout:
                    # Timeout, continue loop to check stop flag
                    continue
                except Exception as e:
                    if self.is_running:
                        logger.error(f"Error accepting connection: {e}")
        
        except Exception as e:
            logger.error(f"Server error: {e}")
        
        finally:
            server_socket.close()
            self.thread_pool.shutdown()
            logger.info("Server has shut down")
    
    def stop(self):
        """Stop server"""
        self.is_running = False
        logger.info("Stopping server...")

def test_client_connection(client_id):
    """Test client connection"""
    try:
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client_socket.connect(('localhost', 8888))
        
        # Receive welcome message
        welcome = client_socket.recv(1024).decode('utf-8')
        print(f"Client {client_id}: {welcome.strip()}")
        
        # Send some commands
        commands = ['Hello', 'time', 'count', 'quit']
        for cmd in commands:
            client_socket.send(f"{cmd}\n".encode('utf-8'))
            if cmd == 'quit':
                break
            response = client_socket.recv(1024).decode('utf-8')
            print(f"Client {client_id} received: {response.strip()}")
        
        client_socket.close()
        
    except Exception as e:
        print(f"Client {client_id} error: {e}")

def demo_multithreaded_server():
    """Demonstrate multithreaded server"""
    import threading
    import time
    
    # Start server
    server = MultiThreadedServer(max_threads=5)
    server_thread = threading.Thread(target=server.start)
    server_thread.daemon = True
    server_thread.start()
    
    # Wait for server to start
    time.sleep(1)
    
    # Start multiple test clients
    client_threads = []
    for i in range(3):
        thread = threading.Thread(target=test_client_connection, args=(i,))
        client_threads.append(thread)
        thread.start()
    
    # Wait for all clients to finish
    for thread in client_threads:
        thread.join()
    
    # Stop server
    time.sleep(1)
    server.stop()

if __name__ == "__main__":
    demo_multithreaded_server()

Detailed Explanation of the Twisted Framework

Twisted is an event-driven networking programming framework that uses a callback-based programming model.

Basic Concepts of Twisted

from twisted.internet import reactor, protocol, defer, task
from twisted.protocols import basic
import time

# Simple Twisted echo server
class EchoProtocol(basic.LineReceiver):
    """Echo protocol handler"""
    
    def connectionMade(self):
        """Called when client connection is established"""
        self.factory.clients.append(self)
        peer = self.transport.getPeer()
        print(f"Client connected: {peer.host}:{peer.port}")
        self.sendLine(b"Welcome to the Twisted echo server!")
    
    def connectionLost(self, reason):
        """Called when client connection is lost"""
        self.factory.clients.remove(self)
        print("Client disconnected")
    
    def lineReceived(self, line):
        """Called when a line of data is received"""
        message = line.decode('utf-8')
        print(f"Received message: {message}")
        
        # Echo reply
        response = f"Echo: {message}"
        self.sendLine(response.encode('utf-8'))
        
        # If it's a "quit" command, disconnect
        if message.lower() == 'quit':
            self.transport.loseConnection()

class EchoFactory(protocol.Factory):
    """Protocol factory"""
    
    def __init__(self):
        self.clients = []
    
    def buildProtocol(self, addr):
        return EchoProtocol()

def run_echo_server():
    """Run echo server"""
    print("Starting Twisted echo server...")
    reactor.listenTCP(9000, EchoFactory())
    reactor.run()

# Asynchronous operation example
class AsyncOperations:
    """Twisted asynchronous operation example"""
    
    @defer.inlineCallbacks
    def async_operation_1(self):
        """Asynchronous operation 1"""
        print("Starting asynchronous operation 1...")
        yield task.deferLater(reactor, 2, lambda: None)  # Simulate 2 seconds delay
        print("Asynchronous operation 1 completed")
        defer.returnValue("Operation 1 result")
    
    @defer.inlineCallbacks
    def async_operation_2(self, data):
        """Asynchronous operation 2"""
        print(f"Starting asynchronous operation 2, using data: {data}")
        yield task.deferLater(reactor, 1, lambda: None)  # Simulate 1 second delay
        result = f"Processed: {data.upper()}"
        print("Asynchronous operation 2 completed")
        defer.returnValue(result)
    
    @defer.inlineCallbacks
    def parallel_operations(self):
        """Execute multiple asynchronous operations in parallel"""
        print("Starting parallel operations...")
        
        # Execute multiple operations in parallel
        deferred_1 = self.async_operation_1()
        deferred_2 = self.async_operation_2("hello twisted")
        
        # Wait for all operations to complete
        results = yield defer.gatherResults([deferred_1, deferred_2])
        
        print(f"Parallel operations completed, results: {results}")
        defer.returnValue(results)
    
    def run_demo(self):
        """Run asynchronous operation demonstration"""
        print("=== Twisted Asynchronous Operation Demonstration ===")
        
        # Serial execution
        @defer.inlineCallbacks
        def sequential_demo():
            print("\n1. Serial execution:")
            result1 = yield self.async_operation_1()
            result2 = yield self.async_operation_2(result1)
            print(f"Serial execution result: {result2}")
        
        # Parallel execution
        @defer.inlineCallbacks
        def parallel_demo():
            print("\n2. Parallel execution:")
            results = yield self.parallel_operations()
            print(f"Parallel execution result: {results}")
        
        # Error handling
        @defer.inlineCallbacks
        def error_handling_demo():
            print("\n3. Error handling:")
            try:
                # Simulate an operation that will fail
                yield task.deferLater(reactor, 0.5, lambda: 1/0)
            except Exception as e:
                print(f"Caught exception: {e}")
            else:
                print("Operation successful")
        
        # Execute all demonstrations
        @defer.inlineCallbacks
        def run_all_demos():
            yield sequential_demo()
            yield parallel_demo()
            yield error_handling_demo()
            print("\nAll demonstrations completed")
            reactor.stop()
        
        run_all_demos()

def twisted_demo():
    """Run Twisted demonstration"""
    demo = AsyncOperations()
    demo.run_demo()
    reactor.run()

if __name__ == "__main__":
    # Run asynchronous operation demonstration
    twisted_demo()

Twisted Web Server

from twisted.web import server, resource
from twisted.internet import reactor, endpoints
import json
import time

class SimpleAPI(resource.Resource):
    """Simple API resource"""
    
    def __init__(self):
        resource.Resource.__init__(self)
        self.putChild(b"time", TimeResource())
        self.putChild(b"echo", EchoResource())
        self.putChild(b"users", UsersResource())
    
    def getChild(self, name, request):
        """Get child resource"""
        if name == b"":
            return self
        return resource.Resource.getChild(self, name, request)
    
    def render_GET(self, request):
        """Handle GET request"""
        response = {
            "message": "Welcome to the Twisted API",
            "endpoints": {
                "/time": "Get current time",
                "/echo": "Echo service",
                "/users": "User management"
            }
        }
        return json.dumps(response, ensure_ascii=False).encode('utf-8')

class TimeResource(resource.Resource):
    """Time resource"""
    
    def render_GET(self, request):
        return json.dumps({
            "timestamp": time.time(),
            "datetime": time.ctime()
        }).encode('utf-8')

class EchoResource(resource.Resource):
    """Echo resource"""
    
    def render_GET(self, request):
        message = request.args.get(b"message", [b"Hello"])[0].decode('utf-8')
        return json.dumps({
            "echo": message,
            "timestamp": time.time()
        }).encode('utf-8')
    
    def render_POST(self, request):
        """Handle POST request"""
        try:
            data = json.loads(request.content.getvalue().decode('utf-8'))
            message = data.get('message', 'Hello')
        except:
            message = "Invalid JSON"
        
        return json.dumps({
            "echo": message,
            "method": "POST",
            "timestamp": time.time()
        }).encode('utf-8')

class UsersResource(resource.Resource):
    """User resource (simple in-memory storage)"""
    
    def __init__(self):
        resource.Resource.__init__(self)
        self.users = {
            1: {"id": 1, "name": "Zhang San", "email": "[email protected]"},
            2: {"id": 2, "name": "Li Si", "email": "[email protected]"}
        }
        self.next_id = 3
    
    def render_GET(self, request):
        """Get user list or single user"""
        user_id = request.args.get(b"id", [None])[0]
        
        if user_id:
            try:
                user_id = int(user_id)
                user = self.users.get(user_id)
                if user:
                    return json.dumps(user).encode('utf-8')
                else:
                    request.setResponseCode(404)
                    return json.dumps({"error": "User does not exist"}).encode('utf-8')
            except ValueError:
                request.setResponseCode(400)
                return json.dumps({"error": "Invalid user ID"}).encode('utf-8')
        
        # Return all users
        return json.dumps(list(self.users.values())).encode('utf-8')
    
    def render_POST(self, request):
        """Create new user"""
        try:
            data = json.loads(request.content.getvalue().decode('utf-8'))
            user_id = self.next_id
            self.next_id += 1
            
            user = {
                "id": user_id,
                "name": data.get("name", ""),
                "email": data.get("email", "")
            }
            
            self.users[user_id] = user
            
            request.setResponseCode(201)
            return json.dumps(user).encode('utf-8')
            
        except Exception as e:
            request.setResponseCode(400)
            return json.dumps({"error": str(e)}).encode('utf-8')

def run_twisted_web_server():
    """Run Twisted Web server"""
    root = SimpleAPI()
    site = server.Site(root)
    
    print("Starting Twisted Web server at http://localhost:8080")
    endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
    endpoint.listen(site)
    
    reactor.run()

if __name__ == "__main__":
    run_twisted_web_server()

Twisted Database Integration

from twisted.internet import reactor, defer
from twisted.enterprise import adbapi
import sqlite3

class DatabaseManager:
    """Database manager"""
    
    def __init__(self, db_path=":memory:"):  
        # Create database connection pool
        self.dbpool = adbapi.ConnectionPool(
            "sqlite3",
            db_path,
            check_same_thread=False,
            cp_min=1,
            cp_max=5
        )
        
        # Initialize database
        self.init_database()
    
    @defer.inlineCallbacks
    def init_database(self):
        """Initialize database tables"""
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        yield self.dbpool.runOperation(create_table_sql)
        print("Database initialization complete")
    
    @defer.inlineCallbacks
    def create_user(self, name, email):
        """Create user"""
        insert_sql = "INSERT INTO users (name, email) VALUES (?, ?)"
        try:
            result = yield self.dbpool.runOperation(insert_sql, (name, email))
            defer.returnValue({"success": True, "message": "User created successfully"})
        except sqlite3.IntegrityError:
            defer.returnValue({"success": False, "message": "Email already exists"})
        except Exception as e:
            defer.returnValue({"success": False, "message": str(e)})
    
    @defer.inlineCallbacks
    def get_users(self):
        """Get all users"""
        query_sql = "SELECT * FROM users ORDER BY created_at DESC"
        rows = yield self.dbpool.runQuery(query_sql)
        
        users = []
        for row in rows:
            users.append({
                "id": row[0],
                "name": row[1],
                "email": row[2],
                "created_at": row[3]
            })
        
        defer.returnValue(users)
    
    @defer.inlineCallbacks
    def get_user_by_id(self, user_id):
        """Get user by ID"""
        query_sql = "SELECT * FROM users WHERE id = ?"
        rows = yield self.dbpool.runQuery(query_sql, (user_id,))
        
        if rows:
            row = rows[0]
            defer.returnValue({
                "id": row[0],
                "name": row[1],
                "email": row[2],
                "created_at": row[3]
            })
        else:
            defer.returnValue(None)
    
    @defer.inlineCallbacks
    def demo_database_operations(self):
        """Demonstrate database operations"""
        print("=== Twisted Database Operations Demonstration ===")
        
        # Create a few users
        users_data = [
            ("Zhang San", "[email protected]"),
            ("Li Si", "[email protected]"),
            ("Wang Wu", "[email protected]")
        ]
        
        for name, email in users_data:
            result = yield self.create_user(name, email)
            if result["success"]:
                print(f"User created successfully: {name}")
            else:
                print(f"User creation failed: {result['message']}")
        
        # Query all users
        users = yield self.get_users()
        print(f"\nAll users ({len(users)}):")
        for user in users:
            print(f"  - {user['name']} ({user['email']})")
        
        # Query single user
        if users:
            user = yield self.get_user_by_id(users[0]['id'])
            if user:
                print(f"\nQueried user: {user['name']}")

def run_database_demo():
    """Run database demonstration"""
    db_manager = DatabaseManager()
    
    @defer.inlineCallbacks
    def run_demo():
        yield db_manager.demo_database_operations()
        reactor.stop()
    
    run_demo()
    reactor.run()

if __name__ == "__main__":
    run_database_demo()

Comparison of Multithreading vs Twisted

import time
import threading
from twisted.internet import reactor, defer, task

def compare_approaches():
    """Compare multithreading and Twisted approaches"""  
    
    # Simulate I/O intensive task
    def io_task(task_id, duration):
        """Simulate I/O task"""
        print(f"Starting I/O task {task_id} (expected duration: {duration} seconds)")
        time.sleep(duration)
        print(f"Completed I/O task {task_id}")
        return f"Task {task_id} result"
    
    # Multithreaded version
    def multithreaded_version():
        print("=== Multithreaded Version ===")
        start_time = time.time()
        
        threads = []
        results = []
        
        def worker(task_id, duration):
            result = io_task(task_id, duration)
            results.append(result)
        
        # Create and start threads
        for i in range(5):
            duration = (i % 3) + 1  # Random duration of 1-3 seconds
            thread = threading.Thread(target=worker, args=(i, duration))
            threads.append(thread)
            thread.start()
        
        # Wait for all threads to complete
        for thread in threads:
            thread.join()
        
        elapsed = time.time() - start_time
        print(f"Multithreaded version total time: {elapsed:.2f} seconds")
        print(f"Results: {results}")
        return elapsed
    
    # Twisted version
    @defer.inlineCallbacks
    def twisted_version():
        print("\n=== Twisted Version ===")
        start_time = time.time()
        
        tasks = []
        
        for i in range(5):
            duration = (i % 3) + 1  # Random duration of 1-3 seconds
            
            # Convert blocking I/O task to Twisted delayed task
            deferred_task = task.deferLater(reactor, duration, io_task, i, duration)
            tasks.append(deferred_task)
        
        # Execute all tasks in parallel
        results = yield defer.gatherResults(tasks)
        
        elapsed = time.time() - start_time
        print(f"Twisted version total time: {elapsed:.2f} seconds")
        print(f"Results: {results}")
        defer.returnValue(elapsed)
    
    # Run comparison
    print("Starting comparison of multithreading and Twisted...")
    
    # Run multithreaded version
    thread_time = multithreaded_version()
    
    # Run Twisted version
    @defer.inlineCallbacks
    def run_comparison():
        twisted_time = yield twisted_version()
        
        print(f"\n=== Performance Comparison ===")
        print(f"Multithreading time: {thread_time:.2f} seconds")
        print(f"Twisted time: {twisted_time:.2f} seconds")
        
        if twisted_time < thread_time:
            print(f"Twisted is faster, advantage: {thread_time/twisted_time:.2f}x")
        else:
            print(f"Multithreading is faster, advantage: {twisted_time/thread_time:.2f}x")
        
        reactor.stop()
    
    run_comparison()
    reactor.run()

if __name__ == "__main__":
    compare_approaches()

Conclusion

Multithreading Application Scenarios:

  • Multi-User Servers: Each client connection uses a thread
  • Parallel I/O Operations: Simultaneously handle multiple file or network requests
  • Background Task Processing: Does not affect the responsiveness of the main thread

Advantages of the Twisted Framework:

  • Callback-Based Asynchronous Programming: High-performance event-driven architecture
  • Built-in Protocol Support: HTTP, TCP, UDP, SSL, etc.
  • Database Integration: Asynchronous database operations
  • Error Handling: Powerful Deferred error handling mechanism
  • Scalability: Suitable for building high-concurrency servers

Selection Recommendations:

  • Multithreading: Suitable for relatively simple concurrency needs, intuitive programming model
  • Twisted: Suitable for high-performance network applications that need to handle a large number of concurrent connections

Both technologies have their applicable scenarios, and the choice depends on specific application requirements, team familiarity, and performance needs.

Leave a Comment