AIOHTTPハンズオン¶
ごはんでまなぶ非同期処理¶

「米を炊く」と「調理する」は同時にできる
「味噌汁をつくる」が終わってから「おかずをつくる」
「米を炊く」と「調理する」が両方おわったら「いただきます」
コード例¶
1import asyncio
2import time
3import logging
4
5logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
6
7
8async def kome():
9 logging.info("米を炊くヨ")
10 await asyncio.sleep(10)
11 logging.info("米が炊けたヨ")
12
13
14async def chori():
15 logging.info("調理するヨ")
16
17 logging.info("味噌汁つくるヨ")
18 await asyncio.sleep(3)
19 logging.info("味噌汁できたヨ")
20
21 logging.info("おかずつくるヨ")
22 await asyncio.sleep(6)
23 logging.info("おかずできたヨ")
24
25 logging.info("調理おわったよヨ")
26
27
28def taberu():
29 logging.info("いただきます")
30 time.sleep(3)
31 logging.info("ごちそうさまでした")
32
33
34async def itadakimasu():
35
36 logging.info("--- ご飯できるまで待つ ---")
37
38 kome_task = asyncio.create_task(kome())
39 chori_task = asyncio.create_task(chori())
40 await kome_task
41 await chori_task
42
43 logging.info("--- ご飯できたので食べる ---")
44
45 taberu()
46
47
48asyncio.run(itadakimasu())
出力例¶
14:47:14 --- ご飯できるまで待つ ---
14:47:14 米を炊くヨ
14:47:14 調理するヨ
14:47:14 味噌汁つくるヨ
14:47:17 味噌汁できたヨ
14:47:17 おかずつくるヨ
14:47:23 おかずできたヨ
14:47:23 調理おわったよヨ
14:47:24 米が炊けたヨ
14:47:24 --- ご飯できたので食べる ---
14:47:24 いただきます
14:47:27 ごちそうさまでした
asyncioとは¶
async/await 構文を使い並行処理のコードを書くためのライブラリ
非同期フレームワークの基盤
ネットワークとウェブサーバ
データベース接続ライブラリ
分散タスクキュー
IOバウンドや高レベルの構造化されたネットワークコードに適している
高レベルAPI¶
並行にPythonコルーチンを起動し、実行全体を管理する
ネットワークIOとIPCを執り行う
subprocessesを管理する
キューを使ってタスクを分散する
並列処理のコードを同期させる
低レベルAPI¶
非同期APIを提供するイベントループの作成と管理
ネットワーク通信
サブプロセスの実行
OSシグナル
Transportを使った効率的なprotocolを実装
コールバックを用いたライブラリとasync/await構文を使ったコードの橋渡し
コルーチン¶
サブルーチンのより一般的な形式
サブルーチンには決められた地点から入り、別の決められた地点から出る
多くの様々な地点から入る、出る、再開できる
async def 文で実装できる
実態はジェネレータ
Task¶
コルーチンを平行にスケジュール
asyncio.create_task
関数でコルーチンをTaskにラップできる
awaitableオブジェクト¶
await式の中で使えるオブジェクト
多くのasyncio APIはawaitableオブジェクトを受け取るように設計されている
3つの種類がある
コルーチン
Task
Future
コルーチン¶
- コルーチン関数
async def で定義された関数
- コルーチンオブジェクト
コルーチン関数 を呼び出すと返ってくるオブジェクト.
コルーチン関数定義¶
async def で関数を定義すると、コルーチンとなる
>>> async def func():
... print("呼んだ?")
...
>>> cor = func()
>>> print(type(cor))
<class 'coroutine'>
単にコルーチンを呼び出しただけでは実行されない
>>> func()
<coroutine object func at 0x7f774767d6c0>
コルーチンの実行¶
コルーチンを await
awaitした関数(
main
)を asyncio.run 関数から実行
>>> import asyncio
>>>
>>> async def main():
... await func()
...
>>> asyncio.run(main())
呼んだ?
練習問題¶
コルーチンを2つ実行してください。コルーチンの関数は渡された引数をprintします。
>>> asyncio.run(main("1こめ", "2こめ"))
1こめ
2こめ
Task¶
asyncio.create_task 関数によるTaskの作成
>>> import asyncio
>>>
>>> async def func():
... pass
...
>>> async def main():
... task = asyncio.create_task(func())
... print(type(task))
...
>>> asyncio.run(main())
<class '_asyncio.Task'>
Taskオブジェクトが生成される
単にコルーチンをawaitして実行¶
1import asyncio
2import time
3
4
5async def main():
6 await asyncio.sleep(2)
7 await asyncio.sleep(5)
8
9
10start = time.time()
11asyncio.run(main())
12print(f"time: {time.time() - start}")
実行結果: time: 7.0069193840026855
Taskを作成して実行¶
1import asyncio
2import time
3
4
5async def main():
6 task1 = asyncio.create_task(asyncio.sleep(2))
7 task2 = asyncio.create_task(asyncio.sleep(5))
8 await asyncio.gather(task1, task2)
9
10
11start = time.time()
12asyncio.run(main())
13print(f"time: {time.time() - start}")
実行結果: time: 5.004063844680786
並行なTask実行¶
awaitableオブジェクトを並行して実行
asyncio.gather に渡されたシーケンスは並行実行される
asyncio.gatherにコルーチンを渡した例¶
1import asyncio
2import time
3
4
5async def main():
6 await asyncio.gather(asyncio.sleep(2), asyncio.sleep(5))
7
8
9start = time.time()
10asyncio.run(main())
11print(f"time: {time.time() - start}")
実行結果: time: 5.007250785827637
練習問題¶
次のコードの get_status_code
関数を並行に実行する main
関数を作成してください
1import asyncio
2import time
3
4import aiohttp
5
6
7async def get_status_code(n):
8 url = f"https://httpbin.org/delay/{n}"
9
10 async with aiohttp.ClientSession() as session:
11 async with session.get(url) as response:
12
13 print("Status:", response.status)
14
15
16start = time.time()
17asyncio.run(get_status_code(3))
18print(f"time: {time.time() - start}")
Wait¶
awaitableオブジェクト(Taskやコルーチン)はさまざまな条件で待機できる
タイムアウトを設定できる
例外に応じて処理を制御できる
- タイムアウト
- 要素の終了待機
Future¶
非同期処理の最終結果を表現する特別な低レベルのawaitableオブジェクト
pending
finished
cancelled
コルーチンはFutureの結果が返されるか例外が送出されるまでawaitされる
asyncio.wait_for¶
awaitableオブジェクトが完了するかタイムアウトになるのを待つ
awaitableオブジェクトがコルーチンだった場合はTaskとしてスケジュールされる
タイムアウトの場合はTaskをキャンセルし、 asyncio.TimeoutError を送出する
1import asyncio
2import time
3
4
5async def main():
6 await asyncio.wait_for(asyncio.sleep(3), timeout=1)
7
8
9start = time.time()
10asyncio.run(main())
11print(f"time: {time.time() - start}")
asyncio.TimeoutError
になる
asyncio.as_completed¶
awaitableオブジェクトを同時に実行する
完了した順に、イテレータを返す
イテレーションが完了する前にタイムアウトした場合は
asyncio.TimeoutError
を送出する
1import asyncio
2import time
3
4
5async def neru(n):
6 await asyncio.sleep(n)
7 return n
8
9
10async def main():
11 for f in asyncio.as_completed([neru(2), neru(4), neru(1)], timeout=3):
12 result = await f
13 print(f"{result=}")
14
15
16start = time.time()
17asyncio.run(main())
18print(f"time: {time.time() - start}")
result=1
result=2
Traceback (most recent call last):
File "/home/driller/repo/events/20210912/docs/source/code/as_completed.py", line 17, in <module>
asyncio.run(main())
File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/driller/repo/events/20210912/docs/source/code/as_completed.py", line 12, in main
result = await f
File "/usr/local/lib/python3.9/asyncio/tasks.py", line 613, in _wait_for_one
raise exceptions.TimeoutError
asyncio.exceptions.TimeoutError
asyncio.wait¶
awaitableオブジェクトを同時に実行する
完了したタスクと保留中のタスクのset(集合)を返す
return_when
でいつ結果を返すかを指定する
1import asyncio
2import time
3
4
5async def main():
6 tasks = [
7 asyncio.create_task(asyncio.sleep(1)),
8 asyncio.create_task(asyncio.sleep(3)),
9 ]
10 done, pending = await asyncio.wait(tasks, timeout=2)
11
12 for t in tasks:
13 print(f"{t} in done: {t in done}")
14 print(f"{t} in pending: {t in pending}")
15
16
17start = time.time()
18asyncio.run(main())
19print(f"time: {time.time() - start}")
<Task finished name='Task-2' coro=<sleep() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:636> result=None> in done: True
<Task finished name='Task-2' coro=<sleep() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:636> result=None> in pending: False
<Task pending name='Task-3' coro=<sleep() running at /usr/local/lib/python3.9/asyncio/tasks.py:654> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0123edb2b0>()]>> in done: False
<Task pending name='Task-3' coro=<sleep() running at /usr/local/lib/python3.9/asyncio/tasks.py:654> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0123edb2b0>()]>> in pending: True
time: 2.0029919147491455
return_when¶
asyncio.waitがいつ返すかを指定
- FIRST_COMPLETED
いずれかのFutureが終了したかキャンセルされたときに返す
- FIRST_EXCEPTION
いずれかのFutureが例外の送出で終了した場合に返す
例外を送出したFutureがない場合は、
ALL_COMPLETED
と等価になる
- ALL_COMPLETED
すべてのFutureが終了したかキャンセルされたときに返す(デフォルト)
1import asyncio
2import time
3from asyncio.tasks import FIRST_COMPLETED
4
5
6async def main():
7 tasks = [
8 asyncio.create_task(asyncio.sleep(1)),
9 asyncio.create_task(asyncio.sleep(3)),
10 ]
11 done, pending = await asyncio.wait(tasks, timeout=2, return_when=FIRST_COMPLETED)
12
13 for t in tasks:
14 print(f"{t} in done: {t in done}")
15 print(f"{t} in pending: {t in pending}")
16
17
18start = time.time()
19asyncio.run(main())
20print(f"time: {time.time() - start}")
<Task finished name='Task-2' coro=<sleep() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:636> result=None> in done: True
<Task finished name='Task-2' coro=<sleep() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:636> result=None> in pending: False
<Task pending name='Task-3' coro=<sleep() running at /usr/local/lib/python3.9/asyncio/tasks.py:654> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe15a347400>()]>> in done: False
<Task pending name='Task-3' coro=<sleep() running at /usr/local/lib/python3.9/asyncio/tasks.py:654> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe15a347400>()]>> in pending: True
time: 1.0022432804107666
練習問題¶
asyncio.as_completed
に複数のコルーチンを渡し、うち1つのコルーチンは例外を送出してください1.の例外をtry-except処理してください
ブロッキング/ノンブロッキング¶
ブロッキング¶
処理が終了するまで待つ
待っている間はほかの処理を行えない
time.sleep はブロッキングするので、完了を待ってから次の処理に移る
1import asyncio
2import time
3
4
5async def neru(n):
6 time.sleep(n)
7
8
9async def main():
10 await asyncio.create_task(neru(2))
11 await asyncio.create_task(neru(5))
12
13
14start = time.time()
15asyncio.run(main(), debug=True)
16print(f"time: {time.time() - start}")
実行結果: time: 7.009732246398926
ノンブロッキング¶
ブロッキングされない
ほかの処理ができる
asyncio.sleep はノンブロッキング処理なので、待っている間に次の処理に移れる
1import asyncio
2import time
3
4
5async def main():
6 await asyncio.create_task(asyncio.sleep(2))
7 await asyncio.create_task(asyncio.sleep(5))
8
9
10start = time.time()
11asyncio.run(main())
12print(f"time: {time.time() - start}")
実行結果: time: 5.004534006118774
ノンブロッキング関数の実装¶
Executor は非同期呼び出しをするためのクラス
1import asyncio
2import time
3
4
5async def neru(n):
6 loop = asyncio.get_running_loop()
7 loop.run_in_executor(None, time.sleep, n)
8
9
10async def main():
11 await asyncio.create_task(neru(3))
12 await asyncio.create_task(neru(5))
13
14
15start = time.time()
16asyncio.run(main())
17print(f"time: {time.time() - start}")
実行結果: time: 5.005532503128052
練習問題¶
次のコードを非同期(ノンブロッキング)に実装してください
1import time
2import urllib.request
3
4
5def _get_status_code(url):
6 with urllib.request.urlopen(url) as res:
7 return res.status
8
9
10def get_status_code(cor_name, n):
11 url = f"https://httpbin.org/delay/{n}"
12 http_status = _get_status_code(url)
13 print(f"{cor_name}: {http_status}")
14
15
16def main():
17 get_status_code("cor1", 2)
18 get_status_code("cor2", 5)
19
20
21start = time.time()
22main()
23print(f"time: {time.time() - start}")
イベントループ¶
コルーチンを実行
非同期関数(
async def
)は実行しても処理されないコルーチンの実態はジェネレータ
イベントループはコルーチンをイテレーションする
イベントループの取得¶
- asyncio.get_running_loop
現在のスレッドのイベントループを取得
- asyncio.get_event_loop
現在のイベントループを取得
asyncio.set_event_loop
が呼ばれていない場合は新しいイベントループを作成取得したループはカレントループとなる
より高レベルの asyncio.run を検討できる
- asyncio.set_event_loop
現在のスレッドのイベントループとして設定
- asyncio.new_event_loop
新しいイベントループオブジェクトを作成
asyncio.set_event_loop
でカレントループとなる
イベントループの処理¶
- loop.run_until_complete
Futureが完了するまで実行する
引数にコルーチンが渡された場合、
asyncio.Task
として実行するようにスケジュールされる
- loop.run_forever
stop
が呼び出されるまでイベントループを実行し続ける- loop.stop
イベントループを停止する
- loop.is_running
イベントループが実行中の場合は
True
を返す- loop.is_closed
イベントループがクローズされた場合は
True
を返す- loop.close
イベントループをクローズする
保留中のコールバックは破棄される
executorは終了を待たずに停止される
イベントループのファイナライズ¶
- coroutine loop.shutdown_asyncgens
デフォルトのexecutorの
aclose
をスケジュールするすべての非同期ジェネレータをファイナライズする
asyncio.run
では自動で実行される
- coroutine loop.shutdown_default_executor
デフォルトのexecutorのクローズをスケジュールする
ThreadPoolExecutor
に参加するのを待つasyncio.run
では自動で実行される
参考資料¶
AIOHTTP¶
特徴¶
非同期HTTP通信するための Client / Server ツール
インストール¶
$ pip install aiohttp
Getting Started¶
Client¶
1import aiohttp
2import asyncio
3
4
5async def main():
6 # session 作成。非同期コンテキストマネージャ async with を使うと
7 # 処理終了時に session はクローズされる
8 async with aiohttp.ClientSession() as session:
9
10 # URL へアクセス。.get()時に読み込むのはヘッダー情報のみ
11 async with session.get("http://python.org") as response:
12 print(response.status)
13 print(response.headers["content-type"])
14
15 # body をテキストで非同期に読み込む
16 html = await response.text()
17 print(html[:15])
18
19
20asyncio.run(main())
# windows の場合、
# asyncio.run() の前に asyncio.set_event_loop_policy() をセット
# する必要があるかもしれません
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
Server¶
Misc¶
AIOHTTP を使う際によく使うツール¶
multidict : URLにパラメタを渡す時に使う。参照: URLリクエストにパラメータ を渡す
yarl :
ClientSession.get()
などの HTTPメソッドは、文字列URLもしくは yarl.URL インスタンスを引き取る
1from yarl import URL
2
3url = URL('https://connpass.com/')
4
5print(url / 'explore')
6print(url / 'search' % {'q': 'aiohttp'})
出力
https://connpass.com/explore
https://connpass.com/search?q=aiohttp
AIOHTTP Client デモ¶
- PokéAPI を使ってポケモンを150匹Getする方法を3つ紹介し処理の速さを比較する
- PokéAPIについて
endpoint: https://pokeapi.co/docs/v2#pokemon
- API例:
https://pokeapi.co/api/v2/pokemon/25 : id 25 の ピカチュウ情報URL
Requests を使った場合¶
Requests を使って通常のHTTPリクエストでAPIへアクセスする
$ pip install requests
1import time
2
3import requests
4import logging
5logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
6
7
8def human_requests():
9 for num in range(1,151):
10 url = f"https://pokeapi.co/api/v2/pokemon/{num}"
11 resp = requests.get(url)
12 if resp.status_code == 200:
13 pokemon = resp.json()
14 logging.info(f"{pokemon['id']}: {pokemon['name']}")
15
16start = time.time()
17human_requests()
18end = time.time()
19logging.info(f"実行結果: time: {end-start}")
実行結果: time: 5.970675468444824
非同期にリクエストをする場合¶
await resp.json()
でbody をjsonで要求。bodyを待っている間に、後続のポケモンURL
jsonが返ってきたら、URLを投げている処理を一旦止めて、ポケモン ID と name を出力
1import time
2import asyncio
3
4import aiohttp
5import logging
6logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
7
8async def main():
9 async with aiohttp.ClientSession() as session:
10 for num in range(1, 151):
11 url = f"https://pokeapi.co/api/v2/pokemon/{num}"
12 async with session.get(url) as resp:
13 pokemon = await resp.json()
14 logging.info(f"{pokemon['id']}: {pokemon['name']}")
15
16
17start = time.time()
18asyncio.run(main())
19end = time.time()
20logging.info(f"実行結果: time: {end-start}")
実行結果: time: 2.4725234508514404
タスクリストを先に作って非同期にリクエストする場合¶
まずは非同期リクエスト用のタスクリストを作成し、 asyncio.create_task へ渡す。
このタスクリストを asyncio.gather へ渡して全てのタスクを並行に実行する。この実行を await して全部完了するまで待つ。返り値の順序は、
create_task
で作った順序と同じ。
1import time
2import asyncio
3import aiohttp
4
5import logging
6
7logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
8
9
10async def get_pokemon(s, url):
11 async with s.get(url) as resp:
12 pokemon = await resp.json()
13 return (pokemon["id"], pokemon["name"])
14
15
16async def main():
17 async with aiohttp.ClientSession() as session:
18 tasks = list()
19 for num in range(1, 151):
20 url = f"https://pokeapi.co/api/v2/pokemon/{num}"
21 tasks.append(asyncio.create_task(get_pokemon(session, url)))
22
23 pokemons = await asyncio.gather(*tasks)
24 for id, pokemon in pokemons:
25 logging.info(f"{id}: {pokemon}")
26
27
28start = time.time()
29asyncio.run(main())
30logging.info("end")
31end = time.time()
32logging.info(f"実行結果: time: {end-start}")
実行結果: time: 0.4347381591796875
返り値の順番は気にせず、通信が終わった順に取得する場合
1import time
2import asyncio
3import aiohttp
4
5import logging
6
7logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
8
9
10async def get_pokemon(s, url):
11 async with s.get(url) as resp:
12 pokemon = await resp.json()
13 return (pokemon["id"], pokemon["name"])
14
15
16async def main():
17 async with aiohttp.ClientSession() as session:
18 tasks = list()
19 for num in range(1, 151):
20 url = f"https://pokeapi.co/api/v2/pokemon/{num}"
21 tasks.append(asyncio.create_task(get_pokemon(session, url)))
22
23 for pokemon in asyncio.as_completed(tasks):
24 results = await pokemon
25 print(results)
26
27
28start = time.time()
29asyncio.run(main())
30logging.info("end")
31end = time.time()
32logging.info(f"実行結果: time: {end-start}")
AIOHTTP-Client Quickstart¶
async with¶
1import asyncio
2import aiohttp
3
4
5async def main():
6 async with aiohttp.ClientSession() as session:
7 async with session.get("http://httpbin.org/get") as resp:
8 print(resp.status)
9 print(await resp.json())
10
11asyncio.run(main())
非同期のコンテキストマネージャ
ClientSessionを使った処理が終わったら session を close してくれる。
もし session context manager を使わない場合は以下のように
.close()
メソッドを呼び出し必ずクローズする。
1import asyncio
2import aiohttp
3
4
5async def main():
6 session = aiohttp.ClientSession()
7 resp = await session.get("http://httpbin.org/get")
8 print(await resp.json())
9 await session.close()
10
11asyncio.run(main())
with文のネスト¶
これを async with でもやってみたところ出来ました。
(実はpython3.9でも可)
async with (aiohttp.ClientSession() as session,
session.get("http://httpbin.org/get") as resp):
:
:
async for¶
非同期イテレータ/ジェネレータ用の
for
文下記2つは同じ意味
async for a in async_iterable:
await do_a_thing(a)
it = async_iterable.__aiter__()
while True:
try:
a = await it.__anext__()
except StopAsyncIteration:
break
await do_a_thing(a)
ClientSession¶
クライアントセッションクラス。
インスタンス化して(以下
session
で表現)、そのオブジェクトメソッドを使ってリクエストを行う
session.request()¶
非同期のHTTPリクエストを実行
- 引数(必須)
method (str)
– HTTP methodurl
– URL。文字列もしくは yarl URL オブジェクト
オプション引数:ドキュメント参照
返り値は aiohttp.ClientResponse インスタンスオブジェクト
session.get()¶
GET リクエスト
session.request()
の第一引数が GET に固定されているメソッド- 引数(必須)
url
– URL。文字列もしくは yarl URL オブジェクト
オプション引数:ドキュメント参照
返り値は aiohttp.ClientResponse インスタンスオブジェクト
URLリクエストにパラメータ を渡す¶
1import asyncio
2import aiohttp
3
4
5async def main():
6 async with aiohttp.ClientSession() as session:
7 params = {"limit": "10", "offset": "20"}
8 async with session.get(
9 "https://pokeapi.co/api/v2/pokemon", params=params
10 ) as resp:
11 print(resp.status)
12 print(await resp.json())
13
14
15asyncio.run(main())
16
params
オプションで渡す同じキーに対して2つ以上の値を渡したい場合は、MultiDict もしくは、タプルのリストで渡す
MultiDict({'a': [1, 3]})
MultiDict([('a', 1), ('a', 3)])
([('a', 1), ('a', 3)])
その他HTTPメソッド¶
返り値は aiohttp.ClientResponse インスタンスオブジェクト
ClientResponse¶
session.request()
とそのファミリーが返すクラスAPI call だけがインスタンス化する(以下
resp
と表現する)ユーザがこのクラスをインスタンス化することは一切ない
コンテキストマネージャ
async with
での処理が完了すると release される- 主なインスタンスメソッド
resp.read() : レスポンス body を byte で読み込む。
resp.text(encoding=None): レスポンス body を文字列で読み込む。エンコーディング指定可
resp.json(encoding=None) : レスポンス body を JSON で読み込み、辞書型オブジェクトで返す。エンコーディング指定可
バイナリデータの読み込み¶
バイナリデータは .read() で取得可
1import asyncio
2import aiohttp
3
4
5async def main():
6 async with aiohttp.ClientSession() as session:
7 async with session.get("https://pokeapi.co/api/v2/pokemon/25") as resp:
8 html = await resp.json()
9 async with session.get(html["sprites"]["front_default"]) as png:
10 with open("/tmp/pikachu.png", "wb") as f:
11 f.write(await png.read())
12
13
14asyncio.run(main())
streaming response¶
read()
json()
text()
は メモリにロードするので、巨大なサイズのファイルの読み込みには aiohttp.StreamReader のインスタンスの.content
アトリビュートの利用を検討したほうがよいよく使われる方法としては、chunk size を指定してファイル等に書き込むなどする
1import asyncio
2import aiohttp
3
4
5async def main():
6 async with aiohttp.ClientSession() as session:
7 async with session.get("https://api.github.com/events") as resp:
8 with open("/tmp/bigfile.txt", "wb") as f:
9 while True:
10 chunk = await resp.content.read(100) # 100b, -1 ですべて
11 # print(len(chunk))
12 if not chunk:
13 print("No chunk")
14 break
15 f.write(chunk)
16
17
18asyncio.run(main())
websockets¶
1import asyncio
2import aiohttp
3
4import logging
5
6logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
7
8
9async def main():
10 async with aiohttp.ClientSession() as session:
11 async with session.ws_connect("wss://ftx.com/ws") as ws:
12 await ws.send_str('{"op":"ping"}')
13 # await ws.send_json({"op":"ping"})
14 msg = await ws.receive()
15 if msg.type == aiohttp.WSMsgType.TEXT:
16 logging.info(msg.json())
17
18asyncio.run(main())
client session を確立後、aiohttp.ClientSession.ws_connect() メソッドでウェブソケットへ接続
URL を渡して初期化すると、ウェブソケットサーバーに接続状態になる。返り値は aiohttp.ClientWebSocketResponse 。(以下
ws
で表現)ws.send_str() メソッドで ping を投げて ws.receive() メソッドでレスポンスを待つ。
ws.send_json() メソッドで json を投げることも可
ws.receive() の返り値は aiohttp.WSMessage オブジェクト。その
type
属性が aiohttp.WSMsgType で、そのタイプによって処理を切り分ける
AIOHTTP-Client Exercise¶
目的¶
Clientを使った簡単な練習問題に取組む
インターネット経由で取得できるデータを、並行に取得して処理する簡単なPythonを書けるようになる
課題1¶
暗号資産取引所 ftx の APIを利用して、取引データをウェブソケット経由で取得する
1import asyncio
2import aiohttp
3
4import logging
5
6logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%X")
7
8
9async def main():
10 async with aiohttp.ClientSession() as session:
11 async with session.ws_connect("wss://ftx.com/ws") as ws:
12 # サブスクライブ
13 await ws.send_str(
14 '{"op": "subscribe", "channel": "trades", "market": "BTC-PERP"}'
15 )
16 async for msg in ws:
17 if msg.type == aiohttp.WSMsgType.TEXT:
18 logging.info(msg.data)
19 else:
20 break
21
22
23asyncio.run(main())
サブスクライブ時に文字列を渡していますが、json を渡すように書き換えてください。
websockets を参考にして async for ではなく
ws.recieve()
して、非同期イテレータを使わないどうなるのか実験してください。以下は、暗号資産APIに関する課題なので興味がある方はやってみて下さい。
"BTC-PERP" 以外のマーケットをサブスクライブしてデータを取得して下さい。マーケットリストは FTX Markets を参照してください。
orderbook
チャンネルをサブスクライブしてください。参照: Websocket APIBybit の API を使って 適当なリアルタイムデータを取得して下さい。参照: WebSocket Data – Bybit API Docs
課題2¶
今日のハンズオン内容をベースにした問題
バイナリデータの読み込み のポケモン画像取得を非同期で150匹取得してローカルに保存して下さい
https://connpass.com/about/api/ を使って、キーワードが "aiohttp" のイベントを探し、event_id のみリストで取得して下さい。ただし、connpass api は
.json()
で取得出来ないので、.text()
で取得して下さい。