Skip to content

Core Thread Management

cortex_agents.core.threads.AgentThreads

AgentThreads(transport: SyncTransport)

Thread operations for the synchronous Cortex Agent client.

Source code in cortex_agents/core/threads.py
def __init__(self, transport: SyncTransport) -> None:
    """Store the transport used to issue thread requests.

    Args:
        transport: Synchronous transport object kept on the instance as
            ``_transport``.
    """
    self._transport = transport

create_thread

create_thread(origin_app: str | None = None) -> dict[str, Any]

Create a new conversation thread.

Parameters:

Name Type Description Default
origin_app str | None

Optional application name for tracking thread origin (max 16 bytes).

None

Returns:

Type Description
dict[str, Any]

Dictionary containing thread metadata including thread_id, thread_name, origin_application, created_on, and updated_on.

Raises:

Type Description
ValueError

If origin_app exceeds 16 bytes.

SnowflakeAPIError

If the request fails.

Examples:

thread = client.create_thread(origin_app="my_app")
thread_id = thread["thread_id"]
Source code in cortex_agents/core/threads.py
def create_thread(self, origin_app: str | None = None) -> dict[str, Any]:
    """Start a new conversation thread on the server.

    Args:
        origin_app: Optional name of the originating application, recorded
            with the thread for tracking (max 16 bytes).

    Returns:
        Thread metadata as a dictionary (thread_id, thread_name,
        origin_application, created_on, and updated_on). When the API
        replies with a bare string, the dictionary holds only ``thread_id``.

    Raises:
        ValueError: If origin_app exceeds 16 bytes.
        SnowflakeAPIError: If the request fails.

    Examples:
        ```python
        thread = client.create_thread(origin_app="my_app")
        thread_id = thread["thread_id"]
        ```
    """
    request_body = _build_create_thread_payload(origin_app)
    response = self._transport.post("cortex/threads", request_body)
    if not isinstance(response, str):
        return _parse_timestamps(response)
    # Some API versions reply with just the thread UUID as a plain string;
    # wrap it so callers always receive a dictionary.
    return {"thread_id": response}

get_thread

get_thread(thread_id: str | int, *, limit: int = 20, last_message_id: int | None = None) -> dict[str, Any]

Retrieve a conversation thread with its messages.

Parameters:

Name Type Description Default
thread_id str | int

Thread ID to retrieve.

required
limit int

Maximum number of messages to return (default: 20).

20
last_message_id int | None

Message ID to start pagination from (optional).

None

Returns:

Type Description
dict[str, Any]

Dictionary containing thread metadata and messages.

Raises:

Type Description
SnowflakeAPIError

If the thread doesn't exist or request fails.

Examples:

thread = client.get_thread(thread_id, limit=50)
for msg in thread['messages']:
    print(msg['content'])
Source code in cortex_agents/core/threads.py
def get_thread(
    self,
    thread_id: str | int,
    *,
    limit: int = 20,
    last_message_id: int | None = None,
) -> dict[str, Any]:
    """Retrieve a conversation thread with its messages.

    Args:
        thread_id: Thread ID to retrieve.
        limit: Maximum number of messages to return (default: 20).
        last_message_id: Message ID to start pagination from (optional).

    Returns:
        Dictionary containing thread metadata and messages.

    Raises:
        SnowflakeAPIError: If the thread doesn't exist or request fails.

    Examples:
        ```python
        thread = client.get_thread(thread_id, limit=50)
        for msg in thread['messages']:
            print(msg['content'])
        ```
    """
    endpoint = f"cortex/threads/{thread_id}"
    params: dict[str, Any] = {"page_size": limit}
    if last_message_id is not None:
        params["last_message_id"] = last_message_id

    return self._transport.get(endpoint, params)

update_thread

update_thread(thread_id: str | int, name: str) -> dict[str, Any]

Update a thread's name.

Parameters:

Name Type Description Default
thread_id str | int

Thread ID to update.

required
name str

New thread name.

required

Returns:

Type Description
dict[str, Any]

Response dictionary with update status.

Raises:

Type Description
SnowflakeAPIError

If the thread doesn't exist or request fails.

Examples:

client.update_thread(thread_id, "Sales Analysis Chat")
Source code in cortex_agents/core/threads.py
def update_thread(self, thread_id: str | int, name: str) -> dict[str, Any]:
    """Update a thread's name.

    Args:
        thread_id: Thread ID to update.
        name: New thread name.

    Returns:
        Response dictionary with update status.

    Raises:
        SnowflakeAPIError: If the thread doesn't exist or request fails.

    Examples:
        ```python
        client.update_thread(thread_id, "Sales Analysis Chat")
        ```
    """
    endpoint = f"cortex/threads/{thread_id}"
    return self._transport.post(endpoint, {"thread_name": name})

list_threads

list_threads(origin_app: str | None = None) -> list[dict[str, Any]]

List all conversation threads.

Parameters:

Name Type Description Default
origin_app str | None

Filter by application name (optional).

None

Returns:

Type Description
list[dict[str, Any]]

List of thread dictionaries, each containing thread_id, thread_name, origin_application, created_on, and updated_on.

Raises:

Type Description
SnowflakeAPIError

If the request fails.

Examples:

threads = client.list_threads(origin_app="my_app")
for thread in threads:
    print(f"{thread['thread_id']}: {thread['thread_name']}")
Source code in cortex_agents/core/threads.py
def list_threads(self, origin_app: str | None = None) -> list[dict[str, Any]]:
    """Return every conversation thread, optionally narrowed by application.

    Args:
        origin_app: Application name to filter by. A falsy value (None or
            an empty string) sends no filter.

    Returns:
        List of thread dictionaries, each containing thread_id, thread_name,
        origin_application, created_on, and updated_on.

    Raises:
        SnowflakeAPIError: If the request fails.

    Examples:
        ```python
        threads = client.list_threads(origin_app="my_app")
        for thread in threads:
            print(f"{thread['thread_id']}: {thread['thread_name']}")
        ```
    """
    query = None
    if origin_app:
        query = {"origin_application": origin_app}
    raw_threads = self._transport.get("cortex/threads", query)
    return list(map(_parse_timestamps, raw_threads))

delete_thread

delete_thread(thread_id: str | int) -> dict[str, Any]

Delete a conversation thread.

Parameters:

Name Type Description Default
thread_id str | int

Thread ID to delete.

required

Returns:

Type Description
dict[str, Any]

Response dictionary with deletion status.

Raises:

Type Description
SnowflakeAPIError

If the thread doesn't exist or request fails.

Examples:

client.delete_thread(thread_id)
Source code in cortex_agents/core/threads.py
def delete_thread(self, thread_id: str | int) -> dict[str, Any]:
    """Delete a conversation thread.

    Args:
        thread_id: Thread ID to delete.

    Returns:
        Response dictionary with deletion status.

    Raises:
        SnowflakeAPIError: If the thread doesn't exist or request fails.

    Examples:
        ```python
        client.delete_thread(thread_id)
        ```
    """
    endpoint = f"cortex/threads/{thread_id}"
    return self._transport.delete(endpoint)

cortex_agents.core.threads.AsyncAgentThreads

AsyncAgentThreads(transport: AsyncTransport)

Thread operations for the asynchronous Cortex Agent client.

Source code in cortex_agents/core/threads.py
def __init__(self, transport: AsyncTransport) -> None:
    """Store the transport used to issue thread requests.

    Args:
        transport: Asynchronous transport object kept on the instance as
            ``_transport``.
    """
    self._transport = transport

create_thread async

create_thread(origin_app: str | None = None) -> dict[str, Any]

Create a new conversation thread (async).

Parameters:

Name Type Description Default
origin_app str | None

Optional application name for tracking thread origin (max 16 bytes).

None

Returns:

Type Description
dict[str, Any]

Dictionary containing thread metadata including thread_id, thread_name, origin_application, created_on, and updated_on.

Raises:

Type Description
ValueError

If origin_app exceeds 16 bytes.

SnowflakeAPIError

If the request fails.

Examples:

thread = await client.create_thread(origin_app="my_app")
thread_id = thread["thread_id"]
Source code in cortex_agents/core/threads.py
async def create_thread(self, origin_app: str | None = None) -> dict[str, Any]:
    """Start a new conversation thread on the server (async).

    Args:
        origin_app: Optional name of the originating application, recorded
            with the thread for tracking (max 16 bytes).

    Returns:
        Thread metadata as a dictionary (thread_id, thread_name,
        origin_application, created_on, and updated_on). When the API
        replies with a bare string, the dictionary holds only ``thread_id``.

    Raises:
        ValueError: If origin_app exceeds 16 bytes.
        SnowflakeAPIError: If the request fails.

    Examples:
        ```python
        thread = await client.create_thread(origin_app="my_app")
        thread_id = thread["thread_id"]
        ```
    """
    request_body = _build_create_thread_payload(origin_app)
    response = await self._transport.post("cortex/threads", request_body)
    if not isinstance(response, str):
        return _parse_timestamps(response)
    # Some API versions reply with just the thread UUID as a plain string;
    # wrap it so callers always receive a dictionary.
    return {"thread_id": response}

get_thread async

get_thread(thread_id: str | int, *, limit: int = 20, last_message_id: int | None = None) -> dict[str, Any]

Retrieve a conversation thread with its messages (async).

Parameters:

Name Type Description Default
thread_id str | int

Thread ID to retrieve.

required
limit int

Maximum number of messages to return (default: 20).

20
last_message_id int | None

Message ID to start pagination from (optional).

None

Returns:

Type Description
dict[str, Any]

Dictionary containing thread metadata and messages.

Raises:

Type Description
SnowflakeAPIError

If the thread doesn't exist or request fails.

Examples:

thread = await client.get_thread(thread_id, limit=50)
for msg in thread['messages']:
    print(msg['content'])
Source code in cortex_agents/core/threads.py
async def get_thread(
    self,
    thread_id: str | int,
    *,
    limit: int = 20,
    last_message_id: int | None = None,
) -> dict[str, Any]:
    """Retrieve a conversation thread with its messages (async).

    Args:
        thread_id: Thread ID to retrieve.
        limit: Maximum number of messages to return (default: 20).
        last_message_id: Message ID to start pagination from (optional).

    Returns:
        Dictionary containing thread metadata and messages.

    Raises:
        SnowflakeAPIError: If the thread doesn't exist or request fails.

    Examples:
        ```python
        thread = await client.get_thread(thread_id, limit=50)
        for msg in thread['messages']:
            print(msg['content'])
        ```
    """
    endpoint = f"cortex/threads/{thread_id}"
    params: dict[str, Any] = {"page_size": limit}
    if last_message_id is not None:
        params["last_message_id"] = last_message_id

    return await self._transport.get(endpoint, params)

update_thread async

update_thread(thread_id: str | int, name: str) -> dict[str, Any]

Update a thread's name (async).

Parameters:

Name Type Description Default
thread_id str | int

Thread ID to update.

required
name str

New thread name.

required

Returns:

Type Description
dict[str, Any]

Response dictionary with update status.

Raises:

Type Description
SnowflakeAPIError

If the thread doesn't exist or request fails.

Examples:

await client.update_thread(thread_id, "Sales Analysis Chat")
Source code in cortex_agents/core/threads.py
async def update_thread(self, thread_id: str | int, name: str) -> dict[str, Any]:
    """Update a thread's name (async).

    Args:
        thread_id: Thread ID to update.
        name: New thread name.

    Returns:
        Response dictionary with update status.

    Raises:
        SnowflakeAPIError: If the thread doesn't exist or request fails.

    Examples:
        ```python
        await client.update_thread(thread_id, "Sales Analysis Chat")
        ```
    """
    endpoint = f"cortex/threads/{thread_id}"
    return await self._transport.post(endpoint, {"thread_name": name})

list_threads async

list_threads(origin_app: str | None = None) -> list[dict[str, Any]]

List all conversation threads (async).

Parameters:

Name Type Description Default
origin_app str | None

Filter by application name (optional).

None

Returns:

Type Description
list[dict[str, Any]]

List of thread dictionaries, each containing thread_id, thread_name, origin_application, created_on, and updated_on.

Raises:

Type Description
SnowflakeAPIError

If the request fails.

Examples:

threads = await client.list_threads(origin_app="my_app")
for thread in threads:
    print(f"{thread['thread_id']}: {thread['thread_name']}")
Source code in cortex_agents/core/threads.py
async def list_threads(self, origin_app: str | None = None) -> list[dict[str, Any]]:
    """Return every conversation thread, optionally narrowed by application (async).

    Args:
        origin_app: Application name to filter by. A falsy value (None or
            an empty string) sends no filter.

    Returns:
        List of thread dictionaries, each containing thread_id, thread_name,
        origin_application, created_on, and updated_on.

    Raises:
        SnowflakeAPIError: If the request fails.

    Examples:
        ```python
        threads = await client.list_threads(origin_app="my_app")
        for thread in threads:
            print(f"{thread['thread_id']}: {thread['thread_name']}")
        ```
    """
    query = None
    if origin_app:
        query = {"origin_application": origin_app}
    raw_threads = await self._transport.get("cortex/threads", query)
    return list(map(_parse_timestamps, raw_threads))

delete_thread async

delete_thread(thread_id: str | int) -> dict[str, Any]

Delete a conversation thread (async).

Parameters:

Name Type Description Default
thread_id str | int

Thread ID to delete.

required

Returns:

Type Description
dict[str, Any]

Response dictionary with deletion status.

Raises:

Type Description
SnowflakeAPIError

If the thread doesn't exist or request fails.

Examples:

await client.delete_thread(thread_id)
Source code in cortex_agents/core/threads.py
async def delete_thread(self, thread_id: str | int) -> dict[str, Any]:
    """Delete a conversation thread (async).

    Args:
        thread_id: Thread ID to delete.

    Returns:
        Response dictionary with deletion status.

    Raises:
        SnowflakeAPIError: If the thread doesn't exist or request fails.

    Examples:
        ```python
        await client.delete_thread(thread_id)
        ```
    """
    endpoint = f"cortex/threads/{thread_id}"
    return await self._transport.delete(endpoint)