logo NodeSeekbeta

Codex CLI 调用免费的 Agnes AI 模型的配置方法

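Agnes AI 的模型目前应该是限时免费的。它的接口是 Chat Completions 格式,而下面的 Codex 配置使用 Responses 接口(wire_api = "responses"),所以需要在本地跑一个小的桥接服务做格式转换,之后就可以配置到 Codex CLI 中使用。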

  1. Debian 13 环境,安装对应的 Python FastAPI 和 uvicorn(下面的脚本还会 import requests,如果系统里没有,请一并安装 python3-requests):
apt-get install python3-fastapi python3-uvicorn
  1. 编写文件 agnes_bridge.py
import asyncio
import json
import os
import time
import uuid
from typing import Any

import requests
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse


# ASGI application object; the launch command refers to it as ``agnes_bridge:app``.
app = FastAPI()

# Runtime configuration, read once from the environment at import time.

# Chat-completions endpoint that requests are forwarded to.
UPSTREAM_URL = os.getenv(
    "UPSTREAM_URL",
    "https://apihub.agnes-ai.com/v1/chat/completions",
)

# Model used when the incoming request does not name one.
DEFAULT_MODEL = os.getenv("MODEL", "agnes-2.0-flash")

# AGNES_API_KEY wins; API_KEY is the fallback. None when neither is set.
UPSTREAM_API_KEY = os.getenv("AGNES_API_KEY") or os.getenv("API_KEY")

# Tool forwarding is on unless FORWARD_TOOLS is exactly "0", "false" or "False".
FORWARD_TOOLS = os.getenv("FORWARD_TOOLS", "1") not in ("0", "false", "False")


def _text_from_content(content: Any) -> str:
    """Flatten a message ``content`` value into one plain string.

    Args:
        content: ``None``, a string, a list of parts, or any other value.

    Returns:
        ``""`` for ``None``; the string itself for a string; for a list, the
        newline-joined text of its parts (string parts as-is, dict parts via
        their ``"text"`` or else ``"content"`` value, other parts ignored);
        ``str(content)`` for anything else.
    """
    if isinstance(content, str):
        return content
    if content is None:
        return ""
    if not isinstance(content, list):
        return str(content)

    pieces: list[str] = []
    for entry in content:
        if isinstance(entry, str):
            pieces.append(entry)
            continue
        if not isinstance(entry, dict):
            continue
        # A falsy "text" falls through to "content"; falsy results are dropped.
        value = entry.get("text") or entry.get("content")
        if value:
            pieces.append(str(value))
    return "\n".join(pieces)


def _responses_input_to_chat_messages(body: dict[str, Any]) -> list[dict[str, Any]]:
    """Translate a Responses-style request body into chat-completions messages.

    Args:
        body: Request body; only ``"instructions"`` and ``"input"`` are read.

    Returns:
        A list of chat messages. Truthy ``instructions`` become a leading
        system message. ``input`` may be a string (one user message) or a
        list whose entries are strings, ``function_call`` items,
        ``function_call_output`` items, or role/content dicts. Any other
        ``input`` type contributes nothing, and list entries that are neither
        strings nor dicts are skipped.
    """
    # "developer" is folded into "system"; any role not listed becomes "user".
    role_aliases = {
        "system": "system",
        "developer": "system",
        "user": "user",
        "assistant": "assistant",
        "tool": "tool",
    }
    chat: list[dict[str, Any]] = []

    system_prompt = body.get("instructions")
    if system_prompt:
        chat.append({"role": "system", "content": str(system_prompt)})

    raw_input = body.get("input")
    if isinstance(raw_input, str):
        # A bare string is handled exactly like a one-element list of strings.
        raw_input = [raw_input]
    elif not isinstance(raw_input, list):
        return chat

    for entry in raw_input:
        if isinstance(entry, str):
            chat.append({"role": "user", "content": entry})
        elif isinstance(entry, dict):
            kind = entry.get("type")
            if kind == "function_call":
                # Reuse the caller's id when present, otherwise mint one.
                tool_call_id = (
                    entry.get("call_id")
                    or entry.get("id")
                    or f"call_{uuid.uuid4().hex}"
                )
                invocation = {
                    "id": tool_call_id,
                    "type": "function",
                    "function": {
                        "name": entry.get("name", ""),
                        "arguments": entry.get("arguments", ""),
                    },
                }
                chat.append(
                    {"role": "assistant", "content": None, "tool_calls": [invocation]}
                )
            elif kind == "function_call_output":
                chat.append(
                    {
                        "role": "tool",
                        "tool_call_id": entry.get("call_id", ""),
                        "content": _text_from_content(entry.get("output")),
                    }
                )
            else:
                chat_role = role_aliases.get(entry.get("role"), "user")
                plain: dict[str, Any] = {
                    "role": chat_role,
                    "content": _text_from_content(
                        entry.get("content", entry.get("text"))
                    ),
                }
                if chat_role == "tool" and entry.get("tool_call_id"):
                    plain["tool_call_id"] = entry["tool_call_id"]
                chat.append(plain)

    return chat


def _responses_tools_to_chat_tools(tools: Any) -> list[dict[str, Any]]:
    """Convert Responses-style tool definitions to chat-completions tools.

    Args:
        tools: The request's ``tools`` value; anything but a list yields ``[]``.

    Returns:
        The ``type == "function"`` tools, each in the nested
        ``{"type": "function", "function": {...}}`` form. Tools that already
        have a ``"function"`` key are passed through unchanged. Returns ``[]``
        when ``FORWARD_TOOLS`` is false.
    """
    if not (FORWARD_TOOLS and isinstance(tools, list)):
        return []

    converted: list[dict[str, Any]] = []
    for spec in tools:
        is_function_tool = isinstance(spec, dict) and spec.get("type") == "function"
        if not is_function_tool:
            continue

        if "function" in spec:
            # Already nested; forward as-is.
            entry = spec
        else:
            # Flat definition: lift name/description/parameters under "function".
            entry = {
                "type": "function",
                "function": {
                    "name": spec.get("name", ""),
                    "description": spec.get("description", ""),
                    "parameters": spec.get("parameters") or {},
                },
            }
        converted.append(entry)
    return converted


def _sse(event: dict[str, Any]) -> str:
    """Serialize ``event`` as one server-sent-event ``data:`` frame.

    Non-ASCII characters are kept as-is (``ensure_ascii=False``), and the frame
    ends with the blank line that terminates an SSE message.
    """
    payload = json.dumps(event, ensure_ascii=False)
    return "data: " + payload + "\n\n"


def _message_output_item(item_id: str, content: str) -> dict[str, Any]:
    """Build a completed assistant ``message`` output item.

    Args:
        item_id: Identifier stored under ``"id"``.
        content: Text placed in the item's single ``output_text`` part.

    Returns:
        The item dict, with one ``output_text`` part and empty annotations.
    """
    text_part = {"type": "output_text", "text": content, "annotations": []}
    return dict(
        id=item_id,
        type="message",
        status="completed",
        role="assistant",
        content=[text_part],
    )


def _function_call_output_item(
    item_id: str, call_id: str, name: str, arguments: str
) -> dict[str, Any]:
    """Build a completed ``function_call`` output item.

    Args:
        item_id: Identifier of the output item itself.
        call_id: Identifier of the tool call.
        name: Name of the function being called.
        arguments: Arguments value, stored unchanged.

    Returns:
        The item dict with ``status`` set to ``"completed"``.
    """
    item: dict[str, Any] = {
        "id": item_id,
        "type": "function_call",
        "status": "completed",
    }
    item.update(call_id=call_id, name=name, arguments=arguments)
    return item


def _response_object(
    response_id: str, model: str, output: list[dict[str, Any]], status: str
) -> dict[str, Any]:
    """Build the top-level ``response`` object.

    Args:
        response_id: Identifier stored under ``"id"``.
        model: Model name to report.
        output: Output items, stored by reference (not copied).
        status: Status string stored under ``"status"``.

    Returns:
        The response dict; ``created_at`` is the current Unix time in whole
        seconds.
    """
    created_at = int(time.time())
    return dict(
        id=response_id,
        object="response",
        created_at=created_at,
        status=status,
        model=model,
        output=output,
    )


def _stream_response_events(data: dict[str, Any], model: str):
    """Replay a finished chat-completions reply as a Responses-style SSE stream.

    ``data`` is already a complete reply, so each piece of text or tool-call
    arguments is sent as a single delta event.

    Event order: ``response.created``; the assistant text (if any) as one
    message item; each tool call as one ``function_call`` item;
    ``response.completed`` carrying every finished item; then ``data: [DONE]``.

    Args:
        data: Parsed upstream chat-completions response. Only the first entry
            of ``choices`` is used.
        model: Model name echoed back in the ``response`` objects.

    Yields:
        SSE frames as strings.
    """
    response_id = data.get("id") or f"resp_{uuid.uuid4().hex}"
    output: list[dict[str, Any]] = []

    yield _sse(
        {
            "type": "response.created",
            "response": _response_object(response_id, model, [], "in_progress"),
        }
    )

    choice = (data.get("choices") or [{}])[0]
    message = choice.get("message") or {}
    content = message.get("content") or ""
    tool_calls = message.get("tool_calls") or []
    output_index = 0

    if content:
        item_id = f"msg_{uuid.uuid4().hex}"
        in_progress_item = {
            "id": item_id,
            "type": "message",
            "status": "in_progress",
            "role": "assistant",
            "content": [],
        }
        yield _sse(
            {
                "type": "response.output_item.added",
                "output_index": output_index,
                "item": in_progress_item,
            }
        )
        yield _sse(
            {
                "type": "response.content_part.added",
                "item_id": item_id,
                "output_index": output_index,
                "content_index": 0,
                "part": {"type": "output_text", "text": "", "annotations": []},
            }
        )
        # The whole text goes out as a single delta.
        yield _sse(
            {
                "type": "response.output_text.delta",
                "item_id": item_id,
                "output_index": output_index,
                "content_index": 0,
                "delta": content,
            }
        )
        yield _sse(
            {
                "type": "response.output_text.done",
                "item_id": item_id,
                "output_index": output_index,
                "content_index": 0,
                "text": content,
            }
        )
        done_item = _message_output_item(item_id, content)
        output.append(done_item)
        yield _sse(
            {
                "type": "response.content_part.done",
                "item_id": item_id,
                "output_index": output_index,
                "content_index": 0,
                "part": done_item["content"][0],
            }
        )
        yield _sse(
            {
                "type": "response.output_item.done",
                "output_index": output_index,
                "item": done_item,
            }
        )
        output_index += 1

    for tool_call in tool_calls:
        function = tool_call.get("function") or {}
        call_id = tool_call.get("id") or f"call_{uuid.uuid4().hex}"
        item_id = f"fc_{uuid.uuid4().hex}"
        name = function.get("name") or ""
        arguments = function.get("arguments") or ""

        # The "added" event announces an item that is not finished yet, so it
        # is marked in_progress like the message item above; only the "done"
        # event carries the completed item.
        pending_item = {
            **_function_call_output_item(item_id, call_id, name, ""),
            "status": "in_progress",
        }
        yield _sse(
            {
                "type": "response.output_item.added",
                "output_index": output_index,
                "item": pending_item,
            }
        )
        if arguments:
            yield _sse(
                {
                    "type": "response.function_call_arguments.delta",
                    "item_id": item_id,
                    "output_index": output_index,
                    "delta": arguments,
                }
            )
        done_item = _function_call_output_item(item_id, call_id, name, arguments)
        output.append(done_item)
        yield _sse(
            {
                "type": "response.function_call_arguments.done",
                "item_id": item_id,
                "output_index": output_index,
                "arguments": arguments,
            }
        )
        yield _sse(
            {
                "type": "response.output_item.done",
                "output_index": output_index,
                "item": done_item,
            }
        )
        output_index += 1

    yield _sse(
        {
            "type": "response.completed",
            "response": _response_object(response_id, model, output, "completed"),
        }
    )
    yield "data: [DONE]\n\n"


@app.post("/v1/responses")
async def responses(req: Request):
    """Serve ``POST /v1/responses`` by proxying to the chat-completions upstream.

    The request body is translated to a non-streaming chat-completions
    request, sent to ``UPSTREAM_URL``, and the reply is translated back.

    Args:
        req: Incoming request. Its JSON body supplies ``model``,
            ``instructions``, ``input``, ``tools``, ``tool_choice`` and
            ``stream``. Its ``Authorization`` header is forwarded only when no
            API key is configured in the environment.

    Returns:
        A ``StreamingResponse`` of SSE events when ``stream`` is truthy or
        absent, otherwise a ``JSONResponse`` with the full response object.

    Raises:
        HTTPException: 400 for a body that is not a JSON object; 500 when no
            API key is available; 504 on upstream timeout; 502 on other
            upstream transport errors or a non-JSON/non-object upstream body;
            the upstream's own status code when it is 400 or above.
    """
    try:
        body = await req.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="Request body is not valid JSON."
        ) from exc
    if not isinstance(body, dict):
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object."
        )

    model = body.get("model") or DEFAULT_MODEL
    messages = _responses_input_to_chat_messages(body)

    headers = {"Content-Type": "application/json"}
    if UPSTREAM_API_KEY:
        headers["Authorization"] = f"Bearer {UPSTREAM_API_KEY}"
    elif req.headers.get("authorization"):
        headers["Authorization"] = req.headers["authorization"]
    else:
        raise HTTPException(
            status_code=500,
            detail="Set AGNES_API_KEY or API_KEY before starting agnes-bridge.",
        )

    payload: dict[str, Any] = {
        "model": model,
        "messages": messages,
        "stream": False,
    }

    tools = _responses_tools_to_chat_tools(body.get("tools"))
    if tools:
        payload["tools"] = tools
        tool_choice = body.get("tool_choice")
        # tool_choice may also be a JSON object; a dict is unhashable, so it
        # must be filtered out before the set-membership test.
        if isinstance(tool_choice, str) and tool_choice in {"auto", "none", "required"}:
            payload["tool_choice"] = tool_choice

    try:
        # requests is blocking; run it in a worker thread so the event loop
        # keeps serving other connections while the upstream call is pending.
        upstream = await asyncio.to_thread(
            requests.post,
            UPSTREAM_URL,
            headers=headers,
            json=payload,
            timeout=120,
        )
    except requests.Timeout as exc:
        raise HTTPException(
            status_code=504, detail="Upstream request timed out."
        ) from exc
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502, detail=f"Upstream request failed: {exc}"
        ) from exc
    if upstream.status_code >= 400:
        raise HTTPException(status_code=upstream.status_code, detail=upstream.text)

    try:
        data = upstream.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=502, detail="Upstream returned a non-JSON response."
        ) from exc
    if not isinstance(data, dict):
        raise HTTPException(
            status_code=502, detail="Upstream returned an unexpected JSON payload."
        )

    # Streaming is the default when the request does not say otherwise.
    if body.get("stream", True):
        return StreamingResponse(
            _stream_response_events(data, model),
            media_type="text/event-stream",
        )

    choice = (data.get("choices") or [{}])[0]
    message = choice.get("message") or {}
    output = []
    if message.get("content"):
        output.append(_message_output_item(f"msg_{uuid.uuid4().hex}", message["content"]))
    for tool_call in message.get("tool_calls") or []:
        function = tool_call.get("function") or {}
        output.append(
            _function_call_output_item(
                f"fc_{uuid.uuid4().hex}",
                tool_call.get("id") or f"call_{uuid.uuid4().hex}",
                function.get("name") or "",
                function.get("arguments") or "",
            )
        )

    return JSONResponse(
        _response_object(
            data.get("id") or f"resp_{uuid.uuid4().hex}",
            model,
            output,
            "completed",
        )
    )
  1. 启动命令,端口随意,这里用 50513:
python3 -m uvicorn agnes_bridge:app --host 127.0.0.1 --port 50513
  1. 编写调用 Agnes AI 的 Codex 配置 .codex/agnes.config.toml
model = "agnes-2.0-flash"
model_provider = "agnes"
model_reasoning_effort = "none"

[model_providers.agnes]
name = "agnes"
base_url = "http://localhost:50513/v1"
env_key = "AGNES_API_KEY"
wire_api = "responses"
  1. 现在导入 KEY:
export AGNES_API_KEY="sk-..."

也可以不用这么麻烦,把 KEY 写到 .codex/.env,2种方法都可以

然后就可以在 Codex CLI 里使用 Agnes AI 模型了,要选择使用 agnes 这个档案:

codex -p agnes
  • 好帖!

  • 免费版有限制的是不

  • @xrj975 #2 目前体验是没有限制,不过我都是单并发

    生图也是无限制,无审核(但是会矫正提示词),我觉得是目前体验最好的了,虽然质量不算很好

你好啊,陌生人!

我的朋友,看起来你是新来的,如果想参与到讨论中,点击下面的按钮!

📈用户数目📈

目前论坛共有60888位seeker

🎉欢迎新用户🎉