morph-kgc / morph-kgc

Powerful RDF Knowledge Graph Generation with RML Mappings
https://morph-kgc.readthedocs.io
Apache License 2.0
169 stars 32 forks source link

out-of-memory in etl pipeline with batching #258

Closed Stiksels closed 4 weeks ago

Stiksels commented 1 month ago

Hi,

We're seeing an issue when running our transformation code, where morph_kgc is involved. In short; we're iterating through a dataset of approximately 1mio results by processing relatively small batches (1000 rows) . Every iteration involves generating a graph_store from the batch of results. While running the transformation code, we see a memory increment at this step of around 20 MiB. After processing 500K results, the transformation job reaches it's memory limit and fails with an out-of-memory error.

I'm trying to optimize the code so that garbage collection is ensured and memory is freed up after every processed batch, but it doesn't seem to work.

Memory profiler logs:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    27    157.6 MiB    157.6 MiB           1   @profile
    28                                         def push_data(data_type, latest_update_fuseki_query):
    29    160.0 MiB      2.4 MiB           1       fuseki_client = FusekiClient(FUSEKI_OUTPUT_PORT_NAME)
    30    160.0 MiB      0.0 MiB           3       config_ini = f"""[CONFIGURATION]
    31                                                         output_format=N-QUADS
    32                                         
    33                                                         [DataSource1]
    34    160.0 MiB      0.0 MiB           1                   mappings: yarrrml-data/{data_type}_yarrrml.yml
    35    160.0 MiB      0.0 MiB           1                   file_path: {os.path.join(tempfile.gettempdir(), 'gbq_query_results.csv')}
    36                                                         """
    37                                         
    38                                             # Find the latest modified date, to only add newly modified/created entities
    39    160.0 MiB      0.0 MiB           2       latest_date = get_latest_fuseki_update(
    40    160.0 MiB      0.0 MiB           1           fuseki_client, latest_update_fuseki_query, data_type
    41                                             )
    42                                             # Ensure latest_date is in the correct format (ISO 8601)
    43    160.0 MiB      0.0 MiB           1       logging.info(f"Using latest_date for BigQuery: {latest_date}")
    44                                             
    45                                             # Find the count_value of results in BigQuery to determine stop condition
    46    162.8 MiB      2.7 MiB           1       bigquery_data_count = query_bigquery_count(latest_date, data_type)
    47    162.8 MiB      0.0 MiB           1       print(bigquery_data_count)
    48                                         
    49    162.8 MiB      0.0 MiB           1       offset = 0
    50                                             # Convert page_size to an integer
    51    162.8 MiB      0.0 MiB           1       try: 
    52    162.8 MiB      0.0 MiB           1           page_size = int(get_environment_variable(f'INPUT_{BIG_QUERY_INPUT_PORT_NAME}_pageSize'))
    53                                             except EnvironmentError:
    54                                                 page_size = 1000  # Default value if page_size is not specified
    55                                         
    56                                         
    57                                             # Iterate through BigQuery results until all data is processed
    58    184.6 MiB      0.0 MiB           2       while offset < bigquery_data_count:
    59                                         
    60                                                 # Step 1: Query Google BigQuery with the latest dates and store results as CSV
    61    163.4 MiB      0.7 MiB           1           bigquery_data = query_bigquery(latest_date, query_type=data_type, offset=offset)
    62                                         
    63                                                 # Step 2: Write location data to csv in the temp directory
    64    163.4 MiB      0.0 MiB           1           temp_dir = tempfile.gettempdir()
    65    163.4 MiB      0.0 MiB           1           output_csv_path = os.path.join(temp_dir, 'gbq_query_results.csv')
    66    163.5 MiB      0.1 MiB           1           write_to_csv(bigquery_data, output_csv_path, output_dir=temp_dir)
    67                                         
    68                                                 # STOP CONDITION: determine if all new data was added
    69                                                 # either no new data was received, or the latest_date stayed the same (=no new data added)
    70    163.5 MiB      0.0 MiB           1           if bigquery_data.empty:
    71                                                     print(f"Stopping condition met. bigquery_data.empty: {bigquery_data.empty}")
    72                                                     # Stop condition met, move to the next data type
    73                                                     break    
    74                                         
    75                                                 # Step 3: Generate RDF graph (n-quads) from CSV (found in `file_path`) using morph-kgc
    76    183.6 MiB     20.1 MiB           1           graph_store = generate_rdf_graph(config_ini)
    77                                         
    78                                                 # Step 4: Clear existing named entity graphs in Fuseki
    79    184.3 MiB      0.7 MiB           1           fuseki_client.clear_graphs(graph_store)
    80                                         
    81                                                 # Step 5: Load the RDF graph (n-quads) into Fuseki
    82    184.6 MiB      0.3 MiB           1           fuseki_client.load_store(graph_store)
    83                                         
    84                                                 # Step 6: Increment the offset by the page size for the next iteration
    85    184.6 MiB      0.0 MiB           1           offset += page_size

morph_kgc memory increment example

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    33    168.0 MiB    168.0 MiB           1   @profile
    34                                         def generate_rdf_graph(config_ini_path):
    35    195.9 MiB     27.9 MiB           1       g = morph_kgc.materialize_oxigraph(config_ini_path)
    36    195.9 MiB      0.0 MiB           1       logging.debug("RDF graph generated successfully.")
    37    195.9 MiB      0.0 MiB           1       return g
arenas-guerrero-julian commented 1 month ago

Hi @Stiksels ,

Please correct me if I am wrong. As I can see you are accessing Google Big Query, storing intermediate results as CSV and then generating the RDF with morph-kgc.

Is there any reason for that page size? I think it is too small you can probably increase it to 100k or more.

Also, have you checked writing the triples to a file instead of fuseki? Just to confirm the problem is not the interaction with fuseki.

arenas-guerrero-julian commented 1 month ago

I also see that you are using Big Query. We could create a connector to it with python-bigquery-sqlalchemy.

Stiksels commented 1 month ago

hi @arenas-guerrero-julian ,

thank you for the fast reply. I experimented with the pageSize; with the batch size of 1000 rows, the transformer job would run out of memory after about an hour. With an increased batch size of 5000 rows, the job would run out of memory faster (~ equal amount of triples generated). I will try to set the pagesize to 100K and see the impact.

Also a BigQuery connector could be very interesting for our implementation

Kind regards, Stan

arenas-guerrero-julian commented 1 month ago

If possible, please, try also writing the triples to disk rather than to fuseki.

Stiksels commented 1 month ago

With the increased batch size, the transformer job instantly runs into state OOMKilled when trying to write to Fuseki...

INFO:root:23 mapping rules retrieved.                                                                                                                                      │
│ INFO:root:Mapping partition with 23 groups generated.                                                                                                                      │
│ INFO:root:Maximum number of rules within mapping group: 1.                                                                                                                 │
│ INFO:root:Mappings processed in 1.814 seconds.                                                                                                                             │
│ INFO:root:Number of triples generated in total: 2060739.                                                                                                                   

our platform expects all transformer jobs to write to fuseki

arenas-guerrero-julian commented 1 month ago

I am not sure if it is fuseki or morph-kgc causing the OOM. I would need to take a closer look. Write to me julian.arenas.guerrero@upm.es if I can help.

Stiksels commented 4 weeks ago

Thanks for the input, @arenas-guerrero-julian . We'll continue experimenting with memory allocation, batch size and fuseki optimization.