Skip to content

Instantly share code, notes, and snippets.

@acrolink
Created November 1, 2024 20:16
Show Gist options
  • Save acrolink/198e838f15e090a2dbbaaa2c7769e8ae to your computer and use it in GitHub Desktop.
Save acrolink/198e838f15e090a2dbbaaa2c7769e8ae to your computer and use it in GitHub Desktop.
Example of a LangChain Channel definition for use with a Flutter client (using Phoenix Wings for Dart)
defmodule ServerWeb.FlutterChatChannel do
use ServerWeb, :channel
alias ServerWeb.Chatbot.ChatMessage
alias LangChain.Chains.LLMChain
alias LangChain.Message
alias LangChain.Message.ToolCall
alias LangChain.Message.ToolResult
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.PromptTemplate
alias LangChain.Chains.RoutingChain
alias LangChain.Routing.PromptRoute
defimpl Jason.Encoder, for: LangChain.MessageDelta do
def encode(value, opts) do
Jason.Encode.map(
%{
content: Map.get(value, :content),
status: Map.get(value, :status),
index: Map.get(value, :index),
role: Map.get(value, :role),
tool_calls: Map.get(value, :tool_calls)
},
opts
)
end
end
defimpl Jason.Encoder, for: LangChain.Message do
def encode(value, opts) do
Jason.Encode.map(
%{
content: Map.get(value, :content),
status: Map.get(value, :status),
index: Map.get(value, :index),
role: Map.get(value, :role),
name: Map.get(value, :name),
tool_calls: Map.get(value, :tool_calls),
tool_results: Map.get(value, :tool_results)
},
opts
)
end
end
# When a user joins the channel, initialize state
def join("flutter_chat:" <> private_room_id, payload, socket) do
if authorized?(payload) do
current_user = %{"id" => 1, "name" => "zaki", "timezone" => "Etc/UTC"}
llm_chain = initialize_llm_chain(current_user)
socket =
assign(socket, :current_user, current_user)
|> assign(:room_id, private_room_id)
|> assign(:llm_chain, llm_chain)
|> assign(:display_messages, [
%ChatMessage{
role: :assistant,
hidden: false,
content: "Hello! I'm your personal trainer. How can I help you today?"
}
])
|> assign_llm_chain()
{:ok, socket}
else
{:error, %{reason: "unauthorized"}}
end
end
defp assign_llm_chain(socket) do
llm_chain =
LLMChain.new!(%{
llm:
ChatOpenAI.new!(%{
model: Application.get_env(:langchain, :openai_model),
# don't get creative with answers
temperature: 0,
request_timeout: 60_000,
stream: true
}),
# custom_context: %{
# live_view_pid: self(),
# current_user: socket.assigns.current_user
# },
verbose: false
})
|> LLMChain.add_message(Message.new_system!(~S|
You are a helpful American virtual personal strength trainer. Your name is "Max". Limit discussions
to ONLY discuss the user's programming langauges. First, know their preferred back-end programing langage. Second, their preferred front-end programming language. Third, their preferred IDE. Do not answer questions
off the topic of programming. Answer the user's questions when possible.
If you don't know the answer to something, say you don't know; do not make up answers.
|))
socket
|> assign(:llm_chain, llm_chain)
end
# Handle incoming "validate" event
# def handle_in("validate", %{"chat_message" => params}, socket) do
# changeset = ChatMessage.create_changeset(params) |> Map.put(:action, :validate)
# push(socket, "validate_response", %{form: to_form(changeset)})
# {:noreply, socket}
# end
# Handle incoming "save" event
def handle_in("save", %{"chat_message" => params}, socket) do
IO.puts("handle_in .. save")
case ChatMessage.new(params) do
{:ok, message} ->
IO.inspect(message)
updated_socket = add_user_message(socket, message.content)
push(updated_socket, "message_saved", %{content: message.content})
{:noreply, run_chain(updated_socket)}
{:error, changeset} ->
push(socket, "save_error", %{errors: changeset.errors})
{:noreply, socket}
end
end
# Handle timezone event to update user's timezone
def handle_in("browser-timezone", %{"timezone" => timezone}, socket) do
user = socket.assigns.current_user
socket =
if timezone != user.timezone do
{:ok, updated_user} = FitnessUsers.update_fitness_user(user, %{timezone: timezone})
assign(socket, :current_user, updated_user)
else
socket
end
{:noreply, socket}
end
# Handle incoming chat delta from async processing
def handle_info({:chat_delta, delta}, socket) do
# This is where LLM generated content gets processed and merged to the
# LLMChain managed by the state in this LiveView process.
# Apply the delta message to our tracked LLMChain. If it completes the
# message, display the message
updated_chain = LLMChain.apply_delta(socket.assigns.llm_chain, delta)
# is this needed ?!
push(socket, "chat_delta", %{delta: delta})
# if this completed the delta, create the message and track on the chain
socket =
if updated_chain.delta == nil do
# the delta completed the message. Examine the last message
message = updated_chain.last_message
message_object = %ChatMessage{
role: message.role,
content: socket.assigns.room_id <> " >> " <> message.content,
tool_calls: message.tool_calls,
tool_results: message.tool_results
}
# notify flutter app
push(socket, "new_message", %{"chat_message" => message_object})
# no need
# append_display_message(socket, message_object)
socket
else
socket
end
{:noreply, assign(socket, :llm_chain, updated_chain)}
end
# not in use
defp append_display_message(socket, %ChatMessage{} = message) do
# since display_messages is not in use .. put empty value in it ..
# assign(socket, :display_messages, socket.assigns.display_messages ++ [message])
assign(socket, :display_messages, %{})
end
# Handle tool execution message from async processing
def handle_info({:tool_executed, tool_message}, socket) do
message = %ChatMessage{role: tool_message.role, tool_results: tool_message.tool_results}
updated_socket =
assign(socket, :llm_chain, LLMChain.add_message(socket.assigns.llm_chain, tool_message))
push(updated_socket, "tool_executed", %{tool_message: message})
{:noreply, updated_socket}
end
# Handle updated user information
def handle_info({:updated_current_user, updated_user}, socket) do
socket =
assign(socket, :current_user, updated_user)
|> assign(
:llm_chain,
LLMChain.update_custom_context(socket.assigns.llm_chain, %{current_user: updated_user})
)
{:noreply, socket}
end
# Handle task errors in async processing
def handle_info({:task_error, reason}, socket) do
push(socket, "task_error", %{reason: reason})
{:noreply, socket}
end
# Function to initialize LLMChain
defp initialize_llm_chain(current_user) do
LLMChain.new!(%{
llm:
ChatOpenAI.new!(%{model: "gpt-4", temperature: 0, request_timeout: 60_000, stream: true}),
custom_context: %{live_view_pid: self(), current_user: current_user},
verbose: false
})
end
# Handle running the LLM chain async
defp run_chain(socket) do
IO.puts("run_chain called ..")
chain = socket.assigns.llm_chain
live_view_pid = self()
callback_fn = fn
%LangChain.MessageDelta{} = delta -> send(live_view_pid, {:chat_delta, delta})
%LangChain.Message{role: :tool} = message -> send(live_view_pid, {:tool_executed, message})
_ -> :ok
end
Task.start(fn ->
case LLMChain.run(chain, while_needs_response: true, callback_fn: callback_fn) do
{:ok, _updated_chain, _last_message} ->
IO.inspect(_last_message)
broadcast(socket, "last_message", _last_message)
:ok
{:error, reason} ->
IO.inspect(reason)
send(live_view_pid, {:task_error, reason})
end
end)
assign(socket, :llm_chain, chain)
end
# Add user message to chain
defp add_user_message(socket, user_text) do
updated_chain = LLMChain.add_message(socket.assigns.llm_chain, Message.new_user!(user_text))
assign(socket, :llm_chain, updated_chain)
end
# Channels can be used in a request/response fashion
# by sending replies to requests from the client
def handle_in("ping", payload, socket) do
{:reply, {:ok, payload}, socket}
end
# It is also common to receive messages from the client and
# broadcast to everyone in the current topic (flutter_chat:lobby).
def handle_in("shout", payload, socket) do
broadcast(socket, "shout", payload)
{:noreply, socket}
end
def handle_in("say", payload, socket) do
broadcast(socket, "say", payload)
{:noreply, socket}
end
# Add authorization logic here as required.
defp authorized?(_payload) do
true
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment