Потоки

Исходный код: Lib/asyncio/streams.py


Потоки — это высокоуровневые примитивы с поддержкой async/await для работы с сетевыми подключениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортов.

Далее пример TCP эхо-клиента, написанного с использованием потоков asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

См. также раздел Примеры ниже.

Потоковые функции

Следующие асинхронные функции верхнего уровня могут использоваться для создания потоков и работы с ними:

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)

Устанавливает сетевое соединение и возвращает пару объектов (reader, writer).

Возвращённые объекты reader и writer являются экземплярами классов StreamReader и StreamWriter.

Аргумент loop является необязательным и всегда может быть определён автоматически, когда эта функция ожидается от корутины.

limit определяет предел размера буфера, используемого возвращённым экземпляром StreamReader. По умолчанию для limit установлено значение 64 КиБ.

Остальные аргументы передаются непосредственно в loop.create_connection().

Добавлено в версии 3.7: Параметр ssl_handshake_timeout.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Запустить сокетный сервер.

Обратный вызов client_connected_cb вызывается всякий раз, когда устанавливается новое клиентское соединение. Он получает пару (reader, writer) в качестве двух аргументов, экземпляров классов StreamReader и StreamWriter.

client_connected_cb может быть просто вызываемым или функцией корутиной; если это функция корутины, она будет автоматически назначена как Task.

Аргумент loop является необязательным и всегда может быть определён автоматически, когда данный метод ожидается от корутины.

limit определяет предел размера буфера, используемый возвращённым экземпляром StreamReader. По умолчанию для limit установлено значение 64 КиБ.

Остальные аргументы передаются непосредственно в loop.create_server().

Добавлено в версии 3.7: Параметры ssl_handshake_timeout и start_serving.

Сокеты Unix

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Установить соединение с Unix сокетом и вернуть пару (reader, writer).

Аналогичен open_connection(), но работает с Unix сокетами.

См. также документацию loop.create_unix_connection().

Доступность: Unix.

Добавлено в версии 3.7: Параметр ssl_handshake_timeout.

Изменено в версии 3.7: Параметр path теперь может быть путеподобным объектом

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Запуск сокетного сервера Unix.

Аналогичен start_server(), но работает с сокетами Unix.

См. также документацию loop.create_unix_server().

Доступность: Unix.

Добавлено в версии 3.7: Параметры ssl_handshake_timeout и start_serving.

Изменено в версии 3.7: Параметр path теперь может быть путеподобным объектом.

StreamReader

class asyncio.StreamReader

Представляет объект чтения, предоставляющий API для чтения данных из потока ввода-вывода.

Не рекомендуется создавать экземпляры объектов StreamReader напрямую; использовать вместо них open_connection() и start_server().

coroutine read(n=-1)

Чтение до n байт. Если n не указан или установлено в -1, прочитать до EOF и вернуть все прочитанные байты.

Если был получен EOF, а внутренний буфер пуст, возвращает пустой объект bytes.

coroutine readline()

Прочитать одну строку, где «строка» — это последовательность байтов, оканчивающаяся на \n.

Если получен EOF, а \n не найден, метод возвращает частично прочитанные данные.

Если получен EOF и внутренний буфер пуст, возвращает пустой объект bytes.

coroutine readexactly(n)

Прочитать точно n байт.

Вызывается IncompleteReadError, если EOF достигается до того, как n может быть прочитан. Используйте атрибут IncompleteReadError.partial, чтобы получить частично прочитанные данные.

coroutine readuntil(separator=b'\n')

Прочитать данные из потока, пока не будет найден separator.

В случае успеха данные и separator будут удалены из внутреннего буфера (израсходованы). Возвращаемые данные будут содержать разделитель в конце.

Если объем считываемых данных превышает настроенный предел потока, возникает исключение LimitOverrunError, и данные остаются во внутреннем буфере и могут быть прочитаны снова.

Если EOF достигается до того, как будет найден полный separator, возникает исключение IncompleteReadError и внутренний буфер сбрасывается. Атрибут IncompleteReadError.partial может содержать часть separator’а.

Добавлено в версии 3.5.2.

at_eof()

Возвращает True, если буфер пуст и был вызван feed_eof().

StreamWriter

class asyncio.StreamWriter

Представляет объект записи, который предоставляет API для записи данных в поток ввода-вывода.

Не рекомендуется создавать экземпляры объектов StreamWriter напрямую; использовать вместо них open_connection() и start_server().

write(data)

Метод пытается немедленно записать data в базовый сокет. Если это не удаётся, данные помещаются в очередь во внутреннем буфере записи до тех пор, пока не будут отправлены.

Этот метод следует использовать вместе с методом drain():

stream.write(data)
await stream.drain()
writelines(data)

Метод немедленно записывает список (или любой итерируемый) байтов в базовый сокет. Если это не удаётся, данные помещаются в очередь во внутреннем буфере записи до тех пор, пока не будут отправлены.

Даный метод следует использовать вместе с методом drain():

stream.writelines(lines)
await stream.drain()
close()

Метод закрывает поток и базовый сокет.

Данный метод следует использовать вместе с методом wait_closed():

stream.close()
await stream.wait_closed()
can_write_eof()

Возвращает True, если базовый транспорт поддерживает метод write_eof(), в противном случае — False.

write_eof()

Закрыть конец записи потока после того, как буферизованные данные записи будут сброшены.

transport

Возвращает базовый асинхронный транспорт.

get_extra_info(name, default=None)

Доступ к дополнительной информации о транспорте; подробности см. в BaseTransport.get_extra_info().

coroutine drain()

Подождать, пока можно будет возобновить запись в поток. Пример:

writer.write(data)
await writer.drain()

Данный метод управления потоком, который взаимодействует с нижележащим буфером записи ввода-вывода. Когда размер буфера достигает верхнего водяного знака, drain() блокируется до тех пор, пока размер буфера не опустится до нижнего водяного знака и запись не будет возобновлена. Когда ждать нечего, drain() немедленно возвращается.

is_closing()

Возвращает True, если поток закрыт или находится в процессе закрытия.

Добавлено в версии 3.7.

coroutine wait_closed()

Подождать, пока поток закроется.

Должен вызываться после close(), чтобы дождаться закрытия базового соединения.

Добавлено в версии 3.7.

Примеры

TCP эхо-клиент, использующий потоки

TCP эхо-клиент, использующий функцию asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

См.также

В примере протокола TCP эхо-клиента используется низкоуровневый метод loop.create_connection().

TCP эхо-сервер, использующий потоки

TCP эхо-сервер с использованием функции asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

См.также

В примере Протокол TCP эхо-сервера используется метод loop.create_server().

Получить HTTP заголовки

Простой пример запроса HTTP-заголовков URL-адреса, переданного в командной строке:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Не обращать внимания на Body, закрыть сокет
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Использование:

python example.py http://example.com/path/page.html

или с HTTPS:

python example.py https://example.com/path/page.html

Зарегистрировать открытый сокет для ожидания данных с использованием потоков

Корутина ждёт, пока сокет не получит данные, используя функцию open_connection():

import asyncio
import socket

async def wait_for_data():
    # Получить ссылку на текущий событийный цикл, потому что
    # мы хотим получить доступ к низкоуровневым API.
    loop = asyncio.get_running_loop()

    # Создаём пару соединённых сокетов.
    rsock, wsock = socket.socketpair()

    # Регистрируем открытый сокет, чтобы дождаться данных.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Имитация приёма данных из сети
    loop.call_soon(wsock.send, 'abc'.encode())

    # Ждать данные
    data = await reader.read(100)

    # Получить данные, если все готово: закрыть сокет
    print("Received:", data.decode())
    writer.close()

    # Закрыть второй сокет
    wsock.close()

asyncio.run(wait_for_data())

См.также

В примере Зарегистрировать открытый сокет для ожидания данных с помощью протокола используется низкоуровневый протокол и метод loop.create_connection().

В примере наблюдения за файловым дескриптором на предмет событий чтения для отслеживания файлового дескриптора используется низкоуровневый метод loop.add_reader().