79349958

Date: 2025-01-12 13:24:44
Score: 1
Natty:
Report link

It seems that @Bhanuday Sharma is right and it cannot greatly improve curve_fit function performance and largely lessen computation time, if not writing a low level code.

However, by utilizing multiprocessing module, especially multiprocessing.Process() function, its performance can be largely upgraded, lessening computation time by ca. 76%, when tested in one million+ datasets for fitting.

Since you do not provide original data (5 000 000), thus I have uploaded one test file with 1314300 rows ("test.npy", which can be accessed from this website: https://www.mediafire.com/file/fa5nq6nycvawa4w/test.npy/file) for performance comparison. Besides, your predefined func is not complex, which is not applicable in reality. Suppose we have a quadratic function to fit, which is in the form of f(x, y) = ax^2 + by^2 + cxy + dx + ey + f and we prepare to utilize one million+ data for fitting.


Code part:

import numpy as np
import multiprocess as mp
from scipy.optimize import curve_fit
from timeit import default_timer as timer

oneArr = np.load('test.npy')
rowCnt = oneArr.shape[0]
xres, yres = 2, 2
xy = np.array([[-xres, yres], [0, yres], [xres, yres], [-xres, 0], [0, 0], [xres, 0], [-xres, -yres], [0, -yres], [xres, -yres]]).T
funcQuadratic = lambda xy, a, b, c, d, e, f: a * xy[0, :] ** 2 + b * xy[1, :] ** 2 + c * xy[0, :] * xy[1, :] + d * xy[0, :] + e * xy[1, :] + f

## Sequential for loop method
tic = timer()
result_scipy_for_loop = np.zeros((rowCnt, 6))
for row in range(rowCnt):
    result_scipy_for_loop[row, :] = curve_fit(f = funcQuadratic, xdata = xy, ydata = oneArr[row, :], p0 = (1, 1, 1, 1, 1, 1), method = 'lm')[0]
tac = timer()
print("result_scipy_for_loop is:", result_scipy_for_loop, "and its time usage is:", tac - tic, "seconds")

## Multiprocessing Process() function
def curve_fit_process(queue, rowRng):
    result = [curve_fit(f = funcQuadratic, xdata = xy, ydata = oneArr[idx, :], p0 = (1, 1, 1, 1, 1, 1), method = 'lm')[0] for idx in rowRng]
    queue.put(result)

q = mp.Queue()
tic = timer()
num_processes = 30
processes = []
if (rowCnt % num_processes) != 0:
    chunks = [np.arange(idx * (rowCnt // num_processes + 1), idx * (rowCnt // num_processes + 1) + rowCnt // num_processes + 1) for idx in range(num_processes)]
    for chunk in chunks:
        process = mp.Process(target = curve_fit_process, args = (q, chunk, ))
        processes.append(process)
        process.start()
        print("process: ", process.name, '->', process.pid, "starts...")

    ret = [q.get() for process in processes]
    print("ret is:", ret)

    for process in processes:
        process.join()
   
else:
    chunks = [np.arange(idx * (rowCnt // num_processes), idx * (rowCnt // num_processes) + rowCnt // num_processes) for idx in range(num_processes)]
    for chunk in chunks:
        process = mp.Process(target = curve_fit_process, args = (q, chunk, ))
        processes.append(process)
        process.start()
        print("process: ", process.name, '->', process.pid, "starts...")

    ret = [q.get() for process in processes]
    print("ret is:", ret)

    for process in processes:
        process.join()

tac = timer()
print("mp.Process() function time usage is:", tac - tic, "seconds.")

In my case, it only utilizes 119.21885330599616 seconds by multiprocessing.Process(), compared by traditional for loop style (491.4169644650028 seconds.).

Hope it's useful for someone in the future.

Reasons:
  • Long answer (-1):
  • Has code block (-0.5):
  • Unregistered user (0.5):
  • User mentioned (1): @Bhanuday
  • Low reputation (1):
Posted by: Jw L