Created
November 1, 2024 20:16
-
-
Save acrolink/198e838f15e090a2dbbaaa2c7769e8ae to your computer and use it in GitHub Desktop.
Example of a LangChain Channel definition for use with a Flutter client (using Phoenix Wings for Dart)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule ServerWeb.FlutterChatChannel do | |
use ServerWeb, :channel | |
alias ServerWeb.Chatbot.ChatMessage | |
alias LangChain.Chains.LLMChain | |
alias LangChain.Message | |
alias LangChain.Message.ToolCall | |
alias LangChain.Message.ToolResult | |
alias LangChain.ChatModels.ChatOpenAI | |
alias LangChain.PromptTemplate | |
alias LangChain.Chains.RoutingChain | |
alias LangChain.Routing.PromptRoute | |
# JSON encoding for streamed message deltas. Only the fields listed in
# @encoded_fields are serialized; everything else on the struct is dropped.
defimpl Jason.Encoder, for: LangChain.MessageDelta do
  @encoded_fields [:content, :status, :index, :role, :tool_calls]

  def encode(delta, opts) do
    @encoded_fields
    |> Map.new(fn field -> {field, Map.get(delta, field)} end)
    |> Jason.Encode.map(opts)
  end
end
# JSON encoding for complete chain messages. Only the fields listed in
# @encoded_fields are serialized; a field missing from the struct encodes
# as null because it is read with Map.get/2.
defimpl Jason.Encoder, for: LangChain.Message do
  @encoded_fields [:content, :status, :index, :role, :name, :tool_calls, :tool_results]

  def encode(message, opts) do
    @encoded_fields
    |> Map.new(fn field -> {field, Map.get(message, field)} end)
    |> Jason.Encode.map(opts)
  end
end
# Joins a private chat room ("flutter_chat:<room_id>") and seeds the socket.
#
# Assigns set on success:
#   * :current_user     - hard-coded placeholder user (a string-keyed map)
#   * :room_id          - the topic suffix the client joined with
#   * :display_messages - a one-element list holding the assistant greeting
#   * :llm_chain        - the chain built by assign_llm_chain/1
#
# Returns {:ok, socket}, or {:error, %{reason: "unauthorized"}} when
# authorized?/1 rejects the join payload.
def join("flutter_chat:" <> private_room_id, payload, socket) do
  if authorized?(payload) do
    current_user = %{"id" => 1, "name" => "zaki", "timezone" => "Etc/UTC"}

    greeting = %ChatMessage{
      role: :assistant,
      hidden: false,
      content: "Hello! I'm your personal trainer. How can I help you today?"
    }

    socket =
      socket
      |> assign(:current_user, current_user)
      |> assign(:room_id, private_room_id)
      |> assign(:display_messages, [greeting])
      # assign_llm_chain/1 is the only writer of :llm_chain. A chain from
      # initialize_llm_chain/1 used to be assigned first and was overwritten
      # here immediately, so building it was wasted work.
      |> assign_llm_chain()

    {:ok, socket}
  else
    {:error, %{reason: "unauthorized"}}
  end
end
# Builds the conversation chain and stores it on the socket as :llm_chain.
#
# The chain uses a streaming ChatOpenAI model whose name is read from the
# :langchain application env (:openai_model) and starts with a single system
# message. No custom_context is attached to this chain.
defp assign_llm_chain(socket) do
  chat_model =
    ChatOpenAI.new!(%{
      model: Application.get_env(:langchain, :openai_model),
      # temperature 0: don't get creative with answers
      temperature: 0,
      request_timeout: 60_000,
      stream: true
    })

  base_chain = LLMChain.new!(%{llm: chat_model, verbose: false})

  system_message = Message.new_system!(~S|
You are a helpful American virtual personal strength trainer. Your name is "Max". Limit discussions
to ONLY discuss the user's programming langauges. First, know their preferred back-end programing langage. Second, their preferred front-end programming language. Third, their preferred IDE. Do not answer questions
off the topic of programming. Answer the user's questions when possible.
If you don't know the answer to something, say you don't know; do not make up answers.
|)

  assign(socket, :llm_chain, LLMChain.add_message(base_chain, system_message))
end
# Handle incoming "validate" event | |
# def handle_in("validate", %{"chat_message" => params}, socket) do | |
# changeset = ChatMessage.create_changeset(params) |> Map.put(:action, :validate) | |
# push(socket, "validate_response", %{form: to_form(changeset)}) | |
# {:noreply, socket} | |
# end | |
# Handles the "save" event: a chat message submitted by the client.
#
# On success the message text is appended to the chain as a user message,
# "message_saved" is pushed back with the text, and the chain is run
# asynchronously via run_chain/1.
#
# On validation failure "save_error" is pushed with
# %{errors: %{field => [message, ...]}}.
def handle_in("save", %{"chat_message" => params}, socket) do
  IO.puts("handle_in .. save")

  case ChatMessage.new(params) do
    {:ok, message} ->
      IO.inspect(message)
      updated_socket = add_user_message(socket, message.content)
      push(updated_socket, "message_saved", %{content: message.content})
      {:noreply, run_chain(updated_socket)}

    {:error, changeset} ->
      # changeset.errors is a keyword list of {field, {message, opts}} tuples.
      # Tuples cannot be JSON-encoded, so pushing the raw list crashed the
      # channel; flatten it to a map of field => [message] first.
      errors =
        Enum.group_by(
          changeset.errors,
          fn {field, _error} -> field end,
          fn {_field, {text, _opts}} -> text end
        )

      push(socket, "save_error", %{errors: errors})
      {:noreply, socket}
  end
end
# Handles the "browser-timezone" event: the client reports its timezone.
#
# When the reported timezone differs from the one stored on :current_user,
# the user is updated through FitnessUsers.update_fitness_user/2 and the
# result replaces :current_user. Otherwise the socket is left untouched.
def handle_in("browser-timezone", %{"timezone" => timezone}, socket) do
  user = socket.assigns.current_user

  # join/3 stores current_user as a string-keyed map, so `user.timezone`
  # raised KeyError. Map.get/2 also works on structs, so both key styles
  # are accepted here.
  stored_timezone = Map.get(user, :timezone) || Map.get(user, "timezone")

  if timezone == stored_timezone do
    {:noreply, socket}
  else
    # NOTE(review): FitnessUsers is not aliased anywhere in this file, so this
    # resolves to a top-level `FitnessUsers` module - confirm it exists and
    # accepts the user shape stored by join/3.
    {:ok, updated_user} = FitnessUsers.update_fitness_user(user, %{timezone: timezone})
    {:noreply, assign(socket, :current_user, updated_user)}
  end
end
# Handles one streamed delta sent by the Task started in run_chain/1.
#
# Each delta is forwarded to the client as "chat_delta" and merged into the
# chain kept in the socket assigns. When the merge leaves no pending delta
# (updated_chain.delta is nil), the chain's last message is pushed as
# "new_message", with its text prefixed by "<room_id> >> ".
def handle_info({:chat_delta, delta}, socket) do
  updated_chain = LLMChain.apply_delta(socket.assigns.llm_chain, delta)

  push(socket, "chat_delta", %{delta: delta})

  if is_nil(updated_chain.delta) do
    message = updated_chain.last_message

    # `<>` only accepts binaries; content that is not text (e.g. nil) used to
    # raise here, so it is now passed through without the room prefix.
    content =
      case message.content do
        text when is_binary(text) -> socket.assigns.room_id <> " >> " <> text
        other -> other
      end

    chat_message = %ChatMessage{
      role: message.role,
      content: content,
      tool_calls: message.tool_calls,
      tool_results: message.tool_results
    }

    # notify flutter app
    push(socket, "new_message", %{"chat_message" => chat_message})
  end

  {:noreply, assign(socket, :llm_chain, updated_chain)}
end
# Not called anywhere in this module. Kept as a stub: because
# :display_messages is not used, it resets the assign to an empty map
# instead of appending the given message.
# The argument is matched without binding to avoid an unused-variable warning.
defp append_display_message(socket, %ChatMessage{}) do
  assign(socket, :display_messages, %{})
end
# Handles a tool-result message sent by the Task started in run_chain/1:
# appends it to the chain in the socket and pushes "tool_executed" with the
# message's role and tool results.
def handle_info({:tool_executed, tool_message}, socket) do
  payload = %{
    tool_message: %ChatMessage{
      role: tool_message.role,
      tool_results: tool_message.tool_results
    }
  }

  chain = LLMChain.add_message(socket.assigns.llm_chain, tool_message)
  socket = assign(socket, :llm_chain, chain)

  push(socket, "tool_executed", payload)
  {:noreply, socket}
end
# Handles a notification that the current user changed: stores the new user
# on the socket and updates the chain's custom context with it.
def handle_info({:updated_current_user, updated_user}, socket) do
  chain =
    LLMChain.update_custom_context(socket.assigns.llm_chain, %{current_user: updated_user})

  socket =
    socket
    |> assign(:current_user, updated_user)
    |> assign(:llm_chain, chain)

  {:noreply, socket}
end
# Handles a failure reported by the Task started in run_chain/1 and relays
# it to the client as "task_error".
#
# Binary and atom reasons are pushed unchanged. Any other term (tuple,
# struct, ...) is sent as its inspect/1 text, because pushing a term that
# cannot be JSON-encoded would crash the channel while reporting the error.
def handle_info({:task_error, reason}, socket) do
  client_reason =
    if is_binary(reason) or is_atom(reason), do: reason, else: inspect(reason)

  push(socket, "task_error", %{reason: client_reason})
  {:noreply, socket}
end
# Builds a bare chain (no messages) on a streaming "gpt-4" model. The custom
# context carries the calling process pid under :live_view_pid and the given
# user under :current_user.
defp initialize_llm_chain(current_user) do
  chat_model =
    ChatOpenAI.new!(%{
      model: "gpt-4",
      temperature: 0,
      request_timeout: 60_000,
      stream: true
    })

  context = %{live_view_pid: self(), current_user: current_user}

  LLMChain.new!(%{llm: chat_model, custom_context: context, verbose: false})
end
# Runs the socket's LLM chain in a separate Task process.
#
# The callback relays streamed deltas and tool messages back to this channel
# process as {:chat_delta, delta} and {:tool_executed, message}. When the run
# succeeds, the final message is broadcast on the topic as "last_message";
# when it fails, {:task_error, reason} is sent to the channel process.
#
# Returns the socket unchanged: the chain is only read here, so there is
# nothing new to assign.
defp run_chain(socket) do
  IO.puts("run_chain called ..")
  chain = socket.assigns.llm_chain
  # Captured up front: inside the Task, self() would be the Task's own pid.
  channel_pid = self()

  callback_fn = fn
    %LangChain.MessageDelta{} = delta -> send(channel_pid, {:chat_delta, delta})
    %LangChain.Message{role: :tool} = message -> send(channel_pid, {:tool_executed, message})
    _other -> :ok
  end

  Task.start(fn ->
    case LLMChain.run(chain, while_needs_response: true, callback_fn: callback_fn) do
      # Bound without a leading underscore because the value is used below.
      {:ok, _updated_chain, last_message} ->
        IO.inspect(last_message)
        broadcast(socket, "last_message", last_message)
        :ok

      {:error, reason} ->
        IO.inspect(reason)
        send(channel_pid, {:task_error, reason})
    end
  end)

  socket
end
# Appends `user_text` to the socket's chain as a user message and returns the
# socket holding the extended chain.
defp add_user_message(socket, user_text) do
  socket.assigns.llm_chain
  |> LLMChain.add_message(Message.new_user!(user_text))
  |> then(&assign(socket, :llm_chain, &1))
end
# Request/response example: replies to "ping" with {:ok, payload}, echoing
# the payload the client sent.
def handle_in("ping", payload, socket), do: {:reply, {:ok, payload}, socket}
# "shout" and "say" relay the client's payload to everyone subscribed to the
# current topic, under the same event name, without replying to the sender.
def handle_in(event, payload, socket) when event in ["shout", "say"] do
  broadcast(socket, event, payload)
  {:noreply, socket}
end
# Authorization hook for join/3. Currently accepts every join payload; add
# real checks here as required.
defp authorized?(_payload), do: true
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment