feat: migrate Python SDK to httpx with async/await support (#26726)

Signed-off-by: lyzno1 <yuanyouhuilyz@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
lyzno1
2025-10-11 17:45:42 +08:00
committed by GitHub
parent 5217017e69
commit a9b3539b90
11 changed files with 2183 additions and 488 deletions

View File

@@ -1,32 +1,114 @@
import json
from typing import Literal, Union, Dict, List, Any, Optional, IO
import os
from typing import Literal, Dict, List, Any, IO
import requests
import httpx
class DifyClient:
def __init__(self, api_key, base_url: str = "https://api.dify.ai/v1"):
"""Synchronous Dify API client.
This client uses httpx.Client for efficient connection pooling and resource management.
It's recommended to use this client as a context manager:
Example:
with DifyClient(api_key="your-key") as client:
response = client.get_app_info()
"""
def __init__(
self,
api_key: str,
base_url: str = "https://api.dify.ai/v1",
timeout: float = 60.0,
):
"""Initialize the Dify client.
Args:
api_key: Your Dify API key
base_url: Base URL for the Dify API
timeout: Request timeout in seconds (default: 60.0)
"""
self.api_key = api_key
self.base_url = base_url
self._client = httpx.Client(
base_url=base_url,
timeout=httpx.Timeout(timeout, connect=5.0),
)
def __enter__(self):
"""Support context manager protocol."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Clean up resources when exiting context."""
self.close()
def close(self):
"""Close the HTTP client and release resources."""
if hasattr(self, "_client"):
self._client.close()
def _send_request(
self, method: str, endpoint: str, json: dict | None = None, params: dict | None = None, stream: bool = False
self,
method: str,
endpoint: str,
json: dict | None = None,
params: dict | None = None,
stream: bool = False,
**kwargs,
):
"""Send an HTTP request to the Dify API.
Args:
method: HTTP method (GET, POST, PUT, PATCH, DELETE)
endpoint: API endpoint path
json: JSON request body
params: Query parameters
stream: Whether to stream the response
**kwargs: Additional arguments to pass to httpx.request
Returns:
httpx.Response object
"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
url = f"{self.base_url}{endpoint}"
response = requests.request(method, url, json=json, params=params, headers=headers, stream=stream)
# httpx.Client automatically prepends base_url
response = self._client.request(
method,
endpoint,
json=json,
params=params,
headers=headers,
**kwargs,
)
return response
def _send_request_with_files(self, method, endpoint, data, files):
def _send_request_with_files(self, method: str, endpoint: str, data: dict, files: dict):
"""Send an HTTP request with file uploads.
Args:
method: HTTP method (POST, PUT, etc.)
endpoint: API endpoint path
data: Form data
files: Files to upload
Returns:
httpx.Response object
"""
headers = {"Authorization": f"Bearer {self.api_key}"}
url = f"{self.base_url}{endpoint}"
response = requests.request(method, url, data=data, headers=headers, files=files)
response = self._client.request(
method,
endpoint,
data=data,
headers=headers,
files=files,
)
return response
@@ -65,7 +147,11 @@ class DifyClient:
# NOTE(review): diff fragment — the one-line signature below is the
# pre-migration form; the multi-line signature that follows replaces it.
class CompletionClient(DifyClient):
def create_completion_message(
self, inputs: dict, response_mode: Literal["blocking", "streaming"], user: str, files: dict | None = None
self,
inputs: dict,
response_mode: Literal["blocking", "streaming"],
user: str,
files: dict | None = None,
):
data = {
"inputs": inputs,
@@ -77,7 +163,7 @@ class CompletionClient(DifyClient):
"POST",
"/completion-messages",
data,
# `True if ... else False` replaced by the boolean expression itself.
stream=True if response_mode == "streaming" else False,
stream=(response_mode == "streaming"),
)
@@ -105,7 +191,7 @@ class ChatClient(DifyClient):
"POST",
"/chat-messages",
data,
# old verbose conditional vs. new direct boolean comparison
stream=True if response_mode == "streaming" else False,
stream=(response_mode == "streaming"),
)
def get_suggested(self, message_id: str, user: str):
@@ -166,10 +252,6 @@ class ChatClient(DifyClient):
embedding_model_name: str,
):
"""Enable or disable annotation reply feature."""
# NOTE(review): this commit deletes the client-side None validation below;
# missing fields will now surface as backend errors instead — confirm intended.
# Backend API requires these fields to be non-None values
if score_threshold is None or embedding_provider_name is None or embedding_model_name is None:
raise ValueError("score_threshold, embedding_provider_name, and embedding_model_name cannot be None")
data = {
"score_threshold": score_threshold,
"embedding_provider_name": embedding_provider_name,
@@ -181,11 +263,9 @@ class ChatClient(DifyClient):
"""Get the status of an annotation reply action job."""
return self._send_request("GET", f"/apps/annotation-reply/{action}/status/{job_id}")
def list_annotations(self, page: int = 1, limit: int = 20, keyword: str | None = None):
    """List annotations for the application.

    Args:
        page: Page number (default: 1)
        limit: Page size (default: 20)
        keyword: Optional search keyword

    Returns:
        Response from the /apps/annotations endpoint
    """
    # NOTE(review): keyword is now always included (possibly None) instead of
    # being omitted when falsy — confirm the HTTP layer drops None params.
    params = {"page": page, "limit": limit, "keyword": keyword}
    return self._send_request("GET", "/apps/annotations", params=params)
# NOTE(review): hunk boundary — create_annotation's body and
# delete_annotation's def line are elided from this view.
def create_annotation(self, question: str, answer: str):
@@ -202,9 +282,47 @@ class ChatClient(DifyClient):
"""Delete an annotation."""
return self._send_request("DELETE", f"/apps/annotations/{annotation_id}")
# Conversation Variables APIs
def get_conversation_variables(self, conversation_id: str, user: str):
    """Fetch every variable attached to one conversation.

    Args:
        conversation_id: The conversation ID to query variables for
        user: User identifier

    Returns:
        Response from the API containing:
            - variables: List of conversation variables with their values
            - conversation_id: The conversation ID
    """
    endpoint = f"/conversations/{conversation_id}/variables"
    return self._send_request("GET", endpoint, params={"user": user})
def update_conversation_variable(self, conversation_id: str, variable_id: str, value: Any, user: str):
    """Set a new value for a single conversation variable.

    Args:
        conversation_id: The conversation ID
        variable_id: The variable ID to update
        value: New value for the variable
        user: User identifier

    Returns:
        Response from the API with updated variable information
    """
    endpoint = f"/conversations/{conversation_id}/variables/{variable_id}"
    payload = {"value": value, "user": user}
    return self._send_request("PATCH", endpoint, json=payload)
class WorkflowClient(DifyClient):
    """Client for running Dify workflow applications."""

    def run(
        self,
        inputs: dict,
        response_mode: Literal["blocking", "streaming"] = "streaming",
        user: str = "abc-123",
    ):
        """Run the workflow bound to this app's API key.

        Args:
            inputs: Workflow input variables
            response_mode: 'blocking' or 'streaming' (default: 'streaming')
            user: User identifier (default: 'abc-123')

        Returns:
            Response from the /workflows/run endpoint
        """
        data = {"inputs": inputs, "response_mode": response_mode, "user": user}
        return self._send_request("POST", "/workflows/run", data)
@@ -252,7 +370,10 @@ class WorkflowClient(DifyClient):
"""Run a specific workflow by workflow ID."""
data = {"inputs": inputs, "response_mode": response_mode, "user": user}
return self._send_request(
# old single-line call vs. new multi-line call with simplified stream flag
"POST", f"/workflows/{workflow_id}/run", data, stream=True if response_mode == "streaming" else False
"POST",
f"/workflows/{workflow_id}/run",
data,
stream=(response_mode == "streaming"),
)
@@ -293,7 +414,7 @@ class KnowledgeBaseClient(DifyClient):
return self._send_request("POST", "/datasets", {"name": name}, **kwargs)
def list_datasets(self, page: int = 1, page_size: int = 20, **kwargs):
    """List datasets with pagination.

    Args:
        page: Page number (default: 1)
        page_size: Number of datasets per page (default: 20)
        **kwargs: Extra arguments forwarded to the request

    Returns:
        Response from the /datasets endpoint
    """
    # Pagination sent via params= (properly URL-encoded) rather than a
    # hand-built query string.
    return self._send_request("GET", "/datasets", params={"page": page, "limit": page_size}, **kwargs)
def create_document_by_text(self, name, text, extra_params: dict | None = None, **kwargs):
"""
@@ -333,7 +454,12 @@ class KnowledgeBaseClient(DifyClient):
return self._send_request("POST", url, json=data, **kwargs)
# NOTE(review): in the two signatures below, the one-line form is the
# pre-migration version and the multi-line form is its replacement.
def update_document_by_text(
self, document_id: str, name: str, text: str, extra_params: dict | None = None, **kwargs
self,
document_id: str,
name: str,
text: str,
extra_params: dict | None = None,
**kwargs,
):
"""
Update a document by text.
@@ -368,7 +494,10 @@ class KnowledgeBaseClient(DifyClient):
return self._send_request("POST", url, json=data, **kwargs)
def create_document_by_file(
self, file_path: str, original_document_id: str | None = None, extra_params: dict | None = None
self,
file_path: str,
original_document_id: str | None = None,
extra_params: dict | None = None,
):
"""
Create a document by file.
@@ -395,17 +524,18 @@ class KnowledgeBaseClient(DifyClient):
}
:return: Response from the API
"""
# NOTE(review): the old body below leaked the file handle (open() with no
# close); the replacement body further down uses a context manager and
# sends (filename, fileobj) so the server sees the real file name.
files = {"file": open(file_path, "rb")}
data = {
"process_rule": {"mode": "automatic"},
"indexing_technique": "high_quality",
}
if extra_params is not None and isinstance(extra_params, dict):
data.update(extra_params)
if original_document_id is not None:
data["original_document_id"] = original_document_id
url = f"/datasets/{self._get_dataset_id()}/document/create_by_file"
return self._send_request_with_files("POST", url, {"data": json.dumps(data)}, files)
with open(file_path, "rb") as f:
files = {"file": (os.path.basename(file_path), f)}
data = {
"process_rule": {"mode": "automatic"},
"indexing_technique": "high_quality",
}
if extra_params is not None and isinstance(extra_params, dict):
data.update(extra_params)
if original_document_id is not None:
data["original_document_id"] = original_document_id
url = f"/datasets/{self._get_dataset_id()}/document/create_by_file"
return self._send_request_with_files("POST", url, {"data": json.dumps(data)}, files)
def update_document_by_file(self, document_id: str, file_path: str, extra_params: dict | None = None):
    """Update an existing document from a local file.

    Args:
        document_id: ID of the document to update
        file_path: Path of the file to upload
        extra_params: Optional extra fields merged into the request payload
            (e.g. process_rule configuration)

    Returns:
        Response from the API
    """
    # Context manager guarantees the handle is closed even if the request
    # raises (the old requests-based code leaked it); the upload happens
    # while the file is still open.
    with open(file_path, "rb") as f:
        files = {"file": (os.path.basename(file_path), f)}
        data = {}
        if extra_params is not None and isinstance(extra_params, dict):
            data.update(extra_params)
        url = f"/datasets/{self._get_dataset_id()}/documents/{document_id}/update_by_file"
        return self._send_request_with_files("POST", url, {"data": json.dumps(data)}, files)
def batch_indexing_status(self, batch_id: str, **kwargs):
"""
@@ -516,6 +647,8 @@ class KnowledgeBaseClient(DifyClient):
:param document_id: ID of the document
:param keyword: query keyword, optional
:param status: status of the segment, optional, e.g. completed
:param kwargs: Additional parameters to pass to the API.
Can include a 'params' dict for extra query parameters.
"""
url = f"/datasets/{self._get_dataset_id()}/documents/{document_id}/segments"
params = {}
@@ -524,7 +657,7 @@ class KnowledgeBaseClient(DifyClient):
if status is not None:
params["status"] = status
if "params" in kwargs:
# pop() removes 'params' from kwargs so it is not ALSO forwarded to the
# request call below (the pre-change line forwarded it twice).
params.update(kwargs["params"])
params.update(kwargs.pop("params"))
return self._send_request("GET", url, params=params, **kwargs)
def delete_document_segment(self, document_id: str, segment_id: str):
@@ -553,7 +686,10 @@ class KnowledgeBaseClient(DifyClient):
# Advanced Knowledge Base APIs
def hit_testing(
self, query: str, retrieval_model: Dict[str, Any] = None, external_retrieval_model: Dict[str, Any] = None
self,
query: str,
# NOTE(review): both defaults are None but annotated plain Dict[str, Any];
# these should be Dict[str, Any] | None for accuracy — confirm.
retrieval_model: Dict[str, Any] = None,
external_retrieval_model: Dict[str, Any] = None,
):
"""Perform hit testing on the dataset."""
data = {"query": query}
@@ -632,7 +768,11 @@ class KnowledgeBaseClient(DifyClient):
credential_id: str = None,
):
"""Run a datasource node in RAG pipeline."""
# old single-line dict vs. new multi-line formatting (same contents)
data = {"inputs": inputs, "datasource_type": datasource_type, "is_published": is_published}
data = {
"inputs": inputs,
"datasource_type": datasource_type,
"is_published": is_published,
}
if credential_id:
data["credential_id"] = credential_id
url = f"/datasets/{self._get_dataset_id()}/pipeline/datasource/nodes/{node_id}/run"
@@ -662,5 +802,94 @@ class KnowledgeBaseClient(DifyClient):
def upload_pipeline_file(self, file_path: str):
    """Upload a file for use in a RAG pipeline.

    Args:
        file_path: Path of the file to upload

    Returns:
        Response from the API
    """
    with open(file_path, "rb") as f:
        # Send (filename, fileobj) so the server receives the real file name.
        files = {"file": (os.path.basename(file_path), f)}
        return self._send_request_with_files("POST", "/datasets/pipeline/file-upload", {}, files)
# Dataset Management APIs
def get_dataset(self, dataset_id: str | None = None):
    """Fetch the details of a single dataset.

    Args:
        dataset_id: Dataset ID (optional, uses current dataset_id if not provided)

    Returns:
        Response from the API containing dataset details including:
        - name, description, permission
        - indexing_technique, embedding_model, embedding_model_provider
        - retrieval_model configuration
        - document_count, word_count, app_count
        - created_at, updated_at
    """
    target = dataset_id or self._get_dataset_id()
    return self._send_request("GET", f"/datasets/{target}")
def update_dataset(
    self,
    dataset_id: str | None = None,
    name: str | None = None,
    description: str | None = None,
    indexing_technique: str | None = None,
    embedding_model: str | None = None,
    embedding_model_provider: str | None = None,
    retrieval_model: Dict[str, Any] | None = None,
    **kwargs,
):
    """Update dataset configuration.

    Args:
        dataset_id: Dataset ID (optional, uses current dataset_id if not provided)
        name: New dataset name
        description: New dataset description
        indexing_technique: Indexing technique ('high_quality' or 'economy')
        embedding_model: Embedding model name
        embedding_model_provider: Embedding model provider
        retrieval_model: Retrieval model configuration dict
        **kwargs: Additional parameters to pass to the API

    Returns:
        Response from the API with updated dataset information
    """
    target = dataset_id or self._get_dataset_id()
    # Only explicitly provided fields make it into the PATCH body; any extra
    # kwargs are merged in verbatim.
    candidates = {
        "name": name,
        "description": description,
        "indexing_technique": indexing_technique,
        "embedding_model": embedding_model,
        "embedding_model_provider": embedding_model_provider,
        "retrieval_model": retrieval_model,
    }
    body = {key: val for key, val in candidates.items() if val is not None}
    body.update(kwargs)
    return self._send_request("PATCH", f"/datasets/{target}", json=body)
def batch_update_document_status(
    self,
    action: Literal["enable", "disable", "archive", "un_archive"],
    document_ids: List[str],
    dataset_id: str | None = None,
):
    """Batch update document status (enable/disable/archive/unarchive).

    Args:
        action: Action to perform on documents
            - 'enable': Enable documents for retrieval
            - 'disable': Disable documents from retrieval
            - 'archive': Archive documents
            - 'un_archive': Unarchive documents
        document_ids: List of document IDs to update
        dataset_id: Dataset ID (optional, uses current dataset_id if not provided)

    Returns:
        Response from the API with operation result
    """
    target = dataset_id or self._get_dataset_id()
    endpoint = f"/datasets/{target}/documents/status/{action}"
    return self._send_request("PATCH", endpoint, json={"document_ids": document_ids})