How to Calculate File Checksum in Python

A checksum is a fingerprint of a file. If two files have the same checksum, they are almost certainly identical.

There are several types of checksums; in my experience, MD5 and SHA256 are the ones used most. Here is sample code in Python.

import hashlib
import unittest

class playground(unittest.TestCase):
    def test_sha256(self):
        file_name = 'test.jpg'
        with open(file_name, 'rb') as file:
            data = file.read()  # read the whole file as bytes
            digest = hashlib.sha256(data).hexdigest()
            print(digest)

    def test_md5(self):
        file_name = 'test.jpg'
        with open(file_name, 'rb') as file:
            print(hashlib.md5(file.read()).hexdigest())

As a result, the SHA256 of test.jpg is c91834ce2d9e57edb6ccd118c10e5fb3b0eacfb8a8ecda73ae6680ced50009de and the MD5 is 61a6ef0dd8c606fc0eb4c676f0e4296a.
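
The snippets above read the whole file into memory at once, which is fine for small files. For large files, hashing in fixed-size chunks keeps memory usage flat; here is a minimal sketch (the 8192-byte chunk size is an arbitrary choice, not from the original code):

import hashlib

def sha256_of_file(file_name, chunk_size=8192):
    # Feed the hash object one chunk at a time instead of reading the whole file
    sha256 = hashlib.sha256()
    with open(file_name, 'rb') as file:
        for chunk in iter(lambda: file.read(chunk_size), b''):
            sha256.update(chunk)
    return sha256.hexdigest()

print(sha256_of_file('test.jpg'))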

You don’t need Python to calculate these checksums. Here are examples you can run on macOS or Linux, and they should work in the Windows Subsystem for Linux as well.

❯ md5 test.jpg
MD5 (test.jpg) = 61a6ef0dd8c606fc0eb4c676f0e4296a

❯ openssl dgst -sha256 test.jpg
SHA256(test.jpg)= c91834ce2d9e57edb6ccd118c10e5fb3b0eacfb8a8ecda73ae6680ced50009de

How to Get a Row in SQLite with Python

I want to get the value of id where col1 = 'buhi' and col2 = 'buhi'.

Here is the Python code I came up with.

import sqlite3

conn = sqlite3.connect('test.sqlite')
cur = conn.cursor()
params = ('buhi', 'buhi')
cur.execute('''SELECT id, col1, col2 FROM HOGE 
    WHERE col1 = ? AND col2 = ?''', params)
print(cur.fetchone())

cur.fetchone() returns the matching row as a tuple.

(5, 'buhi', 'buhi')

So if you want a specific column value, you can access it by index like…

cur.fetchone()[0]

Edit: If no row matches the SQL statement, fetchone() returns None, so it may be necessary to check that the row exists, like the following.

cur.execute('''SELECT id, col1, col2 FROM HOGE 
    WHERE col1 = ? AND col2 = ?''', params)
row = cur.fetchone()
if row is not None:
    print(row)
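
As a side note, if index-based access like row[0] feels fragile, sqlite3 can return rows that support access by column name via sqlite3.Row. A minimal sketch, reusing the table and parameters from the example above:

import sqlite3

conn = sqlite3.connect('test.sqlite')
conn.row_factory = sqlite3.Row  # rows can now be indexed by column name
cur = conn.cursor()
params = ('buhi', 'buhi')
cur.execute('''SELECT id, col1, col2 FROM HOGE
    WHERE col1 = ? AND col2 = ?''', params)
row = cur.fetchone()
if row is not None:
    print(row['id'], row['col1'], row['col2'])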

How to Update SQLite Database with Python

I was trying to update a table in an SQLite database, and this worked for me.

    def test_update(self):
        conn = sqlite3.connect('test.sqlite')
        cur = conn.cursor()

        params = ('buhi', 'buhi', 1)

        cur.execute('''UPDATE HOGE 
        SET col1 = ?,
        col2 = ?
        WHERE id = ?
        ''', params)
        conn.commit()

As you can see, all the parameters are passed as a tuple, and each ? placeholder is replaced by the corresponding value.

The last line, conn.commit(), is what actually writes the update to the database.
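
If you want to confirm that the UPDATE actually changed something, cur.rowcount reports how many rows were affected. A small sketch, assuming the same HOGE table as above:

import sqlite3

conn = sqlite3.connect('test.sqlite')
cur = conn.cursor()
params = ('buhi', 'buhi', 1)
cur.execute('''UPDATE HOGE
    SET col1 = ?,
    col2 = ?
    WHERE id = ?
    ''', params)
print(f"rows updated: {cur.rowcount}")  # 0 means no row matched the WHERE clause
conn.commit()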

Multithreading with map in Python

I wrote a blog article about advanced map in Python earlier, and I have since found a way to do multithreaded processing using map with ThreadPoolExecutor. If you want to run multiple long-running tasks at the same time in multiple threads, this example may be for you. Here is the sample code.

import unittest
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time

class playground(unittest.TestCase):

    def long_running_function(self, a, n):
        print(f"Executing {n}...")
        time.sleep(3)
        return a ** n

    def test_advanced_map(self):
        start_time = time.time()
        n = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with ThreadPoolExecutor(max_workers=3) as executor:
            fs = map(executor.submit, repeat(self.long_running_function), repeat(2), n)
            results = futures.wait(list(fs))
            for result in results.done:
                # Check for an exception first, because result() would re-raise it
                if result.exception() is not None:
                    print(f"{result.exception()}")
                else:
                    print(f"{result.result()}")

        elapsed_time = time.time() - start_time
        print(f"took {elapsed_time}")

Output:

Executing 1...
Executing 2...
Executing 3...
Executing 4...
Executing 5...
Executing 6...
Executing 7...
Executing 8...
Executing 9...
Executing 10...

took 12.019917964935303

The long_running_function takes 2 parameters: a is a fixed parameter and n takes each element of the n list.

If you executed long_running_function sequentially for each element, it would take 30 seconds because the function sleeps for 3 seconds each time, but with 3 threads at a time it took only 12 seconds: 10 elements with 3 workers means 4 batches, hence roughly 12 seconds. With hundreds or thousands of items, you can imagine the gain would be huge.

The test_advanced_map function instantiates ThreadPoolExecutor with max_workers set to 3, which means at most 3 threads run at the same time. Depending on your program you can increase the number, but be careful not to raise it too much because it can hog your memory.

Then, using map, you call executor.submit once per element: repeat(self.long_running_function) supplies the function to be executed, repeat(2) supplies the fixed parameter a, and n supplies the varying parameter n. You can execute a function for each element of a list using map in Python; I wrote about it before here.

Lastly, this method can handle returned results and unhandled exceptions. I believe this is the best way to do multithreading in Python. I could be wrong, so please comment if you have better ideas! 🙂
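
As a side note, ThreadPoolExecutor also has its own map method that submits one call per element and yields the results in input order. It hides the Future objects, so per-future exception handling is less explicit, but the code is shorter. A minimal sketch of the same computation:

from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time

def long_running_function(a, n):
    print(f"Executing {n}...")
    time.sleep(3)
    return a ** n

n = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
with ThreadPoolExecutor(max_workers=3) as executor:
    # executor.map calls long_running_function(2, element) for each element
    for result in executor.map(long_running_function, repeat(2), n):
        print(result)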

Advanced map with Python

As I was writing Python code using map, I ran into an issue. When I ran the following code, I got an error.

import unittest


class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(self.func1, a, 2)
        print(results)

I basically wanted to pass 2 for the y parameter of func1 instead of another list. Here is the error I got.

FAILED (errors=1)

Error
Traceback (most recent call last):
  File "C:\Users\hiriu\dev\hoge\test.py", line 10, in test_map
    results = map(self.func1, a, 2)
TypeError: 'int' object is not iterable

Right, the number 2 is not a collection, so it is not iterable. How do I solve this? I searched the web and found the following solution.

import unittest
import functools

class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(functools.partial(self.func1, y=2), a)
        print(list(results))

By using functools.partial, you can bind a fixed value to one of the function's parameters. Here is the output.

[1, 4, 9, 16]

Process finished with exit code 0

Update:

I found an easier way to pass a fixed parameter to map, using itertools.repeat. Here is the example; it's more readable and maintainable.

import unittest
from itertools import repeat


class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(self.func1, a, repeat(2))
        print(list(results))
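
For completeness, a plain lambda is a third way to fix the y parameter; this is not from the original post, just a minimal sketch:

def func1(x, y):
    return x ** y

a = [1, 2, 3, 4]
# The lambda fixes y to 2 for every element of a
results = map(lambda x: func1(x, 2), a)
print(list(results))  # [1, 4, 9, 16]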

map Function in Python

The map function in Python is a convenient way to apply a function to every item of a collection. Let's first look at an example that does not use map.

import unittest

class playground(unittest.TestCase):
    def pow(self, n):
        return n**n

    def test_pow(self):
        numbers = range(10)
        for number in numbers:
            result = self.pow(number)
            print(result)

Output:

1
1
4
27
256
3125
46656
823543
16777216
387420489

The example above just executes the pow function sequentially for every number in the range.

If you use map, the code becomes more concise and easier to manage. It might be a little confusing at first, but once you get used to it, it's not bad at all.

class playground(unittest.TestCase):
    def pow(self, n):
        return n**n

    def test_map(self):
        numbers = range(10)
        results = map(self.pow, numbers)
        print(list(results))

Output:

[1, 1, 4, 27, 256, 3125, 46656, 823543, 16777216, 387420489]

I didn’t know 0^0 was 1… I knew n^0 was always 1 but… Interesting. 🙂
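
For reference, a list comprehension produces the same result and is often considered the more idiomatic spelling; a minimal sketch:

numbers = range(10)
results = [n ** n for n in numbers]
print(results)  # [1, 1, 4, 27, 256, 3125, 46656, 823543, 16777216, 387420489]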

Batch Processing with Python Multithreading (Improved)

I wrote an article on how to do batch processing with multiple threads in Python last week, but there were things my sample code wasn't handling:

  • Handle results from the threaded function.
  • Handle exceptions from the threaded function.

With these 2 points in mind, I rewrote the sample code.

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
import time


def task(n):
    print(f"processing {n}")
    if n % 5 == 0:
        raise Exception("It is divisible by 5")
    time.sleep(1)
    return True


def main():
    print("Starting ThreadPoolExecutor")
    tasks = []
    fs = []
    for i in range(23):
        tasks.append(task)

    with ThreadPoolExecutor(max_workers=5) as executor:
        for i, t in enumerate(tasks):
            future = executor.submit(t, i)
            fs.append(future)
        results = futures.wait(fs)
    for result in results.done:
        if result.done():
            print(f"{result.done()}")
        if result.exception() is not None:
            print(f"Handle exception here: {result.exception()}")


if __name__ == '__main__':
    start_time = time.time()
    main()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

Starting ThreadPoolExecutor
processing 0
processing 1
processing 2
processing 3
processing 4
processing 5
processing 6
processing 7
processing 8
processing 9
processing 10
processing 11
processing 12
processing 13
processing 14
processing 15
processing 16
processing 17
processing 18
processing 19
processing 20
processing 21
processing 22
True
Handle exception here: It is divisible by 5
True
True
True
Handle exception here: It is divisible by 5
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
Took 4.017247915267944

This way, you can handle cases where you expect certain results from the threaded function as well as cases where it raises an exception. The previous sample did not handle either, so this one is better. It is also easier to specify the number of concurrent threads.
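
If you would rather handle each result as soon as its thread finishes instead of waiting for all of them, concurrent.futures.as_completed can be used in place of futures.wait. A minimal sketch reusing the same task function:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(n):
    print(f"processing {n}")
    if n % 5 == 0:
        raise Exception("It is divisible by 5")
    time.sleep(1)
    return True

with ThreadPoolExecutor(max_workers=5) as executor:
    fs = [executor.submit(task, i) for i in range(23)]
    for future in as_completed(fs):  # yields each future as soon as it finishes
        if future.exception() is not None:
            print(f"Handle exception here: {future.exception()}")
        else:
            print(future.result())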

Batch Processing with Python Multithreading

I want to execute 5 threads at a time but I have 23 things I want to run in total. Here is the code I came up with.

import threading
from multiprocessing import Pool
import time


class MyThread(threading.Thread):
    def run(id):
        print(f"thread {id}")
        time.sleep(3)


if __name__ == '__main__':
    start_time = time.time()
    threads = []
    batch_size = 5
    for i in range(23):
        threads.append(MyThread.run)

    batch_index = 1
    thread_index = 1
    while len(threads) > 0:
        pool = Pool()
        print(f"Batch {batch_index}")
        for j in range(batch_size):
            if threads:
                t = threads.pop()
                pool.apply_async(t, (thread_index,))
                thread_index += 1
        pool.close()
        pool.join()
        batch_index += 1

    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Output:

Batch 1
thread 1
thread 2
thread 3
thread 4
thread 5
Batch 2
thread 6
thread 7
thread 8
thread 9
thread 10
Batch 3
thread 11
thread 12
thread 13
thread 14
thread 15
Batch 4
thread 16
thread 17
thread 18
thread 19
thread 20
Batch 5
thread 21
thread 22
thread 23
Took 15.523964166641235

Each thread takes 3 seconds. If I executed the function sequentially, it would take about 69 seconds (23 × 3), but with 5 threads at a time it finished in about 15.5 seconds. This is a huge improvement.

Another thing to note is that I declared the threads variable as a list. Lists have a pop() method in Python, which returns the last item (a thread function in this case) and removes it from the list. This way, you can use the list to keep track of the remaining threads.

I also needed to add if threads: to check whether the list still has items, in case the number of threads is not divisible by 5. With 23 threads, the last batch only has 3 items left, but the inner loop still iterates 5 times; without the check, popping from the already empty list raises an error. The if statement prevents that.
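
For what it's worth, slicing the list into batches is another way to get the same grouping without pop() and the emptiness check, because the last slice is simply shorter. A small sketch with plain numbers standing in for the thread functions:

items = list(range(23))
batch_size = 5
for start in range(0, len(items), batch_size):
    batch = items[start:start + batch_size]  # the final batch has only 3 items
    print(batch)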

Multithread Processing with Python

I did a fair amount of multithreaded programming with C# in the past but never tried it with Python. Let's imagine a function that takes 3 seconds, and you want to execute it 3 times. Run sequentially, the total execution time is about 9 seconds, like the following.

import threading
import time


class MyThread(threading.Thread):
    def run():
        # Long running process
        time.sleep(3)
        print('Done')


if __name__ == '__main__':
    start_time = time.time()
    MyThread.run()
    MyThread.run()
    MyThread.run()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Output:

Done
Done
Done
Took 9.006911993026733

If you can run the function 3 times concurrently, you can save time. Here is a concurrent sample in Python (note that multiprocessing.Pool actually uses separate processes rather than threads, but the timing benefit for this demo is the same).

import threading
from multiprocessing import Pool
import time


class MyThread(threading.Thread):
    def run():
        time.sleep(3)
        print('hoge')


if __name__ == '__main__':
    start_time = time.time()
    pool = Pool()
    pool.apply_async(MyThread.run)
    pool.apply_async(MyThread.run)
    pool.apply_async(MyThread.run)
    pool.close()
    pool.join()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

hoge
hoge
hoge
Took 3.179738998413086

It would take 9 seconds if those calls ran sequentially, but it only took about 3.17 seconds because the 3 calls run at the same time.
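
If you specifically want threads rather than processes, the same timing can be achieved with threading.Thread directly; a minimal sketch, not from the original post:

import threading
import time

def long_running():
    # Long running process
    time.sleep(3)
    print('Done')

start_time = time.time()
threads = [threading.Thread(target=long_running) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for all three threads to finish
print(f"Took {time.time() - start_time}")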

It’s not too difficult to do multithreading in Python. There is one more thing I am thinking about. What if you need to run 100 processes but you want to limit the number of threads to 5 at a time? I will write about it after this blog article.

How to Add Keys and Values to an Existing YAML with PyYAML

I found it harder than I expected to add keys and values to a YAML file with Python. For example…

You have a YAML file like the following.

root:

You want to add key1: value1, key2: value2, and key3: value3 under the root node. I had thought it would be as simple as appending child nodes, but it turned out I spent a few hours figuring it out.

The YAML I want to end up with in this example is the following.

root:
    key1: value1
    key2: value2
    key3: value3

Here is what you do to achieve it.

import io
import yaml

if __name__ == '__main__':
    with open("data.yaml", 'r') as stream:
        data_loaded = yaml.safe_load(stream)
        additional_data = {"key1": "value1", "key2": "value2", "key3": "value3"}
        for key, value in additional_data.items():
            # An empty "root:" is loaded as None, so create the dict on the first key
            if data_loaded['root'] is None:
                data_loaded['root'] = {key: value}
            else:
                data_loaded['root'].update({key: value})

        with io.open('data.yaml', 'w', encoding='utf8') as outfile:
            yaml.dump(data_loaded, outfile, default_flow_style=False, allow_unicode=True)

There might be simpler solutions but that’s what I came up with and it does what I want it to.
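
A slightly shorter variant, assuming the same data.yaml layout, is to normalize the empty root to a dict once and then merge everything with a single update call:

import yaml

with open("data.yaml", 'r') as stream:
    data_loaded = yaml.safe_load(stream)

additional_data = {"key1": "value1", "key2": "value2", "key3": "value3"}
# safe_load turns an empty "root:" into None, so replace it with a dict first
data_loaded['root'] = data_loaded['root'] or {}
data_loaded['root'].update(additional_data)

with open('data.yaml', 'w', encoding='utf8') as outfile:
    yaml.dump(data_loaded, outfile, default_flow_style=False, allow_unicode=True)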

If you add the data as a list instead, the resulting YAML looks like the following, which is fine if that meets your requirement, but it didn't meet mine.

root:
- key1: value1
- key2: value2
- key3: value3