Транспорты и протоколы

Предисловие

Транспорты и протоколы используются низкоуровневыми API-интерфейсами событийного цикла, такими как loop.create_connection(). Они используют стиль программирования на основе обратного вызова и обеспечивают высокопроизводительную реализацию сетевых протоколов или протоколов IPC (например, HTTP).

По сути, транспорты и протоколы следует использовать только в библиотеках и фреймворках, а не в высокоуровневых асинхронных приложениях.

Эта страница документации охватывает как Транспорты, так и Протоколы.

Вступление

На самом высоком уровне транспорт связан с тем, как передаются байты, в то время как протокол определяет, какие байты передать (и в некоторой степени, когда).

Другой способ сказать то же самое: транспорт — это абстракция для сокета (или аналогичной конечной точки ввода-вывода), а протокол — это абстракция для приложения с точки зрения транспорта.

Ещё одна точка зрения состоит в том, что транспортный и протокольный интерфейсы вместе определяют абстрактный интерфейс для использования сетевого ввода-вывода и межпроцессного ввода-вывода.

Между транспортными и протокольными объектами всегда существует отношение 1:1, протокол вызывает методы транспорта для отправки данных, а транспорт вызывает методы протокола для передачи полученных данных.

Большинство методов событийного цикла, ориентированных на соединение (например, loop.create_connection()), обычно принимают аргумент protocol_factory, используемый для создания объекта Protocol для принятого соединения, представленного объектом Transport. Такие методы обычно возвращают кортеж (transport, protocol).

Содержание

Эта страница документации содержит следующие разделы:

Транспорты

Исходный код: Lib/asyncio/transports.py


Транспорты — это классы, предоставляемые asyncio для абстрагирования различных типов каналов связи.

Транспортные объекты всегда создаются с помощью событийного цикла asyncio.

asyncio реализует транспорт для TCP, UDP, SSL и каналов (pipes) подпроцесса. Доступные для транспорта методы зависят от типа транспорта.

Транспортные классы: не потокобезопасны.

Иерархия транспортов

class asyncio.BaseTransport

Базовый класс для всех транспортов. Содержит методы, общие для всех asyncio транспортов.

class asyncio.WriteTransport(BaseTransport)

Базовый транспорт для подключений только для записи.

Экземпляры класса WriteTransport возвращаются из метода событийного цикла loop.connect_write_pipe() и также используются методами, связанными с подпроцессами, такими как loop.subprocess_exec().

class asyncio.ReadTransport(BaseTransport)

Базовый транспорт для подключений только для чтения.

Экземпляры класса ReadTransport возвращаются из метода событийного цикла loop.connect_read_pipe() и также используются методами, связанными с подпроцессами, такими как loop.subprocess_exec().

class asyncio.Transport(WriteTransport, ReadTransport)

Интерфейс, представляющий двунаправленный транспорт, например TCP- соединение.

Пользователь не создаёт экземпляр транспорта напрямую; они вызывают служебную функцию, передавая ей фабрику протокола и другую информацию, необходимую для создания транспорта и протокола.

Экземпляры класса Transport возвращаются или используются такими методами событийного цикла, как loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile() и т. д.

class asyncio.DatagramTransport(BaseTransport)

Транспорт для датаграммных (UDP) соединений.

Экземпляры класса DatagramTransport возвращаются из метода событийного цикла loop.create_datagram_endpoint().

class asyncio.SubprocessTransport(BaseTransport)

Абстракция для представления связи между родительским и дочерним процессом ОС.

Экземпляры класса SubprocessTransport возвращаются из методов событийного цикла loop.subprocess_shell() и loop.subprocess_exec().

Базовый транспорт

BaseTransport.close()

Закрыть транспорт.

Если в транспорте есть буфер для исходящих данных, буферизованные данные будут сброшены асинхронно. Больше никаких данных не будет. После того, как все буферизованные данные будут сброшены, будет вызван метод протокола protocol.connection_lost() с None в качестве аргумента.

BaseTransport.is_closing()

Возвращает True, если транспорт закрывается или закрывается.

BaseTransport.get_extra_info(name, default=None)

Возвращает информацию об используемом транспорте или базовых ресурсах.

name — это строка, представляющая часть информации, относящейся к транспорту, которую нужно получить.

default — это значение, которое нужно возвращает, если информация недоступна, или если транспорт не поддерживает запросы к ней с помощью данной сторонней реализации событийного цикла или на текущей платформе.

Например, следующий код пытается получить базовый объект сокета транспорта:

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

Категории информации, которая может быть запрошена на некоторых транспортных средствах:

  • сокет:
  • SSL-сокет:
    • 'compression': алгоритм сжатия, используемый как строка, или None, если соединение не сжато; результат ssl.SSLSocket.compression()
    • 'cipher': кортеж из трех значений, содержащий имя используемого шифра, версию протокола SSL, определяющую его использование, и количество используемых секретных битов; результат ssl.SSLSocket.cipher()
    • 'peercert': сертификат партнёра; результат ssl.SSLSocket.getpeercert()
    • 'sslcontext': экземпляр ssl.SSLContext
    • 'ssl_object': экземпляр ssl.SSLObject или ssl.SSLSocket
  • труба:
    • 'pipe': объект трубы
  • подпроцесс:
BaseTransport.set_protocol(protocol)

Установить новый протокол.

Протокол переключения должен выполняться только в том случае, если оба протокола задокументированы для поддержки коммутатора.

BaseTransport.get_protocol()

Возвращает текущий протокол.

Транспорты только для чтения

ReadTransport.is_reading()

Возвращает True, если транспорт принимает новые данные.

Добавлено в версии 3.7.

ReadTransport.pause_reading()

Приостановить принимающую сторону транспорта. Никакие данные не будут переданы методу протокола protocol.data_received(), пока не будет вызван resume_reading().

Изменено в версии 3.7: Метод идемпотентен, т. е. его можно вызвать, когда транспорт уже приостановлен или закрыт.

ReadTransport.resume_reading()

Возобновить приёмный конец. Метод протокола protocol.data_received() будет вызван ещё раз, если некоторые данные доступны для чтения.

Изменено в версии 3.7: Метод идемпотентен, т.е. его можно вызвать, когда транспорт уже читает.

Транспорты только для записи

WriteTransport.abort()

Немедленно закрыть транспорт, не дожидаясь завершения незавершенных операций. Буферизованные данные будут потеряны. Больше никаких данных не будет. В конечном итоге метод протокола protocol.connection_lost() будет вызываться с None в качестве аргумента.

WriteTransport.can_write_eof()

Возвращает True, если транспорт поддерживает write_eof(), и False, если нет.

WriteTransport.get_write_buffer_size()

Возвращает текущий размер выходного буфера, используемого транспортом.

WriteTransport.get_write_buffer_limits()

Получить водяные знаки high и low для управления потоком записи. Возвращает кортеж (low, high), где low и high — положительное количество байтов.

Используйте set_write_buffer_limits(), чтобы установить пределы.

Добавлено в версии 3.4.2.

WriteTransport.set_write_buffer_limits(high=None, low=None)

Установить водяные знаки high и low для управления потоком записи.

Эти два значения (измеряемые в байтах) определяют, когда вызываются методы протокола protocol.pause_writing() и protocol.resume_writing(). Если указано, нижний водяной знак должен быть меньше или равен high водяному знаку. Ни high, ни low не могут быть отрицательными.

pause_writing() вызывается, когда размер буфера становится больше или равен значению high. Если запись была приостановлена, resume_writing() вызывается, когда размер буфера становится меньше или равен значению low.

Значения по умолчанию зависят от реализации. Если задан только high водяной знак, low водяной знак по умолчанию принимает значение, зависящее от реализации, меньшее или равное водяному знаку high. Установка high в ноль приводит к обнулению low и вызывает вызов pause_writing() всякий раз, когда буфер становится непустым. Установка low в ноль вызывает вызов resume_writing() только после того, как буфер пуст. Использование нуля для любого предела обычно неоптимально, поскольку снижает возможности одновременного выполнения операций ввода-вывода и вычислений.

Используйте get_write_buffer_limits(), чтобы получить пределы.

WriteTransport.write(data)

Записать в транспорт несколько байтов data.

Данный метод не блокирует; он буферизует данные и организует их асинхронную отправку.

WriteTransport.writelines(list_of_data)

Записать список (или любой повторяемый) байтов данных в транспорт. Это функционально эквивалентно вызову write() для каждого элемента, полученного в результате итерации, но может быть реализовано более эффективно.

WriteTransport.write_eof()

Закрыть конец записи транспорта после очистки всех буферизованных данных. Данные всё ещё могут быть получены.

Этот метод может вызвать NotImplementedError, если транспорт (например, SSL) не поддерживает полузакрытые соединения.

Транспорты дейтаграмм

DatagramTransport.sendto(data, addr=None)

Отправить байты data удаленному узлу, заданному addr (целевой адрес, зависящий от транспорта). Если addrNone, данные отправляются на целевой адрес, указанный при создании транспорта.

Этот метод не блокирует; он буферизует данные и организует их асинхронную отправку.

DatagramTransport.abort()

Немедленно закрыть транспорт, не дожидаясь завершения незавершенных операций. Буферизованные данные будут потеряны. Больше никаких данных не будет. В конечном итоге метод протокола protocol.connection_lost() будет вызываться с None в качестве аргумента.

Транспорты подпроцесса

SubprocessTransport.get_pid()

Возвращает идентификатор процесса подпроцесса как целое число.

SubprocessTransport.get_pipe_transport(fd)

Возвращает транспорт для канала связи, соответствующий целочисленному файловому дескриптору fd:

  • 0: читаемый потоковый транспорт стандартного ввода (stdin) или None, если подпроцесс не был создан с помощью stdin=PIPE
  • 1: доступный для записи потоковый транспорт стандартного вывода (stdout) или None, если подпроцесс не был создан с помощью stdout=PIPE
  • 2: доступный для записи потоковый транспорт стандартной ошибки (stderr) или None, если подпроцесс не был создан с помощью stderr=PIPE
  • другое fd: None
SubprocessTransport.get_returncode()

Возвращает код возврата подпроцесса как целое число или None, если он не вернулся, что аналогично атрибуту subprocess.Popen.returncode.

SubprocessTransport.kill()

Удалить подпроцесс.

В системах POSIX функция отправляет SIGKILL подпроцессу. В Windows этот метод является псевдонимом для terminate().

См. также subprocess.Popen.kill().

SubprocessTransport.send_signal(signal)

Отправить номер signal в подпроцесс, как в subprocess.Popen.send_signal().

SubprocessTransport.terminate()

Остановить подпроцесс.

В системах POSIX этот метод отправляет SIGTERM подпроцессу. В Windows для остановки подпроцесса вызывается функция Windows API TerminateProcess().

Также subprocess.Popen.terminate().

SubprocessTransport.close()

Завершить подпроцесс, вызвав метод kill().

Если подпроцесс ещё не вернулся, закрыть транспорты труб stdin, stdout и stderr.

Протоколы

Исходный код: Lib/asyncio/protocols.py


asyncio предоставляет множество абстрактных базовых классов, которые следует используйте для реализации сетевых протоколов. Эти классы предназначены для использования вместе с транспортами.

Подклассы абстрактных базовых классов протокола могут реализовывать некоторые или все методы. Все эти методы являются обратными вызовами: они вызываются транспортом при определенных событиях, например, при получении некоторых данных. Метод базового протокола должен вызываться соответствующим транспортом.

Базовые протоколы

class asyncio.BaseProtocol

Базовый протокол с методами, общими для всех протоколов.

class asyncio.Protocol(BaseProtocol)

Базовый класс для реализации потоковых протоколов (TCP, сокеты Unix и т. д.).

class asyncio.BufferedProtocol(BaseProtocol)

Базовый класс для реализации потоковых протоколов с ручным управлением приёмным буфером.

class asyncio.DatagramProtocol(BaseProtocol)

Базовый класс для реализации протоколов дейтаграмм (UDP).

class asyncio.SubprocessProtocol(BaseProtocol)

Базовый класс для реализации протоколов, взаимодействующих с дочерними процессами (однонаправленные каналы).

Базовый протокол

Все протоколы asyncio могут реализовывать обратные вызовы базового протокола.

Обратные вызовы соединения

Обратные вызовы соединения вызываются для всех протоколов ровно один раз при успешном соединении. Все остальные обратные вызовы протокола могут вызываться только между этими двумя методами.

BaseProtocol.connection_made(transport)

Вызывается при установлении соединения.

Аргумент transport — это транспорт, представляющий соединение. Протокол отвечает за хранение ссылки на свой транспорт.

BaseProtocol.connection_lost(exc)

Вызывается, когда соединение потеряно или закрыто.

Аргументом является либо объект исключения, либо None. Последнее означает, что получен обычный EOF, или соединение было прервано или закрыто этой стороной соединения.

Обратные вызовы управления потоком

Транспорты могут вызывать обратные вызовы управления потоком для приостановки или возобновления записи, выполняемой протоколом.

Дополнительную информацию см. в документации по методу set_write_buffer_limits().

BaseProtocol.pause_writing()

Вызывается, когда буфер транспорта выходит за high водяной знак.

BaseProtocol.resume_writing()

Вызывается, когда буфер транспорта опускается ниже low водяного знака.

Если размер буфера равен верхнему значению уровня, pause_writing() не вызывается: размер буфера должен строго превышать.

И наоборот, resume_writing() вызывается, когда размер буфера равен или меньше нижнего водяного знака. Эти конечные условия важны для обеспечения того, чтобы все шло так, как ожидалось, когда любая из оценок равна нулю.

Потоковые протоколы

Методы событий, такие как loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() и loop.connect_write_pipe(), принимают фабрики, возвращающие протоколы потоковой передачи.

Protocol.data_received(data)

Вызывается при получении данных. data — это непустой байтовый объект, содержащий входящие данные.

Будут ли данные буферизированы, разбиты на части или повторно собраны, зависит от транспорта. В общем, вы не должны полагаться на конкретную семантику и вместо этого сделайте свой парсинг универсальным и гибким. Однако данные всегда поступают в правильном порядке.

Метод можно вызывать произвольное количество раз, пока открыто соединение.

Однако protocol.eof_received() вызывается не более одного раза. После вызова eof_received () data_received() больше не вызывается.

Protocol.eof_received()

Вызывается, когда другой конец сообщает, что больше не будет отправлять данные (например, путем вызова transport.write_eof(), если другой конец также использует asyncio).

Этот метод может возвращает ложное значение (включая None), и в этом случае транспорт закроется сам. И наоборот, если этот метод возвращает истинное значение, используемый протокол определяет, закрывать ли транспорт. Поскольку реализация по умолчанию возвращает None, она неявно закрывает соединение.

Некоторые транспорты, включая SSL, не поддерживают полузакрытые соединения, и в этом случае возвращение истины из этого метода приведёт к закрытию соединения.

Машина состояний:

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

Буферизованные протоколы потоковой передачи

Добавлено в версии 3.7: Важно: было добавлено в asyncio в Python 3.7 на временной основе! Это экспериментальный API, который может быть изменен или полностью удален в Python 3.8.

Буферизованные протоколы можно использовать с любым методом событийного цикла, который поддерживает Потоковые протоколы.

Реализации BufferedProtocol допускают явное ручное выделение буфера приёма и управление им. Событийные циклы могут затем использовать буфер, предоставленный протоколом, чтобы избежать ненужных копий данных. Это может привести к заметному повышению производительности протоколов, которые получают большие объемы данных. Сложные реализации протокола могут значительно сократить количество выделяемых буферов.

Следующие обратные вызовы вызываются экземплярами BufferedProtocol:

BufferedProtocol.get_buffer(sizehint)

Вызывается для выделения нового буфера приёма.

sizehint — рекомендуемый минимальный размер возвращаемого буфера. Допустимо возвращать буферы меньшего или большего размера, чем предлагает sizehint. Если установлено значение -1, размер буфера может быть произвольным. Ошибка при возврате буфера нулевого размера.

get_buffer() должен возвращать объект, реализующий буферный протокол.

BufferedProtocol.buffer_updated(nbytes)

Вызывается, когда буфер был обновлен полученными данными.

nbytes — общее количество байтов, записанных в буфер.

BufferedProtocol.eof_received()

См. документацию по методу protocol.eof_received().

get_buffer() можно вызывать произвольное количество раз во время соединения. Тем не менее, protocol.eof_received() вызывается не более одного раза, и в случае вызова get_buffer() и buffer_updated() не будут вызываться после него.

Машина состояний:

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

Протоколы дейтаграмм

Экземпляры протокола дейтаграмм должны создаваться фабриками протоколов, переданными методу loop.create_datagram_endpoint().

DatagramProtocol.datagram_received(data, addr)

Вызывается при получении дейтаграммы. data — это байтовый объект, содержащий входящие данные. addr — адрес партнёра, отправляющего данные; точный формат зависит от транспорта.

DatagramProtocol.error_received(exc)

Вызывается, когда предыдущая операция отправки или получения вызывает OSError. exc — это экземпляр OSError.

Этот метод вызывается в редких случаях, когда транспорт (например, UDP) обнаруживает, что дейтаграмма не может быть доставлена получателю. Однако во многих случаях недоставленные дейтаграммы автоматически отбрасываются.

Примечание

В системах BSD (macOS, FreeBSD и т. д.) управление потоком не поддерживается для протоколов дейтаграмм, потому что нет надежного способа обнаружения сбоев отправки, вызванных записью слишком большого количества пакетов.

Сокет всегда отображается как «готовый», а лишние пакеты отбрасываются. OSError с errno, установленным на errno.ENOBUFS, может или не может быть поднят; если он поднят, он будет отправлен на DatagramProtocol.error_received(), но в противном случае будет проигнорирован.

Протоколы подпроцесса

Экземпляры протокола дейтаграмм должны быть созданы фабриками протоколов, переданными в методы loop.subprocess_exec() и loop.subprocess_shell().

SubprocessProtocol.pipe_data_received(fd, data)

Вызывается, когда дочерний процесс записывает данные в свой stdout или stderr канал.

fd — это целочисленный файловый дескриптор канала.

data — это непустой байтовый объект, содержащий полученные данные.

SubprocessProtocol.pipe_connection_lost(fd, exc)

Вызывается, когда один из каналов, взаимодействующих с дочерним процессом, закрывается.

fd — это закрытый целочисленный файловый дескриптор.

SubprocessProtocol.process_exited()

Вызывается после выхода из дочернего процесса.

Примеры

TCP эхо-серер

Создать TCP эхо-сервер, используя метод loop.create_server(), отправить обратно полученные данные и закрыть соединение:

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Получить ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

См.также

В примере TCP эхо-сервер с использованием потоков используется высокоуровневая функция asyncio.start_server().

TCP эхо-клиент

TCP эхо-клиент, использующий метод loop.create_connection(), отправляет данные и ждёт, пока соединение не будет закрыто:

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Получить ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Дождаться сигнала протокола о потере соединения и
    # закрыть транспорт.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

См.также

В примере TCP эхо-клиент с использованием потоков используется функция высокого уровня asyncio.open_connection().

Эхо-сервер UDP

Эхо-сервер UDP, используя метод loop.create_datagram_endpoint(), отправляет полученные данные обратно:

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Получение ссылки на событийный цикл, как мы планировали использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()

    # Для обслуживания всех будет создана одна сущность протокола
    # запроса клиента.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Ожидать в течение 1 часа.
    finally:
        transport.close()


asyncio.run(main())

UDP эхо-клиент

Эхо-клиент UDP, используя метод loop.create_datagram_endpoint(), отправляет данные и закрывает транспорт, когда получает ответ:

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Получить ссылку на событийный цикл, как мы планировали использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Подключение существующих сокетов

Подождать, пока сокет получит данные, используя метод loop.create_connection() с протоколом:

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # Мы закончили: закрыть транспорт;
        # connection_lost() будет вызван автоматически.
        self.transport.close()

    def connection_lost(self, exc):
        # Этот сокет закрыт
        self.on_con_lost.set_result(True)


async def main():
    # Получить ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Создать пару подключённых сокетов
    rsock, wsock = socket.socketpair()

    # Зарегистрировать сокет для ожидания данных.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Смоделировать приём данных из сети.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

См.также

В примере слежение за дескриптором файла для событий чтения для регистрации FD используется низкоуровневый метод loop.add_reader().

В примере регистрируется открытый сокет для ожидания данных с помощью потоков используются высокоуровневые потоки, созданные функцией open_connection() в корутине.

loop.subprocess_exec() и SubprocessProtocol

Пример протокола подпроцесса, используемого для получения выходных данных подпроцесса и ожидания выхода подпроцесса.

Подпроцесс создаётся методом loop.subprocess_exec():

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exit_future.set_result(True)

async def get_date():
    # Получить ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Создать подпроцессы, которым управляет DateProtocol;
    # перенаправить стандартный вывод в пайп.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Дождаться выхода подпроцесса, используя process_exited()
    # метод протокола.
    await exit_future

    # Закрытие пайпа stdout.
    transport.close()

    # Прочитать выходные данные, собранные
    # pipe_data_received() методом протокола.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

См. также пример, написанный с использованием высокоуровневого API.