Batch Processing with Python Multithreading

I want to execute 5 threads at a time but I have 23 things I want to run in total. Here is the code I came up with.

import time
from multiprocessing.pool import ThreadPool


def work(id):
    print(f"thread {id}")
    time.sleep(3)


if __name__ == '__main__':
    start_time = time.time()
    threads = []
    batch_size = 5
    for i in range(23):
        threads.append(work)

    batch_index = 1
    thread_index = 1
    while threads:
        # A fresh pool of batch_size threads per batch; join() waits
        # for the whole batch to finish before the next one starts.
        pool = ThreadPool(batch_size)
        print(f"Batch {batch_index}")
        for j in range(batch_size):
            if threads:
                t = threads.pop()
                pool.apply_async(t, (thread_index,))
                thread_index += 1
        pool.close()
        pool.join()
        batch_index += 1

    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Output:

Batch 1
thread 1
thread 2
thread 3
thread 4
thread 5
Batch 2
thread 6
thread 7
thread 8
thread 9
thread 10
Batch 3
thread 11
thread 12
thread 13
thread 14
thread 15
Batch 4
thread 16
thread 17
thread 18
thread 19
thread 20
Batch 5
thread 21
thread 22
thread 23
Took 15.523964166641235

Each task takes 3 seconds. Executed sequentially, the 23 tasks would take at least 69 seconds, but with 5 threads at a time they finished in about 15.5 seconds (5 batches of 3 seconds each, plus a little overhead). This is a huge improvement.

Another thing to note is that I declared the threads variable as a list. Lists have a pop() method in Python, which returns the last item (here, a reference to the worker function) and removes it from the list. This way, the list keeps track of the work that still remains to run.
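For reference, here is the pop() behavior in isolation (not part of the batch script, just an illustration):

```python
items = ['a', 'b', 'c']
last = items.pop()  # removes and returns the final element
print(last)         # c
print(items)        # ['a', 'b']
```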

I also needed to add if threads: to check whether the list still has items, in case the total number of tasks is not divisible by 5. With 23 tasks, the last batch still loops 5 times and tries to pop items 21 through 25, but items 24 and 25 do not exist, so pop() would raise an IndexError on the empty list. The if statement prevents that.
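As an alternative to managing batches by hand, the standard library's concurrent.futures.ThreadPoolExecutor (Python 3.2+) caps the number of concurrent threads for you. Here is a minimal sketch of the same 23-task workload:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(id):
    print(f"thread {id}")
    time.sleep(3)


start_time = time.time()
# max_workers=5 caps the pool at 5 threads; as soon as one task
# finishes, the next queued task starts, so no manual batching is needed.
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(work, range(1, 24))
# Leaving the with-block waits for all 23 tasks to complete.
print(f"Took {time.time() - start_time}")
```

One difference from fixed batches: the executor starts a new task the moment a thread frees up, so when tasks have uneven durations it avoids the idle time at the end of each batch.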

Multithread Processing with Python

I did a fair amount of multithreaded programming with C# in the past but never tried it with Python. Let’s imagine a function that takes 3 seconds and you want to execute it 3 times. Run back to back, the total execution time should be about 9 seconds, like the following.

import time


def run():
    # Long running process
    time.sleep(3)
    print('Done')


if __name__ == '__main__':
    start_time = time.time()
    run()
    run()
    run()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Output:

Done
Done
Done
Took 9.006911993026733

If you run the function in 3 different threads at the same time, you can save time. Here is a multithreaded sample in Python.

import time
from multiprocessing.pool import ThreadPool


def run():
    time.sleep(3)
    print('hoge')


if __name__ == '__main__':
    start_time = time.time()
    pool = ThreadPool(3)
    pool.apply_async(run)
    pool.apply_async(run)
    pool.apply_async(run)
    pool.close()
    pool.join()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

hoge
hoge
hoge
Took 3.179738998413086

It would take 9 seconds if those calls ran sequentially, but it only took about 3.2 seconds, because the 3 threads run at the same time, each one sleeping while the others sleep.
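The same overlap can also be achieved with bare threading.Thread objects, which is the threading module's intended usage pattern: pass the function as target, then start and join each thread. A minimal sketch of the same three-task run:

```python
import threading
import time


def run():
    time.sleep(3)
    print('hoge')


start_time = time.time()
threads = [threading.Thread(target=run) for _ in range(3)]
for t in threads:
    t.start()   # all three threads begin running immediately
for t in threads:
    t.join()    # wait for every thread to finish
print(f"Took {time.time() - start_time}")
```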

It’s not too difficult to do multithreading in Python. There is one more thing I am thinking about: what if you need to run 100 tasks but want to limit the number of threads to 5 at a time? That is what the batch processing section above covers.