6ixGODD opened 1 month ago
Following this — has it been solved yet?
The following changes to `run.py` will let the indexing process complete:

Modified the `load_table_from_storage` function to handle JSON files:
```python
async def load_table_from_storage(name: str) -> pd.DataFrame:
    if not await storage.has(name):
        msg = f"Could not find {name} in storage!"
        raise ValueError(msg)
    try:
        log.info("read table from storage: %s", name)
        # Read newline-delimited JSON data instead of Parquet
        content = await storage.get(name, encoding="utf-8")
        json_data = [json.loads(line) for line in content.splitlines() if line.strip()]
        return pd.DataFrame(json_data)
    except Exception:
        log.exception("error loading table from storage: %s", name)
        raise
```
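As a side note (not part of the patch above): pandas can parse JSON Lines directly, so the per-line `json.loads` loop is equivalent to a one-liner, assuming the content really is newline-delimited JSON:

```python
import io

import pandas as pd


def parse_json_lines(content: str) -> pd.DataFrame:
    # Equivalent to the splitlines/json.loads loop above:
    # each non-empty line is parsed as one JSON record.
    return pd.read_json(io.StringIO(content), lines=True)
```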
Updated the `inject_workflow_data_dependencies` function to use JSON files:
```python
async def inject_workflow_data_dependencies(workflow: Workflow) -> None:
    workflow.add_table(DEFAULT_INPUT_NAME, dataset)
    deps = workflow_dependencies[workflow.name]
    log.info("dependencies for %s: %s", workflow.name, deps)
    for id in deps:
        workflow_id = f"workflow:{id}"
        # Load the JSON artifact instead of the Parquet one
        table = await load_table_from_storage(f"{id}.json")
        workflow.add_table(workflow_id, table)
```
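Note that hard-coding `.json` here just inverts the original problem (Parquet runs would then break). A more defensive variant, sketched below, probes storage for whichever artifact actually exists. `resolve_table_name` is a hypothetical helper, not part of GraphRAG, and it assumes the same module-level `storage` object used in `run.py`:

```python
# Hypothetical helper: return whichever artifact the pipeline actually
# emitted, so the loader works for both --emit json and --emit parquet.
async def resolve_table_name(base: str) -> str:
    for ext in (".json", ".parquet"):
        name = base + ext
        if await storage.has(name):
            return name
    msg = f"Could not find {base} with any known extension in storage!"
    raise ValueError(msg)

# Then, inside inject_workflow_data_dependencies:
#     table = await load_table_from_storage(await resolve_table_name(id))
```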
The `run_global_search` and `run_local_search` functions still need to be updated to remove the `.parquet` hardcoding before the query functionality will work. I will post the updates to these functions here once they are done.
Replace the following functions in the `run.py` file in the query folder with the implementations below. Tested locally with the documentation example, and it works perfectly fine.
1. `run_local_search`

```python
def run_local_search(
    data_dir: str | None,
    root_dir: str | None,
    community_level: int,
    response_type: str,
    query: str,
):
    """Run a local search with the given query."""
    data_dir, root_dir, config = _configure_paths_and_settings(data_dir, root_dir)
    data_path = Path(data_dir)

    def read_json_file(file_path):
        # Parse a newline-delimited JSON artifact into a DataFrame
        with open(file_path, "r") as f:
            return pd.DataFrame([json.loads(line) for line in f if line.strip()])

    final_nodes = read_json_file(data_path / "create_final_nodes.json")
    final_community_reports = read_json_file(data_path / "create_final_community_reports.json")
    final_text_units = read_json_file(data_path / "create_final_text_units.json")
    final_relationships = read_json_file(data_path / "create_final_relationships.json")
    final_entities = read_json_file(data_path / "create_final_entities.json")
    final_covariates_path = data_path / "create_final_covariates.json"
    final_covariates = (
        read_json_file(final_covariates_path) if final_covariates_path.exists() else None
    )

    vector_store_args = config.embeddings.vector_store if config.embeddings.vector_store else {}
    vector_store_type = vector_store_args.get("type", VectorStoreType.LanceDB)

    description_embedding_store = __get_embedding_description_store(
        vector_store_type=vector_store_type,
        config_args=vector_store_args,
    )
    entities = read_indexer_entities(final_nodes, final_entities, community_level)
    store_entity_semantic_embeddings(
        entities=entities, vectorstore=description_embedding_store
    )
    covariates = read_indexer_covariates(final_covariates) if final_covariates is not None else []

    search_engine = get_local_search_engine(
        config,
        reports=read_indexer_reports(
            final_community_reports, final_nodes, community_level
        ),
        text_units=read_indexer_text_units(final_text_units),
        entities=entities,
        relationships=read_indexer_relationships(final_relationships),
        covariates={"claims": covariates},
        description_embedding_store=description_embedding_store,
        response_type=response_type,
    )

    result = search_engine.search(query=query)
    reporter.success(f"Local Search Response: {result.response}")
    return result.response
```
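For reference, a hypothetical invocation of the patched function; the path, community level, and query below are placeholders, not values mandated by GraphRAG:

```python
response = run_local_search(
    data_dir="./output/<timestamp>/artifacts",  # placeholder: your run's artifacts dir
    root_dir=".",
    community_level=2,  # placeholder level
    response_type="Multiple Paragraphs",
    query="Who is Scrooge, and what are his main relationships?",
)
print(response)
```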
2. `run_global_search`

```python
def run_global_search(
    data_dir: str | None,
    root_dir: str | None,
    community_level: int,
    response_type: str,
    query: str,
):
    """Run a global search with the given query."""
    data_dir, root_dir, config = _configure_paths_and_settings(data_dir, root_dir)
    data_path = Path(data_dir)

    def read_json_file(file_path):
        # Parse a newline-delimited JSON artifact into a DataFrame
        with open(file_path, "r") as f:
            return pd.DataFrame([json.loads(line) for line in f if line.strip()])

    final_nodes: pd.DataFrame = read_json_file(data_path / "create_final_nodes.json")
    final_entities: pd.DataFrame = read_json_file(data_path / "create_final_entities.json")
    final_community_reports: pd.DataFrame = read_json_file(
        data_path / "create_final_community_reports.json"
    )

    reports = read_indexer_reports(
        final_community_reports, final_nodes, community_level
    )
    entities = read_indexer_entities(final_nodes, final_entities, community_level)
    search_engine = get_global_search_engine(
        config,
        reports=reports,
        entities=entities,
        response_type=response_type,
    )

    result = search_engine.search(query=query)
    reporter.success(f"Global Search Response: {result.response}")
    return result.response
```
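Since `read_json_file` is duplicated in both functions, it could be hoisted to module scope. A hypothetical call for the global variant, with placeholder values as before:

```python
response = run_global_search(
    data_dir="./output/<timestamp>/artifacts",  # placeholder: your run's artifacts dir
    root_dir=".",
    community_level=2,  # placeholder level
    response_type="Multiple Paragraphs",
    query="What are the top themes in this story?",
)
print(response)
```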
The original source code hardcodes the emit type in some functions, so the `--emit` option has no effect there.

Are the two solutions mentioned above part of an official update? The code I pulled in September doesn't seem to include these changes.
Do you need to file an issue?
Describe the bug
When running `poetry run poe index --verbose --emit json` to index, with the JSON format set via the emit field, the pipeline fails at the `create_base_text_units` step. The error message says it cannot find `create_base_text_units.parquet`, even though `create_base_text_units.json` exists in the output directory.

Steps to reproduce
After initializing, run:

The pipeline output:

Check the logs for details:
Expected Behavior

The pipeline should recognize and use the `create_base_text_units.json` file in the output directory when `--emit json` is specified.

GraphRAG Config Used
Logs and screenshots
Console:
Logs:
indexing-engine.log
logs.json
Additional Information