Closed Nuclear6 closed 4 months ago
Have you tried Qwen or Doubao to extract the entities?
To be honest, I used these models to extract entities. I also tried Baidu's Wenxin 4, and the extraction results were relatively poor. I am now going to change the prompt.
How well does using openai gpt4 work for Chinese? Which models have you already tried? I've tried a few Microsoft models before and none of them seem to work well in Chinese.
Previously I used llama3 and gemma2, and they did not perform well on Chinese documents such as web novels. Yesterday I tried DeepSeeker, and it looks good to me. I can now also visualize the result in neo4j. To try it, see the article on my WeChat official account: 喂饭教程!全网首发Neo4J可视化GraphRAG索引 (a step-by-step tutorial on visualizing the GraphRAG index with Neo4J).
@Nuclear6 if you are trying prompt tuning, you should use a large language model that is optimized for Chinese, such as qwen or moonshot. I was using gemma2 9b before and it was very bad: the generated prompt was poor and the generated examples were wrong.
Besides, it is hard to complete the indexing procedure after prompt tuning; you will run into lots of errors. I tried for an entire afternoon and gave up.
But you can try:
python -m graphrag.prompt_tune --root . --domain "Chinese web novels" --language Chinese --chunk-size 300 --output prompt_zh
Remember to update your entity types in settings.yaml when you are done with prompt tuning.
Hi, do you mean prompts that are written in Chinese? Could you share them if possible? Thanks!
I changed the entity extraction prompt to Chinese and got a graph with fewer entities, but they look somewhat better than the English prompt's result. Is there any test data, such as a raw file paired with a high-quality generated graph, that could be used to compare Chinese and English prompts for graph generation?
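No shared reference graph is mentioned in this thread. If you hand-label the entities for a small sample yourself, one rough way to compare the two prompts is precision and recall of the extracted entity names against that sample. The sketch below assumes the entity names have already been exported as plain sets; the `score` helper and all of the data are made up for illustration and are not part of GraphRAG.

```python
def score(extracted: set[str], gold: set[str]) -> dict[str, float]:
    """Precision, recall, and F1 of extracted entity names against a hand-labeled set."""
    hits = len(extracted & gold)
    precision = hits / len(extracted) if extracted else 0.0
    recall = hits / len(gold) if gold else 0.0
    f1 = 2 * precision * recall / (precision + recall) if hits else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# Hypothetical data: entities labeled by hand for one chapter, plus the
# entity names each prompt variant produced for the same chapter.
gold = {"张三", "李四", "青山镇", "玉佩"}
from_chinese_prompt = {"张三", "李四", "青山镇"}
from_english_prompt = {"张三", "李四", "青山镇", "Zhang San", "story", "chapter"}

zh = score(from_chinese_prompt, gold)
en = score(from_english_prompt, gold)
print("Chinese prompt:", zh)
print("English prompt:", en)
```

Fewer entities with higher precision would match the impression described above, but only a labeled sample of your own documents can confirm it.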
I have 8 manual documents totaling 130 KB, and the results only became somewhat better after the following optimizations.
Index building phase (the model is Doubao 128k; one run costs 10 yuan):
1. The embedding service is a self-hosted deployment of the open-source bge-large-zh model, served through oneapi.
2. The chunking logic is modified with reference to Langchain-Chatchat, to avoid the garbled characters that appear when cl100k_base is used to split text into tokens.
3. The entity types are redefined. I gave part of my documents to 4o and asked it to summarize which entity types need to be defined.
4. The prompt is changed to Chinese, and the examples unrelated to my documents are removed. The replacement examples are generated with the 4o model.
Query phase: the retrieved entities were too far from the query. The cause is the custom embedding service, for which the cl100k_base-related operations have to be removed. After that change the results improved.
This is my experience optimizing for Chinese electronic manuals, for your reference!
@Nuclear6
Thank you for sharing. I used DeepSeeker to build the index. After registering, I get 5 million for free, which should be enough for a run.
2. Does the chunking logic refer to modifying the chunking inside GraphRAG? Does it need to match the chunking of the large model being used?
3. For points 3 and 4, you didn't use Prompt-Tune for automatic tuning? Did you use ChatGPT directly for translation and modification?
Additionally, I have a question: I have multiple documents, all of which are paper abstracts. Is it better to put them in one document and rely on automatic splitting, or to keep each abstract in a separate document? Thank you.
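@KylinMountain
1. The official chunking first tokenizes the document and then splits by token count, which easily produces garbled characters for Chinese. The open-source Langchain-ChatChat project splits by Chinese character count instead, which effectively avoids garbled chunks.
Official chunk: https://github.com/microsoft/graphrag/blob/main/graphrag/index/verbs/text/chunk/strategies/tokens.py
Reference chunk: https://github.com/chatchat-space/Langchain-Chatchat/blob/master/libs/chatchat-server/chatchat/server/file_rag/text_splitter/chinese_recursive_text_splitter.py
2. I feel chunking has little to do with the model. The Chinese chunking logic keeps sentences intact, so the model may understand the chunks better.
3. I did not use the official prompt tuning. Since you said it easily throws errors, I had 4o generate the corresponding template by translating against the original.
4. As I understand it, one document or several makes little difference. Entities are extracted per chunk, and embeddings are then built from the entities and their descriptions. I did not see the document name playing much of a role.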
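The garbling described in point 1 can be reproduced without GraphRAG. cl100k_base is a byte-level BPE encoding, so a token boundary can fall inside the UTF-8 bytes of a single Chinese character. The sketch below does not call tiktoken; it imitates the effect by cutting the UTF-8 bytes at fixed offsets (an assumption made to keep the example dependency-free) and compares that with cutting by character count. Both helper names are illustrative.

```python
text = "图谱检索增强生成"  # 8 characters, 3 UTF-8 bytes each


def split_by_bytes(s: str, size: int) -> list[str]:
    """Cut the UTF-8 byte stream every `size` bytes, ignoring character boundaries."""
    raw = s.encode("utf-8")
    return [
        raw[i : i + size].decode("utf-8", errors="replace")
        for i in range(0, len(raw), size)
    ]


def split_by_chars(s: str, size: int) -> list[str]:
    """Cut every `size` characters, so no character is ever broken."""
    return [s[i : i + size] for i in range(0, len(s), size)]


byte_chunks = split_by_bytes(text, 4)
char_chunks = split_by_chars(text, 3)

print(byte_chunks)  # some chunks contain U+FFFD replacement characters
print(char_chunks)  # every chunk is intact text
```

Splitting on characters (and, better, on sentence punctuation) is what the Langchain-Chatchat splitter referenced above does, which is why it avoids the broken characters at chunk edges.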
Thank you very much~ I did see garbled characters in the chunks. Thanks for clearing that up~ 👍👍👍
Hi, will auto prompt tuning define all the entities in my domain, or do I have to define them manually in the settings?
@dinhngoc267 I recommend giving a sample of your input documents to the gpt-4o model and having it define the entity types.
In my experience, auto prompt tuning does not generate all of the domain entities. It reads your input documents and generates some examples, and I feel it does not perform well. As @Nuclear6 said, it may be better to have gpt-4 generate the prompt, giving it your input documents as reference.
Code change to avoid the garbled characters that appear when cl100k_base is used to split tokens. Thanks to Nuclear6 for the idea.
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

"""A module containing run and split_text_on_tokens methods definition."""

import logging
import re
from collections.abc import Iterable
from typing import Any, List, Optional

import tiktoken
from datashaper import ProgressTicker
from langchain.text_splitter import RecursiveCharacterTextSplitter

from graphrag.index.text_splitting import Tokenizer
from graphrag.index.verbs.text.chunk.typing import TextChunk

# Upstream defaults are in tokens; with the character-based splitter below
# these values are measured in characters instead.
DEFAULT_CHUNK_SIZE = 2500
DEFAULT_CHUNK_OVERLAP = 300


def run(
    input: list[str], args: dict[str, Any], tick: ProgressTicker
) -> Iterable[TextChunk]:
    """Chunks text into multiple parts. A pipeline verb."""
    tokens_per_chunk = args.get("chunk_size", DEFAULT_CHUNK_SIZE)
    chunk_overlap = args.get("chunk_overlap", DEFAULT_CHUNK_OVERLAP)
    encoding_name = args.get("encoding_name", "cl100k_base")
    enc = tiktoken.get_encoding(encoding_name)

    def encode(text: str) -> list[int]:
        if not isinstance(text, str):
            text = f"{text}"
        return enc.encode(text)

    def decode(tokens: list[int]) -> str:
        return enc.decode(tokens)

    return split_text_on_tokens(
        input,
        Tokenizer(
            chunk_overlap=chunk_overlap,
            tokens_per_chunk=tokens_per_chunk,
            encode=encode,
            decode=decode,
        ),
        tick,
        chunk_overlap=chunk_overlap,  # update
        tokens_per_chunk=tokens_per_chunk,  # update
    )


# Adapted from - https://github.com/langchain-ai/langchain/blob/77b359edf5df0d37ef0d539f678cf64f5557cb54/libs/langchain/langchain/text_splitter.py#L471
# So we could have better control over the chunking process
def split_text_on_tokens(
    texts: list[str],
    enc: Tokenizer,
    tick: ProgressTicker,
    chunk_overlap,  # update
    tokens_per_chunk,  # update
) -> list[TextChunk]:
    """Split incoming text and return chunks."""
    result = []
    mapped_ids = []

    # Original token-based logic, disabled:
    # for source_doc_idx, text in enumerate(texts):
    #     encoded = enc.encode(text)
    #     tick(1)
    #     mapped_ids.append((source_doc_idx, encoded))
    # input_ids: list[tuple[int, int]] = [
    #     (source_doc_idx, id) for source_doc_idx, ids in mapped_ids for id in ids
    # ]

    # Keep the raw text of each document instead of its token ids.
    for source_doc_idx, text in enumerate(texts):
        tick(1)
        mapped_ids.append((source_doc_idx, text))

    text_splitter = ChineseRecursiveTextSplitter(
        keep_separator=True,
        is_separator_regex=True,
        chunk_size=tokens_per_chunk,
        chunk_overlap=chunk_overlap,
    )

    # Split each document on characters and punctuation rather than tokens.
    for source_doc_idx, text in mapped_ids:
        chunks = text_splitter.split_text(text)
        for chunk in chunks:
            result.append(
                TextChunk(
                    text_chunk=chunk,
                    source_doc_indices=[source_doc_idx] * len(chunk),
                    n_tokens=len(chunk),  # character count, not token count
                )
            )

    # Original token-window loop, disabled:
    # start_idx = 0
    # cur_idx = min(start_idx + enc.tokens_per_chunk, len(input_ids))
    # chunk_ids = input_ids[start_idx:cur_idx]
    # while start_idx < len(input_ids):
    #     chunk_text = enc.decode([id for _, id in chunk_ids])
    #     doc_indices = list({doc_idx for doc_idx, _ in chunk_ids})
    #     result.append(
    #         TextChunk(
    #             text_chunk=chunk_text,
    #             source_doc_indices=doc_indices,
    #             n_tokens=len(chunk_ids),
    #         )
    #     )
    #     start_idx += enc.tokens_per_chunk - enc.chunk_overlap
    #     cur_idx = min(start_idx + enc.tokens_per_chunk, len(input_ids))
    #     chunk_ids = input_ids[start_idx:cur_idx]

    return result


# -----------------------------------------------------------------------------------
# For Chinese text
def _split_text_with_regex_from_end(
    text: str, separator: str, keep_separator: bool
) -> List[str]:
    # Now that we have the separator, split the text
    if separator:
        if keep_separator:
            # The parentheses in the pattern keep the delimiters in the result.
            _splits = re.split(f"({separator})", text)
            splits = ["".join(i) for i in zip(_splits[0::2], _splits[1::2])]
            if len(_splits) % 2 == 1:
                splits += _splits[-1:]
            # splits = [_splits[0]] + splits
        else:
            splits = re.split(separator, text)
    else:
        splits = list(text)
    return [s for s in splits if s != ""]


class ChineseRecursiveTextSplitter(RecursiveCharacterTextSplitter):
    def __init__(
        self,
        separators: Optional[List[str]] = None,
        keep_separator: bool = True,
        is_separator_regex: bool = True,
        **kwargs: Any,
    ) -> None:
        """Create a new TextSplitter."""
        super().__init__(keep_separator=keep_separator, **kwargs)
        # Separators are tried in order, from coarsest to finest.
        # The Chinese patterns use full-width punctuation; a bare ASCII "?"
        # alternative would not compile as a regex.
        self._separators = separators or [
            "\n\n",
            "\n",
            "。|!|?",
            r"\.\s|\!\s|\?\s",
            r";|;\s",
            r",|,\s",
        ]
        self._is_separator_regex = is_separator_regex

    def _split_text(self, text: str, separators: List[str]) -> List[str]:
        """Split incoming text and return chunks."""
        final_chunks = []
        # Get appropriate separator to use
        separator = separators[-1]
        new_separators = []
        for i, _s in enumerate(separators):
            _separator = _s if self._is_separator_regex else re.escape(_s)
            if _s == "":
                separator = _s
                break
            if re.search(_separator, text):
                separator = _s
                new_separators = separators[i + 1 :]
                break

        _separator = separator if self._is_separator_regex else re.escape(separator)
        splits = _split_text_with_regex_from_end(text, _separator, self._keep_separator)

        # Now go merging things, recursively splitting longer texts.
        _good_splits = []
        _separator = "" if self._keep_separator else separator
        for s in splits:
            if self._length_function(s) < self._chunk_size:
                _good_splits.append(s)
            else:
                if _good_splits:
                    merged_text = self._merge_splits(_good_splits, _separator)
                    final_chunks.extend(merged_text)
                    _good_splits = []
                if not new_separators:
                    final_chunks.append(s)
                else:
                    other_info = self._split_text(s, new_separators)
                    final_chunks.extend(other_info)
        if _good_splits:
            merged_text = self._merge_splits(_good_splits, _separator)
            final_chunks.extend(merged_text)
        return [
            re.sub(r"\n{2,}", "\n", chunk.strip())
            for chunk in final_chunks
            if chunk.strip() != ""
        ]
Hi, will auto prompt tuning define all the entities in my domain, or do I have to define them manually in the settings?
Actually no. I tried it and the entity types were random. You can run prompt tune first and then update the entity extraction prompt with your custom entities.
@KylinMountain Hey, I tried this:
python -m graphrag.prompt_tune --root . --domain "Chinese web novels" --language Chinese --chunk-size 300 --output prompt_zh
but it said there is no language argument. Did you hit this error?
@dinhngoc267 this feature has not been released yet; it is only on the main branch. You could try pulling the code.
What does the --language Chinese parameter do? Where can I find out?
Sorry, I had not read the documentation carefully. I have found it in the docs:
https://microsoft.github.io/graphrag/posts/prompt_tuning/auto_prompt_tuning/
If you are using an open-source model that doesn't support JSON mode, the generated prompt doesn't work well. You may hit errors such as EmptyNetworkError, and you can see that the prompt in entity_extraction.txt is very bad.
I have made a fix: https://github.com/microsoft/graphrag/pull/661, which works well for me.
Hi @KylinMountain, did you notice that some records in the community records are in English? If a question uses those records, the answer comes out in English too. Where can I customize the prompt for the final answer? Or should I change the question in python -m graphrag.query --root ./ragtest --method local {question} to [question] + [a description of the required language]?
But if I modify it like this, I think it affects the node ranking in the retrieval step, since it doesn't expect noise in the question.
How exactly do I use this code in graphrag?
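I got an error: Argument of type "bool | str" cannot be assigned to parameter "keep_separator" of type "bool" in function "_split_text_with_regex_from_end". Type "bool | str" is incompatible with type "bool"; "str" is incompatible with "bool".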
@dinhngoc267 you are right. If you query with something like "这个故事的主题是什么,以中文回复" ("What is the theme of this story? Reply in Chinese"), the ranking scores may be off and the answer is re-sorted, unlike the answer you get in English.
How exactly do I use this code in graphrag?
File to modify: graphrag/index/verbs/text/chunk/strategies/tokens.py
_good_splits = [] _separator = "" if self._keep_separator else separator for s in splits: if self._length_function(s) < self._chunk_size: _good_splits.append(s) else: if _good_splits: merged_text = self._merge_splits(_good_splits, _separator) final_chunks.extend(merged_text) _good_splits = [] if not new_separators: final_chunks.append(s) else: other_info = self._split_text(s, new_separators) final_chunks.extend(other_info) if _good_splits: merged_text = self._merge_splits(_good_splits, _separator) final_chunks.extend(merged_text) return [ re.sub(r"\n{2,}", "\n", chunk.strip()) for chunk in final_chunks if chunk.strip() != "" ]
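I got an error: Argument of type "bool | str" cannot be assigned to parameter "keep_separator" of type "bool" in function "_split_text_with_regex_from_end". Type "bool | str" is not compatible with type "bool". "str" is not compatible with "bool".
That error comes from the argument you are passing in. Did you change anything? If not, please provide the detailed error log.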
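The message above reads like a static type-checker diagnostic rather than a runtime failure: the value passed as keep_separator is seen as "bool | str", while the helper declares a plain bool. One possible workaround, assuming the installed langchain version types that attribute as bool-or-string, is to collapse the value to a bool before passing it on. The helper name normalize_keep_separator is hypothetical, and treating every non-empty string as "keep the separator" is an assumption of this sketch, not documented library behavior.

```python
from typing import Union


def normalize_keep_separator(value: Union[bool, str]) -> bool:
    """Collapse a bool-or-string setting into a plain bool."""
    if isinstance(value, bool):
        return value
    # Assumption: any non-empty string means "keep the separator".
    return value != ""


# Example values a checker would accept for a "bool | str" setting.
print(normalize_keep_separator(True))
print(normalize_keep_separator("end"))
```

With such a helper, the call site could pass normalize_keep_separator(self._keep_separator) so that the argument matches the declared bool parameter.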
Consolidating language support issues here: #696
A question: your code runs as-is and handles Chinese chunks. In the snippet below, why is it [source_doc_idx] * len(chunk) rather than just a single [source_doc_idx]? In the create_base_text_units.csv table this code generates, the document_ids column of every chunk holds n_tokens entries, and they are all duplicates. What is the point of that?

for source_doc_idx, text in mapped_ids:
    chunks = text_splitter.split_text(text)
    for chunk in chunks:
        result.append(
            TextChunk(
                text_chunk=chunk,
                source_doc_indices=[source_doc_idx] * len(chunk),
                n_tokens=len(chunk),
            )
        )
That code has no real meaning; it is only there to match the input format graphrag expects.
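For comparison, the commented-out original loop in the posted module built doc_indices from a set, so each chunk carried every source document index only once. A minimal sketch of that shape for the per-document splitter is shown below. ChunkRecord is a hypothetical stand-in for graphrag's TextChunk so the example runs on its own, and whether the rest of the indexing pipeline behaves identically with a single-element list has not been verified here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ChunkRecord:
    """Hypothetical stand-in for graphrag's TextChunk, for illustration only."""

    text_chunk: str
    source_doc_indices: List[int]
    n_tokens: int


def make_chunk(chunk: str, source_doc_idx: int) -> ChunkRecord:
    """Build one record whose document list holds the index exactly once."""
    return ChunkRecord(
        text_chunk=chunk,
        source_doc_indices=[source_doc_idx],  # one entry per source document
        n_tokens=len(chunk),  # character count, as in the posted code
    )


# What the posted code produces for a four-character chunk from document 0.
repeated = [0] * len("示例文本")
# The deduplicated alternative.
record = make_chunk("示例文本", 0)
print(repeated, record.source_doc_indices)
```

Since each chunk here comes from exactly one document, the single-element list carries the same information as the repeated one.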
I used the Chinese manual document to build it, and found that the extracted entities were very messy. Is there any good way to optimize it?