2024년 6월 30일 일요일

Langgraph 기반 다중 LLM 에이전트 개발하기

이 글은 Langgraph를 이용해, 간단히 다중 LLM 에이전트를 개발하는 방법을 정리한 것이다.
개발 환경 설정
pip install langchain langgraph milvus

RAG 처리하기
import os
from dotenv import load_dotenv
from typing import List
from typing_extensions import TypedDict
from langchain import hub
from langchain.globals import set_verbose, set_debug
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_community.chat_models import ChatOllama
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.output_parsers import StrOutputParser
from langchain_core.output_parsers import JsonOutputParser
from langchain.schema import Document

# setup environment for LLM RAG
load_dotenv()
set_debug(True)
set_verbose(True)

# Load documents
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

# Create vectorstore and retriever
vectorstore = Chroma.from_documents(
documents=doc_splits,
collection_name="rag_Chroma",
embedding=HuggingFaceEmbeddings(),
persist_directory="./Chroma_rag.db",

)
retriever = vectorstore.as_retriever()

# Load LLM model using Ollama
local_llm = 'llama3'
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Test grader prompt with JSON output
prompt = PromptTemplate(
template="""You are a grader assessing relevance 
of a retrieved document to a user question. If the document contains keywords related to the user question, 
grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. 
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.
Provide the binary score as a JSON with a single key 'score' and no premable or explaination.
 
Here is the retrieved document: 
{document}
Here is the user question: 
{question}
""",
input_variables=["question", "document"],
)

retrieval_grader = prompt | llm | JsonOutputParser()
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

# Test QA prompt
prompt = PromptTemplate(
template="""You are an assistant for question-answering tasks. 
Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. 
Use three sentences maximum and keep the answer concise:
Question: {question} 
Context: {context} 
Answer: 
""",
input_variables=["question", "document"],
)

llm = ChatOllama(model=local_llm, temperature=0)

def format_docs(docs): # Post-processing
return "\n\n".join(doc.page_content for doc in docs)

rag_chain = prompt | llm | StrOutputParser() # Chain

question = "agent memory"
docs = retriever.invoke(question)
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

# Test hallucination grader prompt with JSON output
llm = ChatOllama(model=local_llm, format="json", temperature=0)

prompt = PromptTemplate(
template="""You are a grader assessing whether 
an answer is grounded in / supported by a set of facts. Give a binary score 'yes' or 'no' score to indicate 
whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
single key 'score' and no preamble or explanation.
Here are the facts:
{documents} 

Here is the answer: 
{generation}
""",
input_variables=["generation", "documents"],
)

hallucination_grader = prompt | llm | JsonOutputParser()
generation = hallucination_grader.invoke({"documents": docs, "generation": generation})
print(generation)

# Test hallucination grader prompt 
llm = ChatOllama(model=local_llm, format="json", temperature=0)

prompt = PromptTemplate(
template="""You are a grader assessing whether an 
answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is 
useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
 
Here is the answer:
{generation} 

Here is the question: {question}
""",
input_variables=["generation", "question"],
)

answer_grader = prompt | llm | JsonOutputParser()
generation = answer_grader.invoke({"question": question,"generation": generation})
print(generation)

# Test router prompt
llm = ChatOllama(model=local_llm, format="json", temperature=0)

prompt = PromptTemplate(
template="""You are an expert at routing a 
user question to a vectorstore or web search. Use the vectorstore for questions on LLM  agents, 
prompt engineering, and adversarial attacks. You do not need to be stringent with the keywords 
in the question related to these topics. Otherwise, use web-search. Give a binary choice 'web_search' 
or 'vectorstore' based on the question. Return the a JSON with a single key 'datasource' and 
no premable or explaination. 
Question to route: 
{question}""",
input_variables=["question"],
)

question_router = prompt | llm | JsonOutputParser()
question = "llm agent memory"
docs = retriever.get_relevant_documents(question)
doc_txt = docs[1].page_content
print(question_router.invoke({"question": question}))

LLM 기반 다중 에이전트 구현하기
# Web search tool setup
from langchain_community.tools.tavily_search import TavilySearchResults
tavily_api_key = os.environ['TAVILY_API_KEY'] = <Travily API>
web_search_tool = TavilySearchResults(k=3,  tavily_api_key=tavily_api_key)

# Define the graph state using langgraph
from langgraph.graph import END, StateGraph

class GraphState(TypedDict):
"""
Represents the state of our graph.

Attributes:
question: question
generation: LLM generation
web_search: whether to add search
documents: list of documents 
"""
question : str
generation : str
web_search : str
documents : List[str]

def retrieve(state): # node. Retrieve documents from vectorstore
# Args. state (dict): The current graph state
# Returns. state (dict): New key added to state, documents, that contains retrieved documents
print("---RETRIEVE---")
question = state["question"]
documents = retriever.invoke(question)
return {"documents": documents, "question": question}

def generate(state): # node. Generate answer using RAG on retrieved documents
# Args: state (dict): The current graph state
# Returns: state (dict): New key added to state, generation, that contains LLM generation
print("---GENERATE---")
question = state["question"]
documents = state["documents"]
generation = rag_chain.invoke({"context": documents, "question": question})
return {"documents": documents, "question": question, "generation": generation}

def grade_documents(state): # node. Determines whether the retrieved documents are relevant to the question If any document is not relevant, we will set a flag to run web search
# Args: state (dict): The current graph state
# Returns: state (dict): Filtered out irrelevant documents and updated web_search state
print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
question = state["question"]
documents = state["documents"]
filtered_docs = [] # Score each doc
web_search = "No"
for d in documents:
score = retrieval_grader.invoke({"question": question, "document": d.page_content})
grade = score['score']
if grade.lower() == "yes": # Document relevant
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(d)
else: # Document not relevant
print("---GRADE: DOCUMENT NOT RELEVANT---")
# We do not include the document in filtered_docs. We set a flag to indicate that we want to run web search
web_search = "Yes"
continue
return {"documents": filtered_docs, "question": question, "web_search": web_search}
def web_search(state): # Web search based based on the question 
# Args: state (dict): The current graph state
# Returns: state (dict): Appended web results to documents
print("---WEB SEARCH---")
question = state["question"]
documents = state["documents"]

docs = web_search_tool.invoke({"query": question}) # Web search
web_results = "\n".join([d["content"] for d in docs])
web_results = Document(page_content=web_results)
if documents is not None:
documents.append(web_results)
else:
documents = [web_results]
return {"documents": documents, "question": question}

def route_question(state): # Conditional edge. Route question to web search or RAG.
# Args: state (dict): The current graph state
# Returns: str: Next node to call
print("---ROUTE QUESTION---")
question = state["question"]
print(question)

source = question_router.invoke({"question": question})  
print(source)
print(source['datasource'])
if source['datasource'] == 'web_search':
print("---ROUTE QUESTION TO WEB SEARCH---")
return "websearch"
elif source['datasource'] == 'vectorstore':
print("---ROUTE QUESTION TO RAG---")
return "vectorstore"

def decide_to_generate(state): # Determines whether to generate an answer, or add web search
# Args: state (dict): The current graph state
# Returns: str: Binary decision for next node to call
print("---ASSESS GRADED DOCUMENTS---")
question = state["question"]
web_search = state["web_search"]
filtered_documents = state["documents"]

if web_search == "Yes": # All documents have been filtered check_relevance. We will re-generate a new query
print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
return "websearch"
else: # We have relevant documents, so generate answer
print("---DECISION: GENERATE---")
return "generate"

def grade_generation_v_documents_and_question(state): # Conditional edge. Determines whether the generation is grounded in the document and answers question.
# Args: state (dict): The current graph state
# Returns: str: Decision for next node to call
print("---CHECK HALLUCINATIONS---")
question = state["question"]
documents = state["documents"]
generation = state["generation"]

score = hallucination_grader.invoke({"documents": documents, "generation": generation})
grade = score['score']

if grade == "yes": # Check hallucination
print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
print("---GRADE GENERATION vs QUESTION---") # Check question-answering
score = answer_grader.invoke({"question": question,"generation": generation})
grade = score['score']
if grade == "yes":
print("---DECISION: GENERATION ADDRESSES QUESTION---")
return "useful"
else:
print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
return "not useful"
else:
pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
return "not supported"

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("websearch", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generatae

# Build graph and compile
workflow.set_conditional_entry_point(
route_question,
{
"websearch": "websearch",
"vectorstore": "retrieve",
},
)

workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
decide_to_generate,
{
"websearch": "websearch",
"generate": "generate",
},
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
"generate",
grade_generation_v_documents_and_question,
{
"not supported": "generate",
"useful": END,
"not useful": "websearch",
},
)

app = workflow.compile()

# Test the graph
from pprint import pprint
inputs = {"question": "What are the types of agent memory?"}
for output in app.stream(inputs):
for key, value in output.items():
pprint(f"Finished running: {key}:")
pprint(value["generation"])

실행 결과
다음과 같이 출력되면 성공한 것이다..

레퍼런스

댓글 없음:

댓글 쓰기