细粒度工具流式传输
在延迟敏感型应用中,无需服务器端 JSON 缓冲即可流式传输工具输入。
此功能符合零数据留存 (ZDR) 条件。当你的组织有 ZDR 安排时,通过此功能发送的数据在 API 响应返回后不会被存储。
细粒度工具流式传输在所有模型和所有平台上可用。它支持在不缓冲或验证 JSON 的情况下流式传输工具使用参数值,减少开始接收大型参数的延迟。
使用细粒度工具流式传输时,你可能会收到无效或部分 JSON 输入。请确保在代码中处理这些边缘情况。
如何使用细粒度工具流式传输
细粒度工具流式传输在 Claude API、AWS 上的 Claude Platform、Amazon Bedrock、Vertex AI 和 Microsoft Foundry 上受支持。要使用它,请在你希望启用细粒度流式传输的任何用户定义工具上将 eager_input_streaming 设置为 true,并在请求上启用流式传输。
以下是使用 API 进行细粒度工具流式传输的示例:
curl https://api.anthropic.com/v1/messages \
-H "content-type: application/json" \
-H "x-api-key: $ANTHROPIC_API_KEY" \
-H "anthropic-version: 2023-06-01" \
-d '{
"model": "claude-opus-4-7",
"max_tokens": 65536,
"tools": [
{
"name": "make_file",
"description": "Write text to a file",
"eager_input_streaming": true,
"input_schema": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename to write text to"
},
"lines_of_text": {
"type": "array",
"description": "An array of lines of text to write to the file"
}
},
"required": ["filename", "lines_of_text"]
}
}
],
"messages": [
{
"role": "user",
"content": "Can you write a long poem and make a file called poem.txt?"
}
],
"stream": true
}'
ant messages create --stream --format jsonl <<'YAML' |
model: claude-opus-4-7
max_tokens: 65536
tools:
- name: make_file
description: Write text to a file
eager_input_streaming: true
input_schema:
type: object
properties:
filename:
type: string
description: The filename to write text to
lines_of_text:
type: array
description: An array of lines of text to write to the file
required:
- filename
- lines_of_text
messages:
- role: user
content: Can you write a long poem and make a file called poem.txt?
YAML
jq 'select(.type == "message_delta") | .usage'
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
max_tokens=65536,
model="claude-opus-4-7",
tools=[
{
"name": "make_file",
"description": "Write text to a file",
"eager_input_streaming": True,
"input_schema": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename to write text to",
},
"lines_of_text": {
"type": "array",
"description": "An array of lines of text to write to the file",
},
},
"required": ["filename", "lines_of_text"],
},
}
],
messages=[
{
"role": "user",
"content": "Can you write a long poem and make a file called poem.txt?",
}
],
) as stream:
final_message = stream.get_final_message()
print(f"Input tokens: {final_message.usage.input_tokens}")
print(f"Output tokens: {final_message.usage.output_tokens}")
import Anthropic from "@anthropic-ai/sdk";
const anthropic = new Anthropic();
const stream = anthropic.messages.stream({
model: "claude-opus-4-7",
max_tokens: 65536,
tools: [
{
name: "make_file",
description: "Write text to a file",
eager_input_streaming: true,
input_schema: {
type: "object",
properties: {
filename: {
type: "string",
description: "The filename to write text to"
},
lines_of_text: {
type: "array",
description: "An array of lines of text to write to the file"
}
},
required: ["filename", "lines_of_text"]
}
}
],
messages: [
{
role: "user",
content: "Can you write a long poem and make a file called poem.txt?"
}
]
});
const message = await stream.finalMessage();
console.log(`Input tokens: ${message.usage.input_tokens}`);
console.log(`Output tokens: ${message.usage.output_tokens}`);
using System.Text.Json;
using Anthropic;
using Anthropic.Models.Messages;
AnthropicClient client = new();
MessageCreateParams parameters = new()
{
Model = Model.ClaudeOpus4_7,
MaxTokens = 65536,
Tools =
[
new Tool
{
Name = "make_file",
Description = "Write text to a file",
EagerInputStreaming = true,
InputSchema = new InputSchema
{
Properties = new Dictionary<string, JsonElement>
{
["filename"] = JsonSerializer.SerializeToElement(
new { type = "string", description = "The filename to write text to" }
),
["lines_of_text"] = JsonSerializer.SerializeToElement(
new { type = "array", description = "An array of lines of text to write to the file" }
),
},
Required = ["filename", "lines_of_text"],
},
},
],
Messages =
[
new()
{
Role = Role.User,
Content = "Can you write a long poem and make a file called poem.txt?",
},
],
};
long inputTokens = 0;
long outputTokens = 0;
await foreach (var streamEvent in client.Messages.CreateStreaming(parameters))
{
switch (streamEvent.Value)
{
case RawMessageStartEvent startEvent:
inputTokens = startEvent.Message.Usage.InputTokens;
break;
case RawMessageDeltaEvent deltaEvent:
outputTokens = deltaEvent.Usage.OutputTokens;
break;
}
}
Console.WriteLine({{CONTENT}}quot;Input tokens: {inputTokens}");
Console.WriteLine({{CONTENT}}quot;Output tokens: {outputTokens}");
package main
import (
"context"
"fmt"
"github.com/anthropics/anthropic-sdk-go"
)
func main() {
client := anthropic.NewClient()
makeFileTool := anthropic.ToolParam{
Name: "make_file",
Description: anthropic.String("Write text to a file"),
EagerInputStreaming: anthropic.Bool(true),
InputSchema: anthropic.ToolInputSchemaParam{
Properties: map[string]any{
"filename": map[string]any{
"type": "string",
"description": "The filename to write text to",
},
"lines_of_text": map[string]any{
"type": "array",
"description": "An array of lines of text to write to the file",
},
},
Required: []string{"filename", "lines_of_text"},
},
}
stream := client.Messages.NewStreaming(context.Background(), anthropic.MessageNewParams{
Model: anthropic.ModelClaudeOpus4_7,
MaxTokens: 65536,
Tools: []anthropic.ToolUnionParam{{OfTool: &makeFileTool}},
Messages: []anthropic.MessageParam{
anthropic.NewUserMessage(anthropic.NewTextBlock(
"Can you write a long poem and make a file called poem.txt?",
)),
},
})
message := anthropic.Message{}
for stream.Next() {
event := stream.Current()
if err := message.Accumulate(event); err != nil {
panic(err)
}
}
if err := stream.Err(); err != nil {
panic(err)
}
fmt.Printf("Input tokens: %d\n", message.Usage.InputTokens)
fmt.Printf("Output tokens: %d\n", message.Usage.OutputTokens)
}
import com.anthropic.client.AnthropicClient;
import com.anthropic.client.okhttp.AnthropicOkHttpClient;
import com.anthropic.core.JsonValue;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.helpers.MessageAccumulator;
import com.anthropic.models.messages.MessageCreateParams;
import com.anthropic.models.messages.Model;
import com.anthropic.models.messages.RawMessageStreamEvent;
import com.anthropic.models.messages.Tool;
import com.anthropic.models.messages.Usage;
void main() {
AnthropicClient client = AnthropicOkHttpClient.fromEnv();
Tool makeFileTool = Tool.builder()
.name("make_file")
.description("Write text to a file")
.eagerInputStreaming(true)
.inputSchema(Tool.InputSchema.builder()
.properties(Tool.InputSchema.Properties.builder()
.putAdditionalProperty("filename", JsonValue.from(Map.of(
"type", "string",
"description", "The filename to write text to")))
.putAdditionalProperty("lines_of_text", JsonValue.from(Map.of(
"type", "array",
"description", "An array of lines of text to write to the file")))
.build())
.addRequired("filename")
.addRequired("lines_of_text")
.build())
.build();
MessageCreateParams params = MessageCreateParams.builder()
.model(Model.CLAUDE_OPUS_4_7)
.maxTokens(65536L)
.addTool(makeFileTool)
.addUserMessage("Can you write a long poem and make a file called poem.txt?")
.build();
MessageAccumulator accumulator = MessageAccumulator.create();
try (StreamResponse<RawMessageStreamEvent> streamResponse =
client.messages().createStreaming(params)) {
streamResponse.stream().forEach(accumulator::accumulate);
}
Usage usage = accumulator.message().usage();
IO.println("Input tokens: " + usage.inputTokens());
IO.println("Output tokens: " + usage.outputTokens());
}
<?php
use Anthropic\Client;
use Anthropic\Messages\Model;
use Anthropic\Messages\RawMessageDeltaEvent;
use Anthropic\Messages\RawMessageStartEvent;
$client = new Client();
$stream = $client->messages->createStream(
maxTokens: 65536,
model: Model::CLAUDE_OPUS_4_7,
tools: [
[
'name' => 'make_file',
'description' => 'Write text to a file',
'eager_input_streaming' => true,
'input_schema' => [
'type' => 'object',
'properties' => [
'filename' => [
'type' => 'string',
'description' => 'The filename to write text to',
],
'lines_of_text' => [
'type' => 'array',
'description' => 'An array of lines of text to write to the file',
],
],
'required' => ['filename', 'lines_of_text'],
],
],
],
messages: [
[
'role' => 'user',
'content' => 'Can you write a long poem and make a file called poem.txt?',
],
],
);
$inputTokens = 0;
$outputTokens = 0;
foreach ($stream as $event) {
if ($event instanceof RawMessageStartEvent) {
$inputTokens = $event->message->usage->inputTokens;
} elseif ($event instanceof RawMessageDeltaEvent) {
$outputTokens = $event->usage->outputTokens;
}
}
echo "Input tokens: {$inputTokens}\n";
echo "Output tokens: {$outputTokens}\n";
require "anthropic"
anthropic = Anthropic::Client.new
stream = anthropic.messages.stream(
model: Anthropic::Models::Model::CLAUDE_OPUS_4_7,
max_tokens: 65_536,
tools: [
{
name: "make_file",
description: "Write text to a file",
eager_input_streaming: true,
input_schema: {
type: "object",
properties: {
filename: {
type: "string",
description: "The filename to write text to"
},
lines_of_text: {
type: "array",
description: "An array of lines of text to write to the file"
}
},
required: ["filename", "lines_of_text"]
}
}
],
messages: [
{
role: "user",
content: "Can you write a long poem and make a file called poem.txt?"
}
]
)
usage = stream.accumulated_message.usage
puts "Input tokens: #{usage.input_tokens}"
puts "Output tokens: #{usage.output_tokens}"
在此示例中,细粒度工具流式传输使 Claude 能够将长诗的行流式传输到 make_file 工具调用中,而无需缓冲来验证 lines_of_text 参数是否为有效 JSON。这意味着你可以看到参数流到达的过程,而无需等待整个参数缓冲和验证。
使用细粒度工具流式传输时,工具输入块会更早开始到达,因为服务器跳过了 JSON 验证缓冲。作为副作用,块通常更长,包含更少的中间 token 断裂。
由于细粒度流式传输在没有缓冲或 JSON 验证的情况下发送参数,无法保证结果流将以有效的 JSON 字符串完成。
特别是,如果达到停止原因 max_tokens,流可能在参数中途结束,可能不完整。你通常需要编写特定的支持来处理达到 max_tokens 的情况。
累加工具输入 delta
当 tool_use 内容块流式传输时,初始 content_block_start 事件包含 input: {}(空对象)。这是一个占位符。实际输入作为一系列 input_json_delta 事件到达,每个事件携带一个 partial_json 字符串片段。要组装完整输入,请在块关闭时连接这些片段并解析结果。
当你的 SDK 提供累加器辅助工具(如本页第一个示例中使用的)时,它会为你处理。手动模式适用于没有辅助工具的 SDK,或当你需要在块关闭前对部分输入做出反应时。
累加契约:
- 在
type: "tool_use"的content_block_start时,初始化空字符串:input_json = "" - 对于每个
type: "input_json_delta"的content_block_delta,追加:input_json += event.delta.partial_json - 在
content_block_stop时,解析累加的字符串:json.loads(input_json)
初始 input: {}(对象)和 partial_json(字符串)之间的类型不匹配是有意设计的。空对象标记内容数组中的槽位;delta 字符串构建实际值。
import json
import anthropic
client = anthropic.Anthropic()
tool_inputs: dict[int, str] = {} # index -> accumulated JSON string
with client.messages.stream(
model="claude-opus-4-7",
max_tokens=1024,
tools=[
{
"name": "get_weather",
"description": "Get current weather for a city",
"eager_input_streaming": True,
"input_schema": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
}
],
messages=[{"role": "user", "content": "Weather in Paris?"}],
) as stream:
for event in stream:
match event.type:
case "content_block_start" if event.content_block.type == "tool_use":
tool_inputs[event.index] = ""
case "content_block_delta" if event.delta.type == "input_json_delta":
tool_inputs[event.index] += event.delta.partial_json
case "content_block_stop" if event.index in tool_inputs:
parsed = json.loads(tool_inputs[event.index])
print(f"Tool input: {parsed}")
import Anthropic from "@anthropic-ai/sdk";
const anthropic = new Anthropic();
const toolInputs = new Map<number, string>();
const stream = anthropic.messages.stream({
model: "claude-opus-4-7",
max_tokens: 1024,
tools: [
{
name: "get_weather",
description: "Get current weather for a city",
eager_input_streaming: true,
input_schema: {
type: "object",
properties: { city: { type: "string" } },
required: ["city"]
}
}
],
messages: [{ role: "user", content: "Weather in Paris?" }]
});
for await (const event of stream) {
if (event.type === "content_block_start" && event.content_block.type === "tool_use") {
toolInputs.set(event.index, "");
} else if (event.type === "content_block_delta" && event.delta.type === "input_json_delta") {
toolInputs.set(
event.index,
(toolInputs.get(event.index) ?? "") + event.delta.partial_json
);
} else if (event.type === "content_block_stop" && toolInputs.has(event.index)) {
const parsed = JSON.parse(toolInputs.get(event.index)!);
console.log("Tool input:", parsed);
}
}
using System.Text;
using System.Text.Json;
using Anthropic;
using Anthropic.Models.Messages;
AnthropicClient client = new();
MessageCreateParams parameters = new()
{
Model = Model.ClaudeOpus4_7,
MaxTokens = 1024,
Tools =
[
new Tool
{
Name = "get_weather",
Description = "Get current weather for a city",
EagerInputStreaming = true,
InputSchema = new InputSchema
{
Properties = new Dictionary<string, JsonElement>
{
["city"] = JsonSerializer.SerializeToElement(new { type = "string" }),
},
Required = ["city"],
},
},
],
Messages = [new() { Role = Role.User, Content = "Weather in Paris?" }],
};
// 块索引 -> 累积的 JSON 片段
// C# SDK 目前不提供工具输入的流累加器;
// 此处展示的手动模式是支持的方法。
var toolInputs = new Dictionary<long, StringBuilder>();
await foreach (var streamEvent in client.Messages.CreateStreaming(parameters))
{
if (
streamEvent.TryPickContentBlockStart(out var start)
&& start.ContentBlock.TryPickToolUse(out _)
)
{
toolInputs[start.Index] = new StringBuilder();
}
else if (
streamEvent.TryPickContentBlockDelta(out var delta)
&& delta.Delta.TryPickInputJson(out var inputJson)
)
{
toolInputs[delta.Index].Append(inputJson.PartialJson);
}
else if (
streamEvent.TryPickContentBlockStop(out var stop)
&& toolInputs.TryGetValue(stop.Index, out var accumulated)
)
{
using var parsed = JsonDocument.Parse(accumulated.ToString());
Console.WriteLine({{CONTENT}}quot;Tool input: {parsed.RootElement}");
}
}
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/anthropics/anthropic-sdk-go"
)
func main() {
client := anthropic.NewClient()
toolInputs := map[int64]string{} // content block index -> accumulated JSON
stream := client.Messages.NewStreaming(context.Background(), anthropic.MessageNewParams{
Model: anthropic.ModelClaudeOpus4_7,
MaxTokens: 1024,
Tools: []anthropic.ToolUnionParam{{
OfTool: &anthropic.ToolParam{
Name: "get_weather",
Description: anthropic.String("Get current weather for a city"),
EagerInputStreaming: anthropic.Bool(true),
InputSchema: anthropic.ToolInputSchemaParam{
Properties: map[string]any{
"city": map[string]any{"type": "string"},
},
Required: []string{"city"},
},
},
}},
Messages: []anthropic.MessageParam{
anthropic.NewUserMessage(anthropic.NewTextBlock("Weather in Paris?")),
},
})
for stream.Next() {
switch event := stream.Current().AsAny().(type) {
case anthropic.ContentBlockStartEvent:
if _, ok := event.ContentBlock.AsAny().(anthropic.ToolUseBlock); ok {
toolInputs[event.Index] = ""
}
case anthropic.ContentBlockDeltaEvent:
if delta, ok := event.Delta.AsAny().(anthropic.InputJSONDelta); ok {
toolInputs[event.Index] += delta.PartialJSON
}
case anthropic.ContentBlockStopEvent:
if accumulated, ok := toolInputs[event.Index]; ok {
var parsed map[string]any
if err := json.Unmarshal([]byte(accumulated), &parsed); err != nil {
panic(err)
}
fmt.Println("Tool input:", parsed)
}
}
}
if err := stream.Err(); err != nil {
panic(err)
}
}
import com.anthropic.client.AnthropicClient;
import com.anthropic.client.okhttp.AnthropicOkHttpClient;
import com.anthropic.core.JsonValue;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.MessageCreateParams;
import com.anthropic.models.messages.Model;
import com.anthropic.models.messages.RawMessageStreamEvent;
import com.anthropic.models.messages.Tool;
import com.fasterxml.jackson.databind.ObjectMapper;
void main() throws Exception {
AnthropicClient client = AnthropicOkHttpClient.fromEnv();
ObjectMapper objectMapper = new ObjectMapper();
Tool weatherTool = Tool.builder()
.name("get_weather")
.description("Get current weather for a city")
.eagerInputStreaming(true)
.inputSchema(Tool.InputSchema.builder()
.properties(Tool.InputSchema.Properties.builder()
.putAdditionalProperty("city", JsonValue.from(Map.of("type", "string")))
.build())
.addRequired("city")
.build())
.build();
MessageCreateParams createParams = MessageCreateParams.builder()
.model(Model.CLAUDE_OPUS_4_7)
.maxTokens(1024)
.addTool(weatherTool)
.addUserMessage("Weather in Paris?")
.build();
// 内容块索引 -> 累积的工具输入 JSON
Map<Long, StringBuilder> toolInputs = new HashMap<>();
try (StreamResponse<RawMessageStreamEvent> streamResponse = client.messages().createStreaming(createParams)) {
var eventIterator = streamResponse.stream().iterator();
while (eventIterator.hasNext()) {
RawMessageStreamEvent event = eventIterator.next();
if (event.isContentBlockStart()) {
var blockStart = event.asContentBlockStart();
if (blockStart.contentBlock().isToolUse()) {
toolInputs.put(blockStart.index(), new StringBuilder());
}
} else if (event.isContentBlockDelta()) {
var blockDelta = event.asContentBlockDelta();
if (blockDelta.delta().isInputJson() && toolInputs.containsKey(blockDelta.index())) {
toolInputs.get(blockDelta.index()).append(blockDelta.delta().asInputJson().partialJson());
}
} else if (event.isContentBlockStop()) {
var blockStop = event.asContentBlockStop();
if (toolInputs.containsKey(blockStop.index())) {
var parsedInput = objectMapper.readTree(toolInputs.get(blockStop.index()).toString());
IO.println("Tool input: " + parsedInput);
}
}
}
}
}
<?php
use Anthropic\Client;
use Anthropic\Messages\InputJSONDelta;
use Anthropic\Messages\Model;
use Anthropic\Messages\RawContentBlockDeltaEvent;
use Anthropic\Messages\RawContentBlockStartEvent;
use Anthropic\Messages\RawContentBlockStopEvent;
use Anthropic\Messages\ToolUseBlock;
$client = new Client();
// PHP SDK 目前不提供工具输入的流累加器;
// 此处展示的手动模式是支持的方法。
$toolInputs = []; // index => accumulated JSON string
$stream = $client->messages->createStream(
maxTokens: 1024,
model: Model::CLAUDE_OPUS_4_7,
tools: [
[
'name' => 'get_weather',
'description' => 'Get current weather for a city',
'eager_input_streaming' => true,
'input_schema' => [
'type' => 'object',
'properties' => ['city' => ['type' => 'string']],
'required' => ['city'],
],
],
],
messages: [['role' => 'user', 'content' => 'Weather in Paris?']],
);
foreach ($stream as $event) {
if (
$event instanceof RawContentBlockStartEvent
&& $event->contentBlock instanceof ToolUseBlock
) {
$toolInputs[$event->index] = '';
} elseif (
$event instanceof RawContentBlockDeltaEvent
&& $event->delta instanceof InputJSONDelta
) {
$toolInputs[$event->index] .= $event->delta->partialJSON;
} elseif (
$event instanceof RawContentBlockStopEvent
&& isset($toolInputs[$event->index])
) {
$parsed = json_decode($toolInputs[$event->index], associative: true, flags: JSON_THROW_ON_ERROR);
echo "Tool input: " . json_encode($parsed) . "\n";
}
}
require "anthropic"
require "json"
client = Anthropic::Client.new
tool_inputs = {} # index -> accumulated JSON string
stream = client.messages.stream_raw(
model: Anthropic::Models::Model::CLAUDE_OPUS_4_7,
max_tokens: 1024,
tools: [
{
name: "get_weather",
description: "Get current weather for a city",
eager_input_streaming: true,
input_schema: {
type: "object",
properties: {city: {type: "string"}},
required: ["city"]
}
}
],
messages: [{role: "user", content: "Weather in Paris?"}]
)
stream.each do |event|
case event
when Anthropic::Models::RawContentBlockStartEvent
tool_inputs[event.index] = +"" if event.content_block.type == :tool_use
when Anthropic::Models::RawContentBlockDeltaEvent
if event.delta.is_a?(Anthropic::Models::InputJSONDelta)
tool_inputs[event.index] << event.delta.partial_json
end
when Anthropic::Models::RawContentBlockStopEvent
if tool_inputs.key?(event.index)
parsed = JSON.parse(tool_inputs[event.index])
puts "Tool input: #{parsed}"
end
end
end
当你需要在块关闭前对部分输入做出反应时(例如渲染进度指示器),请使用手动模式。否则,请优先使用 SDK 的累加器辅助工具(如本页第一个示例中使用的)。
处理工具响应中的无效 JSON
使用细粒度工具流式传输时,你可能会从模型收到无效或不完整的 JSON。如果你需要将此无效 JSON 传回模型的错误响应块中,可以将其包装在 JSON 对象中以确保正确处理(使用合理的键)。例如:
{
"INVALID_JSON": "<your invalid json string>"
}
此方法帮助模型理解内容是无效 JSON,同时保留原始格式错误的数据以用于调试目的。
包装无效 JSON 时,请确保正确转义无效 JSON 字符串中的任何引号或特殊字符,以在包装对象中维护有效的 JSON 结构。