Batch Processing with Python with Multithreading (Improved)

Last week I wrote an article on batch processing with multiple threads in Python, but there were things my sample code wasn’t handling:

  • Handle results from the threaded function.
  • Handle exceptions from the threaded function.

With these 2 points in mind, I rewrote the sample code.

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
import time


def task(n):
    print(f"processing {n}")
    if n % 5 == 0:
        raise Exception("It is divisible by 5")
    time.sleep(1)
    return True


def main():
    print("Starting ThreadPoolExecutor")
    tasks = []
    fs = []
    for i in range(23):
        tasks.append(task)

    with ThreadPoolExecutor(max_workers=5) as executor:
        for i, t in enumerate(tasks):
            future = executor.submit(t, i)
            fs.append(future)
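        # Block until every future has finished, successfully or not.
        results = futures.wait(fs)
    for result in results.done:
        # Everything in results.done has finished, so done() is always True here.
        if result.done():
            print(f"{result.done()}")
        # exception() returns the exception raised by task(), or None on success.
        if result.exception() is not None:
            print(f"Handle exception here: {result.exception()}")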


if __name__ == '__main__':
    start_time = time.time()
    main()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

Starting ThreadPoolExecutor
processing 0
processing 1
processing 2
processing 3
processing 4
processing 5
processing 6
processing 7
processing 8
processing 9
processing 10
processing 11
processing 12
processing 13
processing 14
processing 15
processing 16
processing 17
processing 18
processing 19
processing 20
processing 21
processing 22
True
Handle exception here: It is divisible by 5
True
True
True
Handle exception here: It is divisible by 5
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
Took 4.017247915267944

This way, you can handle both the results you expect from the threaded function and any exceptions it raises. The previous sample handled neither, so this sample is a better one. To read the value that task() returned, call result.result() on a future that finished without an exception. It is also easier to specify the number of concurrent threads: just set max_workers.
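
The result and exception handling described above can also be sketched with as_completed(), which yields each future as soon as it finishes. This is only a sketch under assumptions: run_all() is a hypothetical helper, and the return value n * 2 and the ValueError are stand-ins I picked for illustration, not part of the original sample.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(n):
    # Same rule as the sample above: multiples of 5 fail.
    if n % 5 == 0:
        raise ValueError(f"{n} is divisible by 5")
    return n * 2  # hypothetical return value, for illustration only


def run_all(count, max_workers=5):
    """Run task(0..count-1) in a thread pool and split results from errors."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which input each future belongs to.
        future_to_n = {executor.submit(task, i): i for i in range(count)}
        for future in as_completed(future_to_n):
            n = future_to_n[future]
            try:
                # result() returns the task's value, or re-raises its exception.
                results[n] = future.result()
            except ValueError as exc:
                errors[n] = exc
    return results, errors


if __name__ == "__main__":
    results, errors = run_all(23)
    print(f"{len(results)} succeeded, {len(errors)} failed")
```

Keeping a dictionary from future to input makes it possible to tell which item failed, which the plain list of futures does not.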

Batch Processing with Python Multithreading

I want to execute 5 threads at a time but I have 23 things I want to run in total. Here is the code I came up with.

from multiprocessing.pool import ThreadPool
import time


def run(thread_id):
    # Simulate a task that takes 3 seconds.
    print(f"thread {thread_id}")
    time.sleep(3)


if __name__ == '__main__':
    start_time = time.time()
    threads = []
    batch_size = 5
    for i in range(23):
        threads.append(run)

    batch_index = 1
    thread_index = 1
    while len(threads) > 0:
        # ThreadPool runs the work in threads; multiprocessing.Pool would
        # start worker processes instead.
        pool = ThreadPool(batch_size)
        print(f"Batch {batch_index}")
        for j in range(batch_size):
            if threads:
                t = threads.pop()
                pool.apply_async(t, (thread_index,))
                thread_index += 1
        # Wait for the whole batch to finish before starting the next one.
        pool.close()
        pool.join()
        batch_index += 1

    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Output:

Batch 1
thread 1
thread 2
thread 3
thread 4
thread 5
Batch 2
thread 6
thread 7
thread 8
thread 9
thread 10
Batch 3
thread 11
thread 12
thread 13
thread 14
thread 15
Batch 4
thread 16
thread 17
thread 18
thread 19
thread 20
Batch 5
thread 21
thread 22
thread 23
Took 15.523964166641235

Each thread takes 3 seconds. If I executed the function sequentially, it would take at least 69 seconds (23 × 3), but with 5 threads at a time it took only about 15.5 seconds: 5 batches of 3 seconds each, plus a little overhead. This is a huge improvement.

Another thing to note is that I declared the threads variable as a list. A list has a pop() method in Python. It returns the last item (the callable to run, in this case) and removes it from the list. This way, you can use the list to keep track of the work that is left.

I also needed to add if threads: to check whether the list still has items, in case the number of items is not divisible by 5. With 23 items, the last batch would try to pop items 21, 22, 23, 24 and 25. Items 24 and 25 do not exist, so pop() would raise an IndexError on the empty list. To prevent such a situation, the if statement is necessary.
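
As an alternative to the if threads: check, the work can be split into batches up front with slicing, so the last batch is simply shorter. The sketch below is only an illustration: split_into_batches() and estimated_seconds() are hypothetical helper names, and the estimate ignores pool start-up overhead.

```python
import math


def split_into_batches(items, batch_size):
    """Return consecutive slices of at most batch_size items."""
    return [items[start:start + batch_size]
            for start in range(0, len(items), batch_size)]


def estimated_seconds(item_count, batch_size, seconds_per_item):
    """Each batch runs its items in parallel, so it costs one item's duration."""
    return math.ceil(item_count / batch_size) * seconds_per_item


if __name__ == "__main__":
    batches = split_into_batches(list(range(1, 24)), 5)
    print([len(batch) for batch in batches])
    print(estimated_seconds(23, 5, 3))
```

With 23 items and a batch size of 5, this gives four full batches and one batch of 3, which is where the estimate of about 15 seconds comes from.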

Multithread Processing with Python

I did a fair amount of multithreaded programming with C# in the past but never tried it with Python. Let’s imagine a function that takes 3 seconds. You want to execute it 3 times. Run sequentially, the total execution time should be 9 seconds, like the following.

import time


def run():
    # Long running process
    time.sleep(3)
    print('Done')


if __name__ == '__main__':
    start_time = time.time()
    # Three plain calls, one after another; no thread is involved yet.
    run()
    run()
    run()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Output:

Done
Done
Done
Took 9.006911993026733

If you can run the functions in 3 different threads at the same time, you could save time. Here is the multithread sample in Python.

from multiprocessing.pool import ThreadPool
import time


def run():
    time.sleep(3)
    print('hoge')


if __name__ == '__main__':
    start_time = time.time()
    # ThreadPool runs the calls in threads; multiprocessing.Pool would
    # start worker processes instead.
    pool = ThreadPool(3)
    pool.apply_async(run)
    pool.apply_async(run)
    pool.apply_async(run)
    # No more work will be submitted; wait for the 3 calls to finish.
    pool.close()
    pool.join()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

hoge
hoge
hoge
Took 3.179738998413086

It should take 9 seconds if those calls run sequentially, but it took only about 3.18 seconds. It’s because the 3 threads run at the same time, like the following image.

It’s not too difficult to do multithreading in Python. There is one more thing I am thinking about. What if you need to run 100 tasks but you want to limit the number of threads to 5 at a time? I will write about it after this blog article.
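
For comparison, the same idea can be sketched with threading.Thread objects that are started and joined directly. This is a minimal sketch, not the code from this article: run_in_threads() is a hypothetical helper, and it only measures how long the threads take together.

```python
import threading
import time


def run_in_threads(worker, count, *args):
    """Start `count` threads running worker(*args) and return the elapsed seconds."""
    threads = [threading.Thread(target=worker, args=args) for _ in range(count)]
    start = time.perf_counter()
    for thread in threads:
        thread.start()  # all threads begin before any of them is joined
    for thread in threads:
        thread.join()   # wait for every thread to finish
    return time.perf_counter() - start


if __name__ == "__main__":
    # Three 3-second sleeps overlap, so this should take about 3 seconds, not 9.
    print(f"Took {run_in_threads(time.sleep, 3, 3)}")
```

Sleeping threads overlap well because a sleeping thread does not hold the interpreter; CPU-bound work would not speed up the same way.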

Round to the Nearest Multiple of 5

I needed a code snippet that can get the nearest multiple of 5 at or above a given number (that is, round up to a multiple of 5).

For example, 11 should become 15, 16 should become 20, 26 should become 30 and so forth.

11 -> 15
16 -> 20
26 -> 30

Here is the way to think about this problem. The final result should be a multiple of 5, so its remainder when divided by 5 is 0. The remainder of the original number is 0, 1, 2, 3 or 4. If you divide 11 by 5, the remainder is 1, so 11 is 5 - 1 = 4 short of the next multiple of 5. If you add this 4 to 11, you get 15. When the remainder is already 0, nothing should be added, which the outer % 5 takes care of. So to generalize this to an equation…

original_num = 11
result = original_num + (5 - original_num % 5) % 5

It’s really easy when you think about it, but if you have not gone through a problem like this before, maybe it’s hard to come up with the idea to use remainder.
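
To check the remainder idea against the examples above, here is a small sketch. The function name round_up_to_multiple() and its base parameter are my own additions for illustration.

```python
def round_up_to_multiple(n, base=5):
    """Return the smallest multiple of `base` that is greater than or equal to n."""
    remainder = n % base
    # (base - remainder) is the gap to the next multiple; the outer % base
    # turns a gap of `base` into 0 when n is already a multiple.
    return n + (base - remainder) % base


if __name__ == "__main__":
    for number in (11, 16, 26, 15):
        print(number, "->", round_up_to_multiple(number))
```

A number that is already a multiple of 5, such as 15, is returned unchanged.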

Integer Array Intersection (Improved)

Right after I wrote the previous blog entry regarding integer array intersection, I thought of sorting the two parameters before I did anything with those arrays. I think I came up with a better algorithm. Let’s look at the code.

    public void testIntersect2()
    {
        int[] nums1 = {4,7,9,7,6,7};
        int[] nums2 = {5,0,0,6,1,6,2,2,4};

        int[] result = intersect2(nums1, nums2);
        for (int i : result)
        {
            System.out.println(i);
        }
    }

    private int[] intersect2(int[] nums1, int[] nums2)
    {
        // Arrays.sort sorts in place, so the caller's arrays are reordered too.
        Arrays.sort(nums1);
        Arrays.sort(nums2);

        int[] shorter;
        int[] longer;

        if (nums1.length < nums2.length) {
            shorter = nums1;
            longer = nums2;
        }
        else {
            shorter = nums2;
            longer = nums1;
        }

        int[] nums = new int[shorter.length];

        int j = 0;
        int index = 0;

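        // Both arrays are sorted, so j only ever moves forward through longer.
        for(int i = 0; i < shorter.length; i++) {
            while (j < longer.length && shorter[i] >= longer[j]) {
                if (shorter[i] == longer[j]) {
                    // Match: record it and consume this element of longer.
                    nums[index] = shorter[i];
                    index++;
                    j++;
                    break;
                }
                // longer[j] is smaller than shorter[i]; skip it.
                j++;
            }
        }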

        int[] result = new int[index];
        for(int i = 0; i < index; i++)
        {
            result[i] = nums[i];
        }

        return result;
    }

What’s better about this second attempt is that I did not create an ArrayList for the longer variable, so it takes less space. The only extra data in memory is the int[] buffer for the result, which is at most as long as the shorter array, so the extra space is O(n). Also, converting ArrayList<Integer> to int[] seems to take some time. When I measured the code execution time, my previous code took 34 ms but this one took only 1 ms. A difference of 33 ms doesn’t seem like much, but if the system executes the function millions or billions of times, it adds up!

The logic is that you iterate through the sorted shorter array from the first element and, for each element, advance through the sorted longer array while its current element is less than or equal to the element from the shorter array. If the two are equal, you record a match and move on to the next element of the shorter array. The inner loop also stops as soon as the element in the longer array is greater.

With this algorithm, the matching step iterates over the shorter array once and the longer one at most once, so it is O(n + m) for arrays of length n and m. The two sorts at the beginning dominate, though, so the overall time complexity is O(n log n + m log m).
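
The sort-then-walk idea can be sketched in Python as follows. It is a sketch of the same technique rather than a translation of the Java code: intersect_sorted() is a hypothetical name, and it sorts copies so the inputs are left untouched.

```python
def intersect_sorted(nums1, nums2):
    """Return the common elements of two lists, keeping duplicates."""
    a, b = sorted(nums1), sorted(nums2)  # sorted() returns new lists
    i = j = 0
    result = []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            # Match: record it and consume one element from each list.
            result.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1  # a[i] is too small to match anything left in b
        else:
            j += 1  # b[j] is too small to match anything left in a
    return result


if __name__ == "__main__":
    print(intersect_sorted([4, 7, 9, 7, 6, 7], [5, 0, 0, 6, 1, 6, 2, 2, 4]))
```

Each step advances at least one of the two indexes, which is why the walk after sorting is linear in the combined length.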

After all, sorting the integer array at the beginning was a good idea. 🙂

Integer Array Intersection

Let’s think about 2 integer arrays.

int[] nums1 = {1, 2, 3, 9, 9, 9, 8, 5};
int[] nums2 = {9, 9};

I need to write a function that returns the common elements in both arrays. In this example, nums2 has two 9’s and nums1 has three 9’s, so the function must return {9, 9} (two 9’s).

I’ve solved the problem but I’m not sure if I have done it in the most efficient way. I will show my code below.

    public void testIntersect()
    {
        Integer[] nums1 = {9, 6, 8, 9, 9};
        Integer[] nums2 = {9, 8, 9, 5, 5, 5, 5};

        int[] result = intersect(nums1, nums2);
        for (int i : result)
        {
            System.out.println(i);
        }
    }

    public int[] intersect(Integer[] nums1, Integer[] nums2) {
        List<Integer> longer;
        Integer[] shorter;
        if (nums1.length < nums2.length) {
            longer = Arrays.stream(nums2).collect(Collectors.toList());
            shorter = nums1;
        }
        else {
            longer = Arrays.stream(nums1).collect(Collectors.toList());
            shorter = nums2;
        }

        List<Integer> result = new ArrayList<>();

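        for(int i = 0; i < shorter.length; i++) {
            // contains() and remove() each scan the list linearly.
            if (longer.contains(shorter[i])) {
                // shorter[i] is an Integer, so this calls remove(Object) and
                // removes the first matching value, not the element at that index.
                longer.remove(shorter[i]);
                result.add(shorter[i]);
            }
        }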
        return result.stream().mapToInt(i -> i).toArray();
    }

The first thing I thought of was to iterate through the shorter array, so I declared longer as List<Integer> and shorter as just an Integer array. The reason the longer variable is declared as List<Integer> is that I need to be able to remove elements from it. The logic is that if an element in the shorter array is found in the longer array, the matching element in the longer array should be removed. If shorter contains two 9’s and the longer array contains a single 9, the second 9 in the shorter array should not match the 9 in the longer array again.

The result variable, a List<Integer>, collects the matched elements, and the function finally returns it as an int[] array.

The time complexity of this code is not O(n): contains() and remove() each scan the list linearly, so with a shorter array of length n and a longer one of length m it is O(n × m). The extra space is O(n + m) for the list copy and the result list.

I’m thinking there may be cleverer ways to accomplish this…
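
One possibly cleverer way, sketched here in Python and not the approach used in the Java code above, is to count how often each value occurs and keep the smaller count for every value. The function name intersect_counts() is hypothetical; collections.Counter and its & operator are part of the standard library.

```python
from collections import Counter


def intersect_counts(nums1, nums2):
    """Return the common elements of two lists, keeping duplicates, in sorted order."""
    # For Counters, & keeps each key with the minimum of its two counts.
    common = Counter(nums1) & Counter(nums2)
    # elements() repeats each value as many times as its count.
    return sorted(common.elements())


if __name__ == "__main__":
    print(intersect_counts([9, 6, 8, 9, 9], [9, 8, 9, 5, 5, 5, 5]))
```

Counting takes one pass over each list, so this avoids the repeated list scans at the cost of two extra dictionaries.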

Reverse Integer

I’m doing some algorithm practice to improve my programming skills and to keep myself sharp. It’s really refreshing for an engineer like me, who is busy with day-to-day tasks that don’t require sophisticated algorithms, even though my job involves coding for automation.

In this article, I’m going to reverse integers. For example, if the function receives 12345, it must return 54321. The function must also handle the case where the result overflows the integer max or min value; the code below returns 0 in that case.

I initially came up with a way to get the absolute value, convert it to a string, reverse it and, if the original value is negative, insert “-” at index 0 of the string. Finally, parsing the string back to an integer gives the result. This is not an ideal solution for several reasons. Here is the code.

    public void testPoorReverseInteger()
    {
        int reversed = poorReverseInteger(12345);
        System.out.println(reversed);
        assertEquals(reversed, 54321);

        reversed = poorReverseInteger(-12345);
        System.out.println(reversed);
        assertEquals(reversed, -54321);
    }

    private int poorReverseInteger(int input)
    {
        StringBuilder s = new StringBuilder(Integer.toString(Math.abs(input)));
        s = s.reverse();
        int result = 0;

        if (input < 0)
            s.insert(0, "-");

        try
        {
            result = Integer.parseInt(s.toString());
        }
        catch(NumberFormatException exp)
        {
            System.out.println(exp.toString());
            return 0;
        }

        return result;
    }

I think the time complexity is probably O(n), where n is the number of digits, but the space complexity is O(2n). I’m also using an exception to handle the overflow situation, and using exceptions for validation is relatively expensive. If you expect the exception to happen very often, the cost will add up. Converting from integer to string and from string back to integer is also not such an efficient process.

Here is a smarter way to reverse an integer.

    public void testReverseInteger()
    {
        int input = -12345456;
        int result = reverseInteger(input);
        System.out.println(result);
        assertEquals(result, -65454321);
    }

    private int reverseInteger(int input)
    {
        //max integer 2147483647
        //min integer -2147483648
        int result = 0;
        while (input != 0) {
            int lastDigit = input % 10;
            input /= 10;

            if (result > Integer.MAX_VALUE/10 || (result == Integer.MAX_VALUE/10 && lastDigit > 7))
            {
                return 0;
            }

            if (result < Integer.MIN_VALUE/10 || (result == Integer.MIN_VALUE/10 && lastDigit < -8))
            {
                return 0;
            }

            result = result * 10 + lastDigit;
        }

        return result;
    }

Declaring int result = 0; is a no-brainer. We use it to accumulate the reversed value that is returned.

The logic of the function is to get the value of the last digit (int lastDigit = input % 10;) by taking the remainder of division by 10, reduce the value of input by dividing it by 10, check for overflow (the two if statements) and then append the digit to the result variable (result = result * 10 + lastDigit;). So for the first iteration of 12345, the last digit is 5. When 12345 is divided by 10, the value is reduced to 1234 because the variable is an integer, so the fractional part is discarded. Then in the next iteration, lastDigit is 4. The result variable contains 5 when it reaches result = result * 10 + lastDigit;, so it becomes 5*10 + 4 = 54. If you repeat this process until input is 0, you will get 54321.

Let’s talk about handling the overflow situation.

if (result > Integer.MAX_VALUE/10 || (result == Integer.MAX_VALUE/10 && lastDigit > 7))

If the value of the result variable is greater than 1/10 of the max integer value (Integer.MAX_VALUE/10 is 214748364), multiplying it by 10 will definitely exceed the max integer value, so result = result * 10 + lastDigit; would overflow.

The other situation that needs handling is when the value of the result variable equals 1/10 of the max integer value and the last digit is larger than 7. The max integer value is 2147483647, so if the value of lastDigit is greater than 7, the result will be greater than 2147483647. The validation for the min integer value is the same idea: -2147483648 ends in 8, so the check is lastDigit < -8.

The latter method is better in the sense that the space complexity is O(1), because it only needs a couple of int variables, and the time complexity is O(n) in the number of digits, which is effectively O(1) because a 32-bit integer has at most 10 digits. It also doesn’t depend on an exception for overflow validation.
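
The digit-by-digit approach can be sketched in Python too, with two differences that are assumptions of this sketch and not part of the Java code. Python’s % returns a non-negative remainder for a negative number, so the sign is handled separately. Python integers never overflow, so the 32-bit range is checked once at the end instead of before each multiplication. The name reverse_integer() and the INT_MAX and INT_MIN constants are hypothetical.

```python
INT_MAX = 2**31 - 1   # 2147483647
INT_MIN = -2**31      # -2147483648


def reverse_integer(value):
    """Reverse the decimal digits of value; return 0 if the result leaves the 32-bit range."""
    sign = -1 if value < 0 else 1
    remaining = abs(value)
    result = 0
    while remaining:
        # divmod gives the quotient and the last digit in one step.
        remaining, last_digit = divmod(remaining, 10)
        result = result * 10 + last_digit
    result *= sign
    if result < INT_MIN or result > INT_MAX:
        return 0
    return result


if __name__ == "__main__":
    print(reverse_integer(12345))
    print(reverse_integer(-12345456))
```

Trailing zeros disappear in the reversed value, so 120 becomes 21.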

Recap

Knowing the nature of the data you are dealing with lets you work with it more efficiently. I’m planning to have more fun with algorithms, especially in Java.