Async programming in Python with asyncio

For people coming from JavaScript, asynchronous programming is nothing new, but for Python developers getting used to async functions and futures (the equivalent of promises in JS) may not be trivial.

Concurrency vs Parallelism

Concurrency and parallelism can sound really similar, but in programming there is an important difference.
Imagine you are writing a book while cooking. Even if it seems like you are doing both tasks at the same time, what you are really doing is switching between them: while you wait for the water to boil you write your book, but while you are chopping vegetables you pause your writing. This is called concurrency. The only way to do these two tasks in parallel is to have two people, one writing and one cooking, which is what multicore CPUs do.

[Figure: concurrency vs. parallelism]

Why asyncio

Async programming allows you to write concurrent code that runs in a single thread. The first advantage compared to multiple threads is that you decide where the scheduler can switch from one task to another, which makes sharing data between tasks safer and easier.

def queue_push_back(x):
    if len(queue) < max_size:  # check the size...
        queue.append(x)        # ...then append

If we run the code above in a multithreaded program, two threads can both pass the check on line 2 before either of them appends. Both items are then added, potentially making the queue bigger than max_size.
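As a minimal sketch of the contrast, here is the same check-then-append written with asyncio. The names queue, max_size and producer are made up for the illustration. Because there is no await between the check and the append, the event loop cannot switch to another task in between, so the limit should hold:

```python
import asyncio

max_size = 5
queue = []

async def queue_push_back(x):
    # No await between the check and the append, so no other task
    # can run in between and the size limit cannot be exceeded.
    if len(queue) < max_size:
        queue.append(x)

async def producer(start):
    for i in range(start, start + 10):
        await queue_push_back(i)
        # Explicit switch point: other producers may run here.
        await asyncio.sleep(0)

async def main():
    # Three producers compete for the same bounded queue.
    tasks = [asyncio.ensure_future(producer(n * 100)) for n in range(3)]
    await asyncio.wait(tasks)

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
print('queue size:', len(queue))
```

The guarantee only holds while the check and the append stay on the same side of every await. Putting an await between them would reintroduce the race.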

Another advantage of async programming is memory usage. Every new thread needs its own memory (for example its stack) so that the operating system can switch between threads. With async programming this is not a problem, since all the code runs in a single thread.
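To give a rough feel for this, the sketch below starts ten thousand small tasks and records which thread each one runs on. The function tiny_task and the task count are arbitrary choices for the illustration, and the example says nothing precise about how much memory is saved:

```python
import asyncio
import threading

thread_ids = set()

async def tiny_task(i):
    # Record which OS thread runs this coroutine.
    thread_ids.add(threading.get_ident())
    await asyncio.sleep(0.01)
    return i

async def main(n):
    tasks = [asyncio.ensure_future(tiny_task(i)) for i in range(n)]
    done, _pending = await asyncio.wait(tasks)
    return len(done)

loop = asyncio.new_event_loop()
finished = loop.run_until_complete(main(10000))
loop.close()
print(finished, 'tasks ran on', len(thread_ids), 'thread(s)')
```

Starting the same number of operating system threads would be far heavier, which is the point the paragraph above makes.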

How to write async code in Python

asyncio has three main components: coroutines, the event loop, and futures.

Coroutine

A coroutine is the result of an asynchronous function which can be declared using the keyword async before def

async def my_task(args):
    pass
    
my_coroutine = my_task(args)

When we call a function declared with the async keyword, the function body is not run. Instead, a coroutine object is returned.
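A small sketch makes this visible. The calls list is only there to record when the body actually runs, and run_until_complete (covered in the Event loop section) is used here just to drive the coroutine:

```python
import asyncio

calls = []

async def my_task(x):
    calls.append(x)   # runs only once the coroutine is driven
    return x * 2

# Calling the async function only builds a coroutine object.
my_coroutine = my_task(21)
print(asyncio.iscoroutine(my_coroutine))
ran_before = list(calls)   # still empty: the body has not run yet

loop = asyncio.new_event_loop()
result = loop.run_until_complete(my_coroutine)
loop.close()
print(result, calls)
```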

There are two ways to read the output of an async function from a coroutine.
The first way is to use the await keyword. This is possible only inside async functions; it waits for the coroutine to terminate and returns its result.

result = await my_task(args)

The second way is to add it to an event loop as we will see in the next sections.
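Here is a minimal sketch of the first way. The functions fetch_number and add_one are hypothetical, and asyncio.sleep stands in for real I/O. Since await is only allowed inside an async function, the outermost coroutine still has to be handed to an event loop:

```python
import asyncio

async def fetch_number():
    await asyncio.sleep(0.01)  # stand-in for real I/O
    return 20

async def add_one():
    # await suspends add_one until fetch_number() has finished,
    # then evaluates to its return value.
    number = await fetch_number()
    return number + 1

loop = asyncio.new_event_loop()
total = loop.run_until_complete(add_one())
loop.close()
print(total)
```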

Event loop

The event loop is the object which executes our asynchronous code and decides how to switch between async functions. After creating an event loop we can add multiple coroutines to it. These coroutines will all run concurrently when run_until_complete or run_forever is called.

# create loop
loop = asyncio.new_event_loop()
# add coroutine to the loop
future = loop.create_task(my_coroutine)
# block until the given task is done; all the tasks added
# to the loop run concurrently in the meantime
loop.run_until_complete(future)
loop.close()

Future

A future is an object that works as a placeholder for the output of an asynchronous function, and it gives us information about the function's state.
A future is created when we add a coroutine to an event loop. There are two ways to do this:

future1 = loop.create_task(my_coroutine)
# or
future2 = asyncio.ensure_future(my_coroutine)

The first method adds a coroutine to the loop and returns a task, which is a subtype of future. The second method is very similar: it takes a coroutine and adds it to the current event loop (unless another one is passed with the loop argument). The only difference is that it can also accept a future, in which case it does nothing and returns the future unchanged.
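The sketch below checks these claims on a throwaway loop. The loop is passed to ensure_future explicitly because a loop created with new_event_loop is not automatically the current one, and my_task here is a trivial placeholder:

```python
import asyncio

async def my_task():
    return 'done'

loop = asyncio.new_event_loop()

task = loop.create_task(my_task())
# Given an existing future, ensure_future hands it back unchanged.
same = asyncio.ensure_future(task)
# Given a coroutine, it wraps it in a new task on the chosen loop.
other = asyncio.ensure_future(my_task(), loop=loop)

is_future = isinstance(task, asyncio.Future)
done_before = task.done()   # nothing has run yet

loop.run_until_complete(asyncio.wait([task, other]))
done_after = task.done()
loop.close()

print(is_future, same is task, done_before, done_after, task.result())
```

The done method is one example of the state information a future exposes. The result only becomes available once the loop has run the task.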

A simple program

import asyncio

async def my_task():
    pass

def main():
    loop = asyncio.new_event_loop()
    coroutine1 = my_task()
    coroutine2 = my_task()
    task1 = loop.create_task(coroutine1)
    task2 = loop.create_task(coroutine2)
    # block until both tasks are done
    loop.run_until_complete(asyncio.wait([task1, task2]))
    print('task1 result:', task1.result())
    print('task2 result:', task2.result())
    loop.close()

main()

As you can see, to run an asynchronous function we first need to create a coroutine, then we add it to the event loop, which creates a future/task. Up to this point none of the code inside our async function has been executed. Only when we call loop.run_until_complete does the event loop start executing all the coroutines that have been added to the loop with loop.create_task or asyncio.ensure_future.
loop.run_until_complete will block your program until the future you gave as argument is completed. In the example we used asyncio.wait() to create a future which will be complete only when all the futures passed in the argument list are completed.
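One detail worth seeing in code is what asyncio.wait gives back: a pair of sets, the futures that are done and the ones still pending. The following is a small sketch with a made-up helper called slow, not part of the program above:

```python
import asyncio

async def slow(seconds, value):
    await asyncio.sleep(seconds)
    return value

async def main():
    tasks = [
        asyncio.ensure_future(slow(0.02, 'a')),
        asyncio.ensure_future(slow(0.01, 'b')),
    ]
    # By default wait() returns only when every task has completed.
    done, pending = await asyncio.wait(tasks)
    return sorted(task.result() for task in done), len(pending)

loop = asyncio.new_event_loop()
results, n_pending = loop.run_until_complete(main())
loop.close()
print(results, n_pending)
```

Since done is a set, the order of its elements is not the order the tasks were passed in, which is why the results are sorted here.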

Async functions

One thing to keep in mind while writing asynchronous functions in Python is that using async before def doesn't by itself mean that your function will run concurrently. If you take a normal function and add async in front of it, the event loop will run your function without interruption, because you didn't specify where the loop is allowed to interrupt it to run another coroutine. Specifying where the event loop is allowed to change coroutine is really simple: every time you use the keyword await, the event loop can stop running your function and run another coroutine registered to the loop.

import asyncio

async def print_numbers_async1(n, prefix):
    for i in range(n):
        print(prefix, i)

async def print_numbers_async2(n, prefix):
    for i in range(n):
        print(prefix, i)
        if i % 5 == 0:
            # let the event loop switch to another task
            await asyncio.sleep(0)

loop1 = asyncio.new_event_loop()
count1_1 = loop1.create_task(print_numbers_async1(10, 'c1_1'))
count2_1 = loop1.create_task(print_numbers_async1(10, 'c2_1'))
loop1.run_until_complete(asyncio.wait([count1_1, count2_1]))
loop1.close()

loop2 = asyncio.new_event_loop()
count1_2 = loop2.create_task(print_numbers_async2(10, 'c1_2'))
count2_2 = loop2.create_task(print_numbers_async2(10, 'c2_2'))
loop2.run_until_complete(asyncio.wait([count1_2, count2_2]))
loop2.close()

If we execute this code we will see that the first loop prints all the numbers with prefix c1_1 and then all the numbers with prefix c2_1, while in the second loop the tasks take turns, switching every time a multiple of 5 is printed.
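To check the ordering without reading printed output, here is a reduced sketch of the same experiment that appends to a list instead of printing. The worker function and its cooperative flag are invented for this illustration:

```python
import asyncio

async def worker(name, n, log, cooperative):
    for i in range(n):
        log.append(name + str(i))
        if cooperative:
            # Hand control back to the event loop after every step.
            await asyncio.sleep(0)

async def run_pair(cooperative):
    log = []
    first = asyncio.ensure_future(worker('a', 3, log, cooperative))
    second = asyncio.ensure_future(worker('b', 3, log, cooperative))
    await asyncio.wait([first, second])
    return log

loop = asyncio.new_event_loop()
blocking_order = loop.run_until_complete(run_pair(False))
cooperative_order = loop.run_until_complete(run_pair(True))
loop.close()
print(blocking_order)
print(cooperative_order)
```

Without an await inside the loop, the first worker finishes before the second one starts. With asyncio.sleep(0) after every step, the two workers alternate.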

Real world example

Now that we know the basics of asynchronous programming in Python, let's write some more realistic code which downloads a list of pages from the internet and prints a preview containing the first 3 lines of each page.

import aiohttp
import asyncio

async def print_preview(url):
    # connect to the server
    async with aiohttp.ClientSession() as session:
        # create get request
        async with session.get(url) as response:
            # wait for the response body
            text = await response.text()

            # print the first 3 non-empty lines
            lines = [line for line in text.split('\n') if line]
            print('-'*80)
            for line in lines[:3]:
                print(line)
            print()

def print_all_pages():
    pages = [
        'http://textfiles.com/adventure/amforever.txt',
        'http://textfiles.com/adventure/ballyhoo.txt',
        'http://textfiles.com/adventure/bardstale.txt',
    ]

    tasks = []
    loop = asyncio.new_event_loop()
    for page in pages:
        tasks.append(loop.create_task(print_preview(page)))

    # block until every preview has been printed
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

print_all_pages()

This code should be pretty easy to understand. We start by creating an asynchronous function which downloads a URL and prints its first 3 non-empty lines. Then we create a function which, for each page in a list of pages, calls print_preview, adds the coroutine to the loop and stores the future inside a list of tasks. Finally, we run the event loop, which runs the coroutines we added to it and prints the preview of all the pages.
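The benefit of running the downloads concurrently can be sketched without touching the network. In the example below fake_download is a hypothetical stand-in that just sleeps for a tenth of a second, so the timing only illustrates the idea and is not a measurement of aiohttp:

```python
import asyncio
import time

async def fake_download(url, delay):
    # Hypothetical stand-in for an HTTP request: it only waits.
    await asyncio.sleep(delay)
    return url + ' downloaded'

async def download_all(urls):
    tasks = [asyncio.ensure_future(fake_download(url, 0.1)) for url in urls]
    await asyncio.wait(tasks)
    # Read the results from the task list to keep the input order.
    return [task.result() for task in tasks]

loop = asyncio.new_event_loop()
start = time.monotonic()
pages = loop.run_until_complete(download_all(['page1', 'page2', 'page3']))
elapsed = time.monotonic() - start
loop.close()
print(pages, round(elapsed, 2))
```

The three waits overlap, so the total time should be close to one delay rather than the sum of the three.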

Async generator

The last feature I want to talk about is asynchronous generators. Implementing an asynchronous generator is quite simple: use yield inside an async function, and consume the values with async for.

import asyncio
import random

async def is_prime(n):
    if n < 2:
        # 0 and 1 are not prime
        return False
    for i in range(2, n):
        # allow event_loop to run other coroutine
        await asyncio.sleep(0)
        if n % i == 0:
            return False
    return True

async def prime_generator(n_prime):
    counter = 0
    n = 0
    while counter < n_prime:
        n += 1
        # wait for is_prime to finish
        prime = await is_prime(n)
        if prime:
            yield n
            counter += 1

async def check_email(limit):
    for i in range(limit):
        if random.random() > 0.8:
            print('1 new email')
        else:
            print('0 new email')
        await asyncio.sleep(2)

async def print_prime(n):
    async for prime in prime_generator(n):
        print('new prime number found:', prime)

def main():
    loop = asyncio.new_event_loop()
    prime = loop.create_task(print_prime(3000))
    email = loop.create_task(check_email(10))
    loop.run_until_complete(asyncio.wait([prime, email]))
    loop.close()

main()
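For a smaller view of the same pattern, here is a stripped-down sketch. The countdown generator is made up for the illustration; it yields a few values and gives control back to the event loop between them:

```python
import asyncio

async def countdown(n):
    # An async generator: an async function that contains yield.
    while n > 0:
        yield n
        await asyncio.sleep(0)  # let other tasks run between values
        n -= 1

async def collect():
    values = []
    # async for is the only way to iterate over an async generator.
    async for value in countdown(3):
        values.append(value)
    return values

loop = asyncio.new_event_loop()
values = loop.run_until_complete(collect())
loop.close()
print(values)
```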

Exception handling

When an unhandled exception is raised inside a coroutine it doesn't break our program as it would in normal synchronous code. Instead, it is stored inside the future, and if you don't handle the exception before the program exits you will get the following error:

Task exception was never retrieved

There are two ways to fix this: catch the exception when you access the future's result, or call the future's exception method.

try:
    # this will raise the exception raised during the coroutine execution
    my_future.result()
except Exception:
    pass

# this will return the exception raised during the coroutine execution
my_future.exception()
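Both options can be tried in a complete, if artificial, sketch. The coroutine failing_task is invented for the example and fails on purpose:

```python
import asyncio

async def failing_task():
    raise ValueError('something went wrong')

loop = asyncio.new_event_loop()
task = loop.create_task(failing_task())
# asyncio.wait does not re-raise; the exception stays stored in the task.
loop.run_until_complete(asyncio.wait([task]))
loop.close()

# Option 1: ask for the exception; it is returned, not raised.
stored = task.exception()

# Option 2: ask for the result; the stored exception is raised here.
try:
    task.result()
    raised = None
except ValueError as error:
    raised = error

print(repr(stored), repr(raised))
```

Either call counts as retrieving the exception, so the warning about a never retrieved exception should not appear for this task.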

Going deeper

If you have read everything up to this point you should know how to use asyncio to write concurrent code, but if you wish to go deeper and understand how asyncio works I suggest you watch the following video

If you would like to see more complex uses of asyncio, or if you have any questions, leave a comment and I will reply to you as soon as possible.
