Multi Threading with map in Python

I wrote a blog article about advanced map in Python earlier, and since then I have found a way to do multithreaded processing using map with ThreadPoolExecutor. If you want to run multiple long-running processes at the same time in multiple threads, this example may be for you. Here is the sample code.

import unittest
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time

class playground(unittest.TestCase):

    def long_running_function(self, a, n):
        print(f"Executing {n}...")
        time.sleep(3)
        return a ** n

    def test_advanced_map(self):
        start_time = time.time()
        n = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with ThreadPoolExecutor(max_workers=3) as executor:
            fs = map(executor.submit, repeat(self.long_running_function), repeat(2), n)
            results = futures.wait(list(fs))
            for result in results.done:
                # Check for an exception first; result() would re-raise it.
                if result.exception() is not None:
                    print(f"{result.exception()}")
                else:
                    print(f"{result.result()}")

        elapsed_time = time.time() - start_time
        print(f"took {elapsed_time}")

Output:

Executing 1...
Executing 2...
Executing 3...
Executing 4...
Executing 5...
Executing 6...
Executing 7...
Executing 8...
Executing 9...
Executing 10...

took 12.019917964935303

Note that long_running_function takes 2 parameters: a is a fixed parameter, and n takes each element of the n list in turn.

If you executed long_running_function sequentially for each element, it would take 30 seconds, because the function sleeps for 3 seconds and there are 10 elements. With 3 threads at a time, it took only 12 seconds: 10 elements at 3 per batch means 4 batches (the last one holds a single element), and 4 × 3 seconds is 12 seconds. If there are hundreds or thousands of items, you can imagine this method can be a huge gain.
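
Here is a minimal sketch of that arithmetic. The function name estimated_seconds is made up for illustration, and it assumes every task takes the same amount of time. The executor does not really wait for a whole batch to finish before starting the next one (a thread picks up the next task as soon as it is free), but with equal-length tasks the result looks the same.

```python
import math


def estimated_seconds(items, workers, seconds_per_task):
    """Estimate wall-clock time when equal-length tasks run `workers` at a time."""
    waves = math.ceil(items / workers)  # 10 items / 3 workers -> 4 waves
    return waves * seconds_per_task


print(estimated_seconds(10, 3, 3))  # 3 threads, as in the sample above
print(estimated_seconds(10, 1, 3))  # one thread, i.e. sequential
```

The first call gives the 12 seconds measured above and the second gives the 30 seconds a sequential run would take.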

The test_advanced_map function instantiates ThreadPoolExecutor with max_workers set to 3, which means at most 3 threads run at the same time. Depending on your program, you can increase the number, but be careful not to raise it too far: every thread needs its own stack memory, and for CPU-bound work extra threads add contention without making anything faster.

Using the map function, you call executor.submit once per element. repeat(self.long_running_function) supplies the function to be executed as a fixed first argument, repeat(2) supplies the fixed parameter a, and the n list supplies the parameter n. You can execute a function for each element of a list using the map function in Python. I wrote about it before here.
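
As a side note, ThreadPoolExecutor also has its own map method, which may be a little shorter when you only need the return values. This is a sketch of an alternative, not the code from this post: power_slowly and run_with_executor_map are names I made up, and the 0.1 second delay is only there to keep the demo quick.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time


def power_slowly(a, n, delay=0.1):
    """Stand-in for long_running_function; the short delay is just for the demo."""
    time.sleep(delay)
    return a ** n


def run_with_executor_map(exponents, workers=3):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map yields results in the same order as the inputs.
        # repeat(2) is infinite, but iteration stops when `exponents` runs out.
        return list(executor.map(power_slowly, repeat(2), exponents))


print(run_with_executor_map([1, 2, 3, 4, 5]))
```

The trade-off is that with executor.map an exception from a task is re-raised when you reach its result during iteration, so handling failures per task is less convenient than with submit and futures.wait.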

Lastly, this method can handle returned results and unhandled exceptions. I believe this is the best way to do multi threading in Python. I could be wrong, so please comment if you have better ideas! 🙂

New Stuff

I am using Visual Studio 2022 Preview 3.1 on Windows 11. I didn’t plan on it, but I installed them when I saw they were available for my personal laptop. I wouldn’t do that to my work laptop, but I don’t mind being on the bleeding edge on my personal machine.

I like new stuff. It’s been my nature for decades in my software engineering career, and I believe it has served me well. As a software engineer, you always need to keep up with new technology. The technology I worked with a decade or two ago is completely different from what I deal with today.

If you find that hard to do, software engineering might not be the career for you. If you find it fun, then you have a good chance of being a good software engineer.

I’m not saying new stuff is always better. Using only new things is not good, and sticking only with the old ways is not good either. It’s a matter of balance.

This blog is a place where I explore new stuff and write it down, so that I can come back to it if I forget and share it with people all over the world.

Jenkins Toolset

I used to do a lot of automation with Jenkins. I worked for a software shop that had multiple instances of Jenkins for prod, staging and development, with hundreds of jobs. It was not easy to manage all of them without a tool, so I decided to create a desktop application with WPF. I still have the project on GitHub today.

As the context menu indicates, it can do all of those things. It can list the jobs on the specified Jenkins instance and manage them. I made it downloadable on OCI’s Object Storage from here.

Here is the view of the builds of a job.

I would like to convert this project to a MAUI project eventually. I will write about the tool more later when I have some more time.

.NET 6

I don’t really talk about .NET on this blog, but I am very interested in .NET 6. I used to be a .NET developer, and I used to write Windows Forms applications, WPF, Windows Services and ASP.NET applications. The reason I have been away from .NET is that the company I currently work for mainly uses Linux technology. (I applied for the job because I was very interested in the Linux realm.)

I am interested in .NET 6 (as of 9/13/2021, it is in preview 7) because it has a new multi-platform UI feature. For the last few years, I have always cared about cross-platform support when developing software. I tried Electron for the software I had developed in WPF, but it didn’t click with me because it’s mainly HTML and JavaScript. I probably should have pursued it, but the thing is that I miss C# very much.

I downloaded the .NET 6 installer for macOS and installed it. After the installation, I opened terminal and ran dotnet. Here is the output I got.

I wanted to make sure I was using .NET 6 preview 7, so I ran dotnet --version and got the following output.

Next, I created some sample code. I ran dotnet new --list to list the available project types.

I decided to create a hello world console application, so I ran dotnet new console helloworld to create the sample project.

Once dotnet new console helloworld succeeds, you get a few files like this.

The Program.cs file already contains sample code like the following.

// See https://aka.ms/new-console-template for more information
Console.WriteLine("Hello, World!"); 

To compile the code, you can run dotnet build in the same directory.

Once the compilation is successful, it creates bin/Debug/net6.0 directories. In net6.0 directory, it generates several compiled files like below.

As shown in the image, helloworld is an executable file, so you can go into the net6.0 directory and execute it with ./helloworld. Alternatively, you can go back up 3 directories to the project directory and execute it with dotnet run.

According to the help, dotnet run builds and runs the project, so when you make changes to the code, you could just execute dotnet run to test things out.

I will continue exploring .NET 6 as time permits.

How to Install htop on Oracle Linux 7

I wrote an article on how to install htop on Oracle Linux before. Thanks to Markus, I learned that installing htop is just a matter of enabling a repo on Oracle Linux 8. I have an Oracle Linux 7 host that I use for a customer and I wanted to install htop on it. I looked for an EPEL repo in /etc/yum.repos.d/oracle-linux-ol7.repo but could not find one, so the only option for me was to add the EPEL repo under /etc/yum.repos.d.

I looked for EPEL repo for Oracle Linux 7 and added the following in /etc/yum.repos.d/oracle-epel-ol7.repo

[ol7_developer_EPEL]
name=Oracle Linux $releasever EPEL Packages for Development ($basearch)
baseurl=https://yum$ociregion.$ocidomain/repo/OracleLinux/OL7/developer_EPEL/$basearch/
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-oracle
gpgcheck=1
enabled=1

Then run the following commands.

sudo yum update
sudo yum install htop

Then, you get to install htop on Oracle Linux 7. 🙂

Troubleshooting Dockerized Blog

This blog is a dockerized WordPress blog. I noticed that my blog site was down this morning. I couldn’t even ssh into the host. I thought it had been hacked somehow. After poking around, I got it back up and running. Here are the things I did to get it back up.

  1. When I ran ping hayato-iriumi.net, I got a response back.
  2. After a while, I could hit the website but it wasn’t connecting to the database.
  3. I couldn’t even ssh into the host, so I restarted it.
  4. I was able to ssh into it now, so I checked the running containers with the following command.
    docker ps -a
  5. I noticed that the NGINX container was failing to start because port 80 was already in use.
  6. Checked which process was using port 80 with the following command.
    sudo netstat -pna | grep 80
  7. It turned out that another instance of NGINX was hogging the port. I stopped it, disabled it and removed it with the following commands.
    sudo systemctl stop nginx
    sudo systemctl disable nginx
    sudo apt remove nginx
  8. I’m not sure what installed the instance of NGINX.
  9. Restarted the host.
  10. The site came back up.
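
Since most of the code on this blog is Python, here is a rough sketch of how the "is port 80 already taken?" check from step 6 could be scripted. The function name is_port_in_use is my own, and it only checks whether something accepts TCP connections on the local machine. Unlike netstat -p, it does not tell you which process owns the port.

```python
import socket


def is_port_in_use(port, host="127.0.0.1"):
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0


print(f"port 80 in use: {is_port_in_use(80)}")
```

Running something like this before starting the NGINX container would have pointed at the port conflict right away.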

I am seeing some errors in journalctl, so something else may have caused the issue. This kind of troubleshooting is very common for Linux users, and it helps to know where to look when a Linux-hosted service goes down. I may rebuild this blog host just in case it was hacked.

Advanced map with Python

As I was writing Python code using map, I ran into an issue. When I ran the following code, I got an error.

import unittest


class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(self.func1, a, 2)
        print(results)

I basically wanted to pass 2 for the y parameter of func1 instead of another list. Here is the error I got.

FAILED (errors=1)

Error
Traceback (most recent call last):
  File "C:\Users\hiriu\dev\hoge\test.py", line 10, in test_map
    results = map(self.func1, a, 2)
TypeError: 'int' object is not iterable

Right, the number 2 is not a collection and is not iterable. How do I solve this problem? I searched the web and I found the following solution.

import unittest
import functools

class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(functools.partial(self.func1, y=2), a)
        print(list(results))

By using functools.partial, you can fix the value of one parameter of the function (here y=2) and let map supply the rest. Here is the output.

[1, 4, 9, 16]

Process finished with exit code 0
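
One thing that is easy to miss is that partial can fix a parameter either by keyword or by position, and the two behave differently with map. Here is a small sketch outside of unittest. The names power, square and two_to_the are just for illustration.

```python
from functools import partial


def power(x, y):
    return x ** y


square = partial(power, y=2)    # fixes y by keyword; map supplies x
two_to_the = partial(power, 2)  # fixes x by position; map supplies y

print(list(map(square, [1, 2, 3, 4])))      # [1, 4, 9, 16]
print(list(map(two_to_the, [1, 2, 3, 4])))  # [2, 4, 8, 16]
```

So if the fixed parameter is not the first one, passing it by keyword as in the example above is the safer choice.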

Update:

I found an easier way to map a fixed parameter. Here is the example. It’s more readable and maintainable.

import unittest
from itertools import repeat


class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(self.func1, a, repeat(2))
        print(list(results))
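
In case you wonder why repeat(2) does not loop forever: repeat(2) is an endless iterator, but map stops as soon as the shortest input runs out. Here is a small sketch of that, next to the same thing written as a list comprehension (power and bases are illustrative names).

```python
from itertools import repeat


def power(x, y):
    return x ** y


bases = [1, 2, 3, 4]

# repeat(2) never ends, but map stops when `bases` is exhausted.
with_repeat = list(map(power, bases, repeat(2)))

# The same result written as a list comprehension.
with_comprehension = [power(x, 2) for x in bases]

print(with_repeat)
print(with_comprehension)
```

Both lines print [1, 4, 9, 16], so which one to use is mostly a matter of taste.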

map Function in Python

The map function in Python is a convenient way to execute a function for every item in a collection. Let’s see an example that does not use the map function.

import unittest


class playground(unittest.TestCase):
    def pow(self, n):
        return n**n

    def test_pow(self):
        numbers = range(10)
        for number in numbers:
            result = self.pow(number)
            print(result)

Output:

1
1
4
27
256
3125
46656
823543
16777216
387420489

The example above just executes the pow function sequentially for every integer in the range.

If you use the map function, the code becomes more concise and easier to manage. It might be a little confusing at first, but once you get used to it, it’s not too bad.

import unittest


class playground(unittest.TestCase):
    def pow(self, n):
        return n**n

    def test_map(self):
        numbers = range(10)
        results = map(self.pow, numbers)
        print(list(results))

Output:

[1, 1, 4, 27, 256, 3125, 46656, 823543, 16777216, 387420489]
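
One detail worth knowing is why the example wraps the result in list(). In Python 3, map returns a lazy iterator, so nothing is computed until you consume it, and you can only consume it once. Here is a small sketch (the variable names are mine):

```python
numbers = range(5)
results = map(lambda n: n ** n, numbers)

print(type(results).__name__)  # map, not list

first = next(results)  # computes only the first item
rest = list(results)   # consumes the remaining items
again = list(results)  # the iterator is exhausted by now

print(first, rest, again)  # 1 [1, 4, 27, 256] []
```

If you need to go over the results more than once, convert them to a list first.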

I didn’t know 0^0 was 1… I knew n^0 was always 1 but… Interesting. 🙂
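
For what it’s worth, this is just how Python defines the power operation; mathematicians sometimes leave 0^0 undefined. A quick check:

```python
import math

int_power = 0 ** 0             # integer power operator
float_power = math.pow(0, 0)   # math.pow always works with floats
builtin_power = pow(0, 0)      # built-in pow behaves like **

print(int_power, float_power, builtin_power)  # 1 1.0 1
```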

Batch Processing with Python with Multithreading (Improved)

I wrote an article on how to do batch processing with multithreading in Python last week, but there were things that my sample code wasn’t handling.

  • Handle results from the threaded function.
  • Handle exceptions from the threaded function.

With these 2 points in mind, I rewrote the sample code.

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
import time


def task(n):
    print(f"processing {n}")
    if n % 5 == 0:
        raise Exception("It is divisible by 5")
    time.sleep(1)
    return True


def main():
    print("Starting ThreadPoolExecutor")
    tasks = []
    fs = []
    for i in range(23):
        tasks.append(task)

    with ThreadPoolExecutor(max_workers=5) as executor:
        for i, t in enumerate(tasks):
            future = executor.submit(t, i)
            fs.append(future)
        results = futures.wait(fs)
    for result in results.done:
        if result.done():
            print(f"{result.done()}")
        if result.exception() is not None:
            print(f"Handle exception here: {result.exception()}")


if __name__ == '__main__':
    start_time = time.time()
    main()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

Starting ThreadPoolExecutor
processing 0
processing 1
processing 2
processing 3
processing 4
processing 5
processing 6
processing 7
processing 8
processing 9
processing 10
processing 11
processing 12
processing 13
processing 14
processing 15
processing 16
processing 17
processing 18
processing 19
processing 20
processing 21
processing 22
True
Handle exception here: It is divisible by 5
True
True
True
Handle exception here: It is divisible by 5
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
Took 4.017247915267944

This way, you can handle both the results you expect from the threaded function and any exceptions it raises. The previous sample did neither, so this sample is a better one. It is also easier to specify the number of concurrent threads.
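
If you also need to know which input produced which result or exception, one possible variation is to keep a dictionary from each future to its input and loop over futures.as_completed. This is only a sketch under my own assumptions: run_all is a made-up helper, the task returns n * 2 instead of True so the results are distinguishable, and the delay is shortened for the demo.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def task(n):
    if n % 5 == 0:
        raise ValueError(f"{n} is divisible by 5")
    time.sleep(0.05)  # short delay, just for the demo
    return n * 2


def run_all(inputs, workers=5):
    """Return (results, errors) dictionaries keyed by the original input."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Remember which input each future belongs to.
        future_to_input = {executor.submit(task, n): n for n in inputs}
        for future in as_completed(future_to_input):
            n = future_to_input[future]
            try:
                results[n] = future.result()  # re-raises the task's exception
            except Exception as exc:
                errors[n] = exc
    return results, errors


results, errors = run_all(range(1, 11))
print(sorted(results))  # inputs that succeeded
print(sorted(errors))   # inputs that raised
```

Compared with futures.wait, as_completed hands you each future as soon as it finishes, so you can start handling results before the whole batch is done.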

Uploading Backup File to OCI’s Object Storage via Jenkins

I have had a need to upload a zip file for backup from a Windows agent to Oracle Cloud Infrastructure’s Object Storage. Here is what I did.

First, I installed OCI CLI for Windows. Please follow this link to install it on Windows. Then, install a Jenkins agent on the same machine. I have step-by-step instructions on how to do it. Once it is installed, make sure the agent runs under the same account you used to install OCI CLI. Otherwise, it won’t work.

On the Jenkins job, using Compress-Archive Cmdlet, you can zip up some directories into a zip file.

Compress-Archive -Path $zipPaths -DestinationPath $zipFile

Please note that Compress-Archive has a limit of 2GB. I heard that it’s a limitation of the underlying API.
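
If the 2GB limit gets in the way, one possible workaround is to create the zip file with Python’s zipfile module instead, assuming Python is available on the agent (that is my assumption; the job described here uses PowerShell). The function name zip_directory is made up for this sketch.

```python
import zipfile
from pathlib import Path


def zip_directory(source_dir, zip_path):
    """Zip every file under source_dir and return the number of files added."""
    source = Path(source_dir)
    count = 0
    # allowZip64 defaults to True, so archives larger than 2 GB are allowed.
    # Keep zip_path outside source_dir, or the archive would try to include itself.
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(source.rglob("*")):
            if path.is_file():
                # Store paths relative to the source directory.
                archive.write(path, arcname=path.relative_to(source).as_posix())
                count += 1
    return count
```

You would call it with something like zip_directory("C:/data", "C:/backup/data.zip"), where both paths are placeholders, and then upload the resulting file the same way.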

Now that you have the zip file, you can upload it to Object Storage like the following.

oci os object put -bn backup --file $zipFile -ns "yournamespace" `
	--parallel-upload-count 5 --part-size 20 --verify-checksum

I am recommending this method to a customer because Object Storage is a relatively cheap and secure storage option on OCI. It also supports retention duration and replication, which are great features for a reasonably priced service.