Python: Thread Pooling for Concurrency

Threading usually refers to having multiple threads of execution working at the same time within a single process on one CPU. In other words, different threads run concurrently on a single CPU. People often confuse concurrency with parallelism; although the terms sound similar, they are very different.

Concurrency vs Parallelism:

Concurrency is about dealing with multiple tasks at once; parallelism is about doing multiple tasks at once. An application can be concurrent but not parallel: it can handle more than one task at a time, but no two of them ever run at the same instant. An application can also be parallel but not concurrent, i.e. it can process multiple subtasks of a single task simultaneously on a multi-core machine. An application that is neither concurrent nor parallel executes its tasks one by one, sequentially. Distinguishing parallelism from concurrency matters when choosing a fitting way to solve large-scale problems.

Concurrency in Python:

Concurrency is achieved in Python using a technique called multithreading. Multithreading refers to executing multiple threads (tasks, function calls) concurrently by rapidly switching control of the CPU between them (called context switching). The Python Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, even if the machine contains multiple processors.
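To make context switching concrete, here is a minimal, self-contained sketch (not part of the original program; the thread names and step counts are arbitrary) using the standard threading module. Because time.sleep releases the GIL, the two threads overlap their waiting and finish in roughly the time one of them would take alone:

import threading
import time

def worker(name):
    # Each print marks progress; time.sleep releases the GIL so
    # the other thread gets control while this one waits.
    for i in range(3):
        print(f"{name}: step {i}")
        time.sleep(1)

t1 = threading.Thread(target=worker, args=("thread-1",))
t2 = threading.Thread(target=worker, args=("thread-2",))
t1.start()
t2.start()
t1.join()
t2.join()
# Total runtime is about 3 seconds rather than 6: only one thread
# runs Python bytecode at any instant, but their sleeps overlap.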

Implementation:

We are going to implement a file-capture program that looks for csv files inside a specific folder. Whenever a new file is placed inside this folder, our program will identify it, process it, and move the result to a different folder.

Suppose we had to create a large number of threads for our multithreaded tasks. Spawning that many threads would be computationally expensive and could cause performance issues, with throughput being a major casualty. We can solve this problem by creating a pool of threads. A thread pool is a group of pre-instantiated, idle threads that stand ready to be given work.

import os
import pandas as pd
import concurrent.futures
import time

def get_source_directory():
    # You could make this dynamic by reading the location
    # from a config file instead of hardcoding it.
    return "/home/file/"

def get_dest_directory():
    return "/home/file/completed/"

def check_files():
    # Collect the paths of all csv files in the source directory.
    directory = get_source_directory()
    files = []
    for filename in os.listdir(directory):
        if filename.endswith(".csv"):
            capture_file_dir = os.path.join(directory, filename)
            files.append(capture_file_dir)
    return files

def process_file(file):
    # The sleep stands in for a slow processing step and shows
    # how the threads interleave.
    time.sleep(5)
    df = pd.read_csv(file)
    dest = get_dest_directory()
    output_path = os.path.join(dest, "newname.csv")
    df.to_csv(output_path)
    # Remove the source file so it is not picked up again on the
    # next scan of the directory.
    os.remove(file)

def check_completion(fut):
    # add_done_callback passes only the finished future.
    try:
        fut.result()
        print("Processing of the file was successful")
    except Exception as e:
        print(f"Processing of the file failed: {e}")

The above snippet of code checks whether any csv files are present in the directory we have provided, processes each file, and checks whether processing completed. In this example we are hardcoding the file locations, but this can be made dynamic using a config file and the standard-library package configparser, which can read it, as sketched below.
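As a rough sketch of that idea (the settings.ini file name and its [paths] section are hypothetical, not part of the original program), the two directory helpers could read their values from a config file like this:

import configparser

# Hypothetical settings.ini:
# [paths]
# source = /home/file/
# destination = /home/file/completed/

config = configparser.ConfigParser()
config.read("settings.ini")

def get_source_directory():
    return config["paths"]["source"]

def get_dest_directory():
    return config["paths"]["destination"]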

Now let's go ahead and use a ThreadPoolExecutor, which creates a pool of threads of a size we define. ThreadPoolExecutor takes an argument max_workers, the maximum number of threads in the pool waiting to be given a task. The optimal value of max_workers depends on many factors, such as the waiting time and execution time of a task, CPU utilization, and so on. For programming languages like Java, which do not lock themselves to a single core, the number of cores is also a factor when computing the optimal pool size. The following is the main loop of the program, which should run when we start it.

optimal_pool_size = 5  # Assumed
submitted = set()  # avoid re-submitting a file that is already being processed
with concurrent.futures.ThreadPoolExecutor(max_workers=optimal_pool_size) as executor:
    while True:
        files = check_files()
        if len(files) > 0:
            for file in files:
                if file in submitted:
                    continue
                submitted.add(file)
                future = executor.submit(process_file, file)
                future.add_done_callback(check_completion)
        time.sleep(1)  # short pause between directory scans avoids a busy loop

In the above snippet we submit the function process_file, which receives a csv file's path as input. The with statement enclosing this block is used for resource management: it ensures the executor is properly shut down and its threads are cleaned up, even if an exception is raised.

Here we continuously check the given source path for csv files. If a csv file is dropped into '/home/file/' it will immediately get picked up by our program. The process_file function takes the file's path as input and reads that csv file into a dataframe; a delay of 5 seconds is used to demonstrate how multithreading affects the flow of the program. We have defined 5 threads in the thread pool, which means the program can handle up to 5 files concurrently.

If we place 2 csv files simultaneously in the source path, the program will see that there are 2 files inside the directory, loop through them, and assign each of them a thread. We have already discussed that concurrency does not mean parallel processing. What happens here is that the first file starts processing inside process_file, which contains a time.sleep(5). When the program executes that line, the thread processing file1 pauses and hands control to the thread handling file2, which will also pause for 5 seconds. Control switches between the threads depending on their waiting and execution times. Without threads, this program would need more than 10 seconds to finish because of the time.sleep calls; with the thread pool it takes approximately 5 seconds, assuming the other operations inside process_file do not take a considerable amount of time.
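Here is a hedged, self-contained sketch that puts numbers on that claim; simulated_processing stands in for process_file, and the file names are placeholders:

import concurrent.futures
import time

def simulated_processing(file):
    time.sleep(5)  # stands in for the waiting inside process_file
    return file

files = ["a.csv", "b.csv"]

# Sequential: roughly 10 seconds for two files.
start = time.time()
for f in files:
    simulated_processing(f)
print(f"sequential: {time.time() - start:.1f}s")

# Thread pool: roughly 5 seconds, because the sleeps overlap.
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    list(executor.map(simulated_processing, files))
print(f"pooled: {time.time() - start:.1f}s")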

When we submit a function to the executor it returns a Future object. This Future has a method called result(), which blocks until the task finishes and re-raises any exception the task threw; this can be used to check whether a thread executed successfully. To refine this further we attach a callback function to the future, which runs when the task completes or errors out. The callback lets us call future.result() at the right time: inside the callback the future is already finished, so result() returns immediately instead of blocking (whereas result(timeout) raises a TimeoutError if the task is not done in time). This way we can check whether the thread completed successfully or exited due to an error. The function check_completion does exactly this.

future = executor.submit(process_file, file)
future.add_done_callback(check_completion)
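To see the callback pattern in isolation, here is a minimal sketch (failing_task and on_done are hypothetical names, not part of the program above) showing how result(), called inside the callback, returns immediately or re-raises the task's exception:

import concurrent.futures

def failing_task():
    raise ValueError("bad input")

def on_done(fut):
    # The future is guaranteed to be finished by the time the
    # callback runs, so result() never blocks here.
    try:
        fut.result()
        print("task succeeded")
    except Exception as e:
        print(f"task failed: {e}")

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(failing_task).add_done_callback(on_done)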

Splitting a program into multiple threads can have many benefits depending on the use case. If the program spends a lot of time waiting, multithreading can be a really good solution: it is very useful for bulk API calls, bulk read/write operations, and so on.
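For instance, here is a hedged sketch of bulk HTTP calls with the same ThreadPoolExecutor; the URLs are placeholders and the example assumes the endpoints are reachable:

import concurrent.futures
import urllib.request

# Placeholder endpoints; real calls spend most of their time
# waiting on the network, which is where threads shine.
urls = [
    "https://example.com/api/1",
    "https://example.com/api/2",
    "https://example.com/api/3",
]

def fetch(url):
    with urllib.request.urlopen(url, timeout=10) as resp:
        return url, resp.status

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for url, status in executor.map(fetch, urls):
        print(url, status)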

Git repo for a task scheduler using ThreadPoolExecutor: https://gitlab.com/pranoy-open-projects/job-scheduler.git

Hello fellow developers, my name's Pranoy. I'm a 24-year-old programmer living in Kerala, India.