이 글은 Advanced RAG series: Indexing 를 보고 작성한 글입니다.


1. Chunk Optimization

Chunking 전략을 구성하기 전에 물어봐야 할 두 가지 질문이 있다고 함:

  • 데이터가 짧은지, 긴지.
  • 우리가 만들려는 LLM 의 목적에 따라서 Chunking 전랴이 달라질 수 있다고 한다.

 

1번의 경우에는 데이터가 짧다면 그대로 Vector Store 에 넣으면 되니까. 하지만 문서가 크다면 Chunk 로 쪼개야 할거임.

 

2번의 경우에는 확실히 LLM 을 Q&A Chatbot 으로 사용할건지, Summarization Tool 로 사용할건지, 아니면 Agentic Workflow (그러니까 한 LLM 의 출력이 다른 LLM 에게 넘어가서 최종적인 답변이 완성되는 경우) 에 따라 달라질거임.

 

Q&A Chatbot 의 경우에는 Document 를 가지고 예상 질문을 뽑은 후 해당 예상 질문과 Document 를 매핑해서 임베딩해서 Vector Store 에 넣는 식으로 Chunking 전략이 결정될거임.

 

Summarization Tool 의 경우에는 Document 를 맥락을 풍부하게 만들기 위해서 글의 단락 별로 제법 길게 Chunking 전략을 구축할 수 있을거임.

 

Agentic Workflow 의 경우에는 각 단계마다 필요한 데이터가 다를 거니까 같은 데이터라고 하더라도 각 처리 단계에 맞게 사용할 수 있도록 Chunking 전략을 구축해야할거임.

 

대표적인 Chunking 전략들에 대해서 하나씩 살펴보자.

 

 

1) Rule based:

가장 단순한 방법은 Character 크기로 Chunk 를 구축하는 방법임.

 

하지만 유의미한 방법이 아니라는 건 알거임. 문장 중간에 잘라져서 chunk 로 구성된다고 생각해보자. 그게 의미가 있을까? 아무리 overlaping 을 준다고 해도.

 

 

물론 더 나아가서 Sentence Tokenizer 를 이용하는 방법도 있음. 이렇게 하면 문장 단위로 잘 짤린다.

 

하지만 Context 가 여러 글의 단락으로 구성되어 있는 경우에 이런 방법은 통하지 않을 걸 예상할 수 있음.

 

 

2) Recursive structure aware splitting:

훨씬 나은 방법임 이 방법을 쓰면 그래도 글의 단락 별로 Chunking 을 구성하게 만들 수 있음. 대신 Chunk 사이즈가 균일하지는 않을 수 있음.

 

원리는 글의 단락 별로 나눠질 때 구분자가 들어갈거임. (e.g \n, \n\n 등) 이런걸로 단락별로 구별한다고 알면됨.

 

 

3) Content-Aware Splitting

만약 Document 의 포맷에 대해 알고있다고 한다면 그 포맷에 맞춰서 나눠서 Chunking 을 하면 됨.

 

예로 Markdown 이라고 한다면 # ## ### 별로 글의 내용이 구별될텐데 이걸 이용해서 쪼개면 됨.

 

 

2. Multi-representation indexing:

여기서는 또 다양한 방식의 Chunking 전략을 소개함.

 

일반적인 Multi-representation indexing 은 Document 의 원본을 Chunking 해서 Vector Store 에 적재하는게 아니라, Docuemnt 의 요약본을 Vector Store 에 적재시킨다. 그리고 Document 원본은 Document Store 에 따로 저장해둠.

 

사용자의 질문이 오면 질문과 문서 요약본의 유사도 검색을 수행한 후, 요약본 문서에서 메타 데이터로 문서의 id 를 가지고 Document 원본을 조회하는 방법임. 

 

LangChain 의 예시 코드는 다음과 같다: 

 

from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import WebBaseLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain import hub

loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
docs = loader.load()

loader = WebBaseLoader("https://lilianweng.github.io/posts/2024-02-05-human-data-quality/")
docs.extend(loader.load())

chain = (
    {"doc": lambda x: x.page_content}
    | ChatPromptTemplate.from_template("Summarize the following document:\n\n{doc}")
    | ChatOpenAI(model="gpt-3.5-turbo",max_retries=0)
    | StrOutputParser()
)

summaries = chain.batch(docs, {"max_concurrency": 5})

from langchain.storage import InMemoryByteStore
from langchain.retrievers.multi_vector import MultiVectorRetriever
import uuid

docstore = InMemoryByteStore() # To store the documents
vectorstore = Chroma(collection_name="summaries", embedding_function=OpenAIEmbeddings()) # To store the embeddings from the summeries of the documents

# ids that map summeries to the documents
doc_ids = [str(uuid.uuid4()) for _ in docs]

# Create documents from summeries. 
summary_docs = [Document(page_content=s, metadata={"doc_id": doc_id}) for s, doc_id in zip(summaries, doc_ids)]

# Create the retriever
retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    byte_store=docstore,
    id_key="doc_id"
)

# Add summaries to the vectorstore
retriever.vectorstore.add_documents(summary_docs)

# Add docuemnts to the docstore
retriever.docstore.mset(list(zip(doc_ids, docs)))

query = "Memory in agents"
sub_docs = vectorstore.similarity_search(query,k=1)
sub_docs[0]

retrieved_docs = retriever.invoke(query)
len(retrieved_docs[0].page_content)

 

 

Parent Document

주어진 질문으로 Chunk 를 검색했을 때 해당 Chunk 를 포함하고 있는 Document 원본을 Context 로 쓰는 방법임.

 

Document 가 가장 컨택스트가 풍부할거니까.

 

그러나 문제점은 Document 가 Context Window 보다 큰 경우가 있을 수 있다는 거임.

 

이런 경우에는 Document 의 내용을 요약시켜서 Context Window 사이즈에 맞게 줄이는 방법임.

 

 

Dense X Retrieval

이 방법은 조금 색다른 방법인데 Chunk 에 넣을 문장들을 문장이나 글의 일부 단락으로 넣는게 아니라 명제 (proposition) 으로 구별해서 넣는 방법이라고 함.

 

이렇게 명제 별로 넣어서 Vector Store 에 적재했을 때 검색 성능이 35%, 22.5% 정도 더 잘 나온다고 함.

 

 

3. Specialized Embeddings

여기서의 방법은 더 유사도 높은 검색을 위해서 Embedding 모델 부분에거 개선을 시도한 것.

 

Fine-tuning:

임베딩 모델을 파인튜닝해서 특정 도메인의 언어를 더 잘 사용하도록 만들면 훨씬 해당 분야의 언어 이해를 더 잘하기 때문에 주어진 질문에 대해 관련있는 Chunk 를 더 잘 찾을 수 있다고 함.

 

ColBERT

임베딩 모델로 ColBERT 를 사용하면 이것도 주어진 질문과 관련있는 Chunk 를 더 잘 찾는다고 함.

 

ColBERT 는 BERT 모델에서 더 업그레이드 한 걸로 더 빠르게, 더 정확하게 검색할 수 있는 모델임. 그래서 대량의 데이터 검색에도 잘 쓸 수 있다고 함.

 

BERT 모델 자체가 Encoder-Decoder 모델로 언어의 유의미함을 잘 발견하는 편이긴 함.

 

유사도 검색 알고리즘으로는 Cosine 을 쓰는게 아니라 MaxSim 이라는 걸 쓴다고 한다.

 

 

4. Hierarchical Indexing:

RAPTOR (Recursive Abstractive Processing for Tree-Organized Retrieval

 

RAPTOR의 Chunking 전략은 다음과 같음:

  • Document 를 Chapter 별로 쪼갠다. 그리고 Chapter 를 다시 글의 단락별로 쪼갠다.
  • 각 단락의 글을 요약해서 해당 단락의 핵심 내용을 추출해서 사라지지 않도록 잘 요약해서 Chunk 를 만든다. 이 Chunk 는 Vector Store 에 저장된다.
  • 각 챕터의 단락 요약을 모아서 챕터 요약을 만든다. 이 챕터 요약도 Vector Store 의 Chunk 로 저장된다.
  • 글의 챕터의 요약도 모두 모아서 최종 문서 요약도 만든다. 그리고 이 요약 도 Vector Store 에 저장된다.
  • 이런식으로 Tree Construction 을 만드는 거임.

이렇게 Vector Store 에 문서 요약 트리 구조의 내용을 넣어서 검색하면 훨씬 더 성능이 잘 나온다고 함. 

 

Multi-representation indexing 전략이 원본 도큐먼트를 거의 그대로 사용하는 반면에, RAPTOR 전략은 계층화된 구조로 문서의 내용을 추상화해서 정리한 구조로 만들어서 LLM 이 여러 Multiple Documents 를 참고해서 답변을 내리도록 할 때 유용함. 

 

RAPTOR 에서 하는 방식은 다음과 같이 하나의 텍스트 Chunk 가 여러개의 부모 노드에 Clustering 될 수 있도록 하는 Soft Clustering 을 일반적으로 사용한다고 함. 

 

Soft Clustering 을 사용하는 방법은 크게 두 가지가 있음: 

  • GMM (Gaussian Mixture Model): 청크된 텍스트 데이터를 보고, 분류군을 만드는데 사용된다.
  • UMAP (Uniform Manifold Approximation and Projection): GMM 의 문제인 고차원 데이터를 잘 분류하지 못하는 문제를 해결하기 위해서 청크된 텍스트 데이터를 좀 더 저차원으로 줄이기 위해서 사용됨. 

 

LangChain 에서 RAPTOR 를 이용하는 코드 예시는 다음과 같다: 

from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import umap.umap_ as umap
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from sklearn.mixture import GaussianMixture

RANDOM_SEED = 224  # Fixed seed for reproducibility

### --- Code from citations referenced above (added comments and docstrings) --- ###


def global_cluster_embeddings(
    embeddings: np.ndarray,
    dim: int,
    n_neighbors: Optional[int] = None,
    metric: str = "cosine",
) -> np.ndarray:
    """
    Perform global dimensionality reduction on the embeddings using UMAP.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for the reduced space.
    - n_neighbors: Optional; the number of neighbors to consider for each point.
                   If not provided, it defaults to the square root of the number of embeddings.
    - metric: The distance metric to use for UMAP.

    Returns:
    - A numpy array of the embeddings reduced to the specified dimensionality.
    """
    if n_neighbors is None:
        n_neighbors = int((len(embeddings) - 1) ** 0.5)
    return umap.UMAP(
        n_neighbors=n_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)


def local_cluster_embeddings(
    embeddings: np.ndarray, dim: int, num_neighbors: int = 10, metric: str = "cosine"
) -> np.ndarray:
    """
    Perform local dimensionality reduction on the embeddings using UMAP, typically after global clustering.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for the reduced space.
    - num_neighbors: The number of neighbors to consider for each point.
    - metric: The distance metric to use for UMAP.

    Returns:
    - A numpy array of the embeddings reduced to the specified dimensionality.
    """
    return umap.UMAP(
        n_neighbors=num_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)


def get_optimal_clusters(
    embeddings: np.ndarray, max_clusters: int = 50, random_state: int = RANDOM_SEED
) -> int:
    """
    Determine the optimal number of clusters using the Bayesian Information Criterion (BIC) with a Gaussian Mixture Model.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - max_clusters: The maximum number of clusters to consider.
    - random_state: Seed for reproducibility.

    Returns:
    - An integer representing the optimal number of clusters found.
    """
    max_clusters = min(max_clusters, len(embeddings)) # Maximum number of clusters is limited by the number of embeddings
    n_clusters = np.arange(1, max_clusters) # Range of clusters to consider (1 to max_clusters)
    bics = []
    for n in n_clusters:
        gm = GaussianMixture(n_components=n, random_state=random_state) # For each number of clusters (i.e., number of mixture components) n, calculate gaussian mixture distribution parameters 
        gm.fit(embeddings)
        bics.append(gm.bic(embeddings)) # Calculate the Bayesian Information Criterion (BIC) for the current number of clusters
    return n_clusters[np.argmin(bics)] # Return the number of clusters that minimized the BIC


def GMM_cluster(embeddings: np.ndarray, threshold: float, random_state: int = 0):
    """
    Cluster embeddings using a Gaussian Mixture Model (GMM) based on a probability threshold.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - threshold: The probability threshold for assigning an embedding to a cluster.
    - random_state: Seed for reproducibility.

    Returns:
    - A tuple containing the cluster labels and the number of clusters determined.
    """
    n_clusters = get_optimal_clusters(embeddings) # Determine the optimal number of clusters using BIC
    gm = GaussianMixture(n_components=n_clusters, random_state=random_state)
    gm.fit(embeddings) # Fit the Gaussian mixture distribution with parameters related to the optimal number of clusters to the embeddings
    probs = gm.predict_proba(embeddings) # Calculate the probabilities of each embedding belonging to each cluster
    labels = [np.where(prob > threshold)[0] for prob in probs] # Assign embeddings to clusters based on the threshold
    return labels, n_clusters


def perform_clustering(
    embeddings: np.ndarray,
    dim: int,
    threshold: float,
) -> List[np.ndarray]:
    """
    Perform clustering on the embeddings by first reducing their dimensionality globally, then clustering
    using a Gaussian Mixture Model, and finally performing local clustering within each global cluster.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for UMAP reduction.
    - threshold: The probability threshold for assigning an embedding to a cluster in GMM.

    Returns:
    - A list of numpy arrays, where each array contains the cluster IDs for each embedding.
    """
    if len(embeddings) <= dim + 1:
        # Avoid clustering when there's insufficient data
        return [np.array([0]) for _ in range(len(embeddings))]

    # Global dimensionality reduction
    reduced_embeddings_global = global_cluster_embeddings(embeddings, dim)
    # Global clustering
    global_clusters, n_global_clusters = GMM_cluster(
        reduced_embeddings_global, threshold
    )

    all_local_clusters = [np.array([]) for _ in range(len(embeddings))]
    total_clusters = 0

    # Iterate through each global cluster to perform local clustering
    for i in range(n_global_clusters):
        # Extract embeddings belonging to the current global cluster
        global_cluster_embeddings_ = embeddings[
            np.array([i in gc for gc in global_clusters])
        ]

        if len(global_cluster_embeddings_) == 0:
            continue
        if len(global_cluster_embeddings_) <= dim + 1:
            # Handle small clusters with direct assignment
            local_clusters = [np.array([0]) for _ in global_cluster_embeddings_]
            n_local_clusters = 1
        else:
            # Local dimensionality reduction and clustering
            reduced_embeddings_local = local_cluster_embeddings(
                global_cluster_embeddings_, dim
            )
            local_clusters, n_local_clusters = GMM_cluster(
                reduced_embeddings_local, threshold
            )

        # Assign local cluster IDs, adjusting for total clusters already processed
        for j in range(n_local_clusters):
            local_cluster_embeddings_ = global_cluster_embeddings_[
                np.array([j in lc for lc in local_clusters])
            ]
            indices = np.where(
                (embeddings == local_cluster_embeddings_[:, None]).all(-1)
            )[1]
            for idx in indices:
                all_local_clusters[idx] = np.append(
                    all_local_clusters[idx], j + total_clusters
                )

        total_clusters += n_local_clusters

    return all_local_clusters


def embed(texts):
    """
    Generate embeddings for a list of text documents.

    This function assumes the existence of an `embd` object with a method `embed_documents`
    that takes a list of texts and returns their embeddings.

    Parameters:
    - texts: List[str], a list of text documents to be embedded.

    Returns:
    - numpy.ndarray: An array of embeddings for the given text documents.
    """
    text_embeddings = embeddings.embed_documents(texts)
    text_embeddings_np = np.array(text_embeddings)
    return text_embeddings_np


def embed_cluster_texts(texts):
    """
    Embeds a list of texts and clusters them, returning a DataFrame with texts, their embeddings, and cluster labels.

    This function combines embedding generation and clustering into a single step. It assumes the existence
    of a previously defined `perform_clustering` function that performs clustering on the embeddings.

    Parameters:
    - texts: List[str], a list of text documents to be processed.

    Returns:
    - pandas.DataFrame: A DataFrame containing the original texts, their embeddings, and the assigned cluster labels.
    """
    text_embeddings_np = embed(texts)  # Generate embeddings
    cluster_labels = perform_clustering(
        text_embeddings_np, 10, 0.1
    )  # Perform clustering on the embeddings
    df = pd.DataFrame()  # Initialize a DataFrame to store the results
    df["text"] = texts  # Store original texts
    df["embd"] = list(text_embeddings_np)  # Store embeddings as a list in the DataFrame
    df["cluster"] = cluster_labels  # Store cluster labels
    return df


def fmt_txt(df: pd.DataFrame) -> str:
    """
    Formats the text documents in a DataFrame into a single string.

    Parameters:
    - df: DataFrame containing the 'text' column with text documents to format.

    Returns:
    - A single string where all text documents are joined by a specific delimiter.
    """
    unique_txt = df["text"].tolist()
    return "--- --- \n --- --- ".join(unique_txt)


def embed_cluster_summarize_texts(
    texts: List[str], level: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Embeds, clusters, and summarizes a list of texts. This function first generates embeddings for the texts,
    clusters them based on similarity, expands the cluster assignments for easier processing, and then summarizes
    the content within each cluster.

    Parameters:
    - texts: A list of text documents to be processed.
    - level: An integer parameter that could define the depth or detail of processing.

    Returns:
    - Tuple containing two DataFrames:
      1. The first DataFrame (`df_clusters`) includes the original texts, their embeddings, and cluster assignments.
      2. The second DataFrame (`df_summary`) contains summaries for each cluster, the specified level of detail,
         and the cluster identifiers.
    """

    # Embed and cluster the texts, resulting in a DataFrame with 'text', 'embd', and 'cluster' columns
    df_clusters = embed_cluster_texts(texts)

    # Prepare to expand the DataFrame for easier manipulation of clusters
    expanded_list = []

    # Expand DataFrame entries to document-cluster pairings for straightforward processing
    for index, row in df_clusters.iterrows():
        for cluster in row["cluster"]:
            expanded_list.append(
                {"text": row["text"], "embd": row["embd"], "cluster": cluster}
            )

    # Create a new DataFrame from the expanded list
    expanded_df = pd.DataFrame(expanded_list)

    # Retrieve unique cluster identifiers for processing
    all_clusters = expanded_df["cluster"].unique()

    print(f"--Generated {len(all_clusters)} clusters--")

    # Summarization
    template = """Here is a sub-set of LangChain Expression Language doc. 
    
    LangChain Expression Language provides a way to compose chain in LangChain.
    
    Give a detailed summary of the documentation provided.
    
    Documentation:
    {context}
    """
    prompt = ChatPromptTemplate.from_template(template)
    chain = prompt | llm | StrOutputParser()

    # Format text within each cluster for summarization
    summaries = []
    for i in all_clusters:
        df_cluster = expanded_df[expanded_df["cluster"] == i]
        formatted_txt = fmt_txt(df_cluster)
        summaries.append(chain.invoke({"context": formatted_txt}))

    # Create a DataFrame to store summaries with their corresponding cluster and level
    df_summary = pd.DataFrame(
        {
            "summaries": summaries,
            "level": [level] * len(summaries),
            "cluster": list(all_clusters),
        }
    )

    return df_clusters, df_summary


def recursive_embed_cluster_summarize(
    texts: List[str], level: int = 1, n_levels: int = 3
) -> Dict[int, Tuple[pd.DataFrame, pd.DataFrame]]:
    """
    Recursively embeds, clusters, and summarizes texts up to a specified level or until
    the number of unique clusters becomes 1, storing the results at each level.

    Parameters:
    - texts: List[str], texts to be processed.
    - level: int, current recursion level (starts at 1).
    - n_levels: int, maximum depth of recursion.

    Returns:
    - Dict[int, Tuple[pd.DataFrame, pd.DataFrame]], a dictionary where keys are the recursion
      levels and values are tuples containing the clusters DataFrame and summaries DataFrame at that level.
    """
    results = {}  # Dictionary to store results at each level

    # Perform embedding, clustering, and summarization for the current level
    df_clusters, df_summary = embed_cluster_summarize_texts(texts, level)

    # Store the results of the current level
    results[level] = (df_clusters, df_summary)

    # Determine if further recursion is possible and meaningful
    unique_clusters = df_summary["cluster"].nunique()
    if level < n_levels and unique_clusters > 1:
        # Use summaries as the input texts for the next level of recursion
        new_texts = df_summary["summaries"].tolist()
        next_level_results = recursive_embed_cluster_summarize(
            new_texts, level + 1, n_levels
        )

        # Merge the results from the next level into the current results dictionary
        results.update(next_level_results)

    return results
    

leaf_texts = [d.page_content for d in docs]
results = recursive_embed_cluster_summarize(leaf_texts, level=1, n_levels=3)

all_texts = leaf_texts.copy()

# Iterate through the results to extract summaries from each level and add them to all_texts
for level in sorted(results.keys()):
    # Extract summaries from the current level's DataFrame
    summaries = results[level][1]["summaries"].tolist()
    # Extend all_texts with the summaries from the current level
    all_texts.extend(summaries)

# Now, use all_texts to build the vectorstore with Chroma
vectorstore = Chroma.from_texts(texts=all_texts, embedding=embeddings)
retriever = vectorstore.as_retriever()

prompt = hub.pull("rlm/rag-prompt")


# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# Chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# Question
rag_chain.invoke("What is the difference between QLoRA and LoRA?")

 

 

 

Conclusion

 

RAPTOR 전략을 Retrieve 에서 Auto-merging 전략과 같이 쓰면 훨씬 나을 수도?

+ Recent posts