Building a Potential Stock Prediction System with Python and SQLite! A Must-Have for Retail Investors!

Hello everyone, I am your old friend! The previous article “One-Click Stock Selection Tool: Easily Capture Tomorrow’s Potential Stocks with Python!” received a lot of positive feedback, but many friends reported that configuring MySQL was too troublesome, especially for beginners, as installing and debugging the database environment is simply a nightmare. Today, I bring you the newly upgraded SQLite Zero Configuration Version! No need to install any database, combined with the previous article “Goodbye MySQL! Easily Build an A-Share Data Capture System with Python and SQLite!” we can complete the entire process from data capture to stock selection with just 2 scripts! Whether you are a technical novice or an experienced developer, you can easily get started!

🚀 Core Features Revealed

1. Minimal Database Configuration (SQLite Black Technology)

def connect_to_sqlite(database):
    """Open a SQLite connection tuned for frequent writes.

    Args:
        database: Path of the SQLite database file.

    Returns:
        An open ``sqlite3.Connection``. The caller is responsible for
        closing it.
    """
    # Wait up to 30 seconds for a lock held by another connection instead of
    # failing immediately.
    conn = sqlite3.connect(database, timeout=30)
    # Write-ahead logging lets readers and the writer work at the same time
    # (the author reports roughly 5x faster writes).
    conn.execute('PRAGMA journal_mode=WAL')
    # NORMAL syncs to disk less often than the default FULL level.
    conn.execute('PRAGMA synchronous=NORMAL')
    return conn

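💡 Tip: In WAL mode readers no longer block the writer (and vice versa), so SQLite’s concurrency performance can rival MySQL for this kind of workload, yet the database still lives in a single file (SQLite creates and cleans up the small `-wal`/`-shm` companion files by itself)!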

2. Asynchronous Batch Write Engine (Prevent Data Loss)

class SQLiteWriter:
    """Background writer that batches queued rows into SQLite.

    Producers put row tuples on ``queue``; one worker thread drains the queue
    and writes the rows in batches of 100, so producers never wait on disk
    I/O. Putting ``None`` on the queue flushes the pending rows and stops the
    worker.

    ``insert_sql`` is the parameterised INSERT statement, expected at module
    level (the full statement is listed in the Complete Code section).
    """

    def __init__(self, db_path):
        self.db_path = db_path
        self.queue = Queue()
        # Independent writing thread; daemon so it cannot keep the
        # interpreter alive on exit.
        self.worker = Thread(target=self._run, daemon=True)
        self.worker.start()

    def _run(self):
        # Open the connection inside the worker: by default a sqlite3
        # connection may only be used by the thread that created it.
        conn = sqlite3.connect(self.db_path, timeout=30)
        cursor = conn.cursor()
        batch = []
        try:
            while True:
                item = self.queue.get()
                if item is None:  # Shutdown sentinel
                    break
                batch.append(item)
                if len(batch) >= 100:  # Submit every 100 records
                    cursor.executemany(insert_sql, batch)
                    conn.commit()
                    batch = []
            if batch:
                # Write the last, partially filled batch instead of losing it.
                cursor.executemany(insert_sql, batch)
                conn.commit()
        finally:
            conn.close()

💡 Tip: A queue decouples computation from I/O: the calling threads hand finished rows to the queue and carry on without being blocked by writes, while batching reduces the number of disk I/O operations and improves performance by over 10 times.

3. Multi-Factor Stock Selection Model (Institutional Strategy)

-- MACD Golden Cross + Volume Breakthrough + Industry Undervaluation
WHERE s.macd > s.macd_signal      -- MACD golden cross (MACD line above its signal line)
  AND s.vol_add_rate > 1.5        -- Volume more than 1.5x its 10-day average (up over 50%)
  AND s.ma10 > s.ma50             -- Short-term trend upward
  AND s.bld_down < s.close        -- Close is above the lower Bollinger Band
  AND dynamic_pe <= avg_pe        -- Price-to-earnings ratio no higher than the industry average

4. Intelligent Retry Mechanism (No Fear of Unstable Networks)

def process_stock(stock_code, writer):
    """Fetch the data of one stock, retrying when the fetch fails.

    Args:
        stock_code: Code of the stock to fetch.
        writer: Writer that receives the computed rows. It is not used in
            this shortened excerpt.

    Returns:
        None. The function stops early when the fetched frame is empty and
        gives up silently after the last failed attempt.
    """
    max_retries = 3  # Maximum retry 3 times
    for attempt in range(max_retries):
        try:
            df = get_stock_data(stock_code)
            if df.empty:
                return
            # Random wait to avoid IP blocking
            time.sleep(random.uniform(0.1, 0.5))
            break
        except Exception as e:
            print(f"Attempt {attempt+1} failed, waiting to retry... ({e})")
            if attempt + 1 < max_retries:
                # Really wait before the next attempt, a little longer after
                # every failure.
                time.sleep(random.uniform(0.5, 2.0) * (attempt + 1))

5. Professional Indicator Calculation (Powered by TA-Lib)

def calculate_indicators(df):
    """Add MACD, Bollinger Band and KDJ columns to a price frame.

    Args:
        df: DataFrame with ``close``, ``high`` and ``low`` columns. It is
            modified in place.

    Returns:
        The same DataFrame with the indicator columns added.
    """
    # MACD Indicator
    df['MACD'], df['MACD_Signal'], _ = talib.MACD(df['close'])

    # Bollinger Band Calculation (20-period mean +/- 2 standard deviations)
    window = df['close'].rolling(20)
    band_width = 2 * window.std()
    df['BLD_MID'] = window.mean()
    df['BLD_UP'] = df['BLD_MID'] + band_width
    # The lower band is what the stock-selection condition compares the
    # closing price against.
    df['BLD_DOWN'] = df['BLD_MID'] - band_width

    # KDJ Indicator
    df['K'], df['D'] = talib.STOCH(df['high'], df['low'], df['close'])
    df['J'] = 3 * df['K'] - 2 * df['D']
    return df

6. Industry Comparison Screening (Core of Value Investment)

def select_stocks(conn):
    """Return the stocks valued at or below their industry average.

    Args:
        conn: Open database connection that contains a ``stock_info`` table
            with ``industry``, ``dynamic_pe`` and ``pb`` columns.

    Returns:
        DataFrame of the ``stock_info`` rows whose P/E and P/B are both no
        higher than the average of their industry, with the ``avg_pe`` and
        ``avg_pb`` columns attached.
    """
    # Get industry average valuation
    industry_avg = pd.read_sql("""
        SELECT industry,
               AVG(dynamic_pe) AS avg_pe,
               AVG(pb) AS avg_pb
        FROM stock_info
        GROUP BY industry
    """, conn)

    # Attach the averages of its own industry to every stock so the two can
    # be compared row by row.
    candidates = pd.read_sql("SELECT * FROM stock_info", conn)
    candidates = candidates.merge(industry_avg, on='industry', how='left')

    # Filter undervalued stocks
    candidates = candidates[
        (candidates['dynamic_pe'] <= candidates['avg_pe']) &
        (candidates['pb'] <= candidates['avg_pb'])
    ]
    return candidates

Complete Code

import random
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from queue import Queue
from sqlite3 import OperationalError
from threading import Thread

import akshare as ak
import numpy as np
import pandas as pd
import talib

# Database Configuration
DB_PATH = "gp_data.db"
START_DATE = "2024-03-26"
END_DATE = datetime.now().strftime("%Y-%m-%d")
MAX_WORKERS = 6
BATCH_SIZE = 100


def format_decimal(value):
    """Format decimal, keep 2 decimal places.

    Returns 0.0 when ``value`` is None or NaN.
    """
    if value is None or pd.isna(value):
        return 0.0
    return round(float(value), 2)


def connect_to_sqlite(database):
    """Open a SQLite connection configured for this script.

    Args:
        database: Path of the SQLite database file.

    Returns:
        An open ``sqlite3.Connection``; the caller must close it.
    """
    # Wait up to 30 seconds for a lock held by another connection.
    conn = sqlite3.connect(database, timeout=30)
    conn.execute('PRAGMA journal_mode=WAL')
    conn.execute('PRAGMA synchronous=NORMAL')
    # A negative cache_size is a size in KiB, i.e. about 20 MB of page cache.
    conn.execute('PRAGMA cache_size=-20000')
    return conn


class SQLiteWriter:
    """Single background thread that upserts indicator rows in batches.

    Producers call ``add_task`` with a 25-element row tuple; the worker
    collects ``BATCH_SIZE`` rows per transaction. ``shutdown`` flushes the
    remaining rows and waits for the worker to finish.
    """

    _INSERT_SQL = """
        INSERT INTO stock_indicators (
            code, dt, open, close, ma10, ma50, ma100, ma200, vol, tr, trend1, trend2, trend3,
            vol_add_rate, tr_add_rate, rsi, macd, macd_signal, macd_hist, bld_up, bld_mid, bld_down, k, d, j
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        ON CONFLICT(code, dt) DO UPDATE SET
            open=excluded.open, close=excluded.close, ma10=excluded.ma10, ma50=excluded.ma50,
            ma100=excluded.ma100, ma200=excluded.ma200, vol=excluded.vol, tr=excluded.tr,
            trend1=excluded.trend1, trend2=excluded.trend2, trend3=excluded.trend3,
            vol_add_rate=excluded.vol_add_rate, tr_add_rate=excluded.tr_add_rate,
            rsi=excluded.rsi, macd=excluded.macd, macd_signal=excluded.macd_signal,
            macd_hist=excluded.macd_hist, bld_up=excluded.bld_up, bld_mid=excluded.bld_mid,
            bld_down=excluded.bld_down, k=excluded.k, d=excluded.d, j=excluded.j
    """

    def __init__(self, db_path):
        self.db_path = db_path
        self.queue = Queue()
        self._closed = False  # Makes shutdown() safe to call more than once
        self.worker = Thread(target=self._run, daemon=True)
        self.worker.start()

    def _write_batch(self, conn, batch):
        """Upsert ``batch`` in one transaction; return True on success."""
        try:
            conn.executemany(self._INSERT_SQL, batch)
            conn.commit()
            return True
        except Exception as e:
            # Undo the partially applied batch so a later retry starts clean.
            conn.rollback()
            print(f"Batch insert failed: {e}")
            time.sleep(1)
            return False

    def _run(self):
        """Worker loop: drain the queue until the ``None`` sentinel arrives."""
        # The connection is opened here so it is only ever used by the worker.
        conn = connect_to_sqlite(self.db_path)
        batch = []
        try:
            while True:
                item = self.queue.get()
                if item is None:
                    break
                batch.append(item)
                # A failed batch is kept and written again together with the
                # next incoming row.
                if len(batch) >= BATCH_SIZE and self._write_batch(conn, batch):
                    batch = []
            if batch:
                # Flush the last, partially filled batch.
                self._write_batch(conn, batch)
        finally:
            conn.close()

    def add_task(self, data):
        """Queue one row tuple for writing."""
        self.queue.put(data)

    def shutdown(self):
        """Flush pending rows and wait for the worker; repeated calls are no-ops."""
        if self._closed:
            return
        self._closed = True
        self.queue.put(None)
        self.worker.join()


def get_stock_data(conn, stock_code, start_date, end_date):
    """Load the daily rows of one stock from ``gp_base_info``.

    Args:
        conn: Open database connection.
        stock_code: Stock code to load.
        start_date: First date (inclusive) of the range.
        end_date: Last date (inclusive) of the range.

    Returns:
        DataFrame indexed by ``dt`` in ascending order, or an empty DataFrame
        when the query fails (the error is printed, not raised).
    """
    query = """
        SELECT dt, open, close, high, low, vol, tr
        FROM gp_base_info
        WHERE code = ? AND dt BETWEEN ? AND ?
        ORDER BY dt ASC
    """
    try:
        df = pd.read_sql_query(query, conn, params=(stock_code, start_date, end_date))
        df.set_index('dt', inplace=True)
        return df
    except Exception as e:
        print(f"Failed to get data for stock {stock_code}: {e}")
        return pd.DataFrame()


def calculate_indicators(df):
    """Add moving-average, RSI, MACD, Bollinger and KDJ columns to ``df``.

    ``df`` is modified in place. Every indicator is passed through
    ``format_decimal``, so values that are still NaN during an indicator's
    warm-up period are stored as 0.0; the final ``dropna`` therefore only
    removes rows whose raw input columns contain missing values.

    Args:
        df: Price frame with ``close``, ``high``, ``low``, ``vol`` and ``tr``.

    Returns:
        The frame with the indicator columns added (unchanged when empty).
    """
    if df.empty:
        return df

    close = df['close']

    # Calculate indicators (all keep 2 decimal places)
    for window in (10, 50, 100, 200):
        df[f'MA{window}'] = close.rolling(window).mean().apply(format_decimal)
    df['AVG_VOL'] = df['vol'].rolling(10).mean().apply(format_decimal)
    df['AVG_TR'] = df['tr'].rolling(10).mean().apply(format_decimal)

    df['RSI'] = talib.RSI(close, timeperiod=14).apply(format_decimal)

    macd, macd_signal, macd_hist = talib.MACD(close)
    df['MACD'] = macd.apply(format_decimal)
    df['MACD_Signal'] = macd_signal.apply(format_decimal)
    df['MACD_Hist'] = macd_hist.apply(format_decimal)

    # Bollinger Bands: 20-period mean +/- 2 standard deviations
    df['BLD_MID'] = close.rolling(20).mean().apply(format_decimal)
    std = close.rolling(20).std()
    df['BLD_UP'] = (df['BLD_MID'] + 2 * std).apply(format_decimal)
    df['BLD_DOWN'] = (df['BLD_MID'] - 2 * std).apply(format_decimal)

    # KDJ: position of the close inside the 9-period high/low range,
    # smoothed twice; undefined values start from the neutral 50.
    high = df['high'].rolling(9).max()
    low = df['low'].rolling(9).min()
    rsv = ((close - low) / (high - low) * 100).fillna(50)
    df['K'] = rsv.ewm(alpha=1 / 3).mean().apply(format_decimal)
    df['D'] = df['K'].ewm(alpha=1 / 3).mean().apply(format_decimal)
    df['J'] = (3 * df['K'] - 2 * df['D']).apply(format_decimal)

    return df.dropna()


def process_stock(stock_code, writer):
    """Compute the indicators of one stock and queue the rows for writing.

    Retries up to 3 times on ``sqlite3.OperationalError`` with a growing
    random pause; any other exception is printed and ends processing of this
    stock.

    Args:
        stock_code: Stock code to process.
        writer: ``SQLiteWriter`` that receives one row tuple per date.
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            conn = connect_to_sqlite(DB_PATH)
            try:
                df = get_stock_data(conn, stock_code, START_DATE, END_DATE)
            finally:
                # Release the connection even if loading raises.
                conn.close()
            if df.empty:
                return

            df = calculate_indicators(df)
            for dt, row in df.iterrows():
                # Ratio of the day's value to its 10-day average; 1 when the
                # average is not positive.
                vol_rate = format_decimal(row['vol'] / row['AVG_VOL'] if row['AVG_VOL'] > 0 else 1)
                tr_rate = format_decimal(row['tr'] / row['AVG_TR'] if row['AVG_TR'] > 0 else 1)
                trend1 = int(row['close'] > row['MA10'] > row['MA50'])
                trend2 = int(row['close'] < row['MA50'] < row['MA100'])
                trend3 = int(row['close'] < row['MA100'] < row['MA200'])
                # Column order must match the INSERT statement of SQLiteWriter.
                data = (
                    stock_code, dt,
                    format_decimal(row['open']),
                    format_decimal(row['close']),
                    row['MA10'], row['MA50'], row['MA100'], row['MA200'],
                    int(row['vol']),
                    format_decimal(row['tr']),
                    trend1, trend2, trend3,
                    vol_rate, tr_rate,
                    row['RSI'],
                    row['MACD'], row['MACD_Signal'], row['MACD_Hist'],
                    row['BLD_UP'], row['BLD_MID'], row['BLD_DOWN'],
                    row['K'], row['D'], row['J'],
                )
                writer.add_task(data)
            return
        except OperationalError as e:
            wait = random.uniform(0.5, 2.0) * (attempt + 1)
            print(f"Stock {stock_code} attempt {attempt + 1}/{max_retries} failed, waiting {wait:.1f} seconds: {e}")
            time.sleep(wait)
        except Exception as e:
            print(f"Error processing stock {stock_code}: {e}")
            break


def select_stocks(conn, num=10):
    """Pick up to ``num`` stocks that pass the technical and valuation filters.

    The technical filter runs in SQL on the most recent indicator date that
    is not later than ``END_DATE``, so a run on a non-trading day still uses
    the latest stored day. The valuation filter compares each stock with the
    averages of its industry.

    Args:
        conn: Open database connection.
        num: Maximum number of stocks to return.

    Returns:
        DataFrame with at most ``num`` rows, valuation columns rounded to
        2 decimal places.
    """
    # Get industry averages (keep 2 decimal places)
    industry_avg = pd.read_sql("""
        SELECT industry,
               ROUND(AVG(dynamic_pe), 2) AS avg_pe,
               ROUND(AVG(pb), 2) AS avg_pb,
               ROUND(AVG(roe), 2) AS avg_roe
        FROM stock_info
        GROUP BY industry
    """, conn)

    # Values are bound as parameters instead of being formatted into the SQL
    # text. Three times ``num`` rows are fetched because the valuation filter
    # below removes some of them.
    candidates = pd.read_sql("""
        SELECT s.*, i.name, i.industry,
               ROUND(i.dynamic_pe, 2) AS dynamic_pe,
               ROUND(i.pb, 2) AS pb,
               ROUND(i.roe, 2) AS roe
        FROM stock_indicators s
        JOIN stock_info i ON s.code = i.code
        WHERE s.dt = (SELECT MAX(dt) FROM stock_indicators WHERE dt <= ?)
          AND s.macd > s.macd_signal
          AND s.vol_add_rate > 1.5
          AND s.tr > 5
          AND s.ma10 > s.ma50
          AND s.bld_down < s.close
          AND s.code NOT LIKE '688%'
        ORDER BY s.macd_hist DESC, s.k ASC, i.roe DESC
        LIMIT ?
    """, conn, params=(END_DATE, num * 3))

    merged = pd.merge(candidates, industry_avg, on='industry', how='left')
    filtered = merged[
        (merged['dynamic_pe'] > 0) &
        (merged['dynamic_pe'] <= merged['avg_pe']) &
        (merged['pb'] > 0) &
        (merged['roe'] >= merged['avg_roe'])
    ]

    # Use assign to create new columns at once
    filtered = filtered.assign(**{
        col: filtered[col].apply(format_decimal)
        for col in ['dynamic_pe', 'avg_pe', 'pb', 'avg_pb', 'roe', 'avg_roe']
    })
    return filtered.head(num)


def main():
    """Compute indicators for all A-share codes, then select and export stocks."""
    print(f"Starting to calculate stock indicators ({START_DATE} to {END_DATE})...")
    start_time = time.time()
    writer = SQLiteWriter(DB_PATH)
    try:
        stock_codes = ak.stock_info_a_code_name()['code'].tolist()
        print(f"A total of {len(stock_codes)} stocks obtained")

        print("\nStarting to calculate indicators...")
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            list(executor.map(lambda code: process_stock(code, writer), stock_codes))

        # Flush every queued row before querying: the writer works
        # asynchronously, and the last partial batch is only written on
        # shutdown.
        writer.shutdown()
        print("\nIndicator calculation completed...")

        print("\nStarting stock selection...")
        conn = connect_to_sqlite(DB_PATH)
        try:
            selected = select_stocks(conn)
            # Format output (all keep 2 decimal places)
            selected = selected[[
                'code', 'name', 'industry', 'dynamic_pe', 'avg_pe',
                'pb', 'avg_pb', 'roe', 'avg_roe'
            ]]
            selected.columns = [
                'Code', 'Name', 'Industry', 'Price-to-Earnings Ratio', 'Industry Price-to-Earnings Ratio',
                'Price-to-Book Ratio', 'Industry Price-to-Book Ratio', 'ROE', 'Industry ROE'
            ]
            output_file = f"Stock Selection Results_{datetime.now().strftime('%Y%m%d')}.csv"
            selected.to_csv(output_file, index=False, encoding='utf_8_sig', float_format='%.2f')
            print(f"\nStock selection results saved to: {output_file}")
            print(selected.to_string(index=False, float_format='%.2f'))
        finally:
            conn.close()
    finally:
        # No-op after the flush above; still stops the worker when an error
        # interrupted the run earlier.
        writer.shutdown()
        print(f"\nTotal time taken: {time.time() - start_time:.1f} seconds")


if __name__ == "__main__":
    main()

Note: The above code needs to be used together with the code from the previous article: first run the previous code to save the basic data and daily data, then run this article’s code to calculate indicators and select stocks. If this article has helped you, feel free to like, bookmark, and share it with more like-minded friends. Let us continue to progress together on the path of stock trading! 🚀

Leave a Comment