Hermes Agent Python NousResearch/hermes-agent

Provider-Agnostic LLM Adapters: One Agent, Eight APIs

How Hermes Agent normalizes OpenAI, Anthropic, Bedrock, Gemini, Codex Responses, and custom endpoints behind a single call interface

6 stops ~30 min Verified 2026-04-30
What you will learn
  • How each adapter file isolates all provider-specific wire format differences so the rest of the agent never sees them
  • Why Anthropic's `build_anthropic_kwargs()` must handle OAuth identity spoofing, adaptive thinking budget negotiation, and fast-mode beta headers in one place
  • How Bedrock's Converse API forces a different tool schema (`toolSpec` + `inputSchema.json`) that cannot be shared with the OpenAI or Anthropic paths
  • Why Gemini requires role and tool-call mapping that differs structurally from every other provider: `functionCall` parts, `functionResponse` parts, and a `systemInstruction` top-level key instead of a system message
  • How the Codex Responses adapter enforces a strict contract (`store: false`, required fields, no unexpected keys) before the payload ever reaches the wire
  • How `CredentialPool` selects, leases, and rotates API keys across three configurable strategies -- round-robin, least-used, and random -- with automatic exhaustion cooldown and cross-process token sync
Prerequisites
  • Comfortable reading Python; no provider SDK knowledge required
  • Familiarity with what an LLM API call looks like (messages array, tools, max_tokens)
  • Understanding that OpenAI's chat/completions format is the internal lingua franca Hermes normalizes from and to
1 / 6

The Adapter Pattern

agent/anthropic_adapter.py:1

How convention — not inheritance — enforces the isolation contract all adapters follow

Hermes Agent supports eight or more LLM providers without a shared abstract base class or Protocol. The pattern is enforced by convention: each adapter file exports a build_*_kwargs() function that converts Hermes's internal OpenAI-style message list into the exact dict the provider's SDK expects, and a normalize_*_response() that converts the response back to an OpenAI-compatible SimpleNamespace. The module docstring at the top of every adapter is the specification — it names the translation direction, the auth modes supported, and any compatibility caveats.

The lazy import for anthropic is a deliberate startup-time optimization. Provider SDKs are large; deferring the import behind _get_anthropic_sdk() and caching it after the first call means the agent starts in milliseconds and only pays the SDK import cost when that provider is actually used.

anthropic_adapter.py at 1921 lines is the largest adapter because the Anthropic path carries the most auth complexity: three credential sources, OAuth token refresh, Claude Code identity spoofing, and adaptive thinking configuration that differs across model generations.

Key takeaway

Each adapter file is a complete translation layer: wire format in, OpenAI-compatible format out. The lazy SDK import is a deliberate startup-time choice, not an oversight. ---

"""Anthropic Messages API adapter for Hermes Agent.

Translates between Hermes's internal OpenAI-style message format and
Anthropic's Messages API. Follows the same pattern as the codex_responses
adapter — all provider-specific logic is isolated here.

Auth supports:
  - Regular API keys (sk-ant-api*) → x-api-key header
  - OAuth setup-tokens (sk-ant-oat*) → Bearer auth + beta header
  - Claude Code credentials (~/.claude.json or ~/.claude/.credentials.json) → Bearer auth
"""

import copy
import json
import logging
import os
import platform
import subprocess
from pathlib import Path

from hermes_constants import get_hermes_home
from typing import Any, Dict, List, Optional, Tuple
from utils import base_url_host_matches, normalize_proxy_env_vars

# NOTE: `import anthropic` is deliberately NOT at module top — the SDK pulls
# ~220 ms of imports (anthropic.types, anthropic.lib.tools._beta_runner, etc.)
# and the 3 usage sites (build_anthropic_client, build_anthropic_bedrock_client,
# read_claude_code_credentials_from_keychain) are all on cold user-triggered
# paths. Access via the `_get_anthropic_sdk()` accessor below, which caches
# the module after the first call and returns None on ImportError.
_anthropic_sdk: Any = ...  # sentinel — None means "tried and missing"

2 / 6

Anthropic Adapter -- Building the Request

agent/anthropic_adapter.py:1709

How build_anthropic_kwargs() handles four concerns — message conversion, output cap resolution, OAuth identity, and tool choice mapping — in one function

build_anthropic_kwargs() is the reference implementation for the adapter pattern. Its thirteen parameters map to four distinct concerns:

  1. Message conversionconvert_messages_to_anthropic() splits the OpenAI-format list into a system prompt and an anthropic_messages list, since Anthropic requires the system prompt at the top level.
  2. Output cap resolution_resolve_anthropic_messages_max_tokens() clamps max_tokens against the model's native ceiling and the caller-supplied context_length, producing a clear local error instead of a cryptic 400 from the API.
  3. OAuth identity — when is_oauth is True, the function injects a Claude Code system prefix, rewrites product name references in the system prompt, and prefixes all tool names with mcp_ to match Claude Code's tool namespace.
  4. Tool choice mapping — OpenAI's "required" becomes Anthropic's {"type": "any"}; a specific name becomes {"type": "tool", "name": ...}.

None of this logic lives in the agent loop. The calling code passes an OpenAI-style dict and receives an Anthropic-style dict. The ~90 lines after this window (1833--1921) handle reasoning budget negotiation across model generations and the extra_body.speed parameter with its beta header list.

Key takeaway

build_anthropic_kwargs() is not only a dict builder. It also carries OAuth identity spoofing, cross-model thinking normalization, and the Kimi-family exception that disables thinking on endpoints that speak the Anthropic wire format but have incompatible reasoning semantics. ---

def build_anthropic_kwargs(
    model: str,
    messages: List[Dict],
    tools: Optional[List[Dict]],
    max_tokens: Optional[int],
    reasoning_config: Optional[Dict[str, Any]],
    tool_choice: Optional[str] = None,
    is_oauth: bool = False,
    preserve_dots: bool = False,
    context_length: Optional[int] = None,
    base_url: str | None = None,
    fast_mode: bool = False,
    drop_context_1m_beta: bool = False,
) -> Dict[str, Any]:
    """Build kwargs for anthropic.messages.create().

    Naming note — two distinct concepts, easily confused:
      max_tokens     = OUTPUT token cap for a single response.
                       Anthropic's API calls this "max_tokens" but it only
                       limits the *output*.  Anthropic's own native SDK
                       renamed it "max_output_tokens" for clarity.
      context_length = TOTAL context window (input tokens + output tokens).
                       The API enforces: input_tokens + max_tokens ≤ context_length.
                       Stored on the ContextCompressor; reduced on overflow errors.

    When *max_tokens* is None the model's native output ceiling is used
    (e.g. 128K for Opus 4.6, 64K for Sonnet 4.6).

    When *context_length* is provided and the model's native output ceiling
    exceeds it (e.g. a local endpoint with an 8K window), the output cap is
    clamped to context_length − 1.  This only kicks in for unusually small
    context windows; for full-size models the native output cap is always
    smaller than the context window so no clamping happens.
    NOTE: this clamping does not account for prompt size — if the prompt is
    large, Anthropic may still reject the request.  The caller must detect
    "max_tokens too large given prompt" errors and retry with a smaller cap
    (see parse_available_output_tokens_from_error + _ephemeral_max_output_tokens).

    When *is_oauth* is True, applies Claude Code compatibility transforms:
    system prompt prefix, tool name prefixing, and prompt sanitization.

    When *preserve_dots* is True, model name dots are not converted to hyphens
    (for Alibaba/DashScope anthropic-compatible endpoints: qwen3.5-plus).

    When *base_url* points to a third-party Anthropic-compatible endpoint,
    thinking block signatures are stripped (they are Anthropic-proprietary).

    When *fast_mode* is True, adds ``extra_body["speed"] = "fast"`` and the
    fast-mode beta header for ~2.5x faster output throughput on Opus 4.6.
    Currently only supported on native Anthropic endpoints (not third-party
    compatible ones).
    """
    system, anthropic_messages = convert_messages_to_anthropic(
        messages, base_url=base_url, model=model
    )
    anthropic_tools = convert_tools_to_anthropic(tools) if tools else []

    model = normalize_model_name(model, preserve_dots=preserve_dots)
    # effective_max_tokens = output cap for this call (≠ total context window)
    # Use the resolver helper so non-positive values (negative ints,
    # fractional floats, NaN, non-numeric) fail locally with a clear error
    # rather than 400-ing at the Anthropic API. See openclaw/openclaw#66664.
    effective_max_tokens = _resolve_anthropic_messages_max_tokens(
        max_tokens, model, context_length=context_length
    )

    # Clamp output cap to fit inside the total context window.
    # Only matters for small custom endpoints where context_length < native
    # output ceiling.  For standard Anthropic models context_length (e.g.
    # 200K) is always larger than the output ceiling (e.g. 128K), so this
    # branch is not taken.
    if context_length and effective_max_tokens > context_length:
        effective_max_tokens = max(context_length - 1, 1)

    # ── OAuth: Claude Code identity ──────────────────────────────────
    if is_oauth:
        # 1. Prepend Claude Code system prompt identity
        cc_block = {"type": "text", "text": _CLAUDE_CODE_SYSTEM_PREFIX}
        if isinstance(system, list):
            system = [cc_block] + system
        elif isinstance(system, str) and system:
            system = [cc_block, {"type": "text", "text": system}]
        else:
            system = [cc_block]

        # 2. Sanitize system prompt — replace product name references
        #    to avoid Anthropic's server-side content filters.
        for block in system:
            if isinstance(block, dict) and block.get("type") == "text":
                text = block.get("text", "")
                text = text.replace("Hermes Agent", "Claude Code")
                text = text.replace("Hermes agent", "Claude Code")
                text = text.replace("hermes-agent", "claude-code")
                text = text.replace("Nous Research", "Anthropic")
                block["text"] = text

        # 3. Prefix tool names with mcp_ (Claude Code convention)
        if anthropic_tools:
            for tool in anthropic_tools:
                if "name" in tool:
                    tool["name"] = _MCP_TOOL_PREFIX + tool["name"]

        # 4. Prefix tool names in message history (tool_use and tool_result blocks)
        for msg in anthropic_messages:
            content = msg.get("content")
            if isinstance(content, list):
                for block in content:
                    if isinstance(block, dict):
                        if block.get("type") == "tool_use" and "name" in block:
                            if not block["name"].startswith(_MCP_TOOL_PREFIX):
                                block["name"] = _MCP_TOOL_PREFIX + block["name"]
                        elif block.get("type") == "tool_result" and "tool_use_id" in block:
                            pass  # tool_result uses ID, not name

    kwargs: Dict[str, Any] = {
        "model": model,
        "messages": anthropic_messages,
        "max_tokens": effective_max_tokens,
    }

    if system:
        kwargs["system"] = system

    if anthropic_tools:
        kwargs["tools"] = anthropic_tools
3 / 6

Bedrock Adapter -- AWS Converse Schema

agent/bedrock_adapter.py:397

Three schema differences that make sharing any request-building code between Bedrock and the other adapters impractical

Bedrock's Converse API is structurally incompatible with both the OpenAI and Anthropic wire formats in three ways:

  • Tool definitions wrap parameters inside "toolSpec": { "inputSchema": { "json": ... } } instead of OpenAI's flat "function": { "parameters": ... } or Anthropic's "input_schema": .... convert_tools_to_converse() handles this reshaping in 15 lines.
  • The model key is "modelId" (not "model"), and max_tokens, temperature, and top_p nest inside an "inferenceConfig" object.
  • Bedrock Converse supports an optional guardrailConfig for AWS content filtering, which has no equivalent in the other adapters.

The _model_supports_tool_use() guard at line 906 is a Bedrock-specific shim: DeepSeek R1 and some reasoning-only Bedrock models raise a ValidationException if toolConfig is present. Rather than requiring callers to track which model IDs support tool calling, the adapter silently strips tools and logs a warning.

Key takeaway

Bedrock's Converse API requires a nested inferenceConfig, a toolSpec/inputSchema.json tool schema, and a model-capability guard — three incompatibilities that rule out sharing request-building code with the other adapters. ---

def convert_tools_to_converse(tools: List[Dict]) -> List[Dict]:
    """Convert OpenAI-format tool definitions to Bedrock Converse ``toolConfig``.

    OpenAI format::

        {"type": "function", "function": {"name": "...", "description": "...",
         "parameters": {"type": "object", "properties": {...}}}}

    Converse format::

        {"toolSpec": {"name": "...", "description": "...",
         "inputSchema": {"json": {"type": "object", "properties": {...}}}}}
    """
    if not tools:
        return []
    result = []
    for t in tools:
        fn = t.get("function", {})
        name = fn.get("name", "")
        description = fn.get("description", "")
        parameters = fn.get("parameters", {"type": "object", "properties": {}})
        result.append({
            "toolSpec": {
                "name": name,
                "description": description,
                "inputSchema": {"json": parameters},
            }
        })
    return result


def _convert_content_to_converse(content) -> List[Dict]:
    """Convert OpenAI message content (string or list) to Converse content blocks.


def build_converse_kwargs(
    model: str,
    messages: List[Dict],
    tools: Optional[List[Dict]] = None,
    max_tokens: int = 4096,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    stop_sequences: Optional[List[str]] = None,
    guardrail_config: Optional[Dict] = None,
) -> Dict[str, Any]:
    """Build kwargs for ``bedrock-runtime.converse()`` or ``converse_stream()``.

    Converts OpenAI-format inputs to Converse API parameters.
    """
    system_prompt, converse_messages = convert_messages_to_converse(messages)

    kwargs: Dict[str, Any] = {
        "modelId": model,
        "messages": converse_messages,
        "inferenceConfig": {
            "maxTokens": max_tokens,
        },
    }

    if system_prompt:
        kwargs["system"] = system_prompt

    if temperature is not None:
        kwargs["inferenceConfig"]["temperature"] = temperature

    if top_p is not None:
        kwargs["inferenceConfig"]["topP"] = top_p

    if stop_sequences:
        kwargs["inferenceConfig"]["stopSequences"] = stop_sequences

    if tools:
        converse_tools = convert_tools_to_converse(tools)
        if converse_tools:
            # Some Bedrock models don't support tool/function calling (e.g.
            # DeepSeek R1, reasoning-only models).  Sending toolConfig to
            # these models causes a ValidationException → retry loop → failure.
            # Strip tools for known non-tool-calling models and warn the user.
            # Ref: PR #7920 feedback from @ptlally, pattern from PR #4346.
            if _model_supports_tool_use(model):
                kwargs["toolConfig"] = {"tools": converse_tools}
            else:
                logger.warning(
                    "Model %s does not support tool calling — tools stripped. "
                    "The agent will operate in text-only mode.", model
                )

    if guardrail_config:
        kwargs["guardrailConfig"] = guardrail_config

    return kwargs


def call_converse(
    region: str,
4 / 6

Gemini Native Adapter -- Google's Distinct Schema

agent/gemini_native_adapter.py:228

Why Gemini's parts-based content model requires a complete message-format rewrite rather than incremental adaptation

Gemini's parts-based content model is the structural difference that forces a complete rewrite of message translation. In OpenAI format, tool calls are a top-level tool_calls array on the assistant message. In Gemini format, each tool call becomes a {"functionCall": {"name": ..., "args": ...}} part inside the message's parts array, alongside any text the model produced. Tool results, which OpenAI sends as role: "tool" messages, map to Gemini user role messages containing {"functionResponse": ...} parts — because Gemini has no tool role.

The tool_name_by_call_id dict solves a lookup problem: Gemini's functionResponse requires the function name, but OpenAI tool result messages carry only the tool_call_id. _build_gemini_contents() builds the id-to-name map during the forward scan and uses it when translating each result.

System messages get the most divergent treatment. Where OpenAI accepts {"role": "system"} inline and Anthropic uses a separate system key, Gemini requires a top-level systemInstruction key with its own {"parts": [{"text": ...}]} structure. _build_gemini_contents() collects all system messages, joins them, and returns systemInstruction as a second return value.

Key takeaway

Gemini's parts model, functionCall/functionResponse part types, absent tool role, and top-level systemInstruction together require a complete translation rewrite — no code is shared with the Anthropic or Bedrock paths. ---

def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    fn = tool_call.get("function") or {}
    args_raw = fn.get("arguments", "")
    try:
        args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {}
    except json.JSONDecodeError:
        args = {"_raw": args_raw}
    if not isinstance(args, dict):
        args = {"_value": args}

    part: Dict[str, Any] = {
        "functionCall": {
            "name": str(fn.get("name") or ""),
            "args": args,
        }
    }
    thought_signature = _tool_call_extra_signature(tool_call)
    if thought_signature:
        part["thoughtSignature"] = thought_signature
    return part


def _translate_tool_result_to_gemini(
    message: Dict[str, Any],
    tool_name_by_call_id: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:

def _build_gemini_contents(messages: List[Dict[str, Any]]) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
    system_text_parts: List[str] = []
    contents: List[Dict[str, Any]] = []
    tool_name_by_call_id: Dict[str, str] = {}

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = str(msg.get("role") or "user")

        if role == "system":
            system_text_parts.append(_coerce_content_to_text(msg.get("content")))
            continue

        if role in {"tool", "function"}:
            contents.append(
                {
                    "role": "user",
                    "parts": [
                        _translate_tool_result_to_gemini(
                            msg,
                            tool_name_by_call_id=tool_name_by_call_id,
                        )
                    ],
                }
            )
            continue

        gemini_role = "model" if role == "assistant" else "user"
        parts: List[Dict[str, Any]] = []

        content_parts = _extract_multimodal_parts(msg.get("content"))
        parts.extend(content_parts)

        tool_calls = msg.get("tool_calls") or []
        if isinstance(tool_calls, list):
            for tool_call in tool_calls:
                if isinstance(tool_call, dict):
                    tool_call_id = str(tool_call.get("id") or tool_call.get("call_id") or "")
                    tool_name = str(((tool_call.get("function") or {}).get("name") or ""))
                    if tool_call_id and tool_name:
                        tool_name_by_call_id[tool_call_id] = tool_name
                    parts.append(_translate_tool_call_to_gemini(tool_call))

        if parts:
            contents.append({"role": gemini_role, "parts": parts})

    system_instruction = None
    joined_system = "\n".join(part for part in system_text_parts if part).strip()
    if joined_system:
        system_instruction = {"parts": [{"text": joined_system}]}
    return contents, system_instruction


def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]:

def build_gemini_request(
    *,
    messages: List[Dict[str, Any]],
    tools: Any = None,
    tool_choice: Any = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    top_p: Optional[float] = None,
    stop: Any = None,
    thinking_config: Any = None,
) -> Dict[str, Any]:
    contents, system_instruction = _build_gemini_contents(messages)
    request: Dict[str, Any] = {"contents": contents}
    if system_instruction:
        request["systemInstruction"] = system_instruction

    gemini_tools = _translate_tools_to_gemini(tools)
    if gemini_tools:
        request["tools"] = gemini_tools

    tool_config = _translate_tool_choice_to_gemini(tool_choice)
    if tool_config:
        request["toolConfig"] = tool_config

    generation_config: Dict[str, Any] = {}
    if temperature is not None:
        generation_config["temperature"] = temperature
    if max_tokens is not None:
        generation_config["maxOutputTokens"] = max_tokens
    if top_p is not None:
        generation_config["topP"] = top_p
    if stop:
        generation_config["stopSequences"] = stop if isinstance(stop, list) else [str(stop)]
    normalized_thinking = _normalize_thinking_config(thinking_config)
    if normalized_thinking:
        generation_config["thinkingConfig"] = normalized_thinking
    if generation_config:
        request["generationConfig"] = generation_config

    return request


def _map_gemini_finish_reason(reason: str) -> str:
5 / 6

Codex Responses Adapter -- OpenAI's Newer API

agent/codex_responses_adapter.py:247

How _preflight_codex_api_kwargs() enforces the Responses API contract before any network call — and why store: false is non-negotiable

OpenAI's Responses API replaces the messages array with an input array of typed items, and the system prompt moves from a {"role": "system"} message to a top-level instructions string. Tool results change from {"role": "tool", "tool_call_id": ...} messages to {"type": "function_call_output", "call_id": ...} items. _chat_messages_to_responses_input() handles this conversion — discarding system messages, remapping call IDs, and replaying codex_reasoning_items from previous turns so the API can maintain coherent reasoning chains across requests.

store: false is non-negotiable. The Responses API's stateful store mode would send conversation data to OpenAI's servers without user consent; _preflight_codex_api_kwargs() raises immediately if the caller passes anything other than False. The allowed_keys set serves as executable schema documentation: any deprecated chat/completions parameter (frequency_penalty, presence_penalty) raises on entry, and new Responses API fields must be explicitly opted in.

Key takeaway

The Responses API's input item model, encrypted reasoning item replay, and hard store: false enforcement represent OpenAI's newest wire format. The preflight function doubles as a machine-readable schema for what parameters the adapter actually supports. ---

def _chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert internal chat-style messages to Responses input items."""
    items: List[Dict[str, Any]] = []
    seen_item_ids: set = set()

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = msg.get("role")
        if role == "system":
            continue

        if role in {"user", "assistant"}:
            content = msg.get("content", "")
            if isinstance(content, list):
                content_parts = _chat_content_to_responses_parts(content, role=role)
                text_type = "output_text" if role == "assistant" else "input_text"
                content_text = "".join(
                    p.get("text", "") for p in content_parts if p.get("type") == text_type
                )
            else:
                content_parts = []
                content_text = str(content) if content is not None else ""

            if role == "assistant":
                # Replay encrypted reasoning items from previous turns
                # so the API can maintain coherent reasoning chains.
                codex_reasoning = msg.get("codex_reasoning_items")
                has_codex_reasoning = False
                if isinstance(codex_reasoning, list):
                    for ri in codex_reasoning:
                        if isinstance(ri, dict) and ri.get("encrypted_content"):
                            item_id = ri.get("id")
                            if item_id and item_id in seen_item_ids:
                                continue
                            # Strip the "id" field — with store=False the
                            # Responses API cannot look up items by ID and
                            # returns 404.  The encrypted_content blob is
                            # self-contained for reasoning chain continuity.
                            replay_item = {k: v for k, v in ri.items() if k != "id"}
                            items.append(replay_item)
                            if item_id:
                                seen_item_ids.add(item_id)
                            has_codex_reasoning = True

                # Replay exact assistant message items (with id/phase) from
                # previous turns so the API can maintain prefix-cache hits.
                # OpenAI docs: "preserve and resend phase on all assistant
                # messages — dropping it can degrade performance."
                codex_message_items = msg.get("codex_message_items")
                replayed_message_items = 0
                if isinstance(codex_message_items, list):
                    for raw_item in codex_message_items:
                        if not isinstance(raw_item, dict):
                            continue
                        if raw_item.get("type") != "message" or raw_item.get("role") != "assistant":
                            continue
                        raw_content_parts = raw_item.get("content")
                        if not isinstance(raw_content_parts, list):
                            continue

                        normalized_content_parts = []
                        for part in raw_content_parts:
                            if not isinstance(part, dict):

def _preflight_codex_api_kwargs(
    api_kwargs: Any,
    *,
    allow_stream: bool = False,
) -> Dict[str, Any]:
    if not isinstance(api_kwargs, dict):
        raise ValueError("Codex Responses request must be a dict.")

    required = {"model", "instructions", "input"}
    missing = [key for key in required if key not in api_kwargs]
    if missing:
        raise ValueError(f"Codex Responses request missing required field(s): {', '.join(sorted(missing))}.")

    model = api_kwargs.get("model")
    if not isinstance(model, str) or not model.strip():
        raise ValueError("Codex Responses request 'model' must be a non-empty string.")
    model = model.strip()

    instructions = api_kwargs.get("instructions")
    if instructions is None:
        instructions = ""
    if not isinstance(instructions, str):
        instructions = str(instructions)
    instructions = instructions.strip() or DEFAULT_AGENT_IDENTITY

    normalized_input = _preflight_codex_input_items(api_kwargs.get("input"))

    tools = api_kwargs.get("tools")
    normalized_tools = None
    if tools is not None:
        if not isinstance(tools, list):
            raise ValueError("Codex Responses request 'tools' must be a list when provided.")
        normalized_tools = []
        for idx, tool in enumerate(tools):
            if not isinstance(tool, dict):
                raise ValueError(f"Codex Responses tools[{idx}] must be an object.")
            if tool.get("type") != "function":
                raise ValueError(f"Codex Responses tools[{idx}] has unsupported type {tool.get('type')!r}.")

            name = tool.get("name")
            parameters = tool.get("parameters")
            if not isinstance(name, str) or not name.strip():
                raise ValueError(f"Codex Responses tools[{idx}] is missing a valid name.")
            if not isinstance(parameters, dict):
                raise ValueError(f"Codex Responses tools[{idx}] is missing valid parameters.")

            description = tool.get("description", "")
            if description is None:
                description = ""
            if not isinstance(description, str):
                description = str(description)

            strict = tool.get("strict", False)
            if not isinstance(strict, bool):
                strict = bool(strict)

            normalized_tools.append(
                {
                    "type": "function",
                    "name": name.strip(),
                    "description": description,
                    "strict": strict,
                    "parameters": parameters,
                }
            )

    store = api_kwargs.get("store", False)
    if store is not False:
        raise ValueError("Codex Responses contract requires 'store' to be false.")

    allowed_keys = {
        "model", "instructions", "input", "tools", "store",
        "reasoning", "include", "max_output_tokens", "temperature",
        "tool_choice", "parallel_tool_calls", "prompt_cache_key", "service_tier",
        "extra_headers",
    }
    normalized: Dict[str, Any] = {
6 / 6

Credential Pool -- Cycling Through Multiple Keys

agent/credential_pool.py:59

How CredentialPool selects, marks exhausted, and rotates keys across three strategies — and why round-robin persists to disk

CredentialPool holds a sorted list of PooledCredential entries and selects among them using one of three strategies:

  • Round-robin — rotates the priority field, moves the selected entry to the end, renumbers all entries sequentially, and persists to disk. Selection order survives process restarts.
  • Least-used — picks the entry with the lowest request_count and increments the counter on selection so load distributes within a session.
  • Random — calls random.choice() with no state update.

All three strategies call _available_entries() first with clear_expired=True and refresh=True. This is where exhaustion cooldown is enforced: entries with STATUS_EXHAUSTED whose cooldown timestamp has not elapsed are filtered out; expired cooldowns reset to STATUS_OK. OAuth tokens due for refresh are refreshed in the same pass, so the next request gets a valid token without a blocking mid-call refresh.

mark_exhausted_and_rotate() is the 429/auth error handler. It marks the current entry exhausted with the status code and a last_error_reset_at timestamp parsed from the Retry-After header, clears _current_id, and immediately calls _select_unlocked() to pick the next key. acquire_lease() is the concurrency-aware path for parallel requests: it tracks active leases per credential and prefers entries below _max_concurrent, falling back to over-capacity entries only when every credential is at the cap.

Key takeaway

The credential pool implements rate-limit-aware key rotation in ~60 lines: exhaustion cooldown via persisted timestamps, three configurable selection strategies with priority-rotating round-robin that survives restarts, and cross-process token sync that rescues credentials refreshed externally before their cooldown expires. ---

STRATEGY_ROUND_ROBIN = "round_robin"
STRATEGY_RANDOM = "random"
STRATEGY_LEAST_USED = "least_used"
SUPPORTED_POOL_STRATEGIES = {
    STRATEGY_FILL_FIRST,
    STRATEGY_ROUND_ROBIN,
    STRATEGY_RANDOM,
    STRATEGY_LEAST_USED,

    def _select_unlocked(self) -> Optional[PooledCredential]:
        available = self._available_entries(clear_expired=True, refresh=True)
        if not available:
            self._current_id = None
            logger.info("credential pool: no available entries (all exhausted or empty)")
            return None

        if self._strategy == STRATEGY_RANDOM:
            entry = random.choice(available)
            self._current_id = entry.id
            return entry

        if self._strategy == STRATEGY_LEAST_USED and len(available) > 1:
            entry = min(available, key=lambda e: e.request_count)
            # Increment usage counter so subsequent selections distribute load
            updated = replace(entry, request_count=entry.request_count + 1)
            self._replace_entry(entry, updated)
            self._current_id = entry.id
            return updated

        if self._strategy == STRATEGY_ROUND_ROBIN and len(available) > 1:
            entry = available[0]
            rotated = [candidate for candidate in self._entries if candidate.id != entry.id]
            rotated.append(replace(entry, priority=len(self._entries) - 1))
            self._entries = [replace(candidate, priority=idx) for idx, candidate in enumerate(rotated)]
            self._persist()
            self._current_id = entry.id
            return self.current() or entry

        entry = available[0]
        self._current_id = entry.id
        return entry

    def peek(self) -> Optional[PooledCredential]:
        current = self.current()
        if current is not None:
            return current
        available = self._available_entries()
        return available[0] if available else None

    def mark_exhausted_and_rotate(
        self,
        *,
        status_code: Optional[int],
        error_context: Optional[Dict[str, Any]] = None,
    ) -> Optional[PooledCredential]:
        with self._lock:
            entry = self.current() or self._select_unlocked()
            if entry is None:
                return None
            _label = entry.label or entry.id[:8]
            logger.info(
                "credential pool: marking %s exhausted (status=%s), rotating",
                _label, status_code,
            )
            self._mark_exhausted(entry, status_code, error_context)
            self._current_id = None
            next_entry = self._select_unlocked()
            if next_entry:
                _next_label = next_entry.label or next_entry.id[:8]
                logger.info("credential pool: rotated to %s", _next_label)
            return next_entry


    def acquire_lease(self, credential_id: Optional[str] = None) -> Optional[str]:
        """Acquire a soft lease on a credential.

        If a specific credential_id is provided, lease that entry directly.
        Otherwise prefer the least-leased available credential, using priority as
        a stable tie-breaker. When every credential is already at the soft cap,
        still return the least-leased one instead of blocking.
        """
        with self._lock:
            if credential_id:
                self._active_leases[credential_id] = self._active_leases.get(credential_id, 0) + 1
                self._current_id = credential_id
                return credential_id

            available = self._available_entries(clear_expired=True, refresh=True)
            if not available:
                return None

            below_cap = [
                entry for entry in available
                if self._active_leases.get(entry.id, 0) < self._max_concurrent
            ]
            candidates = below_cap if below_cap else available
            chosen = min(
                candidates,
                key=lambda entry: (self._active_leases.get(entry.id, 0), entry.priority),
            )
            self._active_leases[chosen.id] = self._active_leases.get(chosen.id, 0) + 1
            self._current_id = chosen.id
            return chosen.id

    def release_lease(self, credential_id: str) -> None:
        """Release a previously acquired credential lease."""
        with self._lock:
            count = self._active_leases.get(credential_id, 0)
            if count <= 1:
                self._active_leases.pop(credential_id, None)
Your codebase next

Create code tours for your project

Intraview lets AI create interactive walkthroughs of any codebase. Install the free VS Code extension and generate your first tour in minutes.

Install Intraview Free