Потоки¶
Потоки — это высокоуровневые примитивы с поддержкой async/await для работы с сетевыми подключениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортов.
Далее пример TCP эхо-клиента, написанного с использованием потоков asyncio:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
См. также раздел Примеры ниже.
Потоковые функции
Следующие асинхронные функции верхнего уровня могут использоваться для создания потоков и работы с ними:
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)¶ Устанавливает сетевое соединение и возвращает пару объектов
(reader, writer)
.Возвращённые объекты reader и writer являются экземплярами классов
StreamReader
иStreamWriter
.Аргумент loop является необязательным и всегда может быть определён автоматически, когда эта функция ожидается от корутины.
limit определяет предел размера буфера, используемого возвращённым экземпляром
StreamReader
. По умолчанию для limit установлено значение 64 КиБ.Остальные аргументы передаются непосредственно в
loop.create_connection()
.Добавлено в версии 3.7: Параметр ssl_handshake_timeout.
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ Запустить сокетный сервер.
Обратный вызов client_connected_cb вызывается всякий раз, когда устанавливается новое клиентское соединение. Он получает пару
(reader, writer)
в качестве двух аргументов, экземпляров классовStreamReader
иStreamWriter
.client_connected_cb может быть просто вызываемым или функцией корутиной; если это функция корутины, она будет автоматически назначена как
Task
.Аргумент loop является необязательным и всегда может быть определён автоматически, когда данный метод ожидается от корутины.
limit определяет предел размера буфера, используемый возвращённым экземпляром
StreamReader
. По умолчанию для limit установлено значение 64 КиБ.Остальные аргументы передаются непосредственно в
loop.create_server()
.Добавлено в версии 3.7: Параметры ssl_handshake_timeout и start_serving.
Сокеты Unix
-
coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ Установить соединение с Unix сокетом и вернуть пару
(reader, writer)
.Аналогичен
open_connection()
, но работает с Unix сокетами.См. также документацию
loop.create_unix_connection()
.Доступность: Unix.
Добавлено в версии 3.7: Параметр ssl_handshake_timeout.
Изменено в версии 3.7: Параметр path теперь может быть путеподобным объектом
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ Запуск сокетного сервера Unix.
Аналогичен
start_server()
, но работает с сокетами Unix.См. также документацию
loop.create_unix_server()
.Доступность: Unix.
Добавлено в версии 3.7: Параметры ssl_handshake_timeout и start_serving.
Изменено в версии 3.7: Параметр path теперь может быть путеподобным объектом.
StreamReader¶
-
class
asyncio.
StreamReader
¶ Представляет объект чтения, предоставляющий API для чтения данных из потока ввода-вывода.
Не рекомендуется создавать экземпляры объектов StreamReader напрямую; использовать вместо них
open_connection()
иstart_server()
.-
coroutine
read
(n=-1)¶ Чтение до n байт. Если n не указан или установлено в
-1
, прочитать до EOF и вернуть все прочитанные байты.Если был получен EOF, а внутренний буфер пуст, возвращает пустой объект
bytes
.
-
coroutine
readline
()¶ Прочитать одну строку, где «строка» — это последовательность байтов, оканчивающаяся на
\n
.Если получен EOF, а
\n
не найден, метод возвращает частично прочитанные данные.Если получен EOF и внутренний буфер пуст, возвращает пустой объект
bytes
.
-
coroutine
readexactly
(n)¶ Прочитать точно n байт.
Вызывается
IncompleteReadError
, если EOF достигается до того, как n может быть прочитан. Используйте атрибутIncompleteReadError.partial
, чтобы получить частично прочитанные данные.
-
coroutine
readuntil
(separator=b'\n')¶ Прочитать данные из потока, пока не будет найден separator.
В случае успеха данные и separator будут удалены из внутреннего буфера (израсходованы). Возвращаемые данные будут содержать разделитель в конце.
Если объем считываемых данных превышает настроенный предел потока, возникает исключение
LimitOverrunError
, и данные остаются во внутреннем буфере и могут быть прочитаны снова.Если EOF достигается до того, как будет найден полный separator, возникает исключение
IncompleteReadError
и внутренний буфер сбрасывается. АтрибутIncompleteReadError.partial
может содержать часть separator’а.Добавлено в версии 3.5.2.
-
at_eof
()¶ Возвращает
True
, если буфер пуст и был вызванfeed_eof()
.
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ Представляет объект записи, который предоставляет API для записи данных в поток ввода-вывода.
Не рекомендуется создавать экземпляры объектов StreamWriter напрямую; использовать вместо них
open_connection()
иstart_server()
.-
write
(data)¶ Метод пытается немедленно записать data в базовый сокет. Если это не удаётся, данные помещаются в очередь во внутреннем буфере записи до тех пор, пока не будут отправлены.
Этот метод следует использовать вместе с методом
drain()
:stream.write(data) await stream.drain()
-
writelines
(data)¶ Метод немедленно записывает список (или любой итерируемый) байтов в базовый сокет. Если это не удаётся, данные помещаются в очередь во внутреннем буфере записи до тех пор, пока не будут отправлены.
Даный метод следует использовать вместе с методом
drain()
:stream.writelines(lines) await stream.drain()
-
close
()¶ Метод закрывает поток и базовый сокет.
Данный метод следует использовать вместе с методом
wait_closed()
:stream.close() await stream.wait_closed()
-
can_write_eof
()¶ Возвращает
True
, если базовый транспорт поддерживает методwrite_eof()
, в противном случае —False
.
-
write_eof
()¶ Закрыть конец записи потока после того, как буферизованные данные записи будут сброшены.
-
transport
¶ Возвращает базовый асинхронный транспорт.
-
get_extra_info
(name, default=None)¶ Доступ к дополнительной информации о транспорте; подробности см. в
BaseTransport.get_extra_info()
.
-
coroutine
drain
()¶ Подождать, пока можно будет возобновить запись в поток. Пример:
writer.write(data) await writer.drain()
Данный метод управления потоком, который взаимодействует с нижележащим буфером записи ввода-вывода. Когда размер буфера достигает верхнего водяного знака, drain() блокируется до тех пор, пока размер буфера не опустится до нижнего водяного знака и запись не будет возобновлена. Когда ждать нечего,
drain()
немедленно возвращается.
-
is_closing
()¶ Возвращает
True
, если поток закрыт или находится в процессе закрытия.Добавлено в версии 3.7.
-
Примеры¶
TCP эхо-клиент, использующий потоки¶
TCP эхо-клиент, использующий функцию asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
См.также
В примере протокола TCP эхо-клиента используется низкоуровневый
метод loop.create_connection()
.
TCP эхо-сервер, использующий потоки¶
TCP эхо-сервер с использованием функции asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
См.также
В примере Протокол TCP эхо-сервера используется метод
loop.create_server()
.
Получить HTTP заголовки¶
Простой пример запроса HTTP-заголовков URL-адреса, переданного в командной строке:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Не обращать внимания на Body, закрыть сокет
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Использование:
python example.py http://example.com/path/page.html
или с HTTPS:
python example.py https://example.com/path/page.html
Зарегистрировать открытый сокет для ожидания данных с использованием потоков¶
Корутина ждёт, пока сокет не получит данные, используя функцию
open_connection()
:
import asyncio
import socket
async def wait_for_data():
# Получить ссылку на текущий событийный цикл, потому что
# мы хотим получить доступ к низкоуровневым API.
loop = asyncio.get_running_loop()
# Создаём пару соединённых сокетов.
rsock, wsock = socket.socketpair()
# Регистрируем открытый сокет, чтобы дождаться данных.
reader, writer = await asyncio.open_connection(sock=rsock)
# Имитация приёма данных из сети
loop.call_soon(wsock.send, 'abc'.encode())
# Ждать данные
data = await reader.read(100)
# Получить данные, если все готово: закрыть сокет
print("Received:", data.decode())
writer.close()
# Закрыть второй сокет
wsock.close()
asyncio.run(wait_for_data())
См.также
В примере Зарегистрировать открытый сокет для ожидания
данных с помощью протокола используется низкоуровневый протокол и
метод loop.create_connection()
.
В примере наблюдения за файловым дескриптором на предмет событий чтения для отслеживания файлового дескриптора
используется низкоуровневый метод loop.add_reader()
.