Skip to content

Instantly share code, notes, and snippets.

@gempir
Created October 6, 2025 11:45
Show Gist options
  • Select an option

  • Save gempir/ece2c34bb2c2acde4e12cd6c961ac47e to your computer and use it in GitHub Desktop.

Select an option

Save gempir/ece2c34bb2c2acde4e12cd6c961ac47e to your computer and use it in GitHub Desktop.
OpenWebUI Function for Langflow Integration Streaming
"""
title: [BROKEN] Langflow Integration Streaming
description: broken because it can cause the whole openwebui server to crash if the context is too big like "list all tables in our db"
"""
from pydantic import BaseModel, Field
import requests
import json
import os
class Pipe:
    """Open WebUI manifold pipe that forwards chat requests to Langflow flows.

    Each Langflow flow returned by ``pipes()`` is offered as a model. ``pipe()``
    sends the last chat message to the selected flow's ``/api/v1/run``
    endpoint, either as a single blocking request or as a stream that is
    re-encoded into OpenAI-style ``data: {...}`` chunks.
    """

    class Valves(BaseModel):
        """Configuration values; defaults are read from environment variables."""

        langflow_url: str = Field(
            default=os.getenv("LANGFLOW_API_URL", ""),
            description="Base URL of Langflow server (e.g. http://localhost:7860)",
        )
        langflow_api_key: str = Field(
            default=os.getenv("LANGFLOW_API_KEY", ""),
            description="Langflow API Key (for x-api-key header)",
        )
        langflow_task_agent_id: str = Field(
            default=os.getenv("LANGFLOW_TASK_AGENT_ID", ""),
            description="Langflow Agent id for meta requests",
        )

    def __init__(self):
        """Set the pipe identity, load the valves and warn about missing settings."""
        self.type = "manifold"
        self.id = "langflow_streaming"
        self.valves = self.Valves()
        if not self.valves.langflow_url:
            print("⚠️ Please set your Langflow URL via LANGFLOW_API_URL")
        if not self.valves.langflow_api_key:
            print("⚠️ Please set your Langflow API key via LANGFLOW_API_KEY")

    def pipes(self):
        """Return the Langflow flows as a list of ``{"id", "name"}`` dicts.

        The flow whose id equals ``langflow_task_agent_id`` is left out of the
        list. Any failure (network, HTTP status, unexpected payload) is
        reported as a single entry with id ``"error"`` and the exception text
        as its name.
        """
        url = f"{self.valves.langflow_url.rstrip('/')}/api/v1/flows/"
        headers = {
            "accept": "application/json",
            "x-api-key": self.valves.langflow_api_key,
        }
        params = {
            "remove_example_flows": "true",
            "components_only": "false",
            "get_all": "true",
            "header_flows": "false",
            "page": 1,
            "size": 50,
        }
        try:
            r = requests.get(url, headers=headers, params=params, timeout=10)
            r.raise_for_status()
            flows = r.json()
            task_agent_id = self.valves.langflow_task_agent_id
            # Use .get() for the filter as well as the projection so a flow
            # without an "id" key does not turn the whole list into an error.
            return [
                {"id": f.get("id"), "name": f.get("name")}
                for f in flows
                if f.get("id") != task_agent_id
            ]
        except Exception as e:
            return [{"id": "error", "name": str(e)}]

    def pipe(self, body: dict, __user__: dict, __metadata__: dict):
        """Run the last chat message through the selected Langflow flow.

        Args:
            body: Request body; reads ``model``, ``messages`` and ``stream``.
            __user__: Not used by this method.
            __metadata__: Reads ``chat_id`` (used as the Langflow session id)
                and ``task`` (truthy values route to the task agent flow).

        Returns:
            The flow's reply text, a generator of encoded stream chunks when
            ``stream`` is truthy, or an ``"Error in pipe: ..."`` string if
            anything raised while preparing the request.
        """
        try:
            messages = body.get("messages") or []
            # Keep everything after the first "." of the model name; the whole
            # name is used when it contains no ".".
            model_id = body["model"].split(".", 1)[-1]
            if not messages:
                # Explicit message instead of an opaque IndexError text.
                raise ValueError("no messages provided")
            question = self._process_message_content(messages[-1])
            chat_id = __metadata__.get("chat_id", "")
            is_stream = body.get("stream", False)
            is_task = bool(__metadata__.get("task", False))
            if is_stream:
                return self._run_streaming_request(
                    question, chat_id, model_id, is_task
                )
            return self._run_request(question, chat_id, model_id, is_task)
        except Exception as e:
            error_msg = f"Error in pipe: {e}"
            print(error_msg)
            return error_msg

    @staticmethod
    def _process_message_content(message: dict) -> str:
        """Extract the text of a chat message.

        When ``content`` is a list, the ``text`` of every item whose ``type``
        is ``"text"`` is joined with single spaces. Otherwise ``content`` is
        returned as-is, or ``""`` when it is missing or falsy.
        """
        content = message.get("content")
        if isinstance(content, list):
            return " ".join(
                item.get("text", "")
                for item in content
                if item.get("type") == "text"
            )
        return content or ""

    def _build_run_request(
        self, question: str, chat_id: str, model_id: str, is_task: bool
    ):
        """Build the ``(url, headers, payload)`` shared by both run modes.

        Task requests are sent to the ``langflow_task_agent_id`` flow and use
        a ``"-task"`` suffixed session id; all other requests go to
        ``model_id`` with ``chat_id`` as the session id.
        """
        flow_id = self.valves.langflow_task_agent_id if is_task else model_id
        url = f"{self.valves.langflow_url.rstrip('/')}/api/v1/run/{flow_id}"
        if is_task:
            chat_id += "-task"
        payload = {
            "input_value": question,
            "input_type": "chat",
            "output_type": "chat",
            "session_id": chat_id,
        }
        headers = {
            "Content-Type": "application/json",
            "x-api-key": self.valves.langflow_api_key,
            "Accept": "application/json",
        }
        return url, headers, payload

    def _run_request(self, question: str, chat_id: str, model_id: str, is_task: bool):
        """Run the flow with a blocking POST and return the reply text.

        Returns the message found at
        ``outputs[0].outputs[0].outputs.message.message`` of the JSON reply,
        or an ``"Error: ..."`` string when the request or parsing fails.
        """
        url, headers, payload = self._build_run_request(
            question, chat_id, model_id, is_task
        )
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=60)
            response.raise_for_status()
            data = response.json()
            return data["outputs"][0]["outputs"][0]["outputs"]["message"]["message"]
        except requests.HTTPError as e:
            # HTTPError is raised by raise_for_status(), so `response` is bound.
            print(f"HTTP error: {e} - Response: {response.text}")
            return f"Error: {e}"
        except Exception as e:
            print(f"Error: {e}")
            return f"Error: {e}"

    def _run_streaming_request(
        self, question: str, chat_id: str, model_id: str, is_task: bool
    ):
        """Run the flow with ``?stream=true`` and re-emit OpenAI-style chunks.

        Returns a generator of UTF-8 encoded ``data: ...`` chunks, or an error
        string when the initial request fails.
        """
        url, headers, payload = self._build_run_request(
            question, chat_id, model_id, is_task
        )
        url += "?stream=true"
        try:
            response = requests.post(
                url, headers=headers, json=payload, stream=True, timeout=60
            )
            response.raise_for_status()

            def stream_generator():
                """Translate Langflow event lines into delta chunks."""
                previous_text = ""
                try:
                    # NOTE(review): iter_lines() uses a small default chunk
                    # size; very long event lines may be expensive to
                    # reassemble -- confirm whether a larger chunk_size helps.
                    for line in response.iter_lines():
                        if not line:
                            continue
                        try:
                            line_str = line.decode("utf-8")
                            event_data = json.loads(line_str)
                            event_type = event_data.get("event")
                            if event_type == "add_message":
                                data = event_data.get("data", {})
                                sender = data.get("sender")
                                current_text = data.get("text", "")
                                if sender == "Machine" and current_text:
                                    # Each event carries the full text so far;
                                    # forward only the newly added suffix.
                                    # NOTE(review): a later, shorter Machine
                                    # message would be skipped here -- confirm
                                    # whether flows can emit several messages.
                                    if len(current_text) > len(previous_text):
                                        new_chunk = current_text[len(previous_text) :]
                                        previous_text = current_text
                                        openai_chunk = {
                                            "choices": [
                                                {
                                                    "delta": {"content": new_chunk},
                                                    "index": 0,
                                                    "finish_reason": None,
                                                }
                                            ]
                                        }
                                        yield f"data: {json.dumps(openai_chunk)}\n\n".encode(
                                            "utf-8"
                                        )
                            elif event_type == "end":
                                final_chunk = {
                                    "choices": [
                                        {"delta": {}, "index": 0, "finish_reason": "stop"}
                                    ]
                                }
                                yield f"data: {json.dumps(final_chunk)}\n\n".encode("utf-8")
                                yield b"data: [DONE]\n\n"
                                break
                        except json.JSONDecodeError:
                            # Lines that are not JSON are ignored.
                            continue
                        except Exception as e:
                            print(f"Error processing line: {e}")
                            continue
                finally:
                    # Release the HTTP connection however the generator ends:
                    # "end" event, exhausted stream, error, or the consumer
                    # closing the generator early.
                    response.close()

            return stream_generator()
        except requests.HTTPError as e:
            error_msg = f"HTTP error: {e}"
            print(f"{error_msg} - Response: {response.text}")
            return error_msg
        except Exception as e:
            error_msg = f"Error: {e}"
            print(error_msg)
            return error_msg
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment