dify实现原理分析-rag-数据检索的实现

news/2025/2/1 7:51:55 标签: 大模型, LLM, 人工智能

数据检索的总体执行步骤

数据检索总体步骤如下:

输入验证
模型初始化
策略选择: 1.单线程检索 2.多线程检索
数据集筛选: 选择符合条件的数据集
执行检索: 使用单线程或多线程检索来查询数据
结果处理: 结果选择和格式化处理
返回格式化内容: 最终返回一个结果字符串

数据检索是在DatasetRetrieval.retrieve函数中实现的,主要实现逻辑分为以下几步:

  1. 检查输入的模型、数据集id列表等是否为空;

  2. 获取模型实例,并把它转换成LargeLanguageModel对象;

  3. 获取模型实例,并获取模型的元数据,主要是模型的各种参数,以及认证参数等。若模型的元数据为空,直接返回None。

  4. 默认情况下,规划路由策略被设置为 REACT_ROUTER。如果模型支持工具调用(TOOL_CALL)或多重工具调用(MULTI_TOOL_CALL),则将规划策略更改为 ROUTER;

  5. 筛选可用的数据集:若数据集为空,或数据集不可用则过滤掉数据集;后续的数据检索,会从这些可用数据集中来进行检索。

  6. 根据配置选择单线程(RetrieveStrategy.SINGLE)或多线程检索(RetrieveStrategy.MULTIPLE),获取检索到的document列表;这里只是设置检索的参数,而这两种检索方式都会调用检索服务的RetrievalService.retrieve(…)函数来检索符合条件的数据集。

  7. 处理dify提供者的document:使用回调函数返回检索结果,根据分数对检索结果进行排序,并返回格式化后的字符串。

    1)获取每个文档的分数(score)的值

    2)查询状态为completed,且可用,doc_id在检索出来的文档列表中的DocumentSegment列表

    3)若segment(文档块)不为空。

    ​ 3.1) 获取segment的id和位置(确定文档内容的读取位置)

    ​ 3.2) 按id所在的position(位置)排序,若id不在字典中排到最后(无穷大inf)

    ​ 3.3) 遍历排好序的segment:根据条件构建新的列表,然后进行一下操作:

    ​ a) 检查每个segment是否包含answer

    ​ b) 包含:构建一个包含问题和答案的字符串

    ​ c) 不包含:则只构建一个问题的字符串

    ​ 3.4) 对已排好序的segment进行遍历

    ​ 3.5) 获取segment对应的dataset_id对应的dataset

    ​ 3.6) 获取segment.document_id对应的document

  8. 使用回调函数返回检索结果,根据分数对检索结果进行排序,并返回格式化后的字符串。

检索的详细执行流程

数据检索的函数声明如下:

class DatasetRetrieval:
    def __init__(self, application_generate_entity=None):
        self.application_generate_entity = application_generate_entity

    def retrieve(
        self,
        app_id: str,
        user_id: str,
        tenant_id: str,
        model_config: ModelConfigWithCredentialsEntity,
        config: DatasetEntity,
        query: str,
        invoke_from: InvokeFrom,
        show_retrieve_source: bool,
        hit_callback: DatasetIndexToolCallbackHandler,
        message_id: str,
        memory: Optional[TokenBufferMemory] = None,
    ) -> Optional[str]:
        """
        Retrieve dataset.
        :param app_id: app_id
        :param user_id: user_id
        :param tenant_id: tenant id
        :param model_config: model config
        :param config: dataset config
        :param query: query
        :param invoke_from: invoke from
        :param show_retrieve_source: show retrieve source
        :param hit_callback: hit callback
        :param message_id: message id
        :param memory: memory
        :return:
        """

该函数的详细实现逻辑如下:

  1. 检查数据集id列表,若数据集id列表为空,则直接返回None;
        # 检查输入的模型、数据集等是否有效。
        dataset_ids = config.dataset_ids
        if len(dataset_ids) == 0:
            return None
  1. 根据模型配置来获取和构建模型实例对象,并获取模型的元数据(各种参数)
        model_type_instance = model_config.provider_model_bundle.model_type_instance
        model_type_instance = cast(LargeLanguageModel, model_type_instance)

        # 获取模型实例
        model_manager = ModelManager()
        model_instance = model_manager.get_model_instance(
            tenant_id=tenant_id, model_type=ModelType.LLM, provider=model_config.provider, model=model_config.model
        )
  1. 设置计划策略,默认情况下计划策略是:PlanningStrategy.REACT_ROUTER,若模型特征支持工具调用:ModelFeature.TOOL_CALL或MULTI_TOOL_CALL,则把计划策略设置成:ROUTER,即:planning_strategy = PlanningStrategy.ROUTER
        # 默认情况下,规划策略被设置为 REACT_ROUTER。
        planning_strategy = PlanningStrategy.REACT_ROUTER
        # 检查模型的特性(features)。
        # 如果模型支持工具调用(TOOL_CALL)或多重工具调用(MULTI_TOOL_CALL),则将规划策略更改为 ROUTER。
        features = model_schema.features
        # 检查模型是否支持工具调用,若支持计划策略设置为ROUTER
        if features:
            if ModelFeature.TOOL_CALL in features or ModelFeature.MULTI_TOOL_CALL in features:
                planning_strategy = PlanningStrategy.ROUTER
        available_datasets = []
  1. 筛选可用数据集:遍历参数中的dataset_ids列表,从数据库中查询对应id的数据集,过滤掉数据集可用文档为0的和数据集的provide为external的数据集。
        # 筛选可用的数据集。
        for dataset_id in dataset_ids:
            # 查询对应id列表的数据集
            dataset = db.session.query(Dataset).filter(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).first()
            # 数据集为空,pass掉
            if not dataset:
                continue
            # 数据集不可用,pass掉
            if dataset and dataset.available_document_count == 0 and dataset.provider != "external":
                continue
            # 把数据集添加到可用数据集列表中
            available_datasets.append(dataset)
  1. 根据配置选择单线程(single_retrieve)或多线程(multiple_retrieve)检索来检索document,得到结果document列表:all_documents。
        if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
            all_documents = self.single_retrieve(...)
        elif retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE:
            all_documents = self.multiple_retrieve(...)
  1. 从all_documents中抽取出:dify_documents(provider == “dify”)和external_documents(provider == “external”)的结果。处理外部和Dify提供者的document,生成相应的上下文和资源信息。
        # 得到不同提供者的document
        dify_documents = [item for item in all_documents if item.provider == "dify"]
        external_documents = [item for item in all_documents if item.provider == "external"]
  1. 处理结果队列dify_documents,步骤如下:

(1)收集评分信息(document_score_list):通过检查 dify_documents 列表中每个文档片段的 score 元数据,构建一个字典 document_score_list,其中键是文档 ID,值是对应的评分。

            # 获取每个文档的分数(score)的值
            for item in dify_documents:
                if item.metadata.get("score"):
                    document_score_list[item.metadata["doc_id"]] = item.metadata["score"]

(2)过滤和排序文档片段:根据给定的 dataset_ids 和一些状态条件(如 status=completed, enabled=True),从数据库中查询相关的文档片段(DocumentSegment)。然后将这些文档片段按其在原始列表中的索引顺序进行排序。

(3)构建文档上下文(document_context_list):对于每个排序后的文档片段,创建一个 DocumentContext 实例,并将其添加到 document_context_list 中。如果文档片段包含答案(answer),则将答案与问题一起作为一个字符串存储在内容字段中;否则,只存储问题。

             # 获取segment的id和位置(确定文档内容的读取位置)
                index_node_id_to_position = {id: position for position, id in enumerate(index_node_ids)}
                # 按id所在的position(位置)排序,若id不在字典中排到最后(无穷大inf)
                sorted_segments = sorted(
                    segments, key=lambda segment: index_node_id_to_position.get(segment.index_node_id, float("inf"))
                )
                # 遍历排好序的segment:根据条件构建新的列表
                for segment in sorted_segments:
                    # 检查每个segment是否包含answer
                    if segment.answer: # 包含:构建一个包含问题和答案的字符串
                        document_context_list.append(
                            DocumentContext(
                                content=f"question:{segment.get_sign_content()} answer:{segment.answer}",
                                score=document_score_list.get(segment.index_node_id, None),
                            )
                        )
                    else: # 不包含:则只构建一个问题的字符串
                        document_context_list.append(
                            DocumentContext(
                                content=segment.get_sign_content(),
                                score=document_score_list.get(segment.index_node_id, None),
                            )
                        )

(4)构建检索资源(retrieval_resource_list):如果设置了 show_retrieve_source 标志为真,对于每个排序后的文档片段,查询相关的数据集(dataset)和文档(document)信息。创建一个 source 字典,其中包含数据集、文档的详细信息以及文档片段的相关属性(如评分、命中次数、词数等)。将包含详细信息的 source 字典添加到 retrieval_resource_list 中。

 	          if show_retrieve_source: # 设置了展示检索源的标识			
    			  for segment in sorted_segments: # 遍历排序segment
                        # 获取segment对应的dataset_id对应的dataset
                        dataset = Dataset.query.filter_by(id=segment.dataset_id).first()
                        # 获取segment.document_id对应的document
                        document = DatasetDocument.query.filter(
                            DatasetDocument.id == segment.document_id,
                            ...
                        ).first()
                        # 若2者同时存在
                        if dataset and document:
                            # 构建source字典,包含各种信息
                            source = {
                                "dataset_id": dataset.id,
                                "dataset_name": dataset.name,
                                "document_id": document.id,
                                "document_name": document.name,
                                "data_source_type": document.data_source_type,
                                "segment_id": segment.id,
                                "retriever_from": invoke_from.to_source(),
                                "score": document_score_list.get(segment.index_node_id, 0.0),
                            }
							...
                            # 若segment的回答不为空,则获取:question与answer
                            if segment.answer:
                                source["content"] = f"question:{segment.content} \nanswer:{segment.answer}"
                            else: # 仅获取question
                                source["content"] = segment.content
                            # 将源字典添加到retrieval源列表中
                            retrieval_resource_list.append(source)
  1. 使用回调函数返回检索结果,根据分数对检索结果进行排序,并返回格式化后的字符串。
        # 使用回调函数返回检索结果,根据分数对检索结果进行排序,并返回格式化后的字符串。                   
        if hit_callback and retrieval_resource_list:
            # 根据segment所在doc_id的分数进行排序
            retrieval_resource_list = sorted(retrieval_resource_list, key=lambda x: x.get("score") or 0.0, reverse=True)
            # 获取检索列表中的位置参数
            for position, item in enumerate(retrieval_resource_list, start=1):
                item["position"] = position
            hit_callback.return_retriever_resource_info(retrieval_resource_list)
  1. 按分数进行排序,并把文档内容合并在一个字符串中返回
if document_context_list:
    # 按分数进行排序,并把文档内容合并在一个字符串中返回
    document_context_list = sorted(document_context_list, key=lambda x: x.score or 0.0, reverse=True)
    return str("\n".join([document_context.content for document_context in document_context_list]))

总结

总结一下数据检索的主要步骤:(1)参数验证和模型选择;(2)检索策略选择:单线程或多线程检索(3)结果筛选和处理(4)结果合并和格式化处理,然后返回。

不管是单线程检索还是多线程检索,都会调用检索服务的retrieve函数来实现检索功能,检索服务的检索具体实现会在后面的文章中进行分析。


http://www.niftyadmin.cn/n/5839159.html

相关文章

Unet 改进:在encoder和decoder间加入TransformerBlock

目录 1. TransformerBlock 2. Unet 改进 3. 完整代码 Tips:融入模块后的网络经过测试,可以直接使用,设置好输入和输出的图片维度即可 1. TransformerBlock TransformerBlock是Transformer模型架构的基本组件,广泛应用于机器翻译、文本摘要和情感分析等自然语言处理任务…

【开源免费】基于SpringBoot+Vue.JS美食推荐商城(JAVA毕业设计)

本文项目编号 T 166 ,文末自助获取源码 \color{red}{T166,文末自助获取源码} T166,文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

Elastic Cloud Serverless 获得主要合规认证

作者:来自 Elastic Oliver Mao 我们很高兴地宣布,Elastic Cloud Serverless 已获得多项重要的合规性认证。这一里程碑加强了我们对安全性、隐私性和法规遵从性的承诺。Elastic Cloud Serverless 现已通过以下行业领先框架的审核或认证:SOC 2 …

FFmpeg(7.1版本)的基本组成

1. 前言 FFmpeg 是一个非常流行的开源项目,它提供了处理音频、视频以及其他多媒体内容的强大工具。FFmpeg 包含了大量的库,可以用来解码、编码、转码、处理和播放几乎所有类型的多媒体文件。它广泛用于视频和音频的录制、转换、流媒体传输等领域。 2. F…

前端学习-事件解绑,mouseover和mouseenter的区别(二十九)

目录 前言 解绑事件 语法 鼠标经过事件的区别 鼠标经过事件 示例代码 两种注册事件的区别 总结 前言 人道洛阳花似锦,偏我来时不逢春 解绑事件 on事件方式,直接使用null覆盖就可以实现事件的解绑 语法 btn.onclick function(){alert(点击了…

Git进阶之旅:Git Hub注册创建仓库

介绍: GitHub 是一个面向开源及私有软件项目的托管平台,因为只支持 git 作为唯一的版本库格式进行托管,故名 GitHub 仓库注册: GitHub官网:https://github.com/ 修改本地仓库用户名: git config --local…

《Foundation 面板:设计与功能的完美融合》

《Foundation 面板:设计与功能的完美融合》 引言 在当今科技迅速发展的时代,优秀的界面设计已成为产品成功的关键因素之一。Foundation 面板,作为一款领先的设计工具,以其卓越的设计理念和强大的功能,在界面设计领域独树一帜。本文将深入探讨 Foundation 面板的独特设计…

JVM的GC详解

获取GC日志方式大抵有两种 第一种就是设定JVM参数在程序启动时查看,具体的命令参数为: -XX:PrintGCDetails # 打印GC日志 -XX:PrintGCTimeStamps # 打印每一次触发GC时发生的时间第二种则是在服务器上监控:使用jstat查看,如下所示,命令格式为jstat -gc…