From 185e1d66bbb706d05b41b2dd0f3118b9d4b5f745 Mon Sep 17 00:00:00 2001 From: Adesh Nalpet Adimurthy <390.adesh@gmail.com> Date: Fri, 27 Feb 2026 20:09:42 -0500 Subject: [PATCH 1/3] OpenAI Responses Connection and Setup --- .../agents/api/resource/ResourceName.java | 6 + .../test/ChatModelIntegrationAgent.java | 14 + .../test/ChatModelIntegrationTest.java | 2 +- .../openai/OpenAIResponseModelConnection.java | 468 ++++++++++++++++++ .../openai/OpenAIResponseModelSetup.java | 226 +++++++++ 5 files changed, 715 insertions(+), 1 deletion(-) create mode 100644 integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelConnection.java create mode 100644 integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java index 39da35aa3..762d041b2 100644 --- a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java +++ b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java @@ -71,6 +71,12 @@ public static final class ChatModel { public static final String OPENAI_SETUP = "org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelSetup"; + // OpenAI Responses API + public static final String OPENAI_RESPONSE_CONNECTION = + "org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponseModelConnection"; + public static final String OPENAI_RESPONSE_SETUP = + "org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponseModelSetup"; + // Python Wrapper public static final String PYTHON_WRAPPER_CONNECTION = "org.apache.flink.agents.api.chat.model.python.PythonChatModelConnection"; diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java index 999771693..38236d3a2 100644 --- a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java +++ b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java @@ -84,6 +84,11 @@ public static ResourceDescriptor chatModelConnection() { return ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OPENAI_CONNECTION) .addInitialArgument("api_key", apiKey) .build(); + } else if (provider.equals("OPENAI_RESPONSES")) { + return ResourceDescriptor.Builder.newBuilder( + ResourceName.ChatModel.OPENAI_RESPONSE_CONNECTION) + .addInitialArgument("api_key", System.getenv().get("OPENAI_API_KEY")) + .build(); } else if (provider.equals("ANTHROPIC")) { String apiKey = System.getenv().get("ANTHROPIC_API_KEY"); return ResourceDescriptor.Builder.newBuilder( @@ -133,6 +138,15 @@ public static ResourceDescriptor chatModel() { "tools", List.of("calculateBMI", "convertTemperature", "createRandomNumber")) .build(); + } else if (provider.equals("OPENAI_RESPONSES")) { + return ResourceDescriptor.Builder.newBuilder( + ResourceName.ChatModel.OPENAI_RESPONSE_SETUP) + .addInitialArgument("connection", "chatModelConnection") + .addInitialArgument("model", "gpt-4o-mini") + .addInitialArgument( + "tools", + List.of("calculateBMI", "convertTemperature", "createRandomNumber")) + .build(); } else { throw new RuntimeException(String.format("Unknown model provider %s", provider)); } diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java index c843b970a..75a3d5c1e 100644 --- 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java +++ b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java @@ -53,7 +53,7 @@ public ChatModelIntegrationTest() throws IOException { } @ParameterizedTest() - @ValueSource(strings = {"ANTHROPIC", "AZURE", "OLLAMA", "OPENAI"}) + @ValueSource(strings = {"ANTHROPIC", "AZURE", "OLLAMA", "OPENAI", "OPENAI_RESPONSES"}) public void testChatModeIntegration(String provider) throws Exception { Assumptions.assumeTrue( (OLLAMA.equals(provider) && ollamaReady) diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelConnection.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelConnection.java new file mode 100644 index 000000000..93ef95761 --- /dev/null +++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelConnection.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.agents.integrations.chatmodels.openai; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.openai.client.OpenAIClient; +import com.openai.client.okhttp.OpenAIOkHttpClient; +import com.openai.core.JsonValue; +import com.openai.models.ChatModel; +import com.openai.models.Reasoning; +import com.openai.models.ReasoningEffort; +import com.openai.models.responses.*; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.chat.model.BaseChatModelConnection; +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.tools.ToolMetadata; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; + +/** + * A dedicated OpenAI chat model integration using the Responses API. + * + *

Unlike {@link OpenAIChatModelConnection} which uses the Chat Completions API and works with + * any OpenAI-compatible provider (DeepSeek, DashScope, etc.), this implementation uses OpenAI's + * Responses API which is specific to OpenAI and offers additional capabilities: + * + *

+ * + *

For OpenAI-compatible providers that only support the Chat Completions API, use {@link + * OpenAIChatModelConnection} instead. + * + *

Supported connection parameters: + * + *

+ * + *

Example usage: + * + *

{@code
+ * public class MyAgent extends Agent {
+ *   @ChatModelConnection
+ *   public static ResourceDesc openAIResponses() {
+ *     return ResourceDescriptor.Builder.newBuilder(OpenAIResponseModelConnection.class.getName())
+ *             .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY"))
+ *             .addInitialArgument("timeout", 120)
+ *             .addInitialArgument("max_retries", 3)
+ *             .build();
+ *   }
+ * }
+ * }
+ */ +public class OpenAIResponseModelConnection extends BaseChatModelConnection { + + private static final TypeReference> MAP_TYPE = new TypeReference<>() {}; + + private final ObjectMapper mapper = new ObjectMapper(); + private final OpenAIClient client; + private final String defaultModel; + + public OpenAIResponseModelConnection( + ResourceDescriptor descriptor, BiFunction getResource) { + super(descriptor, getResource); + + String apiKey = descriptor.getArgument("api_key"); + if (apiKey == null || apiKey.isBlank()) { + throw new IllegalArgumentException("api_key should not be null or empty."); + } + + OpenAIOkHttpClient.Builder builder = new OpenAIOkHttpClient.Builder().apiKey(apiKey); + + String apiBaseUrl = descriptor.getArgument("api_base_url"); + if (apiBaseUrl != null && !apiBaseUrl.isBlank()) { + builder.baseUrl(apiBaseUrl); + } + + Integer timeoutSeconds = descriptor.getArgument("timeout"); + if (timeoutSeconds != null && timeoutSeconds > 0) { + builder.timeout(Duration.ofSeconds(timeoutSeconds)); + } + + Integer maxRetries = descriptor.getArgument("max_retries"); + if (maxRetries != null && maxRetries >= 0) { + builder.maxRetries(maxRetries); + } + + Map defaultHeaders = descriptor.getArgument("default_headers"); + if (defaultHeaders != null && !defaultHeaders.isEmpty()) { + for (Map.Entry header : defaultHeaders.entrySet()) { + builder.putHeader(header.getKey(), header.getValue()); + } + } + + this.defaultModel = descriptor.getArgument("model"); + this.client = builder.build(); + } + + @Override + public ChatMessage chat( + List messages, + List tools, + Map arguments) { + try { + ResponseCreateParams params = buildRequest(messages, tools, arguments); + Response response = client.responses().create(params); + ChatMessage result = convertResponse(response); + + if (response.usage().isPresent()) { + String modelName = arguments != null ? 
(String) arguments.get("model") : null; + if (modelName == null || modelName.isBlank()) { + modelName = this.defaultModel; + } + if (modelName != null && !modelName.isBlank()) { + recordTokenMetrics( + modelName, + response.usage().get().inputTokens(), + response.usage().get().outputTokens()); + } + } + + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to call OpenAI Responses API.", e); + } + } + + private ResponseCreateParams buildRequest( + List messages, + List tools, + Map rawArguments) { + Map arguments = + rawArguments != null ? new HashMap<>(rawArguments) : new HashMap<>(); + + boolean strictMode = Boolean.TRUE.equals(arguments.remove("strict")); + String modelName = (String) arguments.remove("model"); + if (modelName == null || modelName.isBlank()) { + modelName = this.defaultModel; + } + + List inputItems = convertInputItems(messages); + + ResponseCreateParams.Builder builder = + ResponseCreateParams.builder() + .model(ChatModel.of(modelName)) + .inputOfResponse(inputItems); + + if (tools != null && !tools.isEmpty()) { + builder.tools(convertTools(tools, strictMode)); + } + + Object temperature = arguments.remove("temperature"); + if (temperature instanceof Number) { + builder.temperature(((Number) temperature).doubleValue()); + } + + Object maxTokens = arguments.remove("max_tokens"); + if (maxTokens instanceof Number) { + builder.maxOutputTokens(((Number) maxTokens).longValue()); + } + + Object reasoningEffort = arguments.remove("reasoning_effort"); + if (reasoningEffort instanceof String) { + builder.reasoning( + Reasoning.builder() + .effort(ReasoningEffort.of((String) reasoningEffort)) + .build()); + } + + Object store = arguments.remove("store"); + if (Boolean.TRUE.equals(store)) { + builder.store(true); + } + + Object previousResponseId = arguments.remove("previous_response_id"); + if (previousResponseId instanceof String) { + builder.previousResponseId((String) previousResponseId); + } + + Object instructions = 
arguments.remove("instructions"); + if (instructions instanceof String) { + builder.instructions((String) instructions); + } + + @SuppressWarnings("unchecked") + Map additionalKwargs = + (Map) arguments.remove("additional_kwargs"); + if (additionalKwargs != null) { + additionalKwargs.forEach( + (key, value) -> builder.putAdditionalBodyProperty(key, toJsonValue(value))); + } + + return builder.build(); + } + + private List convertInputItems(List messages) { + List items = new ArrayList<>(); + for (ChatMessage message : messages) { + items.addAll(convertSingleMessage(message)); + } + return items; + } + + private List convertSingleMessage(ChatMessage message) { + List items = new ArrayList<>(); + MessageRole role = message.getRole(); + String content = Optional.ofNullable(message.getContent()).orElse(""); + + switch (role) { + case SYSTEM: + items.add( + ResponseInputItem.ofMessage( + ResponseInputItem.Message.builder() + .role(ResponseInputItem.Message.Role.SYSTEM) + .addInputTextContent(content) + .build())); + break; + + case USER: + items.add( + ResponseInputItem.ofMessage( + ResponseInputItem.Message.builder() + .role(ResponseInputItem.Message.Role.USER) + .addInputTextContent(content) + .build())); + break; + + case ASSISTANT: + List> toolCalls = message.getToolCalls(); + if (toolCalls != null && !toolCalls.isEmpty()) { + for (Map call : toolCalls) { + Map functionPayload = toMap(call.get("function")); + String callId = String.valueOf(call.get("id")); + String name = String.valueOf(functionPayload.get("name")); + String args = serializeArguments(functionPayload.get("arguments")); + + items.add( + ResponseInputItem.ofFunctionCall( + ResponseFunctionToolCall.builder() + .id(callId) + .callId(callId) + .name(name) + .arguments(args) + .status(ResponseFunctionToolCall.Status.COMPLETED) + .build())); + } + } + if (!content.isEmpty()) { + items.add( + ResponseInputItem.ofEasyInputMessage( + EasyInputMessage.builder() + .role(EasyInputMessage.Role.ASSISTANT) + 
.content(content) + .build())); + } + break; + + case TOOL: + Object toolCallId = message.getExtraArgs().get("externalId"); + if (toolCallId == null) { + throw new IllegalArgumentException( + "Tool message must have an externalId in extraArgs."); + } + items.add( + ResponseInputItem.ofFunctionCallOutput( + ResponseInputItem.FunctionCallOutput.builder() + .callId(toolCallId.toString()) + .output(content) + .build())); + break; + + default: + throw new IllegalArgumentException("Unsupported role: " + role); + } + return items; + } + + private List convertTools( + List tools, boolean strictMode) { + List responsesTools = new ArrayList<>(tools.size()); + for (org.apache.flink.agents.api.tools.Tool tool : tools) { + ToolMetadata metadata = tool.getMetadata(); + FunctionTool.Builder functionBuilder = + FunctionTool.builder() + .name(metadata.getName()) + .description(metadata.getDescription()); + + String schema = metadata.getInputSchema(); + if (schema != null && !schema.isBlank()) { + functionBuilder.parameters(parseToolParameters(schema)); + } + + functionBuilder.strict(strictMode); + + responsesTools.add(Tool.ofFunction(functionBuilder.build())); + } + return responsesTools; + } + + private ChatMessage convertResponse(Response response) { + List output = response.output(); + if (output == null || output.isEmpty()) { + throw new IllegalStateException("OpenAI Responses API did not return any output."); + } + + StringBuilder textContent = new StringBuilder(); + StringBuilder refusalContent = new StringBuilder(); + List> toolCalls = new ArrayList<>(); + + for (ResponseOutputItem item : output) { + if (item.isMessage()) { + ResponseOutputMessage msg = item.asMessage(); + for (ResponseOutputMessage.Content contentBlock : msg.content()) { + if (contentBlock.isOutputText()) { + textContent.append(contentBlock.asOutputText().text()); + } else if (contentBlock.isRefusal()) { + refusalContent.append(contentBlock.asRefusal().refusal()); + } + } + } else if (item.isFunctionCall()) 
{ + ResponseFunctionToolCall fc = item.asFunctionCall(); + Map callMap = new LinkedHashMap<>(); + + String callId = fc.callId(); + if (callId == null || callId.isBlank()) { + throw new IllegalStateException( + "OpenAI Responses API returned a function call without a call_id."); + } + + callMap.put("id", callId); + callMap.put("type", "function"); + + Map functionMap = new LinkedHashMap<>(); + functionMap.put("name", fc.name()); + functionMap.put("arguments", parseArguments(fc.arguments())); + callMap.put("function", functionMap); + callMap.put("original_id", callId); + + toolCalls.add(callMap); + } + } + + ChatMessage result = ChatMessage.assistant(textContent.toString()); + if (!toolCalls.isEmpty()) { + result.setToolCalls(toolCalls); + } + + if (refusalContent.length() > 0) { + result.getExtraArgs().put("refusal", refusalContent.toString()); + } + + result.getExtraArgs().put("response_id", response.id()); + + return result; + } + + private FunctionTool.Parameters parseToolParameters(String schemaJson) { + try { + JsonNode root = mapper.readTree(schemaJson); + if (root == null || !root.isObject()) { + return FunctionTool.Parameters.builder().build(); + } + FunctionTool.Parameters.Builder builder = FunctionTool.Parameters.builder(); + root.fields() + .forEachRemaining( + entry -> + builder.putAdditionalProperty( + entry.getKey(), + JsonValue.fromJsonNode(entry.getValue()))); + return builder.build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to parse tool schema JSON.", e); + } + } + + private Map parseArguments(String arguments) { + if (arguments == null || arguments.isBlank()) { + return Map.of(); + } + try { + return mapper.readValue(arguments, MAP_TYPE); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to parse tool arguments: " + arguments, e); + } + } + + private JsonValue toJsonValue(Object value) { + if (value instanceof JsonValue) { + return (JsonValue) value; + } + if (value instanceof String 
+ || value instanceof Number + || value instanceof Boolean + || value == null) { + return JsonValue.from(value); + } + return JsonValue.fromJsonNode(mapper.valueToTree(value)); + } + + private String serializeArguments(Object arguments) { + if (arguments == null) { + return "{}"; + } + if (arguments instanceof String) { + return (String) arguments; + } + try { + return mapper.writeValueAsString(arguments); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize tool call arguments.", e); + } + } + + private Map toMap(Object value) { + if (value instanceof Map) { + @SuppressWarnings("unchecked") + Map casted = (Map) value; + return new LinkedHashMap<>(casted); + } + if (value == null) { + return new LinkedHashMap<>(); + } + return mapper.convertValue(value, MAP_TYPE); + } + + @Override + public void close() throws Exception { + this.client.close(); + } +} diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java new file mode 100644 index 000000000..7a04c56c7 --- /dev/null +++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.integrations.chatmodels.openai; + +import org.apache.flink.agents.api.chat.model.BaseChatModelSetup; +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * Chat model setup for the OpenAI Responses API. + * + *

Responsible for providing per-chat configuration such as model, temperature, max tokens, tool + * bindings, and Responses API-specific parameters. The setup delegates execution to {@link + * OpenAIResponseModelConnection}. + * + *

Example usage: + * + *

{@code
+ * public class MyAgent extends Agent {
+ *   @ChatModelSetup
+ *   public static ResourceDesc openAIResponses() {
+ *     return ResourceDescriptor.Builder.newBuilder(OpenAIResponseModelSetup.class.getName())
+ *             .addInitialArgument("connection", "myOpenAIResponseConnection")
+ *             .addInitialArgument("model", "gpt-4o")
+ *             .addInitialArgument("temperature", 0.3d)
+ *             .addInitialArgument("max_tokens", 2048)
+ *             .addInitialArgument("strict", true)
+ *             .addInitialArgument("reasoning_effort", "medium")
+ *             .addInitialArgument("store", true)
+ *             .addInitialArgument("tools", List.of("getWeather", "searchDB"))
+ *             .build();
+ *   }
+ * }
+ * }
+ */ +public class OpenAIResponseModelSetup extends BaseChatModelSetup { + + private static final String DEFAULT_MODEL = "gpt-4o"; + private static final double DEFAULT_TEMPERATURE = 0.1d; + private static final boolean DEFAULT_STRICT = false; + private static final boolean DEFAULT_STORE = false; + private static final Set VALID_REASONING_EFFORTS = Set.of("low", "medium", "high"); + + private final Double temperature; + private final Integer maxTokens; + private final Boolean strict; + private final String reasoningEffort; + private final Boolean store; + private final String previousResponseId; + private final String instructions; + private final Map additionalArguments; + + public OpenAIResponseModelSetup( + ResourceDescriptor descriptor, BiFunction getResource) { + super(descriptor, getResource); + + this.temperature = + Optional.ofNullable(descriptor.getArgument("temperature")) + .map(Number::doubleValue) + .orElse(DEFAULT_TEMPERATURE); + if (this.temperature < 0.0 || this.temperature > 2.0) { + throw new IllegalArgumentException("temperature must be between 0.0 and 2.0"); + } + + this.maxTokens = + Optional.ofNullable(descriptor.getArgument("max_tokens")) + .map(Number::intValue) + .orElse(null); + if (this.maxTokens != null && this.maxTokens <= 0) { + throw new IllegalArgumentException("max_tokens must be greater than 0"); + } + + this.strict = + Optional.ofNullable(descriptor.getArgument("strict")) + .orElse(DEFAULT_STRICT); + + this.reasoningEffort = descriptor.getArgument("reasoning_effort"); + if (this.reasoningEffort != null + && !VALID_REASONING_EFFORTS.contains(this.reasoningEffort)) { + throw new IllegalArgumentException( + "reasoning_effort must be one of: low, medium, high"); + } + + this.store = + Optional.ofNullable(descriptor.getArgument("store")).orElse(DEFAULT_STORE); + + this.previousResponseId = descriptor.getArgument("previous_response_id"); + + this.instructions = descriptor.getArgument("instructions"); + + this.additionalArguments = + 
Optional.ofNullable( + descriptor.>getArgument("additional_kwargs")) + .map(HashMap::new) + .orElseGet(HashMap::new); + + if (this.model == null || this.model.isBlank()) { + this.model = DEFAULT_MODEL; + } + } + + public OpenAIResponseModelSetup( + String model, + double temperature, + Integer maxTokens, + Boolean strict, + String reasoningEffort, + Boolean store, + String previousResponseId, + String instructions, + Map additionalArguments, + List tools, + BiFunction getResource) { + this( + createDescriptor( + model, + temperature, + maxTokens, + strict, + reasoningEffort, + store, + previousResponseId, + instructions, + additionalArguments, + tools), + getResource); + } + + @Override + public Map getParameters() { + Map parameters = new HashMap<>(); + if (model != null) { + parameters.put("model", model); + } + parameters.put("temperature", temperature); + if (maxTokens != null) { + parameters.put("max_tokens", maxTokens); + } + if (strict) { + parameters.put("strict", strict); + } + if (reasoningEffort != null) { + parameters.put("reasoning_effort", reasoningEffort); + } + if (store) { + parameters.put("store", store); + } + if (previousResponseId != null) { + parameters.put("previous_response_id", previousResponseId); + } + if (instructions != null) { + parameters.put("instructions", instructions); + } + if (additionalArguments != null && !additionalArguments.isEmpty()) { + parameters.put("additional_kwargs", additionalArguments); + } + return parameters; + } + + private static ResourceDescriptor createDescriptor( + String model, + double temperature, + Integer maxTokens, + Boolean strict, + String reasoningEffort, + Boolean store, + String previousResponseId, + String instructions, + Map additionalArguments, + List tools) { + ResourceDescriptor.Builder builder = + ResourceDescriptor.Builder.newBuilder(OpenAIResponseModelSetup.class.getName()) + .addInitialArgument("model", model) + .addInitialArgument("temperature", temperature); + + if (maxTokens != null) { 
+ builder.addInitialArgument("max_tokens", maxTokens); + } + if (strict != null) { + builder.addInitialArgument("strict", strict); + } + if (reasoningEffort != null) { + builder.addInitialArgument("reasoning_effort", reasoningEffort); + } + if (store != null) { + builder.addInitialArgument("store", store); + } + if (previousResponseId != null) { + builder.addInitialArgument("previous_response_id", previousResponseId); + } + if (instructions != null) { + builder.addInitialArgument("instructions", instructions); + } + if (additionalArguments != null && !additionalArguments.isEmpty()) { + builder.addInitialArgument("additional_kwargs", additionalArguments); + } + if (tools != null && !tools.isEmpty()) { + builder.addInitialArgument("tools", tools); + } + + return builder.build(); + } +} From 0ac0810968faa3f13c53c82b60c4fb0246e6e2ce Mon Sep 17 00:00:00 2001 From: Adesh Nalpet Adimurthy <390.adesh@gmail.com> Date: Sat, 28 Feb 2026 10:53:22 -0500 Subject: [PATCH 2/3] Remove unused fields for OpenAI responses API --- .../test/ChatModelIntegrationAgent.java | 2 +- .../openai/OpenAIResponseModelConnection.java | 18 ++++++++++-------- .../openai/OpenAIResponseModelSetup.java | 15 +-------------- 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java index 38236d3a2..fc9023171 100644 --- a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java +++ b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java @@ -84,7 +84,7 @@ public static ResourceDescriptor chatModelConnection() { return 
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OPENAI_CONNECTION) .addInitialArgument("api_key", apiKey) .build(); - } else if (provider.equals("OPENAI_RESPONSES")) { + } else if (provider.equals("OPENAI_RESPONSES")) { return ResourceDescriptor.Builder.newBuilder( ResourceName.ChatModel.OPENAI_RESPONSE_CONNECTION) .addInitialArgument("api_key", System.getenv().get("OPENAI_API_KEY")) .build(); diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelConnection.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelConnection.java index 93ef95761..97dbabd5d 100644 --- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelConnection.java +++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelConnection.java @@ -210,11 +210,6 @@ private ResponseCreateParams buildRequest( builder.store(true); } - Object previousResponseId = arguments.remove("previous_response_id"); - if (previousResponseId instanceof String) { - builder.previousResponseId((String) previousResponseId); - } - Object instructions = arguments.remove("instructions"); if (instructions instanceof String) { builder.instructions((String) instructions); } @@ -268,14 +263,15 @@ private List convertSingleMessage(ChatMessage message) { if (toolCalls != null && !toolCalls.isEmpty()) { for (Map call : toolCalls) { Map functionPayload = toMap(call.get("function")); - String callId = String.valueOf(call.get("id")); + String responseId = String.valueOf(call.get("id")); + String callId = String.valueOf(call.get("original_id")); String name = String.valueOf(functionPayload.get("name")); String args = serializeArguments(functionPayload.get("arguments")); items.add( ResponseInputItem.ofFunctionCall( ResponseFunctionToolCall.builder() -
.id(callId) + .id(responseId) .callId(callId) .name(name) .arguments(args) @@ -365,7 +361,13 @@ private ChatMessage convertResponse(Response response) { "OpenAI Responses API returned a function call without a call_id."); } - callMap.put("id", callId); + callMap.put( + "id", + fc.id() + .orElseThrow( + () -> + new IllegalStateException( + "OpenAI Responses API returned a function call without an id."))); callMap.put("type", "function"); Map functionMap = new LinkedHashMap<>(); diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java index 7a04c56c7..a1aa1159c 100644 --- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java +++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java @@ -69,7 +69,6 @@ public class OpenAIResponseModelSetup extends BaseChatModelSetup { private final Boolean strict; private final String reasoningEffort; private final Boolean store; - private final String previousResponseId; private final String instructions; private final Map additionalArguments; @@ -104,10 +103,7 @@ public OpenAIResponseModelSetup( "reasoning_effort must be one of: low, medium, high"); } - this.store = - Optional.ofNullable(descriptor.getArgument("store")).orElse(DEFAULT_STORE); - - this.previousResponseId = descriptor.getArgument("previous_response_id"); + this.store = Optional.ofNullable(descriptor.getArgument("store")).orElse(DEFAULT_STORE); this.instructions = descriptor.getArgument("instructions"); @@ -129,7 +125,6 @@ public OpenAIResponseModelSetup( Boolean strict, String reasoningEffort, Boolean store, - String previousResponseId, String instructions, Map additionalArguments, List tools, @@ 
-142,7 +137,6 @@ public OpenAIResponseModelSetup( strict, reasoningEffort, store, - previousResponseId, instructions, additionalArguments, tools), @@ -168,9 +162,6 @@ public Map getParameters() { if (store) { parameters.put("store", store); } - if (previousResponseId != null) { - parameters.put("previous_response_id", previousResponseId); - } if (instructions != null) { parameters.put("instructions", instructions); } @@ -187,7 +178,6 @@ private static ResourceDescriptor createDescriptor( Boolean strict, String reasoningEffort, Boolean store, - String previousResponseId, String instructions, Map additionalArguments, List tools) { @@ -208,9 +198,6 @@ private static ResourceDescriptor createDescriptor( if (store != null) { builder.addInitialArgument("store", store); } - if (previousResponseId != null) { - builder.addInitialArgument("previous_response_id", previousResponseId); - } if (instructions != null) { builder.addInitialArgument("instructions", instructions); } From 3faee26ed97da5b979afb1b1029d8482e7377789 Mon Sep 17 00:00:00 2001 From: Adesh Nalpet Adimurthy <390.adesh@gmail.com> Date: Sat, 28 Feb 2026 11:09:43 -0500 Subject: [PATCH 3/3] Run spotless for linting errors --- .../chatmodels/openai/OpenAIResponseModelSetup.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java index a1aa1159c..1eee20946 100644 --- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java +++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponseModelSetup.java @@ -103,7 +103,8 @@ public OpenAIResponseModelSetup( "reasoning_effort must be one of: low, medium, high"); } - 
this.store = Optional.ofNullable(descriptor.getArgument("store")).orElse(DEFAULT_STORE); + this.store = + Optional.ofNullable(descriptor.getArgument("store")).orElse(DEFAULT_STORE); this.instructions = descriptor.getArgument("instructions");