Detailed Analysis of the Scrapy Framework | Python Basics Series

1. Overview of the Scrapy Framework

What is Scrapy?

Scrapy is an open-source web crawling framework written in Python, designed for fast, efficient extraction of data from websites. It covers the whole spider workflow: request scheduling, downloading, data extraction, and data storage.

Core Advantages of Scrapy

  • High Performance: Asynchronous networking built on Twisted, so many requests can be in flight at once
  • Extensible: Downloader and spider middleware hooks make request and response handling easy to customize
  • Complete Functionality: Built-in item pipelines, middleware, and a scheduler
  • Active Community: A rich ecosystem of extensions and plugins

2. Detailed Architecture of Scrapy

Component Architecture Diagram

                   Scheduler
                      ↑↓
Item Pipeline  ←    Engine    ↔  Downloader Middlewares  ↔  Downloader
                      ↑↓
              Spider Middlewares
                      ↑↓
                   Spiders

Data Flow Process

  1. Engine retrieves the initial request from the Spider
  2. Engine sends the request to the Scheduler
  3. Scheduler returns the request to the Engine
  4. Engine sends the request to the Downloader
  5. Downloader downloads the page and returns the Response to the Engine
  6. Engine sends the Response to the Spider for processing
  7. Spider parses the Response, returning Items or new Requests
  8. Engine sends Items to the Item Pipeline and new Requests to the Scheduler
  9. Repeat steps 2-8 until there are no new requests
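
The loop above can be sketched as a toy, single-threaded simulation. This is not Scrapy's implementation (the real engine is asynchronous and runs on Twisted); FAKE_SITE, download, parse, and run_engine are hypothetical names invented for illustration, and the "network" is an in-memory dictionary.

```python
from collections import deque

# Hypothetical in-memory "website": URL -> (outgoing links, page data)
FAKE_SITE = {
    "/products": (["/product/1", "/product/2"], None),
    "/product/1": ([], {"name": "Widget"}),
    "/product/2": ([], {"name": "Gadget"}),
}

def download(url):
    """Downloader stand-in: build a 'response' for the URL."""
    links, data = FAKE_SITE[url]
    return {"url": url, "links": links, "data": data}

def parse(response):
    """Spider stand-in: yield items (dicts) and new requests (URL strings)."""
    if response["data"] is not None:
        yield dict(response["data"], url=response["url"])
    for link in response["links"]:
        yield link

def run_engine(start_urls):
    scheduler = deque(start_urls)      # steps 1-2: initial requests are queued
    seen = set(start_urls)             # simple duplicate filter
    pipeline_output = []
    while scheduler:                   # step 9: loop until no requests remain
        url = scheduler.popleft()      # step 3: scheduler hands back a request
        response = download(url)       # steps 4-5: download the page
        for result in parse(response): # steps 6-7: spider parses the response
            if isinstance(result, dict):
                pipeline_output.append(result)  # step 8: item goes to the pipeline
            elif result not in seen:
                seen.add(result)
                scheduler.append(result)        # step 8: request goes to the scheduler
    return pipeline_output

items = run_engine(["/products"])
print(items)
```

The list page yields only requests, each detail page yields one item, and the loop ends once the scheduler queue is empty.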

3. Scrapy Project Structure

Creating a Scrapy Project

# Install Scrapy
pip install scrapy

# Create project
scrapy startproject myproject

Project Directory Structure

myproject/
├── scrapy.cfg                  # Project configuration file
└── myproject/                  # Project Python module
    ├── __init__.py
    ├── items.py               # Data model definitions
    ├── middlewares.py         # Middleware definitions
    ├── pipelines.py           # Data pipelines
    ├── settings.py            # Project settings
    └── spiders/               # Spiders directory
        ├── __init__.py
        └── example_spider.py  # Spider implementation

4. Detailed Explanation of Core Components

Items – Data Models

# items.py
import scrapy
from itemloaders.processors import TakeFirst, MapCompose
from w3lib.html import remove_tags

def clean_price(value):
    """Clean price data"""
    if value:
        return value.replace('¥','').replace(',','').strip()
    return value

def clean_text(value):
    """Clean text data"""
    if value:
        return value.strip()
    return value

class ProductItem(scrapy.Item):
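    # Define data fields. The input/output processors below are applied only
    # when the item is populated through an ItemLoader, not on direct assignment.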
    name = scrapy.Field(
        input_processor=MapCompose(remove_tags, clean_text),
        output_processor=TakeFirst()
    )
    price = scrapy.Field(
        input_processor=MapCompose(remove_tags, clean_price),
        output_processor=TakeFirst()
    )
    description = scrapy.Field(
        input_processor=MapCompose(remove_tags, clean_text),
        output_processor=TakeFirst()
    )
    url = scrapy.Field(
        output_processor=TakeFirst()
    )
    image_urls = scrapy.Field()
    images = scrapy.Field()
    crawled_at = scrapy.Field(
        output_processor=TakeFirst()
    )
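
To make the processor chain concrete, here is a minimal plain-Python sketch of what an input processor followed by an output processor does to extracted values. The helpers map_compose, take_first, and strip_tags are simplified stand-ins written for this example, not the library classes (the real MapCompose also flattens list results, and remove_tags is more robust than a regex); the sample HTML is invented.

```python
import re

def strip_tags(value):
    """Crude regex stand-in for w3lib.html.remove_tags (illustration only)."""
    return re.sub(r"<[^>]+>", "", value)

def clean_price(value):
    """Same cleaning rule as in items.py."""
    return value.replace("¥", "").replace(",", "").strip()

def map_compose(*functions):
    """Simplified MapCompose: apply each function to every value, dropping None."""
    def process(values):
        for func in functions:
            values = [out for out in map(func, values) if out is not None]
        return values
    return process

def take_first(values):
    """Simplified TakeFirst: return the first non-empty value."""
    for value in values:
        if value is not None and value != "":
            return value
    return None

raw_prices = ["<span class='price'> ¥1,299.00 </span>", "<span>¥999.00</span>"]
price_in = map_compose(strip_tags, clean_price)
cleaned = price_in(raw_prices)  # input processor: every extracted value is cleaned
price = take_first(cleaned)     # output processor: only the first value is kept
print(cleaned, price)
```

The input processor runs on every extracted value as it is added; the output processor runs once, when the item is finally assembled.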

Spiders – Core of the Crawlers

# spiders/product_spider.py
from datetime import datetime, timezone
from urllib.parse import urljoin

import scrapy
from scrapy.loader import ItemLoader

from myproject.items import ProductItem

class ProductSpider(scrapy.Spider):
    name = "products"  # Unique identifier for the spider
    allowed_domains = ["example.com"]  # Allowed domains
    start_urls = ["https://www.example.com/products"]  # Starting URL

    # Custom settings (will override settings.py)
    custom_settings = {
        'CONCURRENT_REQUESTS': 4,
        'DOWNLOAD_DELAY': 1,
        'FEED_EXPORT_ENCODING': 'utf-8',
    }

    def parse(self, response):
        """Parse product list page"""
        self.logger.info(f"Parsing product list page: {response.url}")

        # Extract product links
        product_links = response.css('div.product-item a::attr(href)').getall()
        for link in product_links:
            product_url = urljoin(response.url, link)
            yield scrapy.Request(
                url=product_url,
                callback=self.parse_product,
                meta={'original_url': product_url}
            )

        # Pagination handling
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            next_page_url = urljoin(response.url, next_page)
            yield scrapy.Request(
                url=next_page_url,
                callback=self.parse
            )

    def parse_product(self, response):
        """Parse product detail page"""
        self.logger.info(f"Parsing product detail page: {response.url}")

        # Populate the item through an ItemLoader so that the processors
        # declared in items.py (tag removal, price cleaning) are applied
        loader = ItemLoader(item=ProductItem(), response=response)
        loader.add_css('name', 'h1.product-title::text')
        loader.add_css('price', '.price::text')
        loader.add_value('description', ' '.join(response.css('.description ::text').getall()))
        loader.add_value('url', response.meta.get('original_url', response.url))
        # Image URLs must be absolute before they can be downloaded
        image_srcs = response.css('.product-image img::attr(src)').getall()
        loader.add_value('image_urls', [urljoin(response.url, src) for src in image_srcs])
        # Record the time this page was actually processed
        loader.add_value('crawled_at', datetime.now(timezone.utc).isoformat())
        item = loader.load_item()

        # Data validation (fields with no extracted value are left unset)
        if not item.get('name'):
            self.logger.warning(f"Missing product name: {response.url}")
            return

        yield item

    def start_requests(self):
        """Override start requests to add custom logic"""
        for url in self.start_urls:
            yield scrapy.Request(
                url=url,
                callback=self.parse,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                }
            )
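
The spider relies on urljoin to turn the href values found on a page into absolute URLs. The short sketch below shows how the standard-library function resolves the common cases; the URLs are made-up examples. Scrapy responses also offer response.urljoin(link), which resolves against the response URL in a similar way.

```python
from urllib.parse import urljoin

base = "https://www.example.com/products?page=2"

# Root-relative path: replaces the whole path of the base URL
root_relative = urljoin(base, "/product/1")

# Query-only reference: keeps the path, swaps the query string
next_page = urljoin(base, "?page=3")

# Already absolute: returned unchanged
absolute = urljoin(base, "https://cdn.example.com/img/1.png")

# Scheme-relative: inherits only the scheme of the base URL
scheme_relative = urljoin(base, "//cdn.example.com/img/2.png")

for url in (root_relative, next_page, absolute, scheme_relative):
    print(url)
```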

Pipelines – Data Pipelines

# pipelines.py
import pymongo
import mysql.connector
from scrapy.exceptions import DropItem
import logging
from itemadapter import ItemAdapter

class DataValidationPipeline:
    """Data validation pipeline"""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Check required fields
        required_fields = ['name', 'price', 'url']
        for field in required_fields:
            if not adapter.get(field):
                raise DropItem(f"Missing required field: {field}")

        # Validate price format
        price = adapter.get('price')
        if price and not self.is_valid_price(price):
            raise DropItem(f"Invalid price format: {price}")

        return item

    def is_valid_price(self, price):
        """Validate price format"""
        try:
            float(price)
            return True
        except (ValueError, TypeError):
            return False

class MongoDBPipeline:
    """MongoDB storage pipeline"""

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.client = None
        self.db = None

    @classmethod
    def from_crawler(cls, crawler):
        """Get configuration from settings"""
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy_db')
        )

    def open_spider(self, spider):
        """Connect to the database when the spider starts"""
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
        spider.logger.info(f"Connected to MongoDB: {self.mongo_uri}")

    def close_spider(self, spider):
        """Disconnect from the database when the spider closes"""
        if self.client:
            self.client.close()
            spider.logger.info("MongoDB connection closed")

    def process_item(self, item, spider):
        """Process and store data"""
        try:
            collection_name = item.__class__.__name__.replace('Item', '').lower()
            collection = self.db[collection_name]

            # Use URL as unique identifier to avoid duplicates
            adapter = ItemAdapter(item)
            collection.update_one(
                {'url': adapter['url']},
                {'$set': dict(adapter)},
                upsert=True
            )

            spider.logger.info(f"Data saved to MongoDB: {adapter['url']}")
            return item

        except Exception as e:
            spider.logger.error(f"MongoDB storage failed: {e}")
            raise DropItem(f"Storage failed: {e}")

class MySQLPipeline:
    """MySQL storage pipeline"""

    def __init__(self, mysql_host, mysql_db, mysql_user, mysql_password):
        self.mysql_host = mysql_host
        self.mysql_db = mysql_db
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.conn = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mysql_host=crawler.settings.get('MYSQL_HOST'),
            mysql_db=crawler.settings.get('MYSQL_DATABASE'),
            mysql_user=crawler.settings.get('MYSQL_USER'),
            mysql_password=crawler.settings.get('MYSQL_PASSWORD')
        )

    def open_spider(self, spider):
        """Create database connection and table"""
        self.conn = mysql.connector.connect(
            host=self.mysql_host,
            user=self.mysql_user,
            password=self.mysql_password,
            database=self.mysql_db
        )
        self.create_table()

    def create_table(self):
        """Create data table"""
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS products (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            price DECIMAL(10,2),
            description TEXT,
            url VARCHAR(500) UNIQUE,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        with self.conn.cursor() as cursor:
            cursor.execute(create_table_sql)
        self.conn.commit()

    def process_item(self, item, spider):
        """Store data in MySQL"""
        adapter = ItemAdapter(item)

        insert_sql = """
        INSERT INTO products (name, price, description, url)
        VALUES (%s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            name=VALUES(name),
            price=VALUES(price),
            description=VALUES(description)
        """
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(insert_sql, (
                    adapter.get('name'),
                    adapter.get('price'),
                    adapter.get('description'),
                    adapter.get('url')
                ))
            self.conn.commit()
            spider.logger.info(f"Data saved to MySQL: {adapter['url']}")

        except Exception as e:
            self.conn.rollback()
            spider.logger.error(f"MySQL storage failed: {e}")
            raise DropItem(f"MySQL storage failed: {e}")

        return item

    def close_spider(self, spider):
        if self.conn:
            self.conn.close()

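class JsonWriterPipeline:
    """JSON file writing pipeline"""

    def open_spider(self, spider):
        # Use a file name that differs from any `-o` feed export target;
        # otherwise this pipeline and the feed exporter write to the same file
        self.file = open('products_pipeline.json', 'w', encoding='utf-8')
        self.file.write('[\n')
        self.first_item = True

    def close_spider(self, spider):
        self.file.write('\n]')
        self.file.close()

    def process_item(self, item, spider):
        # Separate items with a comma so the file stays a valid JSON array
        line = '' if self.first_item else ',\n'
        self.first_item = False

        import json
        # ItemAdapter handles Item objects, dicts, and dataclass items alike
        line += json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False, indent=2)
        self.file.write(line)
        return item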
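
How items travel through several pipelines can be modelled in a few lines of plain Python. This is a toy model under stated assumptions, not Scrapy's code: DropItem here is a local stand-in for scrapy.exceptions.DropItem, the stages are plain functions rather than pipeline classes, and run_pipelines is a hypothetical helper. It illustrates two rules: stages run in ascending order of their priority number, and a dropped item never reaches later stages.

```python
class DropItem(Exception):
    """Local stand-in for scrapy.exceptions.DropItem."""

stored = []

def validate(item):
    """Reject items without a name."""
    if not item.get("name"):
        raise DropItem("missing name")
    return item

def normalize_price(item):
    """Convert the price string to a float on a copy of the item."""
    item = dict(item)
    item["price"] = float(item["price"])
    return item

def store(item):
    """Storage stand-in: remember the item in a list."""
    stored.append(item)
    return item

# Priority numbers decide the order, not the position in the dict
PIPELINES = {store: 300, validate: 100, normalize_price: 200}

def run_pipelines(item, pipelines):
    """Pass the item through each stage in ascending priority order."""
    for stage in sorted(pipelines, key=pipelines.get):
        try:
            item = stage(item)
        except DropItem:
            return None  # dropped: later stages never see the item
    return item

kept = run_pipelines({"name": "Widget", "price": "9.50"}, PIPELINES)
dropped = run_pipelines({"name": "", "price": "1.00"}, PIPELINES)
print(kept, dropped, len(stored))
```

This is why the validation pipeline is given the lowest number: invalid items are discarded before any database is touched.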

Middlewares – Middleware System

Downloader Middleware

# middlewares.py
import random
from scrapy import signals
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.utils.response import response_status_message
import time

class RandomUserAgentMiddleware(UserAgentMiddleware):
    """Random User-Agent middleware"""

    def __init__(self, user_agents):
        self.user_agents = user_agents

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        user_agents = settings.get('USER_AGENTS', [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
        ])
        return cls(user_agents)

    def process_request(self, request, spider):
        if self.user_agents:
            request.headers['User-Agent'] = random.choice(self.user_agents)

class ProxyMiddleware:
    """Proxy middleware"""

    def __init__(self, proxy_list):
        self.proxy_list = proxy_list

    @classmethod
    def from_crawler(cls, crawler):
        proxy_list = crawler.settings.get('PROXY_LIST', [])
        return cls(proxy_list)

    def process_request(self, request, spider):
        if self.proxy_list and not request.meta.get('proxy'):
            proxy = random.choice(self.proxy_list)
            request.meta['proxy'] = proxy
            spider.logger.debug(f"Using proxy: {proxy}")

class CustomRetryMiddleware(RetryMiddleware):
    """Custom retry middleware that logs rate-limit responses before retrying"""

    def process_response(self, request, response, spider):
        # Respect requests that opted out of retries
        if request.meta.get('dont_retry', False):
            return response

        # Check if a retry is needed
        if response.status in (429, 503):  # Rate limit or service unavailable
            spider.logger.warning(f"Encountered limit, scheduling retry: {response.url}")
            # Do not call time.sleep() here: it blocks the Twisted reactor and
            # stalls every in-flight request. Slow the crawl with DOWNLOAD_DELAY
            # or the AutoThrottle extension instead.
            reason = response_status_message(response.status)
            return self._retry(request, reason, spider) or response
        return super().process_response(request, response, spider)

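class SeleniumMiddleware:
    """Selenium middleware for handling JavaScript-rendered pages"""

    def __init__(self):
        # Imported lazily so Selenium is required only when this middleware is enabled
        from selenium import webdriver
        from selenium.webdriver.chrome.options import Options

        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')

        self.driver = webdriver.Chrome(options=chrome_options)

    @classmethod
    def from_crawler(cls, crawler):
        # Connect the cleanup handler; without this the browser is never closed
        middleware = cls()
        crawler.signals.connect(middleware.spider_closed, signal=signals.spider_closed)
        return middleware

    def process_request(self, request, spider):
        # Only process requests marked for JS rendering
        if request.meta.get('selenium'):
            # Note: the browser call and the sleep below are blocking, so
            # Selenium-rendered requests are effectively processed one at a time
            self.driver.get(request.url)

            # Wait for the page to load
            time.sleep(2)

            # Get page source
            html = self.driver.page_source

            # Return a Response object; returning it here skips the normal download
            from scrapy.http import HtmlResponse
            return HtmlResponse(
                url=request.url,
                body=html.encode('utf-8'),
                request=request,
                encoding='utf-8'
            )

    def spider_closed(self, spider):
        self.driver.quit()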
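
The order in which downloader middlewares run follows from their priority numbers: process_request hooks are called in ascending order on the way to the downloader, and process_response hooks in descending order on the way back. The toy model below illustrates only that ordering rule. LoggingMiddleware and fetch are hypothetical names, the "download" is a lambda, and Scrapy's built-in middlewares (which are merged into the same ordered chain) are left out.

```python
class LoggingMiddleware:
    """Hypothetical middleware that records when each hook runs."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def process_request(self, request):
        self.log.append(f"{self.name}:request")

    def process_response(self, request, response):
        self.log.append(f"{self.name}:response")
        return response

def fetch(request, middlewares, download):
    """Run request hooks in ascending priority, response hooks in descending."""
    ordered = sorted(middlewares, key=middlewares.get)
    for middleware in ordered:
        middleware.process_request(request)
    response = download(request)
    for middleware in reversed(ordered):
        response = middleware.process_response(request, response)
    return response

log = []
middlewares = {
    LoggingMiddleware("retry", log): 600,
    LoggingMiddleware("user_agent", log): 400,
    LoggingMiddleware("proxy", log): 500,
}
response = fetch(
    {"url": "https://www.example.com"},
    middlewares,
    lambda request: {"status": 200, "url": request["url"]},
)
print(log)
```

A middleware with a low number therefore sees the request first and the response last, which matters when one middleware depends on headers or meta values set by another.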

5. Settings – Project Configuration

# settings.py
import datetime

BOT_NAME = 'myproject'

SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'

# Spider etiquette
ROBOTSTXT_OBEY = True
DOWNLOAD_DELAY = 1
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8

# User agent
USER_AGENT = 'myproject (+http://www.myproject.com)'

# Pipeline configuration
ITEM_PIPELINES = {
    'myproject.pipelines.DataValidationPipeline': 100,
    'myproject.pipelines.MongoDBPipeline': 200,
    'myproject.pipelines.MySQLPipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 400,
}

# Middleware configuration
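DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.RandomUserAgentMiddleware': 400,
    'myproject.middlewares.ProxyMiddleware': 500,
    'myproject.middlewares.CustomRetryMiddleware': 600,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # Disable default
    # Disable the built-in retry middleware too, otherwise it runs alongside
    # CustomRetryMiddleware and failed requests are retried by both
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,
}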

# Retry settings
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 522, 524, 408, 429]

# Database configuration
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'scrapy_data'

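# Placeholder credentials for local development only; in real deployments
# read them from environment variables instead of committing them
MYSQL_HOST = 'localhost'
MYSQL_DATABASE = 'scrapy_data'
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'password'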

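# Custom settings
# Evaluated once, when the settings module is loaded, so this is the start
# time of the run rather than the time each page is crawled
CRAWLED_AT = datetime.datetime.now().isoformat()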

# Extensions
EXTENSIONS = {
    'scrapy.extensions.telnet.TelnetConsole': None,
}

# Logging
LOG_LEVEL = 'INFO'
LOG_FILE = 'scrapy.log'

# Cache (for development and debugging)
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 3600
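
The same setting can be defined in several places: settings.py, a spider's custom_settings, and -s options on the command line. Scrapy resolves such conflicts by source priority, with the command line beating the spider, and the spider beating the project file. The sketch below models that precedence with plain dictionaries. The resolve helper is hypothetical, and the numeric weights are assumptions chosen only to reflect the relative order, so treat them as illustrative rather than as Scrapy's internals.

```python
# Assumed weights: only their relative order matters in this sketch
PRIORITY = {"default": 0, "project": 20, "spider": 30, "cmdline": 40}

def resolve(layers):
    """Merge setting layers so that higher-priority sources win."""
    resolved = {}
    for source in sorted(layers, key=PRIORITY.get):
        resolved.update(layers[source])
    return resolved

layers = {
    "cmdline": {"LOG_LEVEL": "DEBUG", "CONCURRENT_REQUESTS": 8},  # -s options
    "spider": {"CONCURRENT_REQUESTS": 4, "DOWNLOAD_DELAY": 1},     # custom_settings
    "project": {"CONCURRENT_REQUESTS": 16, "DOWNLOAD_DELAY": 1, "LOG_LEVEL": "INFO"},  # settings.py
}

effective = resolve(layers)
without_cmdline = resolve({k: v for k, v in layers.items() if k != "cmdline"})
print(effective)
print(without_cmdline)
```

With these values the crawl runs with 8 concurrent requests when started with the -s options, and with 4 (from custom_settings) when started without them.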

6. Advanced Crawling Techniques

CrawlSpider Example

# spiders/advanced_spider.py
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from myproject.items import ProductItem

class AdvancedProductSpider(CrawlSpider):
    name = "advanced_products"
    allowed_domains = ["example.com"]
    start_urls = ["https://www.example.com"]

    rules = (
        # Extract product links, follow links, call parse_product
        Rule(
            LinkExtractor(
                restrict_css='.product-list, .pagination',
                deny=('cart', 'checkout', 'login')
            ),
            callback='parse_product',
            follow=True
        ),
    )

    def parse_product(self, response):
        """Parse product page"""
        # Check if it is a product page
        if not response.css('.product-detail'):
            return

        item = ProductItem()
        item['name'] = response.css('h1::text').get()
        item['price'] = response.css('.price::text').get()
        item['url'] = response.url

        yield item
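
One detail of the rule above is easy to miss: the deny entries are regular expressions searched anywhere in the absolute URL, so a bare word such as 'cart' also excludes unrelated pages. The sketch below approximates that check with re.search; is_denied is a hypothetical helper, the URLs are invented, and the real LinkExtractor applies further filters (allowed domains, file extensions) that are omitted here.

```python
import re

DENY = ("cart", "checkout", "login")

# Hypothetical stricter patterns: match only a whole path segment
DENY_ANCHORED = (r"/cart(/|\?|$)", r"/checkout(/|\?|$)", r"/login(/|\?|$)")

def is_denied(url, patterns):
    """Return True if any pattern is found anywhere in the URL."""
    return any(re.search(pattern, url) for pattern in patterns)

urls = [
    "https://www.example.com/product/42",
    "https://www.example.com/cart",
    "https://www.example.com/cartoons/1",
]

loose = [is_denied(url, DENY) for url in urls]
strict = [is_denied(url, DENY_ANCHORED) for url in urls]
print(loose)
print(strict)
```

The loose patterns also reject the /cartoons/ page; anchoring the pattern to a path segment keeps it.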

XMLFeedSpider Example

# spiders/xml_spider.py
from scrapy.spiders import XMLFeedSpider
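# NewsItem is assumed to be defined in items.py with the fields
# title, link, description, and pub_date (it is not shown above)
from myproject.items import NewsItem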

class NewsSpider(XMLFeedSpider):
    name = "news"
    allowed_domains = ["example.com"]
    start_urls = ['https://www.example.com/rss/news.xml']
    iterator = 'iternodes'  # Default
    itertag = 'item'  # Iteration node

    def parse_node(self, response, node):
        """Parse XML node"""
        item = NewsItem()
        item['title'] = node.xpath('title/text()').get()
        item['link'] = node.xpath('link/text()').get()
        item['description'] = node.xpath('description/text()').get()
        item['pub_date'] = node.xpath('pubDate/text()').get()

        return item
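
To see what the node-by-node iteration amounts to, the following sketch parses a small RSS document with the standard library instead of Scrapy. The feed content and the parse_feed helper are made up for illustration. As with the XPath calls above, a missing child element simply yields None.

```python
import xml.etree.ElementTree as ET

RSS = """<rss version="2.0">
  <channel>
    <title>Example News</title>
    <item>
      <title>First story</title>
      <link>https://www.example.com/news/1</link>
      <description>Summary of the first story</description>
      <pubDate>Mon, 01 Jan 2024 00:00:00 GMT</pubDate>
    </item>
    <item>
      <title>Second story</title>
      <link>https://www.example.com/news/2</link>
    </item>
  </channel>
</rss>"""

def parse_feed(xml_text):
    """Yield one dict per <item> node, mirroring parse_node above."""
    root = ET.fromstring(xml_text)
    for node in root.iter("item"):
        yield {
            "title": node.findtext("title"),
            "link": node.findtext("link"),
            "description": node.findtext("description"),
            "pub_date": node.findtext("pubDate"),
        }

news = list(parse_feed(RSS))
for entry in news:
    print(entry["title"], entry["link"])
```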

7. Running and Deploying

Running the Spider

# run_spider.py
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from myproject.spiders.product_spider import ProductSpider

def run_spider():
    process = CrawlerProcess(get_project_settings())
    process.crawl(ProductSpider)
    process.start()

if __name__ == "__main__":
    run_spider()

Command Line Execution

# Basic run
scrapy crawl products

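# Save results to JSON (-o appends to an existing file, which breaks a JSON
# array on the second run; use -O instead to overwrite the file)
scrapy crawl products -o products.json

# Save results to CSV
scrapy crawl products -o products.csv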

# Use custom settings
scrapy crawl products -s LOG_LEVEL=DEBUG -s CONCURRENT_REQUESTS=8

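# Run multiple spiders at the same time (in a POSIX shell, & puts the first
# crawl in the background while the second runs in the foreground)
scrapy crawl products & scrapy crawl news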

Scrapyd Deployment

# setup.py
from setuptools import setup, find_packages

setup(
    name='myproject',
    version='1.0',
    packages=find_packages(),
    entry_points={'scrapy': ['settings = myproject.settings']},
)

# Deploy to Scrapyd (shell commands, run from the project directory)
pip install scrapyd-client
scrapyd-deploy

8. Debugging and Testing

Debugging Shell

# Debug in Scrapy Shell
from scrapy.shell import inspect_response

def parse(self, response):
    # Enter interactive Shell
    inspect_response(response, self)

# Or use command line
# scrapy shell "https://www.example.com"

Unit Testing

# tests/test_spider.py
import unittest
from scrapy.http import Request, HtmlResponse
from myproject.spiders.product_spider import ProductSpider

class TestProductSpider(unittest.TestCase):

    def setUp(self):
        self.spider = ProductSpider()

    def test_parse(self):
        # Create test response
        test_html = """
        <html>
            <div class="product-item">
                <a href="/product/1">Product 1</a>
            </div>
            <a class="next" href="/page2">Next</a>
        </html>
        """

        response = HtmlResponse(
            url='http://example.com',
            body=test_html.encode('utf-8')
        )

        results = list(self.spider.parse(response))

        # Validate results
        self.assertEqual(len(results), 2)  # 1 product request + 1 pagination request
        self.assertIsInstance(results[0], Request)
        self.assertEqual(results[0].url, 'http://example.com/product/1')

if __name__ == '__main__':
    unittest.main()

Conclusion

The Scrapy framework provides a complete solution for web crawling:

  1. Structured Design: Clear component separation, easy to maintain and extend
  2. High Performance: Asynchronous processing, supports concurrent requests
  3. Rich Middleware: Customizable request processing flow
  4. Flexible Data Pipelines: Supports various data storage methods
  5. Powerful Selectors: Supports CSS and XPath selectors
  6. Comprehensive Toolchain: Includes debugging, testing, and deployment tools

By effectively utilizing the various components of Scrapy, one can build efficient, stable, and maintainable web crawling applications.
