OpenWebUI Function for Langflow Integration Streaming
| """ | |
| title: [BROKEN] Langflow Integration Streaming | |
| description: broken because it can cause the whole openwebui server to crash if the context is too big like "list all tables in our db" | |
| """ | |
from pydantic import BaseModel, Field
import requests
import json
import os


class Pipe:
    class Valves(BaseModel):
        # Connection settings; defaults are read from environment variables.
        langflow_url: str = Field(
            default=os.getenv("LANGFLOW_API_URL", ""),
            description="Base URL of Langflow server (e.g. http://localhost:7860)",
        )
        langflow_api_key: str = Field(
            default=os.getenv("LANGFLOW_API_KEY", ""),
            description="Langflow API Key (for x-api-key header)",
        )
        langflow_task_agent_id: str = Field(
            default=os.getenv("LANGFLOW_TASK_AGENT_ID", ""),
            description="Langflow Agent id for meta requests",
        )

    def __init__(self):
        # A manifold pipe exposes one Open WebUI model per Langflow flow (see pipes()).
        self.type = "manifold"
        self.id = "langflow_streaming"
        self.valves = self.Valves()
        if not self.valves.langflow_url:
            print("⚠️ Please set your Langflow URL via LANGFLOW_API_URL")
        if not self.valves.langflow_api_key:
            print("⚠️ Please set your Langflow API key via LANGFLOW_API_KEY")

    def pipes(self):
        # List Langflow flows as selectable models, excluding the task agent flow.
        url = f"{self.valves.langflow_url.rstrip('/')}/api/v1/flows/"
        headers = {
            "accept": "application/json",
            "x-api-key": self.valves.langflow_api_key,
        }
        params = {
            "remove_example_flows": "true",
            "components_only": "false",
            "get_all": "true",
            "header_flows": "false",
            "page": 1,
            "size": 50,
        }
        try:
            r = requests.get(url, headers=headers, params=params, timeout=10)
            r.raise_for_status()
            flows = r.json()
            return [
                {"id": f.get("id"), "name": f.get("name")}
                for f in flows
                if f.get("id") != self.valves.langflow_task_agent_id
            ]
        except Exception as e:
            return [{"id": "error", "name": str(e)}]

    def pipe(self, body: dict, __user__: dict, __metadata__: dict):
        try:
            messages = body.get("messages") or []
            # Open WebUI prefixes the model id with "<pipe id>."; strip it to get the flow id.
            model_id = body["model"][body["model"].find(".") + 1 :]
            question = self._process_message_content(messages[-1])
            chat_id = __metadata__.get("chat_id", "")
            is_stream = body.get("stream", False)
            is_task = bool(__metadata__.get("task", False))
            if is_stream:
                return self._run_streaming_request(question, chat_id, model_id, is_task)
            else:
                return self._run_request(question, chat_id, model_id, is_task)
        except Exception as e:
            error_msg = f"Error in pipe: {e}"
            print(error_msg)
            return error_msg

    @staticmethod
    def _process_message_content(message: dict) -> str:
        # Messages may carry a plain string or a list of content parts; join the text parts.
        if isinstance(message.get("content"), list):
            parts = []
            for item in message["content"]:
                if item.get("type") == "text":
                    parts.append(item.get("text", ""))
            return " ".join(parts)
        return message.get("content", "") or ""

    def _run_request(self, question: str, chat_id: str, model_id: str, is_task: bool):
        # Non-streaming call: POST to /api/v1/run/<flow id> and return the final message text.
        url = f"{self.valves.langflow_url.rstrip('/')}/api/v1/run/{model_id if not is_task else self.valves.langflow_task_agent_id}"
        if is_task:
            # Keep Open WebUI meta/task requests in a separate Langflow session.
            chat_id += "-task"
        payload = {
            "input_value": question,
            "input_type": "chat",
            "output_type": "chat",
            "session_id": chat_id,
        }
        headers = {
            "Content-Type": "application/json",
            "x-api-key": self.valves.langflow_api_key,
            "Accept": "application/json",
        }
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=60)
            response.raise_for_status()
            data = response.json()
            message_response = data["outputs"][0]["outputs"][0]["outputs"]["message"][
                "message"
            ]
            return message_response
        except requests.HTTPError as e:
            print(f"HTTP error: {e} - Response: {response.text}")
            return f"Error: {e}"
        except Exception as e:
            print(f"Error: {e}")
            return f"Error: {e}"

    def _run_streaming_request(
        self, question: str, chat_id: str, model_id: str, is_task: bool
    ):
        # Streaming call: Langflow emits newline-delimited JSON events; convert them
        # into OpenAI-style SSE chunks that Open WebUI can render incrementally.
        url = f"{self.valves.langflow_url.rstrip('/')}/api/v1/run/{model_id if not is_task else self.valves.langflow_task_agent_id}?stream=true"
        if is_task:
            chat_id += "-task"
        payload = {
            "input_value": question,
            "input_type": "chat",
            "output_type": "chat",
            "session_id": chat_id,
        }
        headers = {
            "Content-Type": "application/json",
            "x-api-key": self.valves.langflow_api_key,
            "Accept": "application/json",
        }
        try:
            response = requests.post(
                url, headers=headers, json=payload, stream=True, timeout=60
            )
            response.raise_for_status()

            def stream_generator():
                previous_text = ""
                for line in response.iter_lines():
                    if not line:
                        continue
                    try:
                        line_str = line.decode("utf-8")
                        event_data = json.loads(line_str)
                        event_type = event_data.get("event")
                        if event_type == "add_message":
                            data = event_data.get("data", {})
                            sender = data.get("sender")
                            current_text = data.get("text", "")
                            # "add_message" events carry the full text so far, not a delta;
                            # only the new suffix is forwarded. Keeping the cumulative text
                            # here is likely why very large contexts crash the server
                            # (see the docstring).
                            if sender == "Machine" and current_text:
                                if len(current_text) > len(previous_text):
                                    new_chunk = current_text[len(previous_text) :]
                                    previous_text = current_text
                                    openai_chunk = {
                                        "choices": [
                                            {
                                                "delta": {"content": new_chunk},
                                                "index": 0,
                                                "finish_reason": None,
                                            }
                                        ]
                                    }
                                    yield f"data: {json.dumps(openai_chunk)}\n\n".encode(
                                        "utf-8"
                                    )
                        elif event_type == "end":
                            # Close the stream the way an OpenAI-compatible endpoint would.
                            final_chunk = {
                                "choices": [
                                    {"delta": {}, "index": 0, "finish_reason": "stop"}
                                ]
                            }
                            yield f"data: {json.dumps(final_chunk)}\n\n".encode("utf-8")
                            yield b"data: [DONE]\n\n"
                            break
                    except json.JSONDecodeError:
                        continue
                    except Exception as e:
                        print(f"Error processing line: {e}")
                        continue

            return stream_generator()
        except requests.HTTPError as e:
            error_msg = f"HTTP error: {e}"
            print(f"{error_msg} - Response: {response.text}")
            return error_msg
        except Exception as e:
            error_msg = f"Error: {e}"
            print(error_msg)
            return error_msg
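
The stream generator works because each Langflow "add_message" event resends the full message text accumulated so far, and the pipe slices off only the new suffix before wrapping it as an OpenAI-style chunk. Below is a minimal, standalone sketch of that delta logic; the sample events are illustrative assumptions shaped after the fields the pipe reads ("event", "data.sender", "data.text"), not a verbatim capture of Langflow's wire format.

# Sketch of the chunking logic in stream_generator(); runnable on its own.
# The sample events are assumptions, not real Langflow output.
import json

sample_events = [
    {"event": "add_message", "data": {"sender": "Machine", "text": "Hel"}},
    {"event": "add_message", "data": {"sender": "Machine", "text": "Hello, wor"}},
    {"event": "add_message", "data": {"sender": "Machine", "text": "Hello, world."}},
    {"event": "end"},
]

previous_text = ""
for event in sample_events:
    if event["event"] == "add_message":
        current_text = event["data"]["text"]
        if len(current_text) > len(previous_text):
            # Each event carries the full text so far; emit only the new suffix.
            delta = current_text[len(previous_text):]
            previous_text = current_text
            print("data:", json.dumps({"choices": [{"delta": {"content": delta}}]}))
    elif event["event"] == "end":
        print("data: [DONE]")

Because every event reships the whole text and previous_text keeps it in memory, the work per event grows with the response length, which is consistent with the crash on large contexts described in the docstring.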
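
For poking at the pipe outside Open WebUI, a hypothetical local harness along these lines can be used. The URL, API key, and flow id below are placeholders, a running Langflow instance is required, and the body/metadata dicts only mimic the fields this pipe actually reads.

# Hypothetical debug harness; all values are placeholders.
pipe = Pipe()
pipe.valves.langflow_url = "http://localhost:7860"
pipe.valves.langflow_api_key = "sk-placeholder"
pipe.valves.langflow_task_agent_id = "task-agent-placeholder"

print(pipe.pipes())  # flows that would appear as models in Open WebUI

reply = pipe.pipe(
    body={
        # Open WebUI passes "<pipe id>.<flow id>"; only the part after the first "." is used.
        "model": "langflow_streaming.my-flow-id",
        "messages": [{"role": "user", "content": "Hello from a local test"}],
        "stream": False,
    },
    __user__={},
    __metadata__={"chat_id": "local-debug-session", "task": False},
)
print(reply)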