【AIGC】一篇彻底搞懂RAG:基于pgVector和LangChain构建RAG服务
前言
检索增强生成 (RAG) 是一种技术,它通过使用来自外部来源的事实来增强生成式 AI 模型的知识库,从而提高其准确性和可靠性。RAG 使大型语言模型 (LLM) 能够对主题做出准确、自信和出色的响应。
在本文中,我们将演示如何在应用程序中使用 RAG 技术。为此,我们将使用 Langchain 为 LLM 框架创建一个 Flutter 应用程序,并使用 pgVector(一个用于向量相似性搜索的开源 Postgres 扩展)创建应用程序。
借助数据库,尤其是那些支持向量功能的数据库,如 Neon,我们可以使用 RAG 技术来帮助 LLM 向最终用户提供准确的答案。Neon 是一个完全托管的无服务器 Postgres,它提供单独的存储和计算,以提供自动缩放、分支和无底存储。Neon 在 Apache 2.0 许可下是完全开源的,我们可以在 GitHub 上找到 neon 数据库。
pgVector 是一个 Postgres 扩展,可与矢量嵌入一起使用,用于存储、相似性搜索等。在 Neon 数据库中启用 pgVector 扩展可以简化向量嵌入的存储,以及使用内积 (<#>) 或余弦距离 (<=>) 轻松查询。
Langchain 本身不是一个 LLM,而是一个帮助使用 LLM 进行应用程序开发的框架。因此,它支持需要语言模型进行推理的上下文感知应用程序。
RAG 应用程序通常由两个组件组成:索引和检索。
索引过程包括集成(加载)外部数据源,将其拆分为更小的部分,将文档嵌入为向量,然后存储它。
Langchain 通过向应用程序提供对 OpenAI 嵌入 API 的访问权限来处理拆分和嵌入。霓虹灯在存储过程中发挥作用。
对于检索过程,pgVector 使用其向量相似性索引功能来搜索查询向量与 Neon 数据库中存储的向量之间的距离。然后 Langchain 使用 OpenAI 作为 LLM,以自然语言从查询中生成所需的结果。
以下部分将介绍构建应用程序的所有步骤,从创建 Neon 数据库到构建 Flutter 应用程序。让我们设置一个 Neon 帐户并创建我们的数据库,事不宜迟。
一、创建Neon
如前所述,创建 Neon 帐户后,让我们通过选择为用户身份验证提供的方法之一继续登录该帐户。
成功登录后,我们将被重定向到主页上的“创建项目”屏幕,要求我们填写所需的项目名称、postgres 版本和数据库名称。我们可以探索将分支名称更改为任何其他名称的更多选项,但现在让我们将其保留为main名称,然后单击创建项目。
之后,我们被重定向到主页,在那里我们看到一个弹出窗口,显示与我们之前创建的 Neon 项目的连接详细信息。我们需要这些详细信息才能从我们的应用程序访问 Neon 项目并将其复制到安全文件中。有了这个,我们已经成功地为我们的 Flutter 应用程序创建了一个 Neon 数据库。
Neon 提供三种数据库管理方式:Neon CLI(命令行界面)、Neon API 和 SQL。借助 SQL,Neon 提供了一个 SQL 编辑器,可以直接在控制台上运行 SQL 命令。因此,我们将使用 SQL 来管理我们的 Neon 数据库,但我们将通过从我们的应用程序到 Neon 数据库的 Postgres 连接来实现。
Flutter 应用程序是一个简单的聊天机器人,它根据来自外部数据源的数据(在本例中为 PDF 文件)响应查询。因此,在接下来的章节中,我们将克隆一个 Flutter 模板,将模板连接到 Neon 数据库,并添加在应用程序中实现 RAG 技术的功能。
二、创建Flutter
首先,我们将使用一个 Flutter 模板应用程序,其中包含一个显示区域、一个我们将在其中键入查询的文本区域和一个带有按钮的抽屉来上传我们想要的 PDF。
若要克隆项目,请在终端中运行以下命令:
git clone https://github.com/muyiwexy/neon_rag_with_langchain.git
克隆项目后,运行以下命令:
flutter pub get
此命令获取当前工作目录的 pubspec.yaml 文件中列出的所有依赖项及其传递依赖项。
此项目使用模型视图控制器 (MVC) 体系结构来处理应用程序的特定开发方面。该架构通过将业务(核心)逻辑与 UI(表示层)分离来帮助我们保持可读性。
为了使内容更易于查找,以下是 lib 文件夹结构的 ASCII 表示形式:
lib/
├─ home/
│ ├─ controller/
│ ├─ model/
│ ├─ view/
│ │ ├─ widgets/
│ │ │ ├─ display_area.dart
│ │ │ ├─ text_area.dart
│ │ ├─ home_page.dart
│ ├─ view_model/
├─ core/
│ │ ├─ dependency_injection/
├─ main.dart
由于我们使用的是 MVC 架构,因此 UI 代码位于 lib/home/view 文件夹中。为了继续,我们需要向 pubspec.yaml 文件添加一些构建应用程序所需的外部依赖项。
dependencies:
file_picker
flutter_dotenv
langchain
langchain_openai
path_provider
postgres
provider
syncfusion_flutter_pdf
成功完成此操作后,我们将为整个项目所需的所有服务创建一个抽象。我们把这个抽象类称为 LangchainService——在其中,我们将实现实现 RAG 技术所涉及的流程。因此,接下来,找到 lib/home/view_model 文件夹并在其中创建一个 langchain_service.dart 的 dart 文件。要执行抽象,请将以下代码添加到文件中:
abstract class LangchainService {
// do something
}
三、索引
1.加载
加载过程涉及将文档集成到系统中,系统通常是脱机的。因此,为了实现这一目标,我们将执行以下操作:
- 使用 file_picker 包从本地设备中选择文件
- 使用 syncfusion_flutter_pdf 包阅读文档 (PDF) 并将其转换为文本
- 使用 path_provider 包查找常用的文件生态系统,例如临时目录或 AppData 目录
与其他服务相比,加载过程是脱机的;因此,我们将与其他进程分开执行此操作。要加载文件,请在 lib/home/controller 目录中创建一个 index_notifier.dart。接下来,我们创建一个 ChangeNotifier 类 IndexNotifier,其最终值为 LangchainService。此外,我们将创建两个全局私有字符串变量 _filepath 和 _fileName,以及 _fileName 变量的 getter。
class IndexNotifier extends ChangeNotifier {
late LangchainService langchainService;
IndexNotifier({required this.langchainService});
String? _filepath;
String? _fileName;
String? get fileName => _fileName;
}
实质上,通过 ChangeNotifier,此类将是处理应用程序状态管理负载的两个文件之一。接下来,我们将实现一个函数,该函数从 Langchain 包中返回一个类型 Document。我们将使用该方法从本地设备中选取一个 PDF 文档,并将文件类型和名称分配给之前创建的 String 变量。
此外,我们将有一个 Future 函数,该函数将 PDF 转换为文本,该函数使用 Langchain 的 TextLoader 类作为文档加载。
class IndexNotifier extends ChangeNotifier {
// do something
Future<Document> _pickedFile() async {
FilePickerResult? result = await FilePicker.platform
.pickFiles(type: FileType.custom, allowedExtensions: ['pdf']);
if (result != null) {
_filepath = result.files.single.path;
_fileName = result.files.single.name.replaceAll('.pdf', '').toLowerCase();
final textfile =
_filepath!.isNotEmpty ? await _readPDFandConvertToText() : "";
final loader = TextLoader(textfile);
final document = await loader.load();
Document? docs;
for (var doc in document) {
docs = doc;
}
return docs!;
} else {
throw Exception("No file selected");
}
}
Future<String> _readPDFandConvertToText() async {
File file = File(_filepath!);
List<int> bytes = await file.readAsBytes();
final document = PdfDocument(inputBytes: Uint8List.fromList(bytes));
String text = PdfTextExtractor(document).extractText();
final localPath = await _localPath;
File createFile = File('$localPath/output.txt');
final res = await createFile.writeAsString(text);
document.dispose();
return res.path;
}
Future<String> get _localPath async {
final directory = await getApplicationDocumentsDirectory();
return directory.path;
}
}
我们可以使用上面的代码将 PDF 加载为 Langchain 文档文件。
2.分割和词化
现在,我们需要拆分和嵌入文档并存储它。为了拆分和嵌入 Langchain 文档,我们将返回到 langchain_service.dart 中创建的抽象。在那里,我们将使用以下代码对其进行更新:
abstract class LangchainService {
List<Document> splitDocToChunks(Document doc);
Future<List<List<double>>> embedChunks(List<Document> chunks);
}
我们将在同一目录中创建另一个名为 langchain_service_impl.dart 的文件来实现此抽象。在此文件中,我们将实现之前创建的 LangchainService 抽象。splitDocToChunks 接受参数 Document,该参数是从前面的 IndexNotifier 类中的 _pickedFile 方法返回的。然后,它获取页面内容。
然后,我们使用 RecursiveCharacterTextSplitter 对象创建一个文档,将文本拆分为多个 1000 个字符的块,并将其作为文档列表返回。
接下来,我们将 Document 列表传递给 embedChunks 方法,然后该方法创建此 List 的向量嵌入,并将其作为 List< List <double>>返回。
下面是代码的样子:
class LangchainServicesImpl extends LangchainService {
final OpenAIEmbeddings embeddings;
LangchainServicesImpl({
required this.embeddings,
});
@override
List<Document> splitDocToChunks(Document doc) {
final text = doc.pageContent;
const textSplitter = RecursiveCharacterTextSplitter(chunkSize: 1000);
final chunks = textSplitter.createDocuments([text]);
return chunks
.map(
(e) => Document(
id: e.id,
pageContent: e.pageContent.replaceAll(RegExp('/\n/g'), " "),
metadata: doc.metadata,
),
)
.toList();
}
@override
Future<List<List<double>>> embedChunks(List<Document> chunks) async {
final embedDocs = await embeddings.embedDocuments(chunks);
return embedDocs;
}
}
同样,我们将更新 IndexNotifier 类以控制应用程序的状态,同时执行所有这些过程:
enum CreateandUploadState { initial, loading, loaded, error }
class IndexNotifier extends ChangeNotifier {
late LangchainService langchainService;
IndexNotifier({required this.langchainService});
String? _filepath;
String? _fileName;
String? get fileName => _fileName;
final _createandUploadState =
ValueNotifier<CreateandUploadState>(CreateandUploadState.initial);
ValueNotifier<CreateandUploadState> get createandUploadState =>
_createandUploadState;
Future<void> createAndUploadNeonIndex() async {
try {
// load the document into the application
final pickedDocument = await _pickedFile();
_createandUploadState.value = CreateandUploadState.loading;
// split the document to different chunks
final splitChunks = langchainService.splitDocToChunks(pickedDocument);
// Embed the chunks
final embededDoc = await langchainService.embedChunks(splitChunks);
_createandUploadState.value = CreateandUploadState.loaded;
} catch (e) {
_createandUploadState.value = CreateandUploadState.error;
print(e);
} finally {
await Future.delayed(const Duration(milliseconds: 2000));
_createandUploadState.value = CreateandUploadState.initial;
}
}
Future<Document> _pickedFile() async {
FilePickerResult? result = await FilePicker.platform
.pickFiles(type: FileType.custom, allowedExtensions: ['pdf']);
if (result != null) {
_filepath = result.files.single.path;
_fileName = result.files.single.name.replaceAll('.pdf', '').toLowerCase();
final textfile =
_filepath!.isNotEmpty ? await _readPDFandConvertToText() : "";
final loader = TextLoader(textfile);
final document = await loader.load();
Document? docs;
for (var doc in document) {
docs = doc;
}
return docs!;
} else {
throw Exception("No file selected");
}
}
Future<String> _readPDFandConvertToText() async {
File file = File(_filepath!);
List<int> bytes = await file.readAsBytes();
final document = PdfDocument(inputBytes: Uint8List.fromList(bytes));
String text = PdfTextExtractor(document).extractText();
final localPath = await _localPath;
File createFile = File('$localPath/output.txt');
final res = await createFile.writeAsString(text);
document.dispose();
return res.path;
}
Future<String> get _localPath async {
final directory = await getApplicationDocumentsDirectory();
return directory.path;
}
}
3.存储
到目前为止,我们已经成功启用了 PDF 文档的加载、拆分和嵌入。现在,我们需要存储拆分和嵌入的数据,这就是我们之前创建的 Neon 数据库的用武之地。为此,我们将使用以下代码更新 LangchainService 抽象:
abstract class LangchainService {
// the abstraction above
Future<bool> checkExtExist();
Future<bool> checkTableExist(String tableName);
Future<String> createNeonVecorExt();
Future<String> createNeonTable(String tableName);
Future<String> deleteNeonTableRows(String tableName);
Future<void> storeDocumentData(Document doc, List<Document> chunks,
List<List<double>> embeddedDoc, String tableName);
}
checkExtExist 方法检查向量扩展是否存在,并返回执行结果。此外,checkTableExist 方法检查 Neon 数据库中是否存在表(之前创建_filename私有 String 变量),并返回执行结果,即布尔值。为此,我们将添加以下代码,以便在 langchain_service_impl.dart 文件中实现 LangchainService:
class LangchainServicesImpl extends LangchainService {
final Connection connection;
final OpenAIEmbeddings embeddings;
final OpenAI openAI;
LangchainServicesImpl({
required this.connection,
required this.embeddings,
required this.openAI,
});
// do something
// do something
@override
Future<bool> checkExtExist() async {
final checkExtExist = await connection.execute(
"SELECT EXISTS (SELECT FROM pg_extension WHERE extname = 'vectors');",
);
return checkExtExist.first[0] as bool;
}
@override
Future<bool> checkTableExist(String tableName) async {
final checkTableExist = await connection.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '$tableName');",
);
return checkTableExist.first[0] as bool;
}
}
void debugPrint(String message) {
if (kDebugMode) {
print(message);
}
}
createNeonVecorExt、createNeonTable 和 deleteNeonTableRows 方法分别处理 pgVector 扩展、Neon 数据库表(_filename之前创建的私有 String 变量)和删除任何存储的行(这是在用户想要更新数据库表中的文档并且存在名称冲突的情况下)。在创建 Neon 表时,我们将使用 pgVector 扩展中的 ivfflat 算法同时激活向量索引。该算法为对嵌入等高维数据进行近似最近邻搜索提供了一种有效的解决方案。
class LangchainServicesImpl extends LangchainService {
final Connection connection;
final OpenAIEmbeddings embeddings;
final OpenAI openAI;
LangchainServicesImpl({
required this.connection,
required this.embeddings,
required this.openAI,
});
// do something
// do something
// do somthing
// do something
@override
Future<String> createNeonVecorExt() async {
debugPrint("Creating pgVector extension ...");
await connection.execute("CREATE EXTENSION vector;");
return "Vector extension created Successfully";
}
@override
Future<String> createNeonTable(String tableName) async {
debugPrint("Creating the $tableName table ... ");
await connection.execute(
"CREATE TABLE $tableName (id text, metadata text, embedding vector(1536));",
);
debugPrint("Indexing the $tableName using the ivfflat vector cosine");
await connection.execute(
'CREATE INDEX ON $tableName USING ivfflat (embedding vector_cosine_ops) WITH (lists = 24);');
return "Table created successfully";
}
@override
Future<String> deleteNeonTableRows(String tableName) async {
debugPrint("Deleting tableRows");
await connection.execute("TRUNCATE $tableName;");
return "Table rows deleted successfuly";
}
}
void debugPrint(String message) {
if (kDebugMode) {
print(message);
}
}
对于 storeDocumentData,我们将 Langchain 文档、块、嵌入块和表名传递给它,并在事务中执行 INSERT 命令。
class LangchainServicesImpl extends LangchainService {
final Connection connection;
final OpenAIEmbeddings embeddings;
final OpenAI openAI;
LangchainServicesImpl({
required this.connection,
required this.embeddings,
required this.openAI,
});
// do something
// do something
// do something
// do something
// do something
// do something
@override
Future<void> storeDocumentData(Document doc, List<Document> chunks,
List<List<double>> embeddedDoc, String tableName) async {
debugPrint("Storing chunks and emedded vectors in the $tableName table");
await connection.runTx((s) async {
for (int i = 0; i < chunks.length; i++) {
final txtPath = doc.metadata['source'] as String;
final chunk = chunks[i];
final embeddingArray = embeddedDoc[i];
await s.execute(
Sql.named(
'INSERT INTO $tableName (id, metadata, embedding) VALUES (@id, @metadata, @embedding)',
),
parameters: {
'id': '${txtPath}_$i',
'metadata': {
...chunk.metadata,
'loc': jsonEncode(chunk.metadata['loc']),
'pageContent': chunk.pageContent,
'txtPath': txtPath,
},
'embedding': '$embeddingArray',
},
);
}
});
}
}
void debugPrint(String message) {
if (kDebugMode) {
print(message);
}
}
现在,我们将更新 IndexNotifier,以相应地实现对 LangchainServices 的更改。我们将使用 checkExtExist 和 checkTableExist 作为条件检查器来运行 createNeonVecorExt、createNeonTable 和 deleteNeonTableRows,因为它们满足每个条件。以下是更新后的代码:
enum CreateandUploadState { initial, loading, loaded, error }
class IndexNotifier extends ChangeNotifier {
late LangchainService langchainService;
IndexNotifier({required this.langchainService});
String? _filepath;
String? _fileName;
String? get fileName => _fileName;
final _createandUploadState =
ValueNotifier<CreateandUploadState>(CreateandUploadState.initial);
ValueNotifier<CreateandUploadState> get createandUploadState =>
_createandUploadState;
Future<void> createAndUploadNeonIndex() async {
try {
// load the document into the application
final pickedDocument = await _pickedFile();
_createandUploadState.value = CreateandUploadState.loading;
// split the document to different chunks
final splitChunks = langchainService.splitDocToChunks(pickedDocument);
// Embed the chunks
final embededDoc = await langchainService.embedChunks(splitChunks);
if (!(await langchainService.checkExtExist())) {
await langchainService.createNeonVecorExt();
}
if (!(await langchainService.checkTableExist(_fileName!))) {
await langchainService.createNeonTable(_fileName!);
} else {
await langchainService.deleteNeonTableRows(_fileName!);
}
await langchainService.storeDocumentData(
pickedDocument, splitChunks, embededDoc, _fileName!);
_createandUploadState.value = CreateandUploadState.loaded;
} catch (e) {
_createandUploadState.value = CreateandUploadState.error;
print(e);
} finally {
await Future.delayed(const Duration(milliseconds: 2000));
_createandUploadState.value = CreateandUploadState.initial;
}
}
Future<Document> _pickedFile() async {
FilePickerResult? result = await FilePicker.platform
.pickFiles(type: FileType.custom, allowedExtensions: ['pdf']);
if (result != null) {
_filepath = result.files.single.path;
_fileName = result.files.single.name.replaceAll('.pdf', '').toLowerCase();
final textfile =
_filepath!.isNotEmpty ? await _readPDFandConvertToText() : "";
final loader = TextLoader(textfile);
final document = await loader.load();
Document? docs;
for (var doc in document) {
docs = doc;
}
return docs!;
} else {
throw Exception("No file selected");
}
}
Future<String> _readPDFandConvertToText() async {
File file = File(_filepath!);
List<int> bytes = await file.readAsBytes();
final document = PdfDocument(inputBytes: Uint8List.fromList(bytes));
String text = PdfTextExtractor(document).extractText();
final localPath = await _localPath;
File createFile = File('$localPath/output.txt');
final res = await createFile.writeAsString(text);
document.dispose();
return res.path;
}
Future<String> get _localPath async {
final directory = await getApplicationDocumentsDirectory();
return directory.path;
}
}
我们已成功将 PDF 数据存储在数据库表中,作为 id(text)、元数据(Map 或 JSON)和嵌入。
为了在应用程序中使用 ChangeNotifier 类,我们将使用 Provider 挂载 ChangeNotifier 类以进行依赖注入。在这个过程中,我们将使用 Postgres 包连接 Neon 数据库和我们的 Flutter 应用程序。
执行此操作的方法是使用 MultiProvider 将 main.dart 中的初始无状态小部件包装起来。这样做会将我们的 Providers 和 ChangeNotifierProviders 挂载到小部件树上,从而使我们能够轻松监控应用程序的状态。因此,我们将前往 lib/core/dependency_injection/ 文件夹,创建一个名为 provider_locator.dart 的文件,并粘贴以下代码:
class ProviderLocator {
// provider tree
static Future<MultiProvider> getProvider(Widget child) async {
final langchainService = await _createLangchainService();
return MultiProvider(
providers: [
Provider<LangchainService>.value(value: langchainService),
ChangeNotifierProvider<IndexNotifier>(
create: (_) => IndexNotifier(langchainService: langchainService),
),
],
child: child,
);
}
// langchain services function
static Future<LangchainService> _createLangchainService() async {
final connection = await createPostgresConnection();
final embeddings = await _createEmbeddings();
final openAI = await _createOpenAIConnection();
return LangchainServicesImpl(
connection: connection,
embeddings: embeddings,
openAI: openAI,
);
}
// postgres connection
static Future<Connection> createPostgresConnection() async {
const maxRetries = 3;
for (var retry = 0; retry < maxRetries; retry++) {
try {
final endpoint = Endpoint(
host: dotenv.env['PGHOST']!,
database: dotenv.env['PGDATABASE']!,
port: 5432,
username: dotenv.env['PGUSER']!,
password: dotenv.env['PGPASSWORD']!,
);
final connection = await Connection.open(
endpoint,
settings: ConnectionSettings(
sslMode: SslMode.verifyFull,
connectTimeout: const Duration(milliseconds: 20000),
),
);
if (connection.isOpen) {
if (kDebugMode) {
print("Connection Established!");
}
return connection;
}
} catch (e) {
if (kDebugMode) {
print('Error creating PostgreSQL connection: $e');
}
}
await Future.delayed(const Duration(seconds: 2));
}
// If maxRetries is reached and the connection is still not open, throw an exception
throw Exception(
'Failed to establish a PostgreSQL connection after $maxRetries retries');
}
// Lanchain openAI embeddings
static Future<OpenAIEmbeddings> _createEmbeddings() async {
return OpenAIEmbeddings(
apiKey: dotenv.env['OPENAI_API_KEY'],
model: "text-embedding-ada-002",
);
}
// openAi connection
static Future<OpenAI> _createOpenAIConnection() async {
return OpenAI(apiKey: dotenv.env['OPENAI_API_KEY']);
}
}
ProviderLocator 类执行以下操作:
- 定义一个 getProvider 方法,该方法:
创建 LangchainService 实例。
返回一个 MultiProvider,其中包含一个 LangchainService 提供程序和一个用于 IndexNotifier 的 ChangeNotifierProvider。 - 定义一个方法,createLangchainService:
创建 PostgreSQL 连接。
创建 OpenAIEmbeddings 实例。
创建 OpenAI 实例。
返回一个 LangchainServicesImpl 实例,其中包含创建的连接、嵌入和 OpenAI。 - _定义一个 createPostgresConnection 方法,该方法:
尝试使用前面 Neon 连接详细信息中的指定设置建立 PostgreSQL 连接。
如果连接失败,则重试最多次数。
如果在最大重试次数后未建立连接,则会引发异常。 - _定义返回 OpenAIEmbeddings 实例的方法_createEmbeddings。
- 定义返回 OpenAI 实例的方法_createOpenAIConnection。
现在,让我们使用以下代码更新 main.dart 文件:
void main() async {
WidgetsFlutterBinding.ensureInitialized();
await dotenv.load(fileName: '.env');
runApp(
await ProviderLocator.getProvider(
MaterialApp(
debugShowCheckedModeBanner: false,
theme: ThemeData(
elevatedButtonTheme: ElevatedButtonThemeData(
style: ElevatedButton.styleFrom(
shape: RoundedRectangleBorder(
borderRadius: BorderRadius.circular(10.0),
),
),
),
useMaterial3: true,
),
home: HomePage(),
),
),
);
}
四、检索
检索是一个简化的过程,通常分为两个过程:
-
检索:这是通过将用户查询的向量嵌入与数据库中存在的最接近的可用结果进行比较来完成的。我们使用余弦相似性搜索来比较一个向量与另一个向量。因此,当我们得到最接近的结果时,我们可以将其用于第二个过程。
-
生成:在获得最接近的结果后,我们可以将其用作 LLM 的助手,以根据该特定信息生成响应
为了以编程方式完成此操作,我们将前往 langchain_service.dart,并在抽象中添加以下代码:
abstract class LangchainService {
// do something
Future<String> queryNeonTable(String tableName, String query);
}
上面的方法按照上面的检索过程返回字符串响应。下面是实现的代码:
class LangchainServicesImpl extends LangchainService {
final Connection connection;
final OpenAIEmbeddings embeddings;
final OpenAI openAI;
LangchainServicesImpl({
required this.connection,
required this.embeddings,
required this.openAI,
});
// do something
@override
Future<String> queryNeonTable(String tableName, String query) async {
final embedQuery = await embeddings.embedQuery(query);
List<List<dynamic>> getSimilar = await connection.execute(
"SELECT *, 1 - (embedding <=> '$embedQuery') AS cosine_similarity FROM $tableName WHERE (1 - (embedding <=> '$embedQuery')) BETWEEN 0.3 AND 1.00 ORDER BY cosine_similarity DESC LIMIT 10;");
List<Metadata> pdfMetadata = getSimilar
.map((item) => Metadata.fromJson(json.decode(item[1])))
.toList();
if (pdfMetadata.isNotEmpty) {
final concatPageContent = pdfMetadata.map((e) {
return e.pageContent;
}).join(' ');
final docChain = StuffDocumentsQAChain(llm: openAI);
final response = await docChain.call({
'input_documents': [
Document(pageContent: concatPageContent),
],
'question': query,
});
return response['output'];
} else {
return "Couldn't find anything on that topic";
}
}
}
void debugPrint(String message) {
if (kDebugMode) {
print(message);
}
}
上面的代码执行以下操作:
- 实现一个方法 queryNeonTable:
使用 embeddings 对象嵌入查询。
对连接执行 SQL 查询,以从指定表中获取相似项。
将结果转换为元数据对象的列表。
如果 Metadata 不为空,则连接页面内容,创建 StuffDocumentsQAChain 对象,并使用连接的内容和原始查询调用该对象以获取响应。
如果元数据为空,则返回默认消息:“找不到有关该主题的任何内容”。
然后,我们将创建一个单独的 ChangeNotifier 类来处理查询的状态。这遵循与 IndexNotifier 类相同的模式,但略有变化。代码如下:
import 'package:flutter/material.dart';
import '../view_models/langchain_services.dart';
class Message {
String? query;
String? response;
Message({required this.query, this.response = ""});
}
enum QueryState {
initial,
loading,
loaded,
error,
}
class QueryNotifier extends ChangeNotifier {
late LangchainService langchainService;
QueryNotifier({required this.langchainService});
final List<Message> _messages = [];
final _messagesState = ValueNotifier<List<Message>>([]);
ValueNotifier<List<Message>> get messageState => _messagesState;
final _queryState = ValueNotifier<QueryState>(QueryState.initial);
ValueNotifier<QueryState> get queryState => _queryState;
userqueryResponse(String tableName, String query) async {
_messages.add(Message(query: query));
_messagesState.value = List.from(_messages);
try {
_queryState.value = QueryState.loading;
String response = await langchainService.queryNeonTable(tableName, query);
final List<Message> updatedMessages = List.from(_messages);
updatedMessages.last.response = response;
_messagesState.value = updatedMessages;
_queryState.value = QueryState.loaded;
} catch (e) {
// Handle errors if necessary
print(e);
_queryState.value = QueryState.error;
await Future.delayed(const Duration(milliseconds: 2000));
_queryState.value = QueryState.initial;
}
}
}
上面的代码执行以下操作:
- 定义具有查询和响应字段的 Message 类。
- 定义一个名为 QueryState 的枚举,其中包含以下状态:initial、loading、loaded 和 error。
- 创建一个扩展 ChangeNotifier 的 QueryNotifier 类:
初始化 LangchainService 对象。
维护 Message 对象的列表。
定义 messagesState 和 queryState 的 ValueNotifier 对象。
定义一个方法 userqueryResponse:
向_messages添加新消息。
将 queryState 设置为 loading。
调用 langchainService 的 queryNeonTable 方法获取响应。
更新最后一条消息的响应,并将 queryState 设置为 loaded。
通过将 queryState 设置为 error,然后在延迟后返回到初始值来处理错误。
之后,我们将通过向 MultiProvider 添加另一个 ChangeNotifierProvider 类来更新 provider_locator.dart 文件中的 getProvider 方法。代码如下:
class ProviderLocator {
// provider tree
static Future<MultiProvider> getProvider(Widget child) async {
final langchainService = await _createLangchainService();
return MultiProvider(
providers: [
Provider<LangchainService>.value(value: langchainService),
// IndexNotifier
ChangeNotifierProvider<IndexNotifier>(
create: (_) => IndexNotifier(langchainService: langchainService),
),
// QueryNotifier
ChangeNotifierProvider<QueryNotifier>(
create: (_) => QueryNotifier(langchainService: langchainService),
),
],
child: child,
);
}
}
小结
检索增强生成 (RAG) 通过集成技术来增强 LLM,以确保事实和上下文响应。像 Neon 这样的向量数据库与 RAG 技术和 Langchain 的协作将可学习机器的能力提升到前所未有的水平。这导致了更出色的虚拟助手、数据分析工具等。
RAG 与 pgVector 和 Langchain 的集成证明了 AI 令人难以置信的实力及其充满希望的未来。
- 点赞
- 收藏
- 关注作者
评论(0)