Text Wrap in Python

I was working on a problem at HackerRank called Text Wrap. I was able to solve it with the following code.

def wrap(string, max_width):
    result = ''
    lines = len(string) // max_width
    last_chars = len(string) % max_width
    for i in range(lines):
        start_index = i * max_width
        result += string[start_index:start_index+max_width] + '\n'

    if last_chars > 0:
        result += string[-last_chars:]
    return result

Then I headed to the Discussions section to see if someone smarter had posted better, more concise code. There was one.

def wrap(string, max_width):
    return "\n".join([string[i:i+max_width] for i in range(0, len(string), max_width)])

In essence, it’s pretty much the same logic, only more concise, though I think it’s a bit harder to read. One of the important things to consider when coding is whether the code is maintainable and readable. But there are smarter people out there… Always learning.
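To check that the two versions really share the same logic, here is a small sketch that runs both on the same input. The names wrap_loop and wrap_join are mine, used only to tell the two apart. The one difference I'm aware of is that the loop version leaves a trailing newline when the string length is an exact multiple of max_width.

```python
def wrap_loop(string, max_width):
    # First version: build the result line by line.
    result = ''
    lines = len(string) // max_width
    last_chars = len(string) % max_width
    for i in range(lines):
        start_index = i * max_width
        result += string[start_index:start_index + max_width] + '\n'
    if last_chars > 0:
        result += string[-last_chars:]
    return result


def wrap_join(string, max_width):
    # Second version: slice in steps of max_width and join with newlines.
    return "\n".join(string[i:i + max_width] for i in range(0, len(string), max_width))


sample = "ABCDEFGHIJKLIMNOQRSTUVWXYZ"
print(wrap_join(sample, 4))
print(wrap_loop(sample, 4) == wrap_join(sample, 4))

# Length is an exact multiple of max_width: only the loop version ends with '\n'.
print(repr(wrap_loop("ABCDEFGH", 4)))
print(repr(wrap_join("ABCDEFGH", 4)))
```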

Validating Downloaded File with File Size from Object Storage on OCI

This is a note for myself.

#!/usr/bin/env python3

import oci.object_storage
import urllib3
import os

def download_backup(bucket_name, file_name, local_dir):
    signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
    object_client = oci.object_storage.ObjectStorageClient(config={},signer=signer)

    object = object_client.get_object('id4qji14rv70', bucket_name, file_name)
    restored_file = os.path.join(local_dir, file_name)
    with open(restored_file, 'wb') as f:
        for chunk in object.data.raw.stream(1024 * 1024, decode_content=False):
            f.write(chunk)

    object_meta = object_client.head_object('id4qji14rv70', bucket_name, file_name)
    content_length=object_meta.headers['Content-Length']

    # Check the file we actually wrote, not a file of the same name in the current directory.
    file_stats = os.stat(restored_file)

    if file_stats.st_size == int(content_length):
        print(f"Validated {content_length}")
    else:
        print(f"Validation failed. Expected: {content_length} Actual: {file_stats.st_size}")

if __name__ == '__main__':
    download_backup('backup', '2022-03-21.zip', '/home/opc')
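The size check itself does not depend on OCI. Here is a minimal sketch of it in isolation, assuming the expected size arrives as the string value of a Content-Length header. size_matches is a hypothetical helper name, and the temporary file only stands in for a downloaded backup. Keep in mind that a matching size does not prove the content is intact.

```python
import os
import tempfile


def size_matches(path, content_length):
    """Compare the file size on disk with the expected byte count.

    content_length may be a string, as it is in HTTP headers.
    Returns (ok, actual_size).
    """
    actual = os.stat(path).st_size
    return actual == int(content_length), actual


# Stand-in for a downloaded file: 2048 bytes written to a temporary file.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"x" * 2048)
    tmp_path = f.name

ok, actual = size_matches(tmp_path, "2048")
print(ok, actual)

bad_ok, _ = size_matches(tmp_path, "4096")
print(bad_ok)

os.remove(tmp_path)
```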

How to Convert Bytes to Appropriate Unit in Python

You sometimes get the size of a file as a raw byte count. That is hard to read, so you need a way to convert it to an appropriate unit. Here is one way to do it.

import math

def convert_size(size_bytes):
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    i = int(math.floor(math.log(size_bytes, 1024)))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return "%s %s" % (s, size_name[i])

I don’t claim that I wrote it. I found it somewhere on the Internet, but I can’t remember the source…
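For comparison, here is a sketch of the same conversion done with repeated division instead of a logarithm. This is a different technique from the snippet above, not a copy of it, and convert_size_loop is a name I made up. Note that it formats zero as "0.0 B" rather than "0B".

```python
def convert_size_loop(size_bytes):
    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    size = float(size_bytes)
    i = 0
    # Divide by 1024 until the value fits the current unit
    # or we run out of units.
    while size >= 1024 and i < len(units) - 1:
        size /= 1024
        i += 1
    return f"{round(size, 2)} {units[i]}"


for value in (500, 1536, 1024 ** 2, 5 * 1024 ** 3):
    print(value, "->", convert_size_loop(value))
```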

How to Calculate File Checksum in Python

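A checksum is a fingerprint of a file. If two files have the same checksum, they are almost certainly identical. (MD5 is known to have collisions, so prefer SHA256 when the files may come from an untrusted source.)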

There are several types of checksum and the ones we use the most are MD5 and SHA256 based on my experience. Here is the sample code in Python.

import hashlib
import unittest

class playground(unittest.TestCase):
    def test_sha256(self):
        file_name = 'test.jpg'
        with open(file_name, 'rb') as file:
            # Avoid naming variables bytes or hash, which shadow built-ins.
            data = file.read()
            digest = hashlib.sha256(data).hexdigest()
            print(digest)

    def test_md5(self):
        file_name = 'test.jpg'
        with open(file_name, 'rb') as file:
            print(hashlib.md5(file.read()).hexdigest())

As a result, the SHA256 of test.jpg is c91834ce2d9e57edb6ccd118c10e5fb3b0eacfb8a8ecda73ae6680ced50009de and MD5 is 61a6ef0dd8c606fc0eb4c676f0e4296a

You don’t need Python to calculate these checksums. Here is an example you can run on your Mac. On Linux, the equivalent of md5 is usually md5sum, and I’m sure you can run it on Windows Subsystem for Linux as well.

❯ md5 test.jpg
MD5 (test.jpg) = 61a6ef0dd8c606fc0eb4c676f0e4296a

❯ openssl dgst -sha256 test.jpg
SHA256(test.jpg)= c91834ce2d9e57edb6ccd118c10e5fb3b0eacfb8a8ecda73ae6680ced50009de
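The Python sample above reads the whole file into memory, which is fine for a small image but not for a multi-gigabyte backup. Here is a sketch that feeds the file to hashlib in chunks instead. file_checksum and the 1 MiB chunk size are my own choices, and the random temporary file only stands in for a real one.

```python
import hashlib
import os
import tempfile


def file_checksum(path, algorithm="sha256", chunk_size=1024 * 1024):
    digest = hashlib.new(algorithm)
    with open(path, "rb") as f:
        # f.read() returns b"" at end of file, which stops the iterator.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Build a file slightly larger than three chunks.
data = os.urandom(3 * 1024 * 1024 + 123)
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(data)
    tmp_path = f.name

chunked = file_checksum(tmp_path)
whole = hashlib.sha256(data).hexdigest()
print(chunked == whole)

chunked_md5 = file_checksum(tmp_path, "md5")
os.remove(tmp_path)
```

The chunked digest matches the one computed from the full content because update() can be called repeatedly with consecutive pieces of the data.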

How to Get a Row in SQLite with Python

I want to get the value of id where col1 = buhi and col2 = buhi.

Here is the Python code I came up with.

import sqlite3

conn = sqlite3.connect('test.sqlite')
cur = conn.cursor()
params = ('buhi', 'buhi')
cur.execute('''SELECT id, col1, col2 FROM HOGE 
    WHERE col1 = ? AND col2 = ?''', params)
print(cur.fetchone())

cur.fetchone() returns the matching row as a tuple.

(5, 'buhi', 'buhi')

So if you want to use any specific value, you can access it like…

cur.fetchone()[0]

Edit: If no row matches the SQL statement, fetchone() returns None, so it may be necessary to check if the data exists like the following.

cur.execute('''SELECT id, col1, col2 FROM HOGE 
    WHERE col1 = ? AND col2 = ?''', params)
row = cur.fetchone()
if row is not None:
    print(row)
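Here is a self-contained sketch of the same lookup that you can run without an existing database file. It assumes an in-memory database and a HOGE table with the three columns used above. The find_id helper is hypothetical and simply wraps the None check.

```python
import sqlite3

# An in-memory database stands in for test.sqlite.
conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('CREATE TABLE HOGE (id INTEGER PRIMARY KEY, col1 TEXT, col2 TEXT)')
cur.execute('INSERT INTO HOGE (col1, col2) VALUES (?, ?)', ('buhi', 'buhi'))
conn.commit()


def find_id(cur, col1, col2):
    cur.execute('SELECT id FROM HOGE WHERE col1 = ? AND col2 = ?', (col1, col2))
    row = cur.fetchone()
    # fetchone() returns None when no row matches.
    return row[0] if row is not None else None


found = find_id(cur, 'buhi', 'buhi')
missing = find_id(cur, 'nope', 'nope')
print(found, missing)
conn.close()
```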

How to Update SQLite Database with Python

I was trying to update a table in SQLite database and this worked for me.

    def test_update(self):
        conn = sqlite3.connect('test.sqlite')
        cur = conn.cursor()

        params = ('buhi', 'buhi', 1)

        cur.execute('''UPDATE HOGE 
        SET col1 = ?,
        col2 = ?
        WHERE id = ?
        ''', params)
        conn.commit()

As you can see, all the parameters are passed as a tuple, and each ? is a placeholder that is replaced by the corresponding value.

The last line, conn.commit(), is what actually saves the update to the database.
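To see the update take effect end to end, here is a sketch against an in-memory database. The table layout is an assumption that mirrors the columns in the statement above. It also reads cur.rowcount, which reports how many rows the UPDATE touched and is a handy way to notice that the WHERE clause matched nothing.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('CREATE TABLE HOGE (id INTEGER PRIMARY KEY, col1 TEXT, col2 TEXT)')
cur.execute('INSERT INTO HOGE (id, col1, col2) VALUES (?, ?, ?)', (1, 'old', 'old'))
conn.commit()

# Same statement shape as above: values first, then the id for the WHERE clause.
cur.execute('UPDATE HOGE SET col1 = ?, col2 = ? WHERE id = ?', ('buhi', 'buhi', 1))
updated_rows = cur.rowcount
conn.commit()

cur.execute('SELECT id, col1, col2 FROM HOGE WHERE id = ?', (1,))
row_after = cur.fetchone()
print(updated_rows, row_after)

# An id that does not exist updates nothing.
cur.execute('UPDATE HOGE SET col1 = ? WHERE id = ?', ('x', 999))
no_match_rows = cur.rowcount
conn.close()
```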

Multi Threading with map in Python

I wrote a blog article about advanced map in Python earlier, and I have since found a way to do multithreading using map with ThreadPoolExecutor. If you want to run multiple long-running processes at the same time in multiple threads, this example may be for you. Here is the sample code.

import unittest
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time

class playground(unittest.TestCase):

    def long_running_function(self, a, n):
        print(f"Executing {n}...")
        time.sleep(3)
        return a ** n

    def test_advanced_map(self):
        start_time = time.time()
        n = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with ThreadPoolExecutor(max_workers=3) as executor:
            fs = map(executor.submit, repeat(self.long_running_function), repeat(2), n)
            results = futures.wait(list(fs))
            for result in results.done:
                # Check for an exception first: result.result() re-raises it.
                if result.exception() is not None:
                    print(f"{result.exception()}")
                else:
                    print(f"{result.result()}")

        elapsed_time = time.time() - start_time
        print(f"took {elapsed_time}")

Output:

Executing 1...
Executing 2...
Executing 3...
Executing 4...
Executing 5...
Executing 6...
Executing 7...
Executing 8...
Executing 9...
Executing 10...

took 12.019917964935303

The long_running_function takes 2 parameters: a is a fixed parameter, and n takes each element of the n list in turn.

If you executed long_running_function sequentially for each element, it would take 30 seconds because the function sleeps for 3 seconds each time. With 3 threads at a time, it took only 12 seconds: 10 elements make 4 batches (3 + 3 + 3 + 1), hence 4 × 3 = 12 seconds. If there are hundreds or thousands of items, you can imagine this method can be a huge gain.
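The 12-second figure can be reproduced with a little arithmetic. The sketch below is only a model, not how ThreadPoolExecutor is implemented: it assumes each task goes to whichever worker becomes free first and ignores thread overhead. estimated_wall_time is a hypothetical helper.

```python
import heapq
import math


def estimated_wall_time(durations, max_workers):
    # Each entry is the time at which a worker becomes free.
    workers = [0.0] * max_workers
    heapq.heapify(workers)
    for d in durations:
        free_at = heapq.heappop(workers)      # earliest available worker
        heapq.heappush(workers, free_at + d)  # it is busy for d more seconds
    return max(workers)


sequential = sum([3] * 10)
threaded = estimated_wall_time([3] * 10, max_workers=3)
batches = math.ceil(10 / 3)

print(sequential)   # 30
print(threaded)     # 12.0
print(batches * 3)  # 12
```

When every task takes the same time, the model and the simple "batches" count agree. With uneven durations the pool does not wait for a whole batch to finish, so the batch count is only an upper-bound style estimate.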

The test_advanced_map function instantiates ThreadPoolExecutor with max_workers set to 3, which means up to 3 threads run at the same time. Depending on your program, you can increase the number, but be careful not to increase it too much because each thread consumes memory.

Then, using the map function, you call executor.submit for each element. You pass repeat(self.long_running_function) so that the same function is submitted every time, repeat(2) for the parameter a, and n for the parameter n. You can execute a function for each element of a list using the map function in Python. I wrote about it before here.

Lastly, this method can handle returned results and unhandled exceptions. I believe this is the best way to do multi threading in Python. I could be wrong, so please comment if you have better ideas! 🙂
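One alternative worth knowing is the executor's own map method, which submits the calls for you and returns results in input order. The sketch below is a variation on the sample above, not a replacement for it: the sleep is shortened to 0.1 seconds so it runs quickly, and the function is a plain function rather than a test method.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time


def long_running_function(a, n):
    time.sleep(0.1)  # shortened from 3 seconds for the sketch
    return a ** n


n = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
with ThreadPoolExecutor(max_workers=3) as executor:
    # repeat(2) supplies the fixed parameter a; iteration stops when n runs out.
    results = list(executor.map(long_running_function, repeat(2), n))

print(results)
```

The trade-off is in error handling. With executor.map, an exception from a task is raised when you reach that task's result while iterating, so you cannot easily inspect the remaining results. The submit and wait approach above lets you look at each future individually.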

Advanced map with Python

As I was writing Python code using map, I ran into an issue. When I ran the following code, I got an error.

import unittest


class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(self.func1, a, 2)
        print(results)

I basically wanted to pass 2 for the y parameter of func1 instead of another list. Here is the error I got.

FAILED (errors=1)

Error
Traceback (most recent call last):
  File "C:\Users\hiriu\dev\hoge\test.py", line 10, in test_map
    results = map(self.func1, a, 2)
TypeError: 'int' object is not iterable

Right, the number 2 is not a collection and is not iterable. How do I solve this problem? I searched the web and I found the following solution.

import unittest
import functools

class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(functools.partial(self.func1, y=2), a)
        print(list(results))

By using functools.partial, you can fix the value of one of the function’s parameters, here y=2, and map the resulting function over the list. Here is the output.

[1, 4, 9, 16]

Process finished with exit code 0

Update:

I found an easier way to map a fixed parameter. Here is the example. It’s more readable and maintainable.

import unittest
from itertools import repeat


class test(unittest.TestCase):
    def func1(self, x, y):
        return x ** y

    def test_map(self):
        a = [1, 2, 3, 4]
        results = map(self.func1, a, repeat(2))
        print(list(results))
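For a side-by-side view, here is a sketch that runs both approaches plus a plain list comprehension on the same input. It uses a module-level func1 instead of a test method to keep it short. The repeat version works because map stops as soon as its shortest iterable is exhausted, so the endless stream of 2s is cut off when the list ends.

```python
from functools import partial
from itertools import repeat


def func1(x, y):
    return x ** y


a = [1, 2, 3, 4]

with_partial = list(map(partial(func1, y=2), a))   # y is fixed up front
with_repeat = list(map(func1, a, repeat(2)))       # y is supplied on every call
with_comprehension = [func1(x, 2) for x in a]      # no map at all

print(with_partial)
print(with_repeat)
print(with_comprehension)
```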

map Function in Python

The map function in Python is a convenient way to execute a function for every item in a collection. Let’s see an example that does not use the map function.

class playground(unittest.TestCase):
    def pow(self, n):
        return n**n

    def test_pow(self):
        numbers = range(10)
        for number in numbers:
            result = self.pow(number)
            print(result)

Output:

1
1
4
27
256
3125
46656
823543
16777216
387420489

The example above just executes the pow function sequentially for every number in the range.

If you use the map function, the code becomes more concise and easier to manage. It might be a little confusing at first, but once you get used to it, it’s not too bad.

class playground(unittest.TestCase):
    def pow(self, n):
        return n**n

    def test_map(self):
        numbers = range(10)
        results = map(self.pow, numbers)
        print(list(results))

Output:

[1, 1, 4, 27, 256, 3125, 46656, 823543, 16777216, 387420489]

I didn’t know 0^0 was 1… I knew n^0 was always 1 but… Interesting. 🙂
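The first 1 in the output comes from 0 ** 0, which Python evaluates to 1. A quick sketch to confirm it for integers, floats and math.pow (the variable names are only for illustration):

```python
import math

int_power = 0 ** 0           # integer exponentiation
builtin_pow = pow(0, 0)      # built-in pow
float_power = 0.0 ** 0       # float exponentiation
math_power = math.pow(0, 0)  # math.pow always works in floats

print(int_power)    # 1
print(builtin_pow)  # 1
print(float_power)  # 1.0
print(math_power)   # 1.0
```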

Batch Processing with Python with Multithreading (Improved)

I wrote an article on how to do batch processing with multithreading in Python last week, but there were things that my sample code wasn’t handling.

  • Handle results from the threaded function.
  • Handle exceptions from the threaded function.

With these 2 points in mind, I rewrote the sample code.

from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
import time


def task(n):
    print(f"processing {n}")
    if n % 5 == 0:
        raise Exception("It is divisible by 5")
    time.sleep(1)
    return True


def main():
    print("Starting ThreadPoolExecutor")
    tasks = []
    fs = []
    for i in range(23):
        tasks.append(task)

    with ThreadPoolExecutor(max_workers=5) as executor:
        for i, t in enumerate(tasks):
            future = executor.submit(t, i)
            fs.append(future)
        results = futures.wait(fs)
    for result in results.done:
        if result.done():
            print(f"{result.done()}")
        if result.exception() is not None:
            print(f"Handle exception here: {result.exception()}")


if __name__ == '__main__':
    start_time = time.time()
    main()
    elapsed_time = time.time() - start_time
    print(f"Took {elapsed_time}")

Here is the output:

Starting ThreadPoolExecutor
processing 0
processing 1
processing 2
processing 3
processing 4
processing 5
processing 6
processing 7
processing 8
processing 9
processing 10
processing 11
processing 12
processing 13
processing 14
processing 15
processing 16
processing 17
processing 18
processing 19
processing 20
processing 21
processing 22
True
Handle exception here: It is divisible by 5
True
True
True
Handle exception here: It is divisible by 5
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
True
True
True
True
True
Handle exception here: It is divisible by 5
True
Took 4.017247915267944

This way, you can handle both the results you expect from the threaded function and the exceptions it raises. The previous sample handled neither, so this sample is a better one. It is also easier to specify the number of concurrent threads.
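One thing the sample does not show is which input caused which exception. A possible variation, sketched below, keeps a dictionary from each future to its input and processes futures with as_completed as they finish. The task here returns n * 2 and raises ValueError instead of sleeping, purely so the sketch runs instantly. Those details are my assumptions, not part of the sample above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(n):
    if n % 5 == 0:
        raise ValueError(f"{n} is divisible by 5")
    return n * 2


succeeded = {}  # input -> result
failed = {}     # input -> exception

with ThreadPoolExecutor(max_workers=5) as executor:
    # Remember which input each future belongs to.
    future_to_input = {executor.submit(task, i): i for i in range(23)}
    for future in as_completed(future_to_input):
        i = future_to_input[future]
        error = future.exception()
        if error is not None:
            failed[i] = error
        else:
            succeeded[i] = future.result()

print(sorted(failed))
print(len(succeeded))
```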