AIOHTTPハンズオン

ごはんでまなぶ非同期処理

_images/gohan.png
  • 「米を炊く」と「調理する」は同時にできる

  • 「味噌汁をつくる」が終わってから「おかずをつくる」

  • 「米を炊く」と「調理する」が両方おわったら「いただきます」

コード例

 1import asyncio
 2import time
 3import logging
 4
 5logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
 6
 7
 8async def kome():
 9    logging.info("米を炊くヨ")
10    await asyncio.sleep(10)
11    logging.info("米が炊けたヨ")
12
13
14async def chori():
15    logging.info("調理するヨ")
16
17    logging.info("味噌汁つくるヨ")
18    await asyncio.sleep(3)
19    logging.info("味噌汁できたヨ")
20
21    logging.info("おかずつくるヨ")
22    await asyncio.sleep(6)
23    logging.info("おかずできたヨ")
24
25    logging.info("調理おわったよヨ")
26
27
28def taberu():
29    logging.info("いただきます")
30    time.sleep(3)
31    logging.info("ごちそうさまでした")
32
33
34async def itadakimasu():
35
36    logging.info("--- ご飯できるまで待つ ---")
37
38    kome_task = asyncio.create_task(kome())
39    chori_task = asyncio.create_task(chori())
40    await kome_task
41    await chori_task
42
43    logging.info("--- ご飯できたので食べる ---")
44
45    taberu()
46
47
48asyncio.run(itadakimasu())

出力例

14:47:14 --- ご飯できるまで待つ ---
14:47:14 米を炊くヨ
14:47:14 調理するヨ
14:47:14 味噌汁つくるヨ
14:47:17 味噌汁できたヨ
14:47:17 おかずつくるヨ
14:47:23 おかずできたヨ
14:47:23 調理おわったよヨ
14:47:24 米が炊けたヨ
14:47:24 --- ご飯できたので食べる ---
14:47:24 いただきます
14:47:27 ごちそうさまでした

asyncioとは

  • async/await 構文を使い並行処理のコードを書くためのライブラリ

  • 非同期フレームワークの基盤

    • ネットワークとウェブサーバ

    • データベース接続ライブラリ

    • 分散タスクキュー

  • IOバウンドや高レベルの構造化されたネットワークコードに適している

高レベルAPI

  • 並行にPythonコルーチンを起動し、実行全体を管理する

  • ネットワークIOとIPCを執り行う

  • subprocessesを管理する

  • キューを使ってタスクを分散する

  • 並列処理のコードを同期させる

低レベルAPI

  • 非同期APIを提供するイベントループの作成と管理

    • ネットワーク通信

    • サブプロセスの実行

    • OSシグナル

  • Transportを使った効率的なprotocolを実装

  • コールバックを用いたライブラリとasync/await構文を使ったコードの橋渡し

コルーチン

  • サブルーチンのより一般的な形式

  • サブルーチンには決められた地点から入り、別の決められた地点から出る

  • 多くの様々な地点から入る、出る、再開できる

  • async def 文で実装できる

  • 実態はジェネレータ

Task

  • コルーチンを平行にスケジュール

  • asyncio.create_task 関数でコルーチンをTaskにラップできる

awaitableオブジェクト

  • await式の中で使えるオブジェクト

  • 多くのasyncio APIはawaitableオブジェクトを受け取るように設計されている

  • 3つの種類がある

    • コルーチン

    • Task

    • Future

コルーチン

コルーチン関数

async def で定義された関数

コルーチンオブジェクト

コルーチン関数 を呼び出すと返ってくるオブジェクト.

コルーチン関数定義

  • async def で関数を定義すると、コルーチンとなる

>>> async def func():
...     print("呼んだ?")
...
>>> cor = func()
>>> print(type(cor))
<class 'coroutine'>
  • 単にコルーチンを呼び出しただけでは実行されない

>>> func()
<coroutine object func at 0x7f774767d6c0>

コルーチンの実行

  • コルーチンを await

  • awaitした関数( main )を asyncio.run 関数から実行

>>> import asyncio
>>>
>>> async def main():
...     await func()
...
>>> asyncio.run(main())
呼んだ?

練習問題

コルーチンを2つ実行してください。コルーチンの関数は渡された引数をprintします。

>>> asyncio.run(main("1こめ", "2こめ"))
1こめ
2こめ

Task

asyncio.create_task 関数によるTaskの作成

>>> import asyncio
>>>
>>> async def func():
...     pass
...
>>> async def main():
...     task = asyncio.create_task(func())
...     print(type(task))
...
>>> asyncio.run(main())
<class '_asyncio.Task'>

Taskオブジェクトが生成される

単にコルーチンをawaitして実行

 1import asyncio
 2import time
 3
 4
 5async def main():
 6    await asyncio.sleep(2)
 7    await asyncio.sleep(5)
 8
 9
10start = time.time()
11asyncio.run(main())
12print(f"time: {time.time() - start}")

実行結果: time: 7.0069193840026855

Taskを作成して実行

 1import asyncio
 2import time
 3
 4
 5async def main():
 6    task1 = asyncio.create_task(asyncio.sleep(2))
 7    task2 = asyncio.create_task(asyncio.sleep(5))
 8    await asyncio.gather(task1, task2)
 9
10
11start = time.time()
12asyncio.run(main())
13print(f"time: {time.time() - start}")

実行結果: time: 5.004063844680786

並行なTask実行

  • awaitableオブジェクトを並行して実行

  • asyncio.gather に渡されたシーケンスは並行実行される

asyncio.gatherにコルーチンを渡した例

 1import asyncio
 2import time
 3
 4
 5async def main():
 6    await asyncio.gather(asyncio.sleep(2), asyncio.sleep(5))
 7
 8
 9start = time.time()
10asyncio.run(main())
11print(f"time: {time.time() - start}")

実行結果: time: 5.007250785827637

練習問題

次のコードの get_status_code 関数を並行に実行する main 関数を作成してください

 1import asyncio
 2import time
 3
 4import aiohttp
 5
 6
 7async def get_status_code(n):
 8    url = f"https://httpbin.org/delay/{n}"
 9
10    async with aiohttp.ClientSession() as session:
11        async with session.get(url) as response:
12
13            print("Status:", response.status)
14
15
16start = time.time()
17asyncio.run(get_status_code(3))
18print(f"time: {time.time() - start}")

Wait

  • awaitableオブジェクト(Taskやコルーチン)はさまざまな条件で待機できる

  • タイムアウトを設定できる

  • 例外に応じて処理を制御できる

タイムアウト
要素の終了待機

Future

  • 非同期処理の最終結果を表現する特別な低レベルのawaitableオブジェクト

    • pending

    • finished

    • cancelled

  • コルーチンはFutureの結果が返されるか例外が送出されるまでawaitされる

asyncio.wait_for

  • awaitableオブジェクトが完了するかタイムアウトになるのを待つ

  • awaitableオブジェクトがコルーチンだった場合はTaskとしてスケジュールされる

  • タイムアウトの場合はTaskをキャンセルし、 asyncio.TimeoutError を送出する

 1import asyncio
 2import time
 3
 4
 5async def main():
 6    await asyncio.wait_for(asyncio.sleep(3), timeout=1)
 7
 8
 9start = time.time()
10asyncio.run(main())
11print(f"time: {time.time() - start}")

asyncio.TimeoutError になる

asyncio.as_completed

  • awaitableオブジェクトを同時に実行する

  • 完了した順に、イテレータを返す

  • イテレーションが完了する前にタイムアウトした場合は asyncio.TimeoutError を送出する

 1import asyncio
 2import time
 3
 4
 5async def neru(n):
 6    await asyncio.sleep(n)
 7    return n
 8
 9
10async def main():
11    for f in asyncio.as_completed([neru(2), neru(4), neru(1)], timeout=3):
12        result = await f
13        print(f"{result=}")
14
15
16start = time.time()
17asyncio.run(main())
18print(f"time: {time.time() - start}")
result=1
result=2
Traceback (most recent call last):
  File "/home/driller/repo/events/20210912/docs/source/code/as_completed.py", line 17, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/driller/repo/events/20210912/docs/source/code/as_completed.py", line 12, in main
    result = await f
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 613, in _wait_for_one
    raise exceptions.TimeoutError
asyncio.exceptions.TimeoutError

asyncio.wait

  • awaitableオブジェクトを同時に実行する

  • 完了したタスクと保留中のタスクのset(集合)を返す

  • return_when でいつ結果を返すかを指定する

 1import asyncio
 2import time
 3
 4
 5async def main():
 6    tasks = [
 7        asyncio.create_task(asyncio.sleep(1)),
 8        asyncio.create_task(asyncio.sleep(3)),
 9    ]
10    done, pending = await asyncio.wait(tasks, timeout=2)
11    
12    for t in tasks:
13        print(f"{t} in done: {t in done}")
14        print(f"{t} in pending: {t in pending}")
15
16
17start = time.time()
18asyncio.run(main())
19print(f"time: {time.time() - start}")
<Task finished name='Task-2' coro=<sleep() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:636> result=None> in done: True
<Task finished name='Task-2' coro=<sleep() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:636> result=None> in pending: False
<Task pending name='Task-3' coro=<sleep() running at /usr/local/lib/python3.9/asyncio/tasks.py:654> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0123edb2b0>()]>> in done: False
<Task pending name='Task-3' coro=<sleep() running at /usr/local/lib/python3.9/asyncio/tasks.py:654> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0123edb2b0>()]>> in pending: True
time: 2.0029919147491455

return_when

asyncio.waitがいつ返すかを指定

FIRST_COMPLETED
  • いずれかのFutureが終了したかキャンセルされたときに返す

FIRST_EXCEPTION
  • いずれかのFutureが例外の送出で終了した場合に返す

  • 例外を送出したFutureがない場合は、 ALL_COMPLETED と等価になる

ALL_COMPLETED
  • すべてのFutureが終了したかキャンセルされたときに返す(デフォルト)

 1import asyncio
 2import time
 3from asyncio.tasks import FIRST_COMPLETED
 4
 5
 6async def main():
 7    tasks = [
 8        asyncio.create_task(asyncio.sleep(1)),
 9        asyncio.create_task(asyncio.sleep(3)),
10    ]
11    done, pending = await asyncio.wait(tasks, timeout=2, return_when=FIRST_COMPLETED)
12
13    for t in tasks:
14        print(f"{t} in done: {t in done}")
15        print(f"{t} in pending: {t in pending}")
16
17
18start = time.time()
19asyncio.run(main())
20print(f"time: {time.time() - start}")
<Task finished name='Task-2' coro=<sleep() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:636> result=None> in done: True
<Task finished name='Task-2' coro=<sleep() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:636> result=None> in pending: False
<Task pending name='Task-3' coro=<sleep() running at /usr/local/lib/python3.9/asyncio/tasks.py:654> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe15a347400>()]>> in done: False
<Task pending name='Task-3' coro=<sleep() running at /usr/local/lib/python3.9/asyncio/tasks.py:654> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe15a347400>()]>> in pending: True
time: 1.0022432804107666

練習問題

  1. asyncio.as_completed に複数のコルーチンを渡し、うち1つのコルーチンは例外を送出してください

  2. 1.の例外をtry-except処理してください

ブロッキング/ノンブロッキング

ブロッキング

  • 処理が終了するまで待つ

  • 待っている間はほかの処理を行えない

time.sleep はブロッキングするので、完了を待ってから次の処理に移る

 1import asyncio
 2import time
 3
 4
 5async def neru(n):
 6    time.sleep(n)
 7
 8
 9async def main():
10    await asyncio.create_task(neru(2))
11    await asyncio.create_task(neru(5))
12
13
14start = time.time()
15asyncio.run(main(), debug=True)
16print(f"time: {time.time() - start}")

実行結果: time: 7.009732246398926

ノンブロッキング

  • ブロッキングされない

  • ほかの処理ができる

asyncio.sleep はノンブロッキング処理なので、待っている間に次の処理に移れる

 1import asyncio
 2import time
 3
 4
 5async def main():
 6    await asyncio.create_task(asyncio.sleep(2))
 7    await asyncio.create_task(asyncio.sleep(5))
 8
 9
10start = time.time()
11asyncio.run(main())
12print(f"time: {time.time() - start}")

実行結果: time: 5.004534006118774

ノンブロッキング関数の実装

 1import asyncio
 2import time
 3
 4
 5async def neru(n):
 6    loop = asyncio.get_running_loop()
 7    loop.run_in_executor(None, time.sleep, n)
 8
 9
10async def main():
11    await asyncio.create_task(neru(3))
12    await asyncio.create_task(neru(5))
13
14
15start = time.time()
16asyncio.run(main())
17print(f"time: {time.time() - start}")

実行結果: time: 5.005532503128052

練習問題

次のコードを非同期(ノンブロッキング)に実装してください

 1import time
 2import urllib.request
 3
 4
 5def _get_status_code(url):
 6    with urllib.request.urlopen(url) as res:
 7        return res.status
 8
 9
10def get_status_code(cor_name, n):
11    url = f"https://httpbin.org/delay/{n}"
12    http_status = _get_status_code(url)
13    print(f"{cor_name}: {http_status}")
14
15
16def main():
17    get_status_code("cor1", 2)
18    get_status_code("cor2", 5)
19
20
21start = time.time()
22main()
23print(f"time: {time.time() - start}")

イベントループ

  • コルーチンを実行

    • 非同期関数( async def )は実行しても処理されない

    • コルーチンの実態はジェネレータ

    • イベントループはコルーチンをイテレーションする

イベントループの取得

asyncio.get_running_loop

現在のスレッドのイベントループを取得

asyncio.get_event_loop
  • 現在のイベントループを取得

  • asyncio.set_event_loop が呼ばれていない場合は新しいイベントループを作成

  • 取得したループはカレントループとなる

  • より高レベルの asyncio.run を検討できる

asyncio.set_event_loop

現在のスレッドのイベントループとして設定

asyncio.new_event_loop
  • 新しいイベントループオブジェクトを作成

  • asyncio.set_event_loop でカレントループとなる

イベントループの処理

loop.run_until_complete
  • Futureが完了するまで実行する

  • 引数にコルーチンが渡された場合、 asyncio.Task として実行するようにスケジュールされる

loop.run_forever

stop が呼び出されるまでイベントループを実行し続ける

loop.stop

イベントループを停止する

loop.is_running

イベントループが実行中の場合は True を返す

loop.is_closed

イベントループがクローズされた場合は True を返す

loop.close
  • イベントループをクローズする

  • 保留中のコールバックは破棄される

  • executorは終了を待たずに停止される

イベントループのファイナライズ

coroutine loop.shutdown_asyncgens
  • デフォルトのexecutorの aclose をスケジュールする

  • すべての非同期ジェネレータをファイナライズする

  • asyncio.run では自動で実行される

coroutine loop.shutdown_default_executor
  • デフォルトのexecutorのクローズをスケジュールする

  • ThreadPoolExecutor に参加するのを待つ

  • asyncio.run では自動で実行される

参考資料

AIOHTTP

特徴

  • 非同期HTTP通信するための Client / Server ツール

インストール

$ pip install aiohttp

Getting Started

Client

 1import aiohttp
 2import asyncio
 3
 4
 5async def main():
 6    # session 作成。非同期コンテキストマネージャ async with を使うと
 7    # 処理終了時に session はクローズされる
 8    async with aiohttp.ClientSession() as session:
 9
10        # URL へアクセス。.get()時に読み込むのはヘッダー情報のみ
11        async with session.get("http://python.org") as response:
12            print(response.status)
13            print(response.headers["content-type"])
14
15            # body をテキストで非同期に読み込む
16            html = await response.text()
17            print(html[:15])
18
19
20asyncio.run(main())
# windows の場合、
# asyncio.run() の前に asyncio.set_event_loop_policy() をセット
# する必要があるかもしれません
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())

Server

Misc

AIOHTTP を使う際によく使うツール
1from yarl import URL 
2
3url = URL('https://connpass.com/')
4
5print(url / 'explore')
6print(url / 'search' % {'q': 'aiohttp'})

出力

https://connpass.com/explore
https://connpass.com/search?q=aiohttp

AIOHTTP Client デモ

Requests を使った場合

  • Requests を使って通常のHTTPリクエストでAPIへアクセスする

$ pip install requests
 1import time 
 2
 3import requests
 4import logging
 5logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
 6
 7
 8def human_requests():
 9    for num in range(1,151):
10        url = f"https://pokeapi.co/api/v2/pokemon/{num}"
11        resp = requests.get(url)
12        if resp.status_code == 200:
13            pokemon = resp.json()
14            logging.info(f"{pokemon['id']}: {pokemon['name']}")
15
16start = time.time()
17human_requests()
18end = time.time()
19logging.info(f"実行結果: time: {end-start}")

実行結果: time: 5.970675468444824

非同期にリクエストをする場合

  • await resp.json() でbody をjsonで要求。

  • bodyを待っている間に、後続のポケモンURL

  • jsonが返ってきたら、URLを投げている処理を一旦止めて、ポケモン ID と name を出力

 1import time
 2import asyncio
 3
 4import aiohttp
 5import logging
 6logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
 7
 8async def main():
 9    async with aiohttp.ClientSession() as session:
10        for num in range(1, 151):
11            url = f"https://pokeapi.co/api/v2/pokemon/{num}"
12            async with session.get(url) as resp:
13                pokemon = await resp.json()
14                logging.info(f"{pokemon['id']}: {pokemon['name']}")
15
16
17start = time.time()
18asyncio.run(main())
19end = time.time()
20logging.info(f"実行結果: time: {end-start}")

実行結果: time: 2.4725234508514404

タスクリストを先に作って非同期にリクエストする場合

  • まずは非同期リクエスト用のタスクリストを作成し、 asyncio.create_task へ渡す。

  • このタスクリストを asyncio.gather へ渡して全てのタスクを並行に実行する。この実行を await して全部完了するまで待つ。返り値の順序は、create_task で作った順序と同じ。

 1import time
 2import asyncio
 3import aiohttp
 4
 5import logging
 6
 7logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
 8
 9
10async def get_pokemon(s, url):
11    async with s.get(url) as resp:
12        pokemon = await resp.json()
13        return (pokemon["id"], pokemon["name"])
14
15
16async def main():
17    async with aiohttp.ClientSession() as session:
18        tasks = list()
19        for num in range(1, 151):
20            url = f"https://pokeapi.co/api/v2/pokemon/{num}"
21            tasks.append(asyncio.create_task(get_pokemon(session, url)))
22
23        pokemons = await asyncio.gather(*tasks)
24        for id, pokemon in pokemons:
25            logging.info(f"{id}: {pokemon}")
26
27
28start = time.time()
29asyncio.run(main())
30logging.info("end")
31end = time.time()
32logging.info(f"実行結果: time: {end-start}")

実行結果: time: 0.4347381591796875

  • 返り値の順番は気にせず、通信が終わった順に取得する場合

 1import time
 2import asyncio
 3import aiohttp
 4
 5import logging
 6
 7logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
 8
 9
10async def get_pokemon(s, url):
11    async with s.get(url) as resp:
12        pokemon = await resp.json()
13        return (pokemon["id"], pokemon["name"])
14
15
16async def main():
17    async with aiohttp.ClientSession() as session:
18        tasks = list()
19        for num in range(1, 151):
20            url = f"https://pokeapi.co/api/v2/pokemon/{num}"
21            tasks.append(asyncio.create_task(get_pokemon(session, url)))
22
23        for pokemon in asyncio.as_completed(tasks):
24            results = await pokemon
25            print(results)
26
27
28start = time.time()
29asyncio.run(main())
30logging.info("end")
31end = time.time()
32logging.info(f"実行結果: time: {end-start}")

AIOHTTP-Client Quickstart

async with

 1import asyncio
 2import aiohttp
 3
 4
 5async def main():
 6    async with aiohttp.ClientSession() as session:
 7        async with session.get("http://httpbin.org/get") as resp:
 8            print(resp.status)
 9            print(await resp.json())
10
11asyncio.run(main())
  • 非同期のコンテキストマネージャ

  • ClientSessionを使った処理が終わったら session を close してくれる。

  • もし session context manager を使わない場合は以下のように .close() メソッドを呼び出し必ずクローズする。

 1import asyncio
 2import aiohttp
 3
 4
 5async def main():
 6    session = aiohttp.ClientSession()
 7    resp = await session.get("http://httpbin.org/get")
 8    print(await resp.json())
 9    await session.close()
10
11asyncio.run(main())

with文のネスト

async with (aiohttp.ClientSession() as session,
            session.get("http://httpbin.org/get") as resp):
            :
            :

async for

  • 非同期イテレータ/ジェネレータ用の for

  • 下記2つは同じ意味

async for a in async_iterable:
   await do_a_thing(a)
it = async_iterable.__aiter__()
while True:
   try:
      a = await it.__anext__()
   except StopAsyncIteration:
      break

   await do_a_thing(a)

ClientSession

  • aiohttp.ClientSession

  • クライアントセッションクラス。

  • インスタンス化して(以下 session で表現)、そのオブジェクトメソッドを使ってリクエストを行う

session.request()

  • aiohttp.ClientSession.request

  • 非同期のHTTPリクエストを実行

  • 引数(必須)
    • method (str) – HTTP method

    • url – URL。文字列もしくは yarl URL オブジェクト

  • オプション引数:ドキュメント参照

  • 返り値は aiohttp.ClientResponse インスタンスオブジェクト

session.get()

  • aiohttp.ClientSession.get

  • GET リクエスト

  • session.request() の第一引数が GET に固定されているメソッド

  • 引数(必須)
    • url – URL。文字列もしくは yarl URL オブジェクト

  • オプション引数:ドキュメント参照

  • 返り値は aiohttp.ClientResponse インスタンスオブジェクト

URLリクエストにパラメータ を渡す
 1import asyncio
 2import aiohttp
 3
 4
 5async def main():
 6    async with aiohttp.ClientSession() as session:
 7        params = {"limit": "10", "offset": "20"}
 8        async with session.get(
 9            "https://pokeapi.co/api/v2/pokemon", params=params
10        ) as resp:
11            print(resp.status)
12            print(await resp.json())
13
14
15asyncio.run(main())
16
  • params オプションで渡す

  • 同じキーに対して2つ以上の値を渡したい場合は、MultiDict もしくは、タプルのリストで渡す

    • MultiDict({'a': [1, 3]})

    • MultiDict([('a', 1), ('a', 3)])

    • ([('a', 1), ('a', 3)])

その他HTTPメソッド

ClientResponse

  • aiohttp.ClientResponse

  • session.request() とそのファミリーが返すクラス

  • API call だけがインスタンス化する(以下 resp と表現する)

  • ユーザがこのクラスをインスタンス化することは一切ない

  • コンテキストマネージャ async with での処理が完了すると release される

  • 主なインスタンスメソッド
    • resp.read() : レスポンス body を byte で読み込む。

    • resp.text(encoding=None): レスポンス body を文字列で読み込む。エンコーディング指定可

    • resp.json(encoding=None) : レスポンス body を JSON で読み込み、辞書型オブジェクトで返す。エンコーディング指定可

バイナリデータの読み込み

  • バイナリデータは .read() で取得可

 1import asyncio
 2import aiohttp
 3
 4
 5async def main():
 6    async with aiohttp.ClientSession() as session:
 7        async with session.get("https://pokeapi.co/api/v2/pokemon/25") as resp:
 8            html = await resp.json()
 9            async with session.get(html["sprites"]["front_default"]) as png:
10                with open("/tmp/pikachu.png", "wb") as f:
11                    f.write(await png.read())
12
13
14asyncio.run(main())

streaming response

  • read() json() text() は メモリにロードするので、巨大なサイズのファイルの読み込みには aiohttp.StreamReader のインスタンスの .content アトリビュートの利用を検討したほうがよい

  • よく使われる方法としては、chunk size を指定してファイル等に書き込むなどする

 1import asyncio
 2import aiohttp
 3
 4
 5async def main():
 6    async with aiohttp.ClientSession() as session:
 7        async with session.get("https://api.github.com/events") as resp:
 8            with open("/tmp/bigfile.txt", "wb") as f:
 9                while True:
10                    chunk = await resp.content.read(100)  # 100b, -1 ですべて
11                    # print(len(chunk))
12                    if not chunk:
13                        print("No chunk")
14                        break
15                    f.write(chunk)
16
17
18asyncio.run(main())

websockets

 1import asyncio
 2import aiohttp
 3
 4import logging
 5
 6logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
 7
 8
 9async def main():
10    async with aiohttp.ClientSession() as session:
11        async with session.ws_connect("wss://ftx.com/ws") as ws:
12            await ws.send_str('{"op":"ping"}')
13            # await ws.send_json({"op":"ping"})
14            msg = await ws.receive()
15            if msg.type == aiohttp.WSMsgType.TEXT:
16                logging.info(msg.json())
17
18asyncio.run(main())

AIOHTTP-Client Exercise

目的

  • Clientを使った簡単な練習問題に取組む

  • インターネット経由で取得できるデータを、並行に取得して処理する簡単なPythonを書けるようになる

課題1

 1import asyncio
 2import aiohttp
 3
 4import logging
 5
 6logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
 7
 8
 9async def main():
10    async with aiohttp.ClientSession() as session:
11        async with session.ws_connect("wss://ftx.com/ws") as ws:
12            # サブスクライブ
13            await ws.send_str(
14                '{"op": "subscribe", "channel": "trades", "market": "BTC-PERP"}'
15            )
16            async for msg in ws:
17                if msg.type == aiohttp.WSMsgType.TEXT:
18                    logging.info(msg.data)
19                else:
20                    break
21
22
23asyncio.run(main())
  1. サブスクライブ時に文字列を渡していますが、json を渡すように書き換えてください。

  2. websockets を参考にして async for ではなく ws.recieve() して、非同期イテレータを使わないどうなるのか実験してください。

  3. 以下は、暗号資産APIに関する課題なので興味がある方はやってみて下さい。

    1. "BTC-PERP" 以外のマーケットをサブスクライブしてデータを取得して下さい。マーケットリストは FTX Markets を参照してください。

    2. orderbook チャンネルをサブスクライブしてください。参照: Websocket API

    3. Bybit の API を使って 適当なリアルタイムデータを取得して下さい。参照: WebSocket Data – Bybit API Docs

課題2

  • 今日のハンズオン内容をベースにした問題

  1. バイナリデータの読み込み のポケモン画像取得を非同期で150匹取得してローカルに保存して下さい

  2. https://connpass.com/about/api/ を使って、キーワードが "aiohttp" のイベントを探し、event_id のみリストで取得して下さい。ただし、connpass api は .json() で取得出来ないので、 .text() で取得して下さい。

Indices and tables