Skip to content

Async Cortex Agent

cortex_agents.async_agent.AsyncCortexAgent

AsyncCortexAgent(account_url: str | None = None, pat: str | None = None, enable_logging: bool = True, token_type: str | None = None)

Bases: BaseAgent

Async client for Snowflake Cortex Agents.

Exposes ergonomic helpers for managing agents, executing runs with streaming responses, orchestrating conversation threads, and collecting feedback. Use the async context manager for automatic resource management.

Examples:

# Use with async context manager for automatic cleanup
async with AsyncCortexAgent(account_url="https://...", pat="token123") as client:
    # Create an agent
    await client.create_agent(
        name="MY_AGENT",
        config={
            "instructions": {"system": "You are helpful"},
            "models": {"orchestration": "claude-sonnet-4-6"}
        },
        database="MY_DB",
        schema="MY_SCHEMA"
    )

    # Run the agent
    response = await client.run(
        "What's the revenue?",
        agent_name="MY_AGENT",
        database="MY_DB",
        schema="MY_SCHEMA"
    )

    # Stream results
    async for event in response.astream():
        if event['type'] == 'text.delta':
            print(event['data']['text'], end='', flush=True)

    # Submit feedback
    await client.submit_feedback(
        agent_name="MY_AGENT",
        database="MY_DB",
        schema="MY_SCHEMA",
        positive=True,
        orig_request_id=response.request_id
    )

Initialize the async Cortex agent facade and lazy managers.

Parameters:

Name Type Description Default
account_url str | None

Optional Snowflake account URL; falls back to environment variable.

None
pat str | None

Optional personal access token; falls back to environment variable.

None
enable_logging bool

Whether to emit debug log entries for HTTP activity.

True
token_type str | None

Authorization token type (e.g. "KEYPAIR_JWT").

None
Source code in cortex_agents/async_agent.py
def __init__(
    self,
    account_url: str | None = None,
    pat: str | None = None,
    enable_logging: bool = True,
    token_type: str | None = None,
) -> None:
    """Initialize the async Cortex agent facade and lazy managers.

    Args:
        account_url: Optional Snowflake account URL; falls back to environment variable.
        pat: Optional personal access token; falls back to environment variable.
        enable_logging: Whether to emit debug log entries for HTTP activity.
        token_type: Authorization token type (e.g. ``"KEYPAIR_JWT"``).
    """
    super().__init__(account_url=account_url, pat=pat, enable_logging=enable_logging, token_type=token_type)

    self._client: httpx.AsyncClient | None = None
    self._transport: AsyncTransport | None = None
    self._entity: AsyncAgentEntity | None = None
    self._runner: AsyncAgentRun | None = None
    self._threads: AsyncAgentThreads | None = None
    self._feedback: AsyncAgentFeedback | None = None

    if enable_logging:
        logger.info("AsyncCortexAgent initialized")

close

close() -> None

Synchronously close resources for compatibility with BaseAgent.

Source code in cortex_agents/async_agent.py
def close(self) -> None:
    """Synchronously close resources for compatibility with BaseAgent."""
    if not self._client:
        return

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(self.aclose())
    else:
        raise SnowflakeAPIError("close() cannot run while the event loop is active; use 'await aclose()' instead")

aclose async

aclose() -> None

Close the httpx client and drop cached managers for reuse.

Source code in cortex_agents/async_agent.py
async def aclose(self) -> None:
    """Close the httpx client and drop cached managers for reuse."""
    if self._client:
        await self._client.aclose()
        logger.debug("AsyncCortexAgent client closed")

    self._client = None
    self._transport = None
    self._entity = None
    self._runner = None
    self._threads = None
    self._feedback = None

create_agent async

create_agent(name: str, config: dict[str, Any], database: str, schema: str, create_mode: str | None = None) -> dict[str, Any]

Create a Cortex agent with the provided configuration.

Parameters:

Name Type Description Default
name str

Logical name for the agent within the schema.

required
config dict[str, Any]

Configuration payload including instructions, models, tools, etc.

required
database str

Target database for the agent.

required
schema str

Target schema for the agent.

required
create_mode str | None

Optional behaviour flag (for example, "CREATE OR REPLACE").

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Raw Snowflake API response describing the new agent.

Source code in cortex_agents/async_agent.py
async def create_agent(
    self,
    name: str,
    config: dict[str, Any],
    database: str,
    schema: str,
    create_mode: str | None = None,
) -> dict[str, Any]:
    """Create a Cortex agent with the provided configuration.

    Args:
        name: Logical name for the agent within the schema.
        config: Configuration payload including instructions, models, tools, etc.
        database: Target database for the agent.
        schema: Target schema for the agent.
        create_mode: Optional behaviour flag (for example, "CREATE OR REPLACE").

    Returns:
        dict[str, Any]: Raw Snowflake API response describing the new agent.
    """
    entity = self._ensure_entity_manager()
    return await entity.create_agent(
        name=name,
        config=config,
        database=database,
        schema=schema,
        create_mode=create_mode,
    )

get_agent async

get_agent(name: str, database: str, schema: str) -> dict[str, Any]

Retrieve a single agent's metadata.

Parameters:

Name Type Description Default
name str

Agent name to fetch.

required
database str

Database containing the agent.

required
schema str

Schema containing the agent.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Agent definition and metadata.

Source code in cortex_agents/async_agent.py
async def get_agent(self, name: str, database: str, schema: str) -> dict[str, Any]:
    """Retrieve a single agent's metadata.

    Args:
        name: Agent name to fetch.
        database: Database containing the agent.
        schema: Schema containing the agent.

    Returns:
        dict[str, Any]: Agent definition and metadata.
    """
    entity = self._ensure_entity_manager()
    return await entity.get_agent(name=name, database=database, schema=schema)

update_agent async

update_agent(name: str, config: dict[str, Any], database: str, schema: str) -> dict[str, Any]

Update an existing agent with a partial or full configuration payload.

Parameters:

Name Type Description Default
name str

Agent name to modify.

required
config dict[str, Any]

Partial or full configuration update.

required
database str

Database containing the agent.

required
schema str

Schema containing the agent.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Raw Snowflake API response for the update request.

Source code in cortex_agents/async_agent.py
async def update_agent(
    self,
    name: str,
    config: dict[str, Any],
    database: str,
    schema: str,
) -> dict[str, Any]:
    """Update an existing agent with a partial or full configuration payload.

    Args:
        name: Agent name to modify.
        config: Partial or full configuration update.
        database: Database containing the agent.
        schema: Schema containing the agent.

    Returns:
        dict[str, Any]: Raw Snowflake API response for the update request.
    """
    entity = self._ensure_entity_manager()
    return await entity.update_agent(name=name, config=config, database=database, schema=schema)

list_agents async

list_agents(database: str, schema: str, like: str | None = None, from_name: str | None = None, limit: int | None = None) -> list[dict[str, Any]]

List agents in the specified schema with optional pagination and filters.

Parameters:

Name Type Description Default
database str

Database containing the agents.

required
schema str

Schema containing the agents.

required
like str | None

Optional SQL-style filter pattern.

None
from_name str | None

Continue listing after this agent name.

None
limit int | None

Maximum number of results to return.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Collection of agent metadata dictionaries.

Source code in cortex_agents/async_agent.py
async def list_agents(
    self,
    database: str,
    schema: str,
    like: str | None = None,
    from_name: str | None = None,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """List agents in the specified schema with optional pagination and filters.

    Args:
        database: Database containing the agents.
        schema: Schema containing the agents.
        like: Optional SQL-style filter pattern.
        from_name: Continue listing after this agent name.
        limit: Maximum number of results to return.

    Returns:
        list[dict[str, Any]]: Collection of agent metadata dictionaries.
    """
    entity = self._ensure_entity_manager()
    return await entity.list_agents(
        database=database,
        schema=schema,
        like=like,
        from_name=from_name,
        limit=limit,
    )

delete_agent async

delete_agent(name: str, database: str, schema: str, if_exists: bool = False) -> dict[str, Any]

Delete a Cortex agent, optionally ignoring missing agents.

Parameters:

Name Type Description Default
name str

Agent name to delete.

required
database str

Database containing the agent.

required
schema str

Schema containing the agent.

required
if_exists bool

Do not raise an error when the agent is absent.

False

Returns:

Type Description
dict[str, Any]

dict[str, Any]: API response confirming the operation.

Source code in cortex_agents/async_agent.py
async def delete_agent(
    self,
    name: str,
    database: str,
    schema: str,
    if_exists: bool = False,
) -> dict[str, Any]:
    """Delete a Cortex agent, optionally ignoring missing agents.

    Args:
        name: Agent name to delete.
        database: Database containing the agent.
        schema: Schema containing the agent.
        if_exists: Do not raise an error when the agent is absent.

    Returns:
        dict[str, Any]: API response confirming the operation.
    """
    entity = self._ensure_entity_manager()
    return await entity.delete_agent(
        name=name,
        database=database,
        schema=schema,
        if_exists=if_exists,
    )

run async

run(query: str | None = None, agent_name: str | None = None, database: str | None = None, schema: str | None = None, thread_id: str | int | None = None, parent_message_id: str | int | None = None, tool_choice: dict[str, Any] | None = None, messages: list[dict[str, Any]] | None = None, stream: bool = True, agent_config: AgentInlineConfig | None = None) -> AgentResponse

Execute an agent run and return an async response wrapper.

The method supports saved agents, inline configurations, and conversational context via threads. The returned AgentResponse exposes streaming helpers for consuming Server-Sent Events (SSE) asynchronously.

Parameters:

Name Type Description Default
query str | None

User question to send to the agent.

None
agent_name str | None

Saved agent name (required when database/schema are provided).

None
database str | None

Database containing the saved agent.

None
schema str | None

Schema containing the saved agent.

None
thread_id str | int | None

Optional conversation thread identifier.

None
parent_message_id str | int | None

Optional parent message id used with threads.

None
tool_choice dict[str, Any] | None

Tool execution strategy override.

None
messages list[dict[str, Any]] | None

Conversation history to include in the request payload.

None
agent_config AgentInlineConfig | None

Inline configuration for ad-hoc runs.

None

Returns:

Name Type Description
AgentResponse AgentResponse

Wrapper offering async iteration over streaming events.

Examples:

async with AsyncCortexAgent(account_url="...", pat="...") as client:
    # With saved agent
    response = await client.run(
        "What's the revenue?",
        agent_name="MY_AGENT",
        database="SALES_DB",
        schema="ANALYTICS"
    )

    # Stream results
    async for event in response.astream():
        if event['type'] == 'text.delta':
            print(event['data']['text'], end='', flush=True)

Source code in cortex_agents/async_agent.py
async def run(
    self,
    query: str | None = None,
    agent_name: str | None = None,
    database: str | None = None,
    schema: str | None = None,
    thread_id: str | int | None = None,
    parent_message_id: str | int | None = None,
    tool_choice: dict[str, Any] | None = None,
    messages: list[dict[str, Any]] | None = None,
    stream: bool = True,
    agent_config: AgentInlineConfig | None = None,
) -> AgentResponse:
    """Execute an agent run and return an async response wrapper.

    The method supports saved agents, inline configurations, and conversational
    context via threads. The returned ``AgentResponse`` exposes streaming
    helpers for consuming Server-Sent Events (SSE) asynchronously.

    Args:
        query: User question to send to the agent.
        agent_name: Saved agent name (required when ``database``/``schema`` are provided).
        database: Database containing the saved agent.
        schema: Schema containing the saved agent.
        thread_id: Optional conversation thread identifier.
        parent_message_id: Optional parent message id used with threads.
        tool_choice: Tool execution strategy override.
        messages: Conversation history to include in the request payload.
        agent_config: Inline configuration for ad-hoc runs.

    Returns:
        AgentResponse: Wrapper offering async iteration over streaming events.
    Examples:
    ```python
    async with AsyncCortexAgent(account_url="...", pat="...") as client:
        # With saved agent
        response = await client.run(
            "What's the revenue?",
            agent_name="MY_AGENT",
            database="SALES_DB",
            schema="ANALYTICS"
        )

        # Stream results
        async for event in response.astream():
            if event['type'] == 'text.delta':
                print(event['data']['text'], end='', flush=True)
    ```
    """
    runner = self._ensure_run_manager()
    return await runner.run(
        query=query,
        agent_name=agent_name,
        database=database,
        schema=schema,
        thread_id=thread_id,
        parent_message_id=parent_message_id,
        tool_choice=tool_choice,
        messages=messages,
        agent_config=agent_config,
        stream=stream,
    )

create_thread async

create_thread(origin_app: str | None = None) -> dict[str, Any]

Create a new Cortex conversation thread.

Parameters:

Name Type Description Default
origin_app str | None

Optional application identifier (max 16 bytes). Example: "my_app" or "sales_bot"

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Thread metadata including the thread identifier.

Raises:

Type Description
ValueError

If origin_app exceeds 16 bytes.

Source code in cortex_agents/async_agent.py
async def create_thread(self, origin_app: str | None = None) -> dict[str, Any]:
    """Create a new Cortex conversation thread.

    Args:
        origin_app: Optional application identifier (max 16 bytes).
            Example: "my_app" or "sales_bot"

    Returns:
        dict[str, Any]: Thread metadata including the thread identifier.

    Raises:
        ValueError: If origin_app exceeds 16 bytes.
    """
    threads = self._ensure_threads_manager()
    return await threads.create_thread(origin_app=origin_app)

get_thread async

get_thread(thread_id: str | int, *, limit: int = 20, last_message_id: int | None = None) -> dict[str, Any]

Fetch thread metadata and messages with optional pagination.

Parameters:

Name Type Description Default
thread_id str | int

Thread identifier to fetch.

required
limit int

Number of messages to return (max 100).

20
last_message_id int | None

Continue listing messages after this id.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Metadata and message history for the thread.

Source code in cortex_agents/async_agent.py
async def get_thread(
    self,
    thread_id: str | int,
    *,
    limit: int = 20,
    last_message_id: int | None = None,
) -> dict[str, Any]:
    """Fetch thread metadata and messages with optional pagination.

    Args:
        thread_id: Thread identifier to fetch.
        limit: Number of messages to return (max 100).
        last_message_id: Continue listing messages after this id.

    Returns:
        dict[str, Any]: Metadata and message history for the thread.
    """
    threads = self._ensure_threads_manager()
    return await threads.get_thread(
        thread_id=thread_id,
        limit=limit,
        last_message_id=last_message_id,
    )

update_thread async

update_thread(thread_id: str | int, name: str) -> dict[str, Any]

Rename an existing conversation thread.

Parameters:

Name Type Description Default
thread_id str | int

Identifier for the thread to update.

required
name str

New display name for the thread.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: API response describing the updated thread.

Source code in cortex_agents/async_agent.py
async def update_thread(self, thread_id: str | int, name: str) -> dict[str, Any]:
    """Rename an existing conversation thread.

    Args:
        thread_id: Identifier for the thread to update.
        name: New display name for the thread.

    Returns:
        dict[str, Any]: API response describing the updated thread.
    """
    threads = self._ensure_threads_manager()
    return await threads.update_thread(thread_id=thread_id, name=name)

list_threads async

list_threads(origin_app: str | None = None) -> list[dict[str, Any]]

List available threads, optionally filtered by origin application.

Parameters:

Name Type Description Default
origin_app str | None

Optional application identifier to filter the results.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: Collection of thread metadata entries.

Source code in cortex_agents/async_agent.py
async def list_threads(self, origin_app: str | None = None) -> list[dict[str, Any]]:
    """List available threads, optionally filtered by origin application.

    Args:
        origin_app: Optional application identifier to filter the results.

    Returns:
        list[dict[str, Any]]: Collection of thread metadata entries.
    """
    threads = self._ensure_threads_manager()
    return await threads.list_threads(origin_app=origin_app)

delete_thread async

delete_thread(thread_id: str | int) -> dict[str, Any]

Delete a conversation thread and its messages.

Parameters:

Name Type Description Default
thread_id str | int

Identifier for the thread to remove.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: API response confirming deletion.

Source code in cortex_agents/async_agent.py
async def delete_thread(self, thread_id: str | int) -> dict[str, Any]:
    """Delete a conversation thread and its messages.

    Args:
        thread_id: Identifier for the thread to remove.

    Returns:
        dict[str, Any]: API response confirming deletion.
    """
    threads = self._ensure_threads_manager()
    return await threads.delete_thread(thread_id=thread_id)

submit_feedback async

submit_feedback(agent_name: str, database: str, schema: str, *, positive: bool, feedback_message: str | None = None, categories: list[str] | None = None, orig_request_id: str | None = None, thread_id: str | int | None = None) -> dict[str, Any]

Submit feedback about an agent response or interaction.

Parameters:

Name Type Description Default
agent_name str

Agent associated with the feedback entry.

required
database str

Database containing the agent.

required
schema str

Schema containing the agent.

required
positive bool

Indicates positive (True) or negative (False) feedback.

required
feedback_message str | None

Optional free-form feedback text.

None
categories list[str] | None

Optional list of structured feedback categories.

None
orig_request_id str | None

Optional request identifier from a previous run.

None
thread_id str | int | None

Optional conversation thread to associate with feedback.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: API acknowledgement payload.

Source code in cortex_agents/async_agent.py
async def submit_feedback(
    self,
    agent_name: str,
    database: str,
    schema: str,
    *,
    positive: bool,
    feedback_message: str | None = None,
    categories: list[str] | None = None,
    orig_request_id: str | None = None,
    thread_id: str | int | None = None,
) -> dict[str, Any]:
    """Submit feedback about an agent response or interaction.

    Args:
        agent_name: Agent associated with the feedback entry.
        database: Database containing the agent.
        schema: Schema containing the agent.
        positive: Indicates positive (True) or negative (False) feedback.
        feedback_message: Optional free-form feedback text.
        categories: Optional list of structured feedback categories.
        orig_request_id: Optional request identifier from a previous run.
        thread_id: Optional conversation thread to associate with feedback.

    Returns:
        dict[str, Any]: API acknowledgement payload.
    """
    feedback = self._ensure_feedback_manager()
    return await feedback.submit_feedback(
        agent_name=agent_name,
        database=database,
        schema=schema,
        positive=positive,
        feedback_message=feedback_message,
        categories=categories,
        orig_request_id=orig_request_id,
        thread_id=thread_id,
    )

list_models async

list_models() -> dict[str, Any]

Return the list of Cortex models visible to the current account.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Mapping of available models and their attributes.

Source code in cortex_agents/async_agent.py
async def list_models(self) -> dict[str, Any]:
    """Return the list of Cortex models visible to the current account.

    Returns:
        dict[str, Any]: Mapping of available models and their attributes.
    """
    transport = self._ensure_transport()
    return await transport.get("cortex/models")