OhMyGPT Cookbook

The OhMyGPT API supports streaming responses from any model. This is useful for building chat interfaces or other applications where the UI should update as the model generates its response.

To enable streaming, set the stream parameter to true in your request. The model will then stream the response to the client in chunks instead of returning the entire response at once.
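Each streamed chunk arrives as a Server-Sent Events (SSE) line of the form `data: {...}`, with the payload following the OpenAI-compatible delta format. As a minimal sketch of handling a single line (`parse_sse_line` is an illustrative helper, not part of the API):

```python
import json

def parse_sse_line(line: str):
    """Return the text delta carried by one SSE data line, or None."""
    if not line.startswith("data: "):
        return None  # SSE comments and blank keep-alive lines carry no data
    data = line[len("data: "):]
    if data == "[DONE]":
        return None  # sentinel marking the end of the stream
    obj = json.loads(data)
    return obj["choices"][0]["delta"].get("content")

print(parse_sse_line('data: {"choices": [{"delta": {"content": "Hello"}}]}'))  # -> Hello
```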

Here is an example of how to stream a response and process it:

import requests
import json
 
question = "How would you build the tallest building ever?"
 
url = "https://api.ohmygpt.com/v1/chat/completions"
headers = {
  "Authorization": "Bearer <API_KEY>",
  "Content-Type": "application/json"
}
 
payload = {
  "model": "gpt-4o",
  "messages": [{"role": "user", "content": question}],
  "stream": True
}
 
buffer = ""
done = False
with requests.post(url, headers=headers, json=payload, stream=True) as r:
  for chunk in r.iter_content(chunk_size=1024, decode_unicode=True):
    buffer += chunk
    # Process every complete SSE line currently in the buffer
    while True:
      line_end = buffer.find('\n')
      if line_end == -1:
        break  # no complete line yet; wait for more data

      line = buffer[:line_end].strip()
      buffer = buffer[line_end + 1:]

      if not line.startswith('data: '):
        continue  # skip SSE comments and keep-alive lines
      data = line[len('data: '):]
      if data == '[DONE]':
        done = True  # end-of-stream sentinel
        break

      try:
        data_obj = json.loads(data)
        content = data_obj["choices"][0]["delta"].get("content")
        if content:
          print(content, end="", flush=True)
      except json.JSONDecodeError:
        pass  # ignore malformed or partial JSON lines
    if done:
      break
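The buffer handling above can be factored into a small pure helper, which is easier to unit-test in isolation (a sketch; `split_sse_lines` is not part of any library):

```python
def split_sse_lines(buffer: str):
    """Split complete lines off an SSE buffer.

    Returns (complete_lines, remainder); the remainder is a partial
    line that should be kept until more data arrives.
    """
    *lines, remainder = buffer.split("\n")
    return [line.strip() for line in lines], remainder

lines, rest = split_sse_lines('data: {"a": 1}\ndata: [DONE]\ndata: {"b"')
# lines holds the two complete lines; rest keeps the partial 'data: {"b"'
```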

Stream cancellation

Streaming requests can be cancelled by aborting the connection. For supported providers, this immediately stops model processing and billing.

Implementing stream cancellation:

import requests
from threading import Event, Thread
 
def stream_with_cancellation(prompt: str, cancel_event: Event):
    with requests.Session() as session:
        response = session.post(
            "https://api.ohmygpt.com/v1/chat/completions",
            headers={"Authorization": "Bearer <API_KEY>"},
            json={"model": "gpt-4o", "messages": [{"role": "user", "content": prompt}], "stream": True},
            stream=True
        )
 
        try:
            for line in response.iter_lines():
                if cancel_event.is_set():
                    # Closing the response aborts the underlying connection
                    response.close()
                    return
                if line:
                    print(line.decode(), end="", flush=True)
        finally:
            response.close()
 
# Example usage:
cancel_event = Event()
stream_thread = Thread(target=lambda: stream_with_cancellation("Write a story", cancel_event))
stream_thread.start()
 
# To cancel the stream:
cancel_event.set()

Cancellation only works for streaming requests with supported providers. For non-streaming requests or unsupported providers, the model will continue processing and you will be billed for the complete response.
