Multi Threading with map in Python

I wrote a blog article about advanced map with Python earlier, and since then I have found a way to do multithreaded processing using map with ThreadPoolExecutor. If you want to run multiple long-running processes at the same time in multiple threads, this example may be for you. Here is the sample code.

import unittest
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time

class playground(unittest.TestCase):

    def long_running_function(self, a, n):
        # Simulate a long-running task: sleep 3 seconds, then return a ** n
        print(f"Executing {n}...")
        time.sleep(3)
        return a ** n

    def test_advanced_map(self):
        start_time = time.time()
        n = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with ThreadPoolExecutor(max_workers=3) as executor:
            # Submit one task per element of n; a is fixed to 2 via repeat(2)
            fs = map(executor.submit, repeat(self.long_running_function), repeat(2), n)
            results = futures.wait(list(fs))
            for result in results.done:
                # Check for an exception first; calling result() on a failed
                # future would re-raise the exception
                if result.exception() is not None:
                    print(f"{result.exception()}")
                else:
                    print(f"{result.result()}")

        elapsed_time = time.time() - start_time
        print(f"took {elapsed_time}")

Output:

Executing 1...
Executing 2...
Executing 3...
Executing 4...
Executing 5...
Executing 6...
Executing 7...
Executing 8...
Executing 9...
Executing 10...

took 12.019917964935303

The long_running_function takes 2 parameters: a is a fixed parameter, and n takes each element of the n list.

If you executed long_running_function sequentially for each element, it would take 30 seconds because the function sleeps for 3 seconds each time. Using 3 threads at a time, it took only 12 seconds: 10 elements spread across 3 workers makes 4 batches, and 4 batches × 3 seconds = 12 seconds. If there are hundreds or thousands of items, you can imagine this method can be a huge gain.

In the test_advanced_map function, it instantiates ThreadPoolExecutor with a max_workers value of 3, which means 3 threads run at the same time. Depending on your program, you can increase the number, but be careful not to increase it too much because it can hog your memory.

And using the map function, you call executor.submit. You pass long_running_function, wrapped in repeat, as the function to be executed. Then repeat(2) supplies the fixed parameter a, and n supplies each value for the parameter n. You can execute a function for each element of a list using the map function in Python; I wrote about it before here.
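If the map call with executor.submit looks dense, here is a sketch of the equivalent written as an explicit list comprehension (my illustration, not part of the original code):

# Submit one task per element of n, with a fixed to 2
fs = [executor.submit(self.long_running_function, 2, item) for item in n]

Each iteration hands one element of n, together with the fixed value 2 for a, to the executor.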

Lastly, this method can handle both returned results and unhandled exceptions. I believe this is the best way to do multithreading in Python. I could be wrong, so please comment if you have better ideas! 🙂

Advanced map with Python

As I was writing Python code using map, I ran into an issue. When I ran the following code, I got an error.

import unittest


class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(self.func1, a, 2)
        print(results)

I basically wanted to pass 2 for the y parameter of func1 instead of another list. Here is the error I got.

FAILED (errors=1)

Error
Traceback (most recent call last):
  File "C:\Users\hiriu\dev\hoge\test.py", line 10, in test_map
    results = map(self.func1, a, 2)
TypeError: 'int' object is not iterable

Right, the number 2 is not a collection and is not iterable. How do I solve this problem? I searched the web and I found the following solution.

import unittest
import functools

class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(functools.partial(self.func1, y=2), a)
        print(list(results))

By using functools.partial, you can bind a fixed value to part of the function's parameters; here, y is fixed to 2. Here is the output.

[1, 4, 9, 16]

Process finished with exit code 0

Update:

I found an easier way to map a fixed parameter: itertools.repeat. Here is the example. It's more readable and maintainable.

import unittest
from itertools import repeat


class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(self.func1, a, repeat(2))
        print(list(results))
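In case the endless repeat(2) looks alarming: repeat with no length argument yields its value forever, but map stops as soon as its shortest iterable is exhausted, so only four pairs are ever produced here. A tiny illustration using the built-in pow function:

from itertools import repeat

# map stops when the shortest iterable (the list) runs out
print(list(map(pow, [1, 2, 3, 4], repeat(2))))  # [1, 4, 9, 16]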

map Function in Python

The map function in Python is a convenient way to apply a function to every item in a collection. Let's first look at an example that does not use map.

import unittest


class playground(unittest.TestCase):
    def pow(self, n):
        return n**n

    def test_pow(self):
        numbers = range(10)
        for number in numbers:
            result = self.pow(number)
            print(result)

Output:

1
1
4
27
256
3125
46656
823543
16777216
387420489

The example above just executes the pow function sequentially for every number in the range.

If you use the map function, the code becomes concise and easier to manage. It might be a little confusing at first, but once you get used to it, it's not too bad.

import unittest


class playground(unittest.TestCase):
    def pow(self, n):
        return n**n

    def test_map(self):
        numbers = range(10)
        results = map(self.pow, numbers)
        print(list(results))

Output:

[1, 1, 4, 27, 256, 3125, 46656, 823543, 16777216, 387420489]

I didn’t know 0^0 was 1… I knew n^0 was always 1 but… Interesting. 🙂
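One detail worth noting: in Python 3, map returns a lazy iterator rather than a list, which is why the sample wraps it in list() before printing. Inside the test above, you could see the difference like this (illustrative only):

results = map(self.pow, numbers)
print(results)        # prints something like <map object at 0x...>
print(list(results))  # materializes and prints the actual results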

Batch Processing in Python with Multithreading (Improved)

I wrote an article on how to do batch processing with multiple threads in Python last week, but there were things that my sample code wasn't handling:

  • Handle results from the threaded function.
  • Handle exceptions from the threaded function.

With these 2 points in mind, I rewrote the sample code.

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
import time


def task(n):
    print(f"processing {n}")
    if n % 5 == 0:
        # Simulate a failure for every 5th item
        raise Exception("It is divisible by 5")
    time.sleep(1)
    return True


def main():
    print("Starting ThreadPoolExecutor")
    tasks = []
    fs = []
    for i in range(23):
        tasks.append(task)

    with ThreadPoolExecutor(max_workers=5) as executor:
        for i, t in enumerate(tasks):
            future = executor.submit(t, i)
            fs.append(future)
        results = futures.wait(fs)
    for result in results.done:
        # Check for an exception first; calling result() on a failed
        # future would re-raise it
        if result.exception() is not None:
            print(f"Handle exception here: {result.exception()}")
        else:
            print(f"{result.result()}")


if __name__ == '__main__':
    start_time = time.time()
    main()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

Starting ThreadPoolExecutor
processing 0
processing 1
processing 2
processing 3
processing 4
processing 5
processing 6
processing 7
processing 8
processing 9
processing 10
processing 11
processing 12
processing 13
processing 14
processing 15
processing 16
processing 17
processing 18
processing 19
processing 20
processing 21
processing 22
True
Handle exception here: It is divisible by 5
True
True
True
Handle exception here: It is divisible by 5
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
Took 4.017247915267944

This way, you can handle situations where you expect certain results back from the threaded function, and you can also handle exceptions. The previous sample did neither, so this one is better. It is also easier to specify the number of concurrent threads.
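As an aside, concurrent.futures also offers as_completed, which yields each future as soon as it finishes instead of waiting for the whole batch. Here is a minimal sketch of the same loop written that way, reusing the task function from the sample above (my variation, not from the original article):

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=5) as executor:
    fs = [executor.submit(task, i) for i in range(23)]
    for future in as_completed(fs):
        # Each future is guaranteed to be done by the time it is yielded
        if future.exception() is not None:
            print(f"Handle exception here: {future.exception()}")
        else:
            print(future.result())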

Batch Processing with Python Multithreading

I want to execute 5 threads at a time but I have 23 things I want to run in total. Here is the code I came up with.

import threading
from multiprocessing import Pool
import time


class MyThread(threading.Thread):
    # run is invoked below as a plain function via the class,
    # so its parameter receives the task id rather than self
    def run(id):
        print(f"thread {id}")
        time.sleep(3)


if __name__ == '__main__':
    start_time = time.time()
    threads = []
    batch_size = 5
    for i in range(23):
        threads.append(MyThread.run)

    batch_index = 1
    thread_index = 1
    while len(threads) > 0:
        # Note: multiprocessing.Pool runs the tasks in worker processes,
        # not threads; a fresh pool is created for each batch
        pool = Pool()
        print(f"Batch {batch_index}")
        for j in range(batch_size):
            # The last batch may have fewer than batch_size items
            if threads:
                t = threads.pop()
                pool.apply_async(t, (thread_index,))
                thread_index += 1
        pool.close()
        pool.join()
        batch_index += 1

    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Output:

Batch 1
thread 1
thread 2
thread 3
thread 4
thread 5
Batch 2
thread 6
thread 7
thread 8
thread 9
thread 10
Batch 3
thread 11
thread 12
thread 13
thread 14
thread 15
Batch 4
thread 16
thread 17
thread 18
thread 19
thread 20
Batch 5
thread 21
thread 22
thread 23
Took 15.523964166641235

Each task takes 3 seconds. If I executed the function sequentially, it would take 69 seconds (23 × 3), but with 5 running at a time it took only about 15.5 seconds (5 batches × 3 seconds, plus overhead). This is a huge improvement.

Another thing to note is that I declared the threads variable as a list. Lists have a pop() method in Python, which returns the last item (a function reference in this case) and removes it from the list. This way, you can use the list to keep track of the remaining work.

I also needed to add if threads: to check whether the list still has items, in case the total number of tasks is not divisible by 5. With 23 tasks, the last batch only has items 21, 22 and 23, but the inner loop still runs 5 times; the 4th and 5th pop() calls would fail on the empty list. To prevent that, the if statement is necessary.
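An alternative that sidesteps the empty-list check entirely is to slice the list into batches, since the last slice is simply shorter. A quick sketch of the idea (my variation, not from the original code):

items = list(range(1, 24))
batch_size = 5
for start in range(0, len(items), batch_size):
    # The last slice may hold fewer than batch_size items
    batch = items[start:start + batch_size]
    print(batch)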

Multithread Processing with Python

I did a fair amount of multithreaded programming with C# in the past but never tried it with Python. Let's imagine a function that takes 3 seconds, and you want to execute it 3 times. Run sequentially, the total execution time should be 9 seconds, like the following.

import threading
import time


class MyThread(threading.Thread):
    # run is called below as a plain function via the class, hence no self
    def run():
        # Long running process
        time.sleep(3)
        print('Done')


if __name__ == '__main__':
    start_time = time.time()
    MyThread.run()
    MyThread.run()
    MyThread.run()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Output:

Done
Done
Done
Took 9.006911993026733

If you can run the functions in 3 different threads at the same time, you could save time. Here is the multithread sample in Python.

import threading
from multiprocessing import Pool
import time


class MyThread(threading.Thread):
    def run():
        time.sleep(3)
        print('hoge')


if __name__ == '__main__':
    start_time = time.time()
    pool = Pool()
    pool.apply_async(MyThread.run)
    pool.apply_async(MyThread.run)
    pool.apply_async(MyThread.run)
    pool.close()
    pool.join()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

hoge
hoge
hoge
Took 3.179738998413086

It should have taken 9 seconds if those calls ran sequentially, but it only took about 3.2 seconds, because the 3 tasks ran at the same time.
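One caveat worth flagging: multiprocessing.Pool actually runs tasks in worker processes, not threads. If you want real threads with the same Pool API, the standard library also provides multiprocessing.pool.ThreadPool. A minimal sketch (my adaptation, not from the original post):

from multiprocessing.pool import ThreadPool
import time


def work():
    time.sleep(3)
    print('hoge')


if __name__ == '__main__':
    pool = ThreadPool(3)  # 3 worker threads instead of processes
    for _ in range(3):
        pool.apply_async(work)
    pool.close()
    pool.join()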

It's not too difficult to do multithreading in Python. There is one more thing I am thinking about: what if you need to run 100 processes but you want to limit the number of threads to 5 at a time? I will write about that after this blog article.

How to Add Keys and Values to an Existing YAML with PyYAML

Adding keys and values to an existing YAML file with Python turned out to be harder than I thought. For example…

You have a YAML file like the following.

root:

And you want to add key1: value1, key2: value2 and key3: value3 under the root node. I had thought it would be as simple as appending child nodes, but it turned out I spent a few hours figuring it out.

The goal YAML I would like in this example is the following.

root:
    key1: value1
    key2: value2
    key3: value3

Here is what you do to achieve it.

import yaml

if __name__ == '__main__':
    # "root:" with no children loads as {'root': None}
    with open("data.yaml", 'r') as stream:
        data_loaded = yaml.safe_load(stream)

    additional_data = {"key1": "value1", "key2": "value2", "key3": "value3"}
    for key, value in additional_data.items():
        if data_loaded['root'] is None:
            # The node is empty, so create the dict with the first pair
            data_loaded['root'] = {key: value}
        else:
            data_loaded['root'].update({key: value})

    # Write the updated structure back to the same file
    with open('data.yaml', 'w', encoding='utf8') as outfile:
        yaml.dump(data_loaded, outfile, default_flow_style=False, allow_unicode=True)

There might be simpler solutions but that’s what I came up with and it does what I want it to.

If you add the data as a list instead, the YAML outcome looks like the following, which is fine if it meets your requirements, but it didn't meet mine.

root:
- key1: value1
- key2: value2
- key3: value3
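For reference, this is the kind of assignment that produces the list form above (illustrative only):

# Assigning a list of dicts under 'root' yields the '- key: value' form
data_loaded['root'] = [{"key1": "value1"}, {"key2": "value2"}, {"key3": "value3"}]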

How to Insert Data into an SQLite Database from Python

The sqlite3 module provides nice ways to deal with an SQLite database. We will cover how to create a table and pump data into it the right way.

I'd recommend a nice tool called DB Browser for SQLite. It is very useful for browsing the data in the database and executing SQL scripts.

First, let’s create a sample table.

    def create_table_hoge(self, conn):
        # conn is an open sqlite3 connection passed in by the caller
        cur = conn.cursor()
        cur.execute('''CREATE TABLE HOGE (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        col1 text,
        col2 VARCHAR(10)
        )''')

We will pump a bunch of dummy data into the table using the following code.

        conn = sqlite3.connect(self.db_name)
        cur = conn.cursor()
        for i in range(1000):
            cur.execute('''INSERT INTO HOGE (col1, col2)
            VALUES (?, ?);
            ''', (f"test{i}", f"test{i+1}"))
            # Committing on every row is simple but slow; committing once
            # after the loop is much faster
            conn.commit()
            print(cur.lastrowid)

Note that the cursor.execute method takes the SQL statement and the parameter values as a tuple. You can see the placeholders as question marks, with the actual values in the tuple. Parameterized queries like this also protect against SQL injection, unlike building the SQL string yourself, e.g. INSERT INTO HOGE (col1, col2) VALUES ('test1', 'test2').

When I browse the HOGE table with DB Browser for SQLite, I can see the 1000 inserted rows.
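If you insert a lot of rows, sqlite3 also provides executemany, which takes the statement once together with a sequence of parameter tuples, and you can commit a single time at the end. A rough sketch of the same insert written that way (my variation; the database file name is a made-up example):

import sqlite3

conn = sqlite3.connect("hoge.db")  # hypothetical database file name
cur = conn.cursor()
rows = [(f"test{i}", f"test{i+1}") for i in range(1000)]
cur.executemany("INSERT INTO HOGE (col1, col2) VALUES (?, ?);", rows)
conn.commit()  # a single commit for the whole batch
conn.close()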

How to Execute Functions Dynamically in Python

There may be times when you want to dynamically execute functions in Python.

If you want to execute a function depending on conditions, you'd normally write the following code.

import unittest
class test01(unittest.TestCase):
    def test_helloworld(self):
        function_to_execute = 'foo'
        if function_to_execute == 'foo':
            test01.foo()
        elif function_to_execute == 'bar':
            test01.bar()

    def foo():
        print('hello from foo')

    def bar():
        print('hello from bar')

Writing code like that easily gets unmanageable when there are many conditions. It's better to call functions dynamically.

import unittest

class test01(unittest.TestCase):
    def test_helloworld(self):
        function_to_execute = 'foo'
        func = getattr(test01, function_to_execute)
        func()

    def foo():
        print('hello from foo')

    def bar():
        print('hello from bar')

Even if you have quite a few functions to manage, the calling code stays simple when you look functions up dynamically like this.
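Another common pattern for this is a dispatch table, a dict mapping names to functions. A minimal sketch (my variation, not from the original post):

def foo():
    print('hello from foo')


def bar():
    print('hello from bar')


dispatch = {'foo': foo, 'bar': bar}
dispatch['foo']()  # prints 'hello from foo'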

Round to the Nearest Multiple of 5

I needed a code snippet that rounds a number up to the next multiple of 5.

For example, 11 should become 15, 16 should become 20, 26 should become 30, and so forth.

11 -> 15
16 -> 20
26 -> 30

Here is the way to think about this problem. The final result should be a multiple of 5, so when the original number is divided by 5, the remainder is 0, 1, 2, 3 or 4. If you divide 11 by 5, the remainder is 1; adding 5 minus that remainder, which is 4, to 11 gives you the next multiple of 5. So to generalize this to an equation (the outer % 5 keeps numbers that are already multiples of 5 unchanged):

original_num = 11
result = original_num + (5 - original_num % 5) % 5

It's really easy when you think about it, but if you have never worked through a problem like this before, it may be hard to come up with the idea of using the remainder.
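A quick check of the formula against the examples (illustrative only):

def round_up_to_5(n):
    # 5 minus the remainder reaches the next multiple; the outer % 5
    # leaves exact multiples of 5 unchanged
    return n + (5 - n % 5) % 5


for n in (11, 16, 26, 15):
    print(n, '->', round_up_to_5(n))  # 15, 20, 30, 15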