Scribbling

Python: Concurrency with asyncio (asyncio 기반 비동기 프로그래밍) 본문

Computer Science/Python

Python: Concurrency with asyncio (asyncio 기반 비동기 프로그래밍)

focalpoint 2022. 5. 17. 17:22

 

Thread vs Asyncio

스레드나 코루틴을 통해 콘솔 에니메이션을 구현할 수 있다.

먼저 스레드를 이용하는 코드이다.

Console animation can be implemented with either threads or coroutines.

First, with thread.

import threading
import itertools
import time
import sys

class Signal:
    go = True

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))

def slow_function():
    time.sleep(3)
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)

main()

 

파이썬에는 스레드를 종료시키는 API가 없다. 스레드에 메시지를 보내 종료시켜야 한다.

There is no thread-terminating api in python. We should send a messeage to stop it.

 

다음은 asyncio를 이용한 버전이다. asyncio 코루틴은 yield from이나 asyncio.async() 등의 함수에 전달하여 구동해야 한다. 구동하지 않으면 동작하지 않는다.

Second, asyncio version. Asyncio coroutines should be primed with 'yield from' or asyncio functions. If not primed, they do not run.

import asyncio
import itertools
import sys

async def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))

async def slow_function():
    await asyncio.sleep(3)
    return 42

async def supervisor():
    spinner = asyncio.ensure_future(spin('thinking'))
    print('spinner object:', spinner)
    result = await slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)

main()

asyncio.ensure_future() 함수는 Task 객체의 실행을 스케줄링 한다.

asyncio.ensure_future() function schedules the execution of the task object.

 

스레드로 복잡한 프로그램을 구현하다보면, 스케줄러가 언제든 스레드를 중단시킬 수 있으므로 프로그램을 분석하는 것이 매우 어렵다. 프로그램의 크리티컬 섹션을 보호하기 위해 락을 사용하는 등의 추가적인 조치가 필요하다.

코루틴은 기본적으로 인터럽트로부터 보호된다. 코루틴은 제어권을 yield나 yield from으로 스케줄러에 넘기기 때문이다. 따라서 CancelledError 예외를 처리하기만 하면 된다.

It is very difficult to analyze a complicated program with a number of threads because scheduler can terminate a thread at any time. To pretect critical session from any interrupts, additional measures such as locks are necessary.

Coroutines are basically safe from interrupts. Coroutines hand over control to the scheduler only through 'yield' or 'yield from', thus handling CancelledError exceptions alone will do.

 

 

Downloading with asyncio and aiohttp

asyncio는 TCP와 UDP만 직접 지원하므로, 비동기 HTTP 클라이언트/서버를 구현하려면 aiohttp를 사용한다.

asyncio를 통한 비동기 프로그래밍은, 메인 루프와 코루틴 간에 제어권이 오고가는 과정에서 큐에 있는 task가 순차적으로 수행되는 과정이다.

Import aiohttp to implement a asynchronous HTTP Client/server, as asyncio only supports TCP and UDP.

Asynchronous programming with asyncio is a process of sequentially completeing tasks in a queue while controls are handed over between main loop and coroutines.

- Asyncio를 사용할 때는 직접 코루틴 체인을 구동하는 것이 아니라, asyncio 이벤트 루프가 처리하도록 해야 한다.

- 가장 안쪽의 하위 제너레이터는 실제 입출력을 수행하는 라이브러리 함수여야 한다.

- With Asyncio, let asyncio event loop handle priming coroutines.

- Innermost generator should be a function from a library that executes input/ouput operations.

블로킹 함수는 디스크나 네트워크의 입출력의 수행으로 정의된다. 스레드는 좋은 해법이지만 수천 개의 연결을 처리해야 한다면 메모리 부하로 인해 스레드를 사용하기 어렵다. 이러한 관점에서, 코루틴은 적절한 대안이다.

* Asyncio에서는 아직 비동기 파일 시스템 API를 제공하고 있지 않다.

Blocking functions are defined as execution of input/outputs from/to disks and networks. Thread is a great solution in general, but is not feasible for thousands of connections. Coroutines, in this respect, can be an alternative solution.

* Asyncio does not provide file system API yet.

 

import asyncio
import collections
import sys

import aiohttp
from aiohttp import web
import tqdm

from flags2_common import main, HTTPStatus, Result, save_flag


DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code


async def get_flag(session, base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    async with session.get(url) as resp:
        if resp.status == 200:
            image = await resp.read()
            return image
        if resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)


async def download_one(session, cc, base_url, semaphore, verbose):
    async with semaphore:
        try:
            image = await get_flag(session, base_url, cc)
        except web.HTTPNotFound:
            status = HTTPStatus.not_found
            msg = 'not found'
        except Exception as exc:
            raise FetchError(cc) from exc
        else:
            save_flag(image, cc.lower() + '.gif')
            status = HTTPStatus.ok
            msg = 'OK'
        if verbose and msg:
            print(cc, msg)
        return Result(status, cc)


async def download_coro(cc_list, base_url, verbose, concur_req):
    async with aiohttp.ClientSession() as session:
        counter = collections.Counter()
        semaphore = asyncio.Semaphore(concur_req)

        to_do = [download_one(session, cc, base_url, semaphore, verbose) for cc in sorted(cc_list)]
        to_do = asyncio.as_completed(to_do)
        if not verbose:
            to_do = tqdm.tqdm(to_do, total=len(cc_list))

        for future in to_do:
            try:
                res = await future
            except FetchError as exc:
                country_code = exc.country_code
                try:
                    error_msg = exc.__cause__.args[0]
                except IndexError:
                    error_msg = exc.__cause__.__clas__.__name__
                if verbose and error_msg:
                    print('*** Error for {}: {}'.format(country_code, error_msg))
            else:
                status = res.status
            counter[status] += 1
        return counter


def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, base_url, verbose, concur_req)
    counter = loop.run_until_complete(coro)
    loop.close()
    return counter


if __name__ == '__main__':
    py_ver = int(f"{sys.version_info.major}{sys.version_info.minor}")
    if py_ver > 37 and sys.platform.startswith('win'):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

Semaphore 인수는 asyncio.Semaphore 객체로, 동시 요청 수를 제한하기 위한 동기화 장치다. 세마포어는 내부에 카운터를 가지고 있는 객체로서 acquire() 코루틴 메서드를 호출할 때마다 감소하고, release() 코루틴 메서드를 호출할 때마다 증가한다. 카운터가 0보다 클 때는 acquire()를 호출해도 블로킹되지 않지만, 카운터가 0일 때 acquire()를 호출하면 다른 곳에서 release()를 호출해서 카운터를 증가시켜줄 때까지 블로킹된다.

기존에 정의한 main 함수는 코루틴이 아닌 일반 함수 download_many를 인수로 취하기 때문에, 불가피하게 download_coro 함수가 필요하다.

Asyncio.Future 객체의 결과는 result() method를 호출하기 보다는 await를 사용하는 것이 용이하다.

asyncio.as_completed()가 반환하는 Future 객체는 as_completed()를 호출할 때 전달한 Future 객체와 다르다. 그래서 FetchError class가 필요하다.

Semaphore is a device to limit the number of simultaneous requests. Semaphore is an object with a counter which decreases by one when acquire() method is called and increases with release() method. If the number is bigger than 0, it does not get blocked with acquire(), however, if the number is 0, it gets blocked until release() method is called from somewhere else.

Main function we defined previously takes a normal function(not a coroutine) download_many as its parameter, thus making it indispensable to implement an additional function, download_coro.

Use await when getting the result of Asyncio.future rather than call result() method.

Future objects returned by asyncio.as_completed() method are not the same as the ones we passed to the it, therefore we need a customized FetchError class.

 

현재 파이썬 3.8이상 version + Windows OS에서 asyncio 모듈이 runtime error를 발생시킨다. 

해당 조건 하에서는 아래 코드를 사용해야 한다.

Currently, under windows operating system and python from 3.8 up, asyncio module raises a runtime error.

Under the condition, add the below code.

Reference: https://gmyankee.tistory.com/330

 

python 3.8 RuntimeError: Event loop is closed

python 3.8 이후부터 windows에서 async 한 개발을 하면 정상 동작을 하였음에도 불구하고 다음과 같은 오류가 나타납니다. RuntimeError: Event loop is closed 이러한 이슈로 많은 사람들이 해결책을 찾기 위해

gmyankee.tistory.com

if __name__ == '__main__':
    py_ver = int(f"{sys.version_info.major}{sys.version_info.minor}")
    if py_ver > 37 and sys.platform.startswith('win'):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

 

위의 코드에서 save_flag()는 블로킹 함수다. 스레드 버전에서 save_flag()는 GIL을 해제하여 다른 스레드가 진행되도록 허용하여 문제가 되지 않지만, flag2_asyncio.py에서 save_flag()는 유일한 스레드를 블로킹하여 문제가 된다. 이런 문제는 이벤트 루프 객체의 run_in_executor() 메서드로 해결할 수 있다. asyncio 이벤트 루프는 스레드 풀 실행자를 내부에 가지고 있으며 run_in_executor() 메서드에 실행할 콜러블을 전달 가능하다.

In the above code, save_flag() is a blocking function. In thread version, save_flag() is not a problem by releasing GIL allowing another thread to proceed. In flag2_asyncio.py, however, save_flag() blocks the only thread and delays the execution. run_in_executor() method can resolve this. Asyncio's event loop has an internal thread pool executor.

async def download_one(session, cc, base_url, semaphore, verbose):
    async with semaphore:
        try:
            image = await get_flag(session, base_url, cc)
        except web.HTTPNotFound:
            status = HTTPStatus.not_found
            msg = 'not found'
        except Exception as exc:
            raise FetchError(cc) from exc
        else:
            loop = asyncio.get_event_loop()
            loop.run_in_executor(None, save_flag, image, cc.lower()+'.gif')
            status = HTTPStatus.ok
            msg = 'OK'
        if verbose and msg:
            print(cc, msg)
        return Result(status, cc)

 

한 번에 여러 가지 요청하기: 스레드 혹은 asyncio에서의 구현이 용이하다.

Doing multiple requests for each download: easy to implement with thread or asyncio.

async def http_get(session, url):
    async with session.get(url) as resp:
        if resp.status == 200:
            ctype = resp.headers.get('Content-type', '').lower()
            if 'json' in ctype or url.endswith('json'):
                data = await resp.json()
            else:
                data = await resp.read()
            return data
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)

async def get_country(session, base_url, cc):
    url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
    metadata = await http_get(session, url)
    return metadata['country']

async def get_flag(session, base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    return await http_get(session, url)

async def download_one(session, cc, base_url, semaphore, verbose):
    async with semaphore:
        try:
            image = await get_flag(session, base_url, cc)
            country = await get_country(session, base_url, cc)
        except web.HTTPNotFound:
            status = HTTPStatus.not_found
            msg = 'not found'
        except Exception as exc:
            raise FetchError(cc) from exc
        else:
            country = country.replace(' ', '_')
            filename = '{}-{}.gif'.format(country, cc)
            loop = asyncio.get_event_loop()
            loop.run_in_executor(None, save_flag, image, filename)
            status = HTTPStatus.ok
            msg = 'OK'
        if verbose and msg:
            print(cc, msg)
        return Result(status, cc)

 

Asyncio와 aiohttp를 이용하면 TCP나 web server 구현이 가능하다.

동시성을 향상시키려면, 서버 디스크에 있는 데이터베이스에 대한 접근은 비동기식으로 실행해야 한다. 그렇지 않으면 결과가 나올 때까지 이벤트 루프가 블로킹된다. 응답 지연 문제를 처리하기 위해 응답을 페이지로 구분할 수 있다. 예를 들어 최대 200행으로 제한하고, 사용자가 클릭하거나 스크롤하면 데이터를 더 가져오는 방식이 있다.

결과를 부분적으로 보내는 데 필요한 대부분의 코드는 브라우저 측에서 구현된다. 때문에 구글 및 대부분의 대형 인터넷 서비스는 상당 부분을 클라이언트 측에 의존한다. 영리한 비동기식 클라이언트는 서버 자원을 더욱 효율적으로 활용한다.

With asyncio and aiohttp, we can implement TCP or web servers.

To better concurrency, access to databases should be executed asynchronously. Otherwise, event loop will be blocked until results come out. Requests can be divided into several pages to handle latency issues. For example, fetching additional data when the user clicks or scrolls downs while limiting to 200 rows at a time.

Most of codes that send results in parts are implemented from browser side, thus major internet services such as Google largely depend upon clients. Smart clients utilize server resources far more efficiently.