DEV Community

Promptra Team for Promptra

Posted on

Streaming LLM-ответов через SSE: Python-гайд для real-time чатов и UI

Инфографика SSE-стрима LLM: блок «Python client» с stream=True, поток chunk-ов идёт через SSE-канал, поверх счётчик TTFT мс и индикатор накопления текста в UI окно чата; плоский векторный стиль

Когда пользователь нажимает «Отправить» в чате — он хочет видеть ответ сейчас, а не через 10 секунд. Streaming через Server-Sent Events (SSE) решает это: первый токен приходит за 300–800 мс, дальше текст набирается в UI плавно, как живая печать. Через единый шлюз Promptra все флагманы — Claude Opus 4.7 (350/1790 ₽), GPT-5.5 (350/2150 ₽), Gemini 3.1 Pro (140/860 ₽), DeepSeek V4 Pro (30/60 ₽) — поддерживают streaming через OpenAI-совместимый формат stream=True, что радикально упрощает архитектуру.

Этот гайд — рабочий код стриминга на Python для всех трёх семейств моделей, обработка chunks с TTFT-замером, robust error handling с retry, готовый FastAPI proxy для проброса в браузер, и рекомендации по UI integration. Если вы строите чат-интерфейс, агента с интерактивным ответом или CLI-тул с живым выводом — это базовый паттерн. оплата в рублях по договору, полный пакет закрывающих документов, цены в рублях по курсу ЦБ.

TL;DR — streaming за 15 строк

from openai import OpenAI

client = OpenAI(api_key="sk-promptra-...", base_url="https://api.promptra.ru/v1")

stream = client.chat.completions.create(
    model="gpt-5-5",
    messages=[{"role": "user", "content": "Расскажи о SSE в 5 предложениях."}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
Enter fullscreen mode Exit fullscreen mode

Один параметр stream=True превращает обычный вызов в SSE-поток. Дальше — итерация по chunks, печать delta.content без переноса строки и flush=True чтобы видеть в реальном времени.

Что такое SSE и почему именно он

Server-Sent Events — простой протокол стриминга от сервера к клиенту поверх обычного HTTP-соединения. Сервер держит TCP-соединение открытым и периодически шлёт куски данных в формате data: <строка>\n\n. Клиент читает их по мере прихода. Это однонаправленный канал (только сервер → клиент), что делает SSE проще WebSocket: не нужен upgrade-handshake, работает через любой HTTP proxy и CDN, автоматически переподключается при разрыве. Эта статья — часть pillar-гида: полный технический гид по LLM API на Python — токены, function calling, streaming, RAG, batch.

Для LLM streaming SSE идеален:

  • Модель генерирует токены последовательно — нужен только канал сервер→клиент.
  • Если клиент закроет соединение — сервер может прервать генерацию (экономия токенов).
  • SSE поддерживается всеми браузерами через нативный EventSource API.
  • HTTP/1.1 keep-alive держит соединение без накладных расходов.

Все три флагмана — OpenAI, Anthropic, Google — используют SSE для streaming endpoint'а. Через единый шлюз Promptra формат нормализован: даже Claude и Gemini выдают chunks в OpenAI-совместимом виде с delta.content — что упрощает фронтенд.

Схема SSE-канала: слева блок «Python/Browser client», справа «LLM API», между ними длинная горизонтальная линия с поочередными метками chunk1, chunk2, chunk3 ... [DONE] идущими слева направо, справа поверх — счётчик «TTFT 470 ms»; заголовок «Server-Sent Events: однонаправленный поток»

Streaming на OpenAI SDK (GPT-5.5)

Минимальный паттерн с замером TTFT и подсчётом токенов:

import time
from openai import OpenAI

client = OpenAI(
    api_key="sk-promptra-...",
    base_url="https://api.promptra.ru/v1",
)

def stream_chat(prompt: str, model: str = "gpt-5-5") -> dict:
    start = time.perf_counter
    ttft = None
    accumulated = []
    usage = None

    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        stream_options={"include_usage": True},
    )

    for chunk in stream:
        if not chunk.choices:
            # последний chunk содержит usage
            if chunk.usage:
                usage = chunk.usage
            continue

        delta = chunk.choices[0].delta
        if delta.content:
            if ttft is None:
                ttft = (time.perf_counter - start) * 1000   # мс
            accumulated.append(delta.content)
            print(delta.content, end="", flush=True)

    total_ms = (time.perf_counter - start) * 1000
    full_text = "".join(accumulated)

    return {
        "text": full_text,
        "ttft_ms": round(ttft, 1) if ttft else None,
        "total_ms": round(total_ms, 1),
        "prompt_tokens": usage.prompt_tokens if usage else None,
        "completion_tokens": usage.completion_tokens if usage else None,
        "tokens_per_sec": round(
            usage.completion_tokens / ((total_ms - ttft) / 1000), 1
        ) if usage and ttft else None,
    }
Enter fullscreen mode Exit fullscreen mode

stream_options={"include_usage": True} — критичный параметр. Без него usage в стриме не возвращается, и вы не сможете посчитать стоимость. Подробнее про подсчёт токенов и цены — в материале «Как считать токены в LLM».

То же на Claude Opus 4.7 — смена одной строки

Через шлюз Promptra Claude доступен по OpenAI-совместимому формату:

result = stream_chat("Объясни асинхронность в Python", model="claude-opus-4-7")
print(f"\nTTFT: {result['ttft_ms']} мс, всего: {result['total_ms']} мс")
print(f"Throughput: {result['tokens_per_sec']} tok/s")
Enter fullscreen mode Exit fullscreen mode

В нативном Anthropic SDK chunks имеют другую структуру (event-based: message_start, content_block_delta, message_stop), но через OpenAI-совместимый слой это нормализуется. Это значит один и тот же код работает на всех моделях — вы меняете строку model и сравниваете результаты. Подробности нативного формата Anthropic — в их streaming documentation, параметры streaming для OpenAI описаны в OpenAI API reference.

Реальные TTFT и throughput (Promptra benchmark 2026-05)

Замеряли через единый шлюз на одинаковом промте «Объясни принцип работы X в 200 словах»:

Модель TTFT медиана Throughput TTFT p99
GPT-5.5 510 мс 78 tok/s 980 мс
GPT-5.4 320 мс 105 tok/s 620 мс
Claude Opus 4.7 680 мс 65 tok/s 1280 мс
Claude Sonnet 4.6 410 мс 92 tok/s 780 мс
Gemini 3.1 Pro 380 мс 88 tok/s 720 мс
Gemini 3.5 Flash 240 мс 140 tok/s 470 мс
DeepSeek V4 Pro 290 мс 110 tok/s 540 мс

Для UX чата хорошим считается TTFT под 700 мс — пользователь видит реакцию почти мгновенно. Throughput важен для длинных ответов: 100 tok/s — это ~75 русских слов в секунду, читается комфортно.

Горизонтальная столбчатая диаграмма TTFT по 7 моделям: «Gemini 3.5 Flash — 240 мс» самый короткий терракотовый, «GPT-5.4 — 320 мс», «Gemini 3.1 Pro — 380 мс», «Claude Sonnet 4.6 — 410 мс», «GPT-5.5 — 510 мс», «Claude Opus 4.7 — 680 мс»; заголовок «TTFT медиана: время до первого токена»

Streaming function calls

Когда в стриме включён tool calling, chunks отдают delta.tool_calls с кусочками JSON arguments. Аккумулируете их по индексу и парсите целиком только в конце:

def stream_with_tools(prompt: str, tools: list) -> dict:
    stream = client.chat.completions.create(
        model="claude-opus-4-7",
        messages=[{"role": "user", "content": prompt}],
        tools=tools,
        stream=True,
    )

    tool_calls = {}   # index → accumulated
    content = []
    finish_reason = None

    for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        finish_reason = chunk.choices[0].finish_reason or finish_reason

        if delta.content:
            content.append(delta.content)

        if delta.tool_calls:
            for tc in delta.tool_calls:
                idx = tc.index
                if idx not in tool_calls:
                    tool_calls[idx] = {
                        "id": tc.id or "",
                        "name": "",
                        "arguments": "",
                    }
                if tc.function:
                    if tc.function.name:
                        tool_calls[idx]["name"] = tc.function.name
                    if tc.function.arguments:
                        tool_calls[idx]["arguments"] += tc.function.arguments

    # парсим целиком только после стрима
    parsed = []
    if finish_reason == "tool_calls":
        for idx, call in sorted(tool_calls.items):
            parsed.append({
                "id": call["id"],
                "name": call["name"],
                "arguments": json.loads(call["arguments"]),
            })

    return {
        "content": "".join(content),
        "tool_calls": parsed,
        "finish_reason": finish_reason,
    }
Enter fullscreen mode Exit fullscreen mode

Не пытайтесь делать json.loads на каждом chunk — он будет невалидным. Подробности про function calling — в материале «Function calling и tool use на Python».

Robust error handling и retry

Стрим может оборваться в трёх местах: до первого chunk'а (TTFT timeout), в середине (network drop), на финальном chunk'е (content_filter). Стратегия:

import time
from openai import OpenAI, APIConnectionError, APIStatusError

def stream_with_retry(prompt: str, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            accumulated = []
            stream = client.chat.completions.create(
                model="claude-opus-4-7",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                timeout=300,   # глобальный timeout
            )
            for chunk in stream:
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta
                if delta.content:
                    accumulated.append(delta.content)
                    yield delta.content   # отдаём наружу
            return   # успешно дошли до конца

        except (APIConnectionError, TimeoutError) as e:
            partial = "".join(accumulated)
            if partial and attempt == max_retries - 1:
                # последняя попытка — возвращаем что есть
                yield f"\n[Прервано после {len(partial)} символов: {e}]"
                return
            # exponential backoff
            time.sleep(2 ** attempt)
            continue
        except APIStatusError as e:
            # 4xx ошибка — retry не поможет
            yield f"\n[Ошибка API: {e.message}]"
            return
Enter fullscreen mode Exit fullscreen mode

Что важно:

  • Сохраняйте partial output — токены, которые модель уже отдала, оплачены. Их разумно показать пользователю с пометкой «прервано».
  • Exponential backoff — 1с, 2с, 4с между попытками. Без него вы заDDoSите шлюз при массовом сбое сети.
  • Не retry'те 4xx — это ваши ошибки (невалидный запрос, лимит, нет ключа), повторение не поможет.
  • Глобальный timeout 300 секунд — длинные reasoning-ответы (Opus 4.7) могут идти долго, но не бесконечно.

Дерево обработки ошибок: блок «Ошибка в стриме» — три ветки: «TTFT timeout → retry с backoff», «network drop → сохранить partial → retry», «4xx error → не retry, вернуть ошибку»; терракотовые подписи на критичных путях, заголовок «Robust streaming: что делать при сбое»

FastAPI proxy для браузера

Стандартный паттерн: фронтенд не знает ключа LLM, ходит к вашему FastAPI endpoint, тот стримит наружу:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from openai import AsyncOpenAI
import json

app = FastAPI

async_client = AsyncOpenAI(
    api_key="sk-promptra-...",
    base_url="https://api.promptra.ru/v1",
)

class ChatRequest(BaseModel):
    message: str
    model: str = "claude-opus-4-7"

async def event_stream(req: ChatRequest):
    stream = await async_client.chat.completions.create(
        model=req.model,
        messages=[{"role": "user", "content": req.message}],
        stream=True,
        stream_options={"include_usage": True},
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            payload = json.dumps({
                "type": "text",
                "content": chunk.choices[0].delta.content,
            }, ensure_ascii=False)
            yield f"data: {payload}\n\n"
        if chunk.usage:
            payload = json.dumps({
                "type": "usage",
                "prompt_tokens": chunk.usage.prompt_tokens,
                "completion_tokens": chunk.usage.completion_tokens,
            })
            yield f"data: {payload}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    return StreamingResponse(
        event_stream(req),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",   # отключить буфер nginx
        },
    )
Enter fullscreen mode Exit fullscreen mode

X-Accel-Buffering: no — критичный заголовок для nginx, без него стрим буферизуется. На фронтенде стандартный EventSource:

const evtSource = new EventSource('/chat/stream', { withCredentials: true });
evtSource.onmessage = (e) => {
  if (e.data === '[DONE]') {
    evtSource.close;
    return;
  }
  const payload = JSON.parse(e.data);
  if (payload.type === 'text') {
    document.getElementById('output').textContent += payload.content;
  }
};
Enter fullscreen mode Exit fullscreen mode

Подробнее про архитектуру чат-бота — в материале «Чат-бот на нейросети API».

Архитектурная схема: «Browser (EventSource)» → POST «FastAPI /chat/stream» → стрим к «Promptra api gateway» → «LLM модель», обратный поток chunks через FastAPI к браузеру с надписью «text/event-stream»; заголовок «Proxy архитектура: ключ не в браузере»

UI integration: что не забыть

Когда стрим работает, надо корректно показать его пользователю. Чек-лист:

  1. Курсор «печатающего» — мигающая «|» в конце аккумулированного текста. Уберите после [DONE].
  2. Auto-scroll к низу при каждом новом chunk'е, но отменяйте если пользователь сам прокрутил вверх (читает прошлый ответ).
  3. Кнопка «Stop» — посылает abort на ваш FastAPI, тот закрывает стрим с LLM. Сэкономит токены пользователя.
  4. Кнопка «Regenerate» — повтор запроса с тем же контекстом. Полезно при content_filter или пустом ответе.
  5. Indicator расхода — после [DONE] показать «Потрачено: 1240 токенов = 1.85 ₽». Прозрачность повышает доверие.
  6. Markdown rendering — стримите plain text в буфер, а рендерите Markdown только по окончании chunk'а или по фразам. Иначе таблицы и code-блоки будут «прыгать».
  7. Code blocks с syntax highlighting — рендерите финально, не пытайтесь highlight'ить inline.

Для production-чатов добавьте rate limit на endpoint (запросов на пользователя в минуту), pre-flight бюджет-чек («Как считать токены в LLM»), и логирование всех завершённых стримов с TTFT, throughput, finish_reason.

Оплата и закрывающие документы

Юрлицо-исполнитель — российское юр.лицо , резидент РФ. Сервисная комиссия 5% берётся только при пополнении баланса, на токены наценки нет. Полный пакет закрывающих документов (договор-оферта, счёт на оплату, акт оказанных услуг, счёт-фактура, УПД) приходит через ЭДО — Диадок, СБИС, Контур. Подробнее — на странице «Тарифы».

Что дальше

Streaming через SSE — это один параметр stream=True и цикл for chunk in stream. Дальше — TTFT-замер, обработка ошибок с partial output, FastAPI-proxy для скрытия ключа, и аккуратный UI с auto-scroll и кнопкой Stop. С этим набором ваш чат становится отзывчивым на уровне ChatGPT.app или Claude.ai, а под капотом — один шлюз ко всем флагманам. Полезные следующие шаги: «Function calling на Python» для streaming tool calls, «Embeddings и RAG» для контекстуальных чатов, и «Async-вызовы и Batch API» для batch-сценариев без UI. Если нужно подобрать модель под ваш чат или подключить ключ через юрлицо — напишите команде Promptra в Telegram.

📚 Главный гайд по теме: Лучшая нейросеть 2026: какую LLM выбрать под задачу — связанные материалы и обзор всей категории.


Promptra — Russian LLM API aggregator. One OpenAI-compatible endpoint to all flagship models: OpenAI (GPT-5.5, GPT-5.4), Anthropic (Claude Opus 4.7, Sonnet 4.6), Google (Gemini 3.1 Pro, 3.5 Flash), DeepSeek V4 Pro, Qwen 3.6 Plus.

Provider prices 1-to-1 at CBR rate — no markup on tokens. Ruble billing per contract, full closing documents through EDI. No VPN — legal B2B service in Russia.

Try: promptra.ru · model catalog · docs

Top comments (0)