Diving Into Python Concurrency and Optimizations

Photo by RetroSupply on Unsplash

Diving Into Python Concurrency and Optimizations

·

15 min read

We Must Scale Up

Today's biggest challenge is to handle the enormous amount of data, and as fast as possible.

Nowadays, Most of the products generate more insights and more data compared to the past, and also many of the products are data providers that expose APIs.

Today we'll talk about handling those endless streams with several Python optimization methods, and also understand why those methods actually work and how.

Axonius Challenge

Axonius engine communicates with 300+ different products including Cloud providers to get the full security picture of companies, this process is called Asset Management.

The services that are responsible for communicating with other products and data providers are called Adapters, they are generic services with multiple hats.

They act as ETLs (data pipelines that Extract, Transform and Load data to the system), controllers for enforcing security rules, manage logic with other Axonius services, and of course - manage the matching product states.

All of this data is being fetched to those servers, processed, cached, and normalized into Axonius databases.

Adapters can handle the enormous amounts of data and network usage, and they must handle this data efficiently - every delay in this process might cause missing security issues and present an imperfect security picture.

One of the main challenges in adapter implementation is scaling while keeping a good performance in each environment we communicate with. Think about enterprises and large companies with very complex environments that include millions of assets, those large environments must be under full control.

Today we will walk through a few concepts and optimizations for handling large streams of data, in Python, and understand why they work.

Classify your code

We can divide our run-time performance bottlenecks into two groups: IO-Bound operations and CPU-Bound operations.

In general, there are more groups (RAM, Cache, etc), but we'll focus on those popular two.

The classification of our bottlenecks is a crucial part, after we profiled our codebase and pointed what causing the performance issue, we just did most of the work.

There are many profilers in Python today, a lot of visualizations plugins, and production monitoring products (APMs) that will lead you to your bottlenecks.

In this article, we'll focus on optimizations and how they work.

IO-Bound Optimizations

The bottleneck in I/O (Input/Output) operations is simply the waiting part.

In general, those operations are usually characterized using the network, disk, etc. Simple examples are sending an HTTP request to www.axonius.com, reading or writing a file, and more.

Allocating more CPU cores (=computation power) for those kinds of operations won’t gain you a significant performance boost. Those types of methods will benefit more from concurrency methods and asynchronous design.

Concurrency is all about switching tasks efficiently, IO-Bound operations are simply wasting CPU time, and we can yield their execution context allowingo for another task to run.

Python supports concurrency and asynchronous programming natively with Multithreading and AsyncIO.

Multithreading: Why and How?

Using threads we can separate the execution flow, simply by executing our code in concurrency. It's pretty efficient for IO-bound operations that need to be executed concurrently, instead of waiting for the IO operation to end right away, we can dispatch another operation and wait for all operations to end after that.

The downside of threads is that it adds more complexity to the code so we need to make sure our program is thread-safe and if not - manage shared resources.

Behind the scenes: Threads and IO

CPython's (most popular & original implementation of Python) threads are implemented with OS threads, behind the scenes CPython's Interpreter wraps the regular OS threads library with the addition of the Pythonic thread's context and state.

When we create a new threading) thread, a new OS thread is created by calling the syscalls pthread_create() for UNIX-like systems and _beginthreadex() on Windows. Those syscalls are responsible for allocating new threads from the OS and starting them with the current process’s context.

Why it's Important? OS Scheduling! Most scheduler implementations have multiple queues for running threads. When the IO operation starts the thread push into Blocked Queue, when its finishes, the thread that waited in Blocked Queue is transferred to Ready Queue, so we can be pretty efficient with the context switching of threads and move between tasks efficiently.

How GIL and Threads Work

The CPython interpreter is not fully thread-safe. Global Interpreter Lock (GIL) is a mutex (lock) that allows only one thread to run in the CPython's interpreter at each given time, each running thread will hold the mutex and the execution context. Threads can run on different cores but only one thread will run at a time.

But how does the GIL work?

If a thread waits for an IO operation to finish and it has already acquired the GIL, how can another thread be run?

The answer to this question is simple, all the IO operations in CPython executed in Release-Dispatch-Acquire:

release the lock, dispatch IO operation, Acquire the GIL back after the operation ended.

Threads hold the GIL only when they work with Python objects, also, they can release it voluntarily (Thus, we actually can optimize CPU-bound programs with multithreading if we work with C extensions that release the GIL).

GIL is necessary for supporting multithreaded Python programs, but also a trap for CPU-bound programs as we cannot parallelize computations and the Threads context switching can be a real overhead. Since CPU-Bound operations don't wait for external events, we don't gain any profit from using multiple Threads in CPU-bound programs.

GIL exists in CPython, but there are many other interpreters which do not implement GIL.

There are a few interesting reasons for GIL existence:

  • Support non-thread-safe C extensions.

  • Increasing single-threaded programs performance as we do not juggle between threads and many locks.

  • Faster Garbage Collection process (No synchronization at reference counting).

Green Threads

Green threads are also important concepts which implement threads concept in the user space. Those are simply co-operative threads which can be switched voluntarily, in each given time one thread will run.

This concept shows great performance in IO-bound programs, and it's very similar to the next concept we are going to talk about.

Green threads implementations exist in Python, greenlet is one of the popular implementations.

Let's wrap things up, threads -

  1. Allow switching control between IO operations (concurrency).

  2. Only one thread can run at a time (GIL).

  3. Are not recommended for CPU-bound programs.

  4. Adds complexity by enforcing thread safety.

  5. Are difficult to manage and debug.

asyncio: Why and How?

How many times have you found yourself cooking and doing millions of things concurrently? You did what asyncio does!

Asyncio (PEP3156) mechanism was designed to provide high-performance with single-threaded IO-Bound applications such as web servers, network-intensive applications that use databases, distributed task queues, and more. It also provides a large package ecosystem for performing a variety of IO operations fast.

This mechanism uses the concept of Event Loop to run tasks. The event loop is tracking all the IO events within tasks, and resuming the ones which have the results. It manages our concurrency.

Let's say we wanna run several IO-bound functions, each time IO operation is dispatched the function would yield its runtime and return the control to the event loop which gives us the Future object, our task state, a promise.

Then, the event loop will see which unblocked tasks it has, those who finished will get back to run.

Unlike threads which need only to enforce thread-safety, to use asyncio we must have packages that support asyncio mechanisms - without this support we would have a regular synchronous code.

asyncio and Web Servers

Asyncio is a real game-changer for Python's WSGI frameworks and web frameworks. Since it was released many new awesome web servers (such as FastAPI) and web frameworks were published and placed Python in a better place in the performance aspect.

Web servers are the ultimate example of a case that benefits from event loops much more than threads, especially when dealing with thousands of connections simultaneously.

Most of the web servers simply get/send HTTP requests, and heavy computations are performed in an asynchronous way in another place (being stateless and response requests immediately), so it fits perfectly.

Behind the scenes: AsyncIO and IO

What makes the Asynchronous part in asyncio?

How the IO is executed in a different place while we do another task?

The answer is simple: select). select is a blocking syscall that monitors on given file descriptors, when a file descriptor is ready for read/write the function returned, simply provides us a way to multiplex many IO operations and continue only when they are ready.

select is a syscall that makes it very efficient (not a busy-wait if you suspected, it simply works with OS interrupts).

When we are trying to receive data from socket (e.g), our socket file descriptor is simply attached to the select file descriptors list, the event loop will know only when to socket is ready for reading since the select stop blocking and returning the ready file descriptor.

After the event loop gets the notification that our socket is ready for reading, it sets the corresponding future as done, and the coroutine is back to running state.

How Functions Got Stopped and Resumed?

I mentioned in the last paragraph that "the coroutine is back to running state" - Coroutines.

The great idea behind asyncio is coroutines, routines that yield their execution because they do nothing except wait for IO. await keyword gives the control back to the event loop, exactly like yield in generators.

Tuning your asyncio Limits

Let's say we have 1 Million requests to dispatch and intense-network activity in the background (Axonius is familiar with this situation) and we must do it as fast as possible, usually adding more tasks to the event loop seems a good option, but we must talk about limits here.

Theoretically, let's say we communicate with an annoying server that has poor latency, and we need to send many queries to this server to get our awesome data.

A reasonable thought will be to dispatch many fetching tasks concurrently to take advantage of this situation - each IO operation will yield its runtime and switch control to another fetch task, and this way we'll get the responses together.

This is a really good plan, but we must watch out for this symptom of unmanaged event loop which dispatch many tasks as possible:


OSError: [Errno 24] Too many open files

This error can happen if we send too many tasks concurrently.

To avoid this problem, we must perform a benchmark that will define the number of tasks we can handle concurrently, and simply control our concurrency level.

Under the hood, Sockets are simply file descriptors that we write/read from. Our operating system has a limit (for everything) of how many open file descriptors can be opened for each given process, it's an adjustable limit but a good practice is having control over the number of tasks that are being executed.

A convenient way of providing synchronization to our code with minimal effort is using asyncio.Sempahore class. Each time we send an IO operation, we'll do it under our semaphore that will manage the number of concurrent tasks that run.

It works exactly like threads Semaphore (there are more Synchronization Primitives also), every time the lock acquires the lock counter decremented, when reaching zero we'll wait for tasks to release the lock.

sem = asyncio.Semaphore(10)
async with sem:    
    # work with shared resource

Synchronous Code in asyncio

Synchronous code is a task that runs on an asyncio event loop that can be a disaster for performance.

Think about the following scenario - you dispatch many tasks that are waiting for their IO to finish. Meanwhile, the currently running task does some heavy synchronous stuff which blocks all the other tasks from running, even if their IO is ready. That stuff can come from unpredictable places, so be aware of it.

A good example is logging. Logs, in general, might be a performance issue (IO) if they are being abused -We should note that the native Logger provided by Python is blocking, so we must take it into consideration.

In those cases we can use external packages such async loggers or another architecture that won't block your program entirely in each logger.debug('I'm here!').

Benchmark Time

After having tasks controlled, perform a benchmark and find your most efficient amount of tasks that can be run concurrently.

Notice that the benchmark results can change between different setups, but it gives a good clue what is the direction.

IO-Bound: When to choose what?

You'll need to examine several things and decide accordingly what is the appropriate method.

  • Is thread-safety feasible?

    • There are many modules that we use that do not support multithreading, check it before you use multithreading.

    • Try to find the shared resources and determine how much it'll take to manage synchronization.

  • AsyncIO support

    • Check if all your IO-bound operations support asyncio.
  • Synchronous codebase

    • Using asyncio with synchronous code can be a real problem for performance as we take all the runtime from other tasks.
  • Number Of Connections

    • If we are dealing with a really large amount of connections, handling those connections with threads can be less efficient in-memory aspects since OS threads are more expensive (OS has threads limit per process) than asyncio tasks.

    • Using a managed threads pool can be a great option also.

  • IO operations speed

    • Fast IO operations can be faster using the asyncio event loop since we don't lean on OS context switches.

CPU-Bound and Optimizations

CPU bound operations are operations based on CPU computations, the bottleneck on those operations are heavy computations.

If we'll have more computation power here (CPUs), we're good. But sometimes, it's not possible whether you develop an on-premise product with limited resources or you are just trying to be efficient for costs, those programs must be scaled-up too.

Ok, so we classified our codebase bottleneck, and found it as a CPU-bound case!

Usually, those cases will benefit more from the parallel mechanism, it can be parallelism on a single machine or distributed architecture - the point is to send our computations to many other places to get our results faster.

Parallelism is all about maximizing existing resources, running your tasks independently on different resources.

We'll focus on Python's native solution for parallelism, Multiprocessing.

Python has many solutions for asynchronous distributed computations on clusters, among them.

Multiprocessing: Why and How?

Multiprocessing package provides an API for spawning (/fork) processes locally and remotely.

Using multiprocessing we can exit the GIL box and power up our programs using multiple different cores for our computations.

The syntax of the multiprocessing package is pretty the same as multithreading, it's recommended to use the concurrent package for both usages, it simplifies the usage even more and provides process pools logic.

Multiprocessing and IO Bound?

Overkill. First of all, it is an option, and it will be fast as well, theoretically, it will parallelize the IO operations and achieve that same thing.

The reason it is less recommended is that this approach requires a lot of resources (memory, runtime) and has slower IPC (Inter-Process Communication) methods.

Dispatching 100 asyncio tasks will be much lighter than dispatching 100 processes.

Few Warnings

  • Keep track of your memory consumption when you lack resources, multiprocessing can cause pretty high RAM consumption.

  • Short computations can be inefficient since the spawning process & context-switching is expensive.

  • Shared resources and IPC must be handled carefully, synchronization and serialization for non-trivial objects may be required, also forked processes can be handled differently.

So it's simple, CPU bound in Python goes to multiprocessing.

Combined Approaches Wins

After we classified our bottleneck, we should pick the optimization approaches. In real-life applications most of the cases won't be flow with only IO-bound parts and only CPU-bound parts, sometimes they'll be bound together, Let's split them again!

Distributed Concurrency

Let's go back to our 1 million requests problem - event loop or thread pool probably will be a good optimization for this scenario, but in those situations, we should consider combining methods.

Multiprocessing/distributing our IO tasks into several cores/machines workers with processes,

It will be the first step for decreasing the great load, after, we can perform load balancing into several cores/machines and in each process perform our IO-bound optimizations using asyncio or multithreading.

Let’s consider the following example, let’s say we dealing with 9M tasks, in the next diagram we simplified it into 9 tasks, but the idea remains the same. We’ll take the 9 tasks and distribute 3 for core #1 3 for core #2 3 for core #3, this way we can replace the previous linear model, one process which handles everything. The cores can also be in another machine conceptually.

IO and CPU Bound Together

Most of the ETLs have 3 simple steps: Fetch (IO), Process (CPU), Send (IO) results.

In complex scenarios, we'll have 3 bottlenecks! Not only the IO part is complex, but even the processing stage is also heavy.

A great solution that can fit those scenarios is using asyncio/multithreading for the IO parts, and sending their results into a multiprocessing pool which will process the results without blocking our main fetching method.

Combining both approaches will send the computations to be handled in other cores, while our main program will fetch and send more results.

In the next code, we can see a very simple implementation for this scenario. We'll use asyncio for fetching content from an HTTP server, the processing step will be executed in another process using ProcessPoolExecutor which can send workers to perform the processing step.

This simple implementation was more efficient from the only asyncio solution or only multiprocessing solution.

import asyncio  
import aiohttp  
from common import TARGET_URLS  
from concurrent.futures import ProcessPoolExecutor

def process(result):  
    """CPU bound function."""  
    processed_result = very_long_computation(result)  
    return processed_result  


async def fetch(session, url):  
    """IO bound function."""  
    async with session.get(url) as res:  
        result = await res.content  
    return result  


async def handle_task(url, session, pool):  
    """Fetch and process single task."""  
    result = await fetch(session, url)  
    processed_result = await asyncio.wrap_future(pool.submit(process, result))  
    return processed_result  


async def dispatcher(urls):  
    """Dispatch all tasks."""  
    pool = ProcessPoolExecutor()  

    async with aiohttp.ClientSession() as session:  
        handlers = [handle_task(url, session, pool) for url in urls]  
        results = await asyncio.gather(*handlers)  

    return results  


def main():  
    all_results = asyncio.get_event_loop().run_until_complete(  
        dispatcher(TARGET_URLS)  
    )  

    print(all_results)

There are many packages that can simplify those combined processes such as aiomultiprocess, Ray, Fiber, pypeln, etc.

"The key is in not spending time, but in investing it"

Python support varies of methods to optimize your code, and all the time new ones are released, so may the optimizations force with you.