IntroductionIn the field of meteorological data science, the explosive growth of data poses a severe challenge to computational efficiency. Data generated from global climate model outputs, high-resolution satellite remote sensing, and dense observation networks can easily reach TB or even PB levels. Traditional single-process Python scripts often run slowly when handling these CPU-intensive tasks (such as variable calculations, regional statistics, spatiotemporal interpolation, etc.), failing to fully utilize the powerful computing capabilities of large servers.To break through this bottleneck and achieve true parallel computing, we must turn to multi-process operations.The Python multiprocessing module is used formulti-process parallel computing, which can fully utilizemulti-core CPUs to accelerate tasks and improve program execution efficiency.Process Pool – An ExampleBased on global 144×56 grid points, daily precipitation data over 40 years has been collected. Its linear trend has already been provided.The current task is to calculate and save the linear trend of precipitation at the specified time index.First, import the libraries and complete the initialization
import os
import numpy as np
import datetime
from datetime import datetime,timedelta
import tqdm
from tqdm import tqdm
import multiprocessing
from multiprocessing import Pool
trend = np.random.rand(144,56)
intercept = np.random.rand(144,56)
time_init = datetime(1980,1,1)
Write a function to calculate and output the results
def processrain(tindex):
rain = (tindex+1)*trend+intercept
rain_time = time_init+timedelta(days=tindex)
fname = f"{rain_time.year}{rain_time.month:02d}{rain_time.day:02d}.npy"
np.save(os.path.join(savepath,fname),rain)
Finally, execute the following
if __name__ == "__main__":
with Pool(processes=4) as pool:
results = pool.map(processrain, [0, 1, 2, 3, 4])
Pool(processes=4): Creates a process pool with a maximum of 4 processes;pool.map(processrain,[0,1,2,3,4]): Distributes 5 tasks to different processes for execution.Multi-sequence FittingIf the second parameter passed to pool.map is an ndarray, how will the tasks be executed in parallel?
def calculate(row):
return np.sum(row)
if __name__ == "__main__":
x = np.arange(1,10).reshape(3,-1)
with Pool(processes=4) as pool:
results = pool.map(calculate, x)
print(results)
The final result shows as[6,15,24]: indicating thatpool.map iterates over the first dimension of the iterable object.Therefore, when we want to fit a normal distribution to a daily precipitation time series (over 40 years) of a large grid (144×56), we can:Write a function for fitting a single point time series
from scipy.stats import norm
def fitnorm(data):
mu,std = norm.fit(data)
return mu,std
Parallel computation
if __name__ == "__main__":
x = np.random.rand(40*365,144,56)
with Pool(processes=20) as pool:
results = pool.map(fitnorm,np.transpose(x.reshape(40*365,-1)))
mu_list = [r[0] for r in results]
std_list = [r[1] for r in results]
mu_final = np.array(mu_list).reshape(144,56)
std_final = np.array(std_list).reshape(144,56)
mu_final and std_final are the normal distribution parameters for each grid point.Multi-process DownloadFor example, when downloading ERA5 data, a multi-process approach can be used when the download volume is large.
import cdsapi
import numpy as np
import os
from multiprocessing import Pool
import tqdm
from tqdm import tqdm
def downfile(year,month):
dataset = "derived-era5-single-levels-daily-statistics"
request = {
"product_type": "reanalysis",
"variable": ["potential_evaporation"],
"year": f"{year}",
"month": f"{month:02d}",
"day": [
"01", "02", "03",
"04", "05", "06",
"07", "08", "09",
"10", "11", "12",
"13", "14", "15",
"16", "17", "18",
"19", "20", "21",
"22", "23", "24",
"25", "26", "27",
"28", "29", "30",
"31"
],
"daily_statistic": "daily_mean",
"time_zone": "utc+00:00",
"frequency": "1_hourly"
}
client = cdsapi.Client()
client.retrieve(dataset, request, f"{year}{month:02d}.nc")
tasks = [(y,m) for y in range(1961,2024) for m in range(1,13)]
with Pool(processes=4) as pool:
results = list(tqdm(
pool.starmap(downfile, tasks),
total=len(tasks),
desc="Processing",
))